Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, choosing event store technologies, or implementing event persistence patterns.
Comprehensive event store implementation guide with production-ready SQL and Python examples.

Add this skill:

```
npx mdskills install sickn33/event-store-design
```
---
name: event-store-design
description: Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, choosing event store technologies, or implementing event persistence patterns.
---

# Event Store Design

A comprehensive guide to designing event stores for event-sourced applications.

## Do not use this skill when

- The task is unrelated to event store design
- You need a different domain or tool outside this scope

## Instructions

- Clarify goals, constraints, and required inputs.
- Apply relevant best practices and validate outcomes.
- Provide actionable steps and verification.
- If detailed examples are required, open `resources/implementation-playbook.md`.

## Use this skill when

- Designing event sourcing infrastructure
- Choosing between event store technologies
- Implementing custom event stores
- Optimizing event storage and retrieval
- Setting up event store schemas
- Planning for event store scaling

## Core Concepts

### 1. Event Store Architecture

```
┌─────────────────────────────────────────────────────┐
│                     Event Store                     │
├─────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │  Stream 1   │  │  Stream 2   │  │  Stream 3   │  │
│  │ (Aggregate) │  │ (Aggregate) │  │ (Aggregate) │  │
│  ├─────────────┤  ├─────────────┤  ├─────────────┤  │
│  │   Event 1   │  │   Event 1   │  │   Event 1   │  │
│  │   Event 2   │  │   Event 2   │  │   Event 2   │  │
│  │   Event 3   │  │     ...     │  │   Event 3   │  │
│  │     ...     │  │             │  │   Event 4   │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
├─────────────────────────────────────────────────────┤
│  Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ...       │
└─────────────────────────────────────────────────────┘
```

### 2. Event Store Requirements

| Requirement       | Description                              |
| ----------------- | ---------------------------------------- |
| **Append-only**   | Events are immutable; writes only append |
| **Ordered**       | Per-stream and global ordering           |
| **Versioned**     | Optimistic concurrency control           |
| **Subscriptions** | Real-time event notifications            |
| **Idempotent**    | Handles duplicate writes safely          |

## Technology Comparison

| Technology       | Best For                  | Limitations                      |
| ---------------- | ------------------------- | -------------------------------- |
| **EventStoreDB** | Pure event sourcing       | Single-purpose                   |
| **PostgreSQL**   | Existing Postgres stack   | Manual implementation            |
| **Kafka**        | High-throughput streaming | Not ideal for per-stream queries |
| **DynamoDB**     | Serverless, AWS-native    | Query limitations                |
| **Marten**       | .NET ecosystems           | .NET specific                    |

## Templates

### Template 1: PostgreSQL Event Store Schema

```sql
-- Events table
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id VARCHAR(255) NOT NULL,
    stream_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    version BIGINT NOT NULL,
    global_position BIGSERIAL,
    created_at TIMESTAMPTZ DEFAULT NOW(),

    -- Enforces optimistic concurrency; its backing unique index
    -- also serves per-stream queries on (stream_id, version)
    CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);

-- Index for global subscription
CREATE INDEX idx_events_global_position ON events(global_position);

-- Index for event type queries
CREATE INDEX idx_events_event_type ON events(event_type);

-- Index for time-based queries
CREATE INDEX idx_events_created_at ON events(created_at);

-- Snapshots table
CREATE TABLE snapshots (
    stream_id VARCHAR(255) PRIMARY KEY,
    stream_type VARCHAR(255) NOT NULL,
    snapshot_data JSONB NOT NULL,
    version BIGINT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Subscription checkpoint table
CREATE TABLE subscription_checkpoints (
    subscription_id VARCHAR(255) PRIMARY KEY,
    last_position BIGINT NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);
```

### Template 2: Python Event Store Implementation

```python
import asyncio
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional
from uuid import UUID, uuid4

import asyncpg


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    event_id: UUID = field(default_factory=uuid4)
    version: Optional[int] = None
    global_position: Optional[int] = None
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class ConcurrencyError(Exception):
    """Raised when an optimistic concurrency check fails."""


class EventStore:
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def append_events(
        self,
        stream_id: str,
        stream_type: str,
        events: List[Event],
        expected_version: Optional[int] = None,
    ) -> List[Event]:
        """Append events to a stream with optimistic concurrency."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                # Check expected version
                current = await conn.fetchval(
                    "SELECT COALESCE(MAX(version), 0) FROM events WHERE stream_id = $1",
                    stream_id,
                )
                if expected_version is not None and current != expected_version:
                    raise ConcurrencyError(
                        f"Expected version {expected_version}, got {current}"
                    )

                # Insert events, numbering upward from the current version
                saved_events = []
                for i, event in enumerate(events, start=1):
                    event.version = current + i
                    row = await conn.fetchrow(
                        """
                        INSERT INTO events (id, stream_id, stream_type, event_type,
                                            event_data, metadata, version, created_at)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                        RETURNING global_position
                        """,
                        event.event_id,
                        stream_id,
                        stream_type,
                        event.event_type,
                        json.dumps(event.data),
                        json.dumps(event.metadata),
                        event.version,
                        event.created_at,
                    )
                    event.global_position = row['global_position']
                    saved_events.append(event)

                return saved_events

    async def read_stream(
        self,
        stream_id: str,
        from_version: int = 0,
        limit: int = 1000,
    ) -> List[Event]:
        """Read events from a stream."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE stream_id = $1 AND version >= $2
                ORDER BY version
                LIMIT $3
                """,
                stream_id, from_version, limit,
            )
            return [self._row_to_event(row) for row in rows]

    async def read_all(
        self,
        from_position: int = 0,
        limit: int = 1000,
    ) -> List[Event]:
        """Read all events in global order."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE global_position > $1
                ORDER BY global_position
                LIMIT $2
                """,
                from_position, limit,
            )
            return [self._row_to_event(row) for row in rows]

    async def subscribe(
        self,
        subscription_id: str,
        handler,
        from_position: int = 0,
        batch_size: int = 100,
    ):
        """Subscribe to all events, resuming from the stored checkpoint."""
        async with self.pool.acquire() as conn:
            checkpoint = await conn.fetchval(
                """
                SELECT last_position FROM subscription_checkpoints
                WHERE subscription_id = $1
                """,
                subscription_id,
            )
        position = checkpoint or from_position

        while True:
            events = await self.read_all(position, batch_size)
            if not events:
                await asyncio.sleep(1)  # Poll interval
                continue

            for event in events:
                await handler(event)
                position = event.global_position

            # Save checkpoint after each batch
            async with self.pool.acquire() as conn:
                await conn.execute(
                    """
                    INSERT INTO subscription_checkpoints (subscription_id, last_position)
                    VALUES ($1, $2)
                    ON CONFLICT (subscription_id)
                    DO UPDATE SET last_position = $2, updated_at = NOW()
                    """,
                    subscription_id, position,
                )

    def _row_to_event(self, row) -> Event:
        return Event(
            event_id=row['id'],
            stream_id=row['stream_id'],
            event_type=row['event_type'],
            data=json.loads(row['event_data']),
            metadata=json.loads(row['metadata']),
            version=row['version'],
            global_position=row['global_position'],
            created_at=row['created_at'],
        )
```

### Template 3: EventStoreDB Usage

```python
import json

from esdbclient import EventStoreDBClient, NewEvent, StreamState

# Connect
client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")


# Append events
def append_events(stream_name: str, events: list, expected_revision=None):
    new_events = [
        NewEvent(
            type=event['type'],
            data=json.dumps(event['data']).encode(),
            metadata=json.dumps(event.get('metadata', {})).encode(),
        )
        for event in events
    ]

    if expected_revision is None:
        state = StreamState.ANY
    elif expected_revision == -1:
        state = StreamState.NO_STREAM
    else:
        state = expected_revision

    return client.append_to_stream(
        stream_name=stream_name,
        events=new_events,
        current_version=state,
    )


# Read stream
def read_stream(stream_name: str, from_revision: int = 0):
    events = client.get_stream(
        stream_name=stream_name,
        stream_position=from_revision,
    )
    return [
        {
            'type': event.type,
            'data': json.loads(event.data),
            'metadata': json.loads(event.metadata) if event.metadata else {},
            'stream_position': event.stream_position,
            'commit_position': event.commit_position,
        }
        for event in events
    ]


# Subscribe to all. Note: EventStoreDBClient is synchronous, so the
# subscription is a plain iterator (the library also ships an asyncio client).
def subscribe_to_all(handler, from_position: int = 0):
    subscription = client.subscribe_to_all(commit_position=from_position)
    for event in subscription:
        handler({
            'type': event.type,
            'data': json.loads(event.data),
            'stream_id': event.stream_name,
            'position': event.commit_position,
        })


# Category projection ($ce-Category)
def read_category(category: str):
    """Read all events for a category using the $by_category system projection."""
    return read_stream(f"$ce-{category}")
```

### Template 4: DynamoDB Event Store

```python
import json
import uuid
from datetime import datetime

import boto3
from boto3.dynamodb.conditions import Key


class DynamoEventStore:
    def __init__(self, table_name: str):
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def append_events(self, stream_id: str, events: list, expected_version: int = None):
        """Append events; a conditional write per item rejects concurrent writers.

        Note: batch_writer() does not support condition expressions, so each
        event gets its own put_item call.
        """
        for i, event in enumerate(events):
            version = (expected_version or 0) + i + 1
            now = datetime.utcnow().isoformat()
            item = {
                'PK': f"STREAM#{stream_id}",
                'SK': f"VERSION#{version:020d}",
                'GSI1PK': 'EVENTS',
                'GSI1SK': now,
                'event_id': str(uuid.uuid4()),
                'stream_id': stream_id,
                'event_type': event['type'],
                'event_data': json.dumps(event['data']),
                'version': version,
                'created_at': now,
            }
            # Fails with ConditionalCheckFailedException if this version exists
            self.table.put_item(
                Item=item,
                ConditionExpression='attribute_not_exists(SK)',
            )
        return events

    def read_stream(self, stream_id: str, from_version: int = 0):
        """Read events from a stream."""
        response = self.table.query(
            KeyConditionExpression=Key('PK').eq(f"STREAM#{stream_id}") &
                                   Key('SK').gte(f"VERSION#{from_version:020d}")
        )
        return [
            {
                'event_type': item['event_type'],
                'data': json.loads(item['event_data']),
                'version': item['version'],
            }
            for item in response['Items']
        ]


# Table definition (CloudFormation/Terraform)
"""
DynamoDB Table:
  - PK (Partition Key): String
  - SK (Sort Key): String
  - GSI1PK, GSI1SK for global ordering

Capacity: On-demand or provisioned based on throughput needs
"""
```

## Best Practices

### Do's

- **Use stream IDs that include the aggregate type** - e.g. `Order-{uuid}`
- **Include correlation/causation IDs** - For tracing across services
- **Version events from day one** - Plan for schema evolution
- **Implement idempotency** - Use event IDs for deduplication
- **Index appropriately** - Match indexes to your query patterns

### Don'ts

- **Don't update or delete events** - They're immutable facts
- **Don't store large payloads** - Keep events small; reference large blobs stored elsewhere
- **Don't skip optimistic concurrency** - Concurrent writers can otherwise silently overwrite each other
- **Don't ignore backpressure** - Handle slow consumers

## Resources

- [EventStoreDB](https://www.eventstore.com/)
- [Marten Events](https://martendb.io/events/)
- [Event Sourcing Pattern](https://docs.microsoft.com/en-us/azure/architecture/patterns/event-sourcing)
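The templates above persist and read back raw events; a consumer then rebuilds aggregate state by left-folding a stream. A minimal, backend-independent sketch (the `apply`/`rehydrate` names and the order events are illustrative, not part of any template above):

```python
from functools import reduce

# Hypothetical order events, in stream order
events = [
    {"type": "OrderCreated", "data": {"order_id": "o-1"}},
    {"type": "ItemAdded", "data": {"sku": "A", "price": 30}},
    {"type": "ItemAdded", "data": {"sku": "B", "price": 12}},
]

def apply(state: dict, event: dict) -> dict:
    """Pure transition function: current state + event -> next state."""
    if event["type"] == "OrderCreated":
        return {"order_id": event["data"]["order_id"], "total": 0, "items": []}
    if event["type"] == "ItemAdded":
        return {
            **state,
            "items": state["items"] + [event["data"]["sku"]],
            "total": state["total"] + event["data"]["price"],
        }
    return state  # Unknown event types are skipped for forward compatibility

def rehydrate(events: list) -> dict:
    """Fold the whole stream from an empty initial state."""
    return reduce(apply, events, {})

state = rehydrate(events)
print(state["total"])   # 42
print(state["items"])   # ['A', 'B']
```

Because `apply` is pure, the same fold also works from a snapshot: load the snapshot state, then fold only the events with a version greater than the snapshot's.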
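The expected-version rule enforced by the `UNIQUE (stream_id, version)` constraint and the check in Template 2 can be demonstrated without a database. A toy in-memory sketch (class and method names are illustrative only):

```python
class ConcurrencyError(Exception):
    pass

class InMemoryEventStore:
    """Non-durable stand-in that mirrors the optimistic concurrency check."""

    def __init__(self):
        self.streams = {}  # stream_id -> list of versioned events

    def append(self, stream_id, events, expected_version=None):
        stream = self.streams.setdefault(stream_id, [])
        current = len(stream)  # versions are 1..N, so current version == length
        if expected_version is not None and current != expected_version:
            raise ConcurrencyError(
                f"Expected version {expected_version}, got {current}"
            )
        for version, event in enumerate(events, start=current + 1):
            stream.append({**event, "version": version})
        return len(stream)  # new current version

store = InMemoryEventStore()
v = store.append("Order-1", [{"type": "OrderCreated"}], expected_version=0)
v = store.append("Order-1", [{"type": "ItemAdded"}], expected_version=v)

# A writer holding a stale version is rejected instead of overwriting history
try:
    store.append("Order-1", [{"type": "ItemAdded"}], expected_version=0)
except ConcurrencyError:
    print("stale writer rejected")
```

The same pattern applies to all four templates: the writer states which version it last saw, and the store refuses the append if history has moved on.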
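"Version events from day one" usually means old events stay readable by upcasting them to the current schema at read time, rather than rewriting stored history. A hedged sketch, assuming a hypothetical `CustomerRegistered` event whose v1 payload stored a single `name` field that v2 splits in two:

```python
def upcast_v1_to_v2(data: dict) -> dict:
    """Migrate a v1 payload to the v2 shape (assumed change, for illustration)."""
    first, _, last = data["name"].partition(" ")
    return {"first_name": first, "last_name": last}

# One upcaster per (event_type, from_version) step
UPCASTERS = {("CustomerRegistered", 1): upcast_v1_to_v2}

def upcast(event_type: str, schema_version: int, data: dict, target: int = 2) -> dict:
    """Apply upcasters stepwise until the payload reaches the target version."""
    while schema_version < target:
        data = UPCASTERS[(event_type, schema_version)](data)
        schema_version += 1
    return data

old = {"name": "Ada Lovelace"}  # as written years ago, schema version 1
new = upcast("CustomerRegistered", 1, old)
print(new)  # {'first_name': 'Ada', 'last_name': 'Lovelace'}
```

Storing the schema version in each event's metadata (the `metadata` column in Template 1 is a natural place) is what makes this dispatch possible later.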