Skip to content

Message Bus Architecture

Core Idea: All inter-service communication flows through RabbitMQ message bus with automatic correlation ID propagation, eliminating direct HTTP dependencies and enabling complete distributed tracing.

The message bus is the nervous system of the banyan-core platform. Every service communication - commands, queries, events - flows through RabbitMQ, providing complete observability, type safety, and protocol independence.

Unlike traditional microservices that use direct HTTP calls between services, banyan-core enforces message bus-only communication. This architectural decision enables powerful abstractions while maintaining enterprise-grade reliability.

Traditional microservices architectures face significant challenges with direct service-to-service communication:

// Traditional approach - direct HTTP calls
import axios from 'axios';
class OrderService {
async createOrder(userId: string, items: OrderItem[]) {
// Problem 1: Hardcoded URLs
const userResponse = await axios.get(`http://user-service:3000/users/${userId}`);
// Problem 2: Manual correlation ID propagation
const inventoryResponse = await axios.post(
'http://inventory-service:3000/check',
{ items },
{ headers: { 'x-correlation-id': req.headers['x-correlation-id'] } }
);
// Problem 3: No compile-time safety
const paymentResponse = await axios.post(
'http://payment-service:3000/charge',
{
amount: inventoryResponse.data.total, // Typo in response field? Runtime error!
userId: userResponsse.data.id // Another typo - won't know until production
}
);
// Problem 4: No automatic retry or circuit breaking
// Problem 5: Service discovery manual
// Problem 6: No type checking on request/response
}
}

Why This Matters:

  • Runtime errors from typos and API contract changes
  • Manual correlation ID management error-prone
  • Service URLs hardcoded or require complex service discovery
  • No automatic retry or circuit breaking
  • Difficult to trace requests across services
  • Testing requires mocking HTTP endpoints

The message bus architecture eliminates these problems through a central RabbitMQ broker with automatic abstractions.

The message bus architecture is built on these principles:

  1. Single Source of Truth: All service communication through one RabbitMQ broker

  2. Automatic Correlation: Correlation IDs propagated automatically via AsyncLocalStorage

  3. Type-Safe Contracts: TypeScript contracts ensure compile-time validation

  4. Pattern-Based Routing: Commands, queries, and events use different exchange patterns

  5. Zero Configuration: Services register handlers, platform manages routing

  6. Protocol Independence: Business logic never sees HTTP, GraphQL, or WebSocket protocols

┌──────────────────────────────────────────────────────────────────────┐
│ Client Request Flow │
└──────────────────────────────────────────────────────────────────────┘
│ 1. HTTP/REST/GraphQL Request
┌───────────────┐
│ API Gateway │
│ │
│ • Generate │
│ Correlation │
│ ID │
│ • Set Async │
│ Context │
└───────┬───────┘
│ 2. Publish to Message Bus
│ (correlation ID in headers)
┌──────────────────────────────────────────────────────────────────────┐
│ RabbitMQ Broker │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────────┐ │
│ │ Command Exchange│ │ Query Exchange │ │ Event Exchange │ │
│ │ (Direct) │ │ (Direct) │ │ (Topic) │ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬─────────┘ │
│ │ │ │ │
│ │ 3. Route by │ │ │
│ │ Service + │ │ │
│ │ Contract │ │ │
└───────────┼────────────────────┼─────────────────────┼──────────────┘
│ │ │
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Service A │ │ Service B │ │ Service C │
│ │ │ │ │ │
│ • Handler │ │ • Handler │ │ • Handler │
│ receives │ │ receives │ │ receives │
│ with │ │ with │ │ with │
│ correlation│ │ correlation│ │ correlation│
│ context │ │ context │ │ context │
└──────┬───────┘ └──────┬───────┘ └──────────────┘
│ │
│ 4. Handler makes │
│ additional │
│ calls - same │
│ correlation │
│ ID automatic │
│ │
└───────────────────┘
  • MessageBusClient: Main abstraction for sending/receiving messages

    • Location: platform/packages/message-bus-client/
    • Provides send(), publish(), subscribe(), registerHandler()
    • Handles connection pooling, channel management, retry logic
  • MessageBusFactory: Factory for creating configured clients

    • createForDevelopment(): Local development configuration
    • createForProduction(): Production with HA setup
    • createForTesting(): In-memory testing mode
  • CorrelationContext: Automatic correlation ID management

    • Uses Node.js AsyncLocalStorage
    • Propagates correlation IDs transparently
    • Zero developer code required
  • Contract System: Type-safe message definitions

    • Location: platform/packages/contract-system/
    • Defines input/output types
    • Compile-time validation
    • Runtime serialization/deserialization

The platform uses three RabbitMQ exchange patterns:

1. Command Exchange (Direct)

  • One service handles each command
  • Request-response pattern
  • Routing key: {serviceName}.{contractName}
  • Example: user-service.CreateUser

