Part 25: The Bulkhead Pattern - Isolating Failures in Distributed Systems

"A ship doesn't sink because of the water around it; it sinks because of the water that gets in. Bulkheads keep compartments isolated, and your systems need the same protection."
Welcome to Part 25 of our distributed systems course! After exploring retry strategies, we now turn to another crucial resilience pattern: bulkheads. This pattern prevents cascading failures by isolating system components.

The Problem: Resource Exhaustion Cascade

Imagine you have a service handling multiple types of requests:
```go
// Without bulkheads - all requests share resources
type UnprotectedService struct {
	httpClient *http.Client
	db         *sql.DB
}

func (s *UnprotectedService) HandleCriticalRequest(ctx context.Context) error {
	// Uses the shared connection pool
	var data string
	return s.db.QueryRowContext(ctx, "SELECT data FROM critical_data").Scan(&data)
}

func (s *UnprotectedService) HandleBulkExport(ctx context.Context) error {
	// Also uses the shared connection pool - and can exhaust it!
	rows, err := s.db.QueryContext(ctx, "SELECT * FROM huge_table")
	if err != nil {
		return err
	}
	defer rows.Close()
	// Long-running operation...
	return nil
}
```
When bulk exports spike, they exhaust the connection pool, blocking critical requests!

Understanding Bulkheads

The bulkhead pattern divides resources into isolated pools:
```go
package bulkhead

import (
	"context"
	"errors"
	"sync"
	"time"
)

var (
	ErrBulkheadFull    = errors.New("bulkhead capacity reached")
	ErrBulkheadTimeout = errors.New("bulkhead timeout waiting for slot")
)

// Bulkhead limits concurrent access to a resource
type Bulkhead struct {
	name        string
	maxCapacity int
	timeout     time.Duration
	semaphore   chan struct{}

	mu        sync.RWMutex
	active    int
	waiting   int
	rejected  int64
	completed int64
}

// Config holds bulkhead configuration
type Config struct {
	Name        string
	MaxCapacity int           // Maximum concurrent executions
	Timeout     time.Duration // How long to wait for a slot
}

// New creates a new bulkhead
func New(cfg Config) *Bulkhead {
	if cfg.MaxCapacity <= 0 {
		cfg.MaxCapacity = 10
	}
	if cfg.Timeout <= 0 {
		cfg.Timeout = 1 * time.Second
	}
	return &Bulkhead{
		name:        cfg.Name,
		maxCapacity: cfg.MaxCapacity,
		timeout:     cfg.Timeout,
		semaphore:   make(chan struct{}, cfg.MaxCapacity),
	}
}

// Execute runs a function within the bulkhead limits
func (b *Bulkhead) Execute(ctx context.Context, fn func() error) error {
	// Try to acquire a slot
	if err := b.acquire(ctx); err != nil {
		return err
	}
	defer b.release()

	// Execute the function
	return fn()
}

func (b *Bulkhead) acquire(ctx context.Context) error {
	b.mu.Lock()
	b.waiting++
	b.mu.Unlock()
	defer func() {
		b.mu.Lock()
		b.waiting--
		b.mu.Unlock()
	}()

	// Create timeout context
	timeoutCtx, cancel := context.WithTimeout(ctx, b.timeout)
	defer cancel()

	select {
	case b.semaphore <- struct{}{}:
		b.mu.Lock()
		b.active++
		b.mu.Unlock()
		return nil
	case <-timeoutCtx.Done():
		b.mu.Lock()
		b.rejected++
		b.mu.Unlock()
		if ctx.Err() != nil {
			return ctx.Err()
		}
		return ErrBulkheadTimeout
	case <-ctx.Done():
		b.mu.Lock()
		b.rejected++
		b.mu.Unlock()
		return ctx.Err()
	}
}

func (b *Bulkhead) release() {
	<-b.semaphore
	b.mu.Lock()
	b.active--
	b.completed++
	b.mu.Unlock()
}

// Stats holds current bulkhead statistics
type Stats struct {
	Name        string
	MaxCapacity int
	Active      int
	Waiting     int
	Rejected    int64
	Completed   int64
	Utilization float64
}

// Stats returns a snapshot of the bulkhead's counters
func (b *Bulkhead) Stats() Stats {
	b.mu.RLock()
	defer b.mu.RUnlock()

	utilization := 0.0
	if b.maxCapacity > 0 {
		utilization = float64(b.active) / float64(b.maxCapacity)
	}
	return Stats{
		Name:        b.name,
		MaxCapacity: b.maxCapacity,
		Active:      b.active,
		Waiting:     b.waiting,
		Rejected:    b.rejected,
		Completed:   b.completed,
		Utilization: utilization,
	}
}
```

