Pipeline Pattern in Go: Building Data Processing Assembly Lines

You have a stream of data. It needs to go through multiple transformations. First, parse it. Then, validate it. Then, enrich it. Then, format it. Finally, store it. Each step depends on the previous one, but within each step, you can process many items in parallel.
This is the Pipeline pattern. It's a way to build scalable, maintainable data processing systems. Stages run concurrently and pass items along channels, so millions of records can stream through without the whole dataset ever being held in memory.

The Real World Parallel: A Car Factory Assembly Line

Think of it like this: In a car factory, the chassis moves along a conveyor belt through different stations. Station 1 adds the engine. Station 2 installs the transmission. Station 3 adds the electrical system. Station 4 installs the interior. Each station specializes in one task. Cars flow through continuously. If Station 2 is slow, cars queue up, but Station 1 keeps working on new cars. This is a pipeline.
Concurrency pattern diagram 1

Why Pipelines?

Consider processing a large CSV file:
```go
// Without a pipeline: everything is held in memory at once.
// parseAll, validateAll, transformAll, and writeAll are placeholders.
func processWithoutPipeline(filename string) error {
	// Read the ENTIRE file into memory: a 10GB file means 10GB of RAM
	data, err := os.ReadFile(filename)
	if err != nil {
		return err
	}
	records := parseAll(data)          // raw bytes and parsed records both live in memory
	valid := validateAll(records)      // even more memory
	transformed := transformAll(valid) // memory keeps growing
	return writeAll(transformed)       // peak memory: several times the file size
}

// With a pipeline: memory stays bounded regardless of file size.
// readLines, parse, validate, transform, and write are placeholder stages.
func processWithPipeline(filename string) error {
	// Each stage handles one record at a time, so memory depends on the
	// records currently in flight, not on the size of the file.
	lines := readLines(filename)    // stream of lines
	records := parse(lines)         // stream of records
	valid := validate(records)      // stream of valid records
	transformed := transform(valid) // stream of transformed records
	return write(transformed)
}
```
Benefits of Pipelines:
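  • Bounded memory: Memory use depends on how many items are in flight, not on input size, so gigabytes can be processed with a small, fixed footprint
  • Natural concurrency: Every stage runs in its own goroutine, so all stages work at the same time on different items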
  • Composable: Mix and match stages like LEGO blocks
  • Testable: Each stage can be tested independently
  • Maintainable: Clear separation of concerns

Basic Pipeline Implementation

Let's build a pipeline step by step:
```go
// Filename: basic_pipeline.go
package main

import (
	"fmt"
	"strings"
)

// Stage 1: Generator - produces values
// Why: Every pipeline needs a source of data.
// This stage reads input and sends it downstream.
func generator(data []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, item := range data {
			out <- item
		}
	}()
	return out
}

// Stage 2: Transform - converts to uppercase
// Why: Each stage takes an input channel and returns an output channel.
// This creates composable building blocks.
func toUpper(input <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for s := range input {
			out <- strings.ToUpper(s)
		}
	}()
	return out
}

// Stage 3: Filter - removes short strings
// Why: Stages can filter, letting only some items through.
func filterShort(minLength int, input <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for s := range input {
			if len(s) >= minLength {
				out <- s
			}
		}
	}()
	return out
}

// Stage 4: Prefix - adds a prefix to each string
func addPrefix(prefix string, input <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for s := range input {
			out <- prefix + s
		}
	}()
	return out
}

// Sink - consumes the final output
// Why: The final stage collects or processes results.
func collect(input <-chan string) []string {
	var result []string
	for s := range input {
		result = append(result, s)
	}
	return result
}

func main() {
	// Input data
	data := []string{"hello", "go", "pipeline", "is", "awesome", "pattern"}
	fmt.Println("Input:", data)
	fmt.Println()

	// Build the pipeline by connecting stages
	// Why: Each function returns a channel that feeds the next
	stage1 := generator(data)
	stage2 := toUpper(stage1)
	stage3 := filterShort(4, stage2)
	stage4 := addPrefix("-> ", stage3)
	result := collect(stage4)
	fmt.Println("Output:", result)

	// Alternative: nest the calls for a more compact (if inside-out) syntax
	fmt.Println("\nSame pipeline with chained calls (different prefix):")
	result2 := collect(
		addPrefix("=> ",
			filterShort(4,
				toUpper(
					generator(data)))))
	fmt.Println("Output:", result2)
}
```
Expected Output:
```
Input: [hello go pipeline is awesome pattern]

Output: [-> HELLO -> PIPELINE -> AWESOME -> PATTERN]

Same pipeline with chained calls (different prefix):
Output: [=> HELLO => PIPELINE => AWESOME => PATTERN]
```

Real World Example 1: Log Processing Pipeline

Processing log files is a perfect use case for pipelines:
```go
// Filename: log_pipeline.go
package main

import (
	"context"
	"fmt"
	"regexp"
	"strings"
	"time"
)

// LogEntry represents a parsed log entry
type LogEntry struct {
	Timestamp time.Time
	Level     string
	Service   string
	Message   string
	Raw       string
}

// AlertableEntry is an entry that needs alerting
type AlertableEntry struct {
	LogEntry
	AlertType string
	Priority  int
}

// Stage 1: Read lines (simulated; filename is unused in this demo)
func readLines(ctx context.Context, filename string) (<-chan string, <-chan error) {
	lines := make(chan string)
	errc := make(chan error, 1)
	go func() {
		defer close(lines)
		defer close(errc)
		// Simulate reading from a file (in production, read the actual file)
		sampleLogs := []string{
			`2024-01-15T10:30:00Z INFO api-gateway Request received: GET /users`,
			`2024-01-15T10:30:01Z ERROR auth-service Authentication failed: invalid token`,
			`2024-01-15T10:30:02Z WARN database Connection pool running low: 2/10 available`,
			`2024-01-15T10:30:03Z INFO api-gateway Request completed: 200 OK (45ms)`,
			`2024-01-15T10:30:04Z ERROR payment-service Payment processing failed: timeout`,
			`2024-01-15T10:30:05Z INFO scheduler Job completed: daily-report`,
			`2024-01-15T10:30:06Z CRITICAL database Database connection lost!`,
			`2024-01-15T10:30:07Z DEBUG auth-service Token refresh attempted`,
			`2024-01-15T10:30:08Z ERROR api-gateway Upstream service unavailable`,
			`2024-01-15T10:30:09Z INFO metrics Metrics exported: 1000 data points`,
		}
		for _, line := range sampleLogs {
			select {
			case <-ctx.Done():
				errc <- ctx.Err()
				return
			case lines <- line:
				time.Sleep(10 * time.Millisecond) // Simulate reading delay
			}
		}
	}()
	return lines, errc
}

// Stage 2: Parse log lines into structured entries
func parseLogLines(ctx context.Context, lines <-chan string) (<-chan LogEntry, <-chan error) {
	entries := make(chan LogEntry)
	errc := make(chan error, 1)
	// Log format: timestamp LEVEL service message
	pattern := regexp.MustCompile(`^(\S+)\s+(\w+)\s+(\S+)\s+(.+)$`)
	go func() {
		defer close(entries)
		defer close(errc)
		for line := range lines {
			matches := pattern.FindStringSubmatch(line)
			if matches == nil {
				continue // Skip unparseable lines
			}
			timestamp, err := time.Parse(time.RFC3339, matches[1])
			if err != nil {
				continue // Skip lines with a bad timestamp
			}
			entry := LogEntry{
				Timestamp: timestamp,
				Level:     matches[2],
				Service:   matches[3],
				Message:   matches[4],
				Raw:       line,
			}
			// Selecting on the send notices cancellation even while
			// waiting on a slow downstream stage
			select {
			case <-ctx.Done():
				errc <- ctx.Err()
				return
			case entries <- entry:
			}
		}
	}()
	return entries, errc
}

// Stage 3: Keep entries at or above a minimum severity
func filterSeverity(ctx context.Context, minLevel string, entries <-chan LogEntry) <-chan LogEntry {
	filtered := make(chan LogEntry)
	severityOrder := map[string]int{
		"DEBUG": 0, "INFO": 1, "WARN": 2, "WARNING": 2,
		"ERROR": 3, "CRITICAL": 4, "FATAL": 5,
	}
	minSeverity := severityOrder[minLevel]
	go func() {
		defer close(filtered)
		for entry := range entries {
			if severityOrder[entry.Level] < minSeverity {
				continue // Unknown levels rank as DEBUG (0)
			}
			select {
			case <-ctx.Done():
				return
			case filtered <- entry:
			}
		}
	}()
	return filtered
}

// Stage 4: Enrich with alert information
func enrichForAlerting(ctx context.Context, entries <-chan LogEntry) <-chan AlertableEntry {
	alertable := make(chan AlertableEntry)
	go func() {
		defer close(alertable)
		for entry := range entries {
			alert := AlertableEntry{LogEntry: entry}
			// Determine alert type and priority
			switch entry.Level {
			case "CRITICAL", "FATAL":
				alert.AlertType, alert.Priority = "PAGE", 1
			case "ERROR":
				if strings.Contains(entry.Message, "timeout") ||
					strings.Contains(entry.Message, "unavailable") {
					alert.AlertType, alert.Priority = "PAGE", 2
				} else {
					alert.AlertType, alert.Priority = "NOTIFY", 3
				}
			case "WARN", "WARNING":
				alert.AlertType, alert.Priority = "LOG", 4
			}
			select {
			case <-ctx.Done():
				return
			case alertable <- alert:
			}
		}
	}()
	return alertable
}

// Stage 5: Format for output
func formatAlerts(ctx context.Context, alerts <-chan AlertableEntry) <-chan string {
	formatted := make(chan string)
	go func() {
		defer close(formatted)
		for alert := range alerts {
			output := fmt.Sprintf("[P%d %s] %s | %s: %s",
				alert.Priority, alert.AlertType, alert.Service, alert.Level, alert.Message)
			select {
			case <-ctx.Done():
				return
			case formatted <- output:
			}
		}
	}()
	return formatted
}

// Sink: Collect all formatted alerts
func collectAlerts(formatted <-chan string) []string {
	var alerts []string
	for alert := range formatted {
		alerts = append(alerts, alert)
	}
	return alerts
}

func main() {
	ctx := context.Background()

	fmt.Println("🔍 Log Processing Pipeline")
	fmt.Println("=" + strings.Repeat("=", 50))
	fmt.Println()

	// Build the pipeline
	lines, lineErr := readLines(ctx, "app.log")
	entries, parseErr := parseLogLines(ctx, lines)
	filtered := filterSeverity(ctx, "WARN", entries)
	alertable := enrichForAlerting(ctx, filtered)
	formatted := formatAlerts(ctx, alertable)

	// Collect results (returns once every stage has closed its output)
	alerts := collectAlerts(formatted)

	// Check for errors; receiving from a closed, empty error channel yields nil
	if err := <-lineErr; err != nil {
		fmt.Printf("Read error: %v\n", err)
	}
	if err := <-parseErr; err != nil {
		fmt.Printf("Parse error: %v\n", err)
	}

	// Display results
	fmt.Println("Alerts Generated:")
	fmt.Println("-" + strings.Repeat("-", 50))
	for _, alert := range alerts {
		fmt.Println(alert)
	}
	fmt.Printf("\nTotal alerts: %d\n", len(alerts))
}
```
Expected Output:
```
🔍 Log Processing Pipeline
===================================================

Alerts Generated:
---------------------------------------------------
[P3 NOTIFY] auth-service | ERROR: Authentication failed: invalid token
[P4 LOG] database | WARN: Connection pool running low: 2/10 available
[P2 PAGE] payment-service | ERROR: Payment processing failed: timeout
[P1 PAGE] database | CRITICAL: Database connection lost!
[P2 PAGE] api-gateway | ERROR: Upstream service unavailable

Total alerts: 5
```

Real World Example 2: ETL Data Pipeline

Extract, Transform, Load pipeline for data processing:
```go
// Filename: etl_pipeline.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"math/rand"
	"strings"
	"sync"
	"time"
)

// RawRecord represents data as it comes from the source
type RawRecord struct {
	ID        string          `json:"id"`
	Data      json.RawMessage `json:"data"`
	Source    string          `json:"source"`
	Timestamp string          `json:"timestamp"`
}

// CleanRecord represents validated and cleaned data
type CleanRecord struct {
	ID        string
	Name      string
	Email     string
	Amount    float64
	Currency  string
	Source    string
	Timestamp time.Time
}

// EnrichedRecord has additional derived data
type EnrichedRecord struct {
	CleanRecord
	AmountUSD    float64
	CustomerTier string
	ProcessedAt  time.Time
	ProcessingMS int64
}

// LoadResult represents the result of loading a record
type LoadResult struct {
	RecordID string
	Success  bool
	Error    error
}

// Extract: Generate sample raw records
func extract(ctx context.Context, count int) <-chan RawRecord {
	out := make(chan RawRecord)
	go func() {
		defer close(out)
		sources := []string{"web", "mobile", "api", "partner"}
		currencies := []string{"USD", "EUR", "GBP", "JPY"}
		for i := 0; i < count; i++ {
			data := map[string]interface{}{
				"name":     fmt.Sprintf("Customer %d", i+1),
				"email":    fmt.Sprintf("customer%d@example.com", i+1),
				"amount":   100.0 + rand.Float64()*900.0,
				"currency": currencies[rand.Intn(len(currencies))],
			}
			dataJSON, _ := json.Marshal(data) // cannot fail for these value types
			record := RawRecord{
				ID:        fmt.Sprintf("REC-%05d", i+1),
				Data:      dataJSON,
				Source:    sources[rand.Intn(len(sources))],
				Timestamp: time.Now().Format(time.RFC3339),
			}
			select {
			case <-ctx.Done():
				return
			case out <- record:
			}
			time.Sleep(5 * time.Millisecond) // Simulate extraction delay
		}
	}()
	return out
}

// Transform Stage 1: Parse and validate.
// Errors go to errc; the caller must drain it, or this stage blocks
// once the 100-slot buffer fills.
func parseAndValidate(ctx context.Context, input <-chan RawRecord) (<-chan CleanRecord, <-chan error) {
	out := make(chan CleanRecord)
	errc := make(chan error, 100)
	go func() {
		defer close(out)
		defer close(errc)
		for raw := range input {
			var data struct {
				Name     string  `json:"name"`
				Email    string  `json:"email"`
				Amount   float64 `json:"amount"`
				Currency string  `json:"currency"`
			}
			if err := json.Unmarshal(raw.Data, &data); err != nil {
				errc <- fmt.Errorf("parse error for %s: %w", raw.ID, err)
				continue
			}
			// Validation
			if data.Email == "" || !strings.Contains(data.Email, "@") {
				errc <- fmt.Errorf("invalid email for %s", raw.ID)
				continue
			}
			if data.Amount <= 0 {
				errc <- fmt.Errorf("invalid amount for %s", raw.ID)
				continue
			}
			timestamp, err := time.Parse(time.RFC3339, raw.Timestamp)
			if err != nil {
				errc <- fmt.Errorf("invalid timestamp for %s: %w", raw.ID, err)
				continue
			}
			clean := CleanRecord{
				ID:        raw.ID,
				Name:      data.Name,
				Email:     data.Email,
				Amount:    data.Amount,
				Currency:  data.Currency,
				Source:    raw.Source,
				Timestamp: timestamp,
			}
			select {
			case <-ctx.Done():
				return
			case out <- clean:
			}
		}
	}()
	return out, errc
}

// Transform Stage 2: Enrich with derived data
func enrich(ctx context.Context, input <-chan CleanRecord) <-chan EnrichedRecord {
	out := make(chan EnrichedRecord)
	// Exchange rates to USD (simplified, hard-coded)
	exchangeRates := map[string]float64{"USD": 1.0, "EUR": 1.10, "GBP": 1.27, "JPY": 0.0067}
	go func() {
		defer close(out)
		for clean := range input {
			start := time.Now()
			// Convert to USD; unknown currencies are treated as USD
			rate, ok := exchangeRates[clean.Currency]
			if !ok {
				rate = 1.0
			}
			amountUSD := clean.Amount * rate
			// Determine customer tier
			var tier string
			switch {
			case amountUSD >= 500:
				tier = "GOLD"
			case amountUSD >= 200:
				tier = "SILVER"
			default:
				tier = "BRONZE"
			}
			enriched := EnrichedRecord{
				CleanRecord:  clean,
				AmountUSD:    amountUSD,
				CustomerTier: tier,
				ProcessedAt:  time.Now(),
				ProcessingMS: time.Since(start).Milliseconds(),
			}
			select {
			case <-ctx.Done():
				return
			case out <- enriched:
			}
		}
	}()
	return out
}

// Load: Write to destination (simulated)
func load(ctx context.Context, input <-chan EnrichedRecord) <-chan LoadResult {
	out := make(chan LoadResult)
	go func() {
		defer close(out)
		for record := range input {
			time.Sleep(10 * time.Millisecond) // Simulate a database write
			// Simulate occasional failures (about 5%)
			result := LoadResult{RecordID: record.ID, Success: true}
			if rand.Float32() < 0.05 {
				result.Success = false
				result.Error = fmt.Errorf("database write failed")
			}
			select {
			case <-ctx.Done():
				return
			case out <- result:
			}
		}
	}()
	return out
}

// Stats collects pipeline counters
type Stats struct {
	mu             sync.Mutex
	ValidationErrs int
	Loaded         int
	LoadErrors     int
}

func main() {
	// Since Go 1.20 the global math/rand source is seeded automatically,
	// so the deprecated rand.Seed call is no longer needed.
	ctx := context.Background()
	recordCount := 50

	fmt.Println("📊 ETL Pipeline Demo")
	fmt.Println("=" + strings.Repeat("=", 50))
	fmt.Printf("Processing %d records...\n\n", recordCount)
	start := time.Now()

	// Build the pipeline
	rawRecords := extract(ctx, recordCount)
	cleanRecords, validationErrors := parseAndValidate(ctx, rawRecords)
	enrichedRecords := enrich(ctx, cleanRecords)
	loadResults := load(ctx, enrichedRecords)

	stats := &Stats{}

	// Drain validation errors in the background; errsDone is closed once
	// errc is closed and every error has been counted
	errsDone := make(chan struct{})
	go func() {
		defer close(errsDone)
		for err := range validationErrors {
			stats.mu.Lock()
			stats.ValidationErrs++
			stats.mu.Unlock()
			fmt.Printf("⚠️ Validation: %v\n", err)
		}
	}()

	// Collect load results
	for result := range loadResults {
		stats.mu.Lock()
		if result.Success {
			stats.Loaded++
		} else {
			stats.LoadErrors++
			fmt.Printf("❌ Load failed: %s - %v\n", result.RecordID, result.Error)
		}
		stats.mu.Unlock()
	}
	<-errsDone // Without this wait, the summary could miss late validation errors
	totalTime := time.Since(start)

	// Print summary
	fmt.Println("\n" + strings.Repeat("-", 50))
	fmt.Println("📈 Pipeline Statistics:")
	fmt.Printf("  Records processed: %d\n", recordCount)
	fmt.Printf("  Successfully loaded: %d\n", stats.Loaded)
	fmt.Printf("  Validation errors: %d\n", stats.ValidationErrs)
	fmt.Printf("  Load errors: %d\n", stats.LoadErrors)
	fmt.Printf("  Total time: %v\n", totalTime.Round(time.Millisecond))
	fmt.Printf("  Throughput: %.1f records/second\n", float64(recordCount)/totalTime.Seconds())
}
```

Parallel Stages: Fan-Out Within Pipeline

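For expensive stages, whether CPU-bound work or slow calls, you can parallelize within a stage by running several workers that all read from the same input channel: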
```go
// Filename: parallel_pipeline_stage.go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"runtime"
	"sync"
	"time"
)

// Record to process
type Record struct {
	ID   int
	Data string
}

// ProcessedRecord after expensive computation
type ProcessedRecord struct {
	ID       int
	Data     string
	Result   int
	Duration time.Duration
}

// Generator stage
func generate(ctx context.Context, count int) <-chan Record {
	out := make(chan Record)
	go func() {
		defer close(out)
		for i := 0; i < count; i++ {
			select {
			case <-ctx.Done():
				return
			case out <- Record{ID: i, Data: fmt.Sprintf("data-%d", i)}:
			}
		}
	}()
	return out
}

// Parallel processing stage with fan-out within the stage
// Why: For expensive work, run several workers inside a single stage.
// Note: results come out in completion order, not input order.
func parallelProcess(ctx context.Context, input <-chan Record, workers int) <-chan ProcessedRecord {
	out := make(chan ProcessedRecord)
	var wg sync.WaitGroup

	// Start multiple workers that all read from the same input channel
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for record := range input {
				select {
				case <-ctx.Done():
					return
				default:
				}
				start := time.Now()
				result := expensiveComputation(record.Data)
				processed := ProcessedRecord{
					ID:       record.ID,
					Data:     record.Data,
					Result:   result,
					Duration: time.Since(start),
				}
				select {
				case <-ctx.Done():
					return
				case out <- processed:
				}
			}
		}()
	}

	// Close output once all workers are done, which ends the consumer's range loop
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// expensiveComputation simulates slow work with a sleep. Sleeping goroutines
// don't occupy a CPU, so this demo's speedup is not capped by core count;
// truly CPU-bound work would scale only up to roughly GOMAXPROCS workers.
func expensiveComputation(data string) int {
	time.Sleep(time.Duration(50+rand.Intn(50)) * time.Millisecond)
	return len(data) * rand.Intn(100)
}

// Collect results
func collect(input <-chan ProcessedRecord) []ProcessedRecord {
	var results []ProcessedRecord
	for r := range input {
		results = append(results, r)
	}
	return results
}

func main() {
	ctx := context.Background()
	recordCount := 20
	numWorkers := runtime.NumCPU()
	fmt.Printf("Processing %d records with %d parallel workers...\n\n", recordCount, numWorkers)

	// Sequential timing
	seqStart := time.Now()
	for i := 0; i < recordCount; i++ {
		expensiveComputation(fmt.Sprintf("data-%d", i))
	}
	seqDuration := time.Since(seqStart)

	// Parallel pipeline
	parStart := time.Now()
	records := generate(ctx, recordCount)
	processed := parallelProcess(ctx, records, numWorkers)
	results := collect(processed)
	parDuration := time.Since(parStart)

	// Results arrive in completion order, so IDs may be out of sequence
	fmt.Println("Results (first 5 to finish):")
	for i := 0; i < 5 && i < len(results); i++ {
		fmt.Printf("  Record %d: result=%d, took %v\n",
			results[i].ID, results[i].Result, results[i].Duration.Round(time.Millisecond))
	}
	fmt.Printf("\n📊 Performance Comparison:\n")
	fmt.Printf("  Sequential: %v\n", seqDuration.Round(time.Millisecond))
	fmt.Printf("  Parallel: %v\n", parDuration.Round(time.Millisecond))
	fmt.Printf("  Speedup: %.1fx\n", float64(seqDuration)/float64(parDuration))
}
```

Error Handling in Pipelines

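Proper error handling is crucial for production pipelines. Each stage runs in its own goroutine and cannot simply return an error to its caller, so errors have to travel through channels. They can be wrapped alongside values, or sent on a separate error channel: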
```go
// Filename: pipeline_error_handling.go
// Requires Go 1.18+ (generics).
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Result wraps a value with a potential error
type Result[T any] struct {
	Value T
	Error error
}

// Stage that can fail
func riskyStage(ctx context.Context, input <-chan int) <-chan Result[int] {
	out := make(chan Result[int])
	go func() {
		defer close(out)
		for val := range input {
			res := Result[int]{Value: val * 2}
			// Simulate occasional errors (about 20%)
			if rand.Float32() < 0.2 {
				res = Result[int]{Error: fmt.Errorf("processing failed for %d", val)}
			}
			select {
			case <-ctx.Done():
				return
			case out <- res:
			}
		}
	}()
	return out
}

// errorFilter separates successful results from errors.
// Both returned channels must be drained, or this stage blocks.
func errorFilter[T any](ctx context.Context, input <-chan Result[T]) (<-chan T, <-chan error) {
	values := make(chan T)
	errs := make(chan error)
	go func() {
		defer close(values)
		defer close(errs)
		for result := range input {
			select {
			case <-ctx.Done():
				return
			default:
			}
			if result.Error != nil {
				errs <- result.Error
			} else {
				values <- result.Value
			}
		}
	}()
	return values, errs
}

// retryStage retries failed operations with exponential backoff.
// It is not wired into main below; it could replace riskyStage when
// the failures are transient.
func retryStage(ctx context.Context, input <-chan int, maxRetries int, process func(int) (int, error)) <-chan Result[int] {
	out := make(chan Result[int])
	go func() {
		defer close(out)
		for val := range input {
			var lastErr error
			var result int
			for attempt := 0; attempt <= maxRetries; attempt++ {
				result, lastErr = process(val)
				if lastErr == nil {
					break
				}
				if attempt < maxRetries {
					// Exponential backoff: 10ms, 20ms, 40ms, ...
					backoff := time.Duration(1<<uint(attempt)) * 10 * time.Millisecond
					select {
					case <-ctx.Done():
						return
					case <-time.After(backoff):
					}
				}
			}
			if lastErr != nil {
				out <- Result[int]{Error: fmt.Errorf("failed after %d attempts: %w", maxRetries+1, lastErr)}
			} else {
				out <- Result[int]{Value: result}
			}
		}
	}()
	return out
}

func main() {
	ctx := context.Background()

	// Generate input
	input := make(chan int, 10)
	go func() {
		defer close(input)
		for i := 1; i <= 10; i++ {
			input <- i
		}
	}()

	// Pipeline with error handling
	results := riskyStage(ctx, input)
	values, errs := errorFilter(ctx, results)

	// Collect errors in the background; wg lets main wait for the last print
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for err := range errs {
			fmt.Printf("❌ Error: %v\n", err)
		}
	}()

	// Collect successful values
	fmt.Println("✅ Successful results:")
	for val := range values {
		fmt.Printf("   %d\n", val)
	}
	wg.Wait()
}
```

Pipeline with Graceful Shutdown

```go
// Filename: pipeline_graceful_shutdown.go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// runPipeline demonstrates proper shutdown handling
func runPipeline(ctx context.Context) {
	// Stage 1: Generate numbers
	numbers := make(chan int)
	go func() {
		defer close(numbers)
		for i := 1; ; i++ {
			select {
			case <-ctx.Done():
				fmt.Println("Stage 1: Context cancelled, stopping generation")
				return
			case numbers <- i:
				time.Sleep(100 * time.Millisecond)
			}
		}
	}()

	// Stage 2: Double numbers
	doubled := make(chan int)
	go func() {
		defer close(doubled)
		for n := range numbers {
			select {
			case <-ctx.Done():
				fmt.Println("Stage 2: Context cancelled, draining input")
				// Drain remaining input so Stage 1 is never stuck on a send;
				// this loop ends when Stage 1 closes numbers
				for range numbers {
				}
				return
			case doubled <- n * 2:
			}
		}
	}()

	// Stage 3: Print results until Stage 2 closes doubled
	for n := range doubled {
		fmt.Printf("Result: %d\n", n)
	}
	fmt.Println("Pipeline completed")
}

func main() {
	// Create a context that is cancelled on SIGINT/SIGTERM
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Handle shutdown signals
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigChan)
	go func() {
		sig := <-sigChan
		fmt.Printf("\nReceived signal: %v\n", sig)
		fmt.Println("Initiating graceful shutdown...")
		cancel()
	}()

	fmt.Println("Pipeline running. Press Ctrl+C to stop.")
	runPipeline(ctx)
}
```

When to Use Pipeline Pattern

Perfect Use Cases

| Scenario | Why Pipeline Fits |
|---|---|
| ETL processing | Clear Extract → Transform → Load stages |
| Log processing | Parse → Filter → Enrich → Alert |
| Image processing | Read → Decode → Transform → Encode → Write |
| Data validation | Parse → Validate → Clean → Store |
| Stream processing | Continuous data flow through stages |
| File conversion | Read → Parse → Convert → Write |

When to Choose Something Else

| Scenario | Better Pattern |
|---|---|
| Independent tasks | Worker Pool |
| Aggregating results | Fan-Out Fan-In |
| Request-response | Simple handlers |
| One-time batch | Parallel for loop |

Summary

The Pipeline pattern transforms complex data processing into manageable, composable stages:
  • Streaming: Process data item by item, not all at once
  • Composable: Mix and match stages like building blocks
  • Concurrent: Each stage runs in its own goroutine
  • Efficient: Memory stays bounded by the items in flight, not by the total data size
Key Takeaways:
  1. Each stage is a function that starts a goroutine: It takes an input channel and returns an output channel
  2. Close channels when done: The stage that sends on a channel closes it, which ends the downstream range loop
  3. Use context for cancellation: Select on ctx.Done() alongside every send so stages can shut down gracefully
  4. Handle errors explicitly: Don't let errors disappear silently
  5. Parallelize within stages: For expensive work, use multiple workers per stage (at the cost of output order)

Next Steps

  • Practice: Build an image processing pipeline
  • Explore: Combine with Worker Pool for bounded concurrency
  • Read: Go's io.Pipe for connecting readers and writers
  • Build: Add metrics to monitor pipeline throughput