Event Store Infrastructure
Event Store Infrastructure
Section titled “Event Store Infrastructure”Core Idea: PostgreSQL provides durable, ACID-compliant event storage with shared tables across all services and optimistic concurrency control.
Overview
Section titled “Overview”The event store uses PostgreSQL 16 with three shared tables (events, snapshots, projection_positions) accessed by all services. Services are differentiated by aggregate_type column, enabling centralized event storage without database per service.
Database Schema
Section titled “Database Schema”Events Table
Section titled “Events Table”CREATE TABLE events ( event_id UUID PRIMARY KEY, aggregate_type VARCHAR(255) NOT NULL, -- Service differentiation aggregate_id VARCHAR(255) NOT NULL, version INTEGER NOT NULL, event_type VARCHAR(255) NOT NULL, event_data JSONB NOT NULL, metadata JSONB, occurred_at TIMESTAMP WITH TIME ZONE NOT NULL, correlation_id UUID, causation_id UUID, UNIQUE (aggregate_type, aggregate_id, version));
CREATE INDEX idx_events_aggregate ON events (aggregate_type, aggregate_id, version);CREATE INDEX idx_events_type ON events (event_type);CREATE INDEX idx_events_occurred ON events (occurred_at);Snapshots Table
Section titled “Snapshots Table”CREATE TABLE snapshots ( aggregate_type VARCHAR(255) NOT NULL, aggregate_id VARCHAR(255) NOT NULL, version INTEGER NOT NULL, state JSONB NOT NULL, created_at TIMESTAMP WITH TIME ZONE NOT NULL, PRIMARY KEY (aggregate_type, aggregate_id));Projection Positions Table
Section titled “Projection Positions Table”CREATE TABLE projection_positions ( projection_name VARCHAR(255) PRIMARY KEY, last_processed_position BIGINT NOT NULL, last_processed_at TIMESTAMP WITH TIME ZONE NOT NULL);Configuration
Section titled “Configuration”Development
Section titled “Development”postgres: image: postgres:16-alpine environment: POSTGRES_DB: eventstore POSTGRES_USER: actor_user POSTGRES_PASSWORD: actor_pass123 ports: - "55432:5432" volumes: - postgres-data:/var/lib/postgresql/dataConnection
Section titled “Connection”# Environment variablesDATABASE_HOST=postgresDATABASE_NAME=eventstoreDATABASE_USER=actor_userDATABASE_PASSWORD=actor_pass123DATABASE_PORT=5432Performance Optimization
Section titled “Performance Optimization”Indexes
Section titled “Indexes”aggregate_type + aggregate_id + version: Fast aggregate loadingevent_type: Fast event type queriesoccurred_at: Temporal queries
Snapshots
Section titled “Snapshots”- Created every 50-100 events (configurable)
- Reduces load time for large aggregates
- Automatic snapshot management
Connection Pooling
Section titled “Connection Pooling”- Max 10 connections per service
- Idle timeout: 30 seconds
- Connection timeout: 5 seconds
Monitoring
Section titled “Monitoring”Key Metrics
Section titled “Key Metrics”- Event Insert Rate: Events/second written
- Aggregate Load Time: Time to load aggregate
- Snapshot Hit Rate: % loads using snapshot
- Table Size: Disk usage trends
Queries
Section titled “Queries”-- Event count by aggregate typeSELECT aggregate_type, COUNT(*)FROM eventsGROUP BY aggregate_type;
-- Largest aggregatesSELECT aggregate_type, aggregate_id, COUNT(*) as event_countFROM eventsGROUP BY aggregate_type, aggregate_idORDER BY event_count DESCLIMIT 10;
-- Snapshot coverageSELECT COUNT(DISTINCT aggregate_id) as total_aggregates, COUNT(DISTINCT s.aggregate_id) as snapshotted_aggregatesFROM events eLEFT JOIN snapshots s USING (aggregate_type, aggregate_id);Best Practices
Section titled “Best Practices”-
Use Snapshots for Large Aggregates
- Configure snapshot frequency based on event volume
- Monitor aggregate load times
-
Monitor Table Growth
- Events table grows indefinitely
- Plan for archival strategy (e.g., archive events >1 year old)
-
Partition Events Table
- Consider partitioning by
occurred_atfor very high volume
- Consider partitioning by
-
Regular Vacuum
- PostgreSQL vacuum essential for performance
- Configure autovacuum appropriately
Troubleshooting
Section titled “Troubleshooting”Issue: Slow Aggregate Loading
Section titled “Issue: Slow Aggregate Loading”-- Check event count for aggregateSELECT COUNT(*) FROM eventsWHERE aggregate_type = 'Order' AND aggregate_id = 'ord-123';
-- Check if snapshot existsSELECT * FROM snapshotsWHERE aggregate_type = 'Order' AND aggregate_id = 'ord-123';Solution: Create snapshot if >100 events and no snapshot
Issue: Concurrency Errors
Section titled “Issue: Concurrency Errors”Error: “Optimistic concurrency check failed”
Cause: Two processes tried to append events with same version
Solution: Retry with exponential backoff