Message Routing Reference
Overview
This reference documents how messages are routed through RabbitMQ in the banyan-core platform, including queue naming, exchange configuration, and routing patterns.
Routing Patterns
1. Direct Routing (Commands & Queries)
Messages routed directly to specific service queues:

    Sender → Queue → Handler (1:1)

Example:

    api-gateway → service.user-service.commands.CreateUser → CreateUserHandler

2. Topic Routing (Events)
Messages broadcast to all subscriber queues via exchange:

    Publisher → Exchange → [Subscriber Queue 1, Subscriber Queue 2, ...]

Example:

    user-service → exchange.platform.events → [
      exchange.platform.events.email-service.usercreated,
      exchange.platform.events.analytics-service.usercreated,
      exchange.platform.events.notification-service.usercreated
    ]

Queue Naming Conventions
Command Queues
Format: service.{serviceName}.commands.{CommandName}
Examples:

    service.user-service.commands.CreateUser
    service.order-service.commands.ProcessOrder
    service.payment-service.commands.CapturePayment

Properties:
- Durable: Yes
- Exclusive: No
- Auto-delete: No
- Prefetch: 10 (default)
Query Queues
Format: service.{serviceName}.queries.{QueryName}
Examples:

    service.user-service.queries.GetUser
    service.order-service.queries.ListOrders
    service.product-service.queries.SearchProducts

Properties:
- Durable: Yes
- Exclusive: No
- Auto-delete: No
- Prefetch: 10 (default)
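Command and query queues share one naming shape and one set of properties, so both can be derived from a single helper. The sketch below is illustrative only: serviceQueueName, SERVICE_QUEUE_OPTIONS, and SERVICE_QUEUE_PREFETCH are hypothetical names rather than platform APIs, and the options object simply restates the properties listed above in the form that amqplib's assertQueue accepts.

```typescript
// Hypothetical helper: builds names of the documented form
// service.{serviceName}.{commands|queries}.{MessageName}
type ServiceQueueKind = "commands" | "queries";

function serviceQueueName(
  serviceName: string,
  kind: ServiceQueueKind,
  messageName: string
): string {
  return `service.${serviceName}.${kind}.${messageName}`;
}

// Documented queue properties: durable, not exclusive, not auto-deleted.
const SERVICE_QUEUE_OPTIONS = {
  durable: true,
  exclusive: false,
  autoDelete: false,
};

// Prefetch is a consumer-side setting, not a queue property, so it is kept apart.
const SERVICE_QUEUE_PREFETCH = 10;

console.log(serviceQueueName("user-service", "commands", "CreateUser"));
// → service.user-service.commands.CreateUser
console.log(serviceQueueName("user-service", "queries", "GetUser"));
// → service.user-service.queries.GetUser
console.log(SERVICE_QUEUE_OPTIONS, SERVICE_QUEUE_PREFETCH);
```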
Event Subscriber Queues
Format: exchange.platform.events.{subscriberService}.{eventName}
Examples:

    exchange.platform.events.email-service.usercreated
    exchange.platform.events.analytics-service.usercreated
    exchange.platform.events.notification-service.orderplaced

Note: The event name segment is lowercase with no dashes (for example, UserCreated becomes usercreated).
Properties:
- Durable: Yes
- Exclusive: No
- Auto-delete: No
- Prefetch: 5 (default)
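The naming rule for subscriber queues can be sketched as a small helper. The normalization shown (strip dashes, then lowercase) is inferred from the note and examples above and is an assumption about how the platform derives the name; normalizeEventName and eventSubscriberQueueName are hypothetical functions.

```typescript
// Assumed normalization: "UserCreated" or "user-created" -> "usercreated"
function normalizeEventName(eventName: string): string {
  return eventName.replace(/-/g, "").toLowerCase();
}

// Builds exchange.platform.events.{subscriberService}.{eventName}
function eventSubscriberQueueName(
  subscriberService: string,
  eventName: string
): string {
  return `exchange.platform.events.${subscriberService}.${normalizeEventName(eventName)}`;
}

console.log(eventSubscriberQueueName("email-service", "UserCreated"));
// → exchange.platform.events.email-service.usercreated
```

Note that only the event name is normalized; the service name keeps its dashes, as in the examples.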
Response Queues
Format: response.{serviceName}.{correlationId}
Examples:

    response.api-gateway.cor_abc123xyz
    response.user-service.cor_def456uvw

Properties:
- Durable: No (transient)
- Exclusive: Yes (single consumer)
- Auto-delete: Yes (deleted when consumer disconnects)
- TTL: 30 seconds
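Expressed as amqplib assertQueue options, these properties might look like the sketch below. responseQueueName and RESPONSE_QUEUE_OPTIONS are hypothetical names. One detail worth making explicit: amqplib treats an omitted durable flag as true, so a transient queue has to set durable to false itself.

```typescript
// Hypothetical helper for the documented format
// response.{serviceName}.{correlationId}
function responseQueueName(serviceName: string, correlationId: string): string {
  return `response.${serviceName}.${correlationId}`;
}

// Documented properties in the shape accepted by amqplib's assertQueue.
const RESPONSE_QUEUE_OPTIONS = {
  durable: false,   // transient
  exclusive: true,  // single consumer
  autoDelete: true, // deleted when the consumer disconnects
  expires: 30000,   // sent as x-expires: removed after 30 s without use
};

console.log(responseQueueName("api-gateway", "cor_abc123xyz"));
// → response.api-gateway.cor_abc123xyz
console.log(RESPONSE_QUEUE_OPTIONS);
```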
Dead Letter Queues
Format: dlq.{originalQueueName}
Examples:

    dlq.service.user-service.commands.CreateUser
    dlq.exchange.platform.events.email-service.usercreated

Properties:
- Durable: Yes
- Exclusive: No
- Auto-delete: No
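Because a DLQ name is just the original queue name with a dlq. prefix, the mapping is reversible, which helps when tracing a dead-lettered message back to its source. A minimal sketch, with dlqName and originalQueueFromDlq as hypothetical helpers:

```typescript
const DLQ_PREFIX = "dlq.";

// dlq.{originalQueueName}
function dlqName(originalQueueName: string): string {
  return `${DLQ_PREFIX}${originalQueueName}`;
}

// Recovers the source queue from a DLQ name, or null if it is not a DLQ.
function originalQueueFromDlq(queueName: string): string | null {
  return queueName.startsWith(DLQ_PREFIX)
    ? queueName.slice(DLQ_PREFIX.length)
    : null;
}

const dlq = dlqName("service.user-service.commands.CreateUser");
console.log(dlq);
// → dlq.service.user-service.commands.CreateUser
console.log(originalQueueFromDlq(dlq));
// → service.user-service.commands.CreateUser
```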
Exchange Configuration
Platform Events Exchange
Name: exchange.platform.events
Properties:

    Type: topic
    Durable: true
    Auto-delete: false
    Internal: false

Routing Keys: {serviceName}.{eventName}
Examples:

    user-service.usercreated
    order-service.orderplaced
    payment-service.paymentprocessed

Service Command Exchanges (Optional)
Name: service.{serviceName}.commands
Properties:

    Type: direct
    Durable: true
    Auto-delete: false

Usage: Advanced routing scenarios
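The exchange settings above can be captured as plain data before being handed to the broker. This is a sketch under assumptions: ExchangeDeclaration and platformExchangeDeclarations are hypothetical, and which services actually get a command exchange is left to the caller.

```typescript
interface ExchangeDeclaration {
  name: string;
  type: "topic" | "direct";
  options: { durable: boolean; autoDelete: boolean; internal?: boolean };
}

// Returns the platform events exchange plus one optional command exchange
// per service name supplied by the caller.
function platformExchangeDeclarations(
  commandServices: string[] = []
): ExchangeDeclaration[] {
  const declarations: ExchangeDeclaration[] = [
    {
      name: "exchange.platform.events",
      type: "topic",
      options: { durable: true, autoDelete: false, internal: false },
    },
  ];
  for (const serviceName of commandServices) {
    declarations.push({
      name: `service.${serviceName}.commands`,
      type: "direct",
      options: { durable: true, autoDelete: false },
    });
  }
  return declarations;
}

console.log(platformExchangeDeclarations(["user-service"]).map((d) => d.name));
```

With amqplib, each entry would map onto a call of the form channel.assertExchange(d.name, d.type, d.options).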
Routing Keys
Event Routing Keys
Format: {serviceName}.{eventName}
Examples:

    user-service.usercreated
    user-service.userupdated
    user-service.userdeleted
    order-service.orderplaced
    order-service.ordershipped
    payment-service.paymentprocessed

Pattern Matching:

    # Subscribe to all user events
    user-service.*

    # Subscribe to all events from any service
    *.*

    # Subscribe to specific event from any service
    *.usercreated

Binding Patterns
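Binding keys on a topic exchange are compared with routing keys word by word: * stands for exactly one dot-separated word and # for zero or more words. The sketch below re-implements that rule locally, purely to make the patterns in this reference concrete. topicMatches is a hypothetical helper; the broker performs the real matching.

```typescript
// Local re-implementation of AMQP topic matching, for illustration only.
function topicMatches(bindingKey: string, routingKey: string): boolean {
  const pattern = bindingKey.split(".");
  const words = routingKey.split(".");

  function match(p: number, w: number): boolean {
    if (p === pattern.length) return w === words.length;
    if (pattern[p] === "#") {
      // '#' may absorb zero or more words: try every possible split point.
      for (let next = w; next <= words.length; next++) {
        if (match(p + 1, next)) return true;
      }
      return false;
    }
    if (w === words.length) return false; // pattern expects another word
    const sameWord = pattern[p] === "*" || pattern[p] === words[w];
    return sameWord && match(p + 1, w + 1);
  }

  return match(0, 0);
}

console.log(topicMatches("user-service.*", "user-service.usercreated")); // → true
console.log(topicMatches("*.usercreated", "order-service.orderplaced")); // → false
console.log(topicMatches("#", "payment-service.paymentprocessed"));      // → true
```

Since every routing key here has exactly two words, *.* and # select the same messages; they would diverge only if keys with more or fewer words were introduced.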
    // Subscribe to all user events
    await channel.bindQueue(
      'my-service-queue',
      'exchange.platform.events',
      'user-service.*'
    );

    // Subscribe to UserCreated from any service
    await channel.bindQueue(
      'my-service-queue',
      'exchange.platform.events',
      '*.usercreated'
    );

    // Subscribe to all events
    await channel.bindQueue(
      'my-service-queue',
      'exchange.platform.events',
      '#'
    );

Message Priority
Section titled “Message Priority”Priority Levels
| Priority | Value | Use Case |
|---|---|---|
| Low | 0 | Background tasks, analytics |
| Normal | 1 | Standard operations (default) |
| High | 2 | Critical operations, payments |
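The table amounts to a lookup from a named level to the numeric AMQP priority. A minimal sketch of that lookup follows; toAmqpPriority and PRIORITY_VALUES are hypothetical names, and how the platform's message bus performs this translation internally is not documented here.

```typescript
type Priority = "low" | "normal" | "high";

// Values taken from the table above.
const PRIORITY_VALUES: Record<Priority, number> = {
  low: 0,
  normal: 1,
  high: 2,
};

// Falls back to "normal", the documented default.
function toAmqpPriority(priority: Priority = "normal"): number {
  return PRIORITY_VALUES[priority];
}

console.log(toAmqpPriority("high")); // → 2
console.log(toAmqpPriority());       // → 1
```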
Setting Priority
    // Send with high priority
    await messageBus.send(
      ProcessPaymentContract,
      payload,
      { priority: 'high' }
    );

    // Publish with low priority
    await messageBus.publish(
      AnalyticsEvent,
      data,
      { priority: 'low' }
    );

Queue Priority Configuration

    // Enable queue with priority support
    await channel.assertQueue('service.payment-service.commands.CapturePayment', {
      durable: true,
      arguments: {
        'x-max-priority': 2 // Support priorities 0-2
      }
    });

Routing Metadata
Section titled “Routing Metadata”Routing Options
    interface RoutingMetadata {
      /** Message priority */
      priority?: 'low' | 'normal' | 'high';

      /** Response timeout in milliseconds */
      timeout?: number;

      /** Custom routing key override */
      routingKey?: string;

      /** Custom exchange override */
      exchange?: string;

      /** Custom queue override (direct routing) */
      queue?: string;
    }

Custom Routing
    // Override routing key
    await messageBus.publish(
      CustomEvent,
      data,
      {
        metadata: {
          routing: {
            routingKey: 'custom.event.key'
          }
        }
      }
    );

    // Direct to specific queue
    await messageBus.send(
      CustomCommand,
      data,
      {
        metadata: {
          routing: {
            queue: 'service.custom-service.commands.Custom'
          }
        }
      }
    );

Request-Response Flow
Section titled “Request-Response Flow”Flow Diagram
    1. Client generates correlation ID
       ↓
    2. Client creates response queue: response.client.{correlationId}
       ↓
    3. Client sends message with replyTo = response queue
       ↓
    4. Handler processes message
       ↓
    5. Handler sends response to replyTo queue
       ↓
    6. Client receives response via correlation ID

Implementation
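Step 6 of the flow relies on matching each reply to the request that is waiting for it. That bookkeeping can be illustrated in isolation, without any channel code. PendingRequests is a hypothetical class, not a platform API, and a real client would also drop entries when the response timeout elapses.

```typescript
type ResponseHandler = (body: string) => void;

class PendingRequests {
  private readonly handlers = new Map<string, ResponseHandler>();

  // Called when a request is sent: remember who is waiting for the reply.
  register(correlationId: string, handler: ResponseHandler): void {
    this.handlers.set(correlationId, handler);
  }

  // Called when a reply arrives. Returns true if a waiting request was
  // found and completed, false for unknown or already completed IDs.
  resolve(correlationId: string, body: string): boolean {
    const handler = this.handlers.get(correlationId);
    if (!handler) return false;
    this.handlers.delete(correlationId);
    handler(body);
    return true;
  }

  // Called on timeout: forget the request so a late reply is ignored.
  abandon(correlationId: string): boolean {
    return this.handlers.delete(correlationId);
  }
}

const pendingRequests = new PendingRequests();
pendingRequests.register("cor_abc123xyz", (body) => console.log("reply:", body));
pendingRequests.resolve("cor_abc123xyz", '{"ok":true}');
```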
    // Sender side (automatic)
    const correlationId = generateCorrelationId();
    const replyQueue = `response.api-gateway.${correlationId}`;

    await channel.assertQueue(replyQueue, {
      durable: false, // amqplib defaults to durable: true when omitted
      exclusive: true,
      autoDelete: true,
      expires: 30000 // 30 seconds
    });

    await channel.sendToQueue(
      'service.user-service.commands.CreateUser',
      Buffer.from(JSON.stringify(payload)),
      { correlationId, replyTo: replyQueue }
    );

    // Handler side (automatic)
    channel.consume('service.user-service.commands.CreateUser', async (msg) => {
      if (msg === null) return; // consumer was cancelled by the broker

      // msg.content is a Buffer; decode the JSON payload before handling it
      const result = await handler(JSON.parse(msg.content.toString()));

      // Send response back
      await channel.sendToQueue(
        msg.properties.replyTo,
        Buffer.from(JSON.stringify(result)),
        { correlationId: msg.properties.correlationId }
      );

      channel.ack(msg);
    });

Load Balancing
Section titled “Load Balancing”Competing Consumers
Multiple instances share the same queue:

    Queue: service.user-service.commands.CreateUser
                     ↓
    [Instance 1]  [Instance 2]  [Instance 3]
     Message 1     Message 2     Message 3

Distribution: Round-robin by default
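Round-robin distribution can be pictured with a small simulation. This is a simplified model, not broker code: roundRobin is a hypothetical function, and it assumes every consumer always has free prefetch capacity. In practice a consumer that has reached its prefetch limit is skipped until it acknowledges a message.

```typescript
// Assigns messages to consumers in strict rotation.
function roundRobin<T>(messages: T[], consumerCount: number): T[][] {
  if (!Number.isInteger(consumerCount) || consumerCount < 1) {
    throw new RangeError("consumerCount must be a positive integer");
  }
  // Consumer c receives every message whose index is congruent to c.
  return Array.from({ length: consumerCount }, (_unused, consumer) =>
    messages.filter((_message, index) => index % consumerCount === consumer)
  );
}

console.log(roundRobin(["Message 1", "Message 2", "Message 3", "Message 4"], 3));
// → [["Message 1", "Message 4"], ["Message 2"], ["Message 3"]]
```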
Prefetch Configuration
    // Low concurrency for CPU-intensive tasks
    await channel.prefetch(1);

    // High concurrency for I/O-bound tasks
    await channel.prefetch(10);

    // Per-consumer prefetch: amqplib's consume() has no prefetch option, so
    // set it on the channel before starting the consumer
    await channel.prefetch(5);
    await channel.consume(queue, handler);

Message TTL
Section titled “Message TTL”Queue-Level TTL
    // All messages expire after 1 hour
    await channel.assertQueue('service.temp-service.commands.Process', {
      durable: true,
      arguments: {
        'x-message-ttl': 3600000 // 1 hour in milliseconds
      }
    });

Message-Level TTL

    // Individual message TTL
    await channel.sendToQueue(queue, message, {
      expiration: '60000' // 60 seconds (string!)
    });

Response Queue TTL

    // Response queues are removed after 30 seconds without use
    await channel.assertQueue(responseQueue, {
      durable: false, // response queues are transient
      exclusive: true,
      autoDelete: true,
      arguments: {
        'x-expires': 30000 // Queue expires if unused for 30s
      }
    });

Dead Letter Exchanges
Section titled “Dead Letter Exchanges”Configuration
    // Configure queue with DLX
    // (assumes the 'dlx.platform' exchange has already been declared)
    await channel.assertQueue('service.user-service.commands.CreateUser', {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': 'dlx.platform',
        'x-dead-letter-routing-key': 'dlq.service.user-service.commands.CreateUser'
      }
    });

    // Create DLQ
    await channel.assertQueue('dlq.service.user-service.commands.CreateUser', {
      durable: true
    });

    // Bind DLQ to DLX
    await channel.bindQueue(
      'dlq.service.user-service.commands.CreateUser',
      'dlx.platform',
      'dlq.service.user-service.commands.CreateUser'
    );

Message Rejection

    // Reject with requeue (retry)
    channel.nack(msg, false, true);

    // Reject without requeue (send to DLQ)
    channel.nack(msg, false, false);

Routing Scenarios
Section titled “Routing Scenarios”Scenario 1: Command to Single Service
    Client: api-gateway
    Target: user-service
    Message: CreateUserCommand

    Routing:
      api-gateway
        → service.user-service.commands.CreateUser
        → user-service (instance 1, 2, or 3)
        → response.api-gateway.{correlationId}
        → api-gateway

Scenario 2: Event to Multiple Services

    Publisher: user-service
    Event: UserCreated

    Routing:
      user-service
        → exchange.platform.events (routing key: user-service.usercreated)
        → exchange.platform.events.email-service.usercreated
        → email-service

      AND → exchange.platform.events.analytics-service.usercreated
        → analytics-service

      AND → exchange.platform.events.notification-service.usercreated
        → notification-service

Scenario 3: Query with Caching

    Client: api-gateway
    Target: user-service
    Message: GetUserQuery

    Routing (cache miss):
      api-gateway
        → service.user-service.queries.GetUser
        → user-service
        → response.api-gateway.{correlationId}
        → api-gateway
        → (cache result)

    Routing (cache hit):
      api-gateway
        → (return cached result, no message sent)

Performance Optimization
Section titled “Performance Optimization”Connection Pooling
    // Maintain pool of connections
    const pool = new ConnectionPool({
      min: 2,
      max: 10,
      idleTimeout: 30000
    });

    // Acquire connection for operation
    const conn = await pool.acquire();
    try {
      await sendMessage(conn, message);
    } finally {
      pool.release(conn);
    }

Channel Pooling

    // Reuse channels for operations
    const channel = await channelPool.acquire('rpc');
    try {
      await channel.sendToQueue(queue, message);
    } finally {
      channelPool.release(channel);
    }

Batch Publishing

    // Publish multiple events in batch
    // (waitForConfirms requires a channel from connection.createConfirmChannel())
    await channel.publish(exchange, '', Buffer.from(message1));
    await channel.publish(exchange, '', Buffer.from(message2));
    await channel.publish(exchange, '', Buffer.from(message3));
    await channel.waitForConfirms(); // Wait once for all

Monitoring Routing
Section titled “Monitoring Routing”Queue Metrics
    # Check queue depth
    curl -u admin:admin123 http://localhost:55672/api/queues/%2F/service.user-service.commands.CreateUser | \
      jq '.messages_ready'

    # Check consumers
    curl -u admin:admin123 http://localhost:55672/api/queues/%2F/service.user-service.commands.CreateUser | \
      jq '.consumers'

    # Check message rates
    curl -u admin:admin123 http://localhost:55672/api/queues/%2F/service.user-service.commands.CreateUser | \
      jq '.message_stats'

Exchange Metrics

    # Check exchange publish rate
    curl -u admin:admin123 http://localhost:55672/api/exchanges/%2F/exchange.platform.events | \
      jq '.message_stats.publish_in'

    # Check bindings
    curl -u admin:admin123 http://localhost:55672/api/exchanges/%2F/exchange.platform.events/bindings/source | \
      jq '.[].routing_key'

Troubleshooting
Section titled “Troubleshooting”Message Not Reaching Handler
Check:
- Queue exists: curl -u admin:admin123 http://localhost:55672/api/queues
- Handler registered: Check service logs
- Consumer active: Check queue consumers count
- Message in queue: Check messages_ready
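Most of this checklist can be read off a single queue object returned by the management API. The sketch below turns those fields into a first diagnosis. diagnoseDelivery and its messages are hypothetical, QueueStats lists only the fields used here, and a null argument stands for a queue the API did not find.

```typescript
// Subset of the fields the management API reports for a queue.
interface QueueStats {
  consumers: number;
  messages_ready: number;
  messages_unacknowledged: number;
}

function diagnoseDelivery(stats: QueueStats | null): string {
  if (stats === null) return "queue does not exist";
  if (stats.consumers === 0) return "no active consumer";
  if (stats.messages_ready > 0) {
    return "messages waiting: consumers may be saturated or stalled";
  }
  if (stats.messages_unacknowledged > 0) {
    return "messages delivered but not yet acknowledged";
  }
  return "queue is empty: the message may not have reached this queue";
}

console.log(
  diagnoseDelivery({ consumers: 0, messages_ready: 4, messages_unacknowledged: 0 })
);
// → no active consumer
```

Whether the handler is registered still has to be confirmed in the service logs; the queue statistics only show that some consumer is attached.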
Messages Going to DLQ
Check:
- Handler errors in logs
- DLQ depth: Check dlq.* queues
- Retry count exceeded: Check the x-death message header
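RabbitMQ records dead-lettering history in the x-death header as an array of entries, each carrying the queue, the reason, and a count. A minimal sketch of reading how often a message was rejected from a given queue follows. deathCount is a hypothetical helper, XDeathEntry lists only the fields used, and any retry limit compared against the result is platform-specific and not specified in this reference.

```typescript
// Fields of an x-death entry used by this sketch.
interface XDeathEntry {
  queue: string;
  reason: string; // for example "rejected" or "expired"
  count: number;
}

// Sums the counts recorded for one queue and one reason.
function deathCount(
  headers: Record<string, unknown> | undefined,
  queue: string,
  reason: string = "rejected"
): number {
  const entries = headers?.["x-death"];
  if (!Array.isArray(entries)) return 0; // never dead-lettered
  return (entries as XDeathEntry[])
    .filter((entry) => entry.queue === queue && entry.reason === reason)
    .reduce((total, entry) => total + entry.count, 0);
}

const sampleHeaders = {
  "x-death": [
    { queue: "service.user-service.commands.CreateUser", reason: "rejected", count: 3 },
  ],
};
console.log(deathCount(sampleHeaders, "service.user-service.commands.CreateUser"));
// → 3
```

In a consumer, the headers would come from msg.properties.headers.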
Slow Message Processing
Check:
- Prefetch count: May be too low
- Handler performance: Check handler duration in logs
- Queue depth: May need more consumers
Best Practices
1. Use Consistent Naming
Follow the naming conventions strictly for easy debugging.
2. Set Appropriate Prefetch

    // CPU-intensive
    prefetch: 1

    // I/O-bound
    prefetch: 10

    // Memory-intensive
    prefetch: 3

3. Configure Dead Letter Queues
Always configure DLQ for automatic error handling.
4. Monitor Queue Depth
Alert when queue depth (messages_ready) exceeds 100 under normal operations.
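As a sketch of how such an alert could be evaluated, the function below filters a list of queue objects, shaped like the management API's queue listing, down to the names that exceed the threshold. queuesOverDepth is a hypothetical helper, and the default of 100 is the figure suggested above rather than a broker setting.

```typescript
// Fields of a management API queue object used by this sketch.
interface QueueDepth {
  name: string;
  messages_ready: number;
}

// Returns the names of queues whose ready-message count exceeds the threshold.
function queuesOverDepth(queues: QueueDepth[], threshold: number = 100): string[] {
  return queues
    .filter((queue) => queue.messages_ready > threshold)
    .map((queue) => queue.name);
}

console.log(
  queuesOverDepth([
    { name: "service.user-service.commands.CreateUser", messages_ready: 250 },
    { name: "service.user-service.queries.GetUser", messages_ready: 12 },
  ])
);
// → ["service.user-service.commands.CreateUser"]
```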
5. Use Priority Sparingly
Each additional priority level adds broker overhead and makes delivery order harder to reason about, so keep to the three levels defined above.