2. Query Exchange (Direct)

  • One service handles each query
  • Request-response with caching
  • Routing key: {serviceName}.{contractName}
  • Example: order-service.GetOrder

3. Event Exchange (Topic)

  • Multiple services can subscribe
  • Pub-sub pattern
  • Routing key: {eventType}.{aggregateType}
  • Example: UserRegistered.User
// Service A - Sending a command
import { MessageBusClient } from '@banyanai/platform-message-bus-client';
import { CreateOrderContract } from './contracts';
class OrderService {
constructor(private messageBus: MessageBusClient) {}
async placeOrder(userId: string, items: OrderItem[]) {
// Type-safe send with compile-time validation
// Correlation ID automatically included from current context
const result = await this.messageBus.send(
CreateOrderContract,
{ userId, items }
);
return result; // TypeScript knows exact return type
}
}
// Service B - Handling the command
import { CommandHandler } from '@banyanai/platform-cqrs';
import { CreateOrderContract } from './contracts';
@CommandHandler(CreateOrderContract)
class CreateOrderHandler {
async handle(command: CreateOrderCommand) {
// Correlation ID automatically available in context
// No manual propagation needed
const order = await this.orderRepository.create(command);
// Publish event - correlation ID auto-included
await this.eventBus.publish(
OrderCreatedEvent,
{ orderId: order.id, userId: command.userId }
);
return { orderId: order.id };
}
}

Key Points:

  • No correlation ID management code
  • Type-safe contracts prevent runtime errors
  • Automatic retry and circuit breaking
  • Complete distributed tracing
  • Protocol-independent (same code works for HTTP, GraphQL, WebSocket clients)
  • Complete Observability: Every message includes correlation ID for distributed tracing
  • Type Safety: Compile-time validation prevents API contract errors
  • Protocol Independence: Services don’t know about HTTP/REST/GraphQL
  • Automatic Reliability: Retry logic, circuit breaking, dead letter queues built-in
  • Simplified Testing: In-memory message bus for fast unit tests
  • Decoupling: Services can scale, deploy, fail independently
  • Latency Overhead: RabbitMQ adds ~5-10ms vs direct HTTP (negligible for most use cases)
  • Eventual Consistency: Events processed asynchronously
  • Message Bus Dependency: Single point of failure (mitigated with clustering)
  • Debugging Complexity: Async message flow requires different debugging approach
  • Learning Curve: Teams must understand message patterns

Use message bus architecture when:

  • Building distributed microservices
  • Need complete observability across services
  • Want type-safe service communication
  • Require decoupling for independent scaling
  • Event-driven architecture is appropriate

Avoid message bus-only when:

  • Building simple monolith
  • Real-time synchronous responses critical (<10ms latency required)
  • Team unfamiliar with message-based patterns
AspectMessage BusDirect HTTP
Type SafetyCompile-time via contractsManual or runtime only
CorrelationAutomatic propagationManual header management
Service DiscoveryAutomatic via contractsManual or service mesh
Retry/Circuit BreakingBuilt-inManual implementation
Protocol IndependenceCompleteHTTP-coupled
Latency+5-10msBaseline
DebuggingRequires tracing toolsStandard HTTP debugging

Message Bus vs Service Mesh (Istio/Linkerd)

Section titled “Message Bus vs Service Mesh (Istio/Linkerd)”
AspectMessage BusService Mesh
Communication ModelAsync message-basedSync HTTP-based
Type SafetyTypeScript contractsNo compile-time safety
Protocol SupportAll (HTTP, GraphQL, WS)HTTP/gRPC
ObservabilityBuilt-in correlationInjected sidecars
ComplexitySimple RabbitMQComplex Kubernetes setup
Operational OverheadLow (one broker)High (sidecar per pod)

Example 1: Order Processing with Multiple Services

Section titled “Example 1: Order Processing with Multiple Services”
// API Gateway receives HTTP request, generates correlation ID
// POST /orders { userId: "123", items: [...] }
// 1. Order Service creates order
@CommandHandler(CreateOrderContract)
class CreateOrderHandler {
async handle(command: CreateOrderCommand) {
// Correlation ID already in context
// Check inventory - correlation ID auto-propagated
const availability = await this.messageBus.send(
CheckInventoryContract,
{ items: command.items }
);
if (!availability.available) {
throw new InsufficientInventoryError();
}
// Create order
const order = await this.orderRepository.create(command);
// Publish event - correlation ID auto-propagated
await this.messageBus.publish(
OrderCreatedEvent,
{ orderId: order.id, userId: command.userId, items: command.items }
);
return { orderId: order.id };
}
}
// 2. Inventory Service handles check
@QueryHandler(CheckInventoryContract)
class CheckInventoryHandler {
async handle(query: CheckInventoryQuery) {
// Same correlation ID from original request
// Query logged with correlation for tracing
const availability = await this.inventoryRepository.checkAvailability(
query.items
);
return { available: availability.allAvailable };
}
}
// 3. Payment Service reacts to event
@EventHandler(OrderCreatedEvent)
class ProcessPaymentHandler {
async handle(event: OrderCreatedEvent) {
// Same correlation ID from original request
// Complete trace: HTTP → CreateOrder → CheckInventory → OrderCreated → ProcessPayment
await this.paymentProcessor.charge({
orderId: event.orderId,
amount: event.total
});
}
}

