Module 21: Stream Processing
What is Stream Processing?
Stream processing is the real-time processing of continuous data streams as they arrive.
┌─────────────────────────────────────────────────────────────────┐ │ STREAM PROCESSING OVERVIEW │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ BATCH PROCESSING: │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ Data ──► Store ──► Process (hourly/daily) ──► Output │ │ │ │ │ │ │ │ • Process bounded dataset │ │ │ │ • High latency (minutes to hours) │ │ │ │ • Complete data available │ │ │ └───────────────────────────────────────────────────────┘ │ │ │ │ STREAM PROCESSING: │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ Data ──► Process (immediately) ──► Output │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ │ │ event compute action │ │ │ │ │ │ │ │ • Process unbounded stream │ │ │ │ • Low latency (milliseconds to seconds) │ │ │ │ • Incomplete data (windowing) │ │ │ └───────────────────────────────────────────────────────┘ │ │ │ │ Use Cases: │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ • Real-time analytics │ │ │ │ • Fraud detection │ │ │ │ • Monitoring and alerting │ │ │ │ • Real-time recommendations │ │ │ │ • IoT data processing │ │ │ │ • Log aggregation │ │ │ └───────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘
Stream Processing Concepts
┌─────────────────────────────────────────────────────────────────┐ │ CORE CONCEPTS │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ EVENT TIME vs PROCESSING TIME: │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ Event Time: When event actually occurred │ │ │ │ Processing Time: When event is processed │ │ │ │ │ │ │ │ Timeline: │ │ │ │ Event: t=100 t=101 t=102 │ │ │ │ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ │ │ Network: ~~~delay~~~ │ │ │ │ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ │ │ Process: t=105 t=103 t=108 (out of order!) │ │ │ │ │ │ │ └───────────────────────────────────────────────────────┘ │ │ │ │ WINDOWING: │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ Tumbling Window (fixed, non-overlapping): │ │ │ │ ├───────┤├───────┤├───────┤ │ │ │ │ │ 0-10 ││ 10-20 ││ 20-30 │ │ │ │ │ │ │ │ │ Sliding Window (overlapping): │ │ │ │ ├───────────┤ │ │ │ │ ├───────────┤ │ │ │ │ ├───────────┤ │ │ │ │ │ │ │ │ Session Window (activity-based): │ │ │ │ ├──────┤ ├──┤ ├─────────┤ │ │ │ │ │ user │ gap│ │ gap │ user │ │ │ │ │ │active│ │ │ │ active │ │ │ │ │ │ │ │ └───────────────────────────────────────────────────────┘ │ │ │ │ WATERMARKS: │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ "All events with timestamp <= watermark have arrived" │ │ │ │ │ │ │ │ Used to handle late data and trigger window outputs │ │ │ └───────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘
Basic Stream Processor
// Event represents a single record flowing through the stream: a key
// (used for partitioning/grouping), an opaque payload, and the event time.
type Event struct {
	Key       string
	Value     interface{}
	Timestamp time.Time
}

// Stream is a source of events. Process returns a channel that is closed
// when the source is exhausted or ctx is cancelled.
type Stream interface {
	Process(ctx context.Context) <-chan Event
}

// Processor transforms one input event into zero or more output events.
type Processor interface {
	Process(ctx context.Context, event Event) ([]Event, error)
}

// Sink receives fully processed events.
type Sink interface {
	Write(ctx context.Context, events []Event) error
}

// StreamProcessor pulls events from a source, pipes each one through a
// chain of processors, and writes the results to a sink using a fixed
// pool of worker goroutines. A StreamProcessor is intended for a single
// Run call.
type StreamProcessor struct {
	source      Stream
	processors  []Processor
	sink        Sink
	parallelism int

	errOnce sync.Once
	err     error // first sink error observed by any worker
}

// NewStreamProcessor wires a source, sink, and processor chain together.
// Processors are applied in the order given. Parallelism defaults to 4.
func NewStreamProcessor(source Stream, sink Sink, processors ...Processor) *StreamProcessor {
	return &StreamProcessor{
		source:      source,
		processors:  processors,
		sink:        sink,
		parallelism: 4,
	}
}

