Module 18: Message Queues

What Are Message Queues?

Message queues enable asynchronous communication between services by storing messages until they can be processed.
MESSAGE QUEUE OVERVIEW

Synchronous (direct):

    Service A ──── Request ────► Service B
              ◄──── Response ────

  • A waits for B to respond
  • If B is slow or down, A is affected
  • Tight coupling

Asynchronous (queue):

    Service A ──► Queue ──► Service B
     (done)     (stores)   (processes later)

  • A continues immediately after sending
  • B processes when ready
  • Loose coupling

Use cases:

  • Background job processing
  • Event-driven architecture
  • Load leveling (absorbing traffic spikes)
  • Decoupling services
  • Reliable delivery (at-least-once)
  • Fan-out to multiple consumers

Message Queue Patterns

MESSAGING PATTERNS

1. POINT-TO-POINT (Queue)

    Producer ──► Queue ──► Consumer

  • One message, one consumer
  • Message deleted after processing
  • Load balancing across consumers

2. PUBLISH-SUBSCRIBE (Topic)

    Publisher ──► Topic ──► Subscriber A
                       ├──► Subscriber B
                       └──► Subscriber C

  • One message, many consumers
  • Each subscriber gets a copy
  • Fan-out pattern

3. REQUEST-REPLY

    Requester ──► Request Queue ──► Processor
        ▲                              │
        └──────── Reply Queue ◄────────┘

  • Async request with a correlation ID
  • Response routed back to the requester

4. DEAD LETTER QUEUE (DLQ)

    Queue ──► Consumer (fails) ──► Dead Letter Queue
      ▲            │                    │
      └─ (retries) ┘                (inspect)

  • Failed messages are sent to the DLQ
  • Prevents poison messages from blocking the queue
  • Supports manual inspection and replay

Delivery Guarantees

DELIVERY GUARANTEES

AT-MOST-ONCE:

  • Message delivered 0 or 1 times
  • Fire and forget
  • Possible message loss
  • Use for: metrics, logs, non-critical events

    Producer ──► Queue ──► Consumer
     (done)        X (lost)

AT-LEAST-ONCE:

  • Message delivered 1 or more times
  • Retry on failure
  • Possible duplicates
  • Use for: most applications (with idempotency)

    Producer ──► Queue ──► Consumer (ack timeout)
                     ├──► Consumer (retry)
                     └──► Consumer (success, ack)

EXACTLY-ONCE:

  • Message delivered exactly 1 time
  • Requires special infrastructure
  • Higher overhead

  Implementations:
  • Deduplication with message IDs
  • Transactional outbox pattern
  • Kafka exactly-once semantics (EOS)

  Tip: usually implemented as "at-least-once + idempotent consumer"

Queue Architecture

QUEUE COMPONENTS

    Producer ──────────► Broker ──────────► Consumer
                 (Queue A, Queue B, Queue C,
                  each holding its messages)

  Producer flow:             Consumer flow:
    1. Connect                 1. Connect
    2. Send message            2. Fetch message
    3. Get acknowledgment      3. Process
                               4. Acknowledge

Key concepts:

  • Broker: central server managing queues and messages
  • Queue: FIFO container for messages
  • Message: data unit with payload and metadata
  • Producer: sends messages to a queue
  • Consumer: receives and processes messages
  • Acknowledgment: consumer confirms processing
  • Visibility timeout: message hidden while being processed

Basic Queue Implementation

