Skip to content

Event Sourcing Implementation Guide

  • You need to implement complete event sourcing for an aggregate
  • You want to understand event streams and event replay
  • You’re building audit trails and temporal queries
  • You need to implement snapshots for performance
// 1. Define aggregate with event sourcing
@Aggregate('User')
export class User extends AggregateRoot {
private constructor(private props: UserProps) {
super(props.id || '', 'User');
}
// Factory method emits events
static create(props: Omit<UserProps, 'id'>): User {
const id = uuidv4();
const user = new User({ ...props, id });
// Emit creation event
user.raiseEvent('UserCreated', {
email: props.email,
createdAt: new Date()
});
return user;
}
// Business methods emit events
changeEmail(newEmail: string): void {
this.props.email = newEmail;
this.raiseEvent('UserEmailChanged', { newEmail });
}
// Event sourcing: Reconstruct from events
protected applyEventToState(event: DomainEvent): void {
switch (event.eventType) {
case 'UserCreated':
// State already set in constructor
break;
case 'UserEmailChanged':
this.props.email = event.eventData.newEmail as string;
break;
}
}
}
// 2. Save events to event store
const eventStore = BaseService.getEventStore();
await eventStore.append(user.id, user.getUncommittedEvents());
// 3. Load aggregate from events
const events = await eventStore.getEvents(userId);
const user = User.fromEvents(events);
const eventStore = BaseService.getEventStore();
// Get uncommitted events from aggregate
const events = user.getUncommittedEvents();
// Append to event store (with optimistic concurrency)
await eventStore.append(user.id, events);
// Events are now persisted and published
// Get all events for an aggregate
const events = await eventStore.getEvents(userId);
// Reconstruct aggregate from events
const user = User.fromEvents(events);
// Stream events (for large event streams)
const stream = eventStore.streamEvents(userId);
for await (const event of stream) {
console.log(event.eventType, event.occurredAt);
}
import { Aggregate, AggregateRoot, DomainEvent } from '@banyanai/platform-domain-modeling';
@Aggregate('Order')
export class Order extends AggregateRoot {
private constructor(private props: OrderProps) {
super(props.id || '', 'Order');
}
}
static create(customerId: string, items: OrderItem[]): Order {
const id = uuidv4();
const order = new Order({
id,
customerId,
items,
status: 'pending',
createdAt: new Date()
});
// Raise creation event
order.raiseEvent('OrderCreated', {
orderId: id,
customerId,
items,
totalAmount: order.calculateTotal()
});
return order;
}
confirm(): void {
// Validate business rules
if (this.props.status !== 'pending') {
throw new Error('Only pending orders can be confirmed');
}
// Update state
this.props.status = 'confirmed';
this.props.confirmedAt = new Date();
// Raise event
this.raiseEvent('OrderConfirmed', {
confirmedAt: this.props.confirmedAt
});
}
cancel(reason: string): void {
if (this.props.status === 'shipped') {
throw new Error('Cannot cancel shipped orders');
}
this.props.status = 'cancelled';
this.raiseEvent('OrderCancelled', { reason });
}
protected applyEventToState(event: DomainEvent): void {
switch (event.eventType) {
case 'OrderCreated':
// State set in constructor
break;
case 'OrderConfirmed':
this.props.status = 'confirmed';
this.props.confirmedAt = event.eventData.confirmedAt as Date;
break;
case 'OrderCancelled':
this.props.status = 'cancelled';
break;
case 'OrderItemAdded':
this.props.items.push(event.eventData.item as OrderItem);
break;
case 'OrderItemRemoved':
this.props.items = this.props.items.filter(
item => item.id !== event.eventData.itemId
);
break;
}
}
// Version 1
{
type: 'UserCreated',
data: { email: string }
}
// Version 2 (added name)
{
type: 'UserCreated',
version: 2,
data: { email: string, name: string }
}
// Handle both versions in replay
protected applyEventToState(event: DomainEvent): void {
if (event.eventType === 'UserCreated') {
if (event.aggregateVersion === 1) {
// Old version - provide default
this.props.name = 'Unknown';
} else {
// New version
this.props.name = event.eventData.name as string;
}
}
}

For aggregates with many events, use snapshots.

// Save snapshot
await snapshotManager.save(userId, user.toProps());
// Load with snapshot
const snapshot = await snapshotManager.load(userId);
if (snapshot) {
const user = User.fromProps(snapshot.state);
const eventsAfterSnapshot = await eventStore.getEvents(userId, snapshot.version);
user.replayEvents(eventsAfterSnapshot);
} else {
const events = await eventStore.getEvents(userId);
const user = User.fromEvents(events);
}
describe('Order aggregate', () => {
it('should create order and emit event', () => {
const order = Order.create('customer-123', [
{ productId: 'p1', quantity: 2, price: 10 }
]);
const events = order.getUncommittedEvents();
expect(events).toHaveLength(1);
expect(events[0].eventType).toBe('OrderCreated');
expect(events[0].eventData.customerId).toBe('customer-123');
});
it('should only confirm pending orders', () => {
const order = Order.create('customer-123', [...]);
order.confirm();
expect(() => order.confirm()).toThrow('Only pending orders can be confirmed');
});
it('should reconstruct from events', () => {
const events: DomainEvent[] = [
{ eventType: 'OrderCreated', eventData: {...} },
{ eventType: 'OrderConfirmed', eventData: {...} }
];
const order = Order.fromEvents(events);
expect(order.status).toBe('confirmed');
});
});

Don’t query events for reads

// DON'T DO THIS - Slow!
const events = await eventStore.getEvents(userId);
const user = User.fromEvents(events);
return user.email;

Use read models

// DO THIS - Fast!
const user = await UserReadModel.findById<UserReadModel>(userId);
return user.email;