// Run consumes the source until it is exhausted or ctx is cancelled.
// It returns the first sink error encountered by any worker; processor
// errors drop the offending event and are not fatal.
func (sp *StreamProcessor) Run(ctx context.Context) error {
	events := sp.source.Process(ctx)

	var wg sync.WaitGroup
	for i := 0; i < sp.parallelism; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sp.worker(ctx, events)
		}()
	}
	wg.Wait()
	return sp.err
}

// recordErr stores the first error seen; later errors are discarded.
func (sp *StreamProcessor) recordErr(err error) {
	sp.errOnce.Do(func() { sp.err = err })
}

// worker drains the shared event channel, applying the processor chain
// to each event and writing surviving events to the sink.
func (sp *StreamProcessor) worker(ctx context.Context, events <-chan Event) {
	for {
		select {
		case <-ctx.Done():
			return
		case event, ok := <-events:
			if !ok {
				return
			}
			// Fan each event through the chain; a processor may emit
			// zero (filter), one (map), or many (flat-map) events.
			result := []Event{event}
			for _, proc := range sp.processors {
				var next []Event
				for _, e := range result {
					output, err := proc.Process(ctx, e)
					if err != nil {
						// Drop this event on processor error; the rest
						// of the batch still flows through.
						continue
					}
					next = append(next, output...)
				}
				result = next
			}
			if len(result) > 0 {
				// BUG FIX: the sink error was previously ignored, so data
				// loss at the sink was invisible to the caller of Run.
				if err := sp.sink.Write(ctx, result); err != nil {
					sp.recordErr(err)
				}
			}
		}
	}
}

// MapProcessor applies a 1:1 transformation to every event.
type MapProcessor struct {
	fn func(Event) Event
}

// Map builds a processor from a 1:1 transformation function.
func Map(fn func(Event) Event) *MapProcessor {
	return &MapProcessor{fn: fn}
}

func (p *MapProcessor) Process(ctx context.Context, event Event) ([]Event, error) {
	return []Event{p.fn(event)}, nil
}

// FilterProcessor drops events that fail a predicate.
type FilterProcessor struct {
	predicate func(Event) bool
}

// Filter builds a processor that keeps only events matching predicate.
func Filter(predicate func(Event) bool) *FilterProcessor {
	return &FilterProcessor{predicate: predicate}
}

func (p *FilterProcessor) Process(ctx context.Context, event Event) ([]Event, error) {
	if p.predicate(event) {
		return []Event{event}, nil
	}
	return nil, nil
}

// FlatMapProcessor expands one event into zero or more events.
type FlatMapProcessor struct {
	fn func(Event) []Event
}

// FlatMap builds a processor from a 1:N transformation function.
func FlatMap(fn func(Event) []Event) *FlatMapProcessor {
	return &FlatMapProcessor{fn: fn}
}

