Message Bus Guide
Overview
RabbitMQ is the exclusive communication method between all services in the banyan-core platform. Services never make direct HTTP calls to each other; all communication flows through the message bus.
Architecture

    Service A                  RabbitMQ                  Service B
        │                          │                         │
        │  1. Send Command         │                         │
        ├─────────────────────────>│                         │
        │                          │  2. Route to Queue      │
        │                          ├────────────────────────>│
        │                          │                         │
        │                          │  3. Process & Respond   │
        │                          │<────────────────────────┤
        │  4. Receive Response     │                         │
        │<─────────────────────────┤                         │

Connection Configuration
Environment Variables

    # Message bus connection
    RABBITMQ_URL=amqp://admin:admin123@rabbitmq:5672

    # Connection pool settings (optional)
    RABBITMQ_POOL_MIN=2
    RABBITMQ_POOL_MAX=10
    RABBITMQ_POOL_IDLE_TIMEOUT=30000

Programmatic Configuration

    import { BaseService } from '@banyanai/platform-base-service';

    await BaseService.start({
      name: 'my-service',
      version: '1.0.0',
      messageBus: {
        connection: {
          url: process.env.RABBITMQ_URL || 'amqp://localhost:5672',
          heartbeat: 60,
          connectionTimeout: 10000
        },
        pool: {
          min: 2,
          max: 10,
          idleTimeout: 30000
        }
      }
    });

Queue Naming Convention
Service Queues
Format: service.{serviceName}.{queueType}.{messageType}
Examples:

    service.user-service.commands.CreateUser
    service.user-service.queries.GetUser
    service.order-service.commands.ProcessOrder

Event Queues
Format: exchange.platform.events.{serviceName}.{eventName}
Examples:

    exchange.platform.events.user-service.usercreated
    exchange.platform.events.order-service.orderplaced

Response Queues
Format: response.{serviceName}.{correlationId}
Examples:

    response.api-gateway.cor_abc123xyz
    response.user-service.cor_def456uvw

Message Patterns
1. Request-Response (Commands & Queries)
Used for: Commands (Create, Update, Delete) and Queries (Get, List)

    // Sender (any service)
    const result = await messageBus.send(CreateUserContract, {
      email: 'alice@example.com',
      name: 'Alice Smith'
    });

    // Handler (target service)
    @CommandHandler(CreateUserContract)
    export class CreateUserHandler {
      async handle(input: { email: string; name: string }) {
        const user = await this.userRepository.create(input);
        return user; // Automatically sent back to sender
      }
    }

Flow:
- Sender publishes message to service.user-service.commands.CreateUser
- Handler receives message, processes it
- Handler returns result
- Result sent to sender’s response queue
- Sender receives response via correlation ID
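
The flow above can be sketched in a few lines. This is a minimal illustration, not the platform's implementation: `commandQueueName`, `responseQueueName`, and `ReplyCorrelator` are hypothetical names, and only the queue name formats are taken from the naming convention in this guide.

```typescript
// Hypothetical helpers; only the queue name formats come from this guide.
function commandQueueName(serviceName: string, messageType: string): string {
  return `service.${serviceName}.commands.${messageType}`;
}

function responseQueueName(serviceName: string, correlationId: string): string {
  return `response.${serviceName}.${correlationId}`;
}

type ReplyCallback = (payload: unknown) => void;

// Tracks in-flight requests so a reply can be matched to its sender.
class ReplyCorrelator {
  private readonly pending = new Map<string, ReplyCallback>();

  // Before publishing: remember who is waiting for this correlation ID.
  expect(correlationId: string, onReply: ReplyCallback): void {
    this.pending.set(correlationId, onReply);
  }

  // On reply: hand the payload to the waiting sender exactly once.
  // Returns false when nothing is waiting for this correlation ID.
  deliver(correlationId: string, payload: unknown): boolean {
    const onReply = this.pending.get(correlationId);
    if (onReply === undefined) return false;
    this.pending.delete(correlationId);
    onReply(payload);
    return true;
  }
}
```

A reply whose correlation ID is unknown (for example, one that arrives after the sender has given up) is reported as undelivered, so the caller can log and drop it.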
2. Publish-Subscribe (Events)
Used for: Domain events (UserCreated, OrderPlaced, etc.)

    // Publisher (any service)
    await messageBus.publish(UserCreatedEvent, {
      userId: 'usr_123',
      email: 'alice@example.com',
      name: 'Alice Smith'
    });

    // Subscriber (any service)
    await messageBus.subscribe(
      UserCreatedEvent,
      async (event) => {
        console.log('User created:', event.userId);
        // Handle event (send welcome email, update analytics, etc.)
      }
    );

Flow:
- Publisher sends event to the exchange.platform.events exchange
- RabbitMQ routes event to all subscriber queues
- Each subscriber processes event independently
- No response expected (fire-and-forget)
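
The routing step in this flow can be illustrated with a small in-memory model. The wildcard rules are standard AMQP topic-exchange semantics (`*` matches exactly one word, `#` matches zero or more words). The binding patterns, the second queue name, and the `routeEvent` helper are hypothetical, since this guide does not list the bindings the platform actually creates.

```typescript
interface Binding {
  queue: string;   // subscriber queue name
  pattern: string; // binding pattern, e.g. "user-service.*"
}

// AMQP topic matching: words are separated by ".", "*" matches exactly
// one word, and "#" matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split(".");
  const k = routingKey.split(".");
  const match = (i: number, j: number): boolean => {
    if (i === p.length) return j === k.length;
    if (p[i] === "#") {
      // Let "#" absorb zero or more of the remaining words.
      for (let next = j; next <= k.length; next++) {
        if (match(i + 1, next)) return true;
      }
      return false;
    }
    if (j === k.length) return false;
    return (p[i] === "*" || p[i] === k[j]) && match(i + 1, j + 1);
  };
  return match(0, 0);
}

// Returns every queue whose binding matches; each one gets its own copy.
function routeEvent(bindings: Binding[], routingKey: string): string[] {
  return bindings
    .filter((b) => topicMatches(b.pattern, routingKey))
    .map((b) => b.queue);
}

// Hypothetical bindings for two subscribers.
const bindings: Binding[] = [
  { queue: "exchange.platform.events.notification-service.usercreated", pattern: "user-service.usercreated" },
  { queue: "exchange.platform.events.audit-service.all", pattern: "#" },
];
const targets = routeEvent(bindings, "user-service.usercreated");
```

Here `targets` holds both queue names, while an event published with the key `order-service.orderplaced` would reach only the catch-all binding.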
Exchange Configuration
Platform Events Exchange

    Name: exchange.platform.events
    Type: topic
    Durable: true
    Auto-delete: false
    Routing Keys: {serviceName}.{eventName}

Examples:

    user-service.usercreated
    order-service.orderplaced
    payment-service.paymentprocessed

Service Exchanges
Created automatically per service:

    Name: service.{serviceName}.commands
    Type: direct
    Durable: true
    Auto-delete: false

Queue Configuration
Command/Query Queues

    Queue: service.user-service.commands.CreateUser
    Durable: true
    Exclusive: false
    Auto-delete: false
    Prefetch: 10

Event Subscriber Queues

    Queue: exchange.platform.events.notification-service.usercreated
    Durable: true
    Exclusive: false
    Auto-delete: false
    Prefetch: 5

Response Queues

    Queue: response.api-gateway.{correlationId}
    Durable: false
    Exclusive: true
    Auto-delete: true
    TTL: 30000 # 30 seconds

Message Properties
Standard Properties
Every message includes:

    {
      messageId: 'msg_abc123xyz',
      correlationId: 'cor_def456uvw',
      timestamp: Date,
      replyTo: 'response.api-gateway.cor_def456uvw',
      contentType: 'application/json',
      contentEncoding: 'utf-8',
      deliveryMode: 2, // Persistent
      priority: 1, // 0=low, 1=normal, 2=high
      headers: {
        'x-service-name': 'api-gateway',
        'x-message-type': 'CreateUserCommand',
        'x-trace-id': 'cor_def456uvw'
      }
    }

Message Envelope
All payloads are wrapped in an envelope:

    interface MessageEnvelope<T> {
      id: string;
      correlationId: string;
      traceContext?: TraceContextData;
      timestamp: Date;
      serviceName: string;
      messageType: string;
      payload: T;
      metadata: {
        auth?: MessageAuthContext;
        retry?: RetryMetadata;
        routing?: RoutingMetadata;
      };
    }

Connection Pooling
Pool Management
The platform maintains connection pools:

    {
      min: 2,             // Minimum connections
      max: 10,            // Maximum connections
      idleTimeout: 30000  // Close idle after 30s
    }

Channel Management
Channels are acquired/released per operation:

    // Acquire channel from pool
    const channel = await channelManager.acquireChannel(connection, 'rpc');

    try {
      // Use channel for operation
      await channel.sendToQueue(queue, message);
    } finally {
      // Always release channel
      channelManager.releaseChannel(channel);
    }

Reliability Features
1. Message Persistence
All commands and events are durable:

    // Message persisted to disk
    await messageBus.send(CreateUserContract, payload);
    await messageBus.publish(UserCreatedEvent, event);

2. Acknowledgments
Messages are acknowledged after successful processing:

    @CommandHandler(CreateUserContract)
    export class CreateUserHandler {
      async handle(input: any) {
        const result = await this.userRepository.create(input);
        return result; // Auto-acknowledged on success
      }
    }

3. Dead Letter Queue
Failed messages are routed to a DLQ:

    Queue: dlq.service.user-service.commands.CreateUser

After max retries (default 3), messages are moved to the DLQ for inspection.
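
How a failed message ends up in the DLQ can be sketched as a small decision function. This is an illustration under stated assumptions, not the platform's code: `onHandlerFailure` is a hypothetical name, the `dlq.` prefix follows the queue name shown above, the backoff numbers are the defaults listed under Retry Policy in this guide, jitter is left out to keep the result deterministic, and a message is assumed to be dead-lettered once its failed attempts reach `maxAttempts`.

```typescript
interface RetryPolicy {
  maxAttempts: number;
  initialDelay: number; // milliseconds
  maxDelay: number; // milliseconds
  backoffMultiplier: number;
}

// Defaults copied from the Retry Policy section (jitter omitted).
const defaultPolicy: RetryPolicy = {
  maxAttempts: 3,
  initialDelay: 1000,
  maxDelay: 30000,
  backoffMultiplier: 2.0,
};

type FailureOutcome =
  | { action: "retry"; delayMs: number }
  | { action: "dead-letter"; queue: string };

// `attempt` is the 1-based number of the attempt that just failed.
function onHandlerFailure(
  queue: string,
  attempt: number,
  policy: RetryPolicy = defaultPolicy
): FailureOutcome {
  if (attempt >= policy.maxAttempts) {
    // Assumption: retries are exhausted, so the message goes to the DLQ.
    return { action: "dead-letter", queue: `dlq.${queue}` };
  }
  // Exponential backoff, capped at maxDelay.
  const delay = policy.initialDelay * Math.pow(policy.backoffMultiplier, attempt - 1);
  return { action: "retry", delayMs: Math.min(delay, policy.maxDelay) };
}
```

With these defaults, the first failure on `service.user-service.commands.CreateUser` schedules a retry after 1000 ms, the second after 2000 ms, and the third routes the message to `dlq.service.user-service.commands.CreateUser`.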
4. Circuit Breaker
Automatic circuit breaking on repeated failures:

    {
      failureThreshold: 5,     // Open after 5 failures
      successThreshold: 2,     // Close after 2 successes
      recoveryTimeout: 30000,  // Try recovery after 30s
      monitoringWindow: 60000  // Track failures over 60s
    }

5. Retry Policy
Exponential backoff for transient failures:

    {
      maxAttempts: 3,
      initialDelay: 1000,  // 1 second
      maxDelay: 30000,     // 30 seconds
      backoffMultiplier: 2.0,
      jitter: true
    }

Performance Optimization
Prefetch Count
Control concurrent message processing:

    // Low prefetch for heavy processing
    await messageBus.registerHandler(ProcessVideoContract, handler, {
      prefetch: 1
    });

    // Higher prefetch for light processing
    await messageBus.registerHandler(GetUserContract, handler, {
      prefetch: 10
    });

Message Batching
Batch multiple messages:

    await messageBus.publishBatch([
      { contract: UserCreatedEvent, payload: user1 },
      { contract: UserCreatedEvent, payload: user2 },
      { contract: UserCreatedEvent, payload: user3 }
    ]);

Message Compression
Large messages are compressed automatically:

    // Messages > 1KB automatically compressed with gzip
    await messageBus.send(BulkImportContract, largePayload);

Monitoring
RabbitMQ Management UI
Access at: http://localhost:55672
Credentials: admin / admin123
Features:
- Queue metrics (depth, rate, consumers)
- Connection monitoring
- Channel statistics
- Message rates and details
- Exchange configuration
Key Metrics
Monitor these in the RabbitMQ UI or Grafana:
| Metric | Threshold | Action |
|---|---|---|
| Queue Depth | > 1000 | Scale consumers |
| Message Rate | Dropping | Check service health |
| Connection Count | > 50 | Check connection leaks |
| Channel Count | > 200 | Check channel leaks |
| Unacked Messages | > 100 | Check handler performance |
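
The numeric thresholds in the table translate directly into an automated check. The sketch below is hypothetical (the `BrokerSnapshot` shape and the `recommendActions` name are assumptions, not a platform API) and leaves out Message Rate, which the table describes as a trend rather than a fixed limit.

```typescript
interface BrokerSnapshot {
  queueDepth: number;
  connectionCount: number;
  channelCount: number;
  unackedMessages: number;
}

interface Rule {
  metric: keyof BrokerSnapshot;
  threshold: number;
  action: string;
}

// Thresholds and actions copied from the Key Metrics table.
const rules: Rule[] = [
  { metric: "queueDepth", threshold: 1000, action: "Scale consumers" },
  { metric: "connectionCount", threshold: 50, action: "Check connection leaks" },
  { metric: "channelCount", threshold: 200, action: "Check channel leaks" },
  { metric: "unackedMessages", threshold: 100, action: "Check handler performance" },
];

// Returns the action for every metric that exceeds its threshold,
// in the same order as the table.
function recommendActions(snapshot: BrokerSnapshot): string[] {
  return rules
    .filter((rule) => snapshot[rule.metric] > rule.threshold)
    .map((rule) => rule.action);
}
```

For example, a snapshot with a queue depth of 1500 and 250 unacknowledged messages, and everything else under its limit, yields "Scale consumers" and "Check handler performance".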
Health Checks

    # RabbitMQ alarms (the management API requires credentials)
    curl -u admin:admin123 http://localhost:55672/api/health/checks/alarms

    # Queue stats
    curl -u admin:admin123 http://localhost:55672/api/queues

Best Practices
1. Always Use Contracts

    // Good: Type-safe contract
    await messageBus.send(CreateUserContract, payload);

    // Avoid: Raw queue names (not supported)
    // await messageBus.sendToQueue('some-queue', payload);

2. Handle Errors Gracefully

    @CommandHandler(CreateUserContract)
    export class CreateUserHandler {
      async handle(input: any) {
        try {
          return await this.userRepository.create(input);
        } catch (error) {
          // Log error for observability
          this.logger.error('Failed to create user', error);
          // Rethrow for retry logic
          throw error;
        }
      }
    }

3. Set Appropriate Timeouts

    // Short timeout for queries
    await messageBus.send(GetUserContract, { id: 'usr_123' }, {
      timeout: 5000 // 5 seconds
    });

    // Longer timeout for long-running commands
    await messageBus.send(ProcessVideoContract, video, {
      timeout: 300000 // 5 minutes
    });

4. Use Event Subscriptions Wisely

    // Good: Specific event subscription
    await messageBus.subscribe(UserCreatedEvent, handler, {
      subscriptionGroup: 'email-service' // Load balancing
    });

    // Avoid: Subscribing to all events
    // Creates too many queues and overhead

5. Clean Up Resources

    // Always disconnect on shutdown
    process.on('SIGTERM', async () => {
      await messageBus.disconnect();
      process.exit(0);
    });

Troubleshooting
Connection Refused
Cause: RabbitMQ not running or wrong URL
Solution:

    # Check RabbitMQ is running
    docker compose ps rabbitmq

    # Check connection string
    echo $RABBITMQ_URL

Queue Not Found
Cause: Service not running or contract not registered
Solution:

    # Check service registered contracts
    curl http://localhost:3002/api/contracts

    # Verify queue exists
    curl -u admin:admin123 http://localhost:55672/api/queues

Messages Stuck in Queue
Cause: No consumers or handler errors
Solution:

    # Check consumers
    curl -u admin:admin123 http://localhost:55672/api/queues/%2F/service.user-service.commands.CreateUser

    # Check service logs
    docker compose logs user-service

High Memory Usage
Cause: Queue buildup or message size
Solution:

    # Check queue depth
    curl -u admin:admin123 http://localhost:55672/api/queues | grep messages_ready

    # Purge queue (development only)
    curl -u admin:admin123 -X DELETE \
      http://localhost:55672/api/queues/%2F/service.user-service.commands.CreateUser/contents
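
When working through the queue checks above, the JSON returned by `/api/queues` is easier to act on once it is filtered. The sketch below is a hypothetical helper (`diagnoseQueues` is not part of the platform) that assumes the response has already been fetched and parsed. It relies only on the `name`, `messages_ready`, and `consumers` fields of the management API's queue objects, and its default limit of 1000 mirrors the Queue Depth threshold under Key Metrics.

```typescript
// Subset of the fields the management API returns for each queue.
interface QueueInfo {
  name: string;
  messages_ready: number;
  consumers: number;
}

interface QueueFinding {
  queue: string;
  problem: "no consumers" | "backlog";
  ready: number;
}

// Flags queues that hold messages nobody is consuming, and queues whose
// ready count exceeds `maxReady`.
function diagnoseQueues(queues: QueueInfo[], maxReady = 1000): QueueFinding[] {
  const findings: QueueFinding[] = [];
  for (const q of queues) {
    if (q.messages_ready > 0 && q.consumers === 0) {
      findings.push({ queue: q.name, problem: "no consumers", ready: q.messages_ready });
    } else if (q.messages_ready > maxReady) {
      findings.push({ queue: q.name, problem: "backlog", ready: q.messages_ready });
    }
  }
  // Largest backlog first.
  return findings.sort((a, b) => b.ready - a.ready);
}
```

A "no consumers" finding points at the Messages Stuck in Queue case, while a "backlog" finding on a queue that does have consumers suggests slow handlers or too few instances.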