Events Reference
Overview
Events represent things that have happened in the system. They follow the publish-subscribe pattern: multiple services can subscribe to the same event and react to it independently.
Event Characteristics
- Intent: Notify about state changes
- Pattern: Publish-subscribe (asynchronous)
- Subscribers: Zero or more subscribers per event
- Response: No response (fire-and-forget)
- Routing: Broadcast via exchange to all subscribers
- Side Effects: Multiple independent actions
- Eventual Consistency: Subscribers process asynchronously
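The characteristics above can be illustrated with a minimal in-memory sketch. This is not the platform's message bus: `InMemoryEventBus` and its methods are hypothetical names, used only to show broadcast delivery to zero or more subscribers with nothing returned to the publisher.

```typescript
// Hypothetical in-memory stand-in for a broker; for illustration only.
type Handler<T> = (payload: T) => void | Promise<void>;

class InMemoryEventBus {
  private readonly handlers = new Map<string, Array<Handler<any>>>();

  subscribe<T>(eventName: string, handler: Handler<T>): void {
    const existing = this.handlers.get(eventName) ?? [];
    this.handlers.set(eventName, [...existing, handler]);
  }

  // Broadcast to every subscriber; the publisher gets no response.
  publish<T>(eventName: string, payload: T): void {
    for (const handler of this.handlers.get(eventName) ?? []) {
      try {
        // Asynchronous failures are swallowed; the publisher never sees them.
        Promise.resolve(handler(payload)).catch(() => undefined);
      } catch {
        // A synchronous failure in one subscriber must not stop the others.
      }
    }
  }
}

const bus = new InMemoryEventBus();
const received: string[] = [];

bus.subscribe<{ userId: string }>('UserCreated', (e) => { received.push(`email:${e.userId}`); });
bus.subscribe<{ userId: string }>('UserCreated', () => { throw new Error('analytics down'); });
bus.subscribe<{ userId: string }>('UserCreated', (e) => { received.push(`notify:${e.userId}`); });

bus.publish('UserCreated', { userId: 'usr_abc123' });
bus.publish('OrderPlaced', { orderId: 'ord_1' }); // zero subscribers: a no-op
// received is now ['email:usr_abc123', 'notify:usr_abc123']
```

A real broker delivers asynchronously and durably; the sketch only mirrors the routing semantics, including the fact that one failing subscriber does not affect the others.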
Event Message Structure
Complete Example

{
  "id": "evt_abc123xyz789",
  "correlationId": "cor_def456uvw012",
  "traceContext": {
    "traceId": "0af7651916cd43dd8448eb211c80319c",
    "spanId": "b7ad6b7169203331",
    "traceFlags": "01"
  },
  "timestamp": "2025-11-15T10:30:00.123Z",
  "serviceName": "user-service",
  "messageType": "UserCreated",
  "payload": {
    "userId": "usr_abc123",
    "email": "alice@example.com",
    "name": "Alice Smith",
    "role": "user",
    "createdAt": "2025-11-15T10:30:00.000Z"
  },
  "metadata": {
    "routing": {
      "priority": "normal"
    }
  }
}

Event Naming Convention
Pattern
{Noun}{PastTenseVerb}

Examples:
- UserCreated
- OrderPlaced
- PaymentProcessed
- SubscriptionCancelled

Best Practices

// Good: Past tense, domain event
UserCreated
OrderPlaced
PaymentProcessed
EmailSent

// Avoid: Present tense, command-like
CreateUser   // This is a command
PlaceOrder   // This is a command
UserCreate   // Wrong tense
OrderPlace   // Wrong tense

Queue Naming
Format
exchange.platform.events.{subscriberService}.{eventName}

Examples
exchange.platform.events.email-service.usercreated
exchange.platform.events.analytics-service.usercreated
exchange.platform.events.notification-service.orderplaced

Note: The event name is lowercase, with no dashes.
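The queue naming format can be sketched as a small helper. `eventQueueName` is a hypothetical function, not part of the platform API; it only reproduces the format and the lowercase, no-dashes rule for the event name described here.

```typescript
// Hypothetical helper; the platform may derive queue names differently.
function eventQueueName(subscriberService: string, eventName: string): string {
  // Lowercase the event name and drop any dashes, per the naming note.
  const normalizedEvent = eventName.toLowerCase().replace(/-/g, '');
  return `exchange.platform.events.${subscriberService}.${normalizedEvent}`;
}

console.log(eventQueueName('email-service', 'UserCreated'));
// prints "exchange.platform.events.email-service.usercreated"
```

The subscriber service name keeps its dashes; only the event name segment is normalized.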
Publishing Events
Basic Publish

import { messageBus } from '@banyanai/platform-message-bus-client';
import { UserCreatedEvent } from './contracts/UserEvents.js';

// Publish event
await messageBus.publish(UserCreatedEvent, {
  userId: 'usr_abc123',
  email: 'alice@example.com',
  name: 'Alice Smith',
  role: 'user',
  createdAt: new Date()
});

console.log('Event published');
// No response - fire and forget

With Options

// Publish with priority
await messageBus.publish(
  PaymentProcessedEvent,
  {
    paymentId: 'pay_xyz789',
    orderId: 'ord_abc123',
    amount: 99.99,
    currency: 'USD'
  },
  {
    priority: 'high',           // Prioritize this event
    waitForConfirmation: true   // Wait for RabbitMQ confirmation
  }
);

From Command Handler

// Assumes userRepository and messageBus are injected into the handler
@CommandHandler(CreateUserContract)
export class CreateUserHandler {
  async handle(input: { email: string; name: string }) {
    // Create user
    const user = await this.userRepository.create(input);

    // Publish event
    await this.messageBus.publish(UserCreatedEvent, {
      userId: user.id,
      email: user.email,
      name: user.name,
      role: user.role,
      createdAt: user.createdAt
    });

    return user;
  }
}

Subscribing to Events
Section titled “Subscribing to Events”Basic Subscription
Section titled “Basic Subscription”import { messageBus } from '@banyanai/platform-message-bus-client';import { UserCreatedEvent } from './contracts/UserEvents.js';
// Subscribe to eventawait messageBus.subscribe( UserCreatedEvent, async (event) => { console.log('User created:', event.userId); // Handle event (send welcome email, update analytics, etc.) });Event Handler Class
Section titled “Event Handler Class”import { EventHandler } from '@banyanai/platform-cqrs';import { UserCreatedEvent } from '../contracts/UserEvents.js';
@EventHandler(UserCreatedEvent)export class UserCreatedHandler { constructor( private readonly emailService: EmailService, private readonly logger: Logger ) {}
async handle(event: { userId: string; email: string; name: string; }) { this.logger.info('Processing UserCreated event', { userId: event.userId });
// Send welcome email await this.emailService.sendWelcomeEmail({ to: event.email, name: event.name });
this.logger.info('Welcome email sent', { userId: event.userId }); }}Multiple Subscribers
Different services can subscribe to the same event:

// Email Service
@EventHandler(UserCreatedEvent)
export class SendWelcomeEmailHandler {
  async handle(event: any) {
    await this.emailService.sendWelcomeEmail(event);
  }
}

// Analytics Service
@EventHandler(UserCreatedEvent)
export class TrackUserCreationHandler {
  async handle(event: any) {
    await this.analytics.track('user_created', event);
  }
}

// Notification Service
@EventHandler(UserCreatedEvent)
export class NotifyAdminsHandler {
  async handle(event: any) {
    await this.notifications.notifyAdmins('New user registered', event);
  }
}

With Subscription Options

await messageBus.subscribe(
  OrderPlacedEvent,
  async (event) => {
    await this.processOrder(event);
  },
  {
    subscriptionGroup: 'order-processor',  // Load balancing group
    concurrency: 5,                        // Process 5 events concurrently
    prefetch: 10,                          // Prefetch 10 messages
    autoAck: true                          // Auto-acknowledge on success
  }
);

Common Event Patterns
Entity Created

// UserCreated
{
  "messageType": "UserCreated",
  "payload": {
    "userId": "usr_abc123",
    "email": "alice@example.com",
    "name": "Alice Smith",
    "role": "user",
    "createdAt": "2025-11-15T10:30:00Z"
  }
}

Entity Updated

// UserUpdated
{
  "messageType": "UserUpdated",
  "payload": {
    "userId": "usr_abc123",
    "changes": {
      "name": { "old": "Alice Smith", "new": "Alice Johnson" },
      "role": { "old": "user", "new": "admin" }
    },
    "updatedAt": "2025-11-15T10:35:00Z",
    "updatedBy": "usr_admin_456"
  }
}

Entity Deleted

// UserDeleted
{
  "messageType": "UserDeleted",
  "payload": {
    "userId": "usr_abc123",
    "email": "alice@example.com",
    "deletedAt": "2025-11-15T10:40:00Z",
    "deletedBy": "usr_admin_456",
    "reason": "User requested account deletion"
  }
}

State Transition

// OrderStatusChanged
{
  "messageType": "OrderStatusChanged",
  "payload": {
    "orderId": "ord_xyz789",
    "oldStatus": "pending",
    "newStatus": "processing",
    "changedAt": "2025-11-15T10:45:00Z",
    "reason": "Payment confirmed"
  }
}

Business Event

// PaymentProcessed
{
  "messageType": "PaymentProcessed",
  "payload": {
    "paymentId": "pay_abc123",
    "orderId": "ord_xyz789",
    "amount": 99.99,
    "currency": "USD",
    "method": "credit_card",
    "processedAt": "2025-11-15T10:50:00Z"
  }
}

Event Sourcing Integration
Publishing Domain Events

@CommandHandler(PlaceOrderContract)
export class PlaceOrderHandler {
  async handle(input: any) {
    // Create order aggregate
    const order = new Order();
    order.place(input);

    // Save events
    await this.eventStore.save(order.id, order.uncommittedEvents);

    // Publish events to message bus
    for (const event of order.uncommittedEvents) {
      await this.messageBus.publish(event.type, event.data);
    }

    return { orderId: order.id };
  }
}

Event Store Integration

// Events are stored in the event store AND published to the message bus
await this.eventStore.appendToStream('order-' + orderId, [
  {
    type: 'OrderPlaced',
    data: { orderId, userId, items, total },
    metadata: {
      correlationId: context.correlationId,
      causationId: context.messageId
    }
  }
]);

// Publish the same event to the message bus
await this.messageBus.publish(OrderPlacedEvent, {
  orderId,
  userId,
  items,
  total
});

Error Handling
Retry on Failure

@EventHandler(UserCreatedEvent)
export class SendWelcomeEmailHandler {
  async handle(event: any) {
    try {
      await this.emailService.sendWelcomeEmail(event);
    } catch (error) {
      this.logger.error('Failed to send welcome email', error, {
        userId: event.userId
      });

      // Throw to trigger retry
      throw error;
    }
  }
}

Dead Letter Queue
Events that still fail after the maximum number of retries go to a dead letter queue (DLQ):

Queue: dlq.exchange.platform.events.email-service.usercreated

Manual inspection and reprocessing are required.
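The retry-then-DLQ flow can be sketched as a pure decision function. Both function names are hypothetical, the retry limit is passed in because the source does not state the platform's value, and the way failures are counted here is an assumption.

```typescript
// Hypothetical sketch; the platform's actual retry accounting may differ.
type FailureDecision =
  | { action: 'retry' }
  | { action: 'dead-letter'; queue: string };

// Mirrors the "dlq." prefix shown in the queue name above.
function deadLetterQueueName(queueName: string): string {
  return `dlq.${queueName}`;
}

function decideAfterFailure(
  queueName: string,
  failedAttempts: number, // failed deliveries so far, including the first
  maxRetries: number      // assumed to be configurable
): FailureDecision {
  // The first delivery is not a retry, so maxRetries retries allow
  // maxRetries + 1 delivery attempts in total.
  if (failedAttempts <= maxRetries) {
    return { action: 'retry' };
  }
  return { action: 'dead-letter', queue: deadLetterQueueName(queueName) };
}

const queue = 'exchange.platform.events.email-service.usercreated';
console.log(decideAfterFailure(queue, 1, 3).action); // prints "retry"
console.log(decideAfterFailure(queue, 4, 3).action); // prints "dead-letter"
```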
Idempotent Handlers
@EventHandler(UserCreatedEvent)
export class SendWelcomeEmailHandler {
  async handle(event: { userId: string; email: string }) {
    // Check if already processed (idempotency)
    const processed = await this.processedEvents.exists(
      `welcome-email:${event.userId}`
    );

    if (processed) {
      this.logger.info('Event already processed, skipping', {
        userId: event.userId
      });
      return;
    }

    // Send email
    await this.emailService.sendWelcomeEmail(event);

    // Mark as processed
    await this.processedEvents.add(
      `welcome-email:${event.userId}`,
      86400 // 24h TTL
    );
  }
}

Event Versioning
Version 1

// UserCreatedV1
{
  "messageType": "UserCreated",
  "version": 1,
  "payload": {
    "userId": "usr_abc123",
    "email": "alice@example.com",
    "name": "Alice Smith"
  }
}

Version 2 (Added fields)

// UserCreatedV2
{
  "messageType": "UserCreated",
  "version": 2,
  "payload": {
    "userId": "usr_abc123",
    "email": "alice@example.com",
    "name": "Alice Smith",
    "role": "user",        // New field
    "preferences": {       // New field
      "newsletter": true
    }
  }
}

Handling Multiple Versions

@EventHandler(UserCreatedEvent)
export class UserCreatedHandler {
  async handle(event: any) {
    const version = event.version || 1;

    switch (version) {
      case 1:
        return this.handleV1(event);
      case 2:
        return this.handleV2(event);
      default:
        this.logger.warn('Unknown event version', { version });
    }
  }

  private async handleV1(event: any) {
    // Handle v1 format
    await this.emailService.sendWelcomeEmail({
      to: event.email,
      name: event.name
    });
  }

  private async handleV2(event: any) {
    // Handle v2 format with new fields
    await this.emailService.sendWelcomeEmail({
      to: event.email,
      name: event.name,
      includeNewsletter: event.preferences?.newsletter
    });
  }
}

Event Ordering
Guaranteed Ordering
Events from the same aggregate are delivered in the order they were published:

// Published first
await messageBus.publish(OrderPlacedEvent, { orderId: 'ord_123' });

// Published second
await messageBus.publish(OrderPaymentProcessedEvent, { orderId: 'ord_123' });

// Published third
await messageBus.publish(OrderShippedEvent, { orderId: 'ord_123' });

A subscriber receives these events in publish order for the same orderId.
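The source does not say how per-aggregate ordering is achieved. One common approach, shown here purely as an assumption, is to route every event for a given aggregate ID to the same partition or queue, so that a single consumer sees them in sequence. `partitionFor` is a hypothetical helper, not a platform function.

```typescript
// Hypothetical sketch: stable routing of an aggregate ID to a partition.
function partitionFor(aggregateId: string, partitionCount: number): number {
  let hash = 0;
  for (let i = 0; i < aggregateId.length; i++) {
    // Simple 32-bit rolling hash; any stable hash function would do.
    hash = (hash * 31 + aggregateId.charCodeAt(i)) | 0;
  }
  return Math.abs(hash) % partitionCount;
}

// All three events for ord_123 map to the same partition, so their
// relative order is preserved for whoever consumes that partition.
const placedPartition = partitionFor('ord_123', 4);
const paidPartition = partitionFor('ord_123', 4);
const shippedPartition = partitionFor('ord_123', 4);
console.log(placedPartition === paidPartition && paidPartition === shippedPartition); // prints "true"
```

Under this scheme no ordering is implied between events of different aggregates, which may land on different partitions.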
Out-of-Order Handling
@EventHandler(OrderShippedEvent)
export class OrderShippedHandler {
  async handle(event: { orderId: string }) {
    // Check order exists and is paid
    const order = await this.orderRepository.findById(event.orderId);

    if (!order) {
      this.logger.warn('Order not found, requeueing event', {
        orderId: event.orderId
      });
      // Retry later
      throw new Error('Order not found');
    }

    if (order.status !== 'paid') {
      this.logger.warn('Order not paid yet, requeueing event', {
        orderId: event.orderId
      });
      // Retry later
      throw new Error('Order not paid');
    }

    // Process shipping
    await this.shippingService.ship(order);
  }
}

Best Practices
1. Use Past Tense

// Good
UserCreated
OrderPlaced
PaymentProcessed

// Avoid
CreateUser
PlaceOrder
ProcessPayment

2. Include Complete Data

// Good: Complete event data
await messageBus.publish(UserCreatedEvent, {
  userId: user.id,
  email: user.email,
  name: user.name,
  role: user.role,
  createdAt: user.createdAt,
  createdBy: context.userId
});

// Avoid: Minimal data (requires lookup)
await messageBus.publish(UserCreatedEvent, {
  userId: user.id  // Subscribers have to fetch the rest
});

3. Make Handlers Idempotent

// Always check if already processed
const processed = await this.processedEvents.exists(eventId);
if (processed) return;

// Process event
await this.doSomething();

// Mark as processed
await this.processedEvents.add(eventId);

4. Log Event Processing
@EventHandler(UserCreatedEvent)
export class UserCreatedHandler {
  async handle(event: any) {
    this.logger.info('Processing UserCreated event', {
      userId: event.userId
    });

    await this.sendWelcomeEmail(event);

    this.logger.info('UserCreated event processed', {
      userId: event.userId
    });
  }
}

5. Don’t Block on Event Publishing

// Good: Publish and continue
await this.userRepository.create(user);
await this.messageBus.publish(UserCreatedEvent, user); // Fire and forget
return user;

// Avoid: Waiting for all subscribers
// Events are async - don't wait for subscribers to process