concurrency
2w ago

Common Patterns and Best Practices

7 views • 0 upvotes

1. Worker Pool Pattern

go
// worker consumes jobs until the channel is closed and drained. For every job
// it logs the worker id and the job number, sleeps for one second, and then
// sends twice the job value on results.
func worker(id int, jobs <-chan int, results chan<- int) {
    for {
        job, open := <-jobs
        if !open {
            // jobs has been closed and emptied; there is nothing left to do.
            return
        }
        fmt.Printf("Worker %d processing job %d\n", id, job)
        time.Sleep(time.Second)
        results <- job * 2
    }
}

// main pushes nine jobs through a pool of three workers and then waits until
// one result per job has been received. The result values are discarded.
func main() {
    const (
        numWorkers = 3
        numJobs    = 9
    )

    // Both channels have room for numJobs values, so queuing every job up
    // front and sending a result never blocks on a full buffer.
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    // Launch the pool; worker ids start at 1.
    for id := 1; id <= numWorkers; id++ {
        go worker(id, jobs, results)
    }

    // Queue the jobs, then close the channel so each worker's loop ends.
    for job := 1; job <= numJobs; job++ {
        jobs <- job
    }
    close(jobs)

    // Receive exactly one result per job before exiting.
    for received := 0; received < numJobs; received++ {
        <-results
    }
}
Diagram:
code
Jobs Queue          Workers          Results
   [9]                                 []
   [8]              Worker 1
   [7]        ┌───► Worker 2 ────┐
   [6]        │     Worker 3      │
   [5] ───────┘                   └──► [2][4][6]
   [4]                                  [8][10]
   [3]
   [2]
   [1]

2. Pipeline Pattern

go
// generator returns a receive-only channel. A background goroutine sends each
// element of nums on it in order and closes the channel after the last one.
func generator(nums ...int) <-chan int {
    stream := make(chan int)
    go func() {
        // Closing tells the consumer that no more values will arrive.
        defer close(stream)
        for i := 0; i < len(nums); i++ {
            stream <- nums[i]
        }
    }()
    return stream
}

// square returns a receive-only channel carrying n*n for every n received
// from in, in the same order. The returned channel is closed once in has been
// closed and drained.
func square(in <-chan int) <-chan int {
    squares := make(chan int)
    go func() {
        defer close(squares)
        for {
            n, open := <-in
            if !open {
                return
            }
            squares <- n * n
        }
    }()
    return squares
}

func main() {
    // Set up pipeline
    nums := generator(2, 3, 4, 5)
    squared := square(nums)
    
    // Consume output
    for result := range squared {
        fmt.Println(result) // 4, 9, 16, 25
    }
}
Visualization:
code
[2, 3, 4, 5] ──► generator() ──► square() ──► [4, 9, 16, 25]
                    chan int       chan int

3. Fan-Out, Fan-In Pattern

go
// producer sends the integers 0 through 9 on ch in ascending order and then
// closes ch so that receivers ranging over it terminate.
func producer(ch chan<- int) {
    const count = 10

    defer close(ch)
    for next := 0; next != count; next++ {
        ch <- next
    }
}

// worker reads values from in until it is closed and drained. For each value
// it logs the worker id and the value, sleeps for 100ms, and sends the square
// of the value on out.
//
// worker is the only sender on out and closes it on return. Without that
// close, anything ranging over out (such as merge) would block forever once
// the input is exhausted. Because of the close, out must not be shared with
// another sender.
func worker(id int, in <-chan int, out chan<- int) {
    defer close(out)
    for n := range in {
        fmt.Printf("Worker %d processing %d\n", id, n)
        time.Sleep(100 * time.Millisecond)
        out <- n * n
    }
}