```go
package queue

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Message represents a queue message.
type Message struct {
	ID            string            `json:"id"`
	Body          []byte            `json:"body"`
	Attributes    map[string]string `json:"attributes"`
	Timestamp     time.Time         `json:"timestamp"`
	DeliveryCount int               `json:"delivery_count"`
	ReceiptHandle string            `json:"receipt_handle"`
}

// Queue is the interface implemented by queue backends.
type Queue interface {
	Send(ctx context.Context, msg *Message) error
	Receive(ctx context.Context, maxMessages int) ([]*Message, error)
	Delete(ctx context.Context, receiptHandle string) error
	ChangeVisibility(ctx context.Context, receiptHandle string, timeout time.Duration) error
}

// InMemoryQueue implements a simple in-memory queue.
type InMemoryQueue struct {
	mu                sync.Mutex
	messages          []*Message
	inFlight          map[string]*inflightMessage
	visibilityTimeout time.Duration
}

type inflightMessage struct {
	msg       *Message
	visibleAt time.Time
}

func NewInMemoryQueue(visibilityTimeout time.Duration) *InMemoryQueue {
	q := &InMemoryQueue{
		messages:          make([]*Message, 0),
		inFlight:          make(map[string]*inflightMessage),
		visibilityTimeout: visibilityTimeout,
	}
	go q.returnExpiredMessages()
	return q
}

// Send adds a message to the queue.
func (q *InMemoryQueue) Send(ctx context.Context, msg *Message) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	if msg.ID == "" {
		msg.ID = generateID()
	}
	msg.Timestamp = time.Now()
	msg.DeliveryCount = 0

	q.messages = append(q.messages, msg)
	return nil
}

// Receive retrieves up to maxMessages messages from the queue.
func (q *InMemoryQueue) Receive(ctx context.Context, maxMessages int) ([]*Message, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	var result []*Message
	remaining := make([]*Message, 0)

	for _, msg := range q.messages {
		if len(result) >= maxMessages {
			remaining = append(remaining, msg)
			continue
		}

		// Create a receipt handle and move the message to in-flight.
		msg.ReceiptHandle = fmt.Sprintf("%s:%d", msg.ID, time.Now().UnixNano())
		msg.DeliveryCount++
		q.inFlight[msg.ReceiptHandle] = &inflightMessage{
			msg:       msg,
			visibleAt: time.Now().Add(q.visibilityTimeout),
		}
		result = append(result, msg)
	}

	q.messages = remaining
	return result, nil
}

// Delete removes a processed message.
func (q *InMemoryQueue) Delete(ctx context.Context, receiptHandle string) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	if _, ok := q.inFlight[receiptHandle]; !ok {
		return fmt.Errorf("message not found")
	}
	delete(q.inFlight, receiptHandle)
	return nil
}

// ChangeVisibility extends or shortens the visibility timeout.
func (q *InMemoryQueue) ChangeVisibility(ctx context.Context, receiptHandle string, timeout time.Duration) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	inFlight, ok := q.inFlight[receiptHandle]
	if !ok {
		return fmt.Errorf("message not found")
	}
	inFlight.visibleAt = time.Now().Add(timeout)
	return nil
}

// returnExpiredMessages returns timed-out messages to the queue.
func (q *InMemoryQueue) returnExpiredMessages() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for range ticker.C {
		q.mu.Lock()
		now := time.Now()
		for handle, inFlight := range q.inFlight {
			if now.After(inFlight.visibleAt) {
				// Visibility timeout expired: redeliver.
				q.messages = append(q.messages, inFlight.msg)
				delete(q.inFlight, handle)
			}
		}
		q.mu.Unlock()
	}
}

func generateID() string {
	return fmt.Sprintf("%d", time.Now().UnixNano())
}
```

Producer and Consumer Patterns