Thread Pool Bulkhead

For CPU-bound work, use thread pool isolation:
```go
// WorkerPoolBulkhead provides thread-pool style isolation
type WorkerPoolBulkhead struct {
	name    string
	workers int
	queue   chan *work

	mu        sync.RWMutex
	running   bool
	completed int64
	rejected  int64
}

type work struct {
	fn     func() error
	result chan error
}

// WorkerPoolConfig configures the worker pool
type WorkerPoolConfig struct {
	Name      string
	Workers   int
	QueueSize int
}

// NewWorkerPool creates a worker pool bulkhead
func NewWorkerPool(cfg WorkerPoolConfig) *WorkerPoolBulkhead {
	if cfg.Workers <= 0 {
		cfg.Workers = 4
	}
	if cfg.QueueSize <= 0 {
		cfg.QueueSize = 100
	}
	b := &WorkerPoolBulkhead{
		name:    cfg.Name,
		workers: cfg.Workers,
		queue:   make(chan *work, cfg.QueueSize),
		running: true,
	}
	// Start workers
	for i := 0; i < cfg.Workers; i++ {
		go b.worker(i)
	}
	return b
}

func (b *WorkerPoolBulkhead) worker(id int) {
	for w := range b.queue {
		w.result <- w.fn()
		b.mu.Lock()
		b.completed++
		b.mu.Unlock()
	}
}

// Submit submits work to the pool
func (b *WorkerPoolBulkhead) Submit(ctx context.Context, fn func() error) error {
	w := &work{
		fn:     fn,
		result: make(chan error, 1),
	}

	// Hold the read lock across the enqueue so Close cannot close the
	// queue between the running check and the send.
	b.mu.RLock()
	if !b.running {
		b.mu.RUnlock()
		return errors.New("bulkhead is closed")
	}
	select {
	case b.queue <- w:
		b.mu.RUnlock()
	default:
		// Queue full
		b.mu.RUnlock()
		b.mu.Lock()
		b.rejected++
		b.mu.Unlock()
		return ErrBulkheadFull
	}

	// Work submitted; wait for the result
	select {
	case err := <-w.result:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Close shuts down the worker pool
func (b *WorkerPoolBulkhead) Close() {
	b.mu.Lock()
	b.running = false
	close(b.queue)
	b.mu.Unlock()
}
```

Service-Level Bulkheads

Apply bulkheads at the service level:
```go
// ServiceBulkheads manages bulkheads for different service operations
type ServiceBulkheads struct {
	bulkheads map[string]*Bulkhead
	mu        sync.RWMutex
}

// NewServiceBulkheads creates service bulkheads
func NewServiceBulkheads() *ServiceBulkheads {
	return &ServiceBulkheads{
		bulkheads: make(map[string]*Bulkhead),
	}
}

// Register registers a bulkhead for an operation
func (sb *ServiceBulkheads) Register(name string, cfg Config) {
	cfg.Name = name
	sb.mu.Lock()
	defer sb.mu.Unlock()
	sb.bulkheads[name] = New(cfg)
}

// Execute executes a function in the named bulkhead
func (sb *ServiceBulkheads) Execute(ctx context.Context, name string, fn func() error) error {
	sb.mu.RLock()
	b, exists := sb.bulkheads[name]
	sb.mu.RUnlock()

	if !exists {
		// No bulkhead registered - execute directly
		return fn()
	}
	return b.Execute(ctx, fn)
}

// AllStats returns stats for all bulkheads
func (sb *ServiceBulkheads) AllStats() map[string]Stats {
	sb.mu.RLock()
	defer sb.mu.RUnlock()

	stats := make(map[string]Stats)
	for name, b := range sb.bulkheads {
		stats[name] = b.Stats()
	}
	return stats
}

// Example: Protected service with bulkheads
type ProtectedService struct {
	bulkheads *ServiceBulkheads
	db        *sql.DB
}

func NewProtectedService(db *sql.DB) *ProtectedService {
	svc := &ProtectedService{
		bulkheads: NewServiceBulkheads(),
		db:        db,
	}

	// Register bulkheads with different capacities
	svc.bulkheads.Register("critical", Config{
		MaxCapacity: 50, // High capacity for critical ops
		Timeout:     100 * time.Millisecond,
	})
	svc.bulkheads.Register("bulk-export", Config{
		MaxCapacity: 5, // Limited capacity for bulk ops
		Timeout:     5 * time.Second,
	})
	svc.bulkheads.Register("analytics", Config{
		MaxCapacity: 10, // Moderate capacity for analytics
		Timeout:     1 * time.Second,
	})
	return svc
}

func (s *ProtectedService) HandleCriticalRequest(ctx context.Context, id string) (string, error) {
	var result string
	err := s.bulkheads.Execute(ctx, "critical", func() error {
		return s.db.QueryRowContext(ctx,
			"SELECT data FROM critical_table WHERE id = $1", id,
		).Scan(&result)
	})
	return result, err
}

func (s *ProtectedService) HandleBulkExport(ctx context.Context) error {
	return s.bulkheads.Execute(ctx, "bulk-export", func() error {
		// Long-running export operation
		rows, err := s.db.QueryContext(ctx, "SELECT * FROM large_table")
		if err != nil {
			return err
		}
		defer rows.Close()
		for rows.Next() {
			// Process each row...
		}
		return rows.Err()
	})
}
```

