System Design Part 6: Event-Driven Architecture - The Complete Guide

Introduction

Event-driven architecture (EDA) transforms how systems communicate - from direct, synchronous calls to loosely coupled, asynchronous events. This paradigm shift enables massive scale, better resilience, and true microservices independence. Let's explore how to design and implement EDA effectively.

1. Design Kafka-Based Event System for Order Updates

Why Kafka for Order Events?

```mermaid
graph TB
    subgraph "Synchronous (Before)"
        Order[Order Service] -->|HTTP| Inventory
        Order -->|HTTP| Payment
        Order -->|HTTP| Notification
        Order -->|HTTP| Analytics
        Note1[Problems:<br/>- Coupling<br/>- Cascading failures<br/>- Slow response]
    end
    subgraph "Event-Driven (After)"
        Order2[Order Service] -->|Publish| Kafka[Kafka Topic]
        Kafka -->|Subscribe| Inventory2[Inventory]
        Kafka -->|Subscribe| Payment2[Payment]
        Kafka -->|Subscribe| Notification2[Notification]
        Kafka -->|Subscribe| Analytics2[Analytics]
        Note2[Benefits:<br/>- Decoupled<br/>- Fault tolerant<br/>- Fast response]
    end
```

Complete Kafka Event System Implementation

```go
package events

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/google/uuid"
    "github.com/segmentio/kafka-go"
)

// Event envelope with metadata
type Event struct {
    ID            string          `json:"id"`
    Type          string          `json:"type"`
    Source        string          `json:"source"`
    Subject       string          `json:"subject"`
    Time          time.Time       `json:"time"`
    DataVersion   string          `json:"data_version"`
    Data          json.RawMessage `json:"data"`
    CorrelationID string          `json:"correlation_id"`
    CausationID   string          `json:"causation_id"`
}

// Order domain events
type OrderCreatedEvent struct {
    OrderID     string      `json:"order_id"`
    CustomerID  string      `json:"customer_id"`
    Items       []OrderItem `json:"items"`
    TotalAmount float64     `json:"total_amount"`
    Currency    string      `json:"currency"`
    CreatedAt   time.Time   `json:"created_at"`
}

type OrderItem struct {
    ProductID string  `json:"product_id"`
    Quantity  int     `json:"quantity"`
    Price     float64 `json:"price"`
}

type OrderStatusChangedEvent struct {
    OrderID   string    `json:"order_id"`
    OldStatus string    `json:"old_status"`
    NewStatus string    `json:"new_status"`
    Reason    string    `json:"reason,omitempty"`
    ChangedAt time.Time `json:"changed_at"`
    ChangedBy string    `json:"changed_by"`
}

type OrderDeliveredEvent struct {
    OrderID     string    `json:"order_id"`
    DeliveredAt time.Time `json:"delivered_at"`
    DeliveredTo string    `json:"delivered_to"`
    SignedBy    string    `json:"signed_by,omitempty"`
    PhotoURL    string    `json:"photo_url,omitempty"`
}

// Event producer with reliability guarantees
type EventProducer struct {
    writer      *kafka.Writer
    serviceName string
}

func NewEventProducer(brokers []string, serviceName string) *EventProducer {
    writer := &kafka.Writer{
        Addr:         kafka.TCP(brokers...),
        Balancer:     &kafka.LeastBytes{},
        RequiredAcks: kafka.RequireAll, // Wait for all replicas
        Async:        false,            // Synchronous for reliability
        Compression:  kafka.Snappy,
        BatchTimeout: 10 * time.Millisecond,
        MaxAttempts:  3,
    }
    return &EventProducer{writer: writer, serviceName: serviceName}
}

// extractSubject, extractPartitionKey, getCorrelationID, getCausationID,
// withCorrelationID and withCausationID are app-specific helpers (not shown).
func (p *EventProducer) Publish(ctx context.Context, topic string, eventType string, data interface{}) error {
    dataBytes, err := json.Marshal(data)
    if err != nil {
        return fmt.Errorf("marshal data: %w", err)
    }

    event := Event{
        ID:            uuid.New().String(),
        Type:          eventType,
        Source:        p.serviceName,
        Subject:       extractSubject(data),
        Time:          time.Now().UTC(),
        DataVersion:   "1.0",
        Data:          dataBytes,
        CorrelationID: getCorrelationID(ctx),
        CausationID:   getCausationID(ctx),
    }

    eventBytes, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("marshal event: %w", err)
    }

    // Use order ID as key for ordering guarantees
    key := extractPartitionKey(data)

    err = p.writer.WriteMessages(ctx, kafka.Message{
        Topic: topic,
        Key:   []byte(key),
        Value: eventBytes,
        Headers: []kafka.Header{
            {Key: "event_type", Value: []byte(eventType)},
            {Key: "correlation_id", Value: []byte(event.CorrelationID)},
        },
    })
    if err != nil {
        return fmt.Errorf("write message: %w", err)
    }
    return nil
}

// Publish with the outbox pattern: the event row commits atomically with the
// business transaction, and a separate relay publishes it to Kafka.
func (p *EventProducer) PublishWithOutbox(ctx context.Context, tx *sql.Tx, topic string, eventType string, data interface{}) error {
    dataBytes, err := json.Marshal(data)
    if err != nil {
        return err
    }
    event := Event{
        ID:            uuid.New().String(),
        Type:          eventType,
        Source:        p.serviceName,
        Time:          time.Now().UTC(),
        DataVersion:   "1.0",
        Data:          dataBytes,
        CorrelationID: getCorrelationID(ctx),
    }
    eventBytes, err := json.Marshal(event)
    if err != nil {
        return err
    }
    _, err = tx.ExecContext(ctx, `
        INSERT INTO event_outbox (id, topic, partition_key, event_type, payload, created_at)
        VALUES ($1, $2, $3, $4, $5, NOW())
    `, event.ID, topic, extractPartitionKey(data), eventType, eventBytes)
    return err
}

func (p *EventProducer) Close() error {
    return p.writer.Close()
}

// Event consumer with consumer group
type EventConsumer struct {
    reader      *kafka.Reader
    handlers    map[string]EventHandler
    serviceName string
}

type EventHandler func(ctx context.Context, event Event) error

func NewEventConsumer(brokers []string, topic, groupID, serviceName string) *EventConsumer {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:        brokers,
        Topic:          topic,
        GroupID:        groupID,
        MinBytes:       1,
        MaxBytes:       10e6,
        MaxWait:        500 * time.Millisecond,
        StartOffset:    kafka.FirstOffset,
        CommitInterval: time.Second,
    })
    return &EventConsumer{
        reader:      reader,
        handlers:    make(map[string]EventHandler),
        serviceName: serviceName,
    }
}

func (c *EventConsumer) RegisterHandler(eventType string, handler EventHandler) {
    c.handlers[eventType] = handler
}

func (c *EventConsumer) Start(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return c.reader.Close()
        default:
            msg, err := c.reader.FetchMessage(ctx)
            if err != nil {
                if ctx.Err() != nil {
                    return nil
                }
                log.Printf("Fetch error: %v", err)
                continue
            }
            if err := c.processMessage(ctx, msg); err != nil {
                log.Printf("Process error for message %s: %v", string(msg.Key), err)
                continue // Don't commit - will be retried
            }
            if err := c.reader.CommitMessages(ctx, msg); err != nil {
                log.Printf("Commit error: %v", err)
            }
        }
    }
}

func (c *EventConsumer) processMessage(ctx context.Context, msg kafka.Message) error {
    var event Event
    if err := json.Unmarshal(msg.Value, &event); err != nil {
        return fmt.Errorf("unmarshal event: %w", err)
    }
    handler, ok := c.handlers[event.Type]
    if !ok {
        // Unknown event type - log and skip
        log.Printf("No handler for event type: %s", event.Type)
        return nil
    }
    // Add correlation context
    ctx = withCorrelationID(ctx, event.CorrelationID)
    ctx = withCausationID(ctx, event.ID)
    return handler(ctx, event)
}

// Example: Inventory service consumer
type InventoryService struct {
    consumer *EventConsumer
    repo     InventoryRepository
}

func (s *InventoryService) Start(ctx context.Context) error {
    // Register handlers
    s.consumer.RegisterHandler("order.created", s.handleOrderCreated)
    s.consumer.RegisterHandler("order.cancelled", s.handleOrderCancelled)
    return s.consumer.Start(ctx)
}

func (s *InventoryService) handleOrderCreated(ctx context.Context, event Event) error {
    var data OrderCreatedEvent
    if err := json.Unmarshal(event.Data, &data); err != nil {
        return err
    }
    // Reserve inventory for each item
    for _, item := range data.Items {
        err := s.repo.ReserveStock(ctx, item.ProductID, item.Quantity, data.OrderID)
        if err != nil {
            // Publish compensation event
            return s.publishReservationFailed(ctx, data.OrderID, err)
        }
    }
    // Publish success event
    return s.publishInventoryReserved(ctx, data.OrderID)
}

func (s *InventoryService) handleOrderCancelled(ctx context.Context, event Event) error {
    var data struct {
        OrderID string `json:"order_id"`
    }
    if err := json.Unmarshal(event.Data, &data); err != nil {
        return err
    }
    // Release reserved inventory
    return s.repo.ReleaseReservation(ctx, data.OrderID)
}
```

Kafka Topic Design

```go
// Topic configuration for order events
type TopicConfig struct {
    Name              string
    Partitions        int
    ReplicationFactor int
    RetentionDays     int
    CleanupPolicy     string
}

func OrderTopicsConfig() []TopicConfig {
    return []TopicConfig{
        {
            Name:              "orders.events",
            Partitions:        12, // Match consumer instances
            ReplicationFactor: 3,  // High availability
            RetentionDays:     30, // Keep for audit
            CleanupPolicy:     "delete",
        },
        {
            Name:              "orders.changelog",
            Partitions:        12,
            ReplicationFactor: 3,
            RetentionDays:     -1,        // Keep forever
            CleanupPolicy:     "compact", // Only keep latest per key
        },
        {
            Name:              "orders.dlq", // Dead letter queue
            Partitions:        3,
            ReplicationFactor: 3,
            RetentionDays:     90,
            CleanupPolicy:     "delete",
        },
    }
}

// Admin client for topic management
type KafkaAdmin struct {
    conn *kafka.Conn
}

func (a *KafkaAdmin) CreateTopics(configs []TopicConfig) error {
    for _, cfg := range configs {
        err := a.conn.CreateTopics(kafka.TopicConfig{
            Topic:             cfg.Name,
            NumPartitions:     cfg.Partitions,
            ReplicationFactor: cfg.ReplicationFactor,
            ConfigEntries: []kafka.ConfigEntry{
                {ConfigName: "retention.ms", ConfigValue: fmt.Sprintf("%d", cfg.RetentionDays*24*60*60*1000)},
                {ConfigName: "cleanup.policy", ConfigValue: cfg.CleanupPolicy},
            },
        })
        if err != nil {
            return err
        }
    }
    return nil
}
```

Scaling Behavior

```
Normal Load (1K events/sec):
├── Partitions: 12
├── Consumer instances: 4
├── Partitions per consumer: 3
├── Lag: <100 messages
└── Processing latency: <10ms

Peak Load (10K events/sec):
├── Scale consumers to: 12
├── Partitions per consumer: 1
├── May need to increase partitions
└── Processing latency: <50ms

Extreme Load (100K events/sec):
├── Add more partitions: 24-48
├── Consumer instances: 24-48
├── Consider Kafka cluster scaling
└── Enable batch processing
```

2. Event Sourcing: When and How to Use

What is Event Sourcing?

```mermaid
graph TB
    subgraph "Traditional (State-Based)"
        DB1[(Database)]
        DB1 -->|Current Balance| B1[Balance: $500]
    end
    subgraph "Event Sourcing"
        ES[(Event Store)]
        ES -->|Event 1| E1[Deposit $1000]
        ES -->|Event 2| E2[Withdraw $300]
        ES -->|Event 3| E3[Transfer $200]
        E1 --> Replay[Replay]
        E2 --> Replay
        E3 --> Replay
        Replay --> B2[Balance: $500]
    end
```

Complete Event Sourcing Implementation

```go
package eventsourcing

import (
    "context"
    "database/sql"
    "encoding/json"
    "errors"
    "fmt"
    "reflect"
    "time"

    "github.com/google/uuid"
)

var (
    ErrConcurrencyConflict = errors.New("concurrency conflict: aggregate was modified")
    ErrAggregateNotFound   = errors.New("aggregate not found")
)

// Base event interface
type DomainEvent interface {
    EventType() string
    AggregateID() string
    OccurredAt() time.Time
}

// Event store
type EventStore interface {
    Append(ctx context.Context, aggregateID string, events []DomainEvent, expectedVersion int) error
    Load(ctx context.Context, aggregateID string) ([]StoredEvent, error)
    LoadFrom(ctx context.Context, aggregateID string, fromVersion int) ([]StoredEvent, error)
}

type StoredEvent struct {
    ID          string
    AggregateID string
    Type        string
    Data        json.RawMessage
    Metadata    json.RawMessage
    Version     int
    Timestamp   time.Time
}

// PostgreSQL event store
type PostgresEventStore struct {
    db *sql.DB
}

func (es *PostgresEventStore) Append(ctx context.Context, aggregateID string, events []DomainEvent, expectedVersion int) error {
    tx, err := es.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // Optimistic concurrency check
    var currentVersion int
    err = tx.QueryRowContext(ctx, `
        SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1
    `, aggregateID).Scan(&currentVersion)
    if err != nil {
        return err
    }
    if currentVersion != expectedVersion {
        return ErrConcurrencyConflict
    }

    // Append events
    for i, event := range events {
        data, err := json.Marshal(event)
        if err != nil {
            return err
        }
        version := expectedVersion + i + 1
        _, err = tx.ExecContext(ctx, `
            INSERT INTO events (id, aggregate_id, type, data, version, timestamp)
            VALUES ($1, $2, $3, $4, $5, $6)
        `, uuid.New().String(), aggregateID, event.EventType(), data, version, event.OccurredAt())
        if err != nil {
            return err
        }
    }
    return tx.Commit()
}

func (es *PostgresEventStore) Load(ctx context.Context, aggregateID string) ([]StoredEvent, error) {
    return es.LoadFrom(ctx, aggregateID, 0)
}

func (es *PostgresEventStore) LoadFrom(ctx context.Context, aggregateID string, fromVersion int) ([]StoredEvent, error) {
    rows, err := es.db.QueryContext(ctx, `
        SELECT id, aggregate_id, type, data, version, timestamp
        FROM events
        WHERE aggregate_id = $1 AND version > $2
        ORDER BY version
    `, aggregateID, fromVersion)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var events []StoredEvent
    for rows.Next() {
        var e StoredEvent
        if err := rows.Scan(&e.ID, &e.AggregateID, &e.Type, &e.Data, &e.Version, &e.Timestamp); err != nil {
            return nil, err
        }
        events = append(events, e)
    }
    return events, nil
}

// Aggregate root base
type AggregateRoot struct {
    id          string
    version     int
    uncommitted []DomainEvent
}

func (a *AggregateRoot) ID() string   { return a.id }
func (a *AggregateRoot) Version() int { return a.version }

func (a *AggregateRoot) TrackChange(event DomainEvent) {
    a.uncommitted = append(a.uncommitted, event)
    a.version++
}

func (a *AggregateRoot) GetUncommittedEvents() []DomainEvent { return a.uncommitted }
func (a *AggregateRoot) ClearUncommittedEvents()             { a.uncommitted = nil }

// Bank Account aggregate
type BankAccount struct {
    AggregateRoot
    balance float64
    status  string
    holder  string
}

// Events
type AccountOpenedEvent struct {
    AccountID string    `json:"account_id"`
    Holder    string    `json:"holder"`
    Timestamp time.Time `json:"timestamp"`
}

func (e AccountOpenedEvent) EventType() string     { return "account.opened" }
func (e AccountOpenedEvent) AggregateID() string   { return e.AccountID }
func (e AccountOpenedEvent) OccurredAt() time.Time { return e.Timestamp }

type MoneyDepositedEvent struct {
    AccountID     string    `json:"account_id"`
    Amount        float64   `json:"amount"`
    TransactionID string    `json:"transaction_id"`
    Timestamp     time.Time `json:"timestamp"`
}

func (e MoneyDepositedEvent) EventType() string     { return "money.deposited" }
func (e MoneyDepositedEvent) AggregateID() string   { return e.AccountID }
func (e MoneyDepositedEvent) OccurredAt() time.Time { return e.Timestamp }

type MoneyWithdrawnEvent struct {
    AccountID     string    `json:"account_id"`
    Amount        float64   `json:"amount"`
    TransactionID string    `json:"transaction_id"`
    Timestamp     time.Time `json:"timestamp"`
}

func (e MoneyWithdrawnEvent) EventType() string     { return "money.withdrawn" }
func (e MoneyWithdrawnEvent) AggregateID() string   { return e.AccountID }
func (e MoneyWithdrawnEvent) OccurredAt() time.Time { return e.Timestamp }

// Commands
func OpenAccount(id, holder string) *BankAccount {
    account := &BankAccount{}
    account.id = id
    event := AccountOpenedEvent{
        AccountID: id,
        Holder:    holder,
        Timestamp: time.Now(),
    }
    account.Apply(event)
    account.TrackChange(event)
    return account
}

func (a *BankAccount) Deposit(amount float64, txID string) error {
    if amount <= 0 {
        return errors.New("deposit amount must be positive")
    }
    if a.status != "active" {
        return errors.New("account is not active")
    }
    event := MoneyDepositedEvent{
        AccountID:     a.id,
        Amount:        amount,
        TransactionID: txID,
        Timestamp:     time.Now(),
    }
    a.Apply(event)
    a.TrackChange(event)
    return nil
}

func (a *BankAccount) Withdraw(amount float64, txID string) error {
    if amount <= 0 {
        return errors.New("withdrawal amount must be positive")
    }
    if a.status != "active" {
        return errors.New("account is not active")
    }
    if a.balance < amount {
        return errors.New("insufficient funds")
    }
    event := MoneyWithdrawnEvent{
        AccountID:     a.id,
        Amount:        amount,
        TransactionID: txID,
        Timestamp:     time.Now(),
    }
    a.Apply(event)
    a.TrackChange(event)
    return nil
}

// Event application (rebuilds state)
func (a *BankAccount) Apply(event DomainEvent) {
    switch e := event.(type) {
    case AccountOpenedEvent:
        a.id = e.AccountID
        a.holder = e.Holder
        a.status = "active"
        a.balance = 0
    case MoneyDepositedEvent:
        a.balance += e.Amount
    case MoneyWithdrawnEvent:
        a.balance -= e.Amount
    }
}

// Repository
type BankAccountRepository struct {
    eventStore EventStore
    eventTypes map[string]reflect.Type
}

func NewBankAccountRepository(es EventStore) *BankAccountRepository {
    return &BankAccountRepository{
        eventStore: es,
        eventTypes: map[string]reflect.Type{
            "account.opened":  reflect.TypeOf(AccountOpenedEvent{}),
            "money.deposited": reflect.TypeOf(MoneyDepositedEvent{}),
            "money.withdrawn": reflect.TypeOf(MoneyWithdrawnEvent{}),
        },
    }
}

func (r *BankAccountRepository) Load(ctx context.Context, id string) (*BankAccount, error) {
    events, err := r.eventStore.Load(ctx, id)
    if err != nil {
        return nil, err
    }
    if len(events) == 0 {
        return nil, ErrAggregateNotFound
    }
    account := &BankAccount{}
    for _, stored := range events {
        eventType, ok := r.eventTypes[stored.Type]
        if !ok {
            return nil, fmt.Errorf("unknown event type: %s", stored.Type)
        }
        event := reflect.New(eventType).Interface()
        if err := json.Unmarshal(stored.Data, event); err != nil {
            return nil, err
        }
        // Dereference the pointer so the value type matches Apply's type switch
        account.Apply(reflect.ValueOf(event).Elem().Interface().(DomainEvent))
        account.version = stored.Version
    }
    return account, nil
}

func (r *BankAccountRepository) Save(ctx context.Context, account *BankAccount) error {
    events := account.GetUncommittedEvents()
    if len(events) == 0 {
        return nil
    }
    expectedVersion := account.Version() - len(events)
    if err := r.eventStore.Append(ctx, account.ID(), events, expectedVersion); err != nil {
        return err
    }
    account.ClearUncommittedEvents()
    return nil
}

// Snapshots for performance
type Snapshot struct {
    AggregateID string
    Version     int
    State       json.RawMessage
    Timestamp   time.Time
}

type SnapshotStore interface {
    Save(ctx context.Context, snapshot Snapshot) error
    Load(ctx context.Context, aggregateID string) (*Snapshot, error)
}

// Note: restoring state from JSON requires the snapshot to expose the
// aggregate's fields (e.g. via an exported DTO); BankAccount's fields are
// unexported here, so a real implementation would map them explicitly.
func (r *BankAccountRepository) LoadWithSnapshot(ctx context.Context, id string, snapshotStore SnapshotStore) (*BankAccount, error) {
    // Try to load snapshot
    snapshot, err := snapshotStore.Load(ctx, id)

    account := &BankAccount{}
    fromVersion := 0
    if err == nil && snapshot != nil {
        // Restore from snapshot
        if err := json.Unmarshal(snapshot.State, account); err != nil {
            return nil, err
        }
        fromVersion = snapshot.Version
    }

    // Load events after snapshot
    events, err := r.eventStore.LoadFrom(ctx, id, fromVersion)
    if err != nil {
        return nil, err
    }

    // Apply remaining events
    for _, stored := range events {
        eventType, ok := r.eventTypes[stored.Type]
        if !ok {
            continue
        }
        event := reflect.New(eventType).Interface()
        if err := json.Unmarshal(stored.Data, event); err != nil {
            return nil, err
        }
        account.Apply(reflect.ValueOf(event).Elem().Interface().(DomainEvent))
        account.version = stored.Version
    }
    return account, nil
}
```

When to Use Event Sourcing

| Use Event Sourcing | Don't Use Event Sourcing |
|---|---|
| Audit requirements (finance, healthcare) | Simple CRUD applications |
| Complex business logic with state changes | High-frequency updates (counters) |
| Need to replay/debug historical states | Large binary data |
| Event-driven microservices | Simple reporting needs |
| Temporal queries ("What was balance on date X?") | Team unfamiliar with pattern |

3. Handling Duplicate Events (Idempotency)

The Duplicate Event Problem

```mermaid
sequenceDiagram
    participant Producer
    participant Kafka
    participant Consumer
    Producer->>Kafka: Send Event A
    Kafka-->>Producer: Timeout (network issue)
    Producer->>Kafka: Retry Event A
    Kafka->>Consumer: Event A (first delivery)
    Kafka->>Consumer: Event A (duplicate!)
    Note over Consumer: Must handle<br/>both deliveries<br/>correctly!
```

Comprehensive Idempotency Implementation

```go
package idempotency

import (
    "context"
    "crypto/sha256"
    "database/sql"
    "encoding/hex"
    "encoding/json"
    "errors"
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/redis/go-redis/v9"
    "github.com/segmentio/kafka-go"
)

var (
    ErrDuplicateEvent = errors.New("duplicate event")
    ErrEventExpired   = errors.New("event already processed and expired")
)

// Idempotency key generator
type IdempotencyKey struct {
    EventID    string
    EventType  string
    ConsumerID string
}

func (k IdempotencyKey) String() string {
    data := fmt.Sprintf("%s:%s:%s", k.EventType, k.EventID, k.ConsumerID)
    hash := sha256.Sum256([]byte(data))
    return hex.EncodeToString(hash[:])
}

// Redis-based idempotency store
type IdempotencyStore struct {
    client *redis.Client
    ttl    time.Duration
}

func NewIdempotencyStore(client *redis.Client, ttl time.Duration) *IdempotencyStore {
    return &IdempotencyStore{client: client, ttl: ttl}
}

type ProcessingResult struct {
    Success     bool
    Error       string
    ProcessedAt time.Time
}

// Check and mark as processing (atomic)
func (s *IdempotencyStore) TryProcess(ctx context.Context, key IdempotencyKey) (bool, error) {
    // Use SET NX (set if not exists)
    result, err := s.client.SetNX(ctx, key.String(), "processing", s.ttl).Result()
    if err != nil {
        return false, err
    }
    return result, nil // true = can process, false = already processed/processing
}

// Mark as completed
func (s *IdempotencyStore) MarkComplete(ctx context.Context, key IdempotencyKey, result ProcessingResult) error {
    data, err := json.Marshal(result)
    if err != nil {
        return err
    }
    return s.client.Set(ctx, key.String(), data, s.ttl).Err()
}

// Get previous result (for returning to caller)
func (s *IdempotencyStore) GetResult(ctx context.Context, key IdempotencyKey) (*ProcessingResult, error) {
    data, err := s.client.Get(ctx, key.String()).Bytes()
    if err != nil {
        if err == redis.Nil {
            return nil, nil
        }
        return nil, err
    }
    var result ProcessingResult
    if err := json.Unmarshal(data, &result); err != nil {
        return nil, err
    }
    return &result, nil
}

// Idempotent event processor.
// Event is the envelope type from the events package (not repeated here).
type IdempotentProcessor struct {
    store      *IdempotencyStore
    consumerID string
    handler    EventHandler
    metrics    *ProcessorMetrics
}

type EventHandler func(ctx context.Context, event Event) error

type ProcessorMetrics struct {
    Processed  int64
    Duplicates int64
    Errors     int64
    mu         sync.Mutex
}

func NewIdempotentProcessor(store *IdempotencyStore, consumerID string, handler EventHandler) *IdempotentProcessor {
    return &IdempotentProcessor{
        store:      store,
        consumerID: consumerID,
        handler:    handler,
        metrics:    &ProcessorMetrics{},
    }
}

func (p *IdempotentProcessor) Process(ctx context.Context, event Event) error {
    key := IdempotencyKey{
        EventID:    event.ID,
        EventType:  event.Type,
        ConsumerID: p.consumerID,
    }

    // Try to acquire processing lock
    canProcess, err := p.store.TryProcess(ctx, key)
    if err != nil {
        return fmt.Errorf("idempotency check failed: %w", err)
    }
    if !canProcess {
        // Already processed or being processed
        p.metrics.mu.Lock()
        p.metrics.Duplicates++
        p.metrics.mu.Unlock()

        // Check if we have a result
        result, _ := p.store.GetResult(ctx, key)
        if result != nil {
            if result.Success {
                return nil // Already succeeded
            }
            return errors.New(result.Error) // Return original error
        }
        return ErrDuplicateEvent
    }

    // Process the event
    handlerErr := p.handler(ctx, event)

    // Record result
    result := ProcessingResult{
        Success:     handlerErr == nil,
        ProcessedAt: time.Now(),
    }
    if handlerErr != nil {
        result.Error = handlerErr.Error()
    }
    if err := p.store.MarkComplete(ctx, key, result); err != nil {
        // Log but don't fail - event was processed
        log.Printf("Failed to mark event complete: %v", err)
    }

    p.metrics.mu.Lock()
    defer p.metrics.mu.Unlock()
    if handlerErr != nil {
        p.metrics.Errors++
        return handlerErr
    }
    p.metrics.Processed++
    return nil
}

// Database-level idempotency (when Redis isn't enough)
type DatabaseIdempotency struct {
    db *sql.DB
}

func (d *DatabaseIdempotency) ProcessWithIdempotency(ctx context.Context, eventID string, handler func(tx *sql.Tx) error) error {
    tx, err := d.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // Try to insert the idempotency record; ON CONFLICT makes the insert a
    // no-op if another instance already claimed this event.
    res, err := tx.ExecContext(ctx, `
        INSERT INTO processed_events (event_id, processed_at)
        VALUES ($1, NOW())
        ON CONFLICT (event_id) DO NOTHING
    `, eventID)
    if err != nil {
        return err
    }
    inserted, err := res.RowsAffected()
    if err != nil {
        return err
    }
    if inserted == 0 {
        // Already processed by another instance
        return nil
    }

    // Process the event in the same transaction as the idempotency record
    if err := handler(tx); err != nil {
        return err
    }
    return tx.Commit()
}

// Exactly-once processing with Kafka transactions
type ExactlyOnceProcessor struct {
    consumer *kafka.Reader
    producer *kafka.Writer
    handler  func(msg kafka.Message) ([]kafka.Message, error)
}

func (p *ExactlyOnceProcessor) Process(ctx context.Context) error {
    for {
        msg, err := p.consumer.FetchMessage(ctx)
        if err != nil {
            return err
        }

        // Process and get output messages
        outputs, err := p.handler(msg)
        if err != nil {
            continue // Don't commit - will retry
        }

        // Transactional produce + commit
        // (Requires Kafka transactions setup)
        if err := p.producer.WriteMessages(ctx, outputs...); err != nil {
            continue
        }

        // Commit consumer offset
        p.consumer.CommitMessages(ctx, msg)
    }
}
```

4. Event Schema Evolution

The Evolution Problem

```mermaid
timeline
    title Event Schema Evolution
    V1 : OrderCreated {orderId, customerId, amount}
    V2 : OrderCreated + {currency, items[]}
    V3 : OrderCreated + {shippingAddress, discountCode}
    V4 : OrderCreated - amount + {subtotal, tax, total}
```

Backward and Forward Compatible Evolution

```go
package schema

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/segmentio/kafka-go"
)

// Schema registry interface
type SchemaRegistry interface {
    Register(subject string, schema string) (int, error)
    GetSchema(subject string, version int) (string, error)
    CheckCompatibility(subject string, schema string) (bool, error)
}

// Event with version support
type VersionedEvent struct {
    SchemaVersion int             `json:"schema_version"`
    Type          string          `json:"type"`
    Data          json.RawMessage `json:"data"`
}

// V1 Order Event
type OrderCreatedV1 struct {
    OrderID    string  `json:"order_id"`
    CustomerID string  `json:"customer_id"`
    Amount     float64 `json:"amount"`
}

// V2 Order Event (backward compatible - new fields have defaults)
type OrderCreatedV2 struct {
    OrderID    string      `json:"order_id"`
    CustomerID string      `json:"customer_id"`
    Amount     float64     `json:"amount"`
    Currency   string      `json:"currency"` // New field with default
    Items      []OrderItem `json:"items"`    // New field
}

// V3 Order Event (with migration from V2)
type OrderCreatedV3 struct {
    OrderID         string      `json:"order_id"`
    CustomerID      string      `json:"customer_id"`
    Subtotal        float64     `json:"subtotal"` // Renamed from amount
    Tax             float64     `json:"tax"`      // New
    Total           float64     `json:"total"`    // New
    Currency        string      `json:"currency"`
    Items           []OrderItem `json:"items"`
    ShippingAddress *Address    `json:"shipping_address"` // New optional
}

// OrderItem, Address and validateAgainstSchema are defined elsewhere (not shown).

// Event upgrader
type EventUpgrader struct {
    upgraders map[string]map[int]func(json.RawMessage) (json.RawMessage, error)
}

func NewEventUpgrader() *EventUpgrader {
    eu := &EventUpgrader{
        upgraders: make(map[string]map[int]func(json.RawMessage) (json.RawMessage, error)),
    }
    // Register upgraders
    eu.RegisterUpgrader("order.created", 1, eu.upgradeOrderV1ToV2)
    eu.RegisterUpgrader("order.created", 2, eu.upgradeOrderV2ToV3)
    return eu
}

func (eu *EventUpgrader) RegisterUpgrader(eventType string, fromVersion int, upgrader func(json.RawMessage) (json.RawMessage, error)) {
    if eu.upgraders[eventType] == nil {
        eu.upgraders[eventType] = make(map[int]func(json.RawMessage) (json.RawMessage, error))
    }
    eu.upgraders[eventType][fromVersion] = upgrader
}

func (eu *EventUpgrader) Upgrade(event VersionedEvent, targetVersion int) (json.RawMessage, error) {
    currentVersion := event.SchemaVersion
    data := event.Data
    for currentVersion < targetVersion {
        upgrader, ok := eu.upgraders[event.Type][currentVersion]
        if !ok {
            return nil, fmt.Errorf("no upgrader for %s v%d", event.Type, currentVersion)
        }
        var err error
        data, err = upgrader(data)
        if err != nil {
            return nil, err
        }
        currentVersion++
    }
    return data, nil
}

func (eu *EventUpgrader) upgradeOrderV1ToV2(data json.RawMessage) (json.RawMessage, error) {
    var v1 OrderCreatedV1
    if err := json.Unmarshal(data, &v1); err != nil {
        return nil, err
    }
    v2 := OrderCreatedV2{
        OrderID:    v1.OrderID,
        CustomerID: v1.CustomerID,
        Amount:     v1.Amount,
        Currency:   "USD", // Default value
        Items:      nil,   // Unknown from V1
    }
    return json.Marshal(v2)
}

func (eu *EventUpgrader) upgradeOrderV2ToV3(data json.RawMessage) (json.RawMessage, error) {
    var v2 OrderCreatedV2
    if err := json.Unmarshal(data, &v2); err != nil {
        return nil, err
    }
    v3 := OrderCreatedV3{
        OrderID:    v2.OrderID,
        CustomerID: v2.CustomerID,
        Subtotal:   v2.Amount,
        Tax:        0, // Can't derive from V2
        Total:      v2.Amount,
        Currency:   v2.Currency,
        Items:      v2.Items,
    }
    return json.Marshal(v3)
}

// Consumer with version handling
type VersionAwareConsumer struct {
    upgrader       *EventUpgrader
    currentVersion int
    handlers       map[string]func(json.RawMessage) error
}

func (c *VersionAwareConsumer) Process(event VersionedEvent) error {
    // Upgrade to current version if needed
    data := event.Data
    if event.SchemaVersion < c.currentVersion {
        var err error
        data, err = c.upgrader.Upgrade(event, c.currentVersion)
        if err != nil {
            return fmt.Errorf("upgrade failed: %w", err)
        }
    }
    handler, ok := c.handlers[event.Type]
    if !ok {
        return fmt.Errorf("no handler for event type: %s", event.Type)
    }
    return handler(data)
}

// Producer with schema validation
type SchemaValidatingProducer struct {
    registry SchemaRegistry
    producer *kafka.Writer
}

func (p *SchemaValidatingProducer) Publish(ctx context.Context, topic string, event interface{}, schemaVersion int) error {
    // Serialize event
    data, err := json.Marshal(event)
    if err != nil {
        return err
    }

    // Validate against schema registry
    eventType := fmt.Sprintf("%T", event)
    schema, err := p.registry.GetSchema(eventType, schemaVersion)
    if err != nil {
        return fmt.Errorf("schema not found: %w", err)
    }
    if err := validateAgainstSchema(data, schema); err != nil {
        return fmt.Errorf("schema validation failed: %w", err)
    }

    // Wrap with version
    versioned := VersionedEvent{
        SchemaVersion: schemaVersion,
        Type:          eventType,
        Data:          data,
    }
    payload, err := json.Marshal(versioned)
    if err != nil {
        return err
    }
    return p.producer.WriteMessages(ctx, kafka.Message{
        Topic: topic,
        Value: payload,
    })
}
```

Schema Evolution Best Practices

Here, backward compatible means a consumer on the new schema can read events written with the old one; forward compatible means a consumer still on the old schema can read events written with the new one.

| Change Type | Backward Compatible | Forward Compatible | Example |
|---|---|---|---|
| Add optional field | ✅ | ✅ | Add `discount_code` |
| Add required field | ❌ | ✅ | Add `currency` (required) |
| Remove field | ✅ | ❌ | Remove `legacy_id` |
| Rename field | ❌ | ❌ | `amount` → `total` |
| Change field type | ❌ | ❌ | `amount: int` → `amount: float` |
| Add enum value | ✅ | ❌ | Add `REFUNDED` status |
| Remove enum value | ❌ | ✅ | Remove `PENDING` status |

5. Dead Letter Queue (DLQ) Design

Why Dead Letter Queues?

```mermaid
graph TB
    subgraph "Without DLQ"
        Q1[Main Queue] -->|Poison Message| C1[Consumer]
        C1 -->|Retry| Q1
        C1 -->|Retry Forever| Q1
        Note1[System stuck!]
    end
    subgraph "With DLQ"
        Q2[Main Queue] -->|Normal| C2[Consumer]
        C2 -->|Success| Done[Done]
        C2 -->|Retry 3x| Retry[Retry]
        Retry -->|Still Fails| DLQ[Dead Letter Queue]
        DLQ --> Alert[Alert + Manual Review]
    end
```

Complete DLQ Implementation

go
package dlq

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"os"
	"strings"
	"time"

	"github.com/segmentio/kafka-go"
)

type DeadLetterMessage struct {
	OriginalTopic     string          `json:"original_topic"`
	OriginalPartition int             `json:"original_partition"`
	OriginalOffset    int64           `json:"original_offset"`
	OriginalKey       string          `json:"original_key"`
	OriginalValue     json.RawMessage `json:"original_value"`
	OriginalHeaders   []Header        `json:"original_headers"`
	FailureReason     string          `json:"failure_reason"`
	FailureCount      int             `json:"failure_count"`
	FirstFailedAt     time.Time       `json:"first_failed_at"`
	LastFailedAt      time.Time       `json:"last_failed_at"`
	ConsumerGroup     string          `json:"consumer_group"`
	ConsumerInstance  string          `json:"consumer_instance"`
}

type Header struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

type DeadLetterQueue struct {
	producer *kafka.Writer
	topic    string
	metrics  *DLQMetrics
}

type DLQMetrics struct {
	MessagesQueued      int64
	MessagesReprocessed int64
	MessagesDiscarded   int64
}

func NewDeadLetterQueue(brokers []string, topic string) *DeadLetterQueue {
	return &DeadLetterQueue{
		producer: &kafka.Writer{
			Addr:         kafka.TCP(brokers...),
			Topic:        topic,
			Balancer:     &kafka.LeastBytes{},
			RequiredAcks: kafka.RequireAll,
		},
		topic:   topic,
		metrics: &DLQMetrics{},
	}
}

func (dlq *DeadLetterQueue) Send(ctx context.Context, originalMsg kafka.Message, err error, failureCount int, consumerGroup, consumerInstance string) error {
	headers := make([]Header, len(originalMsg.Headers))
	for i, h := range originalMsg.Headers {
		headers[i] = Header{Key: h.Key, Value: string(h.Value)}
	}

	dlqMessage := DeadLetterMessage{
		OriginalTopic:     originalMsg.Topic,
		OriginalPartition: originalMsg.Partition,
		OriginalOffset:    originalMsg.Offset,
		OriginalKey:       string(originalMsg.Key),
		OriginalValue:     originalMsg.Value,
		OriginalHeaders:   headers,
		FailureReason:     err.Error(),
		FailureCount:      failureCount,
		FirstFailedAt:     time.Now(), // Should track from the first failure
		LastFailedAt:      time.Now(),
		ConsumerGroup:     consumerGroup,
		ConsumerInstance:  consumerInstance,
	}

	payload, marshalErr := json.Marshal(dlqMessage)
	if marshalErr != nil {
		return marshalErr
	}

	sendErr := dlq.producer.WriteMessages(ctx, kafka.Message{
		Key:   originalMsg.Key,
		Value: payload,
		Headers: []kafka.Header{
			{Key: "dlq_reason", Value: []byte(err.Error())},
			{Key: "original_topic", Value: []byte(originalMsg.Topic)},
			{Key: "failure_count", Value: []byte(fmt.Sprintf("%d", failureCount))},
		},
	})
	if sendErr == nil {
		dlq.metrics.MessagesQueued++
	}
	return sendErr
}

// Resilient consumer with DLQ
type ResilientConsumer struct {
	reader        *kafka.Reader
	dlq           *DeadLetterQueue
	handler       EventHandler
	maxRetries    int
	retryDelay    time.Duration
	consumerGroup string
	instanceID    string
}

type EventHandler func(ctx context.Context, msg kafka.Message) error

func NewResilientConsumer(
	brokers []string,
	topic, consumerGroup string,
	dlq *DeadLetterQueue,
	handler EventHandler,
) *ResilientConsumer {
	return &ResilientConsumer{
		reader: kafka.NewReader(kafka.ReaderConfig{
			Brokers:  brokers,
			Topic:    topic,
			GroupID:  consumerGroup,
			MinBytes: 1,
			MaxBytes: 10e6,
		}),
		dlq:           dlq,
		handler:       handler,
		maxRetries:    3,
		retryDelay:    time.Second,
		consumerGroup: consumerGroup,
		instanceID:    generateInstanceID(),
	}
}

// generateInstanceID identifies this consumer process in DLQ metadata.
func generateInstanceID() string {
	host, _ := os.Hostname()
	return fmt.Sprintf("%s-%d", host, os.Getpid())
}

func (c *ResilientConsumer) Start(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return c.reader.Close()
		default:
			msg, err := c.reader.FetchMessage(ctx)
			if err != nil {
				continue
			}
			if err := c.processWithRetry(ctx, msg); err != nil {
				// All retries exhausted - send to DLQ
				if dlqErr := c.dlq.Send(ctx, msg, err, c.maxRetries, c.consumerGroup, c.instanceID); dlqErr != nil {
					log.Printf("Failed to send to DLQ: %v", dlqErr)
					// Don't commit - will retry
					continue
				}
			}
			// Commit regardless of success (either processed or sent to DLQ)
			if err := c.reader.CommitMessages(ctx, msg); err != nil {
				log.Printf("Failed to commit offset: %v", err)
			}
		}
	}
}

func (c *ResilientConsumer) processWithRetry(ctx context.Context, msg kafka.Message) error {
	var lastErr error
	for attempt := 0; attempt < c.maxRetries; attempt++ {
		lastErr = c.handler(ctx, msg)
		if lastErr == nil {
			return nil
		}
		// Check if error is retryable
		if !isRetryable(lastErr) {
			return lastErr
		}
		// Exponential backoff
		delay := c.retryDelay * time.Duration(1<<uint(attempt))
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
	}
	return lastErr
}

func isRetryable(err error) bool {
	// Network errors and timeouts are retryable;
	// validation and business logic errors are not.
	var permanent *PermanentError
	return !errors.As(err, &permanent)
}

type PermanentError struct {
	Err error
}

func (e *PermanentError) Error() string { return e.Err.Error() }

func (e *PermanentError) Unwrap() error { return e.Err }

// DLQ Reprocessor for manual intervention
type DLQReprocessor struct {
	dlqReader        *kafka.Reader
	originalProducer *kafka.Writer
	metrics          *DLQMetrics
}

func (r *DLQReprocessor) Reprocess(ctx context.Context, filter func(DeadLetterMessage) bool) error {
	for {
		msg, err := r.dlqReader.FetchMessage(ctx)
		if err != nil {
			return err
		}

		var dlqMsg DeadLetterMessage
		if err := json.Unmarshal(msg.Value, &dlqMsg); err != nil {
			continue
		}
		if !filter(dlqMsg) {
			continue
		}

		// Reconstruct original message
		headers := make([]kafka.Header, len(dlqMsg.OriginalHeaders))
		for i, h := range dlqMsg.OriginalHeaders {
			headers[i] = kafka.Header{Key: h.Key, Value: []byte(h.Value)}
		}
		headers = append(headers, kafka.Header{
			Key:   "reprocessed_from_dlq",
			Value: []byte("true"),
		})

		// Send back to original topic
		err = r.originalProducer.WriteMessages(ctx, kafka.Message{
			Topic:   dlqMsg.OriginalTopic,
			Key:     []byte(dlqMsg.OriginalKey),
			Value:   dlqMsg.OriginalValue,
			Headers: headers,
		})
		if err != nil {
			log.Printf("Failed to reprocess: %v", err)
			continue
		}

		// Commit DLQ message
		if err := r.dlqReader.CommitMessages(ctx, msg); err != nil {
			log.Printf("Failed to commit DLQ offset: %v", err)
			continue
		}
		r.metrics.MessagesReprocessed++
	}
}

// DLQ Dashboard API
type DLQDashboard struct {
	dlqReader *kafka.Reader
}

type DLQStats struct {
	TotalMessages int64
	ByTopic       map[string]int64
	ByError       map[string]int64
	OldestMessage time.Time
	NewestMessage time.Time
}

func (d *DLQDashboard) GetStats(ctx context.Context) (*DLQStats, error) {
	stats := &DLQStats{
		ByTopic: make(map[string]int64),
		ByError: make(map[string]int64),
	}
	// Simplified - in production, use the Kafka admin client
	// to get partition offsets and calculate lag.
	return stats, nil
}

func (d *DLQDashboard) GetMessages(ctx context.Context, limit int, filters DLQFilters) ([]DeadLetterMessage, error) {
	var messages []DeadLetterMessage
	for i := 0; i < limit; i++ {
		msg, err := d.dlqReader.FetchMessage(ctx)
		if err != nil {
			break
		}
		var dlqMsg DeadLetterMessage
		if err := json.Unmarshal(msg.Value, &dlqMsg); err != nil {
			continue
		}
		if filters.Match(dlqMsg) {
			messages = append(messages, dlqMsg)
		}
	}
	return messages, nil
}

type DLQFilters struct {
	Topic         string
	ErrorContains string
	Since         time.Time
	Until         time.Time
}

func (f DLQFilters) Match(msg DeadLetterMessage) bool {
	if f.Topic != "" && msg.OriginalTopic != f.Topic {
		return false
	}
	if f.ErrorContains != "" && !strings.Contains(msg.FailureReason, f.ErrorContains) {
		return false
	}
	if !f.Since.IsZero() && msg.LastFailedAt.Before(f.Since) {
		return false
	}
	if !f.Until.IsZero() && msg.LastFailedAt.After(f.Until) {
		return false
	}
	return true
}

Summary: Event-Driven Architecture Principles

When to Use Events vs Direct Calls

Use Events                 | Use Direct Calls
---------------------------|------------------------------
Loose coupling needed      | Synchronous response required
Multiple consumers         | Single consumer
Audit trail important      | Simple request-response
Eventual consistency OK    | Strong consistency required
High throughput            | Low latency critical
Failure isolation          | Transactional operations

Event-Driven Architecture Checklist

Design Phase:
☐ Define event schema with versioning
☐ Choose between choreography vs orchestration
☐ Plan for idempotency
☐ Design DLQ strategy
☐ Plan schema evolution

Implementation:
☐ Use outbox pattern for reliability
☐ Implement idempotent consumers
☐ Add correlation IDs for tracing
☐ Set up proper partitioning
☐ Configure retention policies

Operations:
☐ Monitor consumer lag
☐ Alert on DLQ growth
☐ Track processing latency
☐ Dashboard for event flow
☐ Runbook for reprocessing
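The "implement idempotent consumers" item from the checklist can be sketched with a processed-event-ID store: record each event ID once, and skip redeliveries. A minimal in-memory version (in production the map would be Redis `SETNX` or a unique-key table; all names here are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// IdempotentHandler skips events whose IDs it has already processed,
// making at-least-once delivery safe to replay.
type IdempotentHandler struct {
	mu        sync.Mutex
	processed map[string]bool // stand-in for a durable dedup store
	handle    func(eventID string) error
}

func NewIdempotentHandler(handle func(string) error) *IdempotentHandler {
	return &IdempotentHandler{processed: make(map[string]bool), handle: handle}
}

func (h *IdempotentHandler) Handle(eventID string) error {
	h.mu.Lock()
	if h.processed[eventID] {
		h.mu.Unlock()
		return nil // duplicate: already handled, safe to ack again
	}
	h.mu.Unlock()

	if err := h.handle(eventID); err != nil {
		return err // not marked processed, so a retry re-attempts it
	}

	h.mu.Lock()
	h.processed[eventID] = true
	h.mu.Unlock()
	return nil
}

func main() {
	calls := 0
	h := NewIdempotentHandler(func(id string) error {
		calls++ // side effect runs once per distinct event
		return nil
	})
	h.Handle("evt-1")
	h.Handle("evt-1") // redelivered duplicate: skipped
	h.Handle("evt-2")
	fmt.Println(calls) // 2
}
```

Note the mark-after-success ordering: failing mid-handler leaves the ID unrecorded, so the event is retried rather than silently lost.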

Key Metrics

Metric               | Warning  | Action
---------------------|----------|----------------------
Consumer Lag         | >10K     | Scale consumers
DLQ Rate             | >1%      | Investigate failures
Processing Time p99  | >1s      | Optimize handler
Duplicate Rate       | >5%      | Check idempotency
Rebalance Frequency  | >1/hour  | Check consumer health
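Consumer lag in the table above is the gap between a partition's high-water mark (the next offset to be produced) and the group's committed offset, summed across partitions. A sketch of the arithmetic behind a ">10K, scale consumers" alert (function names and sample offsets are illustrative; in practice the offsets come from the Kafka admin API):

```go
package main

import "fmt"

// consumerLag is the high-water mark minus the committed offset,
// clamped at zero in case a stale commit reads ahead.
func consumerLag(highWaterMark, committedOffset int64) int64 {
	lag := highWaterMark - committedOffset
	if lag < 0 {
		return 0
	}
	return lag
}

// totalLag sums per-partition lag for one consumer group.
func totalLag(highWaterMarks, committed map[int]int64) int64 {
	var total int64
	for partition, hwm := range highWaterMarks {
		total += consumerLag(hwm, committed[partition])
	}
	return total
}

func main() {
	hwm := map[int]int64{0: 150_000, 1: 148_500, 2: 151_200}
	committed := map[int]int64{0: 146_000, 1: 148_000, 2: 149_700}

	lag := totalLag(hwm, committed)
	fmt.Println(lag)          // 6000
	fmt.Println(lag > 10_000) // false: below the >10K alert threshold
}
```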
This guide covers the essential event-driven architecture patterns for system design interviews. Remember: events provide decoupling and scalability, but add complexity. Use them when the benefits outweigh the operational overhead.