```go
package queue

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"
)

// Producer sends messages to a queue.
type Producer struct {
	queue Queue
}

func NewProducer(queue Queue) *Producer {
	return &Producer{queue: queue}
}

// SendJSON sends a JSON-encoded message.
func (p *Producer) SendJSON(ctx context.Context, data interface{}) error {
	body, err := json.Marshal(data)
	if err != nil {
		return err
	}
	msg := &Message{
		Body: body,
		Attributes: map[string]string{
			"content-type": "application/json",
		},
	}
	return p.queue.Send(ctx, msg)
}

// SendWithDelay sends a message with delayed visibility.
// The delay is carried as an attribute; the queue backend must honor it.
func (p *Producer) SendWithDelay(ctx context.Context, data interface{}, delay time.Duration) error {
	body, err := json.Marshal(data)
	if err != nil {
		return err
	}
	msg := &Message{
		Body: body,
		Attributes: map[string]string{
			"delay-seconds": fmt.Sprintf("%d", int(delay.Seconds())),
		},
	}
	return p.queue.Send(ctx, msg)
}

// Consumer processes messages from a queue.
type Consumer struct {
	queue        Queue
	handler      MessageHandler
	maxMessages  int
	pollInterval time.Duration
	workers      int
	stopCh       chan struct{}
	wg           sync.WaitGroup
}

type MessageHandler func(ctx context.Context, msg *Message) error

type ConsumerConfig struct {
	MaxMessages  int
	PollInterval time.Duration
	Workers      int
}

func NewConsumer(queue Queue, handler MessageHandler, config ConsumerConfig) *Consumer {
	if config.MaxMessages == 0 {
		config.MaxMessages = 10
	}
	if config.PollInterval == 0 {
		config.PollInterval = time.Second
	}
	if config.Workers == 0 {
		config.Workers = 1
	}
	return &Consumer{
		queue:        queue,
		handler:      handler,
		maxMessages:  config.MaxMessages,
		pollInterval: config.PollInterval,
		workers:      config.Workers,
		stopCh:       make(chan struct{}),
	}
}

// Start begins consuming messages.
func (c *Consumer) Start(ctx context.Context) {
	msgCh := make(chan *Message, c.workers*c.maxMessages)

	// Start worker goroutines.
	for i := 0; i < c.workers; i++ {
		c.wg.Add(1)
		go c.worker(ctx, msgCh)
	}

	// Poll for messages.
	c.wg.Add(1)
	go c.poller(ctx, msgCh)
}

func (c *Consumer) poller(ctx context.Context, msgCh chan<- *Message) {
	defer c.wg.Done()
	ticker := time.NewTicker(c.pollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-c.stopCh:
			return
		case <-ticker.C:
			messages, err := c.queue.Receive(ctx, c.maxMessages)
			if err != nil {
				log.Printf("Error receiving messages: %v", err)
				continue
			}
			for _, msg := range messages {
				select {
				case msgCh <- msg:
				case <-ctx.Done():
					return
				case <-c.stopCh:
					return
				}
			}
		}
	}
}

func (c *Consumer) worker(ctx context.Context, msgCh <-chan *Message) {
	defer c.wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case <-c.stopCh:
			return
		case msg := <-msgCh:
			if err := c.processMessage(ctx, msg); err != nil {
				log.Printf("Error processing message %s: %v", msg.ID, err)
				// The message will become visible again after the timeout.
			}
		}
	}
}

func (c *Consumer) processMessage(ctx context.Context, msg *Message) error {
	// Process the message.
	if err := c.handler(ctx, msg); err != nil {
		return err
	}
	// Delete on success.
	return c.queue.Delete(ctx, msg.ReceiptHandle)
}

// Stop gracefully shuts down the consumer.
func (c *Consumer) Stop() {
	close(c.stopCh)
	c.wg.Wait()
}

// BatchConsumer processes messages in batches.
type BatchConsumer struct {
	queue        Queue
	handler      BatchHandler
	batchSize    int
	batchTimeout time.Duration
	stopCh       chan struct{}
	wg           sync.WaitGroup
}

type BatchHandler func(ctx context.Context, msgs []*Message) error

func NewBatchConsumer(
	queue Queue,
	handler BatchHandler,
	batchSize int,
	batchTimeout time.Duration,
) *BatchConsumer {
	return &BatchConsumer{
		queue:        queue,
		handler:      handler,
		batchSize:    batchSize,
		batchTimeout: batchTimeout,
		stopCh:       make(chan struct{}),
	}
}

func (c *BatchConsumer) Start(ctx context.Context) {
	c.wg.Add(1)
	go c.run(ctx)
}

func (c *BatchConsumer) run(ctx context.Context) {
	defer c.wg.Done()
	batch := make([]*Message, 0, c.batchSize)
	timer := time.NewTimer(c.batchTimeout)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			c.processBatch(ctx, batch)
			return
		case <-c.stopCh:
			c.processBatch(ctx, batch)
			return
		case <-timer.C:
			// Flush a partial batch when the timeout fires.
			if len(batch) > 0 {
				c.processBatch(ctx, batch)
				batch = make([]*Message, 0, c.batchSize)
			}
			timer.Reset(c.batchTimeout)
		default:
			messages, _ := c.queue.Receive(ctx, c.batchSize-len(batch))
			batch = append(batch, messages...)
			if len(batch) >= c.batchSize {
				c.processBatch(ctx, batch)
				batch = make([]*Message, 0, c.batchSize)
				timer.Reset(c.batchTimeout)
			}
			if len(messages) == 0 {
				time.Sleep(100 * time.Millisecond) // avoid a busy loop
			}
		}
	}
}

func (c *BatchConsumer) processBatch(ctx context.Context, batch []*Message) {
	if len(batch) == 0 {
		return
	}
	if err := c.handler(ctx, batch); err != nil {
		log.Printf("Error processing batch: %v", err)
		return
	}
	// Delete every message in the batch on success.
	for _, msg := range batch {
		c.queue.Delete(ctx, msg.ReceiptHandle)
	}
}
```