Connection Pool Bulkheads

Isolate database connection pools:
```go
// ConnectionPoolBulkhead manages isolated connection pools
type ConnectionPoolBulkhead struct {
	pools map[string]*sql.DB
	mu    sync.RWMutex
}

// PoolConfig configures a connection pool
type PoolConfig struct {
	DSN             string
	MaxOpenConns    int
	MaxIdleConns    int
	ConnMaxLifetime time.Duration
}

// NewConnectionPoolBulkhead creates connection pool bulkheads
func NewConnectionPoolBulkhead() *ConnectionPoolBulkhead {
	return &ConnectionPoolBulkhead{
		pools: make(map[string]*sql.DB),
	}
}

// RegisterPool registers a named connection pool
func (cpb *ConnectionPoolBulkhead) RegisterPool(name string, cfg PoolConfig) error {
	db, err := sql.Open("postgres", cfg.DSN)
	if err != nil {
		return fmt.Errorf("failed to open database: %w", err)
	}
	db.SetMaxOpenConns(cfg.MaxOpenConns)
	db.SetMaxIdleConns(cfg.MaxIdleConns)
	db.SetConnMaxLifetime(cfg.ConnMaxLifetime)

	// Verify connection
	if err := db.Ping(); err != nil {
		db.Close()
		return fmt.Errorf("failed to ping database: %w", err)
	}

	cpb.mu.Lock()
	cpb.pools[name] = db
	cpb.mu.Unlock()
	return nil
}

// GetPool returns the named connection pool
func (cpb *ConnectionPoolBulkhead) GetPool(name string) (*sql.DB, error) {
	cpb.mu.RLock()
	defer cpb.mu.RUnlock()
	db, exists := cpb.pools[name]
	if !exists {
		return nil, fmt.Errorf("pool %s not found", name)
	}
	return db, nil
}

// Example usage
func setupDatabasePools() (*ConnectionPoolBulkhead, error) {
	cpb := NewConnectionPoolBulkhead()

	// Critical operations pool - high capacity
	err := cpb.RegisterPool("critical", PoolConfig{
		DSN:             "postgres://localhost/mydb",
		MaxOpenConns:    50,
		MaxIdleConns:    25,
		ConnMaxLifetime: 5 * time.Minute,
	})
	if err != nil {
		return nil, err
	}

	// Bulk operations pool - limited capacity
	err = cpb.RegisterPool("bulk", PoolConfig{
		DSN:             "postgres://localhost/mydb",
		MaxOpenConns:    5,
		MaxIdleConns:    2,
		ConnMaxLifetime: 10 * time.Minute,
	})
	if err != nil {
		return nil, err
	}

	// Analytics pool - separate capacity, pointed at a replica
	err = cpb.RegisterPool("analytics", PoolConfig{
		DSN:             "postgres://localhost/mydb_replica",
		MaxOpenConns:    20,
		MaxIdleConns:    10,
		ConnMaxLifetime: 5 * time.Minute,
	})
	if err != nil {
		return nil, err
	}
	return cpb, nil
}
```

