Distributed Transactions
Use this guide if…
- You need to coordinate operations across multiple services
- You’re implementing multi-step workflows with compensation
- You want to ensure consistency without distributed locks
- You need to handle failures in complex business processes
Quick Example
import { Saga, StartsWith, CompensateWith } from '@banyanai/platform-saga-framework';

@Saga()
export class OrderFulfillmentSaga {
  constructor(
    private inventoryClient: InventoryServiceClient,
    private paymentClient: PaymentServiceClient,
    private shippingClient: ShippingServiceClient
  ) {}

  @StartsWith(OrderCreatedEvent)
  async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
    const orderId = event.eventData.orderId;

    // Step 1: Reserve inventory (with compensation)
    await this.execute(
      () => this.inventoryClient.reserveItems({ orderId }),
      () => this.inventoryClient.releaseItems({ orderId })
    );

    // Step 2: Process payment (with compensation)
    await this.execute(
      () => this.paymentClient.processPayment({ orderId }),
      () => this.paymentClient.refundPayment({ orderId })
    );

    // Step 3: Schedule shipping (with compensation)
    await this.execute(
      () => this.shippingClient.scheduleShipment({ orderId }),
      () => this.shippingClient.cancelShipment({ orderId })
    );

    // All steps succeeded - saga complete
    await this.complete({ orderId, status: 'fulfilled' });
  }
}

Saga automatically:
- Tracks workflow state
- Retries failed steps
- Executes compensation on failure
- Ensures eventual consistency
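To make the bookkeeping behind that list more concrete, here is a minimal, framework-independent sketch of how an executor could track state and run compensation. `MiniSaga`, `runDemo`, and the step names are hypothetical and are not part of `@banyanai/platform-saga-framework`; retries are left out to keep the sketch short.

```typescript
// Hypothetical sketch; the real framework's internals may differ.
type Action = () => Promise<unknown>;
type Compensation = () => Promise<unknown>;
type SagaStatus = 'running' | 'completed' | 'compensated';

class MiniSaga {
  status: SagaStatus = 'running';
  // Compensations for steps that have already succeeded, oldest first.
  private readonly compensations: Compensation[] = [];

  // Run one step; remember its compensation only if the action succeeds.
  async execute(action: Action, compensation: Compensation): Promise<void> {
    try {
      await action();
      this.compensations.push(compensation);
    } catch (error) {
      await this.compensate();
      throw error;
    }
  }

  // Undo completed steps in reverse order (last succeeded, first undone).
  private async compensate(): Promise<void> {
    while (this.compensations.length > 0) {
      const undo = this.compensations.pop()!;
      await undo();
    }
    this.status = 'compensated';
  }

  complete(): void {
    this.status = 'completed';
  }
}

// Usage: the third step fails, so the two completed steps are undone.
async function runDemo(): Promise<{ log: string[]; status: SagaStatus }> {
  const log: string[] = [];
  const saga = new MiniSaga();
  const record = (entry: string) => async () => { log.push(entry); };
  try {
    await saga.execute(record('reserve'), record('release'));
    await saga.execute(record('charge'), record('refund'));
    await saga.execute(
      async () => { throw new Error('shipping unavailable'); },
      record('cancel')
    );
    saga.complete();
  } catch {
    // The saga has already compensated by the time the error reaches here.
  }
  return { log, status: saga.status };
}
```

In this sketch the failed step's own compensation (`cancel`) never runs, because the step never took effect. Only `refund` and then `release` are executed.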
Saga Pattern
What is a Saga?
A saga is a sequence of local transactions where each transaction:
- Updates the database
- Publishes an event or sends a command
- Has a compensating transaction to undo its effects
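The three properties above can be sketched for a single step. The example below is an assumption-laden illustration: `InventoryService`, its in-memory maps, and the event names `InventoryReserved` and `InventoryReleased` are hypothetical stand-ins for a real database and event bus.

```typescript
// Hypothetical in-memory stand-ins for a service's database and event bus.
interface DomainEvent { type: string; orderId: string; }

class InventoryService {
  readonly published: DomainEvent[] = [];
  private readonly stock: Map<string, number>;
  private readonly reservations = new Map<string, { sku: string; quantity: number }>();

  constructor(initialStock: [string, number][]) {
    this.stock = new Map(initialStock);
  }

  available(sku: string): number {
    return this.stock.get(sku) ?? 0;
  }

  // Local transaction: update this service's own data, then publish an event.
  reserve(orderId: string, sku: string, quantity: number): void {
    const current = this.available(sku);
    if (current < quantity) {
      throw new Error(`insufficient stock for ${sku}`);
    }
    this.stock.set(sku, current - quantity);
    this.reservations.set(orderId, { sku, quantity });
    this.published.push({ type: 'InventoryReserved', orderId });
  }

  // Compensating transaction: semantically undo the reservation.
  release(orderId: string): void {
    const reservation = this.reservations.get(orderId);
    if (!reservation) return; // Nothing to undo
    this.stock.set(reservation.sku, this.available(reservation.sku) + reservation.quantity);
    this.reservations.delete(orderId);
    this.published.push({ type: 'InventoryReleased', orderId });
  }
}

const inventory = new InventoryService([['sku-1', 5]]);
inventory.reserve('order-1', 'sku-1', 2); // available stock drops to 3
inventory.release('order-1');             // available stock returns to 5
```

Note that the compensation does not erase history: both events stay in `published`, and the stock level is restored by a new, opposite update.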
When to Use Sagas
✅ Use sagas for:
- Multi-service workflows
- Long-running processes
- Complex business transactions
- Operations that need compensation
❌ Don’t use sagas for:
- Single-service operations (use aggregates)
- Simple CRUD (use commands/queries)
- Read-only operations (use queries)
Step-by-Step Guide
Step 1: Define the Saga
import { Saga, StartsWith } from '@banyanai/platform-saga-framework';

@Saga()
export class OrderProcessingSaga {
  constructor(
    private inventoryClient: InventoryServiceClient,
    private paymentClient: PaymentServiceClient
  ) {}

  @StartsWith(OrderCreatedEvent)
  async processOrder(event: OrderCreatedEvent): Promise<void> {
    // Saga implementation
  }
}

Step 2: Implement Steps with Compensation
@StartsWith(OrderCreatedEvent)
async processOrder(event: OrderCreatedEvent): Promise<void> {
  const orderId = event.eventData.orderId;

  try {
    // Step 1: Reserve inventory
    await this.execute({
      action: () => this.inventoryClient.reserveItems({ orderId, items: event.eventData.items }),
      compensation: () => this.inventoryClient.releaseItems({ orderId })
    });

    // Step 2: Process payment
    await this.execute({
      action: () => this.paymentClient.processPayment({ orderId, amount: event.eventData.totalAmount }),
      compensation: () => this.paymentClient.refundPayment({ orderId })
    });

    // Mark saga as complete
    await this.complete({ orderId, status: 'processed' });
  } catch (error) {
    // Saga framework automatically runs compensation
    Logger.error('Saga failed, compensating:', error as Error);
    throw error;
  }
}

Step 3: Handle Saga Events
// Other services react to saga progress
@EventHandlerDecorator(OrderProcessedEvent)
export class UpdateOrderStatusHandler extends EventHandler<OrderProcessedEvent, void> {
  async handle(event: OrderProcessedEvent): Promise<void> {
    // Update order status after successful saga completion
    await OrderReadModel.update(event.orderId, {
      status: 'processed',
      processedAt: new Date()
    });
  }
}

Common Patterns
Pattern 1: Sequential Steps
Steps execute one after another; each step starts only after the previous one has completed.
@StartsWith(OrderCreatedEvent)
async processOrder(event: OrderCreatedEvent): Promise<void> {
  // Execute sequentially
  await this.step1(); // Wait for completion
  await this.step2(); // Then execute step 2
  await this.step3(); // Then execute step 3
}

Pattern 2: Parallel Steps
Independent steps execute concurrently.
@StartsWith(OrderCreatedEvent)
async processOrder(event: OrderCreatedEvent): Promise<void> {
  // Execute in parallel
  await Promise.all([
    this.checkInventory(),
    this.validateCustomer(),
    this.calculateTax()
  ]);

  // Then proceed with dependent steps
  await this.processPayment();
}

Pattern 3: Conditional Execution
Some steps run only when a business condition holds.
@StartsWith(OrderCreatedEvent)
async processOrder(event: OrderCreatedEvent): Promise<void> {
  await this.reserveInventory();

  // Conditional step
  if (event.eventData.totalAmount > 10000) {
    await this.requireApproval();
  }

  await this.processPayment();
}

Pattern 4: Retry with Timeout
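The snippet in this pattern passes `retries` and `timeout` to `execute`, but the guide does not spell out what they mean. As one plausible reading, the sketch below gives each attempt its own timeout and treats `retries` as the number of extra attempts after the first. `withTimeout`, `withRetryAndTimeout`, and `timeoutMs` are hypothetical names, and the framework's actual behavior may differ.

```typescript
// Hypothetical helpers; the framework's own retry/timeout semantics may differ.
function withTimeout<T>(work: () => Promise<T>, timeoutMs: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`timed out after ${timeoutMs} ms`)),
      timeoutMs
    );
    // Promise.resolve().then(work) also turns a synchronous throw into a rejection.
    Promise.resolve().then(work).then(
      (value) => { clearTimeout(timer); resolve(value); },
      (error) => { clearTimeout(timer); reject(error); }
    );
  });
}

// Try the action once, then up to `retries` more times.
async function withRetryAndTimeout<T>(
  action: () => Promise<T>,
  options: { retries: number; timeoutMs: number }
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= options.retries; attempt++) {
    try {
      return await withTimeout(action, options.timeoutMs);
    } catch (error) {
      lastError = error; // Remember the failure; loop again if attempts remain
    }
  }
  throw lastError;
}

// Usage: an action that fails twice and succeeds on the third call.
let calls = 0;
const flaky = async (): Promise<string> => {
  calls++;
  if (calls < 3) throw new Error('temporary failure');
  return 'paid';
};
```

A timeout in this sketch only stops waiting; it does not cancel the underlying call. A payment that times out may still go through, which is one reason the compensation (here, a refund) has to be safe to run in that situation.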
@StartsWith(OrderCreatedEvent)
async processOrder(event: OrderCreatedEvent): Promise<void> {
  await this.execute({
    action: () => this.paymentClient.processPayment({...}),
    compensation: () => this.paymentClient.refundPayment({...}),
    retries: 3,
    timeout: 30000 // 30 seconds
  });
}

Pattern 5: Event-Driven Saga
The saga reacts to multiple events and completes once all of them have arrived.
@Saga()
export class OrderFulfillmentSaga {
  private sagaState: Map<string, SagaState> = new Map();

  @StartsWith(OrderCreatedEvent)
  async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
    const state = this.initState(event.eventData.orderId);
    state.orderCreated = true;
    await this.checkCompletion(state);
  }

  @Handles(InventoryReservedEvent)
  async onInventoryReserved(event: InventoryReservedEvent): Promise<void> {
    const state = this.getState(event.orderId);
    state.inventoryReserved = true;
    await this.checkCompletion(state);
  }

  @Handles(PaymentProcessedEvent)
  async onPaymentProcessed(event: PaymentProcessedEvent): Promise<void> {
    const state = this.getState(event.orderId);
    state.paymentProcessed = true;
    await this.checkCompletion(state);
  }

  // Complete the saga only after every expected event has been seen
  private async checkCompletion(state: SagaState): Promise<void> {
    if (state.orderCreated && state.inventoryReserved && state.paymentProcessed) {
      await this.complete({ orderId: state.orderId });
    }
  }
}

Compensation Strategies
Backward Recovery (Undo)
Reverse the effects of completed steps.
try {
  await this.step1(); // Reserve inventory
  await this.step2(); // Process payment
  await this.step3(); // Ship order - FAILS
} catch (error) {
  // Compensate in reverse order
  await this.undoStep2(); // Refund payment
  await this.undoStep1(); // Release inventory
}

Forward Recovery (Retry)
Retry failed steps until they succeed or a retry limit is reached.
async processPayment(): Promise<void> {
  const maxAttempts = 5;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await this.paymentClient.processPayment({...});
      return;
    } catch (error) {
      if (attempt === maxAttempts) throw error; // Give up after the final attempt
      await this.delay(Math.pow(2, attempt) * 1000); // Exponential backoff: 2s, 4s, 8s, 16s
    }
  }
}

Testing Sagas
describe('OrderProcessingSaga', () => {
  let saga: OrderProcessingSaga;
  let mockInventoryClient: jest.Mocked<InventoryServiceClient>;
  let mockPaymentClient: jest.Mocked<PaymentServiceClient>;

  beforeEach(() => {
    mockInventoryClient = {
      reserveItems: jest.fn().mockResolvedValue({ success: true }),
      releaseItems: jest.fn().mockResolvedValue({ success: true })
    } as any;

    mockPaymentClient = {
      processPayment: jest.fn().mockResolvedValue({ success: true }),
      refundPayment: jest.fn().mockResolvedValue({ success: true })
    } as any;

    saga = new OrderProcessingSaga(mockInventoryClient, mockPaymentClient);
  });

  it('should complete saga successfully', async () => {
    const event: OrderCreatedEvent = {...};
    await saga.processOrder(event);

    expect(mockInventoryClient.reserveItems).toHaveBeenCalled();
    expect(mockPaymentClient.processPayment).toHaveBeenCalled();
  });

  it('should compensate on payment failure', async () => {
    mockPaymentClient.processPayment.mockRejectedValue(new Error('Payment failed'));

    const event: OrderCreatedEvent = {...};
    await expect(saga.processOrder(event)).rejects.toThrow();

    // Verify compensation
    expect(mockInventoryClient.releaseItems).toHaveBeenCalled();
  });
});

Anti-Patterns to Avoid
❌ Don’t use distributed locks
// DON'T DO THIS
const lock = await redisClient.lock('order-123');
try {
  await this.processOrder();
} finally {
  await lock.release();
}

✅ Use sagas with compensation
// DO THIS
await this.execute({
  action: () => this.processOrder(),
  compensation: () => this.cancelOrder()
});

❌ Don’t ignore compensation
// DON'T DO THIS
await this.step1();
await this.step2(); // If this fails, step1 is not undone

✅ Always provide compensation
// DO THIS
await this.execute({
  action: () => this.step1(),
  compensation: () => this.undoStep1()
});