Message Queue with RabbitMQ

```go
package rabbitmq

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// RabbitMQ wraps a connection and channel.
type RabbitMQ struct {
	conn    *amqp.Connection
	channel *amqp.Channel
}

func NewRabbitMQ(url string) (*RabbitMQ, error) {
	conn, err := amqp.Dial(url)
	if err != nil {
		return nil, err
	}
	ch, err := conn.Channel()
	if err != nil {
		conn.Close()
		return nil, err
	}
	return &RabbitMQ{conn: conn, channel: ch}, nil
}

func (r *RabbitMQ) Close() {
	r.channel.Close()
	r.conn.Close()
}

// DeclareQueue creates a queue with a dead-letter exchange and message TTL.
func (r *RabbitMQ) DeclareQueue(name string, durable bool) error {
	_, err := r.channel.QueueDeclare(
		name,    // name
		durable, // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		amqp.Table{
			"x-dead-letter-exchange": "dlx",
			"x-message-ttl":          86400000, // 24 hours, in milliseconds
		},
	)
	return err
}

// Publish sends a message to a queue via the default exchange.
func (r *RabbitMQ) Publish(ctx context.Context, queue string, body interface{}) error {
	data, err := json.Marshal(body)
	if err != nil {
		return err
	}
	return r.channel.PublishWithContext(
		ctx,
		"",    // exchange (default)
		queue, // routing key
		false, // mandatory
		false, // immediate
		amqp.Publishing{
			DeliveryMode: amqp.Persistent,
			ContentType:  "application/json",
			Body:         data,
			MessageId:    generateMessageID(),
			Timestamp:    time.Now(),
		},
	)
}

// PublishWithDelay sends a delayed message. Requires the
// rabbitmq_delayed_message_exchange plugin and a declared "delayed-exchange".
func (r *RabbitMQ) PublishWithDelay(ctx context.Context, queue string, body interface{}, delay time.Duration) error {
	data, err := json.Marshal(body)
	if err != nil {
		return err
	}
	return r.channel.PublishWithContext(
		ctx,
		"delayed-exchange",
		queue,
		false,
		false,
		amqp.Publishing{
			DeliveryMode: amqp.Persistent,
			ContentType:  "application/json",
			Body:         data,
			Headers: amqp.Table{
				"x-delay": delay.Milliseconds(),
			},
		},
	)
}

// RabbitMQConsumer consumes messages from a queue.
type RabbitMQConsumer struct {
	rabbit   *RabbitMQ
	queue    string
	handler  func([]byte) error
	prefetch int
}

func NewRabbitMQConsumer(rabbit *RabbitMQ, queue string, handler func([]byte) error) *RabbitMQConsumer {
	return &RabbitMQConsumer{
		rabbit:   rabbit,
		queue:    queue,
		handler:  handler,
		prefetch: 10,
	}
}

func (c *RabbitMQConsumer) Start(ctx context.Context) error {
	// Set QoS: at most `prefetch` unacknowledged messages in flight.
	if err := c.rabbit.channel.Qos(c.prefetch, 0, false); err != nil {
		return err
	}

	// Start consuming.
	msgs, err := c.rabbit.channel.Consume(
		c.queue, // queue
		"",      // consumer tag
		false,   // auto-ack
		false,   // exclusive
		false,   // no-local
		false,   // no-wait
		nil,     // args
	)
	if err != nil {
		return err
	}

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case msg, ok := <-msgs:
				if !ok {
					return
				}
				c.processMessage(msg)
			}
		}
	}()
	return nil
}

func (c *RabbitMQConsumer) processMessage(msg amqp.Delivery) {
	if err := c.handler(msg.Body); err != nil {
		log.Printf("Error processing message: %v", err)
		msg.Nack(false, true) // requeue the message
		return
	}
	msg.Ack(false) // acknowledge success
}

func generateMessageID() string {
	return fmt.Sprintf("%d", time.Now().UnixNano())
}

// RabbitMQPubSub implements pub/sub over a fanout exchange.
type RabbitMQPubSub struct {
	rabbit   *RabbitMQ
	exchange string
}

func NewRabbitMQPubSub(rabbit *RabbitMQ, exchange string) (*RabbitMQPubSub, error) {
	// Declare the fanout exchange.
	err := rabbit.channel.ExchangeDeclare(
		exchange, // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	if err != nil {
		return nil, err
	}
	return &RabbitMQPubSub{rabbit: rabbit, exchange: exchange}, nil
}

// Publish sends a message to all subscribers.
func (p *RabbitMQPubSub) Publish(ctx context.Context, body interface{}) error {
	data, err := json.Marshal(body)
	if err != nil {
		return err
	}
	return p.rabbit.channel.PublishWithContext(
		ctx,
		p.exchange, // exchange
		"",         // routing key (ignored for fanout)
		false,
		false,
		amqp.Publishing{
			ContentType: "application/json",
			Body:        data,
		},
	)
}

// Subscribe creates an exclusive queue bound to the exchange.
func (p *RabbitMQPubSub) Subscribe(ctx context.Context, handler func([]byte) error) error {
	// Create an exclusive, auto-deleted queue for this subscriber.
	q, err := p.rabbit.channel.QueueDeclare(
		"",    // name (auto-generated)
		false, // durable
		true,  // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,
	)
	if err != nil {
		return err
	}

	// Bind the queue to the exchange.
	err = p.rabbit.channel.QueueBind(
		q.Name,     // queue name
		"",         // routing key
		p.exchange, // exchange
		false,
		nil,
	)
	if err != nil {
		return err
	}

	// Start consuming (auto-ack).
	msgs, err := p.rabbit.channel.Consume(q.Name, "", true, false, false, false, nil)
	if err != nil {
		return err
	}

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case msg, ok := <-msgs:
				if !ok {
					return
				}
				handler(msg.Body)
			}
		}
	}()
	return nil
}
```