func (p *FlatMapProcessor) Process(ctx context.Context, event Event) ([]Event, error) {
	return p.fn(event), nil
}
Windowing Implementation
gopackage stream import ( "context" "sync" "time" ) // Window collects events over time type Window interface { Add(event Event) Trigger() []Event IsComplete(watermark time.Time) bool } // TumblingWindow is a fixed-size, non-overlapping window type TumblingWindow struct { mu sync.Mutex events []Event start time.Time duration time.Duration } func NewTumblingWindow(start time.Time, duration time.Duration) *TumblingWindow { return &TumblingWindow{ start: start, duration: duration, } } func (w *TumblingWindow) Add(event Event) { w.mu.Lock() defer w.mu.Unlock() w.events = append(w.events, event) } func (w *TumblingWindow) Trigger() []Event { w.mu.Lock() defer w.mu.Unlock() events := w.events w.events = nil return events } func (w *TumblingWindow) IsComplete(watermark time.Time) bool { return watermark.After(w.start.Add(w.duration)) } func (w *TumblingWindow) End() time.Time { return w.start.Add(w.duration) } // WindowManager manages multiple windows type WindowManager struct { mu sync.Mutex windows map[string]map[time.Time]*TumblingWindow duration time.Duration aggregator WindowAggregator } type WindowAggregator interface { Aggregate(key string, events []Event) Event } func NewWindowManager(duration time.Duration, aggregator WindowAggregator) *WindowManager { return &WindowManager{ windows: make(map[string]map[time.Time]*TumblingWindow), duration: duration, aggregator: aggregator, } } func (wm *WindowManager) Add(event Event) { wm.mu.Lock() defer wm.mu.Unlock() // Determine window start time windowStart := event.Timestamp.Truncate(wm.duration) // Get or create window keyWindows, ok := wm.windows[event.Key] if !ok { keyWindows = make(map[time.Time]*TumblingWindow) wm.windows[event.Key] = keyWindows } window, ok := keyWindows[windowStart] if !ok { window = NewTumblingWindow(windowStart, wm.duration) keyWindows[windowStart] = window } window.Add(event) } func (wm *WindowManager) TriggerComplete(watermark time.Time) []Event { wm.mu.Lock() defer wm.mu.Unlock() var results 
[]Event for key, keyWindows := range wm.windows { for start, window := range keyWindows { if window.IsComplete(watermark) { events := window.Trigger() if len(events) > 0 { result := wm.aggregator.Aggregate(key, events) results = append(results, result) } delete(keyWindows, start) } } } return results } // Sliding Window type SlidingWindow struct { mu sync.Mutex events []Event size time.Duration slide time.Duration } func NewSlidingWindow(size, slide time.Duration) *SlidingWindow { return &SlidingWindow{ size: size, slide: slide, } } func (w *SlidingWindow) Add(event Event) { w.mu.Lock() defer w.mu.Unlock() w.events = append(w.events, event) } func (w *SlidingWindow) GetWindows(now time.Time) [][]Event { w.mu.Lock() defer w.mu.Unlock() var windows [][]Event // Calculate how many windows numWindows := int(w.size / w.slide) for i := 0; i < numWindows; i++ { windowEnd := now.Add(-time.Duration(i) * w.slide) windowStart := windowEnd.Add(-w.size) var windowEvents []Event for _, event := range w.events { if event.Timestamp.After(windowStart) && !event.Timestamp.After(windowEnd) { windowEvents = append(windowEvents, event) } } windows = append(windows, windowEvents) } // Clean up old events cutoff := now.Add(-w.size) var remaining []Event for _, event := range w.events { if event.Timestamp.After(cutoff) { remaining = append(remaining, event) } } w.events = remaining return windows } // Session Window type SessionWindow struct { mu sync.Mutex events []Event gap time.Duration lastTime time.Time } func NewSessionWindow(gap time.Duration) *SessionWindow { return &SessionWindow{gap: gap} } func (w *SessionWindow) Add(event Event) bool { w.mu.Lock() defer w.mu.Unlock() // Check if this starts a new session if !w.lastTime.IsZero() && event.Timestamp.Sub(w.lastTime) > w.gap { return true // Signal session complete } w.events = append(w.events, event) w.lastTime = event.Timestamp return false } func (w *SessionWindow) Trigger() []Event { w.mu.Lock() defer w.mu.Unlock() events := 
w.events w.events = nil w.lastTime = time.Time{} return events }
Aggregations
gopackage stream import ( "context" "sync" "time" ) // Aggregator maintains running aggregations type Aggregator interface { Add(event Event) Result() interface{} Reset() } // CountAggregator counts events type CountAggregator struct { mu sync.Mutex count int64 } func NewCountAggregator() *CountAggregator { return &CountAggregator{} } func (a *CountAggregator) Add(event Event) { a.mu.Lock() defer a.mu.Unlock() a.count++ } func (a *CountAggregator) Result() interface{} { a.mu.Lock() defer a.mu.Unlock() return a.count } func (a *CountAggregator) Reset() { a.mu.Lock() defer a.mu.Unlock() a.count = 0 } // SumAggregator sums numeric values type SumAggregator struct { mu sync.Mutex sum float64 field string } func NewSumAggregator(field string) *SumAggregator { return &SumAggregator{field: field} } func (a *SumAggregator) Add(event Event) { a.mu.Lock() defer a.mu.Unlock() if m, ok := event.Value.(map[string]interface{}); ok { if v, ok := m[a.field].(float64); ok { a.sum += v } } } func (a *SumAggregator) Result() interface{} { a.mu.Lock() defer a.mu.Unlock() return a.sum } func (a *SumAggregator) Reset() { a.mu.Lock() defer a.mu.Unlock() a.sum = 0 } // AvgAggregator calculates average type AvgAggregator struct { mu sync.Mutex sum float64 count int64 field string } func NewAvgAggregator(field string) *AvgAggregator { return &AvgAggregator{field: field} } func (a *AvgAggregator) Add(event Event) { a.mu.Lock() defer a.mu.Unlock() if m, ok := event.Value.(map[string]interface{}); ok { if v, ok := m[a.field].(float64); ok { a.sum += v a.count++ } } } func (a *AvgAggregator) Result() interface{} { a.mu.Lock() defer a.mu.Unlock() if a.count == 0 { return 0.0 } return a.sum / float64(a.count) } func (a *AvgAggregator) Reset() { a.mu.Lock() defer a.mu.Unlock() a.sum = 0 a.count = 0 } // WindowedAggregation combines windowing with aggregation type WindowedAggregation struct { windowManager *WindowManager aggregators map[string]Aggregator mu sync.Mutex } type AggregatorFactory 
func() Aggregator func NewWindowedAggregation( windowDuration time.Duration, factory AggregatorFactory, ) *WindowedAggregation { return &WindowedAggregation{ windowManager: NewWindowManager(windowDuration, nil), aggregators: make(map[string]Aggregator), } } // GroupByAggregation groups events by key and aggregates type GroupByAggregation struct { mu sync.Mutex groups map[string]Aggregator factory AggregatorFactory windowSize time.Duration outputCh chan Event } func NewGroupByAggregation( windowSize time.Duration, factory AggregatorFactory, ) *GroupByAggregation { gba := &GroupByAggregation{ groups: make(map[string]Aggregator), factory: factory, windowSize: windowSize, outputCh: make(chan Event, 100), } go gba.ticker() return gba } func (g *GroupByAggregation) Add(event Event) { g.mu.Lock() defer g.mu.Unlock() agg, ok := g.groups[event.Key] if !ok { agg = g.factory() g.groups[event.Key] = agg } agg.Add(event) } func (g *GroupByAggregation) ticker() { ticker := time.NewTicker(g.windowSize) for range ticker.C { g.emit() } } func (g *GroupByAggregation) emit() { g.mu.Lock() defer g.mu.Unlock() for key, agg := range g.groups { event := Event{ Key: key, Value: agg.Result(), Timestamp: time.Now(), } select { case g.outputCh <- event: default: // Channel full, drop } agg.Reset() } } func (g *GroupByAggregation) Output() <-chan Event { return g.outputCh }
Stateful Processing
gopackage stream import ( "context" "encoding/json" "sync" "time" ) // StateStore manages processor state type StateStore interface { Get(key string) (interface{}, bool) Put(key string, value interface{}) Delete(key string) All() map[string]interface{} } // InMemoryStateStore is a simple in-memory state store type InMemoryStateStore struct { mu sync.RWMutex data map[string]interface{} } func NewInMemoryStateStore() *InMemoryStateStore { return &InMemoryStateStore{ data: make(map[string]interface{}), } } func (s *InMemoryStateStore) Get(key string) (interface{}, bool) { s.mu.RLock() defer s.mu.RUnlock() val, ok := s.data[key] return val, ok } func (s *InMemoryStateStore) Put(key string, value interface{}) { s.mu.Lock() defer s.mu.Unlock() s.data[key] = value } func (s *InMemoryStateStore) Delete(key string) { s.mu.Lock() defer s.mu.Unlock() delete(s.data, key) } func (s *InMemoryStateStore) All() map[string]interface{} { s.mu.RLock() defer s.mu.RUnlock() result := make(map[string]interface{}) for k, v := range s.data { result[k] = v } return result } // StatefulProcessor has access to state type StatefulProcessor struct { store StateStore processFn func(ctx context.Context, event Event, store StateStore) ([]Event, error) } func NewStatefulProcessor( store StateStore, fn func(ctx context.Context, event Event, store StateStore) ([]Event, error), ) *StatefulProcessor { return &StatefulProcessor{ store: store, processFn: fn, } } func (p *StatefulProcessor) Process(ctx context.Context, event Event) ([]Event, error) { return p.processFn(ctx, event, p.store) } // Example: Running Count func RunningCountProcessor(store StateStore) *StatefulProcessor { return NewStatefulProcessor(store, func(ctx context.Context, event Event, store StateStore) ([]Event, error) { count, _ := store.Get(event.Key) countVal := int64(0) if count != nil { countVal = count.(int64) } countVal++ store.Put(event.Key, countVal) return []Event{{ Key: event.Key, Value: map[string]interface{}{"count": 
countVal}, Timestamp: event.Timestamp, }}, nil }) } // Example: Deduplication func DeduplicationProcessor(store StateStore, ttl time.Duration) *StatefulProcessor { return NewStatefulProcessor(store, func(ctx context.Context, event Event, store StateStore) ([]Event, error) { // Create dedup key dedupKey := event.Key + ":" + eventHash(event) // Check if seen if _, seen := store.Get(dedupKey); seen { return nil, nil // Duplicate, filter out } // Mark as seen store.Put(dedupKey, time.Now()) // TODO: Clean up old entries return []Event{event}, nil }) } func eventHash(event Event) string { data, _ := json.Marshal(event.Value) return string(data) // Simplified - use proper hash } // Example: Session Tracking type UserSession struct { UserID string StartTime time.Time LastActive time.Time PageViews int Events []string } func SessionTrackingProcessor(store StateStore, sessionTimeout time.Duration) *StatefulProcessor { return NewStatefulProcessor(store, func(ctx context.Context, event Event, store StateStore) ([]Event, error) { userID := event.Key // Get or create session var session *UserSession if existing, ok := store.Get(userID); ok { session = existing.(*UserSession) } else { session = &UserSession{ UserID: userID, StartTime: event.Timestamp, } } // Check if session expired if !session.LastActive.IsZero() && event.Timestamp.Sub(session.LastActive) > sessionTimeout { // Emit session end event sessionEndEvent := Event{ Key: userID, Value: map[string]interface{}{ "type": "session_end", "user_id": userID, "duration": session.LastActive.Sub(session.StartTime).Seconds(), "page_views": session.PageViews, }, Timestamp: session.LastActive, } // Start new session session = &UserSession{ UserID: userID, StartTime: event.Timestamp, } store.Put(userID, session) session.LastActive = event.Timestamp session.PageViews++ return []Event{sessionEndEvent}, nil } // Update session session.LastActive = event.Timestamp session.PageViews++ if eventType, ok := 
event.Value.(map[string]interface{})["type"].(string); ok { session.Events = append(session.Events, eventType) } store.Put(userID, session) return nil, nil }) }
Stream Joins
gopackage stream import ( "context" "sync" "time" ) // StreamJoin joins two streams type StreamJoin struct { leftBuffer map[string][]Event rightBuffer map[string][]Event mu sync.Mutex windowSize time.Duration joinFn func(left, right Event) Event outputCh chan Event } func NewStreamJoin(windowSize time.Duration, joinFn func(left, right Event) Event) *StreamJoin { sj := &StreamJoin{ leftBuffer: make(map[string][]Event), rightBuffer: make(map[string][]Event), windowSize: windowSize, joinFn: joinFn, outputCh: make(chan Event, 100), } go sj.cleanup() return sj } func (j *StreamJoin) AddLeft(event Event) { j.mu.Lock() defer j.mu.Unlock() j.leftBuffer[event.Key] = append(j.leftBuffer[event.Key], event) // Check for matches in right buffer if rightEvents, ok := j.rightBuffer[event.Key]; ok { for _, rightEvent := range rightEvents { if j.inWindow(event, rightEvent) { result := j.joinFn(event, rightEvent) select { case j.outputCh <- result: default: } } } } } func (j *StreamJoin) AddRight(event Event) { j.mu.Lock() defer j.mu.Unlock() j.rightBuffer[event.Key] = append(j.rightBuffer[event.Key], event) // Check for matches in left buffer if leftEvents, ok := j.leftBuffer[event.Key]; ok { for _, leftEvent := range leftEvents { if j.inWindow(leftEvent, event) { result := j.joinFn(leftEvent, event) select { case j.outputCh <- result: default: } } } } } func (j *StreamJoin) inWindow(e1, e2 Event) bool { diff := e1.Timestamp.Sub(e2.Timestamp) if diff < 0 { diff = -diff } return diff <= j.windowSize } func (j *StreamJoin) cleanup() { ticker := time.NewTicker(j.windowSize) for range ticker.C { j.mu.Lock() cutoff := time.Now().Add(-j.windowSize * 2) for key, events := range j.leftBuffer { var remaining []Event for _, e := range events { if e.Timestamp.After(cutoff) { remaining = append(remaining, e) } } j.leftBuffer[key] = remaining } for key, events := range j.rightBuffer { var remaining []Event for _, e := range events { if e.Timestamp.After(cutoff) { remaining = append(remaining, 
e) } } j.rightBuffer[key] = remaining } j.mu.Unlock() } } func (j *StreamJoin) Output() <-chan Event { return j.outputCh } // Example: Join orders with payments func JoinOrdersWithPayments() *StreamJoin { return NewStreamJoin(5*time.Minute, func(order, payment Event) Event { orderData := order.Value.(map[string]interface{}) paymentData := payment.Value.(map[string]interface{}) return Event{ Key: order.Key, Value: map[string]interface{}{ "order_id": orderData["order_id"], "customer_id": orderData["customer_id"], "amount": orderData["amount"], "payment_id": paymentData["payment_id"], "payment_method": paymentData["method"], "status": "paid", }, Timestamp: payment.Timestamp, } }) }
Real-World Example: Click Analytics
gopackage stream import ( "context" "fmt" "time" ) // ClickEvent represents a user click type ClickEvent struct { UserID string `json:"user_id"` PageURL string `json:"page_url"` Element string `json:"element"` Timestamp time.Time `json:"timestamp"` } // ClickAnalytics processes click streams type ClickAnalytics struct { // Per-page click counts (1-minute windows) pageClicks *GroupByAggregation // Per-user session tracking sessions *StatefulProcessor // Real-time trending pages trending *TrendingCalculator } func NewClickAnalytics() *ClickAnalytics { store := NewInMemoryStateStore() return &ClickAnalytics{ pageClicks: NewGroupByAggregation(time.Minute, func() Aggregator { return NewCountAggregator() }), sessions: SessionTrackingProcessor(store, 30*time.Minute), trending: NewTrendingCalculator(5*time.Minute, 10), } } func (ca *ClickAnalytics) ProcessClick(ctx context.Context, click ClickEvent) { event := Event{ Key: click.PageURL, Value: click, Timestamp: click.Timestamp, } // Update page click counts ca.pageClicks.Add(event) // Update user session userEvent := Event{ Key: click.UserID, Value: click, Timestamp: click.Timestamp, } ca.sessions.Process(ctx, userEvent) // Update trending ca.trending.Add(click.PageURL, click.Timestamp) } // TrendingCalculator calculates trending items type TrendingCalculator struct { mu sync.Mutex counts map[string]*timeDecayCounter window time.Duration topN int } type timeDecayCounter struct { count float64 lastDecay time.Time } func NewTrendingCalculator(window time.Duration, topN int) *TrendingCalculator { tc := &TrendingCalculator{ counts: make(map[string]*timeDecayCounter), window: window, topN: topN, } go tc.decayLoop() return tc } func (t *TrendingCalculator) Add(key string, timestamp time.Time) { t.mu.Lock() defer t.mu.Unlock() counter, ok := t.counts[key] if !ok { counter = &timeDecayCounter{lastDecay: timestamp} t.counts[key] = counter } counter.count++ } func (t *TrendingCalculator) GetTop() []string { t.mu.Lock() defer 
t.mu.Unlock() // Sort by count type kv struct { key string count float64 } var items []kv for k, v := range t.counts { items = append(items, kv{k, v.count}) } // Simple bubble sort for top N for i := 0; i < len(items) && i < t.topN; i++ { for j := i + 1; j < len(items); j++ { if items[j].count > items[i].count { items[i], items[j] = items[j], items[i] } } } result := make([]string, 0, t.topN) for i := 0; i < len(items) && i < t.topN; i++ { result = append(result, items[i].key) } return result } func (t *TrendingCalculator) decayLoop() { ticker := time.NewTicker(time.Minute) for range ticker.C { t.decay() } } func (t *TrendingCalculator) decay() { t.mu.Lock() defer t.mu.Unlock() now := time.Now() for key, counter := range t.counts { // Exponential decay elapsed := now.Sub(counter.lastDecay) decayFactor := 0.5 // Half-life of 1 minute counter.count *= (1 - decayFactor*elapsed.Minutes()) counter.lastDecay = now // Remove if negligible if counter.count < 0.1 { delete(t.counts, key) } } }
Best Practices
┌─────────────────────────────────────────────────────────────────┐ │ STREAM PROCESSING BEST PRACTICES │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ 1. HANDLE LATE DATA │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ • Use watermarks to track event time progress │ │ │ │ • Allow configurable late data tolerance │ │ │ │ • Consider side outputs for late arrivals │ │ │ └───────────────────────────────────────────────────────┘ │ │ │ │ 2. MANAGE STATE CAREFULLY │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ • Use TTL for state entries │ │ │ │ • Checkpoint state regularly │ │ │ │ • Consider state size limits │ │ │ └───────────────────────────────────────────────────────┘ │ │ │ │ 3. ENSURE EXACTLY-ONCE SEMANTICS │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ • Use idempotent operations │ │ │ │ • Track processed offsets │ │ │ │ • Use transactional writes │ │ │ └───────────────────────────────────────────────────────┘ │ │ │ │ 4. MONITOR AND ALERT │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ • Track processing lag │ │ │ │ • Monitor throughput │ │ │ │ • Alert on backpressure │ │ │ └───────────────────────────────────────────────────────┘ │ │ │ │ 5. TEST WITH REALISTIC DATA │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ • Test with out-of-order events │ │ │ │ • Simulate failures and recovery │ │ │ │ • Load test with production volumes │ │ │ └───────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘
Summary
┌─────────────────────────────────────────────────────────────────┐ │ STREAM PROCESSING SUMMARY │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ Core Concepts: │ │ • Event Time vs Processing Time │ │ • Windowing (tumbling, sliding, session) │ │ • Watermarks for late data │ │ • Stateful processing │ │ │ │ Operations: │ │ • Map, Filter, FlatMap │ │ • Aggregations (count, sum, avg) │ │ • Joins (stream-stream, stream-table) │ │ • Windowed operations │ │ │ │ Frameworks: │ │ • Apache Kafka Streams │ │ • Apache Flink │ │ • Apache Spark Streaming │ │ │ │ Key Insight: │ │ "Stream processing turns infinite data into finite, │ │ actionable insights through windowing and aggregation." │ │ │ └─────────────────────────────────────────────────────────────────┘
Next Module: Module 22 - Failure Detection
Previous Module: Module 20 - CQRS Pattern