Module 19: Event Sourcing

What is Event Sourcing?

Event sourcing stores the state of an application as a sequence of events rather than just the current state.
Traditional (state-based) persistence stores only the current state:

    Account: { id: 123, balance: 500 }

History is lost — you know the balance is 500, but not how it got there.

With event sourcing, the same account is stored as the sequence of events that produced it:

    1. AccountCreated  { id: 123 }
    2. MoneyDeposited  { id: 123, amount: 1000 }
    3. MoneyWithdrawn  { id: 123, amount: 300 }
    4. MoneyWithdrawn  { id: 123, amount: 200 }

Current state is computed by replaying all events: balance = 0 + 1000 - 300 - 200 = 500. The full history is preserved.

Benefits:
  • Complete audit trail
  • Temporal queries (state at any point in time)
  • Event replay for debugging
  • Multiple read models built from the same events
  • Natural fit for event-driven architecture

Challenges:
  • Event schema evolution
  • Rebuilding state can be slow
  • Eventually consistent reads
  • Learning curve
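The replay idea can be sketched in a few lines of Go. This is a minimal illustration, not the module's event store: the `Event` shape and `replay` function here are simplified stand-ins for the full types defined later.

```go
package main

import "fmt"

// Event is a minimal illustrative event; real events also carry
// IDs, stream IDs, versions, and timestamps.
type Event struct {
	Type   string
	Amount int64
}

// replay folds a sequence of events into the current balance.
func replay(events []Event) int64 {
	var balance int64
	for _, e := range events {
		switch e.Type {
		case "MoneyDeposited":
			balance += e.Amount
		case "MoneyWithdrawn":
			balance -= e.Amount
		}
	}
	return balance
}

func main() {
	events := []Event{
		{Type: "AccountCreated"},
		{Type: "MoneyDeposited", Amount: 1000},
		{Type: "MoneyWithdrawn", Amount: 300},
		{Type: "MoneyWithdrawn", Amount: 200},
	}
	fmt.Println(replay(events)) // 500
}
```

State here is nothing more than a left fold over the event log — which is why the log, not the folded value, is the source of truth.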

Event Store Architecture

An event store organizes events into streams, one per aggregate:

Stream: account-123
    Seq  Event Type       Data
    1    AccountCreated   {...}
    2    MoneyDeposited   {amt: 1000}
    3    MoneyWithdrawn   {amt: 300}
    4    MoneyWithdrawn   {amt: 200}

Stream: account-456
    Seq  Event Type       Data
    1    AccountCreated   {...}
    2    MoneyDeposited   {amt: 500}

Key concepts:
  • Stream: the sequence of events for one aggregate
  • Event: an immutable fact that happened
  • Sequence number: ordering within a stream
  • Position: global ordering across all streams
  • Aggregate: a domain object rebuilt from events

Event Store Implementation

```go
package eventsourcing

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"
)

// Event represents a domain event.
type Event struct {
	ID        string            `json:"id"`
	StreamID  string            `json:"stream_id"`
	Type      string            `json:"type"`
	Data      json.RawMessage   `json:"data"`
	Metadata  map[string]string `json:"metadata"`
	Version   int64             `json:"version"`   // ordering within a stream
	Timestamp time.Time         `json:"timestamp"`
	Position  int64             `json:"position"`  // global ordering across streams
}

// EventStore is the persistence abstraction for events.
type EventStore interface {
	// Append adds events to a stream with optimistic concurrency.
	Append(ctx context.Context, streamID string, events []Event, expectedVersion int64) error

	// ReadStream reads a stream's events after fromVersion.
	ReadStream(ctx context.Context, streamID string, fromVersion int64) ([]Event, error)

	// ReadAll reads events across all streams after fromPosition.
	ReadAll(ctx context.Context, fromPosition int64, limit int) ([]Event, error)

	// Subscribe delivers existing and future events from fromPosition.
	Subscribe(ctx context.Context, fromPosition int64) (<-chan Event, error)
}

// InMemoryEventStore is a simple in-memory implementation.
type InMemoryEventStore struct {
	mu        sync.RWMutex
	streams   map[string][]Event
	allEvents []Event
	position  int64

	subMu       sync.RWMutex
	subscribers []chan Event
}

func NewInMemoryEventStore() *InMemoryEventStore {
	return &InMemoryEventStore{
		streams: make(map[string][]Event),
	}
}

// Append adds events to a stream with an optimistic concurrency check.
// The slice is mutated in place so callers see the assigned IDs,
// versions, and positions.
func (s *InMemoryEventStore) Append(
	ctx context.Context,
	streamID string,
	events []Event,
	expectedVersion int64,
) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	stream := s.streams[streamID]
	currentVersion := int64(len(stream))

	// Optimistic concurrency check; expectedVersion == -1 means "any".
	if expectedVersion != -1 && currentVersion != expectedVersion {
		return fmt.Errorf("concurrency conflict: expected version %d, got %d",
			expectedVersion, currentVersion)
	}

	for i := range events {
		s.position++
		events[i].StreamID = streamID
		events[i].Version = currentVersion + int64(i) + 1
		events[i].Position = s.position
		events[i].Timestamp = time.Now()
		if events[i].ID == "" {
			events[i].ID = fmt.Sprintf("%s-%d", streamID, events[i].Version)
		}
		stream = append(stream, events[i])
		s.allEvents = append(s.allEvents, events[i])

		s.notifySubscribers(events[i])
	}
	s.streams[streamID] = stream
	return nil
}

// ReadStream reads events from a specific stream after fromVersion.
func (s *InMemoryEventStore) ReadStream(
	ctx context.Context,
	streamID string,
	fromVersion int64,
) ([]Event, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	stream := s.streams[streamID]
	if fromVersion > int64(len(stream)) {
		return nil, nil
	}

	result := make([]Event, 0)
	for _, event := range stream {
		if event.Version > fromVersion {
			result = append(result, event)
		}
	}
	return result, nil
}

// ReadAll reads events across all streams after fromPosition.
func (s *InMemoryEventStore) ReadAll(
	ctx context.Context,
	fromPosition int64,
	limit int,
) ([]Event, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	result := make([]Event, 0, limit)
	for _, event := range s.allEvents {
		if event.Position > fromPosition {
			result = append(result, event)
			if len(result) >= limit {
				break
			}
		}
	}
	return result, nil
}

// Subscribe creates a subscription to new events. This simplified version
// can deliver duplicates or miss events that arrive between the historical
// read and the registration below; a production store must coordinate the
// catch-up and live phases.
func (s *InMemoryEventStore) Subscribe(
	ctx context.Context,
	fromPosition int64,
) (<-chan Event, error) {
	ch := make(chan Event, 100)

	// Send historical events first.
	events, _ := s.ReadAll(ctx, fromPosition, 10000)
	go func() {
		for _, event := range events {
			select {
			case ch <- event:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Register for future events.
	s.subMu.Lock()
	s.subscribers = append(s.subscribers, ch)
	s.subMu.Unlock()

	// Clean up when the context is cancelled.
	go func() {
		<-ctx.Done()
		s.subMu.Lock()
		for i, sub := range s.subscribers {
			if sub == ch {
				s.subscribers = append(s.subscribers[:i], s.subscribers[i+1:]...)
				break
			}
		}
		s.subMu.Unlock()
		close(ch)
	}()

	return ch, nil
}

func (s *InMemoryEventStore) notifySubscribers(event Event) {
	s.subMu.RLock()
	defer s.subMu.RUnlock()
	for _, ch := range s.subscribers {
		select {
		case ch <- event:
		default: // channel full: drop for this subscriber
		}
	}
}
```

Aggregate Pattern

```go
package eventsourcing

import (
	"encoding/json"
	"fmt"
)

// Aggregate represents a domain aggregate.
type Aggregate interface {
	ID() string
	Version() int64
	ApplyEvent(event Event)
	UncommittedEvents() []Event
	ClearUncommittedEvents()
}

// BaseAggregate provides common aggregate functionality.
type BaseAggregate struct {
	id                string
	version           int64
	uncommittedEvents []Event
}

func (a *BaseAggregate) ID() string                 { return a.id }
func (a *BaseAggregate) Version() int64             { return a.version }
func (a *BaseAggregate) UncommittedEvents() []Event { return a.uncommittedEvents }
func (a *BaseAggregate) ClearUncommittedEvents()    { a.uncommittedEvents = nil }
func (a *BaseAggregate) SetVersion(v int64)         { a.version = v }

// RaiseEvent records a new event and immediately applies it, so that later
// command handlers validate against up-to-date state. The aggregate is
// passed in explicitly because Go embedding cannot dispatch from the base
// type to the outer type's ApplyEvent.
func (a *BaseAggregate) RaiseEvent(agg Aggregate, eventType string, data interface{}) {
	payload, err := json.Marshal(data)
	if err != nil {
		panic(fmt.Sprintf("marshal event %s: %v", eventType, err))
	}
	event := Event{
		Type:    eventType,
		Data:    payload,
		Version: a.version + 1, // provisional; confirmed by the store on append
	}
	a.uncommittedEvents = append(a.uncommittedEvents, event)
	agg.ApplyEvent(event)
}

// BankAccount is an example aggregate.
type BankAccount struct {
	BaseAggregate
	balance   int64
	isBlocked bool
}

// Event payloads.
type AccountCreatedData struct {
	OwnerID string `json:"owner_id"`
	Name    string `json:"name"`
}

type MoneyDepositedData struct {
	Amount      int64  `json:"amount"`
	Description string `json:"description"`
}

type MoneyWithdrawnData struct {
	Amount      int64  `json:"amount"`
	Description string `json:"description"`
}

type AccountBlockedData struct {
	Reason string `json:"reason"`
}

// NewBankAccount creates a new bank account.
func NewBankAccount(id, ownerID, name string) *BankAccount {
	account := &BankAccount{
		BaseAggregate: BaseAggregate{id: id},
	}
	account.RaiseEvent(account, "AccountCreated", AccountCreatedData{
		OwnerID: ownerID,
		Name:    name,
	})
	return account
}

// Deposit adds money to the account.
func (a *BankAccount) Deposit(amount int64, description string) error {
	if amount <= 0 {
		return fmt.Errorf("deposit amount must be positive")
	}
	if a.isBlocked {
		return fmt.Errorf("account is blocked")
	}
	a.RaiseEvent(a, "MoneyDeposited", MoneyDepositedData{
		Amount:      amount,
		Description: description,
	})
	return nil
}

// Withdraw removes money from the account.
func (a *BankAccount) Withdraw(amount int64, description string) error {
	if amount <= 0 {
		return fmt.Errorf("withdrawal amount must be positive")
	}
	if a.isBlocked {
		return fmt.Errorf("account is blocked")
	}
	if a.balance < amount {
		return fmt.Errorf("insufficient funds: balance %d, requested %d",
			a.balance, amount)
	}
	a.RaiseEvent(a, "MoneyWithdrawn", MoneyWithdrawnData{
		Amount:      amount,
		Description: description,
	})
	return nil
}

// Block blocks the account.
func (a *BankAccount) Block(reason string) error {
	if a.isBlocked {
		return fmt.Errorf("account already blocked")
	}
	a.RaiseEvent(a, "AccountBlocked", AccountBlockedData{Reason: reason})
	return nil
}

// ApplyEvent updates state from an event. It runs both when replaying
// history and when raising new events; events at or below the current
// version are skipped, which makes applying idempotent.
func (a *BankAccount) ApplyEvent(event Event) {
	if event.Version != 0 && event.Version <= a.version {
		return
	}
	switch event.Type {
	case "AccountCreated":
		// Nothing to do; the account now exists.
	case "MoneyDeposited":
		var data MoneyDepositedData
		if err := json.Unmarshal(event.Data, &data); err == nil {
			a.balance += data.Amount
		}
	case "MoneyWithdrawn":
		var data MoneyWithdrawnData
		if err := json.Unmarshal(event.Data, &data); err == nil {
			a.balance -= data.Amount
		}
	case "AccountBlocked":
		a.isBlocked = true
	}
	a.version = event.Version
}

// GetBalance returns the current balance.
func (a *BankAccount) GetBalance() int64 { return a.balance }

// IsBlocked reports whether the account is blocked.
func (a *BankAccount) IsBlocked() bool { return a.isBlocked }
```

Note the fix to the original sketch: if raised events are only recorded and never applied, a `Deposit` followed by a `Withdraw` before saving would validate the withdrawal against a stale zero balance. Applying on raise keeps command validation consistent with the uncommitted history.

Repository Pattern

```go
package eventsourcing

import (
	"context"
	"fmt"
)

// Repository handles aggregate persistence.
type Repository struct {
	store   EventStore
	factory func(id string) Aggregate
}

func NewRepository(store EventStore, factory func(id string) Aggregate) *Repository {
	return &Repository{store: store, factory: factory}
}

// Load retrieves an aggregate by ID, rebuilding it from its events.
func (r *Repository) Load(ctx context.Context, aggregateID string) (Aggregate, error) {
	events, err := r.store.ReadStream(ctx, aggregateID, 0)
	if err != nil {
		return nil, err
	}
	if len(events) == 0 {
		return nil, fmt.Errorf("aggregate not found: %s", aggregateID)
	}

	// Create a fresh instance and replay history to rebuild state.
	aggregate := r.factory(aggregateID)
	for _, event := range events {
		aggregate.ApplyEvent(event)
	}
	return aggregate, nil
}

// Save persists an aggregate's uncommitted events.
func (r *Repository) Save(ctx context.Context, aggregate Aggregate) error {
	events := aggregate.UncommittedEvents()
	if len(events) == 0 {
		return nil
	}

	// Uncommitted events were applied when they were raised, so the
	// aggregate's version is len(events) ahead of the stream; subtracting
	// gives the version the stream must still be at.
	expectedVersion := aggregate.Version() - int64(len(events))
	if err := r.store.Append(ctx, aggregate.ID(), events, expectedVersion); err != nil {
		return err
	}
	aggregate.ClearUncommittedEvents()
	return nil
}

// Example usage.
func ExampleRepository() {
	store := NewInMemoryEventStore()

	// Create a repository for BankAccount aggregates.
	repo := NewRepository(store, func(id string) Aggregate {
		return &BankAccount{BaseAggregate: BaseAggregate{id: id}}
	})

	ctx := context.Background()

	// Create a new account and record some transactions.
	account := NewBankAccount("account-123", "user-456", "Savings")
	account.Deposit(1000, "Initial deposit")
	account.Withdraw(200, "ATM withdrawal")

	if err := repo.Save(ctx, account); err != nil {
		panic(err)
	}

	// Load the account later; state is rebuilt by replaying events.
	loaded, _ := repo.Load(ctx, "account-123")
	bankAccount := loaded.(*BankAccount)
	fmt.Printf("Balance: %d\n", bankAccount.GetBalance()) // Balance: 800
}
```

Projections

Projections build read models from events:

    Event Store ──► Projection Handler ──► Read Model (database)

Example — an account-balance projection reacting to an event:

    Event: MoneyDeposited { account: 123, amount: 100 }
        ↓
    UPDATE account_balances
    SET balance = balance + 100
    WHERE account_id = 123

Benefits:
  • Optimized for specific queries
  • Multiple views from the same events
  • Can be rebuilt at any time
  • Separates read and write concerns

Projection Implementation

```go
package eventsourcing

import (
	"context"
	"encoding/json"
	"sync"
)

// Projection processes events and updates a read model.
type Projection interface {
	Handle(ctx context.Context, event Event) error
	Name() string
}

// ProjectionManager manages multiple projections.
type ProjectionManager struct {
	store       EventStore
	projections []Projection
	positions   map[string]int64
	mu          sync.Mutex
}

func NewProjectionManager(store EventStore) *ProjectionManager {
	return &ProjectionManager{
		store:     store,
		positions: make(map[string]int64),
	}
}

// Register adds a projection.
func (m *ProjectionManager) Register(projection Projection) {
	m.projections = append(m.projections, projection)
}

// Start begins processing events for all projections.
func (m *ProjectionManager) Start(ctx context.Context) error {
	events, err := m.store.Subscribe(ctx, 0)
	if err != nil {
		return err
	}
	go func() {
		for event := range events {
			m.processEvent(ctx, event)
		}
	}()
	return nil
}

func (m *ProjectionManager) processEvent(ctx context.Context, event Event) {
	for _, projection := range m.projections {
		m.mu.Lock()
		lastPosition := m.positions[projection.Name()]
		m.mu.Unlock()

		if event.Position <= lastPosition {
			continue // already processed
		}
		if err := projection.Handle(ctx, event); err != nil {
			continue // in production: log and retry with backoff
		}

		m.mu.Lock()
		m.positions[projection.Name()] = event.Position
		m.mu.Unlock()
	}
}

// Rebuild replays all events through a projection in batches.
func (m *ProjectionManager) Rebuild(ctx context.Context, projection Projection) error {
	position := int64(0)
	batchSize := 1000
	for {
		events, err := m.store.ReadAll(ctx, position, batchSize)
		if err != nil {
			return err
		}
		if len(events) == 0 {
			break
		}
		for _, event := range events {
			if err := projection.Handle(ctx, event); err != nil {
				return err
			}
			position = event.Position
		}
		if len(events) < batchSize {
			break
		}
	}
	m.mu.Lock()
	m.positions[projection.Name()] = position
	m.mu.Unlock()
	return nil
}

// AccountBalanceProjection maintains current balances per account.
type AccountBalanceProjection struct {
	balances map[string]int64
	mu       sync.RWMutex
}

func NewAccountBalanceProjection() *AccountBalanceProjection {
	return &AccountBalanceProjection{balances: make(map[string]int64)}
}

func (p *AccountBalanceProjection) Name() string { return "account_balances" }

func (p *AccountBalanceProjection) Handle(ctx context.Context, event Event) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	switch event.Type {
	case "AccountCreated":
		p.balances[event.StreamID] = 0
	case "MoneyDeposited":
		var data MoneyDepositedData
		if err := json.Unmarshal(event.Data, &data); err != nil {
			return err
		}
		p.balances[event.StreamID] += data.Amount
	case "MoneyWithdrawn":
		var data MoneyWithdrawnData
		if err := json.Unmarshal(event.Data, &data); err != nil {
			return err
		}
		p.balances[event.StreamID] -= data.Amount
	}
	return nil
}

func (p *AccountBalanceProjection) GetBalance(accountID string) int64 {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.balances[accountID]
}

// TransactionHistoryProjection maintains a per-account transaction log.
type TransactionHistoryProjection struct {
	transactions map[string][]Transaction
	mu           sync.RWMutex
}

type Transaction struct {
	Type        string `json:"type"`
	Amount      int64  `json:"amount"`
	Description string `json:"description"`
	Timestamp   string `json:"timestamp"`
}

func NewTransactionHistoryProjection() *TransactionHistoryProjection {
	return &TransactionHistoryProjection{
		transactions: make(map[string][]Transaction),
	}
}

func (p *TransactionHistoryProjection) Name() string { return "transaction_history" }

func (p *TransactionHistoryProjection) Handle(ctx context.Context, event Event) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	var tx Transaction
	switch event.Type {
	case "MoneyDeposited":
		var data MoneyDepositedData
		if err := json.Unmarshal(event.Data, &data); err != nil {
			return err
		}
		tx = Transaction{
			Type:        "deposit",
			Amount:      data.Amount,
			Description: data.Description,
			Timestamp:   event.Timestamp.String(),
		}
	case "MoneyWithdrawn":
		var data MoneyWithdrawnData
		if err := json.Unmarshal(event.Data, &data); err != nil {
			return err
		}
		tx = Transaction{
			Type:        "withdrawal",
			Amount:      -data.Amount,
			Description: data.Description,
			Timestamp:   event.Timestamp.String(),
		}
	default:
		return nil // ignore other event types
	}

	p.transactions[event.StreamID] = append(p.transactions[event.StreamID], tx)
	return nil
}

func (p *TransactionHistoryProjection) GetHistory(accountID string) []Transaction {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.transactions[accountID]
}
```

Snapshots

Problem: replaying thousands of events on every load is slow.
Solution: periodically save the aggregate's state as a snapshot.

Without snapshots:
    Load = Event 1 → Event 2 → ... → Event 10000
    (slow — replay every event)

With snapshots:
    Load = Snapshot@9900 → Event 9901 → ... → Event 10000
    (fast — only replay recent events)

Snapshot strategies:
  • Every N events (e.g., every 100 events)
  • Every N minutes
  • When the aggregate is saved

Snapshot Implementation

```go
package eventsourcing

import (
	"context"
	"encoding/json"
	"sync"
)

// Snapshot represents a point-in-time aggregate state.
type Snapshot struct {
	AggregateID string          `json:"aggregate_id"`
	Version     int64           `json:"version"`
	State       json.RawMessage `json:"state"`
}

// SnapshotStore stores snapshots.
type SnapshotStore interface {
	Save(ctx context.Context, snapshot Snapshot) error
	Load(ctx context.Context, aggregateID string) (*Snapshot, error)
}

// InMemorySnapshotStore is a simple in-memory implementation.
type InMemorySnapshotStore struct {
	mu        sync.RWMutex
	snapshots map[string]Snapshot
}

func NewInMemorySnapshotStore() *InMemorySnapshotStore {
	return &InMemorySnapshotStore{snapshots: make(map[string]Snapshot)}
}

func (s *InMemorySnapshotStore) Save(ctx context.Context, snapshot Snapshot) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.snapshots[snapshot.AggregateID] = snapshot
	return nil
}

func (s *InMemorySnapshotStore) Load(ctx context.Context, aggregateID string) (*Snapshot, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	snapshot, ok := s.snapshots[aggregateID]
	if !ok {
		return nil, nil
	}
	return &snapshot, nil
}

// Snapshotable is implemented by aggregates that support snapshots.
// SetVersion lets a restore resume from the snapshot's version instead
// of silently starting back at zero.
type Snapshotable interface {
	Aggregate
	SetVersion(v int64)
	ToSnapshot() (json.RawMessage, error)
	FromSnapshot(data json.RawMessage) error
}

// SnapshotRepository extends Repository with snapshot support.
type SnapshotRepository struct {
	eventStore    EventStore
	snapshotStore SnapshotStore
	factory       func(id string) Snapshotable
	snapshotEvery int64
}

func NewSnapshotRepository(
	eventStore EventStore,
	snapshotStore SnapshotStore,
	factory func(id string) Snapshotable,
	snapshotEvery int64,
) *SnapshotRepository {
	return &SnapshotRepository{
		eventStore:    eventStore,
		snapshotStore: snapshotStore,
		factory:       factory,
		snapshotEvery: snapshotEvery,
	}
}

// Load retrieves an aggregate, restoring from a snapshot when one exists
// and replaying only the events recorded after it.
func (r *SnapshotRepository) Load(ctx context.Context, aggregateID string) (Snapshotable, error) {
	aggregate := r.factory(aggregateID)

	snapshot, _ := r.snapshotStore.Load(ctx, aggregateID)
	fromVersion := int64(0)
	if snapshot != nil {
		if err := aggregate.FromSnapshot(snapshot.State); err != nil {
			return nil, err
		}
		aggregate.SetVersion(snapshot.Version)
		fromVersion = snapshot.Version
	}

	// Replay only the events after the snapshot.
	events, err := r.eventStore.ReadStream(ctx, aggregateID, fromVersion)
	if err != nil {
		return nil, err
	}
	for _, event := range events {
		aggregate.ApplyEvent(event)
	}
	return aggregate, nil
}

// Save persists the aggregate's events and takes a snapshot when the
// version crosses a multiple of snapshotEvery.
func (r *SnapshotRepository) Save(ctx context.Context, aggregate Snapshotable) error {
	events := aggregate.UncommittedEvents()
	if len(events) == 0 {
		return nil
	}

	previousVersion := aggregate.Version() - int64(len(events))
	if err := r.eventStore.Append(ctx, aggregate.ID(), events, previousVersion); err != nil {
		return err
	}
	aggregate.ClearUncommittedEvents()

	// A plain modulo check would miss boundaries crossed mid-batch, so
	// test whether this save crossed a snapshotEvery boundary.
	if aggregate.Version()/r.snapshotEvery > previousVersion/r.snapshotEvery {
		state, err := aggregate.ToSnapshot()
		if err != nil {
			return err
		}
		return r.snapshotStore.Save(ctx, Snapshot{
			AggregateID: aggregate.ID(),
			Version:     aggregate.Version(),
			State:       state,
		})
	}
	return nil
}

// bankAccountState is the serialized snapshot form of BankAccount.
// Using a typed struct avoids fragile map type assertions on restore.
type bankAccountState struct {
	Balance   int64 `json:"balance"`
	IsBlocked bool  `json:"is_blocked"`
}

// ToSnapshot makes BankAccount snapshotable.
func (a *BankAccount) ToSnapshot() (json.RawMessage, error) {
	return json.Marshal(bankAccountState{
		Balance:   a.balance,
		IsBlocked: a.isBlocked,
	})
}

func (a *BankAccount) FromSnapshot(data json.RawMessage) error {
	var state bankAccountState
	if err := json.Unmarshal(data, &state); err != nil {
		return err
	}
	a.balance = state.Balance
	a.isBlocked = state.IsBlocked
	return nil
}
```

Best Practices

1. Events are immutable
  • Never modify stored events
  • To fix mistakes, add compensating events
  • Events represent facts that happened

2. Design events carefully
  • Name events in the past tense (OrderPlaced, not PlaceOrder)
  • Include all necessary data in the event
  • Keep events small and focused
  • Version your event schemas

3. Handle schema evolution
  • Use upcasters to transform old events on read
  • Add optional fields with defaults
  • Never remove required fields

4. Use snapshots for performance
  • Snapshot after every N events
  • Keep some recent events for debugging
  • Test snapshot restore

5. Projections are disposable
  • They can always be rebuilt from events
  • Keep each projection focused
  • Use different stores for different query needs

Interview Questions

  1. What is event sourcing?
    • Storing state as a sequence of events rather than current state
    • Events are immutable facts
    • State is rebuilt by replaying events
  2. When would you use event sourcing?
    • Audit requirements
    • Temporal queries
    • Complex domain logic
    • Event-driven architecture
  3. What are projections?
    • Read models built from events
    • Optimized for queries
    • Can be rebuilt anytime
  4. How do you handle performance with many events?
    • Snapshots
    • Projection caching
    • Pagination
  5. What's the difference between event sourcing and event-driven architecture?
    • Event sourcing: How state is stored
    • Event-driven: How services communicate
    • Often used together

Summary

Core concepts:
  • Events: immutable facts that happened
  • Aggregates: domain objects rebuilt from events
  • Projections: read models for queries
  • Snapshots: a performance optimization

Benefits:
  • Complete audit trail
  • Temporal queries
  • Multiple read models
  • Natural event-driven integration

Challenges:
  • Schema evolution
  • Eventually consistent reads
  • Learning curve

Key insight: in event sourcing, the event log is the source of truth — everything else is just a cache.

Tags: event-sourcing, events, architecture, distributed-systems