AWS SQS Integration

```go
package sqs

import (
	"context"
	"encoding/json"
	"strconv"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

// SQSClient wraps AWS SQS.
type SQSClient struct {
	client   *sqs.Client
	queueURL string
}

func NewSQSClient(ctx context.Context, queueURL string) (*SQSClient, error) {
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return nil, err
	}
	return &SQSClient{
		client:   sqs.NewFromConfig(cfg),
		queueURL: queueURL,
	}, nil
}

// SendMessage sends a message to SQS.
func (s *SQSClient) SendMessage(ctx context.Context, body interface{}, delaySeconds int32) (*string, error) {
	data, err := json.Marshal(body)
	if err != nil {
		return nil, err
	}
	result, err := s.client.SendMessage(ctx, &sqs.SendMessageInput{
		QueueUrl:     &s.queueURL,
		MessageBody:  aws.String(string(data)),
		DelaySeconds: delaySeconds,
	})
	if err != nil {
		return nil, err
	}
	return result.MessageId, nil
}

// SendMessageBatch sends multiple messages in one call.
func (s *SQSClient) SendMessageBatch(ctx context.Context, messages []interface{}) error {
	entries := make([]types.SendMessageBatchRequestEntry, len(messages))
	for i, msg := range messages {
		data, err := json.Marshal(msg)
		if err != nil {
			return err
		}
		entries[i] = types.SendMessageBatchRequestEntry{
			Id:          aws.String(strconv.Itoa(i)),
			MessageBody: aws.String(string(data)),
		}
	}
	_, err := s.client.SendMessageBatch(ctx, &sqs.SendMessageBatchInput{
		QueueUrl: &s.queueURL,
		Entries:  entries,
	})
	return err
}

// ReceiveMessages retrieves messages from SQS.
func (s *SQSClient) ReceiveMessages(ctx context.Context, maxMessages int32, waitTimeSeconds int32) ([]types.Message, error) {
	result, err := s.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
		QueueUrl:            &s.queueURL,
		MaxNumberOfMessages: maxMessages,
		WaitTimeSeconds:     waitTimeSeconds, // long polling
		VisibilityTimeout:   30,
		AttributeNames:      []types.QueueAttributeName{"All"},
	})
	if err != nil {
		return nil, err
	}
	return result.Messages, nil
}

// DeleteMessage removes a processed message.
func (s *SQSClient) DeleteMessage(ctx context.Context, receiptHandle string) error {
	_, err := s.client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
		QueueUrl:      &s.queueURL,
		ReceiptHandle: &receiptHandle,
	})
	return err
}

// ChangeMessageVisibility extends the processing time.
func (s *SQSClient) ChangeMessageVisibility(ctx context.Context, receiptHandle string, timeout int32) error {
	_, err := s.client.ChangeMessageVisibility(ctx, &sqs.ChangeMessageVisibilityInput{
		QueueUrl:          &s.queueURL,
		ReceiptHandle:     &receiptHandle,
		VisibilityTimeout: timeout,
	})
	return err
}

// SQSConsumer polls SQS and dispatches messages to workers.
type SQSConsumer struct {
	client  *SQSClient
	handler func(ctx context.Context, msg types.Message) error
	workers int
	stopCh  chan struct{}
}

func NewSQSConsumer(client *SQSClient, handler func(ctx context.Context, msg types.Message) error, workers int) *SQSConsumer {
	return &SQSConsumer{
		client:  client,
		handler: handler,
		workers: workers,
		stopCh:  make(chan struct{}),
	}
}

func (c *SQSConsumer) Start(ctx context.Context) {
	msgCh := make(chan types.Message, c.workers*10)

	// Start workers.
	for i := 0; i < c.workers; i++ {
		go c.worker(ctx, msgCh)
	}

	// Poller.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-c.stopCh:
				return
			default:
				messages, err := c.client.ReceiveMessages(ctx, 10, 20)
				if err != nil {
					continue
				}
				for _, msg := range messages {
					select {
					case msgCh <- msg:
					case <-ctx.Done():
						return
					}
				}
			}
		}
	}()
}

func (c *SQSConsumer) worker(ctx context.Context, msgCh <-chan types.Message) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-c.stopCh:
			return
		case msg := <-msgCh:
			if err := c.handler(ctx, msg); err != nil {
				// The message will become visible again after the timeout.
				continue
			}
			c.client.DeleteMessage(ctx, *msg.ReceiptHandle)
		}
	}
}

func (c *SQSConsumer) Stop() {
	close(c.stopCh)
}
```

