---
name: saga-orchestration
description: Implement saga patterns for distributed transactions and cross-aggregate workflows. Use when coordinating multi-step business processes, handling compensating transactions, or managing long-running workflows.
---

# Saga Orchestration

Patterns for managing distributed transactions and long-running business processes.

## Do not use this skill when

- The task is unrelated to saga orchestration
- You need a different domain or tool outside this scope

## Instructions

- Clarify goals, constraints, and required inputs.
- Apply relevant best practices and validate outcomes.
- Provide actionable steps and verification.
- If detailed examples are required, open `resources/implementation-playbook.md`.

## Use this skill when

- Coordinating multi-service transactions
- Implementing compensating transactions
- Managing long-running business workflows
- Handling failures in distributed systems
- Building order fulfillment processes
- Implementing approval workflows

## Core Concepts

### 1. Saga Types

There are two ways to coordinate a saga:

- **Choreography:** each service reacts to the previous service's event and emits its own event. No central coordinator exists.
- **Orchestration:** a central orchestrator sends a command to each service in turn and tracks the saga's progress.

```
Choreography                     Orchestration
┌─────┐  ┌─────┐  ┌─────┐        ┌─────────────┐
│Svc A│─►│Svc B│─►│Svc C│        │ Orchestrator│
└─────┘  └─────┘  └─────┘        └──────┬──────┘
   │        │        │                  │
   ▼        ▼        ▼            ┌─────┼─────┐
 Event    Event    Event          ▼     ▼     ▼
                                ┌────┐┌────┐┌────┐
                                │Svc1││Svc2││Svc3│
                                └────┘└────┘└────┘
```

### 2. Saga Execution States

| State            | Description                                         |
| ---------------- | --------------------------------------------------- |
| **Started**      | Saga initiated, first step dispatched               |
| **Pending**      | Waiting for the current step to complete            |
| **Compensating** | Rolling back completed steps after a failure        |
| **Completed**    | All steps succeeded                                 |
| **Failed**       | Saga failed; completed steps have been compensated  |

## Templates

### Template 1: Saga Orchestrator Base

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Any, Optional
from datetime import datetime
import uuid


class SagaState(Enum):
    STARTED = "started"
    PENDING = "pending"
    COMPENSATING = "compensating"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class SagaStep:
    name: str
    action: str        # command published to run the step
    compensation: str  # command published to undo the step
    # pending -> executing -> completed | failed; completed -> compensating -> compensated
    status: str = "pending"
    result: Optional[Dict] = None
    error: Optional[str] = None
    executed_at: Optional[datetime] = None
    compensated_at: Optional[datetime] = None


@dataclass
class Saga:
    saga_id: str
    saga_type: str
    state: SagaState
    data: Dict[str, Any]
    steps: List[SagaStep]
    current_step: int = 0
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)


class SagaOrchestrator(ABC):
    """Base class for saga orchestrators.

    saga_store must provide async get(saga_id) and save(saga);
    event_publisher must provide async publish(topic, payload).
    """

    def __init__(self, saga_store, event_publisher):
        self.saga_store = saga_store
        self.event_publisher = event_publisher

    @abstractmethod
    def define_steps(self, data: Dict) -> List[SagaStep]:
        """Define the saga steps."""
        pass

    @property
    @abstractmethod
    def saga_type(self) -> str:
        """Unique saga type identifier."""
        pass

    async def start(self, data: Dict) -> Saga:
        """Start a new saga."""
        saga = Saga(
            saga_id=str(uuid.uuid4()),
            saga_type=self.saga_type,
            state=SagaState.STARTED,
            data=data,
            steps=self.define_steps(data)
        )
        await self.saga_store.save(saga)
        await self._execute_next_step(saga)
        return saga

    async def handle_step_completed(self, saga_id: str, step_name: str, result: Dict):
        """Handle successful step completion."""
        saga = await self.saga_store.get(saga_id)

        # Ignore duplicate or late replies (delivery is usually at-least-once)
        if (saga.state not in (SagaState.STARTED, SagaState.PENDING)
                or saga.steps[saga.current_step].name != step_name):
            return

        step = saga.steps[saga.current_step]
        step.status = "completed"
        step.result = result
        step.executed_at = datetime.utcnow()

        saga.current_step += 1
        saga.updated_at = datetime.utcnow()

        # Check if saga is complete
        if saga.current_step >= len(saga.steps):
            saga.state = SagaState.COMPLETED
            await self.saga_store.save(saga)
            await self._on_saga_completed(saga)
        else:
            saga.state = SagaState.PENDING
            await self.saga_store.save(saga)
            await self._execute_next_step(saga)

    async def handle_step_failed(self, saga_id: str, step_name: str, error: str):
        """Handle step failure - start compensation."""
        saga = await self.saga_store.get(saga_id)
        if saga.state not in (SagaState.STARTED, SagaState.PENDING):
            return  # already compensating or finished

        # Mark step as failed
        for step in saga.steps:
            if step.name == step_name:
                step.status = "failed"
                step.error = error
                break

        saga.state = SagaState.COMPENSATING
        saga.updated_at = datetime.utcnow()
        await self.saga_store.save(saga)

        # Undo the steps that completed before the failure
        await self._compensate(saga)

    async def _execute_next_step(self, saga: Saga):
        """Execute the next step in the saga."""
        if saga.current_step >= len(saga.steps):
            return

        step = saga.steps[saga.current_step]
        step.status = "executing"
        await self.saga_store.save(saga)

        # Publish command to execute step
        await self.event_publisher.publish(
            step.action,
            {
                "saga_id": saga.saga_id,
                "step_name": step.name,
                **saga.data
            }
        )

    async def _compensate(self, saga: Saga):
        """Request compensation for every completed step, newest first."""
        to_compensate = [
            s for s in reversed(saga.steps[:saga.current_step])
            if s.status == "completed"
        ]
        if not to_compensate:
            # Nothing completed (e.g. the first step failed), so no
            # compensation replies will arrive: fail the saga now.
            saga.state = SagaState.FAILED
            await self.saga_store.save(saga)
            await self._on_saga_failed(saga)
            return

        # Mark and persist all steps first, so a fast compensation reply
        # cannot be overwritten by a later save from this method
        for step in to_compensate:
            step.status = "compensating"
        await self.saga_store.save(saga)

        # Published newest-first; services may still process them concurrently
        for step in to_compensate:
            await self.event_publisher.publish(
                step.compensation,
                {
                    "saga_id": saga.saga_id,
                    "step_name": step.name,
                    "original_result": step.result,
                    **saga.data
                }
            )

    async def handle_compensation_completed(self, saga_id: str, step_name: str):
        """Handle compensation completion."""
        saga = await self.saga_store.get(saga_id)
        if saga.state != SagaState.COMPENSATING:
            return  # duplicate or late reply

        for step in saga.steps:
            if step.name == step_name:
                step.status = "compensated"
                step.compensated_at = datetime.utcnow()
                break

        # The saga has failed once no step is still being compensated
        all_compensated = all(
            s.status in ("compensated", "pending", "failed")
            for s in saga.steps
        )
        if all_compensated:
            saga.state = SagaState.FAILED
        saga.updated_at = datetime.utcnow()
        await self.saga_store.save(saga)  # persist before announcing the outcome

        if all_compensated:
            await self._on_saga_failed(saga)

    async def _on_saga_completed(self, saga: Saga):
        """Called when saga completes successfully."""
        await self.event_publisher.publish(
            f"{self.saga_type}Completed",
            {"saga_id": saga.saga_id, **saga.data}
        )

    async def _on_saga_failed(self, saga: Saga):
        """Called when saga fails after compensation."""
        error = next((s.error for s in saga.steps if s.error), "Saga failed")
        await self.event_publisher.publish(
            f"{self.saga_type}Failed",
            {"saga_id": saga.saga_id, "error": error, **saga.data}
        )
```

### Template 2: Order Fulfillment Saga

```python
# Builds on the classes defined in Template 1.
class OrderFulfillmentSaga(SagaOrchestrator):
    """Orchestrates order fulfillment across services."""

    @property
    def saga_type(self) -> str:
        return "OrderFulfillment"

    def define_steps(self, data: Dict) -> List[SagaStep]:
        return [
            SagaStep(
                name="reserve_inventory",
                action="InventoryService.ReserveItems",
                compensation="InventoryService.ReleaseReservation"
            ),
            SagaStep(
                name="process_payment",
                action="PaymentService.ProcessPayment",
                compensation="PaymentService.RefundPayment"
            ),
            SagaStep(
                name="create_shipment",
                action="ShippingService.CreateShipment",
                compensation="ShippingService.CancelShipment"
            ),
            SagaStep(
                name="send_confirmation",
                action="NotificationService.SendOrderConfirmation",
                compensation="NotificationService.SendCancellationNotice"
            )
        ]


# Usage: saga_store and event_publisher are your own implementations
async def create_order(order_data: Dict, saga_store, event_publisher):
    saga = OrderFulfillmentSaga(saga_store, event_publisher)
    return await saga.start({
        "order_id": order_data["order_id"],
        "customer_id": order_data["customer_id"],
        "items": order_data["items"],
        "payment_method": order_data["payment_method"],
        "shipping_address": order_data["shipping_address"]
    })


# Event handlers in each service. The orchestrator is assumed to consume
# SagaStepCompleted, SagaStepFailed and SagaCompensationCompleted and route
# them to handle_step_completed, handle_step_failed and
# handle_compensation_completed.
class InsufficientInventoryError(Exception):
    """Domain error raised by reserve() when stock is short."""


class InventoryService:
    # self.event_publisher, self.reserve() and self.release_reservation()
    # are service-specific and not shown.
    async def handle_reserve_items(self, command: Dict):
        try:
            # Reserve inventory
            reservation = await self.reserve(
                command["items"],
                command["order_id"]
            )
            # Report success
            await self.event_publisher.publish(
                "SagaStepCompleted",
                {
                    "saga_id": command["saga_id"],
                    "step_name": command["step_name"],
                    "result": {"reservation_id": reservation.id}
                }
            )
        except InsufficientInventoryError as e:
            await self.event_publisher.publish(
                "SagaStepFailed",
                {
                    "saga_id": command["saga_id"],
                    "step_name": command["step_name"],
                    "error": str(e)
                }
            )

    async def handle_release_reservation(self, command: Dict):
        # Compensating action; must be safe to repeat if the command is redelivered
        await self.release_reservation(
            command["original_result"]["reservation_id"]
        )
        await self.event_publisher.publish(
            "SagaCompensationCompleted",
            {
                "saga_id": command["saga_id"],
                "step_name": command["step_name"]
            }
        )
```

### Template 3: Choreography-Based Saga

```python
from dataclasses import dataclass
from typing import Dict, Any, List


@dataclass
class SagaContext:
    """Passed through choreographed saga events."""
    saga_id: str
    step: int
    data: Dict[str, Any]
    completed_steps: List[str]


class OrderChoreographySaga:
    """Choreography-based saga using events.

    Assumes each service echoes the fields later steps need (order_id,
    reservation_id, payment_id, total_amount) in the events it publishes.
    """

    def __init__(self, event_bus):
        self.event_bus = event_bus
        self._register_handlers()

    def _register_handlers(self):
        self.event_bus.subscribe("OrderCreated", self._on_order_created)
        self.event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
        self.event_bus.subscribe("PaymentProcessed", self._on_payment_processed)
        self.event_bus.subscribe("ShipmentCreated", self._on_shipment_created)

        # Compensation handlers
        self.event_bus.subscribe("PaymentFailed", self._on_payment_failed)
        self.event_bus.subscribe("ShipmentFailed", self._on_shipment_failed)

    async def _on_order_created(self, event: Dict):
        """Step 1: Order created, reserve inventory."""
        await self.event_bus.publish("ReserveInventory", {
            "saga_id": event["order_id"],  # the order ID doubles as the saga ID
            "order_id": event["order_id"],
            "items": event["items"]
        })

    async def _on_inventory_reserved(self, event: Dict):
        """Step 2: Inventory reserved, process payment."""
        await self.event_bus.publish("ProcessPayment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "amount": event["total_amount"],
            "reservation_id": event["reservation_id"]
        })

    async def _on_payment_processed(self, event: Dict):
        """Step 3: Payment done, create shipment."""
        await self.event_bus.publish("CreateShipment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "payment_id": event["payment_id"],
            # Carried forward so ShipmentFailed can release the reservation
            "reservation_id": event["reservation_id"]
        })

    async def _on_shipment_created(self, event: Dict):
        """Step 4: Shipment created, mark the order fulfilled."""
        await self.event_bus.publish("OrderFulfilled", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "tracking_number": event["tracking_number"]
        })

    # Compensation handlers
    async def _on_payment_failed(self, event: Dict):
        """Payment failed - release inventory."""
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"]
        })
        await self.event_bus.publish("OrderFailed", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "reason": "Payment failed"
        })

    async def _on_shipment_failed(self, event: Dict):
        """Shipment failed - refund payment and release inventory."""
        await self.event_bus.publish("RefundPayment", {
            "saga_id": event["saga_id"],
            "payment_id": event["payment_id"]
        })
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"]
        })
        await self.event_bus.publish("OrderFailed", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "reason": "Shipment failed"
        })
```

### Template 4: Saga with Timeouts

```python
# Builds on Saga and SagaOrchestrator from Template 1.
from datetime import datetime, timedelta
from typing import Dict


class TimeoutSagaOrchestrator(SagaOrchestrator):
    """Saga orchestrator with step timeouts.

    scheduler is assumed to expose
    async schedule(job_id, callback, payload, run_at=...);
    adapt the call to your job scheduler.
    """

    def __init__(self, saga_store, event_publisher, scheduler,
                 step_timeout: timedelta = timedelta(minutes=5)):
        super().__init__(saga_store, event_publisher)
        self.scheduler = scheduler
        self.step_timeout = step_timeout

    async def _execute_next_step(self, saga: Saga):
        if saga.current_step >= len(saga.steps):
            return

        step = saga.steps[saga.current_step]
        step.status = "executing"
        timeout_at = datetime.utcnow() + self.step_timeout
        await self.saga_store.save(saga)

        # Schedule timeout check
        await self.scheduler.schedule(
            f"saga_timeout_{saga.saga_id}_{step.name}",
            self._check_timeout,
            {"saga_id": saga.saga_id, "step_name": step.name},
            run_at=timeout_at
        )

        await self.event_publisher.publish(
            step.action,
            {"saga_id": saga.saga_id, "step_name": step.name, **saga.data}
        )

    async def _check_timeout(self, data: Dict):
        """Check if step has timed out."""
        saga = await self.saga_store.get(data["saga_id"])
        step = next(s for s in saga.steps if s.name == data["step_name"])

        if step.status == "executing":
            # Step timed out - fail it. The service may still finish the work
            # later; failed steps are not compensated, so reconcile that separately.
            await self.handle_step_failed(
                data["saga_id"],
                data["step_name"],
                "Step timed out"
            )
```

## Best Practices

### Do's

- **Make steps idempotent** - Messages can be redelivered, so running a step twice must be safe
- **Design compensations carefully** - A compensation must not fail permanently; make it idempotent and retry it until it succeeds
- **Use correlation IDs** - Propagate the saga ID for tracing across services
- **Implement timeouts** - Don't wait forever for a step reply
- **Log every step transition** - For debugging failures

### Don'ts

- **Don't assume instant completion** - Sagas are long-running and steps complete asynchronously
- **Don't skip compensation testing** - It is the most critical part
- **Don't couple services** - Use async messaging
- **Don't ignore partial failures** - A saga stuck mid-compensation leaves data inconsistent; detect and resolve it

## Resources

- [Saga Pattern](https://microservices.io/patterns/data/saga.html)
- [Designing Data-Intensive Applications](https://dataintensive.net/)
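The Saga Execution States table lists the states but not which transitions between them are legal. The sketch below encodes those transitions. It assumes the transitions that the Template 1 orchestrator performs:

- A step reply moves a saga forward from Started or Pending.
- Any step failure moves it to Compensating.
- Compensating ends in Failed.

`ALLOWED_TRANSITIONS` and `transition` are hypothetical names, not part of the templates.

```python
from enum import Enum


class SagaState(Enum):
    STARTED = "started"
    PENDING = "pending"
    COMPENSATING = "compensating"
    COMPLETED = "completed"
    FAILED = "failed"


# Assumed rules, derived from what the Template 1 orchestrator does.
ALLOWED_TRANSITIONS = {
    SagaState.STARTED: {SagaState.PENDING, SagaState.COMPLETED, SagaState.COMPENSATING},
    SagaState.PENDING: {SagaState.PENDING, SagaState.COMPLETED, SagaState.COMPENSATING},
    SagaState.COMPENSATING: {SagaState.FAILED},
    SagaState.COMPLETED: set(),  # terminal
    SagaState.FAILED: set(),     # terminal
}


def transition(current: SagaState, target: SagaState) -> SagaState:
    """Return target if the move is legal, otherwise raise ValueError."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"illegal saga transition: {current.value} -> {target.value}")
    return target
```

Call `transition` wherever the orchestrator assigns a new state. An out-of-order update, such as a step completion arriving after the saga has already failed, then becomes an explicit error rather than a silent state change.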
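Template 1 spreads its core orchestration rule across message handlers: run steps in order, and on failure undo the completed steps newest-first. The sketch below applies the same rule within a single process. It assumes each step is a plain local function rather than a published command. `LocalStep` and `run_saga` are illustrative names.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class LocalStep:
    name: str
    action: Callable[[Dict], None]        # does the work; raises on failure
    compensation: Callable[[Dict], None]  # semantically undoes a completed action


def run_saga(steps: List[LocalStep], data: Dict) -> bool:
    """Run steps in order; on failure, compensate completed steps newest-first.

    Returns True if every step succeeded, False if the saga was rolled back.
    """
    completed: List[LocalStep] = []
    for step in steps:
        try:
            step.action(data)
        except Exception:
            # Only steps that finished are undone, in reverse order of completion.
            for done in reversed(completed):
                done.compensation(data)
            return False
        completed.append(step)
    return True


# Usage: the payment step fails, so only the inventory reservation is released.
log: List[str] = []


def record(entry: str) -> Callable[[Dict], None]:
    return lambda data: log.append(entry)


def decline_payment(data: Dict) -> None:
    raise RuntimeError("payment declined")


succeeded = run_saga(
    [
        LocalStep("reserve_inventory", record("reserve"), record("release")),
        LocalStep("process_payment", decline_payment, record("refund")),
        LocalStep("create_shipment", record("ship"), record("cancel")),
    ],
    {"order_id": "o-1"},
)
# succeeded is False, log == ["reserve", "release"]
```

The failed step itself is not compensated because it never completed. This matches Template 1, which only compensates steps whose status is `completed`.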
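Template 3 assumes an `event_bus` with `subscribe(name, handler)` and an awaitable `publish(name, payload)`. For local experiments, a hypothetical in-memory bus with that shape can drive a shortened version of the choreography. In this version the payment service always declines, so the compensation path runs.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class InMemoryEventBus:
    """Hypothetical in-process bus matching the subscribe/publish calls in Template 3."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)
        self.published: List[str] = []  # event names in publish order, for inspection

    def subscribe(self, event_name: str, handler: Handler) -> None:
        self._handlers[event_name].append(handler)

    async def publish(self, event_name: str, payload: Dict[str, Any]) -> None:
        self.published.append(event_name)
        # Delivered inline and in order; a real broker delivers asynchronously.
        for handler in list(self._handlers.get(event_name, [])):
            await handler(payload)


async def demo() -> List[str]:
    bus = InMemoryEventBus()

    async def reserve_inventory(cmd: Dict[str, Any]) -> None:
        # Hypothetical inventory service: reserves stock and reports success.
        await bus.publish("InventoryReserved", {**cmd, "reservation_id": "r-1"})

    async def on_inventory_reserved(event: Dict[str, Any]) -> None:
        await bus.publish("ProcessPayment", event)

    async def process_payment(cmd: Dict[str, Any]) -> None:
        # Hypothetical payment service that always declines.
        await bus.publish("PaymentFailed", cmd)

    async def on_payment_failed(event: Dict[str, Any]) -> None:
        # Compensation: release the reservation made by the first step.
        await bus.publish("ReleaseInventory", {"reservation_id": event["reservation_id"]})

    bus.subscribe("ReserveInventory", reserve_inventory)
    bus.subscribe("InventoryReserved", on_inventory_reserved)
    bus.subscribe("ProcessPayment", process_payment)
    bus.subscribe("PaymentFailed", on_payment_failed)

    await bus.publish("ReserveInventory", {"saga_id": "o-1", "order_id": "o-1"})
    return bus.published
```

Delivery here is synchronous and exactly-once, which a real broker does not guarantee. Handlers written for production should still tolerate duplicate and reordered events.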
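Template 4 relies on an external scheduler to detect stuck steps. When a step is awaited in-process instead, `asyncio.wait_for` gives comparable behaviour. The sketch below uses a hypothetical `run_step_with_timeout` helper that cancels the step on timeout and returns the same failure shape as any other error.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def run_step_with_timeout(
    action: Callable[[], Awaitable[Dict[str, Any]]],
    timeout_s: float,
) -> Dict[str, Any]:
    """Run one step; a timeout is reported like any other step failure."""
    try:
        result = await asyncio.wait_for(action(), timeout=timeout_s)
        return {"status": "completed", "result": result}
    except asyncio.TimeoutError:
        # wait_for has already cancelled the step; the saga should now compensate.
        return {"status": "failed", "error": "Step timed out"}


async def fast_reservation() -> Dict[str, Any]:
    return {"reservation_id": "r-1"}


async def slow_payment() -> Dict[str, Any]:
    await asyncio.sleep(1.0)  # simulates a payment provider that hangs
    return {"payment_id": "p-1"}
```

A timeout only means the orchestrator stopped waiting. A remote service may still complete the work afterwards, which is another reason compensations must be idempotent.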
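The first Do's bullet, making steps idempotent, usually comes down to deduplicating commands on a key that identifies them. The sketch below assumes `(saga_id, step_name)` is that key, as it is in the payloads Template 1 publishes. `IdempotentHandler` is a hypothetical wrapper, kept synchronous and in-memory for brevity.

```python
from typing import Any, Callable, Dict, List, Tuple


class IdempotentHandler:
    """Wraps a step handler so a redelivered command is applied at most once.

    Commands are keyed by (saga_id, step_name); results are kept in memory here.
    """

    def __init__(self, handler: Callable[[Dict[str, Any]], Any]) -> None:
        self._handler = handler
        self._results: Dict[Tuple[str, str], Any] = {}

    def __call__(self, command: Dict[str, Any]) -> Any:
        key = (command["saga_id"], command["step_name"])
        if key not in self._results:
            # If the handler raises, nothing is stored and a retry runs it again.
            self._results[key] = self._handler(command)
        # Duplicates get the original result instead of re-running the side effect.
        return self._results[key]


# Usage: the same ReserveItems command delivered twice reserves stock once.
reservations: List[str] = []


def reserve(command: Dict[str, Any]) -> Dict[str, str]:
    reservations.append(command["order_id"])
    return {"reservation_id": f"r-{len(reservations)}"}


handle_reserve = IdempotentHandler(reserve)
command = {"saga_id": "s-1", "step_name": "reserve_inventory", "order_id": "o-1"}
first = handle_reserve(command)
second = handle_reserve(command)  # redelivery
```

A production version would store the processed keys and results durably, in the same transaction as the step's side effect. Otherwise a crash between the two could still cause a double application.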
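For the correlation-ID practice, the saga ID can be attached to every log line within a service without threading it through each function call. The sketch below uses the standard library's `contextvars` and `logging` modules. The `current_saga_id` variable, `SagaIdFilter` class, and `make_saga_logger` helper are hypothetical names.

```python
import contextvars
import io
import logging

# Hypothetical variable holding the saga ID for the current context ("-" when unset).
current_saga_id = contextvars.ContextVar("current_saga_id", default="-")


class SagaIdFilter(logging.Filter):
    """Copies the current saga ID onto each record so formatters can use %(saga_id)s."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.saga_id = current_saga_id.get()
        return True


def make_saga_logger(stream) -> logging.Logger:
    logger = logging.getLogger("saga")
    handler = logging.StreamHandler(stream)
    handler.addFilter(SagaIdFilter())
    handler.setFormatter(logging.Formatter("%(saga_id)s %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep this demo's output off the root logger
    return logger


buf = io.StringIO()
saga_log = make_saga_logger(buf)

token = current_saga_id.set("saga-123")
try:
    saga_log.info("reserve_inventory started")
finally:
    current_saga_id.reset(token)
```

asyncio tasks copy the current context when they are created. Tasks spawned while a saga ID is set therefore log with that ID as well.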