Skip to content

Message Bus Infrastructure

Core Idea: RabbitMQ provides reliable, scalable message routing with three exchange patterns (command, query, event) and automatic reliability features.

The platform uses RabbitMQ 3.13 as the message bus infrastructure, providing persistent message routing, automatic retries, dead letter queues, and high availability through clustering.

Configuration is handled by the platform - services use MessageBusClient abstraction without managing RabbitMQ directly.

  • Version: 3.13-management-alpine
  • Port: 5672 (AMQP), 15672 (Management UI)
  • Credentials: admin/admin123 (development)
  • Virtual Host: /
  • Clustering: Supported for production HA

1. Command Exchange (Direct)

  • Purpose: Request-response commands
  • Routing: {serviceName}.{contractName}
  • Pattern: One consumer per queue
  • TTL: 30 seconds default

2. Query Exchange (Direct)

  • Purpose: Request-response queries
  • Routing: {serviceName}.{contractName}
  • Pattern: One consumer per queue
  • TTL: 5 seconds default
  • Caching: Results cached in Redis

3. Event Exchange (Topic)

  • Purpose: Pub-sub events
  • Routing: {eventType}.{aggregateType}
  • Pattern: Multiple consumers
  • TTL: No timeout (fire-and-forget)
Client → API Gateway → Command Exchange → Service Queue → Handler
↓ Query Exchange → Service Queue → Handler
↓ Event Exchange → Multiple Queues → Handlers
  • Persistent Messages: Messages survive broker restart
  • Publisher Confirms: Ensure message delivered to broker
  • Consumer Acknowledgments: Ensure message processed
  • Automatic Retries: Exponential backoff with max attempts
  • Dead Letter Queue: Failed messages after max retries
  • Circuit Breaker: Prevent cascading failures
rabbitmq:
image: rabbitmq:3.13-management-alpine
ports:
- "55671:5672" # AMQP
- "55672:15672" # Management UI
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: admin123
volumes:
- rabbitmq-data:/var/lib/rabbitmq
const messageBus = MessageBusFactory.createForProduction(
'my-service',
[
'amqp://rabbitmq-1:5672',
'amqp://rabbitmq-2:5672',
'amqp://rabbitmq-3:5672'
],
{
username: process.env.RABBITMQ_USER,
password: process.env.RABBITMQ_PASSWORD
}
);
  • Queue Depth: Messages waiting for processing
  • Consumer Count: Active consumers per queue
  • Message Rate: Messages/second in/out
  • Connection Count: Active connections
  • Channel Count: Active channels

Access at http://localhost:55672 (development)

  • View queues and exchanges
  • Monitor message rates
  • Inspect message contents
  • Purge queues for testing

Set up alerts for:

  • Queue depth >1000 messages
  • Consumer count = 0 (no service instances)
  • Message rate drops to 0 (broker issue)
  • Connection failures
  1. Use Connection Pooling

    • Platform provides automatic pooling
    • Max 10 connections per service default
  2. Monitor Queue Depths

    • Increasing depth indicates processing bottleneck
    • Scale service instances if persistent
  3. Test Failure Scenarios

    • Broker restart
    • Network partition
    • Consumer crash
  4. Use Publisher Confirms

    • Ensure message delivered before returning
    • waitForConfirmation: true option
Terminal window
# Check queue bindings
docker exec rabbitmq rabbitmqctl list_bindings
# Check queue depth
docker exec rabbitmq rabbitmqctl list_queues
# Check consumers
docker exec rabbitmq rabbitmqctl list_consumers
  • Check queue depth (backlog?)
  • Check consumer count (need more instances?)
  • Check network latency (broker far from services?)
Terminal window
# View DLQ messages
# Access Management UI → Queues → {service}.dlq → Get messages