Dead Letter Queues

```go
// This file lives in the same package as the in-memory queue above,
// so it can use the Message and Queue types directly.
package queue

import (
	"context"
	"log"
	"time"
)

// DeadLetterQueue handles failed messages.
type DeadLetterQueue struct {
	mainQueue  Queue
	dlq        Queue
	maxRetries int
}

func NewDeadLetterQueue(mainQueue, dlq Queue, maxRetries int) *DeadLetterQueue {
	return &DeadLetterQueue{
		mainQueue:  mainQueue,
		dlq:        dlq,
		maxRetries: maxRetries,
	}
}

// ProcessWithDLQ processes messages with a DLQ fallback.
func (d *DeadLetterQueue) ProcessWithDLQ(
	ctx context.Context,
	handler func(ctx context.Context, msg *Message) error,
) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			messages, err := d.mainQueue.Receive(ctx, 10)
			if err != nil {
				time.Sleep(time.Second)
				continue
			}
			for _, msg := range messages {
				if err := d.processMessage(ctx, msg, handler); err != nil {
					log.Printf("Message %s failed: %v", msg.ID, err)
				}
			}
		}
	}
}

func (d *DeadLetterQueue) processMessage(
	ctx context.Context,
	msg *Message,
	handler func(ctx context.Context, msg *Message) error,
) error {
	err := handler(ctx, msg)
	if err != nil {
		if msg.DeliveryCount >= d.maxRetries {
			// Retries exhausted: move the message to the DLQ.
			msg.Attributes["error"] = err.Error()
			msg.Attributes["original_queue"] = "main-queue"
			d.dlq.Send(ctx, msg)
			d.mainQueue.Delete(ctx, msg.ReceiptHandle)
			log.Printf("Message %s moved to DLQ after %d retries", msg.ID, msg.DeliveryCount)
		}
		// Otherwise the message is redelivered after the visibility timeout.
		return err
	}
	// Success: delete the message.
	return d.mainQueue.Delete(ctx, msg.ReceiptHandle)
}

// DLQProcessor handles DLQ messages.
type DLQProcessor struct {
	dlq     Queue
	handler func(ctx context.Context, msg *Message) error
}

func NewDLQProcessor(dlq Queue, handler func(ctx context.Context, msg *Message) error) *DLQProcessor {
	return &DLQProcessor{
		dlq:     dlq,
		handler: handler,
	}
}

// ReprocessAll attempts to requeue all DLQ messages to a target queue.
func (p *DLQProcessor) ReprocessAll(ctx context.Context, targetQueue Queue) error {
	for {
		messages, err := p.dlq.Receive(ctx, 10)
		if err != nil {
			return err
		}
		if len(messages) == 0 {
			return nil // done
		}
		for _, msg := range messages {
			// Reset the delivery count and clear the failure marker.
			msg.DeliveryCount = 0
			delete(msg.Attributes, "error")

			// Send to the target queue.
			if err := targetQueue.Send(ctx, msg); err != nil {
				log.Printf("Failed to requeue message %s: %v", msg.ID, err)
				continue
			}
			// Delete from the DLQ.
			p.dlq.Delete(ctx, msg.ReceiptHandle)
		}
	}
}

// InspectDLQ retrieves DLQ messages for inspection without consuming them.
func (p *DLQProcessor) InspectDLQ(ctx context.Context, limit int) ([]*DLQEntry, error) {
	messages, err := p.dlq.Receive(ctx, limit)
	if err != nil {
		return nil, err
	}

	var entries []*DLQEntry
	for _, msg := range messages {
		// Make the message visible again immediately for other consumers.
		p.dlq.ChangeVisibility(ctx, msg.ReceiptHandle, 0)

		entries = append(entries, &DLQEntry{
			MessageID:     msg.ID,
			Body:          string(msg.Body),
			Error:         msg.Attributes["error"],
			OriginalQueue: msg.Attributes["original_queue"],
			FailedAt:      msg.Timestamp,
			Retries:       msg.DeliveryCount,
		})
	}
	return entries, nil
}

type DLQEntry struct {
	MessageID     string    `json:"message_id"`
	Body          string    `json:"body"`
	Error         string    `json:"error"`
	OriginalQueue string    `json:"original_queue"`
	FailedAt      time.Time `json:"failed_at"`
	Retries       int       `json:"retries"`
}
```