Outcome: Complete distributed trace with single correlation ID across 4 service calls, zero manual correlation code.

// Multiple services react to UserRegisteredEvent
// Email Service
@EventHandler(UserRegisteredEvent)
class SendWelcomeEmailHandler {
async handle(event: UserRegisteredEvent) {
await this.emailService.send({
to: event.email,
template: 'welcome',
data: { name: event.name }
});
}
}
// Analytics Service
@EventHandler(UserRegisteredEvent)
class TrackUserSignupHandler {
async handle(event: UserRegisteredEvent) {
await this.analytics.track('user_signup', {
userId: event.userId,
source: event.registrationSource
});
}
}
// CRM Service
@EventHandler(UserRegisteredEvent)
class CreateCRMContactHandler {
async handle(event: UserRegisteredEvent) {
await this.crmClient.createContact({
email: event.email,
name: event.name,
signupDate: event.occurredAt
});
}
}

Outcome: Three independent services react to single event, each with same correlation ID, all failures isolated.

This message bus architecture connects to:

Pattern 1: Request-Response (Commands/Queries)

Section titled “Pattern 1: Request-Response (Commands/Queries)”
// Synchronous-style API using async message bus
const result = await messageBus.send(
GetUserContract,
{ userId: '123' },
{ timeout: 5000 } // Wait up to 5s for response
);
// Publish event without waiting for processing
await messageBus.publish(
UserRegisteredEvent,
{ userId: '123', email: 'user@example.com' },
{ waitForConfirmation: true } // Wait for broker confirmation only
);
// Subscribe to events with automatic correlation context
const subscription = await messageBus.subscribe(
OrderCreatedEvent,
async (envelope) => {
// Correlation ID from original request available
console.log('Correlation ID:', envelope.correlationId);
await processOrder(envelope.payload);
}
);

Misconception 1: “Message bus makes everything slower”

Section titled “Misconception 1: “Message bus makes everything slower””

Reality: RabbitMQ adds ~5-10ms overhead, but eliminates service discovery, retry logic, and circuit breaking complexity. Total latency often lower than hand-rolled HTTP solutions.

Why This Matters: Teams prematurely optimize for direct HTTP when message bus latency is negligible compared to database queries (10-50ms) and business logic.

Misconception 2: “I can’t do synchronous request-response with a message bus”

Section titled “Misconception 2: “I can’t do synchronous request-response with a message bus””

Reality: Message bus supports request-response pattern with timeouts. Commands and queries work exactly like HTTP calls from developer perspective.

Why This Matters: Message bus architecture doesn’t force async-only patterns. Synchronous workflows supported via RPC pattern.

Misconception 3: “Correlation IDs require manual propagation code”

Section titled “Misconception 3: “Correlation IDs require manual propagation code””

Reality: Platform uses AsyncLocalStorage to propagate correlation IDs automatically. Zero developer code required.

Why This Matters: Distributed tracing works out-of-the-box without instrumentation code in business logic.

  1. Use Contracts for All Communication

    • Define TypeScript contracts for every command/query/event
    • Leverage compile-time validation
    • Example: Create CreateUserContract instead of sending raw objects
  2. Design Idempotent Handlers

    • Handlers may receive duplicate messages (at-least-once delivery)
    • Use unique IDs to detect duplicates
    • Example: Check if order already exists before creating
  3. Set Appropriate Timeouts

    • Commands: 30 seconds default
    • Queries: 5 seconds default (should be fast)
    • Long-running operations: Use sagas with timeouts up to 5 minutes
  4. Monitor Message Bus Health

    • Track pending responses
    • Monitor queue depths
    • Set up alerts for circuit breaker triggers
  5. Use Dead Letter Queues

    • Automatic for failed messages after max retries
    • Monitor DLQ for systemic issues
    • Replay messages after fixing bugs

Now that you understand message bus architecture:

AsyncLocalStorage: Node.js API for maintaining context across async operations (used for correlation ID propagation).

Circuit Breaker: Pattern that prevents cascading failures by stopping calls to failing services.

Correlation ID: Unique identifier tracking a request across multiple services.

Dead Letter Queue (DLQ): Queue for messages that failed processing after max retries.

Exchange: RabbitMQ component that routes messages to queues based on routing rules.

Message Envelope: Wrapper containing message payload plus metadata (correlation ID, timestamp, etc.).

Routing Key: String used by exchanges to route messages to appropriate queues.

RPC (Remote Procedure Call): Pattern enabling request-response over message bus.