HTTP Client Bulkheads

Isolate HTTP clients for different services:
```go
// HTTPClientBulkhead manages isolated HTTP clients
type HTTPClientBulkhead struct {
	clients   map[string]*http.Client
	bulkheads map[string]*Bulkhead
	mu        sync.RWMutex
}

// HTTPClientConfig configures an HTTP client
type HTTPClientConfig struct {
	Timeout          time.Duration
	MaxIdleConns     int
	MaxConnsPerHost  int
	IdleConnTimeout  time.Duration
	BulkheadCapacity int
	BulkheadTimeout  time.Duration
}

// NewHTTPClientBulkhead creates HTTP client bulkheads
func NewHTTPClientBulkhead() *HTTPClientBulkhead {
	return &HTTPClientBulkhead{
		clients:   make(map[string]*http.Client),
		bulkheads: make(map[string]*Bulkhead),
	}
}

// RegisterClient registers a named HTTP client
func (hcb *HTTPClientBulkhead) RegisterClient(name string, cfg HTTPClientConfig) {
	transport := &http.Transport{
		MaxIdleConns:       cfg.MaxIdleConns,
		MaxConnsPerHost:    cfg.MaxConnsPerHost,
		IdleConnTimeout:    cfg.IdleConnTimeout,
		DisableCompression: false,
		DisableKeepAlives:  false,
	}
	client := &http.Client{
		Transport: transport,
		Timeout:   cfg.Timeout,
	}
	bulkhead := New(Config{
		Name:        name,
		MaxCapacity: cfg.BulkheadCapacity,
		Timeout:     cfg.BulkheadTimeout,
	})

	hcb.mu.Lock()
	hcb.clients[name] = client
	hcb.bulkheads[name] = bulkhead
	hcb.mu.Unlock()
}

// Do executes an HTTP request through the named client
func (hcb *HTTPClientBulkhead) Do(ctx context.Context, name string, req *http.Request) (*http.Response, error) {
	hcb.mu.RLock()
	client, clientExists := hcb.clients[name]
	bulkhead, bulkheadExists := hcb.bulkheads[name]
	hcb.mu.RUnlock()

	if !clientExists {
		return nil, fmt.Errorf("client %s not found", name)
	}

	var resp *http.Response
	execute := func() error {
		req = req.WithContext(ctx)
		var err error
		resp, err = client.Do(req)
		return err
	}

	var err error
	if bulkheadExists {
		err = bulkhead.Execute(ctx, execute)
	} else {
		err = execute()
	}
	return resp, err
}

// Example: Multi-service HTTP clients
func setupHTTPClients() *HTTPClientBulkhead {
	hcb := NewHTTPClientBulkhead()

	// Payment service - critical, limited concurrency
	hcb.RegisterClient("payment-service", HTTPClientConfig{
		Timeout:          5 * time.Second,
		MaxIdleConns:     20,
		MaxConnsPerHost:  20,
		IdleConnTimeout:  90 * time.Second,
		BulkheadCapacity: 20,
		BulkheadTimeout:  500 * time.Millisecond,
	})

	// Notification service - non-critical, higher concurrency
	hcb.RegisterClient("notification-service", HTTPClientConfig{
		Timeout:          10 * time.Second,
		MaxIdleConns:     50,
		MaxConnsPerHost:  50,
		IdleConnTimeout:  90 * time.Second,
		BulkheadCapacity: 50,
		BulkheadTimeout:  2 * time.Second,
	})

	// Analytics service - batch operations
	hcb.RegisterClient("analytics-service", HTTPClientConfig{
		Timeout:          30 * time.Second,
		MaxIdleConns:     10,
		MaxConnsPerHost:  10,
		IdleConnTimeout:  120 * time.Second,
		BulkheadCapacity: 10,
		BulkheadTimeout:  5 * time.Second,
	})
	return hcb
}
```

Adaptive Bulkheads