Best Practices

MESSAGE QUEUE BEST PRACTICES

1. MAKE CONSUMERS IDEMPOTENT
  • Expect duplicate messages
  • Use message IDs for deduplication
  • Design operations to be safe if repeated

2. SET APPROPRIATE VISIBILITY TIMEOUTS
  • Longer than the expected processing time
  • Extend the timeout for long-running tasks
  • A common starting point is ~6x the average processing time

3. USE DEAD LETTER QUEUES
  • Don't let poison messages block the queue
  • Set a maximum retry count
  • Monitor and alert on DLQ depth

4. BATCH OPERATIONS
  • Send/receive in batches for efficiency
  • Balance batch size against latency requirements
  • Delete messages in batches

5. MONITOR QUEUE METRICS
  • Queue depth (messages waiting)
  • Message age (oldest message)
  • Processing rate
  • Error rate

Interview Questions

  1. What's the difference between a queue and a topic?
    • Queue: Point-to-point, one consumer per message
    • Topic: Pub/sub, all subscribers get the message
  2. How do you handle duplicate messages?
    • Idempotent consumers
    • Message deduplication with IDs
    • Idempotency keys in database
  3. When would you use a message queue?
    • Decoupling services
    • Handling traffic spikes
    • Background processing
    • Reliable delivery
  4. How do you ensure message ordering?
    • FIFO queues
    • Single partition/consumer
    • Sequence numbers in messages
  5. What happens when a consumer crashes mid-processing?
    • Message visibility timeout expires
    • Message becomes visible again
    • Another consumer picks it up

Summary

MESSAGE QUEUE SUMMARY

Patterns:
  • Point-to-point (queue)
  • Pub/sub (topic)
  • Request-reply
  • Dead letter queue

Guarantees:
  • At-most-once: fire and forget
  • At-least-once: retry until acked (most common)
  • Exactly-once: requires idempotency

Key concepts:
  • Visibility timeout
  • Message acknowledgment
  • Batching
  • Dead letter queues

Key insight: "Message queues turn synchronous failures into asynchronous retries. Design for eventual success."

Tags: message-queues, async-processing, reliability, distributed-systems