Skip to content

Message Bus Issues

Quick reference for diagnosing and fixing RabbitMQ connection and routing problems.

Terminal window
# Check RabbitMQ status
docker ps | grep rabbitmq
# Test RabbitMQ connection
docker exec my-service nc -zv rabbitmq 5672
# View RabbitMQ management UI
open http://localhost:15672
# Username: guest, Password: guest
# Check message bus logs
docker logs my-service 2>&1 | grep -E "RabbitMQ|MessageBus|connection"
# View RabbitMQ logs
docker logs rabbitmq 2>&1 | tail -100

Error:

MessageBusConnectionError: Failed to connect to message bus

Diagnostic:

Terminal window
# Check RabbitMQ is running
docker ps | grep rabbitmq
# Test connectivity from service
docker exec my-service nc -zv rabbitmq 5672
# Check RabbitMQ logs
docker logs rabbitmq 2>&1 | grep -i error

Fix - Ensure RabbitMQ Running:

Terminal window
# Start RabbitMQ
docker compose up -d rabbitmq
# Wait for ready
docker logs rabbitmq 2>&1 | grep "Server startup complete"
# Check health
docker exec rabbitmq rabbitmq-diagnostics ping

Fix - Correct Connection URL:

docker-compose.yml
services:
my-service:
environment:
# ✓ CORRECT - Use service name
- RABBITMQ_URL=amqp://admin:admin123@rabbitmq:5672
# ❌ WRONG - Don't use localhost
# - RABBITMQ_URL=amqp://localhost:5672

Symptom: Handler not called, messages disappearing

Diagnostic:

Terminal window
# Check RabbitMQ queues
curl -u guest:guest http://localhost:15672/api/queues | jq '.[] | {name, messages}'
# View queue bindings
curl -u guest:guest http://localhost:15672/api/bindings | jq
# Check handler discovery
docker logs my-service 2>&1 | grep "Handler discovery"

Fix - Verify Handler Discovery:

src/commands/CreateUserHandler.ts
// Handler MUST be in correct folder
// src/queries/GetUserHandler.ts
// src/events/UserCreatedHandler.ts
@CommandHandler(CreateUserCommand)
export class CreateUserHandler {
async handle(command: CreateUserCommand) {
// ...
}
}

Fix - Check Queue Binding:

Terminal window
# View queue bindings in RabbitMQ UI
# http://localhost:15672/#/queues
# Queue name should match: service-name.command.CommandName
# Binding key should match event/command type

Error:

CircuitBreakerOpenError: Circuit breaker is open for service 'user-service' after 5 failures

Cause: Target service failed repeatedly, circuit breaker protecting against cascade failures

Fix - Wait for Reset:

Terminal window
# Circuit breaker auto-resets after timeout
docker logs my-service 2>&1 | grep "Circuit breaker"
# Should see: State transition: open -> half-open -> closed

Fix - Address Root Cause:

// Find why target service is failing
docker logs target-service 2>&1 | grep -i error
// Common causes:
// - Service not running
// - Database connection issues
// - Handler errors
// - Resource exhaustion

Configure Circuit Breaker:

await BaseService.start({
name: 'my-service',
version: '1.0.0',
circuitBreaker: {
failureThreshold: 5, // Open after 5 failures
resetTimeout: 30000, // Try reset after 30s
halfOpenRequests: 3 // Test with 3 requests in half-open
}
});

Error:

REQUEST_TIMEOUT: Operation 'GetUser' timed out after 5000ms

Diagnostic:

Terminal window
# Check handler execution time
docker logs my-service 2>&1 | grep "Handler execution"
# Monitor RabbitMQ message rates
curl -u guest:guest http://localhost:15672/api/queues | \
jq '.[] | {name, message_stats}'

Fix - Increase Timeout:

// At call site
await queryBus.execute(new GetUserQuery(userId), {
timeout: 10000 // Increase from 5s to 10s
});

Fix - Optimize Handler:

@QueryHandler(GetUserQuery)
export class GetUserHandler {
async handle(query: GetUserQuery) {
// Use index for query
// Add caching
// Avoid N+1 queries
// Optimize database query
}
}

Error:

Connection pool exhausted - no healthy connections available

Diagnostic:

Terminal window
# Check connection pool status
docker logs my-service 2>&1 | grep "Connection pool"
# View active connections
curl -u guest:guest http://localhost:15672/api/connections | jq length

Fix - Increase Pool Size:

await BaseService.start({
name: 'my-service',
version: '1.0.0',
messageBus: {
urls: [process.env.RABBITMQ_URL!],
connectionPool: {
minConnections: 2,
maxConnections: 10, // Increase from default 5
maxChannelsPerConnection: 100
}
}
});

Fix - Check for Connection Leaks:

// Ensure channels are closed after use
const channel = await messageBus.getChannel();
try {
await channel.publish(...);
} finally {
await channel.close(); // IMPORTANT: Always close
}

Symptom: Messages published but never received

Diagnostic:

Terminal window
# Check dead letter queue
curl -u guest:guest http://localhost:15672/api/queues/%2F/dead-letter | jq
# View failed messages
curl -u guest:guest http://localhost:15672/api/queues/%2F/dead-letter/get \
-X POST -d'{"count":10,"ackmode":"ack_requeue_false","encoding":"auto"}' | jq

Fix - Check Exchange and Routing:

Terminal window
# View exchanges
curl -u guest:guest http://localhost:15672/api/exchanges | \
jq '.[] | {name, type}'
# Check bindings
curl -u guest:guest http://localhost:15672/api/bindings | \
jq '.[] | {source, destination, routing_key}'

Fix - Verify Message Format:

// Ensure message matches contract
const command: CreateUserCommand = {
email: 'user@example.com',
password: 'password123'
};
// Publish with correct routing
await messageBus.publish('CreateUser', command, {
correlationId: uuid(),
timestamp: new Date()
});

Error:

ChannelManager is shutting down

Cause: Service stopping but still trying to use message bus

Fix - Check Service Lifecycle:

// Don't use message bus after shutdown initiated
let isShuttingDown = false;
process.on('SIGTERM', () => {
isShuttingDown = true;
});
async function publishMessage(message: any) {
if (isShuttingDown) {
throw new Error('Service is shutting down');
}
await messageBus.publish(message);
}
Terminal window
# List all queues
curl -u guest:guest http://localhost:15672/api/queues | jq
# Get queue details
curl -u guest:guest http://localhost:15672/api/queues/%2F/my-service.command.CreateUser | jq
# List exchanges
curl -u guest:guest http://localhost:15672/api/exchanges | jq
# View bindings
curl -u guest:guest http://localhost:15672/api/bindings | jq
# Get messages from queue (without consuming)
curl -u guest:guest http://localhost:15672/api/queues/%2F/my-queue/get \
-X POST \
-d'{"count":5,"ackmode":"ack_requeue_true","encoding":"auto"}' | jq
# Purge queue
curl -u guest:guest -X DELETE \
http://localhost:15672/api/queues/%2F/my-queue/contents
# Create binding
curl -u guest:guest -X POST \
http://localhost:15672/api/bindings/%2F/e/my-exchange/q/my-queue \
-d'{"routing_key":"my-routing-key"}'
  • RabbitMQ container running and healthy
  • Service can connect to RabbitMQ (port 5672)
  • RABBITMQ_URL uses service name, not localhost
  • Handlers in correct directories with decorators
  • Queues created and bound to exchanges
  • Message format matches contract
  • No circuit breaker open errors
  • Connection pool not exhausted
  • Channels properly closed after use
  1. Use Service Name in URLs:
docker-compose.yml
RABBITMQ_URL=amqp://admin:admin123@rabbitmq:5672 # Use 'rabbitmq', not 'localhost'
  1. Add Health Checks:
rabbitmq:
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 10s
timeout: 5s
retries: 5
  1. Monitor Queue Lengths:
setInterval(async () => {
const stats = await rabbitmq.getQueueStats();
if (stats.messages > 1000) {
console.warn('Queue backlog detected:', stats);
}
}, 60000);
  1. Handle Connection Loss:
messageBus.on('connection_lost', () => {
console.warn('Message bus connection lost, will retry');
});
messageBus.on('connection_restored', () => {
console.log('Message bus connection restored');
});
  1. Use Dead Letter Exchange:
await channel.assertQueue('my-queue', {
deadLetterExchange: 'dead-letter',
deadLetterRoutingKey: 'failed'
});