Dynamically adjust capacity based on performance:
```go
// AdaptiveBulkhead adjusts capacity based on metrics
type AdaptiveBulkhead struct {
	*Bulkhead
	minCapacity int
	maxCapacity int

	// Performance tracking
	latencies *RollingWindow
	errorRate *RollingWindow

	// Thresholds
	targetLatency time.Duration
	maxErrorRate  float64

	// Adjustment
	adjustInterval time.Duration
	lastAdjustment time.Time
	mu             sync.Mutex
}

// RollingWindow tracks values over a sliding window
type RollingWindow struct {
	values []float64
	index  int
	size   int
	count  int
	mu     sync.Mutex
}

func NewRollingWindow(size int) *RollingWindow {
	return &RollingWindow{
		values: make([]float64, size),
		size:   size,
	}
}

func (rw *RollingWindow) Add(value float64) {
	rw.mu.Lock()
	defer rw.mu.Unlock()
	rw.values[rw.index] = value
	rw.index = (rw.index + 1) % rw.size
	if rw.count < rw.size {
		rw.count++
	}
}

func (rw *RollingWindow) Average() float64 {
	rw.mu.Lock()
	defer rw.mu.Unlock()
	if rw.count == 0 {
		return 0
	}
	var sum float64
	for i := 0; i < rw.count; i++ {
		sum += rw.values[i]
	}
	return sum / float64(rw.count)
}

// AdaptiveConfig configures the adaptive bulkhead
type AdaptiveConfig struct {
	Name            string
	MinCapacity     int
	MaxCapacity     int
	InitialCapacity int
	TargetLatency   time.Duration
	MaxErrorRate    float64
	AdjustInterval  time.Duration
}

// NewAdaptiveBulkhead creates an adaptive bulkhead
func NewAdaptiveBulkhead(cfg AdaptiveConfig) *AdaptiveBulkhead {
	if cfg.InitialCapacity <= 0 {
		cfg.InitialCapacity = cfg.MinCapacity
	}
	ab := &AdaptiveBulkhead{
		Bulkhead: New(Config{
			Name:        cfg.Name,
			MaxCapacity: cfg.InitialCapacity,
			Timeout:     1 * time.Second,
		}),
		minCapacity:    cfg.MinCapacity,
		maxCapacity:    cfg.MaxCapacity,
		latencies:      NewRollingWindow(100),
		errorRate:      NewRollingWindow(100),
		targetLatency:  cfg.TargetLatency,
		maxErrorRate:   cfg.MaxErrorRate,
		adjustInterval: cfg.AdjustInterval,
	}
	// Start adjustment goroutine
	go ab.adjustLoop()
	return ab
}

// Execute runs a function and tracks performance
func (ab *AdaptiveBulkhead) Execute(ctx context.Context, fn func() error) error {
	// Snapshot the inner bulkhead under the lock, since adjust() may swap it
	ab.mu.Lock()
	b := ab.Bulkhead
	ab.mu.Unlock()

	start := time.Now()
	err := b.Execute(ctx, fn)
	latency := time.Since(start)

	ab.latencies.Add(float64(latency.Milliseconds()))
	if err != nil {
		ab.errorRate.Add(1.0)
	} else {
		ab.errorRate.Add(0.0)
	}
	return err
}

func (ab *AdaptiveBulkhead) adjustLoop() {
	ticker := time.NewTicker(ab.adjustInterval)
	defer ticker.Stop()
	for range ticker.C {
		ab.adjust()
	}
}

func (ab *AdaptiveBulkhead) adjust() {
	ab.mu.Lock()
	defer ab.mu.Unlock()

	avgLatency := time.Duration(ab.latencies.Average()) * time.Millisecond
	avgErrorRate := ab.errorRate.Average()
	currentCapacity := ab.Bulkhead.maxCapacity // the live capacity, not the upper bound

	// Determine new capacity
	var newCapacity int
	if avgErrorRate > ab.maxErrorRate {
		// High error rate - reduce capacity
		newCapacity = int(float64(currentCapacity) * 0.8)
	} else if avgLatency > ab.targetLatency {
		// High latency - reduce capacity
		newCapacity = int(float64(currentCapacity) * 0.9)
	} else if avgLatency < ab.targetLatency/2 && avgErrorRate < ab.maxErrorRate/2 {
		// Good performance - increase capacity
		newCapacity = int(float64(currentCapacity) * 1.1)
	} else {
		// No change needed
		return
	}

	// Clamp to bounds
	if newCapacity < ab.minCapacity {
		newCapacity = ab.minCapacity
	}
	if newCapacity > ab.maxCapacity {
		newCapacity = ab.maxCapacity
	}

	if newCapacity != currentCapacity {
		// Swap in a new bulkhead with the updated capacity; in-flight
		// calls finish against the old one.
		ab.Bulkhead = New(Config{
			Name:        ab.name,
			MaxCapacity: newCapacity,
			Timeout:     ab.timeout,
		})
	}
}
```

