Module 20: CQRS Pattern

What is CQRS?

CQRS (Command Query Responsibility Segregation) separates read and write operations into different models.
┌─────────────────────────────────────────────────────────────────┐
│                          CQRS OVERVIEW                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  TRADITIONAL (Single Model):                                    │
│  ┌───────────────────────────────────────────────────────┐      │
│  │                                                       │      │
│  │  Client ──── CRUD ────► Single Model ────► Database   │      │
│  │                              │                        │      │
│  │                   (Same model for R and W)            │      │
│  └───────────────────────────────────────────────────────┘      │
│                                                                 │
│  CQRS (Separated Models):                                       │
│  ┌───────────────────────────────────────────────────────┐      │
│  │                                                       │      │
│  │              ┌─────────────────┐                      │      │
│  │  Commands ──►│   Write Model   │──► Write DB          │      │
│  │              │ (Domain Logic)  │                      │      │
│  │              └────────┬────────┘                      │      │
│  │                       │ Events                        │      │
│  │                       ▼                               │      │
│  │              ┌─────────────────┐                      │      │
│  │  Queries ◄───│   Read Model    │◄── Read DB           │      │
│  │              │   (Optimized)   │                      │      │
│  │              └─────────────────┘                      │      │
│  │                                                       │      │
│  └───────────────────────────────────────────────────────┘      │
│                                                                 │
│  Benefits:                                                      │
│  ┌───────────────────────────────────────────────────────┐      │
│  │  • Optimized read models for queries                  │      │
│  │  • Scale reads and writes independently               │      │
│  │  • Simplified domain model (writes only)              │      │
│  │  • Different storage for different needs              │      │
│  └───────────────────────────────────────────────────────┘      │
│                                                                 │
│  Trade-offs:                                                    │
│  ┌───────────────────────────────────────────────────────┐      │
│  │  • Eventual consistency between models                │      │
│  │  • More infrastructure complexity                     │      │
│  │  • Data synchronization overhead                      │      │
│  └───────────────────────────────────────────────────────┘      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

CQRS Architecture

┌─────────────────────────────────────────────────────────────────┐
│                        CQRS ARCHITECTURE                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────────────────────────────────────────────┐       │
│  │                      API Layer                       │       │
│  │  ┌─────────────────┐     ┌─────────────────────────┐ │       │
│  │  │ Command Handler │     │      Query Handler      │ │       │
│  │  │ POST/PUT/DELETE │     │           GET           │ │       │
│  │  └────────┬────────┘     └───────────┬─────────────┘ │       │
│  └───────────┼──────────────────────────┼───────────────┘       │
│              │                          │                       │
│  ┌───────────▼───────────┐  ┌───────────▼───────────┐           │
│  │      WRITE SIDE       │  │       READ SIDE       │           │
│  │  ┌─────────────────┐  │  │  ┌─────────────────┐  │           │
│  │  │   Command Bus   │  │  │  │  Query Service  │  │           │
│  │  └────────┬────────┘  │  │  └────────┬────────┘  │           │
│  │           │           │  │           │           │           │
│  │  ┌────────▼────────┐  │  │  ┌────────▼────────┐  │           │
│  │  │  Domain Model   │  │  │  │   Read Model    │  │           │
│  │  │  (Aggregates)   │  │  │  │  (Projections)  │  │           │
│  │  └────────┬────────┘  │  │  └────────┬────────┘  │           │
│  │           │           │  │           │           │           │
│  │  ┌────────▼────────┐  │  │  ┌────────▼────────┐  │           │
│  │  │   Event Store   │  │  │  │  Read Database  │  │           │
│  │  │  (PostgreSQL)   │  │  │  │ (Elasticsearch) │  │           │
│  │  └────────┬────────┘  │  │  └─────────────────┘  │           │
│  └───────────┼───────────┘  └───────────▲───────────┘           │
│              │                          │                       │
│              │          Events          │                       │
│              └──────────────────────────┘                       │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Commands and Queries

go
package cqrs

import (
	"context"
	"time"
)

// Command represents an intent to change state.
type Command interface {
	CommandName() string
}

// Query represents a request for data.
type Query interface {
	QueryName() string
}

// CommandHandler processes commands.
type CommandHandler interface {
	Handle(ctx context.Context, cmd Command) error
}

// QueryHandler processes queries.
type QueryHandler interface {
	Handle(ctx context.Context, query Query) (interface{}, error)
}

// Example commands

type CreateOrderCommand struct {
	OrderID    string
	CustomerID string
	Items      []OrderItem
}

func (c CreateOrderCommand) CommandName() string { return "CreateOrder" }

type AddItemCommand struct {
	OrderID   string
	ProductID string
	Quantity  int
}

func (c AddItemCommand) CommandName() string { return "AddItem" }

type SubmitOrderCommand struct {
	OrderID string
}

func (c SubmitOrderCommand) CommandName() string { return "SubmitOrder" }

// Example queries

type GetOrderQuery struct {
	OrderID string
}

func (q GetOrderQuery) QueryName() string { return "GetOrder" }

type GetOrdersByCustomerQuery struct {
	CustomerID string
	Status     string
	Limit      int
	Offset     int
}

func (q GetOrdersByCustomerQuery) QueryName() string { return "GetOrdersByCustomer" }

type SearchOrdersQuery struct {
	Query    string
	DateFrom time.Time
	DateTo   time.Time
	Limit    int
}

func (q SearchOrdersQuery) QueryName() string { return "SearchOrders" }

// OrderItem is the line-item payload carried by commands.
type OrderItem struct {
	ProductID string
	Quantity  int
	Price     int64
}

Command Bus

go
package cqrs

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// CommandBus routes commands to handlers.
type CommandBus struct {
	mu         sync.RWMutex
	handlers   map[string]CommandHandler
	middleware []CommandMiddleware
}

// CommandMiddleware wraps a handler with cross-cutting behavior.
type CommandMiddleware func(CommandHandler) CommandHandler

// CommandHandlerFunc adapts a function to CommandHandler.
type CommandHandlerFunc func(ctx context.Context, cmd Command) error

func (f CommandHandlerFunc) Handle(ctx context.Context, cmd Command) error {
	return f(ctx, cmd)
}

func NewCommandBus() *CommandBus {
	return &CommandBus{
		handlers: make(map[string]CommandHandler),
	}
}

// Register associates a handler with the command's name.
func (b *CommandBus) Register(cmd Command, handler CommandHandler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[cmd.CommandName()] = handler
}

// Use appends middleware. The first middleware added becomes the outermost wrapper.
func (b *CommandBus) Use(mw CommandMiddleware) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.middleware = append(b.middleware, mw)
}

// Dispatch sends a command to its handler.
func (b *CommandBus) Dispatch(ctx context.Context, cmd Command) error {
	b.mu.RLock()
	handler, ok := b.handlers[cmd.CommandName()]
	middleware := b.middleware
	b.mu.RUnlock()

	if !ok {
		return fmt.Errorf("no handler registered for command: %s", cmd.CommandName())
	}

	// Wrap in reverse so that middleware executes in registration order.
	for i := len(middleware) - 1; i >= 0; i-- {
		handler = middleware[i](handler)
	}
	return handler.Handle(ctx, cmd)
}

// LoggingMiddleware logs each command with its outcome and duration.
func LoggingMiddleware() CommandMiddleware {
	return func(next CommandHandler) CommandHandler {
		return CommandHandlerFunc(func(ctx context.Context, cmd Command) error {
			start := time.Now()
			fmt.Printf("Executing command: %s\n", cmd.CommandName())

			err := next.Handle(ctx, cmd)
			if err != nil {
				fmt.Printf("Command %s failed: %v (took %v)\n", cmd.CommandName(), err, time.Since(start))
			} else {
				fmt.Printf("Command %s succeeded (took %v)\n", cmd.CommandName(), time.Since(start))
			}
			return err
		})
	}
}

// Validator checks a command before it reaches its handler.
type Validator interface {
	Validate(cmd Command) error
}

// ValidationMiddleware rejects commands that fail validation.
func ValidationMiddleware(validator Validator) CommandMiddleware {
	return func(next CommandHandler) CommandHandler {
		return CommandHandlerFunc(func(ctx context.Context, cmd Command) error {
			if err := validator.Validate(cmd); err != nil {
				return fmt.Errorf("validation failed: %w", err)
			}
			return next.Handle(ctx, cmd)
		})
	}
}

// Database and Transaction abstract the transactional store.
type Database interface {
	Begin(ctx context.Context) (Transaction, error)
}

type Transaction interface {
	Commit() error
	Rollback() error
}

// txKey is the context key under which the active transaction is stored.
// An unexported struct type avoids collisions with keys from other packages.
type txKey struct{}

// TransactionMiddleware runs the handler inside a transaction and commits
// only if the handler succeeds.
func TransactionMiddleware(db Database) CommandMiddleware {
	return func(next CommandHandler) CommandHandler {
		return CommandHandlerFunc(func(ctx context.Context, cmd Command) error {
			tx, err := db.Begin(ctx)
			if err != nil {
				return err
			}

			ctx = context.WithValue(ctx, txKey{}, tx)
			if err := next.Handle(ctx, cmd); err != nil {
				// The handler error takes precedence over any rollback error.
				_ = tx.Rollback()
				return err
			}
			return tx.Commit()
		})
	}
}
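
Dispatch wraps the handler by walking the middleware slice backwards, which means the first middleware passed to Use ends up outermost and runs first. The sketch below isolates just that ordering rule. Handler, Middleware, tag, and chain are simplified stand-ins invented for this illustration, not part of the command bus itself.

```go
package main

import "fmt"

// Handler and Middleware are simplified stand-ins for CommandHandler and
// CommandMiddleware; they exist only for this illustration.
type Handler func(cmd string) error
type Middleware func(Handler) Handler

// trace records the order in which wrappers are entered and left.
var trace []string

// tag is a hypothetical middleware that records its name around the call.
func tag(name string) Middleware {
	return func(next Handler) Handler {
		return func(cmd string) error {
			trace = append(trace, name+":before")
			err := next(cmd)
			trace = append(trace, name+":after")
			return err
		}
	}
}

// chain wraps h in reverse order, mirroring the loop in Dispatch.
func chain(h Handler, mws ...Middleware) Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

func main() {
	h := chain(func(cmd string) error {
		trace = append(trace, "handle:"+cmd)
		return nil
	}, tag("logging"), tag("validation"))

	_ = h("CreateOrder")
	fmt.Println(trace)
	// [logging:before validation:before handle:CreateOrder validation:after logging:after]
}
```

In practice this means logging should usually be registered before validation or transactions, so that rejected commands and rolled-back transactions still show up in the log.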

Write Side Implementation

go
package cqrs

import (
	"context"
	"fmt"
	"time"
)

// Order is the aggregate root of the write model.
type Order struct {
	ID         string
	CustomerID string
	Items      []OrderItem
	Status     OrderStatus
	CreatedAt  time.Time
	UpdatedAt  time.Time

	events []DomainEvent // recorded but not yet published
}

type OrderStatus string

const (
	OrderStatusDraft     OrderStatus = "draft"
	OrderStatusSubmitted OrderStatus = "submitted"
	OrderStatusConfirmed OrderStatus = "confirmed"
	OrderStatusShipped   OrderStatus = "shipped"
	OrderStatusDelivered OrderStatus = "delivered"
	OrderStatusCancelled OrderStatus = "cancelled"
)

// DomainEvent describes something that happened to an aggregate.
type DomainEvent interface {
	EventName() string
	OccurredAt() time.Time
}

type OrderCreated struct {
	OrderID    string
	CustomerID string
	Timestamp  time.Time
}

func (e OrderCreated) EventName() string     { return "OrderCreated" }
func (e OrderCreated) OccurredAt() time.Time { return e.Timestamp }

type ItemAdded struct {
	OrderID   string
	ProductID string
	Quantity  int
	Price     int64
	Timestamp time.Time
}

func (e ItemAdded) EventName() string     { return "ItemAdded" }
func (e ItemAdded) OccurredAt() time.Time { return e.Timestamp }

type OrderSubmitted struct {
	OrderID   string
	Total     int64
	Timestamp time.Time
}

func (e OrderSubmitted) EventName() string     { return "OrderSubmitted" }
func (e OrderSubmitted) OccurredAt() time.Time { return e.Timestamp }

// NewOrder creates a draft order and records OrderCreated.
func NewOrder(id, customerID string) *Order {
	now := time.Now()
	order := &Order{
		ID:         id,
		CustomerID: customerID,
		Status:     OrderStatusDraft,
		CreatedAt:  now,
		UpdatedAt:  now,
	}
	order.recordEvent(OrderCreated{OrderID: id, CustomerID: customerID, Timestamp: now})
	return order
}

// AddItem appends a line item; only draft orders accept items.
func (o *Order) AddItem(productID string, quantity int, price int64) error {
	if o.Status != OrderStatusDraft {
		return fmt.Errorf("cannot add items to non-draft order")
	}

	now := time.Now()
	o.Items = append(o.Items, OrderItem{ProductID: productID, Quantity: quantity, Price: price})
	o.UpdatedAt = now
	o.recordEvent(ItemAdded{
		OrderID:   o.ID,
		ProductID: productID,
		Quantity:  quantity,
		Price:     price,
		Timestamp: now,
	})
	return nil
}

// Submit moves a non-empty draft order to the submitted state.
func (o *Order) Submit() error {
	if o.Status != OrderStatusDraft {
		return fmt.Errorf("can only submit draft orders")
	}
	if len(o.Items) == 0 {
		return fmt.Errorf("cannot submit empty order")
	}

	now := time.Now()
	o.Status = OrderStatusSubmitted
	o.UpdatedAt = now
	o.recordEvent(OrderSubmitted{OrderID: o.ID, Total: o.Total(), Timestamp: now})
	return nil
}

// Total sums price times quantity over all items.
func (o *Order) Total() int64 {
	var total int64
	for _, item := range o.Items {
		total += item.Price * int64(item.Quantity)
	}
	return total
}

func (o *Order) recordEvent(event DomainEvent) {
	o.events = append(o.events, event)
}

// PullEvents returns the recorded events and clears the internal buffer.
func (o *Order) PullEvents() []DomainEvent {
	events := o.events
	o.events = nil
	return events
}

// OrderRepository persists aggregates (write side).
type OrderRepository interface {
	Save(ctx context.Context, order *Order) error
	GetByID(ctx context.Context, id string) (*Order, error)
}

// EventPublisher forwards domain events to the read side.
type EventPublisher interface {
	Publish(ctx context.Context, event DomainEvent) error
}

// publishAll publishes events in order and stops at the first failure.
func publishAll(ctx context.Context, p EventPublisher, events []DomainEvent) error {
	for _, event := range events {
		if err := p.Publish(ctx, event); err != nil {
			return fmt.Errorf("publish %s: %w", event.EventName(), err)
		}
	}
	return nil
}

// Command handlers

type CreateOrderHandler struct {
	repo      OrderRepository
	publisher EventPublisher
}

func (h *CreateOrderHandler) Handle(ctx context.Context, cmd Command) error {
	c, ok := cmd.(CreateOrderCommand)
	if !ok {
		return fmt.Errorf("unexpected command type %T", cmd)
	}

	order := NewOrder(c.OrderID, c.CustomerID)
	for _, item := range c.Items {
		if err := order.AddItem(item.ProductID, item.Quantity, item.Price); err != nil {
			return err
		}
	}

	if err := h.repo.Save(ctx, order); err != nil {
		return err
	}

	// Publish events so the read model can be updated.
	return publishAll(ctx, h.publisher, order.PullEvents())
}

type SubmitOrderHandler struct {
	repo      OrderRepository
	publisher EventPublisher
}

func (h *SubmitOrderHandler) Handle(ctx context.Context, cmd Command) error {
	c, ok := cmd.(SubmitOrderCommand)
	if !ok {
		return fmt.Errorf("unexpected command type %T", cmd)
	}

	order, err := h.repo.GetByID(ctx, c.OrderID)
	if err != nil {
		return err
	}
	if err := order.Submit(); err != nil {
		return err
	}
	if err := h.repo.Save(ctx, order); err != nil {
		return err
	}
	return publishAll(ctx, h.publisher, order.PullEvents())
}

Read Side Implementation

go
package cqrs

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"
)

// Read models (optimized for queries)

// OrderReadModel is the summary row used in list views.
type OrderReadModel struct {
	ID           string    `json:"id"`
	CustomerID   string    `json:"customer_id"`
	CustomerName string    `json:"customer_name"` // Denormalized
	Status       string    `json:"status"`
	ItemCount    int       `json:"item_count"`
	Total        int64     `json:"total"`
	CreatedAt    time.Time `json:"created_at"`
	UpdatedAt    time.Time `json:"updated_at"`
}

// OrderDetailReadModel is the full view of a single order.
type OrderDetailReadModel struct {
	ID           string               `json:"id"`
	CustomerID   string               `json:"customer_id"`
	CustomerName string               `json:"customer_name"`
	Status       string               `json:"status"`
	Items        []OrderItemReadModel `json:"items"`
	Total        int64                `json:"total"`
	CreatedAt    time.Time            `json:"created_at"`
	UpdatedAt    time.Time            `json:"updated_at"`
}

type OrderItemReadModel struct {
	ProductID   string `json:"product_id"`
	ProductName string `json:"product_name"` // Denormalized
	Quantity    int    `json:"quantity"`
	Price       int64  `json:"price"`
	Subtotal    int64  `json:"subtotal"`
}

// OrderReadStore is the storage interface of the read side.
type OrderReadStore interface {
	// Write operations (for projection updates)
	Save(ctx context.Context, order *OrderReadModel) error
	SaveDetail(ctx context.Context, order *OrderDetailReadModel) error

	// Query operations
	GetByID(ctx context.Context, id string) (*OrderDetailReadModel, error)
	GetByCustomer(ctx context.Context, customerID string, limit, offset int) ([]OrderReadModel, error)
	Search(ctx context.Context, query string, limit int) ([]OrderReadModel, error)
}

// InMemoryOrderReadStore is a map-backed implementation for demos and tests.
type InMemoryOrderReadStore struct {
	mu      sync.RWMutex
	orders  map[string]*OrderReadModel
	details map[string]*OrderDetailReadModel
}

func NewInMemoryOrderReadStore() *InMemoryOrderReadStore {
	return &InMemoryOrderReadStore{
		orders:  make(map[string]*OrderReadModel),
		details: make(map[string]*OrderDetailReadModel),
	}
}

func (s *InMemoryOrderReadStore) Save(ctx context.Context, order *OrderReadModel) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.orders[order.ID] = order
	return nil
}

func (s *InMemoryOrderReadStore) SaveDetail(ctx context.Context, order *OrderDetailReadModel) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.details[order.ID] = order
	return nil
}

// GetByID returns a copy of the detail model, or nil if the order is unknown.
// Returning a copy keeps callers from mutating stored data outside the lock.
func (s *InMemoryOrderReadStore) GetByID(ctx context.Context, id string) (*OrderDetailReadModel, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	detail, ok := s.details[id]
	if !ok {
		return nil, nil
	}
	cp := *detail
	cp.Items = append([]OrderItemReadModel(nil), detail.Items...)
	return &cp, nil
}

func (s *InMemoryOrderReadStore) GetByCustomer(ctx context.Context, customerID string, limit, offset int) ([]OrderReadModel, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	var result []OrderReadModel
	for _, order := range s.orders {
		if order.CustomerID == customerID {
			result = append(result, *order)
		}
	}

	// Map iteration order is random; sort so that pagination is stable.
	sort.Slice(result, func(i, j int) bool {
		if !result[i].CreatedAt.Equal(result[j].CreatedAt) {
			return result[i].CreatedAt.Before(result[j].CreatedAt)
		}
		return result[i].ID < result[j].ID
	})

	// Apply pagination.
	if offset < 0 {
		offset = 0
	}
	if limit <= 0 || offset >= len(result) {
		return nil, nil
	}
	end := offset + limit
	if end > len(result) {
		end = len(result)
	}
	return result[offset:end], nil
}

// Search does a simple substring match; in production use Elasticsearch.
func (s *InMemoryOrderReadStore) Search(ctx context.Context, query string, limit int) ([]OrderReadModel, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	var result []OrderReadModel
	for _, order := range s.orders {
		if len(result) >= limit {
			break
		}
		if strings.Contains(order.ID, query) || strings.Contains(order.CustomerName, query) {
			result = append(result, *order)
		}
	}
	return result, nil
}

// QueryBus routes queries to handlers.
type QueryBus struct {
	handlers map[string]QueryHandler
	mu       sync.RWMutex
}

func NewQueryBus() *QueryBus {
	return &QueryBus{
		handlers: make(map[string]QueryHandler),
	}
}

func (b *QueryBus) Register(query Query, handler QueryHandler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[query.QueryName()] = handler
}

func (b *QueryBus) Execute(ctx context.Context, query Query) (interface{}, error) {
	b.mu.RLock()
	handler, ok := b.handlers[query.QueryName()]
	b.mu.RUnlock()

	if !ok {
		return nil, fmt.Errorf("no handler for query: %s", query.QueryName())
	}
	return handler.Handle(ctx, query)
}

// Query handlers

type GetOrderHandler struct {
	store OrderReadStore
}

func (h *GetOrderHandler) Handle(ctx context.Context, q Query) (interface{}, error) {
	query, ok := q.(GetOrderQuery)
	if !ok {
		return nil, fmt.Errorf("unexpected query type %T", q)
	}

	detail, err := h.store.GetByID(ctx, query.OrderID)
	if err != nil {
		return nil, err
	}
	if detail == nil {
		// Return an untyped nil so callers can compare the result with nil.
		return nil, nil
	}
	return detail, nil
}

type GetOrdersByCustomerHandler struct {
	store OrderReadStore
}

func (h *GetOrdersByCustomerHandler) Handle(ctx context.Context, q Query) (interface{}, error) {
	query, ok := q.(GetOrdersByCustomerQuery)
	if !ok {
		return nil, fmt.Errorf("unexpected query type %T", q)
	}
	return h.store.GetByCustomer(ctx, query.CustomerID, query.Limit, query.Offset)
}

Event Handlers (Projections)

go
package cqrs

import (
	"context"
	"fmt"
	"log"
	"sync"
)

// EventHandler processes domain events.
type EventHandler interface {
	Handle(ctx context.Context, event DomainEvent) error
}

// OrderProjection updates read models from events.
type OrderProjection struct {
	store         OrderReadStore
	customerStore CustomerStore // For denormalization
	productStore  ProductStore  // For denormalization
}

type CustomerStore interface {
	GetByID(ctx context.Context, id string) (*Customer, error)
}

type ProductStore interface {
	GetByID(ctx context.Context, id string) (*Product, error)
}

type Customer struct {
	ID   string
	Name string
}

type Product struct {
	ID    string
	Name  string
	Price int64
}

func NewOrderProjection(store OrderReadStore, customers CustomerStore, products ProductStore) *OrderProjection {
	return &OrderProjection{
		store:         store,
		customerStore: customers,
		productStore:  products,
	}
}

func (p *OrderProjection) Handle(ctx context.Context, event DomainEvent) error {
	switch e := event.(type) {
	case OrderCreated:
		return p.handleOrderCreated(ctx, e)
	case ItemAdded:
		return p.handleItemAdded(ctx, e)
	case OrderSubmitted:
		return p.handleOrderSubmitted(ctx, e)
	default:
		log.Printf("Unknown event type: %T", event)
		return nil
	}
}

// load fetches the detail model an event refers to.
func (p *OrderProjection) load(ctx context.Context, orderID string) (*OrderDetailReadModel, error) {
	detail, err := p.store.GetByID(ctx, orderID)
	if err != nil {
		return nil, err
	}
	if detail == nil {
		return nil, fmt.Errorf("read model for order %s not found", orderID)
	}
	return detail, nil
}

// save writes the detail model together with the summary derived from it.
func (p *OrderProjection) save(ctx context.Context, detail *OrderDetailReadModel) error {
	summary := &OrderReadModel{
		ID:           detail.ID,
		CustomerID:   detail.CustomerID,
		CustomerName: detail.CustomerName,
		Status:       detail.Status,
		ItemCount:    len(detail.Items),
		Total:        detail.Total,
		CreatedAt:    detail.CreatedAt,
		UpdatedAt:    detail.UpdatedAt,
	}
	if err := p.store.Save(ctx, summary); err != nil {
		return err
	}
	return p.store.SaveDetail(ctx, detail)
}

func (p *OrderProjection) handleOrderCreated(ctx context.Context, e OrderCreated) error {
	// Look up the customer name for denormalization. The lookup is
	// best-effort: a failure is logged and the order is still projected.
	customerName := ""
	customer, err := p.customerStore.GetByID(ctx, e.CustomerID)
	if err != nil {
		log.Printf("customer lookup failed for %s: %v", e.CustomerID, err)
	} else if customer != nil {
		customerName = customer.Name
	}

	return p.save(ctx, &OrderDetailReadModel{
		ID:           e.OrderID,
		CustomerID:   e.CustomerID,
		CustomerName: customerName,
		Status:       string(OrderStatusDraft),
		Items:        []OrderItemReadModel{},
		CreatedAt:    e.Timestamp,
		UpdatedAt:    e.Timestamp,
	})
}

func (p *OrderProjection) handleItemAdded(ctx context.Context, e ItemAdded) error {
	detail, err := p.load(ctx, e.OrderID)
	if err != nil {
		return err
	}

	// Look up the product name for denormalization (best-effort).
	productName := ""
	product, err := p.productStore.GetByID(ctx, e.ProductID)
	if err != nil {
		log.Printf("product lookup failed for %s: %v", e.ProductID, err)
	} else if product != nil {
		productName = product.Name
	}

	item := OrderItemReadModel{
		ProductID:   e.ProductID,
		ProductName: productName,
		Quantity:    e.Quantity,
		Price:       e.Price,
		Subtotal:    e.Price * int64(e.Quantity),
	}
	detail.Items = append(detail.Items, item)
	detail.Total += item.Subtotal
	detail.UpdatedAt = e.Timestamp

	return p.save(ctx, detail)
}

func (p *OrderProjection) handleOrderSubmitted(ctx context.Context, e OrderSubmitted) error {
	detail, err := p.load(ctx, e.OrderID)
	if err != nil {
		return err
	}

	detail.Status = string(OrderStatusSubmitted)
	detail.UpdatedAt = e.Timestamp

	return p.save(ctx, detail)
}

// EventDispatcher fans events out to subscribed handlers.
type EventDispatcher struct {
	handlers map[string][]EventHandler
	mu       sync.RWMutex
}

func NewEventDispatcher() *EventDispatcher {
	return &EventDispatcher{
		handlers: make(map[string][]EventHandler),
	}
}

func (d *EventDispatcher) Subscribe(eventName string, handler EventHandler) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.handlers[eventName] = append(d.handlers[eventName], handler)
}

// Dispatch delivers an event to every subscribed handler.
// Handlers run synchronously, so events published in sequence (OrderCreated,
// then ItemAdded) are projected in that sequence. Starting one goroutine per
// event would let ItemAdded overtake OrderCreated and would keep using the
// request context after the request has finished. Handler errors are logged
// rather than returned, so one failing projection does not block the others.
func (d *EventDispatcher) Dispatch(ctx context.Context, event DomainEvent) {
	d.mu.RLock()
	handlers := d.handlers[event.EventName()]
	d.mu.RUnlock()

	for _, h := range handlers {
		if err := h.Handle(ctx, event); err != nil {
			log.Printf("Error handling event %s: %v", event.EventName(), err)
		}
	}
}

Complete CQRS Application

go
package cqrs

import (
	"context"
	"encoding/json"
	"net/http"
)

// Application wires everything together.
type Application struct {
	commandBus *CommandBus
	queryBus   *QueryBus
}

func NewApplication() *Application {
	// Create stores. NewInMemoryOrderRepository, NewInMemoryCustomerStore and
	// NewInMemoryProductStore are not shown in this module; any implementation
	// of OrderRepository, CustomerStore and ProductStore will do.
	orderRepo := NewInMemoryOrderRepository()
	orderReadStore := NewInMemoryOrderReadStore()
	customerStore := NewInMemoryCustomerStore()
	productStore := NewInMemoryProductStore()

	// Create the event dispatcher and subscribe the projection to events.
	dispatcher := NewEventDispatcher()
	projection := NewOrderProjection(orderReadStore, customerStore, productStore)
	dispatcher.Subscribe("OrderCreated", projection)
	dispatcher.Subscribe("ItemAdded", projection)
	dispatcher.Subscribe("OrderSubmitted", projection)

	// Create the event publisher.
	publisher := &DispatcherPublisher{dispatcher: dispatcher}

	// Create the command bus and register command handlers.
	commandBus := NewCommandBus()
	commandBus.Use(LoggingMiddleware())
	commandBus.Register(CreateOrderCommand{}, &CreateOrderHandler{
		repo:      orderRepo,
		publisher: publisher,
	})
	commandBus.Register(SubmitOrderCommand{}, &SubmitOrderHandler{
		repo:      orderRepo,
		publisher: publisher,
	})

	// Create the query bus and register query handlers.
	queryBus := NewQueryBus()
	queryBus.Register(GetOrderQuery{}, &GetOrderHandler{store: orderReadStore})
	queryBus.Register(GetOrdersByCustomerQuery{}, &GetOrdersByCustomerHandler{store: orderReadStore})

	return &Application{
		commandBus: commandBus,
		queryBus:   queryBus,
	}
}

// DispatcherPublisher adapts EventDispatcher to the EventPublisher interface.
type DispatcherPublisher struct {
	dispatcher *EventDispatcher
}

func (p *DispatcherPublisher) Publish(ctx context.Context, event DomainEvent) error {
	p.dispatcher.Dispatch(ctx, event)
	return nil
}

// HTTP handlers

func (app *Application) CreateOrderHTTP(w http.ResponseWriter, r *http.Request) {
	var cmd CreateOrderCommand
	if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	if err := app.commandBus.Dispatch(r.Context(), cmd); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Headers must be set before WriteHeader is called.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusCreated)
	json.NewEncoder(w).Encode(map[string]string{"order_id": cmd.OrderID})
}

func (app *Application) SubmitOrderHTTP(w http.ResponseWriter, r *http.Request) {
	orderID := r.URL.Query().Get("order_id")
	cmd := SubmitOrderCommand{OrderID: orderID}

	if err := app.commandBus.Dispatch(r.Context(), cmd); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
}

func (app *Application) GetOrderHTTP(w http.ResponseWriter, r *http.Request) {
	orderID := r.URL.Query().Get("order_id")
	query := GetOrderQuery{OrderID: orderID}

	result, err := app.queryBus.Execute(r.Context(), query)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// A nil *OrderDetailReadModel stored in an interface is not equal to nil,
	// so check for the typed nil as well.
	if detail, ok := result.(*OrderDetailReadModel); result == nil || (ok && detail == nil) {
		http.Error(w, "Order not found", http.StatusNotFound)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(result)
}

func (app *Application) GetCustomerOrdersHTTP(w http.ResponseWriter, r *http.Request) {
	customerID := r.URL.Query().Get("customer_id")
	query := GetOrdersByCustomerQuery{
		CustomerID: customerID,
		Limit:      20,
		Offset:     0,
	}

	result, err := app.queryBus.Execute(r.Context(), query)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(result)
}

CQRS with Event Sourcing

┌─────────────────────────────────────────────────────────────────┐
│                      CQRS + EVENT SOURCING                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  The most powerful combination:                                 │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                                                         │    │
│  │  Command ──► Aggregate ──► Events ──► Event Store       │    │
│  │                              │                          │    │
│  │                              ▼                          │    │
│  │                       Event Publisher                   │    │
│  │                              │                          │    │
│  │              ┌───────────────┼───────────────┐          │    │
│  │              ▼               ▼               ▼          │    │
│  │        Projection 1    Projection 2    Projection 3     │    │
│  │         (List View)      (Search)       (Analytics)     │    │
│  │              │               │               │          │    │
│  │              ▼               ▼               ▼          │    │
│  │          Postgres      Elasticsearch    ClickHouse      │    │
│  │                                                         │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                 │
│  Benefits:                                                      │
│  • Event store is the single source of truth                    │
│  • Rebuild any projection from events                           │
│  • Add new projections without changing writes                  │
│  • Temporal queries from event history                          │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
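
"Rebuild any projection from events" comes down to folding the stored history into an empty read model. Here is a minimal sketch of that replay, assuming the event store can return all events in the order they were written. Event, OrderSummary, and Rebuild are simplified, hypothetical names chosen for this example; a real rebuild would read DomainEvent values from the store in batches.

```go
package main

import "fmt"

// Event is a simplified stand-in for a stored domain event.
type Event struct {
	Name     string // "OrderCreated", "ItemAdded" or "OrderSubmitted"
	OrderID  string
	Quantity int
	Price    int64
}

// OrderSummary is a hypothetical list-view projection.
type OrderSummary struct {
	Status    string
	ItemCount int
	Total     int64
}

// Rebuild replays the full history, in order, into a fresh projection.
func Rebuild(events []Event) map[string]OrderSummary {
	view := make(map[string]OrderSummary)
	for _, e := range events {
		s := view[e.OrderID]
		switch e.Name {
		case "OrderCreated":
			s = OrderSummary{Status: "draft"}
		case "ItemAdded":
			s.ItemCount++
			s.Total += e.Price * int64(e.Quantity)
		case "OrderSubmitted":
			s.Status = "submitted"
		default:
			continue // unknown events leave the projection untouched
		}
		view[e.OrderID] = s
	}
	return view
}

func main() {
	history := []Event{
		{Name: "OrderCreated", OrderID: "o-1"},
		{Name: "ItemAdded", OrderID: "o-1", Quantity: 2, Price: 500},
		{Name: "ItemAdded", OrderID: "o-1", Quantity: 1, Price: 250},
		{Name: "OrderSubmitted", OrderID: "o-1"},
	}
	fmt.Printf("%+v\n", Rebuild(history)["o-1"])
	// {Status:submitted ItemCount:2 Total:1250}
}
```

Because Rebuild starts from an empty map and depends only on the events, adding a new projection amounts to writing another fold over the same history. The write side does not change.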

Best Practices

┌─────────────────────────────────────────────────────────────────┐
│                       CQRS BEST PRACTICES                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. KEEP COMMANDS FOCUSED                                       │
│  ┌───────────────────────────────────────────────────────┐      │
│  │  • One command = one intent                           │      │
│  │  • Commands should be valid or fail                   │      │
│  │  • Return minimal data (ID, success)                  │      │
│  └───────────────────────────────────────────────────────┘      │
│                                                                 │
│  2. OPTIMIZE READ MODELS                                        │
│  ┌───────────────────────────────────────────────────────┐      │
│  │  • Denormalize for query performance                  │      │
│  │  • Create specific models for each use case           │      │
│  │  • Use appropriate storage (SQL, NoSQL, Search)       │      │
│  └───────────────────────────────────────────────────────┘      │
│                                                                 │
│  3. HANDLE EVENTUAL CONSISTENCY                                 │
│  ┌───────────────────────────────────────────────────────┐      │
│  │  • UI should expect stale data                        │      │
│  │  • Show optimistic updates                            │      │
│  │  • Provide refresh mechanisms                         │      │
│  └───────────────────────────────────────────────────────┘      │
│                                                                 │
│  4. USE CQRS WHERE IT ADDS VALUE                                │
│  ┌───────────────────────────────────────────────────────┐      │
│  │  • Different read/write patterns                      │      │
│  │  • Complex domains                                    │      │
│  │  • High scalability requirements                      │      │
│  │  • NOT for simple CRUD                                │      │
│  └───────────────────────────────────────────────────────┘      │
│                                                                 │
│  5. VERSION YOUR EVENTS                                         │
│  ┌───────────────────────────────────────────────────────┐      │
│  │  • Include version in event type                      │      │
│  │  • Use upcasters for old events                       │      │
│  │  • Plan for schema evolution                          │      │
│  └───────────────────────────────────────────────────────┘      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
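
Practice 5 mentions upcasters without showing one. An upcaster converts an event stored in an old schema into the current shape at read time, so projections only deal with one version. The sketch below assumes events are stored as JSON in a versioned envelope. StoredEvent, ItemAddedV2, the Currency field, and the "USD" default are all invented for this illustration and do not come from the code above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StoredEvent is a hypothetical envelope as it might sit in an event store.
type StoredEvent struct {
	Type    string
	Version int
	Payload json.RawMessage
}

// ItemAddedV2 is the assumed current schema; v1 is assumed to lack Currency.
type ItemAddedV2 struct {
	OrderID   string `json:"order_id"`
	ProductID string `json:"product_id"`
	Quantity  int    `json:"quantity"`
	Price     int64  `json:"price"`
	Currency  string `json:"currency"`
}

// upcastItemAdded converts any known version of ItemAdded to the v2 shape.
func upcastItemAdded(e StoredEvent) (ItemAddedV2, error) {
	if e.Type != "ItemAdded" {
		return ItemAddedV2{}, fmt.Errorf("unexpected event type %q", e.Type)
	}

	var out ItemAddedV2
	if err := json.Unmarshal(e.Payload, &out); err != nil {
		return ItemAddedV2{}, fmt.Errorf("decode ItemAdded v%d: %w", e.Version, err)
	}

	switch e.Version {
	case 1:
		out.Currency = "USD" // assumed default for events written before v2
	case 2:
		// Already in the current shape.
	default:
		return ItemAddedV2{}, fmt.Errorf("unsupported ItemAdded version %d", e.Version)
	}
	return out, nil
}

func main() {
	old := StoredEvent{
		Type:    "ItemAdded",
		Version: 1,
		Payload: json.RawMessage(`{"order_id":"o-1","product_id":"p-1","quantity":2,"price":500}`),
	}
	event, err := upcastItemAdded(old)
	fmt.Printf("%+v %v\n", event, err)
}
```

Stored events stay untouched. Only the reading path changes, which is what keeps a projection rebuild possible after a schema change.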

Interview Questions

  1. What is CQRS?
    • Separating read and write models
    • Commands change state, queries return data
    • Different optimization for each
  2. When should you use CQRS?
    • Complex domains with different read/write needs
    • High scalability requirements
    • When you need multiple read models
    • NOT for simple CRUD applications
  3. How do you handle eventual consistency?
    • Optimistic UI updates
    • Polling or push notifications
    • Design UI to expect stale data
  4. What's the relationship between CQRS and Event Sourcing?
    • Complementary but independent
    • Event Sourcing provides events for projections
    • CQRS doesn't require Event Sourcing
  5. How do you test CQRS systems?
    • Unit test aggregates
    • Integration test projections
    • End-to-end test command → query flow

Summary

┌─────────────────────────────────────────────────────────────────┐
│                          CQRS SUMMARY                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Core Concepts:                                                 │
│  • Commands: Intent to change state                             │
│  • Queries: Request for data                                    │
│  • Write Model: Domain logic, consistency                       │
│  • Read Model: Query-optimized, denormalized                    │
│                                                                 │
│  Benefits:                                                      │
│  • Scale reads/writes independently                             │
│  • Optimized query models                                       │
│  • Simpler domain model                                         │
│  • Flexibility in storage                                       │
│                                                                 │
│  Trade-offs:                                                    │
│  • Eventual consistency                                         │
│  • More infrastructure                                          │
│  • Complexity                                                   │
│                                                                 │
│  Key Insight:                                                   │
│  "CQRS acknowledges that reading and writing data have          │
│   different requirements and should be optimized separately."   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