// merge fans the given channels into a single receive-only channel. One
// goroutine per input forwards its values, so the relative order of values
// from different inputs is not defined. The returned channel is closed only
// after every input channel has been closed and drained.
func merge(cs ...<-chan int) <-chan int {
    merged := make(chan int)
    var forwarders sync.WaitGroup

    // forward copies everything from a single input onto the shared output.
    forward := func(src <-chan int) {
        defer forwarders.Done()
        for v := range src {
            merged <- v
        }
    }

    for _, c := range cs {
        forwarders.Add(1)
        go forward(c)
    }

    // Close the output from a separate goroutine so merge can return at once.
    go func() {
        forwarders.Wait()
        close(merged)
    }()

    return merged
}

// main starts one producer, fans its output across three workers that each
// write to their own channel, and prints everything coming out of the merged
// stream. The loop ends only when the merged channel is closed.
func main() {
    const numWorkers = 3

    input := make(chan int)

    // One output channel per worker, created before any goroutine starts.
    outputs := make([]<-chan int, numWorkers)
    sinks := make([]chan int, numWorkers)
    for i := range sinks {
        sinks[i] = make(chan int)
        outputs[i] = sinks[i]
    }

    go producer(input)

    // Fan-out: every worker reads from the same input channel.
    for i, sink := range sinks {
        go worker(i+1, input, sink)
    }

    // Fan-in: combine the per-worker outputs into one stream and print it.
    merged := merge(outputs...)
    for n := range merged {
        fmt.Println(n)
    }
}
Diagram:
code
         ┌──► Worker 1 ──┐
Producer ├──► Worker 2 ──┤──► Merged Output
         └──► Worker 3 ──┘
       (Fan-Out)      (Fan-In)

4. Context for Cancellation

go
// worker logs "working" every 500ms until ctx is cancelled or its deadline
// passes, then logs "cancelled" and returns.
//
// The pause between iterations also watches ctx.Done(), so a cancellation
// that arrives mid-pause ends the worker immediately instead of after the
// rest of the 500ms.
func worker(ctx context.Context, id int) {
    for {
        // ctx.Err is non-nil exactly when ctx.Done() is closed.
        if ctx.Err() != nil {
            fmt.Printf("Worker %d cancelled\n", id)
            return
        }

        fmt.Printf("Worker %d working\n", id)

        // Pause, but wake early on cancellation; the check at the top of the
        // loop then reports it and returns.
        select {
        case <-ctx.Done():
        case <-time.After(500 * time.Millisecond):
        }
    }
}

// main runs three workers under a context that times out after two seconds,
// reports the cancellation, and then waits for every worker to return before
// exiting.
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    const numWorkers = 3

    // Each worker signals on finished when it returns, so main waits exactly
    // as long as needed rather than sleeping for a guessed duration.
    finished := make(chan struct{})
    for i := 1; i <= numWorkers; i++ {
        go func(id int) {
            worker(ctx, id)
            finished <- struct{}{}
        }(i)
    }

    <-ctx.Done()
    fmt.Println("Main: context cancelled")

    // Wait for all workers to finish their cleanup.
    for i := 0; i < numWorkers; i++ {
        <-finished
    }
}

5. Graceful Shutdown

go
// server blocks until the process receives SIGINT or SIGTERM, runs its
// cleanup (simulated here by a two-second sleep), and then returns.
func server() {
    // Buffered so the signal package, which does not block when delivering,
    // cannot drop a signal that arrives before the goroutine is receiving.
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    // Stop relaying signals once shutdown is complete so SIGINT/SIGTERM are
    // not left intercepted by an unread channel after server returns.
    defer signal.Stop(quit)

    done := make(chan bool, 1)

    go func() {
        <-quit
        fmt.Println("Server is shutting down...")

        // Cleanup work
        time.Sleep(2 * time.Second)

        done <- true
    }()

    fmt.Println("Server is ready")
    <-done
    fmt.Println("Server stopped")
}

Was this helpful?

Difficulty & Status

medium
Lvl. 4
Community Verified

Related Topics

goroutines · concurrency · go runtime · channels
Progress: 11%
Answered by: shubham vyas · Prev Topic · Next Topic