Semaphore Pattern in Go: Controlling Access to Limited Resources

Your database has a connection limit of 100. Your API provider allows only 10 concurrent requests. Your server can only open 1000 files at once. How do you ensure your Go program respects these limits?
You could use a mutex, but that allows only one goroutine at a time. You need something that lets N goroutines run simultaneously but blocks the (N+1)th until one of them finishes. That's a semaphore.

The Real World Parallel: A Parking Lot

Think of it like this: A parking lot has 50 spaces. When you arrive, you take a ticket (acquire) if spaces are available. When you leave, you return the ticket (release), making a space available. If all 50 spaces are full, new cars must wait at the entrance until someone leaves. The parking lot doesn't care which car is in which space, only that there are never more than 50 cars inside.
This is exactly what a semaphore does:
  • Fixed capacity: Maximum N concurrent operations
  • Acquire: Wait if at capacity, proceed if not
  • Release: Signal that a slot is available
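  • No ownership: Any goroutine can release a slot, not only the one that acquired it. In Go this is also true of sync.Mutex, but in many other languages a mutex must be unlocked by its owner.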

[Figure: Concurrency pattern diagram 1]

Implementing Semaphore in Go

Go's standard library has no general-purpose semaphore type, but a buffered channel gives you one with very little code:
```go
// Filename: semaphore_basic.go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Semaphore limits concurrent access to a resource.
// Why: A buffered channel naturally implements semaphore semantics:
//   - Channel capacity = max concurrent operations
//   - Send = acquire (blocks when full)
//   - Receive = release (unblocks a waiter)
type Semaphore struct {
	sem chan struct{}
}

// NewSemaphore creates a new semaphore with the given capacity.
// Why: The constructor ensures proper initialization.
func NewSemaphore(capacity int) *Semaphore {
	return &Semaphore{
		sem: make(chan struct{}, capacity),
	}
}

// Acquire blocks until a slot is available.
// Why: Sending to a full channel blocks the goroutine.
func (s *Semaphore) Acquire() {
	s.sem <- struct{}{}
}

// TryAcquire attempts to acquire without blocking.
// Why: Sometimes you want to fail fast if the resource is unavailable.
// Returns true if acquired, false if it would block.
func (s *Semaphore) TryAcquire() bool {
	select {
	case s.sem <- struct{}{}:
		return true
	default:
		return false
	}
}

// AcquireWithTimeout tries to acquire within the given timeout.
// Why: Prevents indefinite waiting.
// Returns true if acquired, false if it timed out.
func (s *Semaphore) AcquireWithTimeout(timeout time.Duration) bool {
	select {
	case s.sem <- struct{}{}:
		return true
	case <-time.After(timeout):
		return false
	}
}

// Release frees a slot.
// Why: Receiving from the channel frees a slot for another goroutine.
// Note: Calling Release without a matching Acquire blocks forever.
func (s *Semaphore) Release() {
	<-s.sem
}

// Available returns the number of free slots.
// Why: Useful for monitoring and debugging. The value is a snapshot
// and may already be stale when the caller reads it.
func (s *Semaphore) Available() int {
	return cap(s.sem) - len(s.sem)
}

func main() {
	// Create a semaphore with capacity 3
	sem := NewSemaphore(3)
	var wg sync.WaitGroup

	// Start 10 goroutines, but only 3 can hold a slot at once
	for i := 1; i <= 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			fmt.Printf("Goroutine %d: waiting to acquire...\n", id)
			sem.Acquire()
			fmt.Printf("Goroutine %d: acquired! (available: %d)\n", id, sem.Available())

			// Simulate work
			time.Sleep(500 * time.Millisecond)

			sem.Release()
			fmt.Printf("Goroutine %d: released (available: %d)\n", id, sem.Available())
		}(i)
	}

	wg.Wait()
	fmt.Println("\nAll goroutines completed!")
}
```
Sample output (goroutine scheduling varies, so the order of lines and the printed "available" snapshots will differ from run to run):
```text
Goroutine 1: waiting to acquire...
Goroutine 1: acquired! (available: 2)
Goroutine 2: waiting to acquire...
Goroutine 2: acquired! (available: 1)
Goroutine 3: waiting to acquire...
Goroutine 3: acquired! (available: 0)
Goroutine 4: waiting to acquire...
Goroutine 5: waiting to acquire...
... (4, 5, etc. wait)
Goroutine 1: released (available: 1)
Goroutine 4: acquired! (available: 0)
...

All goroutines completed!
```
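The interleaving changes between runs, so reading the log by eye is a weak check. A sketch that measures the peak number of goroutines inside the guarded section instead, using sync/atomic. runBounded is a hypothetical helper and is not part of the program above:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBounded starts `tasks` goroutines that share a buffered-channel
// semaphore of size `limit`. It returns the highest number of goroutines
// observed inside the guarded section at the same time.
func runBounded(tasks, limit int) int64 {
	sem := make(chan struct{}, limit)
	var inFlight, maxSeen int64
	var wg sync.WaitGroup

	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire
			defer func() { <-sem }() // release, even if the work panics

			n := atomic.AddInt64(&inFlight, 1)
			// Raise maxSeen to n if n is larger (CAS loop)
			for {
				old := atomic.LoadInt64(&maxSeen)
				if n <= old || atomic.CompareAndSwapInt64(&maxSeen, old, n) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond) // simulated work
			atomic.AddInt64(&inFlight, -1)    // leave before releasing the slot
		}()
	}

	wg.Wait()
	return atomic.LoadInt64(&maxSeen)
}

func main() {
	fmt.Println("peak concurrency:", runBounded(20, 3))
}
```

The counter is incremented after the acquire and decremented before the release. With a correct semaphore, the reported peak therefore can never exceed the capacity.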

Real World Example 1: Database Connection Limiter

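Protect your database from connection exhaustion. database/sql already caps open connections through DB.SetMaxOpenConns. A semaphore in front of it also lets you cap whole units of work and collect wait-time metrics like the ones below. The query is simulated, so the example runs without a database: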
```go
// Filename: db_connection_limiter.go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// DBConnectionLimiter limits concurrent database operations.
type DBConnectionLimiter struct {
	sem            chan struct{}
	maxConnections int
	activeCount    int64
	totalQueries   int64
	waitTime       int64 // total nanoseconds spent waiting for a slot
}

// NewDBConnectionLimiter creates a new limiter.
func NewDBConnectionLimiter(maxConnections int) *DBConnectionLimiter {
	return &DBConnectionLimiter{
		sem:            make(chan struct{}, maxConnections),
		maxConnections: maxConnections,
	}
}

// Query executes a database query with connection limiting.
// Why: Wrapping the semaphore in a method provides a clean API
// and allows for metrics collection.
func (l *DBConnectionLimiter) Query(ctx context.Context, query string, args ...interface{}) ([]map[string]interface{}, error) {
	// Acquire a slot, or give up if the context is done first
	waitStart := time.Now()
	select {
	case l.sem <- struct{}{}:
		// Got a connection slot
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	// Track metrics
	waitDuration := time.Since(waitStart)
	atomic.AddInt64(&l.waitTime, int64(waitDuration))
	atomic.AddInt64(&l.activeCount, 1)
	atomic.AddInt64(&l.totalQueries, 1)

	defer func() {
		atomic.AddInt64(&l.activeCount, -1)
		<-l.sem // Release the slot
	}()

	// Execute the query (simulated)
	return l.executeQuery(ctx, query, args...)
}

// executeQuery simulates database query execution.
func (l *DBConnectionLimiter) executeQuery(ctx context.Context, query string, args ...interface{}) ([]map[string]interface{}, error) {
	// Simulate 50-250ms of query execution time
	queryTime := time.Duration(50+rand.Intn(200)) * time.Millisecond
	select {
	case <-time.After(queryTime):
		// Query completed
		return []map[string]interface{}{
			{"id": 1, "result": "data"},
		}, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// Stats returns current statistics.
func (l *DBConnectionLimiter) Stats() (active, total int64, avgWait time.Duration) {
	active = atomic.LoadInt64(&l.activeCount)
	total = atomic.LoadInt64(&l.totalQueries)
	totalWait := atomic.LoadInt64(&l.waitTime)
	if total > 0 {
		avgWait = time.Duration(totalWait / total)
	}
	return
}

// Available returns available connection slots.
func (l *DBConnectionLimiter) Available() int {
	return l.maxConnections - int(atomic.LoadInt64(&l.activeCount))
}

func main() {
	// Since Go 1.20 the global math/rand source is seeded automatically,
	// so the deprecated rand.Seed call is no longer needed.

	// Create a limiter with 5 max connections
	limiter := NewDBConnectionLimiter(5)
	ctx := context.Background()
	var wg sync.WaitGroup

	fmt.Println("Database Connection Limiter Demo")
	fmt.Println("================================")
	fmt.Printf("Max concurrent connections: 5\n")
	fmt.Printf("Simulating 20 concurrent queries...\n\n")

	// Print stats every 200ms until done is closed
	done := make(chan struct{})
	go func() {
		ticker := time.NewTicker(200 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				active, total, avgWait := limiter.Stats()
				fmt.Printf("📊 Active: %d/%d | Total: %d | Avg Wait: %v\n",
					active, 5, total, avgWait.Round(time.Millisecond))
			case <-done:
				return
			}
		}
	}()

	// Simulate 20 concurrent database queries
	for i := 1; i <= 20; i++ {
		wg.Add(1)
		go func(queryID int) {
			defer wg.Done()
			query := fmt.Sprintf("SELECT * FROM users WHERE id = %d", queryID)
			result, err := limiter.Query(ctx, query)
			if err != nil {
				fmt.Printf("❌ Query %d failed: %v\n", queryID, err)
			} else {
				fmt.Printf("✅ Query %d completed: %d rows\n", queryID, len(result))
			}
		}(i)
	}

	wg.Wait()
	close(done)

	// Final stats (active is 0 here, so it is discarded;
	// an unused variable would not compile)
	_, total, avgWait := limiter.Stats()
	fmt.Printf("\n📈 Final Stats:\n")
	fmt.Printf("  Total queries: %d\n", total)
	fmt.Printf("  Average wait time: %v\n", avgWait.Round(time.Millisecond))
}
```

Real World Example 2: API Rate Limiter

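Cap the number of in-flight requests to an external API. Strictly speaking, this is a concurrency limit rather than a rate limit. It bounds how many requests run at once, not how many are sent per second.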
```go
// Filename: api_rate_limiter.go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// APIClient wraps an HTTP client with a cap on concurrent requests.
type APIClient struct {
	baseURL        string
	maxConcurrent  int
	sem            chan struct{}
	requestTimeout time.Duration
}

// APIResponse represents an API response.
type APIResponse struct {
	Status  int
	Body    map[string]interface{}
	Latency time.Duration
}

// BatchResult pairs a requested path with its response or error.
type BatchResult struct {
	Path     string
	Response *APIResponse
	Error    error
}

// NewAPIClient creates a new concurrency-limited API client.
func NewAPIClient(baseURL string, maxConcurrent int, timeout time.Duration) *APIClient {
	return &APIClient{
		baseURL:        baseURL,
		maxConcurrent:  maxConcurrent,
		sem:            make(chan struct{}, maxConcurrent),
		requestTimeout: timeout,
	}
}

// Get performs a GET request once a concurrency slot is free.
func (c *APIClient) Get(ctx context.Context, path string) (*APIResponse, error) {
	// Acquire a slot; release it when the request finishes
	select {
	case c.sem <- struct{}{}:
		defer func() { <-c.sem }()
	case <-ctx.Done():
		return nil, fmt.Errorf("context cancelled while waiting for semaphore: %w", ctx.Err())
	}

	// Bound the request itself with a per-request timeout
	reqCtx, cancel := context.WithTimeout(ctx, c.requestTimeout)
	defer cancel()

	return c.doRequest(reqCtx, "GET", path)
}

// doRequest simulates an HTTP request.
func (c *APIClient) doRequest(ctx context.Context, method, path string) (*APIResponse, error) {
	start := time.Now()

	// Simulate 100-400ms of network latency
	latency := time.Duration(100+rand.Intn(300)) * time.Millisecond
	select {
	case <-time.After(latency):
		// Request completed
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	// Simulate occasional errors (10% chance)
	if rand.Float32() < 0.1 {
		return &APIResponse{Status: 500, Latency: time.Since(start)}, fmt.Errorf("server error")
	}

	return &APIResponse{
		Status: 200,
		Body: map[string]interface{}{
			"path":   path,
			"method": method,
			"data":   "response data",
		},
		Latency: time.Since(start),
	}, nil
}

// BatchGet performs multiple GET requests; the semaphore in Get caps concurrency.
func (c *APIClient) BatchGet(ctx context.Context, paths []string) []BatchResult {
	results := make([]BatchResult, len(paths))
	var wg sync.WaitGroup

	for i, path := range paths {
		wg.Add(1)
		go func(index int, p string) {
			defer wg.Done()
			resp, err := c.Get(ctx, p)
			// Each goroutine writes only its own index, so no lock is needed
			results[index] = BatchResult{Path: p, Response: resp, Error: err}
		}(i, path)
	}

	wg.Wait()
	return results
}

func main() {
	// Create a client with max 3 concurrent requests
	client := NewAPIClient("https://api.example.com", 3, 5*time.Second)

	// Generate 15 paths to request
	paths := make([]string, 15)
	for i := range paths {
		paths[i] = fmt.Sprintf("/users/%d", i+1)
	}

	fmt.Println("🌐 API Rate Limiter Demo")
	fmt.Println("========================")
	fmt.Printf("Max concurrent requests: 3\n")
	fmt.Printf("Total requests: %d\n\n", len(paths))

	start := time.Now()
	results := client.BatchGet(context.Background(), paths)
	totalTime := time.Since(start)

	// Print results
	successCount := 0
	errorCount := 0
	var totalLatency time.Duration
	for _, r := range results {
		if r.Error != nil {
			errorCount++
			fmt.Printf("❌ %s: %v\n", r.Path, r.Error)
		} else {
			successCount++
			totalLatency += r.Response.Latency
			fmt.Printf("✅ %s: %d (%v)\n", r.Path, r.Response.Status, r.Response.Latency.Round(time.Millisecond))
		}
	}

	fmt.Printf("\n📊 Summary:\n")
	fmt.Printf("  Success: %d\n", successCount)
	fmt.Printf("  Errors: %d\n", errorCount)
	fmt.Printf("  Total time: %v\n", totalTime.Round(time.Millisecond))
	if successCount > 0 { // avoid a division by zero if every request failed
		fmt.Printf("  Avg latency: %v\n", (totalLatency / time.Duration(successCount)).Round(time.Millisecond))
	}

	// Compare with running the successful requests one at a time
	fmt.Printf("\n⚔ Sequential (one at a time): ~%v\n", totalLatency.Round(time.Millisecond))
	fmt.Printf("⚔ With semaphore (3 concurrent): %v\n", totalTime.Round(time.Millisecond))
}
```
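The client above only simulates the network. Below is a sketch of the same idea against a real HTTP endpoint, using only the standard library. Here httptest.NewServer stands in for the external API and records how many requests it handles at once. The names fetchAll, newTestServer and runDemo are illustrative and do not come from the original:

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"sync"
	"sync/atomic"
	"time"
)

// fetchAll GETs every URL with at most `limit` requests in flight and
// returns the response bodies in input order (the first error wins).
func fetchAll(ctx context.Context, client *http.Client, urls []string, limit int) ([]string, error) {
	sem := make(chan struct{}, limit) // buffered-channel semaphore
	bodies := make([]string, len(urls))
	errs := make([]error, len(urls))
	var wg sync.WaitGroup
	for i, u := range urls {
		wg.Add(1)
		go func(i int, u string) {
			defer wg.Done()
			select {
			case sem <- struct{}{}: // acquire
				defer func() { <-sem }() // runs after the body is read and closed
			case <-ctx.Done():
				errs[i] = ctx.Err()
				return
			}
			req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
			if err != nil {
				errs[i] = err
				return
			}
			resp, err := client.Do(req)
			if err != nil {
				errs[i] = err
				return
			}
			defer resp.Body.Close()
			b, err := io.ReadAll(resp.Body)
			bodies[i], errs[i] = string(b), err
		}(i, u)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return bodies, nil
}

// newTestServer stands in for the external API. It records in *peak the
// largest number of requests it handled concurrently, and echoes the path.
func newTestServer(peak *int64) *httptest.Server {
	var inFlight int64
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		n := atomic.AddInt64(&inFlight, 1)
		defer atomic.AddInt64(&inFlight, -1)
		for { // raise *peak to n if n is larger
			old := atomic.LoadInt64(peak)
			if n <= old || atomic.CompareAndSwapInt64(peak, old, n) {
				break
			}
		}
		time.Sleep(20 * time.Millisecond) // simulated server work
		fmt.Fprint(w, r.URL.Path)
	}))
}

// runDemo fetches n paths from a fresh test server with the given limit.
func runDemo(n, limit int) ([]string, int64, error) {
	var peak int64
	srv := newTestServer(&peak)
	defer srv.Close()
	urls := make([]string, n)
	for i := range urls {
		urls[i] = fmt.Sprintf("%s/users/%d", srv.URL, i+1)
	}
	bodies, err := fetchAll(context.Background(), srv.Client(), urls, limit)
	return bodies, atomic.LoadInt64(&peak), err
}

func main() {
	bodies, peak, err := runDemo(12, 3)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%d responses, first body %q, server-side peak concurrency %d\n", len(bodies), bodies[0], peak)
}
```

Measuring on the server side is the honest check. It counts what the remote service actually sees, not what the client thinks it sent.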

Real World Example 3: File System Access Limiter

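Prevent "too many open files" errors by capping how many files are open at once. The example below simulates the file I/O, so it runs anywhere: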
```go
// Filename: file_access_limiter.go
package main

import (
	"context"
	"crypto/md5"
	"fmt"
	"math/rand"
	"path/filepath"
	"sync"
	"time"
)

// FileProcessor processes files with limited concurrent access.
type FileProcessor struct {
	maxOpenFiles int
	sem          chan struct{}
}

// FileResult represents the result of processing a file.
type FileResult struct {
	Path     string
	Size     int64
	Hash     string
	Duration time.Duration
	Error    error
}

// NewFileProcessor creates a new processor with a limit on open files.
func NewFileProcessor(maxOpenFiles int) *FileProcessor {
	return &FileProcessor{
		maxOpenFiles: maxOpenFiles,
		sem:          make(chan struct{}, maxOpenFiles),
	}
}

// ProcessFile processes a single file under semaphore control.
func (fp *FileProcessor) ProcessFile(ctx context.Context, path string) FileResult {
	// Acquire a slot before "opening" the file
	select {
	case fp.sem <- struct{}{}:
		defer func() { <-fp.sem }()
	case <-ctx.Done():
		return FileResult{Path: path, Error: ctx.Err()}
	}

	start := time.Now()
	// Simulate file processing
	size, hash, err := fp.processFileInternal(path)
	return FileResult{
		Path:     path,
		Size:     size,
		Hash:     hash,
		Duration: time.Since(start),
		Error:    err,
	}
}

// processFileInternal simulates processing a file (no real I/O happens).
func (fp *FileProcessor) processFileInternal(path string) (int64, string, error) {
	// Simulate 50-200ms of file I/O
	time.Sleep(time.Duration(50+rand.Intn(150)) * time.Millisecond)

	// Simulate the file size, and derive a short fake hash from the path
	size := int64(1000 + rand.Intn(100000))
	hash := fmt.Sprintf("%x", md5.Sum([]byte(path)))[:8]

	// Simulate occasional errors (5%)
	if rand.Float32() < 0.05 {
		return 0, "", fmt.Errorf("permission denied")
	}
	return size, hash, nil
}

// ProcessFiles processes files concurrently, at most maxOpenFiles at a time.
func (fp *FileProcessor) ProcessFiles(ctx context.Context, paths []string) []FileResult {
	results := make([]FileResult, len(paths))
	var wg sync.WaitGroup
	for i, path := range paths {
		wg.Add(1)
		go func(index int, p string) {
			defer wg.Done()
			results[index] = fp.ProcessFile(ctx, p)
		}(i, path)
	}
	wg.Wait()
	return results
}

func main() {
	// Create a processor with max 10 open files
	processor := NewFileProcessor(10)

	// Generate test file paths
	files := make([]string, 50)
	for i := range files {
		files[i] = filepath.Join("/data/files", fmt.Sprintf("document_%03d.pdf", i+1))
	}

	fmt.Println("📁 File Processor with Open File Limit")
	fmt.Println("======================================")
	fmt.Printf("Max concurrent open files: 10\n")
	fmt.Printf("Total files to process: %d\n\n", len(files))

	start := time.Now()
	results := processor.ProcessFiles(context.Background(), files)
	totalTime := time.Since(start)

	// Calculate stats
	var totalSize int64
	var totalDuration time.Duration
	successCount := 0
	errorCount := 0
	for _, r := range results {
		if r.Error != nil {
			errorCount++
		} else {
			successCount++
			totalSize += r.Size
			totalDuration += r.Duration
		}
	}

	fmt.Printf("📊 Results:\n")
	fmt.Printf("  Files processed: %d/%d\n", successCount, len(files))
	fmt.Printf("  Errors: %d\n", errorCount)
	fmt.Printf("  Total size: %.2f MB\n", float64(totalSize)/(1024*1024))
	fmt.Printf("  Total wall-clock time: %v\n", totalTime.Round(time.Millisecond))
	fmt.Printf("  Sum of processing times: %v\n", totalDuration.Round(time.Millisecond))
	fmt.Printf("  Parallelization factor: %.1fx\n", float64(totalDuration)/float64(totalTime))

	// Show the first few results
	fmt.Println("\nFirst 5 results:")
	for i := 0; i < 5 && i < len(results); i++ {
		r := results[i]
		if r.Error != nil {
			fmt.Printf("  ❌ %s: %v\n", filepath.Base(r.Path), r.Error)
		} else {
			fmt.Printf("  ✅ %s: %d bytes, hash=%s (%v)\n",
				filepath.Base(r.Path), r.Size, r.Hash, r.Duration.Round(time.Millisecond))
		}
	}
}
```
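The processor above never touches the disk. Here is a minimal sketch of a version that does real I/O, assuming you want an MD5 of each file's contents (MD5 as in the original). The slot is acquired before os.Open and released only after the file is closed, which is what actually keeps the process under its descriptor limit. hashFile, hashAll and runDemo are hypothetical names, and runDemo writes throwaway files to a temporary directory:

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
)

// hashFile opens one file and returns the hex-encoded MD5 of its contents.
func hashFile(path string) (string, error) {
	f, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer f.Close()
	h := md5.New()
	if _, err := io.Copy(h, f); err != nil {
		return "", err
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}

// hashAll hashes every path while keeping at most maxOpen files open at once.
func hashAll(paths []string, maxOpen int) (map[string]string, error) {
	sem := make(chan struct{}, maxOpen)
	hashes := make(map[string]string, len(paths))
	var (
		mu       sync.Mutex // guards hashes and firstErr
		firstErr error
		wg       sync.WaitGroup
	)
	for _, p := range paths {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			sem <- struct{}{}        // acquire before os.Open
			defer func() { <-sem }() // runs after hashFile has closed the file
			sum, err := hashFile(p)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				if firstErr == nil {
					firstErr = err
				}
				return
			}
			hashes[p] = sum
		}(p)
	}
	wg.Wait()
	return hashes, firstErr
}

// runDemo writes n small files to a temporary directory and hashes them.
func runDemo(n, maxOpen int) (map[string]string, []string, error) {
	dir, err := os.MkdirTemp("", "semdemo")
	if err != nil {
		return nil, nil, err
	}
	defer os.RemoveAll(dir)
	paths := make([]string, n)
	for i := range paths {
		paths[i] = filepath.Join(dir, fmt.Sprintf("file_%03d.txt", i))
		if err := os.WriteFile(paths[i], []byte(fmt.Sprintf("content %d", i)), 0o644); err != nil {
			return nil, nil, err
		}
	}
	hashes, err := hashAll(paths, maxOpen)
	return hashes, paths, err
}

func main() {
	hashes, paths, err := runDemo(50, 10)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("hashed %d files; %s -> %s\n", len(hashes), filepath.Base(paths[0]), hashes[paths[0]])
}
```

Keep maxOpen well below the process limit (ulimit -n on Unix-like systems). The runtime, network sockets and the standard streams also consume descriptors.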

Weighted Semaphore

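For resources with different costs, a weighted semaphore lets each caller acquire as many units as it needs. The version below is built on sync.Cond. Release wakes every waiter and whichever request fits proceeds, so there is no FIFO ordering: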
```go
// Filename: weighted_semaphore.go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// WeightedSemaphore allows acquiring multiple units at once.
// Why: Some operations need more resources than others,
// e.g., large queries vs small queries.
type WeightedSemaphore struct {
	size    int64
	current int64
	mu      sync.Mutex
	cond    *sync.Cond
}

// NewWeightedSemaphore creates a semaphore with the given total capacity.
func NewWeightedSemaphore(size int64) *WeightedSemaphore {
	ws := &WeightedSemaphore{size: size}
	ws.cond = sync.NewCond(&ws.mu)
	return ws
}

// Acquire blocks until n units are available or ctx is done.
func (ws *WeightedSemaphore) Acquire(ctx context.Context, n int64) error {
	if n > ws.size {
		// Such a request could never be satisfied; fail instead of blocking forever
		return fmt.Errorf("weighted semaphore: requested %d exceeds capacity %d", n, ws.size)
	}

	// cond.Wait does not observe ctx, so wake all waiters when ctx is done
	// (context.AfterFunc requires Go 1.21+)
	stop := context.AfterFunc(ctx, func() {
		ws.mu.Lock()
		ws.cond.Broadcast()
		ws.mu.Unlock()
	})
	defer stop()

	ws.mu.Lock()
	defer ws.mu.Unlock()
	for ws.size-ws.current < n {
		if err := ctx.Err(); err != nil {
			return err
		}
		// Wait releases mu while blocked and reacquires it on wake-up
		ws.cond.Wait()
	}
	ws.current += n
	return nil
}

// TryAcquire attempts to acquire n units without blocking.
func (ws *WeightedSemaphore) TryAcquire(n int64) bool {
	ws.mu.Lock()
	defer ws.mu.Unlock()
	if ws.size-ws.current >= n {
		ws.current += n
		return true
	}
	return false
}

// Release returns n units and wakes all waiters so they can recheck capacity.
// Note: no FIFO ordering, so a large request can be starved by small ones.
func (ws *WeightedSemaphore) Release(n int64) {
	ws.mu.Lock()
	ws.current -= n
	if ws.current < 0 {
		ws.mu.Unlock()
		panic("weighted semaphore: released more than held")
	}
	ws.mu.Unlock()
	ws.cond.Broadcast()
}

// Available returns the available capacity.
func (ws *WeightedSemaphore) Available() int64 {
	ws.mu.Lock()
	defer ws.mu.Unlock()
	return ws.size - ws.current
}

// QueryType represents different query complexities.
type QueryType struct {
	Name   string
	Weight int64
	Time   time.Duration
}

func main() {
	// Create a semaphore with 100 "units" of capacity
	sem := NewWeightedSemaphore(100)

	// Different query types with different weights
	queryTypes := []QueryType{
		{Name: "Simple", Weight: 10, Time: 50 * time.Millisecond},
		{Name: "Medium", Weight: 30, Time: 150 * time.Millisecond},
		{Name: "Complex", Weight: 50, Time: 300 * time.Millisecond},
		{Name: "Heavy", Weight: 80, Time: 500 * time.Millisecond},
	}

	fmt.Println("⚖️ Weighted Semaphore Demo")
	fmt.Println("===========================")
	fmt.Printf("Total capacity: 100 units\n\n")

	var wg sync.WaitGroup
	ctx := context.Background()

	// Run various queries
	for i := 0; i < 10; i++ {
		qt := queryTypes[i%len(queryTypes)]
		wg.Add(1)
		go func(id int, query QueryType) {
			defer wg.Done()
			fmt.Printf("Query %d (%s, weight=%d): waiting... (available: %d)\n",
				id, query.Name, query.Weight, sem.Available())
			if err := sem.Acquire(ctx, query.Weight); err != nil {
				fmt.Printf("Query %d: failed to acquire: %v\n", id, err)
				return
			}
			fmt.Printf("Query %d (%s): acquired! (available: %d)\n", id, query.Name, sem.Available())
			time.Sleep(query.Time)
			sem.Release(query.Weight)
			fmt.Printf("Query %d (%s): released (available: %d)\n", id, query.Name, sem.Available())
		}(i, qt)
		time.Sleep(20 * time.Millisecond) // Stagger starts
	}

	wg.Wait()
	fmt.Println("\nAll queries completed!")
}
```

Using golang.org/x/sync/semaphore

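The golang.org/x/sync/semaphore package (install with go get golang.org/x/sync) provides a production-ready weighted semaphore. Its Acquire takes a context, TryAcquire never blocks, and waiters are served in FIFO order, so a large request is not starved by a stream of small ones: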
```go
// Filename: x_sync_semaphore.go
// Requires the module: go get golang.org/x/sync
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	// Create a weighted semaphore with capacity 3
	sem := semaphore.NewWeighted(3)
	ctx := context.Background()

	fmt.Println("Using golang.org/x/sync/semaphore")
	fmt.Println("==================================")
	fmt.Println()

	// Acquire 1 slot
	if err := sem.Acquire(ctx, 1); err != nil {
		fmt.Printf("Acquire failed: %v\n", err)
		return
	}
	fmt.Println("Acquired 1 slot")

	// Acquire 2 more (succeeds: 1 + 2 = capacity)
	if err := sem.Acquire(ctx, 2); err != nil {
		fmt.Printf("Acquire failed: %v\n", err)
		return
	}
	fmt.Println("Acquired 2 more slots (now at capacity)")

	// Try to acquire 1 more with a timeout (fails: no slot is free)
	ctxTimeout, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
	defer cancel()
	if err := sem.Acquire(ctxTimeout, 1); err != nil {
		fmt.Printf("Acquire timed out as expected: %v\n", err)
	}

	// Release 2 slots (1 is still held)
	sem.Release(2)
	fmt.Println("Released 2 slots")

	// Now we can acquire 1 (2 held in total)
	if err := sem.Acquire(ctx, 1); err != nil {
		fmt.Printf("Acquire failed: %v\n", err)
		return
	}
	fmt.Println("Acquired 1 slot successfully")

	// TryAcquire never blocks; 1 slot is still free, so this succeeds
	if sem.TryAcquire(1) {
		fmt.Println("TryAcquire succeeded")
		sem.Release(1)
	} else {
		fmt.Println("TryAcquire failed (would block)")
	}
}
```
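One more idiom follows from the fact that a semaphore slot is not tied to a goroutine. You can acquire a slot before starting each goroutine, release it inside the goroutine, and then acquire the full capacity to wait for every task to finish. The worker-pool example in the x/sync/semaphore documentation ends with an Acquire of the full weight for the same reason. The sketch below applies the idea to a plain buffered channel so it needs only the standard library. squareAll is a hypothetical function:

```go
package main

import "fmt"

// squareAll computes in[i]*in[i] for every element with at most `limit`
// goroutines alive at once. The semaphore also serves as the completion
// barrier, so no sync.WaitGroup is needed.
func squareAll(in []int, limit int) []int {
	sem := make(chan struct{}, limit)
	out := make([]int, len(in))
	for i, v := range in {
		sem <- struct{}{} // acquire in the parent, before the goroutine exists
		go func(i, v int) {
			defer func() { <-sem }() // release in the child goroutine
			out[i] = v * v
		}(i, v)
	}
	// Filling every slot succeeds only after all workers have released,
	// which also makes their writes to out visible here.
	for i := 0; i < limit; i++ {
		sem <- struct{}{}
	}
	return out
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4, 5}, 2)) // [1 4 9 16 25]
}
```

Acquiring before the go statement bounds how many goroutines exist, not just how many are doing work. That matters when the input is very large.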

When to Use Semaphore

Perfect Use Cases

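| Scenario | Why a semaphore fits |
|---|---|
| Database connections | Fixed connection pool size |
| External API calls | The provider caps concurrent requests |
| File descriptors | The OS limits open files per process |
| Memory-intensive operations | Bound total memory in use (a weighted semaphore can count bytes) |
| External services | The service has its own concurrency limits |
| Goroutine fan-out | Limit how many tasks run in parallel |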

Semaphore vs Other Patterns

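| Pattern | Use case |
|---|---|
| Semaphore | Limit concurrent access to N |
| Mutex | Exclusive access (N = 1) |
| Worker pool | A fixed number of long-lived workers pulling jobs from a queue |
| Rate limiter | Limit operations per unit of time (e.g., 100 requests per second) |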
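The difference between the semaphore and worker pool rows is easiest to see in code. A worker pool starts a fixed number of long-lived goroutines that pull jobs from a channel, instead of one goroutine per task gated by a semaphore. Both bound concurrency to N. A minimal standard-library sketch, with poolSquares as a hypothetical example job:

```go
package main

import (
	"fmt"
	"sync"
)

// poolSquares squares every input using exactly `workers` goroutines that
// read from a shared jobs channel; concurrency is bounded by the worker count.
func poolSquares(in []int, workers int) []int {
	type job struct{ idx, val int }
	jobs := make(chan job)
	out := make([]int, len(in))

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs { // exits once jobs is closed and drained
				out[j.idx] = j.val * j.val
			}
		}()
	}

	for i, v := range in {
		jobs <- job{i, v}
	}
	close(jobs)
	wg.Wait()
	return out
}

func main() {
	fmt.Println(poolSquares([]int{1, 2, 3, 4}, 2)) // [1 4 9 16]
}
```

A pool pays off when per-worker setup is expensive (a connection, a buffer) and can be reused across jobs. A semaphore is simpler when each task is independent.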

Summary

The Semaphore pattern is essential when you need to control concurrent access to limited resources:
  • Bounded concurrency: Allow exactly N concurrent operations
  • Simple implementation: Buffered channel in Go
  • Weighted option: For resources with different costs
  • Production ready: Use golang.org/x/sync/semaphore
Key Takeaways:
  1. Use buffered channels: make(chan struct{}, N) is a simple semaphore
  2. Always release: Use defer to ensure release happens
  3. Consider timeouts: Don't wait forever for resources
  4. Monitor usage: Track wait times and utilization
  5. Size appropriately: Match semaphore size to actual resource limits

Next Steps

  • Practice: Add connection limiting to your database layer
  • Explore: Rate limiting with the token bucket algorithm (golang.org/x/time/rate implements one)
  • Read: golang.org/x/sync package documentation
  • Build: Monitor semaphore metrics in production
Tags: golang, concurrency, semaphore, resource-management, rate-limiting, channels