Bulkhead with Priority Queuing

Prioritize important requests:
```go
// PriorityBulkhead handles requests with different priorities
type PriorityBulkhead struct {
	name        string
	maxCapacity int
	highQueue   chan *priorityWork
	normalQueue chan *priorityWork
	lowQueue    chan *priorityWork
	semaphore   chan struct{}

	mu        sync.RWMutex
	active    int
	processed map[Priority]int64
}

type Priority int

const (
	PriorityLow Priority = iota
	PriorityNormal
	PriorityHigh
)

type priorityWork struct {
	priority Priority
	fn       func() error
	result   chan error
}

// PriorityConfig configures the priority bulkhead
type PriorityConfig struct {
	Name            string
	MaxCapacity     int
	HighQueueSize   int
	NormalQueueSize int
	LowQueueSize    int
}

// NewPriorityBulkhead creates a priority bulkhead
func NewPriorityBulkhead(cfg PriorityConfig) *PriorityBulkhead {
	pb := &PriorityBulkhead{
		name:        cfg.Name,
		maxCapacity: cfg.MaxCapacity,
		highQueue:   make(chan *priorityWork, cfg.HighQueueSize),
		normalQueue: make(chan *priorityWork, cfg.NormalQueueSize),
		lowQueue:    make(chan *priorityWork, cfg.LowQueueSize),
		semaphore:   make(chan struct{}, cfg.MaxCapacity),
		processed:   make(map[Priority]int64),
	}
	// Start workers
	for i := 0; i < cfg.MaxCapacity; i++ {
		go pb.worker()
	}
	return pb
}

func (pb *PriorityBulkhead) worker() {
	for {
		var work *priorityWork

		// Priority selection: high > normal > low. Each level is tried
		// non-blocking before falling through; the final select blocks.
		select {
		case work = <-pb.highQueue:
		default:
			select {
			case work = <-pb.highQueue:
			case work = <-pb.normalQueue:
			default:
				select {
				case work = <-pb.highQueue:
				case work = <-pb.normalQueue:
				case work = <-pb.lowQueue:
				}
			}
		}

		// Acquire semaphore
		pb.semaphore <- struct{}{}
		pb.mu.Lock()
		pb.active++
		pb.mu.Unlock()

		// Execute work
		work.result <- work.fn()

		// Release semaphore
		<-pb.semaphore
		pb.mu.Lock()
		pb.active--
		pb.processed[work.priority]++
		pb.mu.Unlock()
	}
}

// Execute submits work with priority
func (pb *PriorityBulkhead) Execute(ctx context.Context, priority Priority, fn func() error) error {
	work := &priorityWork{
		priority: priority,
		fn:       fn,
		result:   make(chan error, 1),
	}

	// Select the queue based on priority
	var queue chan *priorityWork
	switch priority {
	case PriorityHigh:
		queue = pb.highQueue
	case PriorityNormal:
		queue = pb.normalQueue
	case PriorityLow:
		queue = pb.lowQueue
	}

	// Try to submit
	select {
	case queue <- work:
		// Wait for the result
		select {
		case err := <-work.result:
			return err
		case <-ctx.Done():
			return ctx.Err()
		}
	default:
		return ErrBulkheadFull
	}
}
```

Monitoring and Observability

