Event Sourcing Issues
Quick reference for diagnosing and fixing event sourcing and aggregate problems.
Quick Diagnosis
# Check event store tables
docker exec postgres psql -U postgres -d platform -c "\dt events*"

# View recent events
docker exec postgres psql -U postgres -d platform -c \
  "SELECT event_id, aggregate_id, event_type, aggregate_version FROM events ORDER BY occurred_at DESC LIMIT 10;"

# Check aggregate events
docker exec postgres psql -U postgres -d platform -c \
  "SELECT COUNT(*), MAX(aggregate_version) FROM events WHERE aggregate_id='user-123';"

# View event sourcing logs
docker logs my-service 2>&1 | grep -E "event|aggregate|apply"

Common Event Sourcing Problems
1. Aggregate Concurrency Error

Error:
AggregateConcurrencyError: Concurrency conflict for aggregate user-123.
Expected version 5, but actual version is 6

Cause: Multiple commands modifying the same aggregate concurrently
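
To illustrate the cause, here is a minimal sketch of the optimistic version check that produces this kind of error. `InMemoryEventStore` and `ConcurrencyConflict` are hypothetical stand-ins written for this example, not the platform's classes; the assumption is only that the real store compares an expected version against the stored one before appending.

```typescript
// Hypothetical in-memory store used only to illustrate the version check.
interface StoredEvent {
  aggregateId: string;
  aggregateVersion: number;
  eventType: string;
}

class ConcurrencyConflict extends Error {
  constructor(aggregateId: string, expected: number, actual: number) {
    super(
      `Concurrency conflict for aggregate ${aggregateId}. ` +
        `Expected version ${expected}, but actual version is ${actual}`
    );
    this.name = 'ConcurrencyConflict';
  }
}

class InMemoryEventStore {
  private readonly streams = new Map<string, StoredEvent[]>();

  currentVersion(aggregateId: string): number {
    return this.streams.get(aggregateId)?.length ?? 0;
  }

  // The append succeeds only if the caller loaded the latest version.
  append(aggregateId: string, expectedVersion: number, eventType: string): number {
    const actual = this.currentVersion(aggregateId);
    if (actual !== expectedVersion) {
      throw new ConcurrencyConflict(aggregateId, expectedVersion, actual);
    }
    const stream = this.streams.get(aggregateId) ?? [];
    stream.push({ aggregateId, aggregateVersion: actual + 1, eventType });
    this.streams.set(aggregateId, stream);
    return actual + 1;
  }
}

// Two commands load the aggregate at the same version.
const demoStore = new InMemoryEventStore();
demoStore.append('user-123', 0, 'UserCreated');
const seenByA = demoStore.currentVersion('user-123');
const seenByB = demoStore.currentVersion('user-123');

// Command A saves first and advances the stream.
demoStore.append('user-123', seenByA, 'UserEmailUpdated');

// Command B still holds the old version, so its append is rejected.
let conflictMessage = '';
try {
  demoStore.append('user-123', seenByB, 'UserEmailUpdated');
} catch (error) {
  conflictMessage = (error as Error).message;
}
```

The rejected command has to reload the aggregate and re-run its logic against the new state, which is why retrying the whole command (rather than just the save) is the usual fix.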
Fix - Implement Retry Logic:
import { AggregateConcurrencyError } from '@banyanai/platform-domain-modeling';
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function executeWithRetry(command: Command, maxRetries = 3) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await commandBus.execute(command);
    } catch (error) {
      if (error instanceof AggregateConcurrencyError && attempt < maxRetries) {
        console.log(`Retrying after concurrency conflict (${attempt}/${maxRetries})`);
        await sleep(100 * 2 ** (attempt - 1)); // Exponential backoff: 100 ms, 200 ms, ...
        continue;
      }
      throw error;
    }
  }
}

2. Aggregate Not Found
Error:
AggregateNotFoundError: User with ID user-123 not found

Diagnostic:
# Check if events exist for aggregate
docker exec postgres psql -U postgres -d platform -c \
  "SELECT COUNT(*) FROM events WHERE aggregate_id='user-123';"

Fix - Check Existence Before Loading:
@QueryHandler(GetUserQuery)
export class GetUserHandler {
  async handle(query: GetUserQuery) {
    // Check if aggregate exists
    const exists = await this.aggregateAccess.exists(query.userId);
    if (!exists) {
      throw new NotFoundError('User', query.userId);
    }

    return await this.aggregateAccess.getById(query.userId);
  }
}

3. Missing Event Handler
Error:
AggregateOperationError: Failed to apply event UserCreated to aggregate user-123

Cause: The aggregate is missing an @ApplyEvent handler for the event type
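
A minimal sketch of why a missing handler fails during replay, assuming the framework looks up a handler by event type and has nothing to call when none is registered. `UserState`, `DomainEvent`, and the handler map are hypothetical and stand in for the decorator-based registration; the error text is illustrative.

```typescript
// Hypothetical stand-in for decorator-based dispatch; not the platform API.
interface DomainEvent {
  eventType: string;
  payload: Record<string, string>;
}

type Handler = (payload: Record<string, string>) => void;

class UserState {
  userId = '';
  email = '';

  // One entry per event type the aggregate knows how to replay.
  private readonly handlers: Record<string, Handler> = {
    UserCreated: (p) => {
      this.userId = p.userId;
      this.email = p.email;
    },
    // 'UserEmailUpdated' is deliberately missing to reproduce the failure.
  };

  apply(domainEvent: DomainEvent): void {
    const handler = this.handlers[domainEvent.eventType];
    if (!handler) {
      throw new Error(
        `Failed to apply event ${domainEvent.eventType}: no handler registered`
      );
    }
    handler(domainEvent.payload);
  }
}

const replayedUser = new UserState();
replayedUser.apply({
  eventType: 'UserCreated',
  payload: { userId: 'user-123', email: 'a@example.com' },
});

// Replaying an event type without a handler stops the rebuild.
let applyError = '';
try {
  replayedUser.apply({
    eventType: 'UserEmailUpdated',
    payload: { newEmail: 'b@example.com' },
  });
} catch (error) {
  applyError = (error as Error).message;
}
```

Because stored events are replayed on every load, a handler that is missing for even one historical event type makes the aggregate unloadable, not just the newest command.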
Fix - Add Event Handler:
import { AggregateRoot, ApplyEvent } from '@banyanai/platform-domain-modeling';
@AggregateRoot()
export class User {
  private userId!: string;
  private email!: string;

  // MUST have handler for every event type
  @ApplyEvent('UserCreated')
  private onUserCreated(event: UserCreatedEvent): void {
    this.userId = event.userId;
    this.email = event.email;
  }

  @ApplyEvent('UserEmailUpdated')
  private onEmailUpdated(event: UserEmailUpdatedEvent): void {
    this.email = event.newEmail;
  }
}

4. Event Store Schema Not Initialized
Error:
Projections table not found in public schema.
Ensure PostgresEventStore.initializeSchema() has been called first.

Fix - Initialize Schema:
import { PostgresEventStore } from '@banyanai/platform-event-sourcing';
// Initialize event store FIRST
const eventStore = new PostgresEventStore(dbConfig);
await eventStore.initializeSchema();

// THEN start using it
await BaseService.start({ /* ... */ });

5. Large Aggregate Performance Issues
Symptom: Slow aggregate loading, timeouts
Diagnostic:
# Find aggregates with most events
docker exec postgres psql -U postgres -d platform -c \
  "SELECT aggregate_id, COUNT(*) as event_count FROM events GROUP BY aggregate_id ORDER BY event_count DESC LIMIT 20;"

Fix - Implement Snapshots:
import { SnapshotStore } from '@banyanai/platform-event-sourcing';
// aggregateAccess, snapshotStore and eventStore are assumed to be in scope

// Save snapshot every 100 events
async function saveWithSnapshot(aggregate: User, correlationId: string) {
  await aggregateAccess.save(aggregate, correlationId);

  if (aggregate.version % 100 === 0) {
    await snapshotStore.saveSnapshot({
      aggregateId: aggregate.userId,
      aggregateType: 'User',
      version: aggregate.version,
      state: aggregate.toSnapshot(),
      createdAt: new Date()
    });
  }
}

// Load from snapshot
async function loadFromSnapshot(aggregateId: string) {
  const snapshot = await snapshotStore.getLatestSnapshot(aggregateId, 'User');

  if (snapshot) {
    const user = User.fromSnapshot(snapshot.state);
    // Replay only the events recorded after the snapshot
    const events = await eventStore.getEvents(aggregateId, snapshot.version + 1);
    for (const event of events) {
      user.applyEvent(event);
    }
    return user;
  }

  // No snapshot yet: fall back to a full replay
  return aggregateAccess.getById(aggregateId);
}

6. Event Migration Failures
Error:
No migrations found for event type UserCreated

Fix - Define Event Migrations:
import { EventMigration } from '@banyanai/platform-event-sourcing';
// Migration from v1 to v2
const userCreatedMigration: EventMigration = {
  eventType: 'UserCreated',
  fromVersion: 1,
  toVersion: 2,
  migrate: (eventData: any) => {
    // Pull out the old field so it is actually dropped, not left as an undefined key
    const { email, ...rest } = eventData;
    return {
      ...rest,
      status: 'active', // Add new field
      emailAddress: email // Rename field
    };
  }
};

EventMigrator.registerMigration(userCreatedMigration);

7. Event Replay Deadlocks
Error:
Replay already in progress

Fix - Use Advisory Locks:
async function replayWithLock(replayId: string) {
  const lockId = hashCode(replayId); // hashCode: any stable string-to-integer hash
  const client = await pool.connect();
  let locked = false;

  try {
    // Try to acquire lock
    const result = await client.query(
      'SELECT pg_try_advisory_lock($1) as locked',
      [lockId]
    );
    locked = result.rows[0].locked;

    if (!locked) {
      console.log('Replay already in progress, skipping');
      return;
    }

    // Perform replay
    await doReplay();
  } finally {
    // Release only a lock this session actually holds
    if (locked) {
      await client.query('SELECT pg_advisory_unlock($1)', [lockId]);
    }
    client.release();
  }
}

Event Sourcing Checklist
- Event store schema initialized (initializeSchema())
- All event types have @ApplyEvent handlers in aggregate
- Concurrency conflicts handled with retry logic
- Aggregate existence checked before loading
- Snapshots used for large aggregates (>100 events)
- Event migrations defined for schema changes
- Event data validated before persisting
- Replay operations use advisory locks
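
The checklist item on validating event data before persisting could look roughly like the sketch below. The `UserCreatedData` shape, the function names, and the email rule are assumptions made for illustration; the platform may provide its own validation hooks.

```typescript
// Hypothetical event shape; field names follow the UserCreated example.
interface UserCreatedData {
  userId: string;
  email: string;
}

// Collect every problem so the caller sees all of them at once.
function validateUserCreated(data: Partial<UserCreatedData>): string[] {
  const problems: string[] = [];
  if (!data.userId || data.userId.trim() === '') {
    problems.push('userId is required');
  }
  // Deliberately loose check: something, an @, then something.
  if (!data.email || !/^[^@\s]+@[^@\s]+$/.test(data.email)) {
    problems.push('email must look like an address');
  }
  return problems;
}

// Call this before handing the event to the store.
function assertValidUserCreated(data: Partial<UserCreatedData>): UserCreatedData {
  const problems = validateUserCreated(data);
  if (problems.length > 0) {
    throw new Error(`Invalid UserCreated event: ${problems.join('; ')}`);
  }
  return data as UserCreatedData;
}
```

Rejecting bad data at write time matters more in an event store than in a mutable table, because a persisted event is replayed on every future load and cannot simply be updated in place.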
Diagnostic Queries
-- Count events per aggregate
SELECT aggregate_id, COUNT(*) as event_count
FROM events
GROUP BY aggregate_id
ORDER BY event_count DESC
LIMIT 20;

-- Find aggregates with version gaps
SELECT aggregate_id, aggregate_version
FROM events e1
WHERE NOT EXISTS (
  SELECT 1 FROM events e2
  WHERE e2.aggregate_id = e1.aggregate_id
    AND e2.aggregate_version = e1.aggregate_version - 1
)
AND aggregate_version > 1;

-- View recent events by type
SELECT event_type, COUNT(*), MAX(occurred_at) as last_occurred
FROM events
WHERE occurred_at > NOW() - INTERVAL '1 hour'
GROUP BY event_type;

-- Check for duplicate versions (concurrency violations)
SELECT aggregate_id, aggregate_version, COUNT(*)
FROM events
GROUP BY aggregate_id, aggregate_version
HAVING COUNT(*) > 1;

Best Practices
- Always initialize schema first
await eventStore.initializeSchema();

- Implement retry for concurrency
try {
  await save(aggregate);
} catch (error) {
  if (error instanceof AggregateConcurrencyError) {
    // Reload and retry
  } else {
    throw error;
  }
}

- Use snapshots for large aggregates
if (aggregate.version % 100 === 0) {
  await saveSnapshot(aggregate);
}

- Version all events
export class UserCreatedEvent {
  static readonly VERSION = 1;
  constructor(readonly userId: string, readonly email: string) {}
  toJSON() {
    return { version: UserCreatedEvent.VERSION, userId: this.userId, email: this.email };
  }
}

- Validate event data
@ApplyEvent('UserCreated')
private onCreated(event: UserCreatedEvent): void {
  if (!event.userId || !event.email) {
    throw new Error('Invalid event data');
  }
  this.userId = event.userId;
  this.email = event.email;
}
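
Versioning events pays off when old events are read back: a stored version number lets the loader apply migrations step by step until the event reaches the current shape. The sketch below shows one way to chain such steps. The `Upcaster` registry and the `upcast` function are hypothetical and do not represent the `EventMigrator` API; the version 3 schema is invented for the example.

```typescript
// Hypothetical upcaster chain; names and the v3 schema are illustrative.
interface VersionedEvent {
  eventType: string;
  version: number;
  data: Record<string, unknown>;
}

interface Upcaster {
  eventType: string;
  fromVersion: number;
  toVersion: number;
  migrate: (data: Record<string, unknown>) => Record<string, unknown>;
}

const upcasters: Upcaster[] = [
  {
    eventType: 'UserCreated',
    fromVersion: 1,
    toVersion: 2,
    // Rename email to emailAddress, dropping the old key.
    migrate: ({ email, ...rest }) => ({ ...rest, emailAddress: email }),
  },
  {
    eventType: 'UserCreated',
    fromVersion: 2,
    toVersion: 3,
    // Add a field with a default for events recorded before it existed.
    migrate: (data) => ({ ...data, status: 'active' }),
  },
];

// Apply one migration at a time until the target version is reached.
function upcast(stored: VersionedEvent, targetVersion: number): VersionedEvent {
  let current = stored;
  while (current.version < targetVersion) {
    const step = upcasters.find(
      (u) => u.eventType === current.eventType && u.fromVersion === current.version
    );
    if (!step) {
      throw new Error(
        `No migration found for ${current.eventType} from version ${current.version}`
      );
    }
    current = { ...current, version: step.toVersion, data: step.migrate(current.data) };
  }
  return current;
}

const upgraded = upcast(
  {
    eventType: 'UserCreated',
    version: 1,
    data: { userId: 'user-123', email: 'a@example.com' },
  },
  3
);
```

Keeping each step small (one version to the next) means a new schema change only needs one new migration, and the stored events themselves are never rewritten.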