Message Bus Issues
Message Bus Issues
Section titled “Message Bus Issues”Quick reference for diagnosing and fixing RabbitMQ connection and routing problems.
Quick Diagnosis
Section titled “Quick Diagnosis”# Check RabbitMQ statusdocker ps | grep rabbitmq
# Test RabbitMQ connectiondocker exec my-service nc -zv rabbitmq 5672
# View RabbitMQ management UIopen http://localhost:15672# Username: guest, Password: guest
# Check message bus logsdocker logs my-service 2>&1 | grep -E "RabbitMQ|MessageBus|connection"
# View RabbitMQ logsdocker logs rabbitmq 2>&1 | tail -100Common Message Bus Problems
Section titled “Common Message Bus Problems”1. Connection Failed
Section titled “1. Connection Failed”Error:
MessageBusConnectionError: Failed to connect to message busDiagnostic:
# Check RabbitMQ is runningdocker ps | grep rabbitmq
# Test connectivity from servicedocker exec my-service nc -zv rabbitmq 5672
# Check RabbitMQ logsdocker logs rabbitmq 2>&1 | grep -i errorFix - Ensure RabbitMQ Running:
# Start RabbitMQdocker compose up -d rabbitmq
# Wait for readydocker logs rabbitmq 2>&1 | grep "Server startup complete"
# Check healthdocker exec rabbitmq rabbitmq-diagnostics pingFix - Correct Connection URL:
services: my-service: environment: # ✓ CORRECT - Use service name - RABBITMQ_URL=amqp://admin:admin123@rabbitmq:5672
# ❌ WRONG - Don't use localhost # - RABBITMQ_URL=amqp://localhost:56722. Messages Not Being Received
Section titled “2. Messages Not Being Received”Symptom: Handler not called, messages disappearing
Diagnostic:
# Check RabbitMQ queuescurl -u guest:guest http://localhost:15672/api/queues | jq '.[] | {name, messages}'
# View queue bindingscurl -u guest:guest http://localhost:15672/api/bindings | jq
# Check handler discoverydocker logs my-service 2>&1 | grep "Handler discovery"Fix - Verify Handler Discovery:
// Handler MUST be in correct folder// src/queries/GetUserHandler.ts// src/events/UserCreatedHandler.ts
@CommandHandler(CreateUserCommand)export class CreateUserHandler { async handle(command: CreateUserCommand) { // ... }}Fix - Check Queue Binding:
# View queue bindings in RabbitMQ UI# http://localhost:15672/#/queues
# Queue name should match: service-name.command.CommandName# Binding key should match event/command type3. Circuit Breaker Open
Section titled “3. Circuit Breaker Open”Error:
CircuitBreakerOpenError: Circuit breaker is open for service 'user-service' after 5 failuresCause: Target service failed repeatedly, circuit breaker protecting against cascade failures
Fix - Wait for Reset:
# Circuit breaker auto-resets after timeoutdocker logs my-service 2>&1 | grep "Circuit breaker"
# Should see: State transition: open -> half-open -> closedFix - Address Root Cause:
// Find why target service is failingdocker logs target-service 2>&1 | grep -i error
// Common causes:// - Service not running// - Database connection issues// - Handler errors// - Resource exhaustionConfigure Circuit Breaker:
await BaseService.start({ name: 'my-service', version: '1.0.0', circuitBreaker: { failureThreshold: 5, // Open after 5 failures resetTimeout: 30000, // Try reset after 30s halfOpenRequests: 3 // Test with 3 requests in half-open }});4. Request Timeout
Section titled “4. Request Timeout”Error:
REQUEST_TIMEOUT: Operation 'GetUser' timed out after 5000msDiagnostic:
# Check handler execution timedocker logs my-service 2>&1 | grep "Handler execution"
# Monitor RabbitMQ message ratescurl -u guest:guest http://localhost:15672/api/queues | \ jq '.[] | {name, message_stats}'Fix - Increase Timeout:
// At call siteawait queryBus.execute(new GetUserQuery(userId), { timeout: 10000 // Increase from 5s to 10s});Fix - Optimize Handler:
@QueryHandler(GetUserQuery)export class GetUserHandler { async handle(query: GetUserQuery) { // Use index for query // Add caching // Avoid N+1 queries // Optimize database query }}5. Connection Pool Exhausted
Section titled “5. Connection Pool Exhausted”Error:
Connection pool exhausted - no healthy connections availableDiagnostic:
# Check connection pool statusdocker logs my-service 2>&1 | grep "Connection pool"
# View active connectionscurl -u guest:guest http://localhost:15672/api/connections | jq lengthFix - Increase Pool Size:
await BaseService.start({ name: 'my-service', version: '1.0.0', messageBus: { urls: [process.env.RABBITMQ_URL!], connectionPool: { minConnections: 2, maxConnections: 10, // Increase from default 5 maxChannelsPerConnection: 100 } }});Fix - Check for Connection Leaks:
// Ensure channels are closed after useconst channel = await messageBus.getChannel();try { await channel.publish(...);} finally { await channel.close(); // IMPORTANT: Always close}6. Message Delivery Failures
Section titled “6. Message Delivery Failures”Symptom: Messages published but never received
Diagnostic:
# Check dead letter queuecurl -u guest:guest http://localhost:15672/api/queues/%2F/dead-letter | jq
# View failed messagescurl -u guest:guest http://localhost:15672/api/queues/%2F/dead-letter/get \ -X POST -d'{"count":10,"ackmode":"ack_requeue_false","encoding":"auto"}' | jqFix - Check Exchange and Routing:
# View exchangescurl -u guest:guest http://localhost:15672/api/exchanges | \ jq '.[] | {name, type}'
# Check bindingscurl -u guest:guest http://localhost:15672/api/bindings | \ jq '.[] | {source, destination, routing_key}'Fix - Verify Message Format:
// Ensure message matches contractconst command: CreateUserCommand = { email: 'user@example.com', password: 'password123'};
// Publish with correct routingawait messageBus.publish('CreateUser', command, { correlationId: uuid(), timestamp: new Date()});7. ChannelManager Shutting Down
Section titled “7. ChannelManager Shutting Down”Error:
ChannelManager is shutting downCause: Service stopping but still trying to use message bus
Fix - Check Service Lifecycle:
// Don't use message bus after shutdown initiatedlet isShuttingDown = false;
process.on('SIGTERM', () => { isShuttingDown = true;});
async function publishMessage(message: any) { if (isShuttingDown) { throw new Error('Service is shutting down'); } await messageBus.publish(message);}RabbitMQ Management Commands
Section titled “RabbitMQ Management Commands”# List all queuescurl -u guest:guest http://localhost:15672/api/queues | jq
# Get queue detailscurl -u guest:guest http://localhost:15672/api/queues/%2F/my-service.command.CreateUser | jq
# List exchangescurl -u guest:guest http://localhost:15672/api/exchanges | jq
# View bindingscurl -u guest:guest http://localhost:15672/api/bindings | jq
# Get messages from queue (without consuming)curl -u guest:guest http://localhost:15672/api/queues/%2F/my-queue/get \ -X POST \ -d'{"count":5,"ackmode":"ack_requeue_true","encoding":"auto"}' | jq
# Purge queuecurl -u guest:guest -X DELETE \ http://localhost:15672/api/queues/%2F/my-queue/contents
# Create bindingcurl -u guest:guest -X POST \ http://localhost:15672/api/bindings/%2F/e/my-exchange/q/my-queue \ -d'{"routing_key":"my-routing-key"}'Message Bus Troubleshooting Checklist
Section titled “Message Bus Troubleshooting Checklist”- RabbitMQ container running and healthy
- Service can connect to RabbitMQ (port 5672)
- RABBITMQ_URL uses service name, not localhost
- Handlers in correct directories with decorators
- Queues created and bound to exchanges
- Message format matches contract
- No circuit breaker open errors
- Connection pool not exhausted
- Channels properly closed after use
Best Practices
Section titled “Best Practices”- Use Service Name in URLs:
RABBITMQ_URL=amqp://admin:admin123@rabbitmq:5672 # Use 'rabbitmq', not 'localhost'- Add Health Checks:
rabbitmq: healthcheck: test: rabbitmq-diagnostics -q ping interval: 10s timeout: 5s retries: 5- Monitor Queue Lengths:
setInterval(async () => { const stats = await rabbitmq.getQueueStats(); if (stats.messages > 1000) { console.warn('Queue backlog detected:', stats); }}, 60000);- Handle Connection Loss:
messageBus.on('connection_lost', () => { console.warn('Message bus connection lost, will retry');});
messageBus.on('connection_restored', () => { console.log('Message bus connection restored');});- Use Dead Letter Exchange:
await channel.assertQueue('my-queue', { deadLetterExchange: 'dead-letter', deadLetterRoutingKey: 'failed'});