Track bulkhead metrics:
```go
// Uses github.com/prometheus/client_golang/prometheus for the exporter.

// BulkheadMetrics provides comprehensive monitoring
type BulkheadMetrics struct {
	bulkheads map[string]*Bulkhead
	mu        sync.RWMutex
}

// NewBulkheadMetrics creates a metrics collector
func NewBulkheadMetrics() *BulkheadMetrics {
	return &BulkheadMetrics{
		bulkheads: make(map[string]*Bulkhead),
	}
}

// Register registers a bulkhead for monitoring
func (bm *BulkheadMetrics) Register(b *Bulkhead) {
	bm.mu.Lock()
	defer bm.mu.Unlock()
	bm.bulkheads[b.name] = b
}

type BulkheadMetric struct {
	Name           string
	Capacity       int
	ActiveCalls    int
	WaitingCalls   int
	RejectedCalls  int64
	CompletedCalls int64
	Utilization    float64
}

// Collect gathers all metrics
func (bm *BulkheadMetrics) Collect() []BulkheadMetric {
	bm.mu.RLock()
	defer bm.mu.RUnlock()

	metrics := make([]BulkheadMetric, 0, len(bm.bulkheads))
	for name, b := range bm.bulkheads {
		stats := b.Stats()
		metrics = append(metrics, BulkheadMetric{
			Name:           name,
			Capacity:       stats.MaxCapacity,
			ActiveCalls:    stats.Active,
			WaitingCalls:   stats.Waiting,
			RejectedCalls:  stats.Rejected,
			CompletedCalls: stats.Completed,
			Utilization:    stats.Utilization,
		})
	}
	return metrics
}

// PrometheusExporter exports metrics to Prometheus
type PrometheusExporter struct {
	metrics          *BulkheadMetrics
	activeGauge      *prometheus.GaugeVec
	waitingGauge     *prometheus.GaugeVec
	rejectedCounter  *prometheus.CounterVec
	completedCounter *prometheus.CounterVec
	utilizationGauge *prometheus.GaugeVec
}

func NewPrometheusExporter(metrics *BulkheadMetrics) *PrometheusExporter {
	return &PrometheusExporter{
		metrics: metrics,
		activeGauge: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "bulkhead_active_calls",
				Help: "Number of active calls in bulkhead",
			},
			[]string{"bulkhead"},
		),
		waitingGauge: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "bulkhead_waiting_calls",
				Help: "Number of waiting calls in bulkhead",
			},
			[]string{"bulkhead"},
		),
		rejectedCounter: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "bulkhead_rejected_calls_total",
				Help: "Total rejected calls by bulkhead",
			},
			[]string{"bulkhead"},
		),
		completedCounter: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "bulkhead_completed_calls_total",
				Help: "Total completed calls by bulkhead",
			},
			[]string{"bulkhead"},
		),
		utilizationGauge: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "bulkhead_utilization",
				Help: "Bulkhead utilization ratio",
			},
			[]string{"bulkhead"},
		),
	}
}

func (pe *PrometheusExporter) Update() {
	for _, m := range pe.metrics.Collect() {
		pe.activeGauge.WithLabelValues(m.Name).Set(float64(m.ActiveCalls))
		pe.waitingGauge.WithLabelValues(m.Name).Set(float64(m.WaitingCalls))
		pe.utilizationGauge.WithLabelValues(m.Name).Set(m.Utilization)
		// Counters are monotonic and cannot be set directly; exporting the
		// rejected/completed totals would require tracking deltas between
		// snapshots (omitted here for brevity).
	}
}
```

Complete Example: E-commerce Service

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"net/http"
	"time"
)

// EcommerceService demonstrates bulkhead usage
type EcommerceService struct {
	// Database bulkheads
	criticalDB *sql.DB
	bulkDB     *sql.DB

	// HTTP client bulkheads
	httpClients *HTTPClientBulkhead

	// Operation bulkheads
	operationBulkheads *ServiceBulkheads
}

func NewEcommerceService() (*EcommerceService, error) {
	svc := &EcommerceService{
		httpClients:        NewHTTPClientBulkhead(),
		operationBulkheads: NewServiceBulkheads(),
	}

	// Setup HTTP clients for external services
	svc.httpClients.RegisterClient("payment-gateway", HTTPClientConfig{
		Timeout:          5 * time.Second,
		MaxConnsPerHost:  20,
		BulkheadCapacity: 20,
		BulkheadTimeout:  500 * time.Millisecond,
	})
	svc.httpClients.RegisterClient("shipping-service", HTTPClientConfig{
		Timeout:          10 * time.Second,
		MaxConnsPerHost:  30,
		BulkheadCapacity: 30,
		BulkheadTimeout:  1 * time.Second,
	})
	svc.httpClients.RegisterClient("recommendation-engine", HTTPClientConfig{
		Timeout:          2 * time.Second,
		MaxConnsPerHost:  50,
		BulkheadCapacity: 50,
		BulkheadTimeout:  200 * time.Millisecond,
	})

	// Setup operation bulkheads
	svc.operationBulkheads.Register("checkout", Config{
		MaxCapacity: 100,
		Timeout:     500 * time.Millisecond,
	})
	svc.operationBulkheads.Register("inventory-update", Config{
		MaxCapacity: 20,
		Timeout:     1 * time.Second,
	})
	svc.operationBulkheads.Register("report-generation", Config{
		MaxCapacity: 5,
		Timeout:     5 * time.Second,
	})
	return svc, nil
}

