Pub/Sub Pattern in Go: Building Event-Driven Systems

You have a user registration event. Multiple systems need to react: send a welcome email, create an analytics entry, notify the sales team, provision cloud resources. How do you connect them without creating a tangled mess of dependencies?
The Pub/Sub (Publish-Subscribe) pattern decouples event producers from event consumers. Publishers don't know who's listening. Subscribers don't know who's publishing. They communicate through a shared topic. This creates flexible, maintainable systems that can evolve independently.

The Real World Parallel: A Radio Station

Think of it like this: A radio station broadcasts music on a frequency. The station doesn't know who's listening. Listeners tune in to the frequency they want. When you buy a new radio, you can immediately listen without the station knowing. When the station plays a new song, all tuned-in radios receive it simultaneously. This is Pub/Sub.
Concurrency pattern diagram 1

Why Pub/Sub?

Consider a monolithic approach vs Pub/Sub:
go
// Monolithic approach - tight coupling
func RegisterUser(user User) error {
	// Save user
	if err := db.Save(user); err != nil {
		return err
	}

	// Direct calls to every dependent service
	emailService.SendWelcome(user) // What if email is down?
	analyticsService.Track("signup", user) // What if analytics is slow?
	salesService.NotifyNewLead(user) // What if we add more services?
	provisionService.CreateResources(user) // This grows forever...

	return nil
}

// Pub/Sub approach - loose coupling
func RegisterUser(user User) error {
	// Save user
	if err := db.Save(user); err != nil {
		return err
	}

	// Publish event - don't care who's listening
	eventBus.Publish("user.created", UserCreatedEvent{User: user})

	return nil
}

// Subscribers handle the rest independently
// Easy to add/remove without changing publisher
Benefits:
  • Loose coupling: Publisher doesn't know subscribers
  • Scalability: Add subscribers without changing publishers
  • Resilience: Subscriber failures don't affect publishers
  • Flexibility: Easy to add, remove, or modify subscribers

Basic Pub/Sub Implementation

go
// Filename: pubsub_basic.go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Event represents a published event
type Event struct {
	Topic     string
	Data      interface{}
	Timestamp time.Time
}

// Subscriber is a function that handles events
type Subscriber func(Event)

// PubSub is a simple publish-subscribe hub
type PubSub struct {
	mu          sync.RWMutex
	subscribers map[string][]chan Event
	closed      bool
}

// NewPubSub creates a new pub/sub hub
func NewPubSub() *PubSub {
	return &PubSub{
		subscribers: make(map[string][]chan Event),
	}
}

// Subscribe registers a subscriber for a topic
// Returns a channel that receives events
func (ps *PubSub) Subscribe(topic string, bufferSize int) <-chan Event {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	ch := make(chan Event, bufferSize)
	ps.subscribers[topic] = append(ps.subscribers[topic], ch)
	return ch
}

// Publish sends an event to all subscribers of the topic
func (ps *PubSub) Publish(topic string, data interface{}) {
	ps.mu.RLock()
	defer ps.mu.RUnlock()

	if ps.closed {
		return
	}

	event := Event{
		Topic:     topic,
		Data:      data,
		Timestamp: time.Now(),
	}

	// Send to all subscribers (non-blocking)
	for _, ch := range ps.subscribers[topic] {
		select {
		case ch <- event:
		default:
			// Subscriber's buffer is full, skip
			fmt.Printf("Warning: subscriber buffer full for topic %s\n", topic)
		}
	}
}

// Close shuts down the pub/sub hub
func (ps *PubSub) Close() {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	if ps.closed {
		return // already closed; closing a channel twice would panic
	}
	ps.closed = true
	for _, subs := range ps.subscribers {
		for _, ch := range subs {
			close(ch)
		}
	}
}

func main() {
	pubsub := NewPubSub()
	var wg sync.WaitGroup

	// Subscriber 1: Email Service
	emailCh := pubsub.Subscribe("user.created", 10)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for event := range emailCh {
			user := event.Data.(string)
			fmt.Printf("📧 Email Service: Sending welcome email to %s\n", user)
			time.Sleep(50 * time.Millisecond)
		}
	}()

	// Subscriber 2: Analytics Service
	analyticsCh := pubsub.Subscribe("user.created", 10)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for event := range analyticsCh {
			user := event.Data.(string)
			fmt.Printf("📊 Analytics: Recording signup for %s\n", user)
		}
	}()

	// Subscriber 3: Notification Service
	notifyCh := pubsub.Subscribe("user.created", 10)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for event := range notifyCh {
			user := event.Data.(string)
			fmt.Printf("🔔 Notifications: Alerting team about new user %s\n", user)
		}
	}()

	// Publisher: User Service
	fmt.Print("Publishing user registration events...\n\n")
	users := []string{"alice@example.com", "bob@example.com", "charlie@example.com"}
	for _, user := range users {
		fmt.Printf("➡️ Publishing: user.created for %s\n", user)
		pubsub.Publish("user.created", user)
		time.Sleep(100 * time.Millisecond)
	}

	// Wait a bit for subscribers to process
	time.Sleep(500 * time.Millisecond)

	// Close and wait
	pubsub.Close()
	wg.Wait()
	fmt.Println("\nAll events processed!")
}
Expected Output (the three subscribers run concurrently, so the order of their lines within each event may vary between runs):
Publishing user registration events...

➡️ Publishing: user.created for alice@example.com
📧 Email Service: Sending welcome email to alice@example.com
📊 Analytics: Recording signup for alice@example.com
🔔 Notifications: Alerting team about new user alice@example.com
➡️ Publishing: user.created for bob@example.com
📧 Email Service: Sending welcome email to bob@example.com
📊 Analytics: Recording signup for bob@example.com
🔔 Notifications: Alerting team about new user bob@example.com
➡️ Publishing: user.created for charlie@example.com
📧 Email Service: Sending welcome email to charlie@example.com
📊 Analytics: Recording signup for charlie@example.com
🔔 Notifications: Alerting team about new user charlie@example.com

All events processed!
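The non-blocking send in Publish has a consequence worth seeing in isolation: a subscriber that falls behind loses events instead of stalling the publisher. The sketch below is a minimal illustration of that trade-off, not part of the hub above; `trySend` and `publishAll` are hypothetical helper names introduced only for this example.

```go
package main

import "fmt"

// trySend is a hypothetical helper that mirrors the select/default send
// used in Publish: it reports whether the value fit in the buffer.
func trySend(ch chan<- string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // buffer full: the event is dropped
	}
}

// publishAll sends each event with trySend and returns how many were
// delivered and how many were dropped.
func publishAll(ch chan<- string, events []string) (delivered, dropped int) {
	for _, e := range events {
		if trySend(ch, e) {
			delivered++
		} else {
			dropped++
		}
	}
	return delivered, dropped
}

func main() {
	// A subscriber with room for two events that never reads its channel.
	slow := make(chan string, 2)
	delivered, dropped := publishAll(slow, []string{"a", "b", "c", "d", "e"})
	fmt.Printf("delivered=%d dropped=%d\n", delivered, dropped) // delivered=2 dropped=3
}
```

If dropping is not acceptable for a given subscriber, the usual alternatives are a larger buffer or a blocking send with a timeout, at the cost of letting that subscriber slow the publisher down.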

Real World Example 1: E-Commerce Order System

go
// Filename: ecommerce_pubsub.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Domain Events
type OrderCreatedEvent struct {
	OrderID    string
	CustomerID string
	Items      []OrderItem
	Total      float64
	CreatedAt  time.Time
}

type OrderItem struct {
	ProductID string
	Quantity  int
	Price     float64
}

type PaymentCompletedEvent struct {
	OrderID       string
	PaymentID     string
	Amount        float64
	Method        string
	TransactionID string
	CompletedAt   time.Time
}

type OrderShippedEvent struct {
	OrderID           string
	TrackingNumber    string
	Carrier           string
	EstimatedDelivery time.Time
	ShippedAt         time.Time
}

// EventBus extends the basic hub with named subscriptions,
// unsubscribe support, and JSON-serialized events
type EventBus struct {
	mu          sync.RWMutex
	subscribers map[string][]Subscription
	closed      bool
}

type Subscription struct {
	ID      string
	Channel chan []byte
	Handler string
}

func NewEventBus() *EventBus {
	return &EventBus{
		subscribers: make(map[string][]Subscription),
	}
}

// Subscribe with a named handler for debugging
func (eb *EventBus) Subscribe(topic, handlerName string, bufferSize int) (<-chan []byte, string) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	subID := fmt.Sprintf("%s-%d", handlerName, time.Now().UnixNano())
	ch := make(chan []byte, bufferSize)
	sub := Subscription{
		ID:      subID,
		Channel: ch,
		Handler: handlerName,
	}
	eb.subscribers[topic] = append(eb.subscribers[topic], sub)
	return ch, subID
}

// Unsubscribe removes a subscription
func (eb *EventBus) Unsubscribe(topic, subID string) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	subs := eb.subscribers[topic]
	for i, sub := range subs {
		if sub.ID == subID {
			close(sub.Channel)
			eb.subscribers[topic] = append(subs[:i], subs[i+1:]...)
			break
		}
	}
}

// Publish serializes and sends an event
func (eb *EventBus) Publish(topic string, event interface{}) error {
	eb.mu.RLock()
	defer eb.mu.RUnlock()

	if eb.closed {
		return fmt.Errorf("event bus is closed")
	}

	data, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("failed to marshal event: %w", err)
	}

	delivered := 0
	for _, sub := range eb.subscribers[topic] {
		select {
		case sub.Channel <- data:
			delivered++
		default:
			fmt.Printf("⚠️ Buffer full for %s on topic %s\n", sub.Handler, topic)
		}
	}
	fmt.Printf("📤 Published to %s: delivered to %d subscribers\n", topic, delivered)
	return nil
}

// Close shuts down the event bus
func (eb *EventBus) Close() {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	if eb.closed {
		return // already closed; closing a channel twice would panic
	}
	eb.closed = true
	for _, subs := range eb.subscribers {
		for _, sub := range subs {
			close(sub.Channel)
		}
	}
}

// Services
type EmailService struct {
	eventBus *EventBus
}

func (s *EmailService) Start(ctx context.Context) {
	// Subscribe to multiple topics
	orderCreatedCh, _ := s.eventBus.Subscribe("order.created", "email-service", 100)
	paymentCompletedCh, _ := s.eventBus.Subscribe("payment.completed", "email-service", 100)
	orderShippedCh, _ := s.eventBus.Subscribe("order.shipped", "email-service", 100)

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case data, ok := <-orderCreatedCh:
				if !ok {
					return
				}
				var event OrderCreatedEvent
				json.Unmarshal(data, &event)
				fmt.Printf("📧 Email: Sending order confirmation for #%s\n", event.OrderID)
			case data, ok := <-paymentCompletedCh:
				if !ok {
					return
				}
				var event PaymentCompletedEvent
				json.Unmarshal(data, &event)
				fmt.Printf("📧 Email: Sending payment receipt for order #%s\n", event.OrderID)
			case data, ok := <-orderShippedCh:
				if !ok {
					return
				}
				var event OrderShippedEvent
				json.Unmarshal(data, &event)
				fmt.Printf("📧 Email: Sending shipping notification for #%s (tracking: %s)\n",
					event.OrderID, event.TrackingNumber)
			}
		}
	}()
}

type InventoryService struct {
	eventBus *EventBus
}

func (s *InventoryService) Start(ctx context.Context) {
	orderCreatedCh, _ := s.eventBus.Subscribe("order.created", "inventory-service", 100)

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case data, ok := <-orderCreatedCh:
				if !ok {
					return
				}
				var event OrderCreatedEvent
				json.Unmarshal(data, &event)
				for _, item := range event.Items {
					fmt.Printf("📦 Inventory: Reserving %d units of product %s\n",
						item.Quantity, item.ProductID)
				}
			}
		}
	}()
}

type AnalyticsService struct {
	eventBus *EventBus
}

func (s *AnalyticsService) Start(ctx context.Context) {
	orderCreatedCh, _ := s.eventBus.Subscribe("order.created", "analytics-service", 100)
	paymentCompletedCh, _ := s.eventBus.Subscribe("payment.completed", "analytics-service", 100)

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case data, ok := <-orderCreatedCh:
				if !ok {
					return
				}
				var event OrderCreatedEvent
				json.Unmarshal(data, &event)
				fmt.Printf("📊 Analytics: Recording order #%s (total: $%.2f)\n",
					event.OrderID, event.Total)
			case data, ok := <-paymentCompletedCh:
				if !ok {
					return
				}
				var event PaymentCompletedEvent
				json.Unmarshal(data, &event)
				fmt.Printf("📊 Analytics: Recording payment via %s ($%.2f)\n",
					event.Method, event.Amount)
			}
		}
	}()
}

type NotificationService struct {
	eventBus *EventBus
}

func (s *NotificationService) Start(ctx context.Context) {
	orderShippedCh, _ := s.eventBus.Subscribe("order.shipped", "notification-service", 100)

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case data, ok := <-orderShippedCh:
				if !ok {
					return
				}
				var event OrderShippedEvent
				json.Unmarshal(data, &event)
				fmt.Printf("📱 Push Notification: Your order #%s has been shipped!\n", event.OrderID)
			}
		}
	}()
}

func main() {
	// No rand.Seed call: since Go 1.20 the global generator is seeded
	// automatically and rand.Seed is deprecated.
	eventBus := NewEventBus()
	ctx, cancel := context.WithCancel(context.Background())

	// Start services
	emailService := &EmailService{eventBus: eventBus}
	emailService.Start(ctx)

	inventoryService := &InventoryService{eventBus: eventBus}
	inventoryService.Start(ctx)

	analyticsService := &AnalyticsService{eventBus: eventBus}
	analyticsService.Start(ctx)

	notificationService := &NotificationService{eventBus: eventBus}
	notificationService.Start(ctx)

	// Let services initialize
	time.Sleep(100 * time.Millisecond)

	fmt.Println("🛒 E-Commerce Order Flow Simulation")
	fmt.Print("====================================\n\n")

	// Simulate order flow
	orderID := fmt.Sprintf("ORD-%06d", rand.Intn(1000000))

	// Step 1: Order Created
	fmt.Println("Step 1: Customer places order")
	eventBus.Publish("order.created", OrderCreatedEvent{
		OrderID:    orderID,
		CustomerID: "CUST-123",
		Items: []OrderItem{
			{ProductID: "PROD-A", Quantity: 2, Price: 29.99},
			{ProductID: "PROD-B", Quantity: 1, Price: 49.99},
		},
		Total:     109.97,
		CreatedAt: time.Now(),
	})
	time.Sleep(200 * time.Millisecond)

	// Step 2: Payment Completed
	fmt.Println("\nStep 2: Customer completes payment")
	eventBus.Publish("payment.completed", PaymentCompletedEvent{
		OrderID:       orderID,
		PaymentID:     "PAY-789",
		Amount:        109.97,
		Method:        "credit_card",
		TransactionID: "TXN-456789",
		CompletedAt:   time.Now(),
	})
	time.Sleep(200 * time.Millisecond)

	// Step 3: Order Shipped
	fmt.Println("\nStep 3: Warehouse ships order")
	eventBus.Publish("order.shipped", OrderShippedEvent{
		OrderID:           orderID,
		TrackingNumber:    "1Z999AA10123456784",
		Carrier:           "UPS",
		EstimatedDelivery: time.Now().Add(72 * time.Hour),
		ShippedAt:         time.Now(),
	})
	time.Sleep(300 * time.Millisecond)

	// Cleanup
	cancel()
	eventBus.Close()
	fmt.Println("\n✅ Order flow completed!")
}
Expected Output (the order ID is random, and because the services run concurrently the order of lines within each step may vary):
🛒 E-Commerce Order Flow Simulation
====================================

Step 1: Customer places order
📤 Published to order.created: delivered to 3 subscribers
📧 Email: Sending order confirmation for #ORD-123456
📦 Inventory: Reserving 2 units of product PROD-A
📦 Inventory: Reserving 1 units of product PROD-B
📊 Analytics: Recording order #ORD-123456 (total: $109.97)

Step 2: Customer completes payment
📤 Published to payment.completed: delivered to 2 subscribers
📧 Email: Sending payment receipt for order #ORD-123456
📊 Analytics: Recording payment via credit_card ($109.97)

Step 3: Warehouse ships order
📤 Published to order.shipped: delivered to 2 subscribers
📧 Email: Sending shipping notification for #ORD-123456 (tracking: 1Z999AA10123456784)
📱 Push Notification: Your order #ORD-123456 has been shipped!

✅ Order flow completed!
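The demo never calls Unsubscribe, so its effect is easy to miss. The sketch below isolates the same close-and-remove step on a plain slice, without the bus or its locking. The `subscription` type and `remove` function are simplified stand-ins invented for this illustration, not the article's EventBus API.

```go
package main

import "fmt"

// subscription is a simplified stand-in for the Subscription struct above.
type subscription struct {
	id string
	ch chan []byte
}

// remove closes and drops the subscription with the given id and reports
// whether it was found. It mirrors EventBus.Unsubscribe without the mutex.
func remove(subs []subscription, id string) ([]subscription, bool) {
	for i, s := range subs {
		if s.id == id {
			close(s.ch)
			return append(subs[:i], subs[i+1:]...), true
		}
	}
	return subs, false
}

func main() {
	email := subscription{id: "email-1", ch: make(chan []byte, 1)}
	audit := subscription{id: "audit-1", ch: make(chan []byte, 1)}
	subs := []subscription{email, audit}

	subs, found := remove(subs, "audit-1")
	fmt.Println("remaining:", len(subs), "found:", found) // remaining: 1 found: true

	// A receive on the closed channel returns immediately with ok == false,
	// which is how the service loops above notice they were unsubscribed.
	_, ok := <-audit.ch
	fmt.Println("audit channel open:", ok) // audit channel open: false
}
```

Closing the channel inside the removal step is what lets a subscriber's `if !ok { return }` branch fire, so the goroutine exits instead of waiting on a channel nobody will write to again.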

Real World Example 2: Real-Time Dashboard Updates

go
// Filename: realtime_dashboard.go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// MetricEvent represents a system metric
type MetricEvent struct {
	Name      string
	Value     float64
	Tags      map[string]string
	Timestamp time.Time
}

// DashboardHub manages real-time metric subscriptions
type DashboardHub struct {
	mu         sync.RWMutex
	clients    map[string]chan MetricEvent
	broadcast  chan MetricEvent
	register   chan *DashboardClient
	unregister chan *DashboardClient
}

type DashboardClient struct {
	ID      string
	Metrics chan MetricEvent
	Filters []string // Metric names to receive
}

func NewDashboardHub() *DashboardHub {
	return &DashboardHub{
		clients:    make(map[string]chan MetricEvent),
		broadcast:  make(chan MetricEvent, 1000),
		register:   make(chan *DashboardClient),
		unregister: make(chan *DashboardClient),
	}
}

// Run starts the hub's main loop
func (h *DashboardHub) Run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case client := <-h.register:
			h.mu.Lock()
			h.clients[client.ID] = client.Metrics
			h.mu.Unlock()
			fmt.Printf("📺 Dashboard client connected: %s\n", client.ID)
		case client := <-h.unregister:
			h.mu.Lock()
			if ch, ok := h.clients[client.ID]; ok {
				close(ch)
				delete(h.clients, client.ID)
			}
			h.mu.Unlock()
			fmt.Printf("📺 Dashboard client disconnected: %s\n", client.ID)
		case metric := <-h.broadcast:
			h.mu.RLock()
			for _, ch := range h.clients {
				select {
				case ch <- metric:
				default:
					// Client buffer full, skip
				}
			}
			h.mu.RUnlock()
		}
	}
}

// Publish sends a metric to all connected clients
func (h *DashboardHub) Publish(metric MetricEvent) {
	select {
	case h.broadcast <- metric:
	default:
		// Broadcast buffer full
	}
}

// Subscribe creates a new dashboard client.
// It blocks until the hub's Run loop accepts the registration.
func (h *DashboardHub) Subscribe(clientID string, bufferSize int) *DashboardClient {
	client := &DashboardClient{
		ID:      clientID,
		Metrics: make(chan MetricEvent, bufferSize),
	}
	h.register <- client
	return client
}

// Unsubscribe removes a dashboard client.
// It blocks until the hub's Run loop accepts the request, so it must be
// called while Run is still active.
func (h *DashboardHub) Unsubscribe(client *DashboardClient) {
	h.unregister <- client
}

// MetricProducer simulates system metrics
func MetricProducer(ctx context.Context, hub *DashboardHub) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	metrics := []string{"cpu_usage", "memory_usage", "disk_io", "network_in", "network_out", "requests_per_sec"}

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Produce a random metric
			metricName := metrics[rand.Intn(len(metrics))]
			var value float64
			switch metricName {
			case "cpu_usage", "memory_usage":
				value = rand.Float64() * 100
			case "disk_io", "network_in", "network_out":
				value = rand.Float64() * 1000
			case "requests_per_sec":
				value = float64(rand.Intn(10000))
			}
			hub.Publish(MetricEvent{
				Name:      metricName,
				Value:     value,
				Tags:      map[string]string{"host": "server-1"},
				Timestamp: time.Now(),
			})
		}
	}
}

// DashboardRenderer simulates a dashboard rendering metrics
func DashboardRenderer(ctx context.Context, name string, client *DashboardClient) {
	counts := make(map[string]int)
	// Print the totals on every exit path (cancellation or closed channel)
	defer func() {
		fmt.Printf("\n%s final counts: %v\n", name, counts)
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case metric, ok := <-client.Metrics:
			if !ok {
				return
			}
			counts[metric.Name]++
			fmt.Printf("📊 %s received: %s = %.2f\n", name, metric.Name, metric.Value)
		}
	}
}

func main() {
	hub := NewDashboardHub()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start hub
	go hub.Run(ctx)

	// Start metric producer
	go MetricProducer(ctx, hub)

	// Create dashboard clients
	fmt.Println("🖥️ Real-Time Dashboard Demo")
	fmt.Print("============================\n\n")

	client1 := hub.Subscribe("admin-dashboard", 100)
	go DashboardRenderer(ctx, "Admin Dashboard", client1)

	time.Sleep(500 * time.Millisecond)

	client2 := hub.Subscribe("ops-dashboard", 100)
	go DashboardRenderer(ctx, "Ops Dashboard", client2)

	// Let the demo run for about two seconds in total, then unsubscribe
	// while the hub is still running. Unsubscribing after the hub has
	// stopped would block forever: nothing would be left to receive from
	// the unbuffered unregister channel.
	time.Sleep(1500 * time.Millisecond)
	hub.Unsubscribe(client1)
	hub.Unsubscribe(client2)

	// Give the renderers a moment to drain and print their counts
	time.Sleep(100 * time.Millisecond)
	cancel()
	fmt.Println("\n✅ Demo completed!")
}
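DashboardClient declares a Filters field, but the hub's Run loop never consults it, so every client receives every metric. One way filtering could be wired in is sketched below. It assumes that an empty filter list means "receive everything"; the source does not say. `Metric`, `Client`, `wants`, and `broadcast` are trimmed-down, hypothetical names for this illustration.

```go
package main

import "fmt"

// Metric is a trimmed-down stand-in for MetricEvent.
type Metric struct {
	Name  string
	Value float64
}

// Client is a trimmed-down stand-in for DashboardClient.
type Client struct {
	ID      string
	Metrics chan Metric
	Filters []string // metric names to receive; empty means everything (assumption)
}

// wants reports whether the client asked for this metric name.
func (c *Client) wants(name string) bool {
	if len(c.Filters) == 0 {
		return true
	}
	for _, f := range c.Filters {
		if f == name {
			return true
		}
	}
	return false
}

// broadcast delivers m to every interested client without blocking and
// returns the number of clients that received it.
func broadcast(clients []*Client, m Metric) int {
	sent := 0
	for _, c := range clients {
		if !c.wants(m.Name) {
			continue
		}
		select {
		case c.Metrics <- m:
			sent++
		default: // client buffer full, skip
		}
	}
	return sent
}

func main() {
	admin := &Client{ID: "admin", Metrics: make(chan Metric, 4)}
	ops := &Client{ID: "ops", Metrics: make(chan Metric, 4), Filters: []string{"cpu_usage"}}
	clients := []*Client{admin, ops}

	fmt.Println(broadcast(clients, Metric{Name: "cpu_usage", Value: 42})) // 2
	fmt.Println(broadcast(clients, Metric{Name: "disk_io", Value: 7}))    // 1
}
```

Filtering at the hub keeps unwanted metrics out of a client's buffer, which matters when the buffer is the only thing standing between a slow client and dropped events.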

Pub/Sub with Topic Patterns

Support wildcard subscriptions:
go
// Filename: pattern_pubsub.go
package main

import (
	"fmt"
	"path"
	"sync"
	"time"
)

// PatternPubSub supports wildcard topic subscriptions
type PatternPubSub struct {
	mu          sync.RWMutex
	subscribers map[string][]chan Event
}

type Event struct {
	Topic string
	Data  interface{}
}

func NewPatternPubSub() *PatternPubSub {
	return &PatternPubSub{
		subscribers: make(map[string][]chan Event),
	}
}

// Subscribe with pattern support
// Patterns: "user.*" matches "user.created", "user.deleted"
//           "*.error" matches "api.error", "db.error"
//           "*" matches everything
func (ps *PatternPubSub) Subscribe(pattern string, bufferSize int) <-chan Event {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	ch := make(chan Event, bufferSize)
	ps.subscribers[pattern] = append(ps.subscribers[pattern], ch)
	return ch
}

// Publish sends event to all matching subscribers
func (ps *PatternPubSub) Publish(topic string, data interface{}) {
	ps.mu.RLock()
	defer ps.mu.RUnlock()

	event := Event{Topic: topic, Data: data}
	for pattern, subs := range ps.subscribers {
		if matchPattern(pattern, topic) {
			for _, ch := range subs {
				select {
				case ch <- event:
				default:
				}
			}
		}
	}
}

// matchPattern checks if topic matches pattern.
// path.Match treats only '/' as a separator, so '*' also spans dots:
// "user.*" matches "user.profile.updated" too. A malformed pattern
// returns an error, which is treated as "no match" here.
func matchPattern(pattern, topic string) bool {
	matched, _ := path.Match(pattern, topic)
	return matched
}

// Close shuts down all subscriptions
func (ps *PatternPubSub) Close() {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	for _, subs := range ps.subscribers {
		for _, ch := range subs {
			close(ch)
		}
	}
}

func main() {
	pubsub := NewPatternPubSub()
	var wg sync.WaitGroup

	// Subscribe to all user events
	userEvents := pubsub.Subscribe("user.*", 10)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for event := range userEvents {
			fmt.Printf("👤 User handler received: %s -> %v\n", event.Topic, event.Data)
		}
	}()

	// Subscribe to all error events
	errorEvents := pubsub.Subscribe("*.error", 10)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for event := range errorEvents {
			fmt.Printf("❌ Error handler received: %s -> %v\n", event.Topic, event.Data)
		}
	}()

	// Subscribe to everything
	allEvents := pubsub.Subscribe("*", 10)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for event := range allEvents {
			fmt.Printf("📝 Logger received: %s -> %v\n", event.Topic, event.Data)
		}
	}()

	// Publish events
	fmt.Print("Publishing events...\n\n")
	pubsub.Publish("user.created", map[string]string{"id": "123", "name": "Alice"})
	pubsub.Publish("user.updated", map[string]string{"id": "123", "name": "Alice Smith"})
	pubsub.Publish("api.error", "Connection timeout")
	pubsub.Publish("db.error", "Query failed")
	pubsub.Publish("system.startup", "Server started")

	time.Sleep(100 * time.Millisecond)
	pubsub.Close()
	wg.Wait()
	fmt.Println("\nDone!")
}

When to Use Pub/Sub

Perfect Use Cases

Scenario                  | Why Pub/Sub Fits
Event-driven architecture | Decouple services
Notifications             | Broadcast to many recipients
Real-time updates         | Push changes to clients
Logging/Auditing          | Multiple log destinations
Microservices             | Service communication
Cache invalidation        | Notify caches of changes

Pub/Sub vs Other Patterns

Pattern          | Use Case
Pub/Sub          | One-to-many, fire-and-forget
Request-Response | Need immediate reply
Queue            | Work distribution, each message handled by one consumer
Pipeline         | Sequential processing
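The difference between the Pub/Sub and Queue rows comes down to who receives each message. The sketch below contrasts the two with plain channels: in fan-out every subscriber gets its own copy, while in a work queue consumers compete for messages on one shared channel. `fanOut`, `workQueue`, and `sum` are hypothetical helpers written for this comparison.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut models Pub/Sub: every subscriber has its own channel and gets a
// copy of each message. It returns per-subscriber receive counts.
func fanOut(messages []string, subscribers int) []int {
	counts := make([]int, subscribers)
	chans := make([]chan string, subscribers)
	var wg sync.WaitGroup
	for i := range chans {
		chans[i] = make(chan string, len(messages))
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for range chans[i] {
				counts[i]++
			}
		}(i)
	}
	for _, m := range messages {
		for _, ch := range chans {
			ch <- m
		}
	}
	for _, ch := range chans {
		close(ch)
	}
	wg.Wait()
	return counts
}

// workQueue models a queue: workers compete for messages on one shared
// channel, so each message is received by a single worker.
func workQueue(messages []string, workers int) []int {
	counts := make([]int, workers)
	queue := make(chan string, len(messages))
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for range queue {
				counts[i]++
			}
		}(i)
	}
	for _, m := range messages {
		queue <- m
	}
	close(queue)
	wg.Wait()
	return counts
}

// sum adds up the per-receiver counts.
func sum(xs []int) int {
	total := 0
	for _, x := range xs {
		total += x
	}
	return total
}

func main() {
	msgs := []string{"m1", "m2", "m3", "m4", "m5", "m6"}
	fmt.Println("pub/sub total deliveries:", sum(fanOut(msgs, 3)))  // 18: 6 messages x 3 subscribers
	fmt.Println("queue total deliveries:", sum(workQueue(msgs, 3))) // 6: each message handled once
}
```

How the six queue messages are split among the three workers varies from run to run; only the total is fixed.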

Summary

Pub/Sub creates flexible, decoupled systems where:
  • Publishers don't know subscribers
  • Subscribers don't know publishers
  • Adding/removing components is easy
  • Systems evolve independently
Key Takeaways:
  1. Decouple with events: Don't call services directly
  2. Buffer appropriately: Size subscriber buffers so slow subscribers don't block publishers or silently drop events
  3. Handle failures: Subscribers may be slow or unavailable
  4. Use patterns: Wildcard subscriptions for flexibility
  5. Monitor: Track delivery rates and lag
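As a rough illustration of the monitoring takeaway, delivery and drop counts can be recorded at the same point where the non-blocking send happens, and lag can be read from the channel's current length. This is a minimal sketch assuming Go 1.19 or later (for `atomic.Int64`). `Stats`, `send`, `DropRate`, and `Lag` are hypothetical names, not part of the implementations above.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Stats holds hypothetical delivery counters for one subscription.
type Stats struct {
	delivered atomic.Int64
	dropped   atomic.Int64
}

// send attempts a non-blocking delivery and records the outcome.
func (s *Stats) send(ch chan<- string, v string) {
	select {
	case ch <- v:
		s.delivered.Add(1)
	default:
		s.dropped.Add(1)
	}
}

// DropRate returns the fraction of events dropped so far (0 when idle).
func (s *Stats) DropRate() float64 {
	d, x := s.delivered.Load(), s.dropped.Load()
	if d+x == 0 {
		return 0
	}
	return float64(x) / float64(d+x)
}

// Lag reports how many events are buffered but not yet consumed.
func Lag(ch chan string) int {
	return len(ch)
}

func main() {
	var stats Stats
	sub := make(chan string, 3)

	// Publish four events into a buffer of three: the last one is dropped.
	for i := 0; i < 4; i++ {
		stats.send(sub, fmt.Sprintf("event-%d", i))
	}
	<-sub // the subscriber consumes one event

	fmt.Printf("delivered=%d dropped=%d drop rate=%.2f lag=%d\n",
		stats.delivered.Load(), stats.dropped.Load(), stats.DropRate(), Lag(sub))
	// delivered=3 dropped=1 drop rate=0.25 lag=2
}
```

A rising drop rate or a lag that stays near the buffer size are both signs that a subscriber cannot keep up with its topic.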

Next Steps

  • Practice: Build a notification system with Pub/Sub
  • Explore: Message brokers like NATS, Kafka, RabbitMQ
  • Read: Event-driven architecture patterns
  • Build: Add persistence for event replay