System Design Part 6: Event-Driven Architecture - The Complete Guide
Introduction
Event-driven architecture (EDA) transforms how systems communicate - from direct, synchronous calls to loosely coupled, asynchronous events. This paradigm shift enables massive scale, better resilience, and true microservices independence. Let's explore how to design and implement EDA effectively.
1. Design Kafka-Based Event System for Order Updates
Why Kafka for Order Events?
```mermaid
graph TB
    subgraph "Synchronous (Before)"
        Order[Order Service] -->|HTTP| Inventory
        Order -->|HTTP| Payment
        Order -->|HTTP| Notification
        Order -->|HTTP| Analytics
        Note1[Problems:<br/>- Coupling<br/>- Cascading failures<br/>- Slow response]
    end
    subgraph "Event-Driven (After)"
        Order2[Order Service] -->|Publish| Kafka[Kafka Topic]
        Kafka -->|Subscribe| Inventory2[Inventory]
        Kafka -->|Subscribe| Payment2[Payment]
        Kafka -->|Subscribe| Notification2[Notification]
        Kafka -->|Subscribe| Analytics2[Analytics]
        Note2[Benefits:<br/>- Decoupled<br/>- Fault tolerant<br/>- Fast response]
    end
```
Complete Kafka Event System Implementation
```go
package events

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/google/uuid"
	"github.com/segmentio/kafka-go"
)

// Context helpers (getCorrelationID, withCorrelationID, getCausationID,
// withCausationID), extractSubject, extractPartitionKey, and the
// InventoryRepository interface are assumed to be defined elsewhere
// in the package.

// Event envelope with metadata
type Event struct {
	ID            string          `json:"id"`
	Type          string          `json:"type"`
	Source        string          `json:"source"`
	Subject       string          `json:"subject"`
	Time          time.Time       `json:"time"`
	DataVersion   string          `json:"data_version"`
	Data          json.RawMessage `json:"data"`
	CorrelationID string          `json:"correlation_id"`
	CausationID   string          `json:"causation_id"`
}

// Order domain events
type OrderCreatedEvent struct {
	OrderID     string      `json:"order_id"`
	CustomerID  string      `json:"customer_id"`
	Items       []OrderItem `json:"items"`
	TotalAmount float64     `json:"total_amount"`
	Currency    string      `json:"currency"`
	CreatedAt   time.Time   `json:"created_at"`
}

type OrderItem struct {
	ProductID string  `json:"product_id"`
	Quantity  int     `json:"quantity"`
	Price     float64 `json:"price"`
}

type OrderStatusChangedEvent struct {
	OrderID   string    `json:"order_id"`
	OldStatus string    `json:"old_status"`
	NewStatus string    `json:"new_status"`
	Reason    string    `json:"reason,omitempty"`
	ChangedAt time.Time `json:"changed_at"`
	ChangedBy string    `json:"changed_by"`
}

type OrderDeliveredEvent struct {
	OrderID     string    `json:"order_id"`
	DeliveredAt time.Time `json:"delivered_at"`
	DeliveredTo string    `json:"delivered_to"`
	SignedBy    string    `json:"signed_by,omitempty"`
	PhotoURL    string    `json:"photo_url,omitempty"`
}

// Event producer with reliability guarantees
type EventProducer struct {
	writer      *kafka.Writer
	serviceName string
}

func NewEventProducer(brokers []string, serviceName string) *EventProducer {
	writer := &kafka.Writer{
		Addr:         kafka.TCP(brokers...),
		Balancer:     &kafka.LeastBytes{},
		RequiredAcks: kafka.RequireAll, // Wait for all replicas
		Async:        false,            // Synchronous for reliability
		Compression:  kafka.Snappy,
		BatchTimeout: 10 * time.Millisecond,
		MaxAttempts:  3,
	}
	return &EventProducer{
		writer:      writer,
		serviceName: serviceName,
	}
}

func (p *EventProducer) Publish(ctx context.Context, topic string, eventType string, data interface{}) error {
	dataBytes, err := json.Marshal(data)
	if err != nil {
		return fmt.Errorf("marshal data: %w", err)
	}
	event := Event{
		ID:            uuid.New().String(),
		Type:          eventType,
		Source:        p.serviceName,
		Subject:       extractSubject(data),
		Time:          time.Now().UTC(),
		DataVersion:   "1.0",
		Data:          dataBytes,
		CorrelationID: getCorrelationID(ctx),
		CausationID:   getCausationID(ctx),
	}
	eventBytes, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("marshal event: %w", err)
	}
	// Use order ID as key for ordering guarantees
	key := extractPartitionKey(data)
	err = p.writer.WriteMessages(ctx, kafka.Message{
		Topic: topic,
		Key:   []byte(key),
		Value: eventBytes,
		Headers: []kafka.Header{
			{Key: "event_type", Value: []byte(eventType)},
			{Key: "correlation_id", Value: []byte(event.CorrelationID)},
		},
	})
	if err != nil {
		return fmt.Errorf("write message: %w", err)
	}
	return nil
}

// Publish with outbox pattern for exactly-once semantics
func (p *EventProducer) PublishWithOutbox(ctx context.Context, tx *sql.Tx, topic string, eventType string, data interface{}) error {
	dataBytes, err := json.Marshal(data)
	if err != nil {
		return err
	}
	event := Event{
		ID:            uuid.New().String(),
		Type:          eventType,
		Source:        p.serviceName,
		Time:          time.Now().UTC(),
		DataVersion:   "1.0",
		Data:          dataBytes,
		CorrelationID: getCorrelationID(ctx),
	}
	eventBytes, err := json.Marshal(event)
	if err != nil {
		return err
	}
	_, err = tx.ExecContext(ctx, `
		INSERT INTO event_outbox (id, topic, partition_key, event_type, payload, created_at)
		VALUES ($1, $2, $3, $4, $5, NOW())
	`, event.ID, topic, extractPartitionKey(data), eventType, eventBytes)
	return err
}

func (p *EventProducer) Close() error {
	return p.writer.Close()
}

// Event consumer with consumer group
type EventConsumer struct {
	reader      *kafka.Reader
	handlers    map[string]EventHandler
	serviceName string
}

type EventHandler func(ctx context.Context, event Event) error

func NewEventConsumer(brokers []string, topic, groupID, serviceName string) *EventConsumer {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        brokers,
		Topic:          topic,
		GroupID:        groupID,
		MinBytes:       1,
		MaxBytes:       10e6,
		MaxWait:        500 * time.Millisecond,
		StartOffset:    kafka.FirstOffset,
		CommitInterval: time.Second,
	})
	return &EventConsumer{
		reader:      reader,
		handlers:    make(map[string]EventHandler),
		serviceName: serviceName,
	}
}

func (c *EventConsumer) RegisterHandler(eventType string, handler EventHandler) {
	c.handlers[eventType] = handler
}

func (c *EventConsumer) Start(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return c.reader.Close()
		default:
			msg, err := c.reader.FetchMessage(ctx)
			if err != nil {
				if ctx.Err() != nil {
					return nil
				}
				log.Printf("Fetch error: %v", err)
				continue
			}
			if err := c.processMessage(ctx, msg); err != nil {
				log.Printf("Process error for message %s: %v", string(msg.Key), err)
				// Don't commit - will be retried
				continue
			}
			if err := c.reader.CommitMessages(ctx, msg); err != nil {
				log.Printf("Commit error: %v", err)
			}
		}
	}
}

func (c *EventConsumer) processMessage(ctx context.Context, msg kafka.Message) error {
	var event Event
	if err := json.Unmarshal(msg.Value, &event); err != nil {
		return fmt.Errorf("unmarshal event: %w", err)
	}
	handler, ok := c.handlers[event.Type]
	if !ok {
		// Unknown event type - log and skip
		log.Printf("No handler for event type: %s", event.Type)
		return nil
	}
	// Add correlation context
	ctx = withCorrelationID(ctx, event.CorrelationID)
	ctx = withCausationID(ctx, event.ID)
	return handler(ctx, event)
}

// Example: Inventory service consumer
type InventoryService struct {
	consumer *EventConsumer
	repo     InventoryRepository
}

func (s *InventoryService) Start(ctx context.Context) error {
	// Register handlers
	s.consumer.RegisterHandler("order.created", s.handleOrderCreated)
	s.consumer.RegisterHandler("order.cancelled", s.handleOrderCancelled)
	return s.consumer.Start(ctx)
}

func (s *InventoryService) handleOrderCreated(ctx context.Context, event Event) error {
	var data OrderCreatedEvent
	if err := json.Unmarshal(event.Data, &data); err != nil {
		return err
	}
	// Reserve inventory for each item
	for _, item := range data.Items {
		err := s.repo.ReserveStock(ctx, item.ProductID, item.Quantity, data.OrderID)
		if err != nil {
			// Publish compensation event
			return s.publishReservationFailed(ctx, data.OrderID, err)
		}
	}
	// Publish success event
	return s.publishInventoryReserved(ctx, data.OrderID)
}

func (s *InventoryService) handleOrderCancelled(ctx context.Context, event Event) error {
	var data struct {
		OrderID string `json:"order_id"`
	}
	if err := json.Unmarshal(event.Data, &data); err != nil {
		return err
	}
	// Release reserved inventory
	return s.repo.ReleaseReservation(ctx, data.OrderID)
}
```
Kafka Topic Design
```go
// Topic configuration for order events
type TopicConfig struct {
	Name              string
	Partitions        int
	ReplicationFactor int
	RetentionDays     int
	CleanupPolicy     string
}

func OrderTopicsConfig() []TopicConfig {
	return []TopicConfig{
		{
			Name:              "orders.events",
			Partitions:        12, // Match consumer instances
			ReplicationFactor: 3,  // High availability
			RetentionDays:     30, // Keep for audit
			CleanupPolicy:     "delete",
		},
		{
			Name:              "orders.changelog",
			Partitions:        12,
			ReplicationFactor: 3,
			RetentionDays:     -1,        // Keep forever
			CleanupPolicy:     "compact", // Only keep latest per key
		},
		{
			Name:              "orders.dlq", // Dead letter queue
			Partitions:        3,
			ReplicationFactor: 3,
			RetentionDays:     90,
			CleanupPolicy:     "delete",
		},
	}
}

// Admin client for topic management
type KafkaAdmin struct {
	conn *kafka.Conn
}

func (a *KafkaAdmin) CreateTopics(configs []TopicConfig) error {
	for _, cfg := range configs {
		retentionMS := "-1" // negative RetentionDays = keep forever
		if cfg.RetentionDays >= 0 {
			retentionMS = fmt.Sprintf("%d", cfg.RetentionDays*24*60*60*1000)
		}
		err := a.conn.CreateTopics(kafka.TopicConfig{
			Topic:             cfg.Name,
			NumPartitions:     cfg.Partitions,
			ReplicationFactor: cfg.ReplicationFactor,
			ConfigEntries: []kafka.ConfigEntry{
				{ConfigName: "retention.ms", ConfigValue: retentionMS},
				{ConfigName: "cleanup.policy", ConfigValue: cfg.CleanupPolicy},
			},
		})
		if err != nil {
			return err
		}
	}
	return nil
}
```
Scaling Behavior
```
Normal Load (1K events/sec):
├── Partitions: 12
├── Consumer instances: 4
├── Partitions per consumer: 3
├── Lag: <100 messages
└── Processing latency: <10ms

Peak Load (10K events/sec):
├── Scale consumers to: 12
├── Partitions per consumer: 1
├── May need to increase partitions
└── Processing latency: <50ms

Extreme Load (100K events/sec):
├── Add more partitions: 24-48
├── Consumer instances: 24-48
├── Consider Kafka cluster scaling
└── Enable batch processing
```
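The figures above follow a simple capacity rule: you need at least ceil(target rate / per-partition throughput) partitions, and since Kafka cannot shrink a topic, you never go below the current count. A back-of-the-envelope helper (the per-partition throughput is a number you measure for your own workload, not a Kafka constant):

```go
package main

import "fmt"

// estimatePartitions suggests a partition count for a target event rate.
// perPartitionRate is how many events/sec one consumer can drain from
// one partition — a measured figure, assumed here for illustration.
func estimatePartitions(targetRate, perPartitionRate, current int) int {
	need := (targetRate + perPartitionRate - 1) / perPartitionRate // ceil
	if need < current {
		return current // Kafka cannot reduce a topic's partition count
	}
	return need
}

func main() {
	// 100K events/sec at ~2.5K events/sec per partition, starting from 12
	fmt.Println(estimatePartitions(100000, 2500, 12)) // 40
}
```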
2. Event Sourcing: When and How to Use
What is Event Sourcing?
```mermaid
graph TB
    subgraph "Traditional (State-Based)"
        DB1[(Database)]
        DB1 -->|Current Balance| B1[Balance: $500]
    end
    subgraph "Event Sourcing"
        ES[(Event Store)]
        ES -->|Event 1| E1[Deposit $1000]
        ES -->|Event 2| E2[Withdraw $300]
        ES -->|Event 3| E3[Transfer $200]
        E1 --> Replay[Replay]
        E2 --> Replay
        E3 --> Replay
        Replay --> B2[Balance: $500]
    end
```
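The diagram's core idea — current state is never stored, only derived by folding over the event log — fits in a few lines. A toy replay matching the figure (deposit $1000, withdraw $300, transfer out $200 ⇒ $500):

```go
package main

import "fmt"

type ledgerEvent struct {
	kind   string
	amount float64
}

// replay folds the event history into the current balance; state is
// always derived from events, never stored directly.
func replay(events []ledgerEvent) float64 {
	balance := 0.0
	for _, e := range events {
		switch e.kind {
		case "deposited":
			balance += e.amount
		case "withdrawn", "transferred":
			balance -= e.amount
		}
	}
	return balance
}

func main() {
	history := []ledgerEvent{
		{"deposited", 1000},
		{"withdrawn", 300},
		{"transferred", 200},
	}
	fmt.Println(replay(history)) // 500
}
```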
Complete Event Sourcing Implementation
```go
package eventsourcing

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
	"time"

	"github.com/google/uuid"
)

var (
	ErrConcurrencyConflict = errors.New("concurrency conflict: aggregate was modified")
	ErrAggregateNotFound   = errors.New("aggregate not found")
)

// Base event interface
type DomainEvent interface {
	EventType() string
	AggregateID() string
	OccurredAt() time.Time
}

// Event store
type EventStore interface {
	Append(ctx context.Context, aggregateID string, events []DomainEvent, expectedVersion int) error
	Load(ctx context.Context, aggregateID string) ([]StoredEvent, error)
	LoadFrom(ctx context.Context, aggregateID string, fromVersion int) ([]StoredEvent, error)
}

type StoredEvent struct {
	ID          string
	AggregateID string
	Type        string
	Data        json.RawMessage
	Metadata    json.RawMessage
	Version     int
	Timestamp   time.Time
}

// PostgreSQL event store
type PostgresEventStore struct {
	db *sql.DB
}

func (es *PostgresEventStore) Append(ctx context.Context, aggregateID string, events []DomainEvent, expectedVersion int) error {
	tx, err := es.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()
	// Optimistic concurrency check
	var currentVersion int
	err = tx.QueryRowContext(ctx, `
		SELECT COALESCE(MAX(version), 0)
		FROM events
		WHERE aggregate_id = $1
	`, aggregateID).Scan(&currentVersion)
	if err != nil {
		return err
	}
	if currentVersion != expectedVersion {
		return ErrConcurrencyConflict
	}
	// Append events
	for i, event := range events {
		data, err := json.Marshal(event)
		if err != nil {
			return err
		}
		version := expectedVersion + i + 1
		_, err = tx.ExecContext(ctx, `
			INSERT INTO events (id, aggregate_id, type, data, version, timestamp)
			VALUES ($1, $2, $3, $4, $5, $6)
		`, uuid.New().String(), aggregateID, event.EventType(), data, version, event.OccurredAt())
		if err != nil {
			return err
		}
	}
	return tx.Commit()
}

func (es *PostgresEventStore) Load(ctx context.Context, aggregateID string) ([]StoredEvent, error) {
	return es.LoadFrom(ctx, aggregateID, 0)
}

func (es *PostgresEventStore) LoadFrom(ctx context.Context, aggregateID string, fromVersion int) ([]StoredEvent, error) {
	rows, err := es.db.QueryContext(ctx, `
		SELECT id, aggregate_id, type, data, version, timestamp
		FROM events
		WHERE aggregate_id = $1 AND version > $2
		ORDER BY version
	`, aggregateID, fromVersion)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var events []StoredEvent
	for rows.Next() {
		var e StoredEvent
		err := rows.Scan(&e.ID, &e.AggregateID, &e.Type, &e.Data, &e.Version, &e.Timestamp)
		if err != nil {
			return nil, err
		}
		events = append(events, e)
	}
	return events, rows.Err()
}

// Aggregate root base
type AggregateRoot struct {
	id          string
	version     int
	uncommitted []DomainEvent
}

func (a *AggregateRoot) ID() string   { return a.id }
func (a *AggregateRoot) Version() int { return a.version }

func (a *AggregateRoot) TrackChange(event DomainEvent) {
	a.uncommitted = append(a.uncommitted, event)
	a.version++
}

func (a *AggregateRoot) GetUncommittedEvents() []DomainEvent {
	return a.uncommitted
}

func (a *AggregateRoot) ClearUncommittedEvents() {
	a.uncommitted = nil
}

// Bank Account aggregate
type BankAccount struct {
	AggregateRoot
	balance float64
	status  string
	holder  string
}

// Events
type AccountOpenedEvent struct {
	AccountID string    `json:"account_id"`
	Holder    string    `json:"holder"`
	Timestamp time.Time `json:"timestamp"`
}

func (e AccountOpenedEvent) EventType() string    { return "account.opened" }
func (e AccountOpenedEvent) AggregateID() string  { return e.AccountID }
func (e AccountOpenedEvent) OccurredAt() time.Time { return e.Timestamp }

type MoneyDepositedEvent struct {
	AccountID     string    `json:"account_id"`
	Amount        float64   `json:"amount"`
	TransactionID string    `json:"transaction_id"`
	Timestamp     time.Time `json:"timestamp"`
}

func (e MoneyDepositedEvent) EventType() string    { return "money.deposited" }
func (e MoneyDepositedEvent) AggregateID() string  { return e.AccountID }
func (e MoneyDepositedEvent) OccurredAt() time.Time { return e.Timestamp }

type MoneyWithdrawnEvent struct {
	AccountID     string    `json:"account_id"`
	Amount        float64   `json:"amount"`
	TransactionID string    `json:"transaction_id"`
	Timestamp     time.Time `json:"timestamp"`
}

func (e MoneyWithdrawnEvent) EventType() string    { return "money.withdrawn" }
func (e MoneyWithdrawnEvent) AggregateID() string  { return e.AccountID }
func (e MoneyWithdrawnEvent) OccurredAt() time.Time { return e.Timestamp }

// Commands
func OpenAccount(id, holder string) *BankAccount {
	account := &BankAccount{}
	account.id = id
	event := AccountOpenedEvent{
		AccountID: id,
		Holder:    holder,
		Timestamp: time.Now(),
	}
	account.Apply(event)
	account.TrackChange(event)
	return account
}

func (a *BankAccount) Deposit(amount float64, txID string) error {
	if amount <= 0 {
		return errors.New("deposit amount must be positive")
	}
	if a.status != "active" {
		return errors.New("account is not active")
	}
	event := MoneyDepositedEvent{
		AccountID:     a.id,
		Amount:        amount,
		TransactionID: txID,
		Timestamp:     time.Now(),
	}
	a.Apply(event)
	a.TrackChange(event)
	return nil
}

func (a *BankAccount) Withdraw(amount float64, txID string) error {
	if amount <= 0 {
		return errors.New("withdrawal amount must be positive")
	}
	if a.status != "active" {
		return errors.New("account is not active")
	}
	if a.balance < amount {
		return errors.New("insufficient funds")
	}
	event := MoneyWithdrawnEvent{
		AccountID:     a.id,
		Amount:        amount,
		TransactionID: txID,
		Timestamp:     time.Now(),
	}
	a.Apply(event)
	a.TrackChange(event)
	return nil
}

// Event application (rebuilds state)
func (a *BankAccount) Apply(event DomainEvent) {
	switch e := event.(type) {
	case AccountOpenedEvent:
		a.id = e.AccountID
		a.holder = e.Holder
		a.status = "active"
		a.balance = 0
	case MoneyDepositedEvent:
		a.balance += e.Amount
	case MoneyWithdrawnEvent:
		a.balance -= e.Amount
	}
}

// Repository
type BankAccountRepository struct {
	eventStore EventStore
	eventTypes map[string]reflect.Type
}

func NewBankAccountRepository(es EventStore) *BankAccountRepository {
	return &BankAccountRepository{
		eventStore: es,
		eventTypes: map[string]reflect.Type{
			"account.opened":  reflect.TypeOf(AccountOpenedEvent{}),
			"money.deposited": reflect.TypeOf(MoneyDepositedEvent{}),
			"money.withdrawn": reflect.TypeOf(MoneyWithdrawnEvent{}),
		},
	}
}

func (r *BankAccountRepository) Load(ctx context.Context, id string) (*BankAccount, error) {
	events, err := r.eventStore.Load(ctx, id)
	if err != nil {
		return nil, err
	}
	if len(events) == 0 {
		return nil, ErrAggregateNotFound
	}
	account := &BankAccount{}
	for _, stored := range events {
		eventType, ok := r.eventTypes[stored.Type]
		if !ok {
			return nil, fmt.Errorf("unknown event type: %s", stored.Type)
		}
		event := reflect.New(eventType).Interface()
		if err := json.Unmarshal(stored.Data, event); err != nil {
			return nil, err
		}
		account.Apply(reflect.ValueOf(event).Elem().Interface().(DomainEvent))
		account.version = stored.Version
	}
	return account, nil
}

func (r *BankAccountRepository) Save(ctx context.Context, account *BankAccount) error {
	events := account.GetUncommittedEvents()
	if len(events) == 0 {
		return nil
	}
	expectedVersion := account.Version() - len(events)
	err := r.eventStore.Append(ctx, account.ID(), events, expectedVersion)
	if err != nil {
		return err
	}
	account.ClearUncommittedEvents()
	return nil
}

// Snapshots for performance
type Snapshot struct {
	AggregateID string
	Version     int
	State       json.RawMessage
	Timestamp   time.Time
}

type SnapshotStore interface {
	Save(ctx context.Context, snapshot Snapshot) error
	Load(ctx context.Context, aggregateID string) (*Snapshot, error)
}

func (r *BankAccountRepository) LoadWithSnapshot(ctx context.Context, id string, snapshotStore SnapshotStore) (*BankAccount, error) {
	// Try to load snapshot
	snapshot, err := snapshotStore.Load(ctx, id)
	var account *BankAccount
	var fromVersion int
	if err == nil && snapshot != nil {
		// Restore from snapshot. Note: restoring via JSON requires the
		// snapshot state to expose exported fields (or a custom
		// json.Unmarshaler); simplified here.
		account = &BankAccount{}
		if err := json.Unmarshal(snapshot.State, account); err != nil {
			return nil, err
		}
		fromVersion = snapshot.Version
	} else {
		account = &BankAccount{}
		fromVersion = 0
	}
	// Load events after snapshot
	events, err := r.eventStore.LoadFrom(ctx, id, fromVersion)
	if err != nil {
		return nil, err
	}
	// Apply remaining events
	for _, stored := range events {
		eventType, ok := r.eventTypes[stored.Type]
		if !ok {
			continue
		}
		event := reflect.New(eventType).Interface()
		if err := json.Unmarshal(stored.Data, event); err != nil {
			return nil, err
		}
		account.Apply(reflect.ValueOf(event).Elem().Interface().(DomainEvent))
		account.version = stored.Version
	}
	return account, nil
}
```
When to Use Event Sourcing
| Use Event Sourcing | Don't Use Event Sourcing |
|---|---|
| Audit requirements (finance, healthcare) | Simple CRUD applications |
| Complex business logic with state changes | High-frequency updates (counters) |
| Need to replay/debug historical states | Large binary data |
| Event-driven microservices | Simple reporting needs |
| Temporal queries ("What was balance on date X?") | Team unfamiliar with pattern |
3. Handling Duplicate Events (Idempotency)
The Duplicate Event Problem
```mermaid
sequenceDiagram
    participant Producer
    participant Kafka
    participant Consumer
    Producer->>Kafka: Send Event A
    Kafka-->>Producer: Timeout (network issue)
    Producer->>Kafka: Retry Event A
    Kafka->>Consumer: Event A (first delivery)
    Kafka->>Consumer: Event A (duplicate!)
    Note over Consumer: Must handle<br/>both deliveries<br/>correctly!
```
Comprehensive Idempotency Implementation
```go
package idempotency

import (
	"context"
	"crypto/sha256"
	"database/sql"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/redis/go-redis/v9"
	"github.com/segmentio/kafka-go"
)

var (
	ErrDuplicateEvent = errors.New("duplicate event")
	ErrEventExpired   = errors.New("event already processed and expired")
)

// Event mirrors the envelope published by the events package.
type Event struct {
	ID   string          `json:"id"`
	Type string          `json:"type"`
	Data json.RawMessage `json:"data"`
}

// Idempotency key generator
type IdempotencyKey struct {
	EventID    string
	EventType  string
	ConsumerID string
}

func (k IdempotencyKey) String() string {
	data := fmt.Sprintf("%s:%s:%s", k.EventType, k.EventID, k.ConsumerID)
	hash := sha256.Sum256([]byte(data))
	return hex.EncodeToString(hash[:])
}

// Redis-based idempotency store
type IdempotencyStore struct {
	client *redis.Client
	ttl    time.Duration
}

func NewIdempotencyStore(client *redis.Client, ttl time.Duration) *IdempotencyStore {
	return &IdempotencyStore{
		client: client,
		ttl:    ttl,
	}
}

type ProcessingResult struct {
	Success     bool
	Error       string
	ProcessedAt time.Time
}

// Check and mark as processing (atomic)
func (s *IdempotencyStore) TryProcess(ctx context.Context, key IdempotencyKey) (bool, error) {
	// Use SET NX (set if not exists)
	result, err := s.client.SetNX(ctx, key.String(), "processing", s.ttl).Result()
	if err != nil {
		return false, err
	}
	return result, nil // true = can process, false = already processed/processing
}

// Mark as completed
func (s *IdempotencyStore) MarkComplete(ctx context.Context, key IdempotencyKey, result ProcessingResult) error {
	data, err := json.Marshal(result)
	if err != nil {
		return err
	}
	return s.client.Set(ctx, key.String(), data, s.ttl).Err()
}

// Get previous result (for returning to caller)
func (s *IdempotencyStore) GetResult(ctx context.Context, key IdempotencyKey) (*ProcessingResult, error) {
	data, err := s.client.Get(ctx, key.String()).Bytes()
	if err != nil {
		if err == redis.Nil {
			return nil, nil
		}
		return nil, err
	}
	var result ProcessingResult
	if err := json.Unmarshal(data, &result); err != nil {
		return nil, err
	}
	return &result, nil
}

// Idempotent event processor
type IdempotentProcessor struct {
	store      *IdempotencyStore
	consumerID string
	handler    EventHandler
	metrics    *ProcessorMetrics
}

type EventHandler func(ctx context.Context, event Event) error

type ProcessorMetrics struct {
	Processed  int64
	Duplicates int64
	Errors     int64
	mu         sync.Mutex
}

func NewIdempotentProcessor(store *IdempotencyStore, consumerID string, handler EventHandler) *IdempotentProcessor {
	return &IdempotentProcessor{
		store:      store,
		consumerID: consumerID,
		handler:    handler,
		metrics:    &ProcessorMetrics{},
	}
}

func (p *IdempotentProcessor) Process(ctx context.Context, event Event) error {
	key := IdempotencyKey{
		EventID:    event.ID,
		EventType:  event.Type,
		ConsumerID: p.consumerID,
	}
	// Try to acquire processing lock
	canProcess, err := p.store.TryProcess(ctx, key)
	if err != nil {
		return fmt.Errorf("idempotency check failed: %w", err)
	}
	if !canProcess {
		// Already processed or being processed
		p.metrics.mu.Lock()
		p.metrics.Duplicates++
		p.metrics.mu.Unlock()
		// Check if we have a result
		result, _ := p.store.GetResult(ctx, key)
		if result != nil {
			if result.Success {
				return nil // Already succeeded
			}
			return errors.New(result.Error) // Return original error
		}
		return ErrDuplicateEvent
	}
	// Process the event
	handlerErr := p.handler(ctx, event)
	// Record result
	result := ProcessingResult{
		Success:     handlerErr == nil,
		ProcessedAt: time.Now(),
	}
	if handlerErr != nil {
		result.Error = handlerErr.Error()
	}
	if err := p.store.MarkComplete(ctx, key, result); err != nil {
		// Log but don't fail - event was processed
		log.Printf("Failed to mark event complete: %v", err)
	}
	if handlerErr != nil {
		p.metrics.mu.Lock()
		p.metrics.Errors++
		p.metrics.mu.Unlock()
		return handlerErr
	}
	p.metrics.mu.Lock()
	p.metrics.Processed++
	p.metrics.mu.Unlock()
	return nil
}

// Database-level idempotency (when Redis isn't enough)
type DatabaseIdempotency struct {
	db *sql.DB
}

func (d *DatabaseIdempotency) ProcessWithIdempotency(ctx context.Context, eventID string, handler func(tx *sql.Tx) error) error {
	tx, err := d.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()
	// Try to insert idempotency record; ON CONFLICT DO NOTHING means
	// zero rows affected when the event was already recorded.
	res, err := tx.ExecContext(ctx, `
		INSERT INTO processed_events (event_id, processed_at)
		VALUES ($1, NOW())
		ON CONFLICT (event_id) DO NOTHING
	`, eventID)
	if err != nil {
		return err
	}
	rows, err := res.RowsAffected()
	if err != nil {
		return err
	}
	if rows == 0 {
		// Already processed by another instance
		return nil
	}
	// Process the event in the same transaction as the marker
	if err := handler(tx); err != nil {
		return err
	}
	return tx.Commit()
}

// Exactly-once processing with Kafka transactions
type ExactlyOnceProcessor struct {
	consumer *kafka.Reader
	producer *kafka.Writer
	handler  func(msg kafka.Message) ([]kafka.Message, error)
}

func (p *ExactlyOnceProcessor) Process(ctx context.Context) error {
	for {
		msg, err := p.consumer.FetchMessage(ctx)
		if err != nil {
			return err
		}
		// Process and get output messages
		outputs, err := p.handler(msg)
		if err != nil {
			// Don't commit - will retry
			continue
		}
		// Transactional produce + commit
		// (Requires Kafka transactions setup)
		err = p.producer.WriteMessages(ctx, outputs...)
		if err != nil {
			continue
		}
		// Commit consumer offset
		p.consumer.CommitMessages(ctx, msg)
	}
}
```
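Redis `SET NX` is doing the heavy lifting in the store above: the first caller to claim a key wins, everyone else sees a duplicate. The semantics are easy to see with an in-process equivalent — a sketch using `sync.Map` (single-process only; it does not survive restarts or span instances, which is exactly why the real implementation uses Redis):

```go
package main

import (
	"fmt"
	"sync"
)

// memStore mimics SET NX: LoadOrStore atomically records the first
// claim on an event ID and tells later callers it already existed.
type memStore struct{ seen sync.Map }

func (s *memStore) TryProcess(eventID string) bool {
	_, loaded := s.seen.LoadOrStore(eventID, true)
	return !loaded // true = first delivery, safe to process
}

func main() {
	var s memStore
	fmt.Println(s.TryProcess("evt-1")) // true: first delivery
	fmt.Println(s.TryProcess("evt-1")) // false: duplicate
}
```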
4. Event Schema Evolution
The Evolution Problem
```mermaid
timeline
    title Event Schema Evolution
    V1 : OrderCreated {orderId, customerId, amount}
    V2 : OrderCreated + {currency, items[]}
    V3 : OrderCreated + {shippingAddress, discountCode}
    V4 : OrderCreated - amount + {subtotal, tax, total}
```
Backward and Forward Compatible Evolution
```go
package schema

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/segmentio/kafka-go"
)

// OrderItem, Address, and validateAgainstSchema are assumed to be
// defined elsewhere in the package.

// Schema registry interface
type SchemaRegistry interface {
	Register(subject string, schema string) (int, error)
	GetSchema(subject string, version int) (string, error)
	CheckCompatibility(subject string, schema string) (bool, error)
}

// Event with version support
type VersionedEvent struct {
	SchemaVersion int             `json:"schema_version"`
	Type          string          `json:"type"`
	Data          json.RawMessage `json:"data"`
}

// V1 Order Event
type OrderCreatedV1 struct {
	OrderID    string  `json:"order_id"`
	CustomerID string  `json:"customer_id"`
	Amount     float64 `json:"amount"`
}

// V2 Order Event (backward compatible - new fields have defaults)
type OrderCreatedV2 struct {
	OrderID    string      `json:"order_id"`
	CustomerID string      `json:"customer_id"`
	Amount     float64     `json:"amount"`
	Currency   string      `json:"currency"` // New field with default
	Items      []OrderItem `json:"items"`    // New field
}

// V3 Order Event (with migration from V2)
type OrderCreatedV3 struct {
	OrderID         string      `json:"order_id"`
	CustomerID      string      `json:"customer_id"`
	Subtotal        float64     `json:"subtotal"` // Renamed from amount
	Tax             float64     `json:"tax"`      // New
	Total           float64     `json:"total"`    // New
	Currency        string      `json:"currency"`
	Items           []OrderItem `json:"items"`
	ShippingAddress *Address    `json:"shipping_address"` // New optional
}

// Event upgrader
type EventUpgrader struct {
	upgraders map[string]map[int]func(json.RawMessage) (json.RawMessage, error)
}

func NewEventUpgrader() *EventUpgrader {
	eu := &EventUpgrader{
		upgraders: make(map[string]map[int]func(json.RawMessage) (json.RawMessage, error)),
	}
	// Register upgraders
	eu.RegisterUpgrader("order.created", 1, eu.upgradeOrderV1ToV2)
	eu.RegisterUpgrader("order.created", 2, eu.upgradeOrderV2ToV3)
	return eu
}

func (eu *EventUpgrader) RegisterUpgrader(eventType string, fromVersion int, upgrader func(json.RawMessage) (json.RawMessage, error)) {
	if eu.upgraders[eventType] == nil {
		eu.upgraders[eventType] = make(map[int]func(json.RawMessage) (json.RawMessage, error))
	}
	eu.upgraders[eventType][fromVersion] = upgrader
}

func (eu *EventUpgrader) Upgrade(event VersionedEvent, targetVersion int) (json.RawMessage, error) {
	currentVersion := event.SchemaVersion
	data := event.Data
	for currentVersion < targetVersion {
		upgrader, ok := eu.upgraders[event.Type][currentVersion]
		if !ok {
			return nil, fmt.Errorf("no upgrader for %s v%d", event.Type, currentVersion)
		}
		var err error
		data, err = upgrader(data)
		if err != nil {
			return nil, err
		}
		currentVersion++
	}
	return data, nil
}

func (eu *EventUpgrader) upgradeOrderV1ToV2(data json.RawMessage) (json.RawMessage, error) {
	var v1 OrderCreatedV1
	if err := json.Unmarshal(data, &v1); err != nil {
		return nil, err
	}
	v2 := OrderCreatedV2{
		OrderID:    v1.OrderID,
		CustomerID: v1.CustomerID,
		Amount:     v1.Amount,
		Currency:   "USD", // Default value
		Items:      nil,   // Unknown from V1
	}
	return json.Marshal(v2)
}

func (eu *EventUpgrader) upgradeOrderV2ToV3(data json.RawMessage) (json.RawMessage, error) {
	var v2 OrderCreatedV2
	if err := json.Unmarshal(data, &v2); err != nil {
		return nil, err
	}
	v3 := OrderCreatedV3{
		OrderID:    v2.OrderID,
		CustomerID: v2.CustomerID,
		Subtotal:   v2.Amount,
		Tax:        0, // Can't derive from V2
		Total:      v2.Amount,
		Currency:   v2.Currency,
		Items:      v2.Items,
	}
	return json.Marshal(v3)
}

// Consumer with version handling
type VersionAwareConsumer struct {
	upgrader       *EventUpgrader
	currentVersion int
	handlers       map[string]func(json.RawMessage) error
}

func (c *VersionAwareConsumer) Process(event VersionedEvent) error {
	// Upgrade to current version if needed
	data := event.Data
	if event.SchemaVersion < c.currentVersion {
		var err error
		data, err = c.upgrader.Upgrade(event, c.currentVersion)
		if err != nil {
			return fmt.Errorf("upgrade failed: %w", err)
		}
	}
	handler, ok := c.handlers[event.Type]
	if !ok {
		return fmt.Errorf("no handler for event type: %s", event.Type)
	}
	return handler(data)
}

// Producer with schema validation
type SchemaValidatingProducer struct {
	registry SchemaRegistry
	producer *kafka.Writer
}

func (p *SchemaValidatingProducer) Publish(ctx context.Context, topic string, event interface{}, schemaVersion int) error {
	// Serialize event
	data, err := json.Marshal(event)
	if err != nil {
		return err
	}
	// Validate against schema registry
	eventType := fmt.Sprintf("%T", event)
	schema, err := p.registry.GetSchema(eventType, schemaVersion)
	if err != nil {
		return fmt.Errorf("schema not found: %w", err)
	}
	if err := validateAgainstSchema(data, schema); err != nil {
		return fmt.Errorf("schema validation failed: %w", err)
	}
	// Wrap with version
	versioned := VersionedEvent{
		SchemaVersion: schemaVersion,
		Type:          eventType,
		Data:          data,
	}
	payload, err := json.Marshal(versioned)
	if err != nil {
		return err
	}
	return p.producer.WriteMessages(ctx, kafka.Message{
		Topic: topic,
		Value: payload,
	})
}
```
Schema Evolution Best Practices
| Change Type | Backward Compatible | Forward Compatible | Example |
|---|---|---|---|
| Add optional field | ✅ | ✅ | Add `discount_code` |
| Add required field | ❌ | ✅ | Add `currency` (required) |
| Remove field | ✅ | ❌ | Remove `legacy_id` |
| Rename field | ❌ | ❌ | `amount` → `total` |
| Change field type | ❌ | ❌ | `amount: int` → `amount: float` |
| Add enum value | ✅ | ❌ | Add `REFUNDED` status |
| Remove enum value | ❌ | ✅ | Remove `PENDING` status |
5. Dead Letter Queue (DLQ) Design
Why Dead Letter Queues?
mermaid
graph TB
    subgraph "Without DLQ"
        Q1[Main Queue] -->|Poison Message| C1[Consumer]
        C1 -->|Retry| Q1
        C1 -->|Retry Forever| Q1
        Note1[System stuck!]
    end
    subgraph "With DLQ"
        Q2[Main Queue] -->|Normal| C2[Consumer]
        C2 -->|Success| Done[Done]
        C2 -->|Retry 3x| Retry[Retry]
        Retry -->|Still Fails| DLQ[Dead Letter Queue]
        DLQ --> Alert[Alert + Manual Review]
    end
Complete DLQ Implementation
go
package dlq
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"os"
	"strings"
	"time"

	"github.com/segmentio/kafka-go"
)
type DeadLetterMessage struct {
	OriginalTopic     string          `json:"original_topic"`
	OriginalPartition int             `json:"original_partition"`
	OriginalOffset    int64           `json:"original_offset"`
	OriginalKey       string          `json:"original_key"`
	OriginalValue     json.RawMessage `json:"original_value"`
	OriginalHeaders   []Header        `json:"original_headers"`
	FailureReason     string          `json:"failure_reason"`
	FailureCount      int             `json:"failure_count"`
	FirstFailedAt     time.Time       `json:"first_failed_at"`
	LastFailedAt      time.Time       `json:"last_failed_at"`
	ConsumerGroup     string          `json:"consumer_group"`
	ConsumerInstance  string          `json:"consumer_instance"`
}

type Header struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

type DeadLetterQueue struct {
	producer *kafka.Writer
	topic    string
	metrics  *DLQMetrics
}

type DLQMetrics struct {
	MessagesQueued      int64
	MessagesReprocessed int64
	MessagesDiscarded   int64
}

func NewDeadLetterQueue(brokers []string, topic string) *DeadLetterQueue {
	return &DeadLetterQueue{
		producer: &kafka.Writer{
			Addr:         kafka.TCP(brokers...),
			Topic:        topic,
			Balancer:     &kafka.LeastBytes{},
			RequiredAcks: kafka.RequireAll,
		},
		topic:   topic,
		metrics: &DLQMetrics{},
	}
}
func (dlq *DeadLetterQueue) Send(ctx context.Context, originalMsg kafka.Message, err error, failureCount int, consumerGroup, consumerInstance string) error {
	headers := make([]Header, len(originalMsg.Headers))
	for i, h := range originalMsg.Headers {
		headers[i] = Header{Key: h.Key, Value: string(h.Value)}
	}
	now := time.Now()
	dlqMessage := DeadLetterMessage{
		OriginalTopic:     originalMsg.Topic,
		OriginalPartition: originalMsg.Partition,
		OriginalOffset:    originalMsg.Offset,
		OriginalKey:       string(originalMsg.Key),
		OriginalValue:     originalMsg.Value,
		OriginalHeaders:   headers,
		FailureReason:     err.Error(),
		FailureCount:      failureCount,
		FirstFailedAt:     now, // ideally carried over from the first failure
		LastFailedAt:      now,
		ConsumerGroup:     consumerGroup,
		ConsumerInstance:  consumerInstance,
	}
	payload, marshalErr := json.Marshal(dlqMessage)
	if marshalErr != nil {
		return fmt.Errorf("marshal DLQ message: %w", marshalErr)
	}
	sendErr := dlq.producer.WriteMessages(ctx, kafka.Message{
		Key:   originalMsg.Key,
		Value: payload,
		Headers: []kafka.Header{
			{Key: "dlq_reason", Value: []byte(err.Error())},
			{Key: "original_topic", Value: []byte(originalMsg.Topic)},
			{Key: "failure_count", Value: []byte(fmt.Sprintf("%d", failureCount))},
		},
	})
	if sendErr == nil {
		dlq.metrics.MessagesQueued++
	}
	return sendErr
}
// Resilient consumer with DLQ
type ResilientConsumer struct {
	reader        *kafka.Reader
	dlq           *DeadLetterQueue
	handler       EventHandler
	maxRetries    int
	retryDelay    time.Duration
	consumerGroup string
	instanceID    string
}

type EventHandler func(ctx context.Context, msg kafka.Message) error

func NewResilientConsumer(
	brokers []string,
	topic, consumerGroup string,
	dlq *DeadLetterQueue,
	handler EventHandler,
) *ResilientConsumer {
	return &ResilientConsumer{
		reader: kafka.NewReader(kafka.ReaderConfig{
			Brokers:  brokers,
			Topic:    topic,
			GroupID:  consumerGroup,
			MinBytes: 1,
			MaxBytes: 10e6,
		}),
		dlq:           dlq,
		handler:       handler,
		maxRetries:    3,
		retryDelay:    time.Second,
		consumerGroup: consumerGroup,
		instanceID:    generateInstanceID(),
	}
}

// generateInstanceID builds a best-effort unique ID for this consumer instance.
func generateInstanceID() string {
	host, _ := os.Hostname()
	return fmt.Sprintf("%s-%d", host, os.Getpid())
}
func (c *ResilientConsumer) Start(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return c.reader.Close()
		default:
			msg, err := c.reader.FetchMessage(ctx)
			if err != nil {
				continue
			}
			if err := c.processWithRetry(ctx, msg); err != nil {
				// All retries exhausted: send to the DLQ
				if dlqErr := c.dlq.Send(ctx, msg, err, c.maxRetries, c.consumerGroup, c.instanceID); dlqErr != nil {
					log.Printf("failed to send to DLQ: %v", dlqErr)
					// Don't commit; the message will be redelivered
					continue
				}
			}
			// Commit in both cases: either processed or parked in the DLQ
			if err := c.reader.CommitMessages(ctx, msg); err != nil {
				log.Printf("commit failed: %v", err)
			}
		}
	}
}
func (c *ResilientConsumer) processWithRetry(ctx context.Context, msg kafka.Message) error {
	var lastErr error
	for attempt := 0; attempt < c.maxRetries; attempt++ {
		lastErr = c.handler(ctx, msg)
		if lastErr == nil {
			return nil
		}
		// Only retry errors that can plausibly succeed later
		if !isRetryable(lastErr) {
			return lastErr
		}
		// Exponential backoff: retryDelay, 2x, 4x, ...
		delay := c.retryDelay * time.Duration(1<<uint(attempt))
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
	}
	return lastErr
}

func isRetryable(err error) bool {
	// Network errors and timeouts are retryable;
	// validation and business-logic errors are not.
	var permanent *PermanentError
	return !errors.As(err, &permanent)
}

type PermanentError struct {
	Err error
}

func (e *PermanentError) Error() string {
	return e.Err.Error()
}

func (e *PermanentError) Unwrap() error {
	return e.Err
}
// DLQ reprocessor for manual intervention
type DLQReprocessor struct {
	dlqReader        *kafka.Reader
	originalProducer *kafka.Writer
	metrics          *DLQMetrics
}

func (r *DLQReprocessor) Reprocess(ctx context.Context, filter func(DeadLetterMessage) bool) error {
	for {
		msg, err := r.dlqReader.FetchMessage(ctx)
		if err != nil {
			return err
		}
		var dlqMsg DeadLetterMessage
		if err := json.Unmarshal(msg.Value, &dlqMsg); err != nil {
			continue
		}
		if !filter(dlqMsg) {
			continue
		}
		// Reconstruct the original message
		headers := make([]kafka.Header, len(dlqMsg.OriginalHeaders))
		for i, h := range dlqMsg.OriginalHeaders {
			headers[i] = kafka.Header{Key: h.Key, Value: []byte(h.Value)}
		}
		headers = append(headers, kafka.Header{
			Key:   "reprocessed_from_dlq",
			Value: []byte("true"),
		})
		// Send it back to the original topic
		err = r.originalProducer.WriteMessages(ctx, kafka.Message{
			Topic:   dlqMsg.OriginalTopic,
			Key:     []byte(dlqMsg.OriginalKey),
			Value:   dlqMsg.OriginalValue,
			Headers: headers,
		})
		if err != nil {
			log.Printf("failed to reprocess: %v", err)
			continue
		}
		// Commit the DLQ message
		r.dlqReader.CommitMessages(ctx, msg)
		r.metrics.MessagesReprocessed++
	}
}
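A reprocessing run usually targets a narrow slice of the DLQ rather than replaying everything. As an illustration, a filter closure that selects only timeout failures from a hypothetical `order-events` topic might look like this (the struct mirrors just the fields the filter reads):

```go
package main

import (
	"fmt"
	"strings"
)

// DeadLetterMessage mirrors the fields of the DLQ envelope that this
// example filter inspects; the full type is defined in the dlq package above.
type DeadLetterMessage struct {
	OriginalTopic string
	FailureReason string
}

// onlyOrderTimeouts matches messages from "order-events" whose failure
// mentions a timeout. Both the topic name and the substring are assumptions
// for the sake of the example.
func onlyOrderTimeouts(msg DeadLetterMessage) bool {
	return msg.OriginalTopic == "order-events" &&
		strings.Contains(msg.FailureReason, "timeout")
}

func main() {
	msg := DeadLetterMessage{
		OriginalTopic: "order-events",
		FailureReason: "handler timeout after 5s",
	}
	fmt.Println(onlyOrderTimeouts(msg)) // true
	// In practice this closure is passed straight to Reprocess:
	//   reprocessor.Reprocess(ctx, onlyOrderTimeouts)
}
```

Filtering by failure reason lets you replay messages after a known outage is fixed while leaving genuine poison messages parked for manual review.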
// DLQ dashboard API
type DLQDashboard struct {
	dlqReader *kafka.Reader
}

type DLQStats struct {
	TotalMessages int64
	ByTopic       map[string]int64
	ByError       map[string]int64
	OldestMessage time.Time
	NewestMessage time.Time
}

func (d *DLQDashboard) GetStats(ctx context.Context) (*DLQStats, error) {
	stats := &DLQStats{
		ByTopic: make(map[string]int64),
		ByError: make(map[string]int64),
	}
	// Simplified: in production, use the Kafka admin client to read
	// partition offsets and compute lag
	return stats, nil
}

func (d *DLQDashboard) GetMessages(ctx context.Context, limit int, filters DLQFilters) ([]DeadLetterMessage, error) {
	var messages []DeadLetterMessage
	for i := 0; i < limit; i++ {
		msg, err := d.dlqReader.FetchMessage(ctx)
		if err != nil {
			break
		}
		var dlqMsg DeadLetterMessage
		if err := json.Unmarshal(msg.Value, &dlqMsg); err != nil {
			continue
		}
		if filters.Match(dlqMsg) {
			messages = append(messages, dlqMsg)
		}
	}
	return messages, nil
}

type DLQFilters struct {
	Topic         string
	ErrorContains string
	Since         time.Time
	Until         time.Time
}

func (f DLQFilters) Match(msg DeadLetterMessage) bool {
	if f.Topic != "" && msg.OriginalTopic != f.Topic {
		return false
	}
	if f.ErrorContains != "" && !strings.Contains(msg.FailureReason, f.ErrorContains) {
		return false
	}
	if !f.Since.IsZero() && msg.LastFailedAt.Before(f.Since) {
		return false
	}
	if !f.Until.IsZero() && msg.LastFailedAt.After(f.Until) {
		return false
	}
	return true
}
Summary: Event-Driven Architecture Principles
When to Use Events vs Direct Calls
| Use Events | Use Direct Calls |
|---|---|
| Loose coupling needed | Synchronous response required |
| Multiple consumers | Single consumer |
| Audit trail important | Simple request-response |
| Eventual consistency OK | Strong consistency required |
| High throughput | Low latency critical |
| Failure isolation | Transactional operations |
Event-Driven Architecture Checklist
Design Phase:
☐ Define event schema with versioning
☐ Choose between choreography vs orchestration
☐ Plan for idempotency
☐ Design DLQ strategy
☐ Plan schema evolution
Implementation:
☐ Use outbox pattern for reliability
☐ Implement idempotent consumers
☐ Add correlation IDs for tracing
☐ Set up proper partitioning
☐ Configure retention policies
Operations:
☐ Monitor consumer lag
☐ Alert on DLQ growth
☐ Track processing latency
☐ Dashboard for event flow
☐ Runbook for reprocessing
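The "implement idempotent consumers" item above deserves a sketch, because at-least-once delivery makes duplicates inevitable. The in-memory set below is only for illustration; a production consumer would persist processed event IDs in a database table or Redis keyed by event ID:

```go
package main

import (
	"fmt"
	"sync"
)

// IdempotentHandler wraps a handler with a processed-ID set so
// redelivered events are skipped instead of being applied twice.
type IdempotentHandler struct {
	mu        sync.Mutex
	processed map[string]bool
	handle    func(eventID string) error
}

func (h *IdempotentHandler) Process(eventID string) error {
	h.mu.Lock()
	if h.processed[eventID] {
		h.mu.Unlock()
		return nil // duplicate delivery: already handled
	}
	h.mu.Unlock()

	if err := h.handle(eventID); err != nil {
		return err // not marked processed, so a retry runs the handler again
	}

	h.mu.Lock()
	h.processed[eventID] = true
	h.mu.Unlock()
	return nil
}

func main() {
	calls := 0
	h := &IdempotentHandler{
		processed: map[string]bool{},
		handle:    func(string) error { calls++; return nil },
	}
	h.Process("evt-1")
	h.Process("evt-1") // duplicate: skipped
	fmt.Println(calls) // 1
}
```

Note the ordering: the ID is recorded only after the handler succeeds. Recording it first would turn a crash mid-handler into a lost event; this way a crash merely causes one extra (deduplicated) redelivery.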
Key Metrics
| Metric | Warning | Action |
|---|---|---|
| Consumer Lag | >10K | Scale consumers |
| DLQ Rate | >1% | Investigate failures |
| Processing Time p99 | >1s | Optimize handler |
| Duplicate Rate | >5% | Check idempotency |
| Rebalance Frequency | >1/hour | Check consumer health |
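These thresholds can drive a simple evaluation loop in whatever monitoring glue you run. The sketch below uses the table's example numbers; the metric names are assumptions for illustration, not standard Kafka metric identifiers:

```go
package main

import "fmt"

// Threshold pairs a metric with its warning level and suggested action.
type Threshold struct {
	Metric  string
	Warning float64
	Action  string
}

// checkMetrics returns one alert line per metric that exceeds its threshold.
func checkMetrics(current map[string]float64, thresholds []Threshold) []string {
	var alerts []string
	for _, t := range thresholds {
		if v, ok := current[t.Metric]; ok && v > t.Warning {
			alerts = append(alerts,
				fmt.Sprintf("%s=%.2f exceeds %.2f: %s", t.Metric, v, t.Warning, t.Action))
		}
	}
	return alerts
}

func main() {
	thresholds := []Threshold{
		{"consumer_lag", 10000, "scale consumers"},
		{"dlq_rate", 0.01, "investigate failures"},
	}
	current := map[string]float64{"consumer_lag": 25000, "dlq_rate": 0.004}
	for _, a := range checkMetrics(current, thresholds) {
		fmt.Println(a) // only consumer_lag fires here
	}
}
```

In practice you would wire this to Prometheus alert rules or a Kafka lag exporter rather than hand-rolled polling, but the evaluation logic is the same.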
This guide covers the essential event-driven architecture patterns for system design interviews. Remember: events provide decoupling and scalability, but add complexity. Use them when the benefits outweigh the operational overhead.