// ProcessCheckout handles checkout with bulkhead protection
func (svc *EcommerceService) ProcessCheckout(ctx context.Context, orderID string) error {
	return svc.operationBulkheads.Execute(ctx, "checkout", func() error {
		// Verify inventory
		if err := svc.verifyInventory(ctx, orderID); err != nil {
			return fmt.Errorf("inventory check failed: %w", err)
		}
		// Process payment
		if err := svc.processPayment(ctx, orderID); err != nil {
			return fmt.Errorf("payment failed: %w", err)
		}
		// Schedule shipping
		if err := svc.scheduleShipping(ctx, orderID); err != nil {
			return fmt.Errorf("shipping scheduling failed: %w", err)
		}
		return nil
	})
}

func (svc *EcommerceService) verifyInventory(ctx context.Context, orderID string) error {
	return svc.operationBulkheads.Execute(ctx, "inventory-update", func() error {
		// Check inventory in database
		return nil
	})
}

func (svc *EcommerceService) processPayment(ctx context.Context, orderID string) error {
	req, err := http.NewRequest("POST", "https://payment.example.com/charge", nil)
	if err != nil {
		return err
	}
	resp, err := svc.httpClients.Do(ctx, "payment-gateway", req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("payment failed with status: %d", resp.StatusCode)
	}
	return nil
}

func (svc *EcommerceService) scheduleShipping(ctx context.Context, orderID string) error {
	req, err := http.NewRequest("POST", "https://shipping.example.com/schedule", nil)
	if err != nil {
		return err
	}
	resp, err := svc.httpClients.Do(ctx, "shipping-service", req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	return nil
}

// GenerateReport handles report generation with limited concurrency
func (svc *EcommerceService) GenerateReport(ctx context.Context, reportType string) error {
	return svc.operationBulkheads.Execute(ctx, "report-generation", func() error {
		// Long-running report generation, limited to 5 concurrent
		// reports to prevent resource exhaustion
		log.Printf("Generating report: %s", reportType)
		time.Sleep(30 * time.Second) // Simulate a long operation
		return nil
	})
}

func main() {
	svc, err := NewEcommerceService()
	if err != nil {
		log.Fatal(err)
	}

	// Simulate concurrent checkouts
	for i := 0; i < 200; i++ {
		go func(orderID int) {
			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()
			err := svc.ProcessCheckout(ctx, fmt.Sprintf("order-%d", orderID))
			if err != nil {
				log.Printf("Checkout failed for order-%d: %v", orderID, err)
			} else {
				log.Printf("Checkout succeeded for order-%d", orderID)
			}
		}(i)
	}

	// Wait for completion
	time.Sleep(30 * time.Second)

	// Print stats
	for name, stats := range svc.operationBulkheads.AllStats() {
		fmt.Printf("Bulkhead %s: completed=%d, rejected=%d, utilization=%.2f\n",
			name, stats.Completed, stats.Rejected, stats.Utilization)
	}
}
```

Best Practices

  1. Size bulkheads appropriately - Too small causes unnecessary rejections; too large defeats the purpose
  2. Monitor utilization - Track rejection rates and adjust capacity
  3. Combine with circuit breakers - Bulkheads prevent resource exhaustion; circuit breakers handle downstream failures
  4. Use separate pools for critical paths - Protect essential operations from non-critical ones
  5. Set appropriate timeouts - Don't let requests wait forever for a slot

What's Next?

In Part 26, we'll explore Rate Limiting - controlling the rate of requests to protect systems from overload.

"Bulkheads don't just prevent failures - they contain them. Design your systems so that problems in one area can't sink the entire ship."
Tags: bulkhead-pattern, resilience, isolation, fault-tolerance