System Design Part 4: Database & Storage - The Complete Guide
Introduction
Every system you build will eventually live or die by its data layer. You can have the most elegant microservices architecture, but if your database becomes a bottleneck, nothing else matters. In this guide, we'll explore how to design data layers that scale to billions of transactions while maintaining consistency and performance.
1. How Do You Design Schema for 1 Billion Transactions?
The Challenge
Imagine you're at HDFC Bank. Every day, millions of customers make transactions - UPI payments, card swipes, NEFT transfers, bill payments. Over a year, you accumulate billions of records. How do you design a schema that:
- Supports fast writes (can't make customers wait)
- Enables efficient queries (account statements, analytics)
- Maintains data integrity (it's money!)
- Scales horizontally (business grows every year)
The Naive Approach (And Why It Fails)
```sql
-- The "obvious" schema that doesn't scale
CREATE TABLE transactions (
    id          BIGSERIAL PRIMARY KEY,
    account_id  BIGINT NOT NULL,
    amount      DECIMAL(18, 2) NOT NULL,
    type        VARCHAR(20) NOT NULL,
    description TEXT,
    created_at  TIMESTAMP NOT NULL,
    metadata    JSONB
);

CREATE INDEX idx_account_id ON transactions(account_id);
CREATE INDEX idx_created_at ON transactions(created_at);
```
Why this fails at scale:
- Single table becomes massive (1B rows = slow queries)
- Indexes grow huge (B-tree depth increases)
- Write contention on sequence generation
- Cannot distribute across servers easily
- Backup/restore becomes a nightmare
The Production-Ready Design
```mermaid
graph TB
    subgraph "Write Path"
        API[API Layer] --> Router[Partition Router]
        Router --> P1[Partition 2024-01]
        Router --> P2[Partition 2024-02]
        Router --> P3[Partition 2024-03]
        Router --> PN[Partition 2024-N]
    end
    subgraph "Read Path"
        Query[Query Layer] --> Aggregator[Result Aggregator]
        Aggregator --> P1
        Aggregator --> P2
        Aggregator --> P3
    end
    subgraph "Archive Path"
        P1 --> Cold[Cold Storage S3]
        P2 --> Cold
    end
```
Go Implementation for Partition-Aware Writes
```go
package storage

import (
	"context"
	"fmt"
	"time"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5/pgxpool"
)

type Transaction struct {
	ID                  uuid.UUID
	AccountID           int64
	TransactionDate     time.Time
	Amount              float64
	TransactionType     int16
	Status              int16
	ReferenceID         string
	CounterpartyAccount *int64
	CreatedAt           time.Time
}

type TransactionRepository struct {
	pool *pgxpool.Pool
}

func NewTransactionRepository(pool *pgxpool.Pool) *TransactionRepository {
	return &TransactionRepository{pool: pool}
}

// Insert routes to the correct partition automatically (PostgreSQL handles this)
func (r *TransactionRepository) Insert(ctx context.Context, txn *Transaction) error {
	query := `
		INSERT INTO transactions (
			id, account_id, transaction_date, amount,
			transaction_type, status, reference_id,
			counterparty_account, created_at
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
	`
	if txn.ID == uuid.Nil {
		txn.ID = uuid.New()
	}
	if txn.CreatedAt.IsZero() {
		txn.CreatedAt = time.Now()
	}
	if txn.TransactionDate.IsZero() {
		txn.TransactionDate = txn.CreatedAt.Truncate(24 * time.Hour)
	}

	_, err := r.pool.Exec(ctx, query,
		txn.ID, txn.AccountID, txn.TransactionDate, txn.Amount,
		txn.TransactionType, txn.Status, txn.ReferenceID,
		txn.CounterpartyAccount, txn.CreatedAt,
	)
	return err
}

// GetByAccount - partition pruning happens automatically
func (r *TransactionRepository) GetByAccount(
	ctx context.Context,
	accountID int64,
	startDate, endDate time.Time,
	limit int,
) ([]Transaction, error) {
	// PostgreSQL will only scan relevant partitions
	query := `
		SELECT id, account_id, transaction_date, amount,
		       transaction_type, status, reference_id,
		       counterparty_account, created_at
		FROM transactions
		WHERE account_id = $1
		  AND transaction_date >= $2
		  AND transaction_date < $3
		ORDER BY created_at DESC
		LIMIT $4
	`
	rows, err := r.pool.Query(ctx, query, accountID, startDate, endDate, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var transactions []Transaction
	for rows.Next() {
		var txn Transaction
		err := rows.Scan(
			&txn.ID, &txn.AccountID, &txn.TransactionDate, &txn.Amount,
			&txn.TransactionType, &txn.Status, &txn.ReferenceID,
			&txn.CounterpartyAccount, &txn.CreatedAt,
		)
		if err != nil {
			return nil, err
		}
		transactions = append(transactions, txn)
	}
	return transactions, rows.Err()
}

// Automatic partition management
type PartitionManager struct {
	pool *pgxpool.Pool
}

func (pm *PartitionManager) CreateFuturePartitions(ctx context.Context, monthsAhead int) error {
	now := time.Now()
	for i := 0; i < monthsAhead; i++ {
		targetMonth := now.AddDate(0, i, 0)
		nextMonth := targetMonth.AddDate(0, 1, 0)

		partitionName := fmt.Sprintf("transactions_%s", targetMonth.Format("2006_01"))
		startDate := time.Date(targetMonth.Year(), targetMonth.Month(), 1, 0, 0, 0, 0, time.UTC)
		endDate := time.Date(nextMonth.Year(), nextMonth.Month(), 1, 0, 0, 0, 0, time.UTC)

		query := fmt.Sprintf(`
			CREATE TABLE IF NOT EXISTS %s PARTITION OF transactions
			FOR VALUES FROM ('%s') TO ('%s')
		`, partitionName, startDate.Format("2006-01-02"), endDate.Format("2006-01-02"))

		if _, err := pm.pool.Exec(ctx, query); err != nil {
			return fmt.Errorf("failed to create partition %s: %w", partitionName, err)
		}

		// Create partition-local indexes
		indexQuery := fmt.Sprintf(`
			CREATE INDEX IF NOT EXISTS idx_%s_account
			ON %s(account_id, created_at DESC)
		`, partitionName, partitionName)

		if _, err := pm.pool.Exec(ctx, indexQuery); err != nil {
			return fmt.Errorf("failed to create index on %s: %w", partitionName, err)
		}
	}
	return nil
}

// Archive old partitions to cold storage
func (pm *PartitionManager) ArchiveOldPartitions(ctx context.Context, olderThanMonths int) error {
	cutoffDate := time.Now().AddDate(0, -olderThanMonths, 0)
	partitionName := fmt.Sprintf("transactions_%s", cutoffDate.Format("2006_01"))

	// Export to S3/cold storage first (use pg_dump or COPY)
	// Then detach and drop
	detachQuery := fmt.Sprintf(`
		ALTER TABLE transactions DETACH PARTITION %s
	`, partitionName)

	_, err := pm.pool.Exec(ctx, detachQuery)
	return err
}
```
What Happens Under Load?
```
Load Level: 1K TPS (Normal Day)
├── Single partition handles easily
├── Index fits in memory
└── Response time: 2-5ms

Load Level: 10K TPS (Month End)
├── Multiple partitions active
├── Connection pooling critical
├── Batch inserts recommended
└── Response time: 5-15ms

Load Level: 100K TPS (Diwali Sale)
├── Horizontal sharding needed
├── Read replicas for queries
├── Archive older partitions
└── Response time: 10-50ms
```
Counter Questions & Answers
Q: Why UUID instead of BIGSERIAL for primary key?
UUIDs can be generated application-side without database round-trip. At high concurrency, sequence generation becomes a bottleneck. UUIDs also work better for distributed systems where multiple nodes insert data.
Q: Why partition by date instead of account_id?
Most queries are time-bound (last 30 days, this month). Date partitioning enables efficient pruning. Account-based sharding would scatter a user's transactions across all shards, making account statements expensive.
Q: How do you handle cross-partition queries?
PostgreSQL's partition pruning handles this automatically. For complex analytics, we use materialized views or a separate analytics database (ClickHouse/TimescaleDB).
Q: What about global indexes?
PostgreSQL doesn't support global indexes on partitioned tables. We use partition-local indexes. For globally unique constraints (like reference_id), we use application-level checks or a separate lookup table.
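The lookup-table approach can be sketched like this (a hedged sketch — the table and column names here are ours, not from the schema above). The small unpartitioned table carries the unique constraint the partitioned table cannot, and the application inserts into it in the same transaction as the main insert:

```sql
-- Hypothetical lookup table enforcing global uniqueness of reference_id
CREATE TABLE transaction_refs (
    reference_id   TEXT PRIMARY KEY,
    transaction_id UUID NOT NULL,
    created_at     TIMESTAMP NOT NULL DEFAULT now()
);

-- A duplicate reference_id fails here, aborting the transaction
-- before the insert into the partitioned table proceeds
INSERT INTO transaction_refs (reference_id, transaction_id)
VALUES ($1, $2);
```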
2. How Would You Partition a Transaction Table?
Partitioning Strategies Compared
```mermaid
graph LR
    subgraph "Range Partitioning"
        R1[2024-01] --> R2[2024-02] --> R3[2024-03]
    end
    subgraph "List Partitioning"
        L1[CREDIT]
        L2[DEBIT]
        L3[TRANSFER]
    end
    subgraph "Hash Partitioning"
        H1[Hash 0-3]
        H2[Hash 4-7]
        H3[Hash 8-11]
        H4[Hash 12-15]
    end
```
Strategy 1: Time-Based Range Partitioning (Most Common)
```sql
-- Best for: Time-series data, audit logs, transactions
CREATE TABLE transactions (
    id         UUID NOT NULL,
    account_id BIGINT NOT NULL,
    created_at TIMESTAMP NOT NULL,
    amount     DECIMAL(18,2) NOT NULL,
    -- PostgreSQL requires the partition key in the primary key
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Monthly partitions
CREATE TABLE transactions_2024_01 PARTITION OF transactions
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

-- Daily partitions for high-volume tables
CREATE TABLE transactions_2024_01_01 PARTITION OF transactions
    FOR VALUES FROM ('2024-01-01') TO ('2024-01-02');
```
Pros:
- Natural data lifecycle (archive old partitions)
- Queries with date ranges are fast
- Easy maintenance (drop old partitions)
Cons:
- Hot partition problem (current month gets all writes)
- Uneven partition sizes (holiday months larger)
Strategy 2: Hash Partitioning (Even Distribution)
```sql
-- Best for: Even write distribution, no time-based queries
CREATE TABLE user_sessions (
    id           UUID NOT NULL,
    user_id      BIGINT NOT NULL,
    session_data JSONB,
    created_at   TIMESTAMP,
    -- PostgreSQL requires the partition key in the primary key
    PRIMARY KEY (id, user_id)
) PARTITION BY HASH (user_id);

-- Create 16 partitions (power of 2 recommended)
CREATE TABLE user_sessions_0 PARTITION OF user_sessions
    FOR VALUES WITH (MODULUS 16, REMAINDER 0);
CREATE TABLE user_sessions_1 PARTITION OF user_sessions
    FOR VALUES WITH (MODULUS 16, REMAINDER 1);
-- ... up to 15
```
Pros:
- Even data distribution
- No hot partition problem
- Good for sharding across nodes
Cons:
- Cannot prune partitions for range queries
- Adding partitions requires rehashing (complex)
Strategy 3: Composite Partitioning (Best of Both)
```sql
-- First level: Range by date
-- Second level: Hash by account
CREATE TABLE transactions (
    id         UUID,
    account_id BIGINT NOT NULL,
    created_at TIMESTAMP NOT NULL,
    amount     DECIMAL(18,2)
) PARTITION BY RANGE (created_at);

CREATE TABLE transactions_2024_01 PARTITION OF transactions
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01')
    PARTITION BY HASH (account_id);

CREATE TABLE transactions_2024_01_h0 PARTITION OF transactions_2024_01
    FOR VALUES WITH (MODULUS 4, REMAINDER 0);
CREATE TABLE transactions_2024_01_h1 PARTITION OF transactions_2024_01
    FOR VALUES WITH (MODULUS 4, REMAINDER 1);
-- ...
```
Go Implementation: Partition-Aware Query Optimizer
```go
package partition

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

type Transaction struct {
	ID        string
	AccountID int64
	Amount    float64
	CreatedAt time.Time
}

type QueryOptimizer struct {
	pool            *pgxpool.Pool
	partitionPrefix string
}

// OptimizedQuery rewrites queries to target specific partitions
func (qo *QueryOptimizer) OptimizedQuery(
	ctx context.Context,
	accountID int64,
	startDate, endDate time.Time,
) ([]Transaction, error) {
	// Calculate which partitions to query
	partitions := qo.calculatePartitions(startDate, endDate)
	if len(partitions) == 0 {
		return nil, fmt.Errorf("no partitions found for date range")
	}

	// For a small number of partitions, query directly
	if len(partitions) <= 3 {
		return qo.queryPartitionsDirect(ctx, partitions, accountID, startDate, endDate)
	}

	// For many partitions, fan out with parallel queries
	return qo.queryPartitionsParallel(ctx, partitions, accountID, startDate, endDate)
}

func (qo *QueryOptimizer) calculatePartitions(start, end time.Time) []string {
	var partitions []string
	current := time.Date(start.Year(), start.Month(), 1, 0, 0, 0, 0, time.UTC)
	endMonth := time.Date(end.Year(), end.Month(), 1, 0, 0, 0, 0, time.UTC)

	for !current.After(endMonth) {
		partitionName := fmt.Sprintf("%s_%s", qo.partitionPrefix, current.Format("2006_01"))
		partitions = append(partitions, partitionName)
		current = current.AddDate(0, 1, 0)
	}
	return partitions
}

func (qo *QueryOptimizer) queryPartitionsDirect(
	ctx context.Context,
	partitions []string,
	accountID int64,
	startDate, endDate time.Time,
) ([]Transaction, error) {
	// Build UNION ALL query
	var queryParts []string
	for _, partition := range partitions {
		part := fmt.Sprintf(`
			SELECT id, account_id, amount, created_at
			FROM %s
			WHERE account_id = $1
			  AND created_at >= $2
			  AND created_at < $3
		`, partition)
		queryParts = append(queryParts, part)
	}
	query := strings.Join(queryParts, " UNION ALL ") + " ORDER BY created_at DESC"

	rows, err := qo.pool.Query(ctx, query, accountID, startDate, endDate)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var transactions []Transaction
	for rows.Next() {
		var txn Transaction
		if err := rows.Scan(&txn.ID, &txn.AccountID, &txn.Amount, &txn.CreatedAt); err != nil {
			return nil, err
		}
		transactions = append(transactions, txn)
	}
	return transactions, rows.Err()
}

// Parallel query for large date ranges
func (qo *QueryOptimizer) queryPartitionsParallel(
	ctx context.Context,
	partitions []string,
	accountID int64,
	startDate, endDate time.Time,
) ([]Transaction, error) {
	type result struct {
		transactions []Transaction
		err          error
	}
	results := make(chan result, len(partitions))

	for _, partition := range partitions {
		go func(p string) {
			query := fmt.Sprintf(`
				SELECT id, account_id, amount, created_at
				FROM %s
				WHERE account_id = $1
				  AND created_at >= $2
				  AND created_at < $3
				ORDER BY created_at DESC
			`, p)
			rows, err := qo.pool.Query(ctx, query, accountID, startDate, endDate)
			if err != nil {
				results <- result{err: err}
				return
			}
			defer rows.Close()

			var txns []Transaction
			for rows.Next() {
				var txn Transaction
				if err := rows.Scan(&txn.ID, &txn.AccountID, &txn.Amount, &txn.CreatedAt); err != nil {
					results <- result{err: err}
					return
				}
				txns = append(txns, txn)
			}
			results <- result{transactions: txns, err: rows.Err()}
		}(partition)
	}

	// Collect and merge results
	var allTransactions []Transaction
	for i := 0; i < len(partitions); i++ {
		r := <-results
		if r.err != nil {
			return nil, r.err
		}
		allTransactions = append(allTransactions, r.transactions...)
	}

	// Sort merged results
	sortByCreatedAtDesc(allTransactions)
	return allTransactions, nil
}

func sortByCreatedAtDesc(txns []Transaction) {
	sort.Slice(txns, func(i, j int) bool {
		return txns[i].CreatedAt.After(txns[j].CreatedAt)
	})
}
```
Counter Questions & Answers
Q: How do you handle queries that span many partitions?
For 1-3 partitions, PostgreSQL handles it efficiently. For larger ranges, we either use parallel queries (as shown above), materialized views for common aggregations, or route to an analytics database.
Q: What's the ideal partition size?
Target 10-100 million rows per partition. Too small = overhead of managing many partitions. Too large = slow queries and maintenance.
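As a sanity check on that target, the arithmetic is simple (the yearly volumes below are illustrative, not from the text):

```go
package main

import "fmt"

// rowsPerMonthlyPartition: with monthly range partitioning, each
// partition holds roughly one twelfth of the yearly volume.
func rowsPerMonthlyPartition(rowsPerYear int64) int64 {
	return rowsPerYear / 12
}

func main() {
	for _, yearly := range []int64{120_000_000, 1_200_000_000, 12_000_000_000} {
		perPart := rowsPerMonthlyPartition(yearly)
		ok := perPart >= 10_000_000 && perPart <= 100_000_000
		fmt.Printf("%d rows/year -> %d rows per monthly partition (in 10M-100M range: %v)\n",
			yearly, perPart, ok)
	}
}
```

At 12 billion rows per year, monthly partitions blow past the sweet spot — that is the point at which you drop down to daily partitions.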
Q: How do you add partitions without downtime?
PostgreSQL's `CREATE TABLE ... PARTITION OF` is fast, though it briefly locks the parent table. We run a cron job to create partitions 3 months ahead, so they're ready before they're needed.
3. When Would You Use SQL vs NoSQL?
The Decision Framework
```mermaid
graph TD
    Start[New Data Store Needed] --> Q1{Need ACID transactions?}
    Q1 -->|Yes| Q2{Complex relationships?}
    Q1 -->|No| Q3{High write throughput?}
    Q2 -->|Yes| SQL[PostgreSQL/MySQL]
    Q2 -->|No| Q4{Need flexible schema?}
    Q3 -->|Yes| Q5{Time-series data?}
    Q3 -->|No| Q6{Need caching?}
    Q4 -->|Yes| Document[MongoDB/DynamoDB]
    Q4 -->|No| SQL
    Q5 -->|Yes| TimeSeries[InfluxDB/TimescaleDB]
    Q5 -->|No| KV[Cassandra/ScyllaDB]
    Q6 -->|Yes| Cache[Redis/Aerospike]
    Q6 -->|No| Document
```
SQL (PostgreSQL) - When to Use
```go
// Use Case 1: Financial Transactions (ACID required)
type AccountService struct {
	db *sql.DB
}

func (s *AccountService) Transfer(ctx context.Context, from, to int64, amount float64) error {
	tx, err := s.db.BeginTx(ctx, &sql.TxOptions{
		Isolation: sql.LevelSerializable, // Strongest isolation
	})
	if err != nil {
		return err
	}
	defer tx.Rollback() // No-op if Commit succeeds

	// Debit source account
	result, err := tx.ExecContext(ctx, `
		UPDATE accounts SET balance = balance - $1
		WHERE id = $2 AND balance >= $1
	`, amount, from)
	if err != nil {
		return err
	}
	rows, _ := result.RowsAffected()
	if rows == 0 {
		return ErrInsufficientBalance
	}

	// Credit destination account
	_, err = tx.ExecContext(ctx, `
		UPDATE accounts SET balance = balance + $1
		WHERE id = $2
	`, amount, to)
	if err != nil {
		return err
	}

	// Record transaction
	_, err = tx.ExecContext(ctx, `
		INSERT INTO transfers (from_account, to_account, amount, created_at)
		VALUES ($1, $2, $3, NOW())
	`, from, to, amount)
	if err != nil {
		return err
	}

	return tx.Commit()
}

// Use Case 2: Complex Queries with JOINs
func (s *AccountService) GetCustomerPortfolio(ctx context.Context, customerID int64) (*Portfolio, error) {
	query := `
		SELECT
			c.name,
			a.account_number,
			a.balance,
			COUNT(t.id) as transaction_count,
			SUM(CASE WHEN t.type = 'CREDIT' THEN t.amount ELSE 0 END) as total_credits,
			SUM(CASE WHEN t.type = 'DEBIT' THEN t.amount ELSE 0 END) as total_debits
		FROM customers c
		JOIN accounts a ON c.id = a.customer_id
		LEFT JOIN transactions t ON a.id = t.account_id
		WHERE c.id = $1
		GROUP BY c.name, a.account_number, a.balance
	`
	rows, err := s.db.QueryContext(ctx, query, customerID)
	// ... process rows
}
```
NoSQL (MongoDB) - When to Use
```go
// Use Case 1: Flexible Schema (Product Catalog)
type ProductRepository struct {
	collection *mongo.Collection
}

// Different products have different attributes
type Product struct {
	ID         primitive.ObjectID `bson:"_id"`
	Name       string             `bson:"name"`
	Category   string             `bson:"category"`
	Price      float64            `bson:"price"`
	Attributes map[string]any     `bson:"attributes"` // Flexible!
}

// Electronics might have: screen_size, battery_life, processor
// Clothing might have: size, color, material, fit_type
// Books might have: author, isbn, pages, genre

func (r *ProductRepository) Create(ctx context.Context, product *Product) error {
	product.ID = primitive.NewObjectID()
	_, err := r.collection.InsertOne(ctx, product)
	return err
}

// Use Case 2: Nested Documents (User Preferences)
type UserPreferences struct {
	UserID            string               `bson:"user_id"`
	NotificationPrefs NotificationSettings `bson:"notifications"`
	UISettings        UIPreferences        `bson:"ui"`
	RecentSearches    []string             `bson:"recent_searches"`
	SavedItems        []SavedItem          `bson:"saved_items"`
}

type NotificationSettings struct {
	Email  bool     `bson:"email"`
	Push   bool     `bson:"push"`
	SMS    bool     `bson:"sms"`
	Quiet  []string `bson:"quiet_hours"` // ["22:00-07:00"]
	Topics []string `bson:"topics"`
}
```
Key-Value (Redis) - When to Use
```go
// Use Case 1: Session Storage
type SessionStore struct {
	client *redis.Client
}

func (s *SessionStore) CreateSession(ctx context.Context, userID int64, ttl time.Duration) (string, error) {
	sessionID := uuid.New().String()
	sessionData := map[string]interface{}{
		"user_id":    userID,
		"created_at": time.Now().Unix(),
		"ip":         "...",
	}
	data, _ := json.Marshal(sessionData)
	err := s.client.Set(ctx, "session:"+sessionID, data, ttl).Err()
	return sessionID, err
}

// Use Case 2: Rate Limiting
func (s *SessionStore) CheckRateLimit(ctx context.Context, key string, limit int, window time.Duration) (bool, error) {
	pipe := s.client.Pipeline()
	incr := pipe.Incr(ctx, key)
	// Note: refreshing the TTL on every call stretches the window under
	// constant traffic; acceptable as an approximation, or set the expiry
	// only when incr.Val() == 1 for a strict fixed window
	pipe.Expire(ctx, key, window)
	_, err := pipe.Exec(ctx)
	if err != nil {
		return false, err
	}
	return incr.Val() <= int64(limit), nil
}

// Use Case 3: Leaderboards
func (s *SessionStore) UpdateScore(ctx context.Context, leaderboard string, userID string, score float64) error {
	return s.client.ZAdd(ctx, leaderboard, redis.Z{
		Score:  score,
		Member: userID,
	}).Err()
}

func (s *SessionStore) GetTopPlayers(ctx context.Context, leaderboard string, count int) ([]redis.Z, error) {
	return s.client.ZRevRangeWithScores(ctx, leaderboard, 0, int64(count-1)).Result()
}
```
Wide-Column (Cassandra) - When to Use
```go
// Use Case: Time-Series IoT Data
// Cassandra excels at: high write throughput, time-series, distributed
type SensorDataRepository struct {
	session *gocql.Session
}

// Table design: partition by sensor + day, cluster by timestamp
// CREATE TABLE sensor_data (
//     sensor_id   text,
//     day         date,
//     timestamp   timestamp,
//     temperature double,
//     humidity    double,
//     PRIMARY KEY ((sensor_id, day), timestamp)
// ) WITH CLUSTERING ORDER BY (timestamp DESC);

func (r *SensorDataRepository) Insert(ctx context.Context, sensorID string, temp, humidity float64) error {
	now := time.Now()
	day := now.Format("2006-01-02")
	return r.session.Query(`
		INSERT INTO sensor_data (sensor_id, day, timestamp, temperature, humidity)
		VALUES (?, ?, ?, ?, ?)
	`, sensorID, day, now, temp, humidity).WithContext(ctx).Exec()
}

// Efficient query: reads a single partition
func (r *SensorDataRepository) GetTodayReadings(ctx context.Context, sensorID string) ([]SensorReading, error) {
	day := time.Now().Format("2006-01-02")
	iter := r.session.Query(`
		SELECT timestamp, temperature, humidity
		FROM sensor_data
		WHERE sensor_id = ? AND day = ?
		ORDER BY timestamp DESC
		LIMIT 100
	`, sensorID, day).WithContext(ctx).Iter()

	var readings []SensorReading
	var reading SensorReading
	for iter.Scan(&reading.Timestamp, &reading.Temperature, &reading.Humidity) {
		readings = append(readings, reading)
	}
	return readings, iter.Close()
}
```
Decision Matrix
| Requirement | SQL | Document | Key-Value | Wide-Column |
|---|---|---|---|---|
| ACID Transactions | ✅ Best | ⚠️ Limited | ❌ No | ❌ No |
| Complex JOINs | ✅ Best | ❌ Poor | ❌ No | ❌ No |
| Flexible Schema | ❌ Rigid | ✅ Best | ⚠️ Limited | ⚠️ Limited |
| Write Throughput | ⚠️ Medium | ✅ Good | ✅ Best | ✅ Best |
| Read Latency | ✅ Good | ✅ Good | ✅ Best | ✅ Good |
| Horizontal Scaling | ⚠️ Hard | ✅ Good | ✅ Best | ✅ Best |
| Time-Series | ⚠️ Extensions | ❌ Poor | ⚠️ Limited | ✅ Best |
4. How Do You Avoid Hotspot Keys?
Understanding Hotspots
```mermaid
graph TB
    subgraph "Hotspot Problem"
        Client1[Client 1] --> Key[Popular Key]
        Client2[Client 2] --> Key
        Client3[Client 3] --> Key
        ClientN[Client N] --> Key
        Key --> Node1[Single Node - Overloaded!]
    end
    subgraph "After Distribution"
        C1[Client 1] --> K1[Key:shard0]
        C2[Client 2] --> K2[Key:shard1]
        C3[Client 3] --> K3[Key:shard2]
        CN[Client N] --> KN[Key:shardN]
        K1 --> N1[Node 1]
        K2 --> N2[Node 2]
        K3 --> N3[Node 3]
        KN --> NN[Node N]
    end
```
Strategy 1: Key Sharding (Read Distribution)
```go
package hotspot

import (
	"context"
	"fmt"
	"hash/fnv"
	"time"

	"github.com/redis/go-redis/v9"
)

const numShards = 16

type ShardedCounter struct {
	client *redis.ClusterClient
}

// Instead of: INCR view_count:product:123
// We use:     INCR view_count:product:123:shard:7
func (sc *ShardedCounter) Increment(ctx context.Context, productID string) error {
	// Distribute writes across shards
	shard := sc.getShard(productID)
	key := fmt.Sprintf("view_count:product:%s:shard:%d", productID, shard)
	return sc.client.Incr(ctx, key).Err()
}

func (sc *ShardedCounter) getShard(productID string) int {
	h := fnv.New32a()
	h.Write([]byte(productID))
	h.Write([]byte(fmt.Sprintf("%d", time.Now().UnixNano()))) // Add randomness
	return int(h.Sum32()) % numShards
}

// Aggregate for reads (less frequent)
func (sc *ShardedCounter) GetCount(ctx context.Context, productID string) (int64, error) {
	pipe := sc.client.Pipeline()
	cmds := make([]*redis.StringCmd, numShards)
	for i := 0; i < numShards; i++ {
		key := fmt.Sprintf("view_count:product:%s:shard:%d", productID, i)
		cmds[i] = pipe.Get(ctx, key)
	}
	_, err := pipe.Exec(ctx)
	if err != nil && err != redis.Nil {
		return 0, err
	}

	var total int64
	for _, cmd := range cmds {
		val, _ := cmd.Int64() // Missing shards count as 0
		total += val
	}
	return total, nil
}

// Batch aggregation for efficiency
func (sc *ShardedCounter) GetCountsCached(ctx context.Context, productID string) (int64, error) {
	cacheKey := fmt.Sprintf("view_count:product:%s:cached", productID)

	// Try cache first
	cached, err := sc.client.Get(ctx, cacheKey).Int64()
	if err == nil {
		return cached, nil
	}

	// Aggregate from shards
	count, err := sc.GetCount(ctx, productID)
	if err != nil {
		return 0, err
	}

	// Cache aggregated value for 5 seconds
	sc.client.Set(ctx, cacheKey, count, 5*time.Second)
	return count, nil
}
```
Strategy 2: Time-Based Bucketing
```go
// For time-series data, bucket by time windows
type TimeBasedCounter struct {
	client *redis.Client
}

func (tc *TimeBasedCounter) IncrementWithTimeBucket(ctx context.Context, key string) error {
	// Bucket by minute
	bucket := time.Now().Unix() / 60
	bucketKey := fmt.Sprintf("%s:bucket:%d", key, bucket)

	pipe := tc.client.Pipeline()
	pipe.Incr(ctx, bucketKey)
	pipe.Expire(ctx, bucketKey, 2*time.Hour) // Keep 2 hours of buckets
	_, err := pipe.Exec(ctx)
	return err
}

func (tc *TimeBasedCounter) GetCountLastHour(ctx context.Context, key string) (int64, error) {
	now := time.Now().Unix() / 60
	pipe := tc.client.Pipeline()
	cmds := make([]*redis.StringCmd, 60)

	for i := 0; i < 60; i++ {
		bucket := now - int64(i)
		bucketKey := fmt.Sprintf("%s:bucket:%d", key, bucket)
		cmds[i] = pipe.Get(ctx, bucketKey)
	}
	pipe.Exec(ctx)

	var total int64
	for _, cmd := range cmds {
		val, _ := cmd.Int64()
		total += val
	}
	return total, nil
}
```
Strategy 3: Local Aggregation (Write Buffering)
```go
// Buffer writes locally, flush periodically
type BufferedCounter struct {
	client    *redis.Client
	buffer    map[string]int64
	mu        sync.Mutex
	batchSize int
	flushCh   chan struct{}
}

func NewBufferedCounter(client *redis.Client, batchSize int) *BufferedCounter {
	bc := &BufferedCounter{
		client:    client,
		buffer:    make(map[string]int64),
		batchSize: batchSize,
		flushCh:   make(chan struct{}, 1),
	}
	go bc.flushLoop()
	return bc
}

func (bc *BufferedCounter) Increment(ctx context.Context, key string) {
	bc.mu.Lock()
	bc.buffer[key]++
	shouldFlush := len(bc.buffer) >= bc.batchSize
	bc.mu.Unlock()

	if shouldFlush {
		select {
		case bc.flushCh <- struct{}{}:
		default:
		}
	}
}

func (bc *BufferedCounter) flushLoop() {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			bc.flush()
		case <-bc.flushCh:
			bc.flush()
		}
	}
}

func (bc *BufferedCounter) flush() {
	bc.mu.Lock()
	if len(bc.buffer) == 0 {
		bc.mu.Unlock()
		return
	}
	toFlush := bc.buffer
	bc.buffer = make(map[string]int64)
	bc.mu.Unlock()

	ctx := context.Background()
	pipe := bc.client.Pipeline()
	for key, count := range toFlush {
		pipe.IncrBy(ctx, key, count)
	}
	pipe.Exec(ctx)
}
```
Strategy 4: Consistent Hashing for Distributed Systems
```go
// Distribute hot keys across multiple Redis nodes
type ConsistentHashRing struct {
	nodes    []string
	replicas int
	ring     map[uint32]string
	keys     []uint32
	mu       sync.RWMutex
}

func NewConsistentHashRing(nodes []string, replicas int) *ConsistentHashRing {
	chr := &ConsistentHashRing{
		nodes:    nodes,
		replicas: replicas,
		ring:     make(map[uint32]string),
	}
	for _, node := range nodes {
		for i := 0; i < replicas; i++ {
			hash := chr.hash(fmt.Sprintf("%s:%d", node, i))
			chr.ring[hash] = node
			chr.keys = append(chr.keys, hash)
		}
	}
	sort.Slice(chr.keys, func(i, j int) bool {
		return chr.keys[i] < chr.keys[j]
	})
	return chr
}

func (chr *ConsistentHashRing) hash(key string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	return h.Sum32()
}

func (chr *ConsistentHashRing) GetNode(key string) string {
	chr.mu.RLock()
	defer chr.mu.RUnlock()

	hash := chr.hash(key)
	// Binary search for the first node >= hash
	idx := sort.Search(len(chr.keys), func(i int) bool {
		return chr.keys[i] >= hash
	})
	if idx >= len(chr.keys) {
		idx = 0 // Wrap around the ring
	}
	return chr.ring[chr.keys[idx]]
}
```
Counter Questions & Answers
Q: How do you handle hotspots in database primary keys?
Use composite keys that include a distribution element: `(customer_id, order_id)` instead of just `order_id`. For write-heavy tables, consider UUIDs or ULIDs, which distribute better than sequential IDs.
Q: What about hotspots in Kafka partitions?
If one key produces most messages, either: (1) add a random suffix to partition key, (2) use round-robin for that specific key, or (3) create a dedicated high-throughput topic for hot keys.
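Option (1) can be sketched in a few lines. The `saltedKey` helper and the `#` suffix format are ours for illustration, not a Kafka API; the trade-off is that per-key ordering is lost and consumers must aggregate across all suffixes:

```go
package main

import (
	"fmt"
	"math/rand"
)

// saltedKey spreads a known-hot partition key across `salts` sub-keys
// by appending a random suffix, so the producer's key hash lands on
// different partitions instead of always the same one.
func saltedKey(key string, salts int) string {
	return fmt.Sprintf("%s#%d", key, rand.Intn(salts))
}

func main() {
	// The hot key "order-events" fans out over 8 sub-keys
	for i := 0; i < 5; i++ {
		fmt.Println(saltedKey("order-events", 8))
	}
}
```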
Q: How do you detect hotspots before they cause problems?
Monitor key access patterns: Redis SLOWLOG, database query logs, custom metrics on key access frequency. Set up alerts when any single key exceeds threshold requests/second.
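The "custom metrics on key access frequency" idea can be sketched in-process. This is a minimal illustration with names of our choosing; a production version would reset counts on a timed window and export them to your metrics system:

```go
package main

import (
	"fmt"
	"sync"
)

// hotKeyDetector counts accesses per key and reports the moment a key
// crosses the configured threshold within the current window.
type hotKeyDetector struct {
	mu        sync.Mutex
	counts    map[string]int
	threshold int
}

func newHotKeyDetector(threshold int) *hotKeyDetector {
	return &hotKeyDetector{counts: make(map[string]int), threshold: threshold}
}

// Record increments the key's counter and returns true exactly once,
// when the key first reaches the threshold.
func (d *hotKeyDetector) Record(key string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.counts[key]++
	return d.counts[key] == d.threshold
}

func main() {
	d := newHotKeyDetector(100)
	for i := 0; i < 250; i++ {
		if d.Record("session:abc") {
			fmt.Println("hot key detected: session:abc")
		}
	}
}
```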
5. Read Replica Lag Handling
Understanding Replica Lag
```mermaid
sequenceDiagram
    participant Client
    participant Primary
    participant Replica
    Client->>Primary: INSERT user (id=1)
    Primary->>Primary: Write to WAL
    Primary-->>Replica: Replicate (async)
    Note over Replica: 50-200ms lag
    Client->>Replica: SELECT user WHERE id=1
    Replica-->>Client: NOT FOUND! (stale)
```
Strategy 1: Read-Your-Writes Consistency
```go
package replication

import (
	"context"
	"database/sql"
	"sync"
	"time"
)

type ReadYourWritesDB struct {
	primary           *sql.DB
	replica           *sql.DB
	recentWrites      map[string]time.Time // key -> last write time
	mu                sync.RWMutex
	consistencyWindow time.Duration
}

func NewReadYourWritesDB(primary, replica *sql.DB) *ReadYourWritesDB {
	db := &ReadYourWritesDB{
		primary:           primary,
		replica:           replica,
		recentWrites:      make(map[string]time.Time),
		consistencyWindow: 5 * time.Second, // Expected max lag
	}
	go db.cleanupLoop()
	return db
}

// Write always goes to primary
func (db *ReadYourWritesDB) Write(ctx context.Context, key string, query string, args ...interface{}) (sql.Result, error) {
	result, err := db.primary.ExecContext(ctx, query, args...)
	if err == nil {
		db.mu.Lock()
		db.recentWrites[key] = time.Now()
		db.mu.Unlock()
	}
	return result, err
}

// Read checks if we need consistency
func (db *ReadYourWritesDB) Read(ctx context.Context, key string, query string, args ...interface{}) (*sql.Rows, error) {
	db.mu.RLock()
	lastWrite, exists := db.recentWrites[key]
	db.mu.RUnlock()

	// If recently written, read from primary
	if exists && time.Since(lastWrite) < db.consistencyWindow {
		return db.primary.QueryContext(ctx, query, args...)
	}
	// Safe to read from replica
	return db.replica.QueryContext(ctx, query, args...)
}

func (db *ReadYourWritesDB) cleanupLoop() {
	ticker := time.NewTicker(time.Minute)
	for range ticker.C {
		db.mu.Lock()
		now := time.Now()
		for key, writeTime := range db.recentWrites {
			if now.Sub(writeTime) > db.consistencyWindow*2 {
				delete(db.recentWrites, key)
			}
		}
		db.mu.Unlock()
	}
}
```
Strategy 2: LSN-Based Consistency
```go
// Use PostgreSQL's LSN (Log Sequence Number) for precise consistency
type LSNAwareDB struct {
	primary *pgxpool.Pool
	replica *pgxpool.Pool
}

type WriteResult struct {
	Result pgconn.CommandTag
	LSN    int64
}

func (db *LSNAwareDB) WriteWithLSN(ctx context.Context, query string, args ...interface{}) (*WriteResult, error) {
	var lsn int64
	// Execute the write and capture the current WAL position.
	// pg_lsn cannot be cast to bigint directly; diff against '0/0' instead.
	err := db.primary.QueryRow(ctx, `
		WITH write AS (
			`+query+`
			RETURNING 1
		)
		SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), '0/0')::bigint
	`, args...).Scan(&lsn)
	if err != nil {
		return nil, err
	}
	return &WriteResult{LSN: lsn}, nil
}

func (db *LSNAwareDB) ReadAfterLSN(ctx context.Context, minLSN int64, query string, args ...interface{}) (pgx.Rows, error) {
	// Check the replica's replay position
	var replayLSN int64
	err := db.replica.QueryRow(ctx, `
		SELECT pg_wal_lsn_diff(pg_last_wal_replay_lsn(), '0/0')::bigint
	`).Scan(&replayLSN)
	if err != nil {
		return nil, err
	}

	// If the replica hasn't caught up, read from the primary
	if replayLSN < minLSN {
		return db.primary.Query(ctx, query, args...)
	}
	return db.replica.Query(ctx, query, args...)
}

// Client usage with a session
type Session struct {
	db     *LSNAwareDB
	minLSN int64
	mu     sync.Mutex
}

func (s *Session) Write(ctx context.Context, query string, args ...interface{}) error {
	result, err := s.db.WriteWithLSN(ctx, query, args...)
	if err != nil {
		return err
	}
	s.mu.Lock()
	if result.LSN > s.minLSN {
		s.minLSN = result.LSN
	}
	s.mu.Unlock()
	return nil
}

func (s *Session) Read(ctx context.Context, query string, args ...interface{}) (pgx.Rows, error) {
	s.mu.Lock()
	minLSN := s.minLSN
	s.mu.Unlock()
	return s.db.ReadAfterLSN(ctx, minLSN, query, args...)
}
```
Strategy 3: Synchronous Replication for Critical Reads
```go
// For critical operations, wait for replica confirmation.
// Requires synchronous_standby_names to be configured on the primary;
// otherwise synchronous_commit has no standby to wait on.
type SyncReplicaDB struct {
	primary *pgxpool.Pool
}

func (db *SyncReplicaDB) WriteAndWait(ctx context.Context, query string, args ...interface{}) error {
	tx, err := db.primary.BeginTx(ctx, pgx.TxOptions{})
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx)

	// Execute the write
	_, err = tx.Exec(ctx, query, args...)
	if err != nil {
		return err
	}

	// Set synchronous commit for this transaction.
	// This waits for at least one replica to confirm the WAL flush.
	_, err = tx.Exec(ctx, "SET LOCAL synchronous_commit = 'on'")
	if err != nil {
		return err
	}

	return tx.Commit(ctx)
}

// For even stronger guarantees
func (db *SyncReplicaDB) WriteAndWaitAllReplicas(ctx context.Context, query string, args ...interface{}) error {
	tx, err := db.primary.BeginTx(ctx, pgx.TxOptions{})
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx)

	_, err = tx.Exec(ctx, query, args...)
	if err != nil {
		return err
	}

	// remote_apply waits until standbys have applied the commit,
	// making it visible to replica reads (strongest guarantee, slowest)
	_, err = tx.Exec(ctx, "SET LOCAL synchronous_commit = 'remote_apply'")
	if err != nil {
		return err
	}

	return tx.Commit(ctx)
}
```
Monitoring Replica Lag
```go
type ReplicaMonitor struct {
	primary *pgxpool.Pool
	replica *pgxpool.Pool
	metrics MetricsClient
}

func (rm *ReplicaMonitor) CheckLag(ctx context.Context) (time.Duration, error) {
	var lagBytes int64
	var lagSeconds float64

	// Method 1: Byte lag
	err := rm.replica.QueryRow(ctx, `
		SELECT pg_wal_lsn_diff(
			pg_last_wal_receive_lsn(),
			pg_last_wal_replay_lsn()
		)::bigint
	`).Scan(&lagBytes)
	if err != nil {
		return 0, err
	}

	// Method 2: Time-based lag (more intuitive).
	// COALESCE guards against NULL before any transaction has replayed.
	err = rm.replica.QueryRow(ctx, `
		SELECT COALESCE(
			EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())),
			0
		)
	`).Scan(&lagSeconds)
	if err != nil {
		return 0, err
	}

	lag := time.Duration(lagSeconds * float64(time.Second))
	rm.metrics.Gauge("replica.lag.bytes", float64(lagBytes))
	rm.metrics.Gauge("replica.lag.seconds", lagSeconds)
	return lag, nil
}

func (rm *ReplicaMonitor) StartMonitoring(ctx context.Context) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			lag, err := rm.CheckLag(ctx)
			if err != nil {
				log.Printf("Failed to check replica lag: %v", err)
				continue
			}
			if lag > 5*time.Second {
				log.Printf("WARNING: Replica lag is %v", lag)
				// Could trigger alerts or traffic shifting
			}
		}
	}
}
```
6. Connection Pool Sizing
The Science of Pool Sizing
mermaidgraph LR subgraph "Too Small Pool" R1[Request] --> W1[Waiting...] R2[Request] --> W2[Waiting...] R3[Request] --> W3[Waiting...] W1 --> C1[1 Conn] W2 --> C2[2 Conn] end subgraph "Right-Sized Pool" RR1[Request] --> CC1[Conn 1] RR2[Request] --> CC2[Conn 2] RR3[Request] --> CC3[Conn 3] RR4[Request] --> CC4[Conn 4] end subgraph "Too Large Pool" RRR1[Request] --> CCC1[Conn 1 - Active] RRR2[Request] --> CCC2[Conn 2 - Active] CCC3[Conn 3 - Idle] CCC4[Conn 4 - Idle] CCC5[Conn 5 - Idle] CCCN[Conn N - Wasted RAM] end
Formula: Optimal Pool Size
```
Pool Size = (Core Count * 2) + Effective Spindle Count

For SSD-based systems:
Pool Size = Core Count * 2 to 4

For most Go applications:
Pool Size = Number of CPU cores * (1 + (Wait Time / Service Time))
```
Production Connection Pool Implementation
```go
package database

import (
	"context"
	"fmt"
	"runtime"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

type PoolConfig struct {
	MaxConns          int32
	MinConns          int32
	MaxConnLifetime   time.Duration
	MaxConnIdleTime   time.Duration
	HealthCheckPeriod time.Duration
}

func OptimalPoolConfig() PoolConfig {
	cpuCount := runtime.NumCPU()

	// Base formula for OLTP workloads
	maxConns := int32(cpuCount * 2)
	if maxConns < 10 {
		maxConns = 10 // Minimum for burst handling
	}
	if maxConns > 100 {
		maxConns = 100 // Databases typically can't use more efficiently
	}

	return PoolConfig{
		MaxConns:          maxConns,
		MinConns:          int32(cpuCount),  // Keep some warm
		MaxConnLifetime:   time.Hour,        // Prevent stale connections
		MaxConnIdleTime:   30 * time.Minute, // Release idle connections
		HealthCheckPeriod: time.Minute,      // Detect dead connections
	}
}

func NewPool(ctx context.Context, connString string, cfg PoolConfig) (*pgxpool.Pool, error) {
	poolCfg, err := pgxpool.ParseConfig(connString)
	if err != nil {
		return nil, err
	}

	poolCfg.MaxConns = cfg.MaxConns
	poolCfg.MinConns = cfg.MinConns
	poolCfg.MaxConnLifetime = cfg.MaxConnLifetime
	poolCfg.MaxConnIdleTime = cfg.MaxConnIdleTime
	poolCfg.HealthCheckPeriod = cfg.HealthCheckPeriod

	// Connection setup hook
	poolCfg.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
		// Set statement timeout to prevent runaway queries
		_, err := conn.Exec(ctx, "SET statement_timeout = '30s'")
		return err
	}

	pool, err := pgxpool.NewWithConfig(ctx, poolCfg)
	if err != nil {
		return nil, err
	}

	// Verify connectivity
	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return nil, fmt.Errorf("failed to ping database: %w", err)
	}

	return pool, nil
}

// Adaptive pool that adjusts based on load
type AdaptivePool struct {
	pool           *pgxpool.Pool
	metrics        *PoolMetrics
	scalingEnabled bool
}

type PoolMetrics struct {
	AcquireCount    int64
	AcquireDuration time.Duration
	AcquiredConns   int32
	IdleConns       int32
	TotalConns      int32
	MaxConns        int32
	WaitCount       int64 // Acquires that had to wait for a connection
	WaitDuration    time.Duration
}

func (ap *AdaptivePool) CollectMetrics() *PoolMetrics {
	stats := ap.pool.Stat()
	return &PoolMetrics{
		AcquireCount:    stats.AcquireCount(),
		AcquireDuration: stats.AcquireDuration(),
		AcquiredConns:   stats.AcquiredConns(),
		IdleConns:       stats.IdleConns(),
		TotalConns:      stats.TotalConns(),
		MaxConns:        stats.MaxConns(),
		// EmptyAcquireCount = acquires that blocked because no connection
		// was free; total AcquireDuration is the closest proxy pgx exposes
		// for time spent waiting
		WaitCount:    stats.EmptyAcquireCount(),
		WaitDuration: stats.AcquireDuration(),
	}
}

func (ap *AdaptivePool) ShouldScale() (bool, string) {
	metrics := ap.CollectMetrics()

	utilizationPct := float64(metrics.AcquiredConns) / float64(metrics.MaxConns) * 100

	// Scale up indicators
	if utilizationPct > 80 {
		return true, "high_utilization"
	}

	avgWaitTime := time.Duration(0)
	if metrics.WaitCount > 0 {
		avgWaitTime = metrics.WaitDuration / time.Duration(metrics.WaitCount)
	}
	if avgWaitTime > 100*time.Millisecond {
		return true, "high_wait_time"
	}

	// Scale down indicators
	if utilizationPct < 20 && metrics.IdleConns > 10 {
		return true, "low_utilization"
	}

	return false, ""
}
```
Connection Pool for Multiple Services
go// When your app connects to multiple databases type MultiPool struct { pools map[string]*pgxpool.Pool mu sync.RWMutex } func NewMultiPool() *MultiPool { return &MultiPool{ pools: make(map[string]*pgxpool.Pool), } } func (mp *MultiPool) AddPool(name string, pool *pgxpool.Pool) { mp.mu.Lock() defer mp.mu.Unlock() mp.pools[name] = pool } func (mp *MultiPool) Get(name string) (*pgxpool.Pool, error) { mp.mu.RLock() defer mp.mu.RUnlock() pool, ok := mp.pools[name] if !ok { return nil, fmt.Errorf("pool %s not found", name) } return pool, nil } // Graceful shutdown func (mp *MultiPool) Close() { mp.mu.Lock() defer mp.mu.Unlock() for name, pool := range mp.pools { log.Printf("Closing pool: %s", name) pool.Close() } } // Health check all pools func (mp *MultiPool) HealthCheck(ctx context.Context) map[string]error { mp.mu.RLock() defer mp.mu.RUnlock() results := make(map[string]error) var wg sync.WaitGroup var mu sync.Mutex for name, pool := range mp.pools { wg.Add(1) go func(n string, p *pgxpool.Pool) { defer wg.Done() checkCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() err := p.Ping(checkCtx) mu.Lock() results[n] = err mu.Unlock() }(name, pool) } wg.Wait() return results }
Counter Questions & Answers
Q: What happens when pool is exhausted?
Requests wait for an available connection (up to a timeout). If the wait exceeds the timeout, the request fails with a "connection pool exhausted" error. Monitor the `empty_acquire_count` metric.
Q: Why not just set max connections very high?
Each connection consumes roughly 10MB of RAM on the database server, plus file descriptors and a backend process slot. Too many connections mean memory pressure and context-switching overhead — performance actually gets SLOWER.
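A back-of-envelope check makes the ceiling concrete. This is a sketch using the ~10MB-per-connection estimate from the answer above; `maxSafeConns` and its parameters are illustrative:

```go
package main

import "fmt"

// maxSafeConns caps the pool by database-server memory: give connections
// at most budgetPct of server RAM, at ~perConnMB each (10 MB is a common
// rough estimate for a PostgreSQL backend).
func maxSafeConns(serverRAMMB int, perConnMB int, budgetPct float64) int {
	return int(float64(serverRAMMB) * budgetPct / float64(perConnMB))
}

func main() {
	// 16 GB server, 10 MB per connection, 25% of RAM for connections
	fmt.Println(maxSafeConns(16384, 10, 0.25)) // 409
}
```

Even a 16 GB server tops out around a few hundred connections before memory alone becomes a problem, which is why "just set max connections to 10,000" backfires.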
Q: How do you handle connection storms after deployment?
Use `MinConns` to pre-warm connections. Implement gradual traffic ramping. Consider a connection proxy like PgBouncer.
7. Optimistic vs Pessimistic Locking
Understanding the Difference
mermaidsequenceDiagram participant U1 as User 1 participant U2 as User 2 participant DB as Database Note over U1,DB: PESSIMISTIC LOCKING U1->>DB: SELECT FOR UPDATE (Lock acquired) U2->>DB: SELECT FOR UPDATE (Blocked!) U1->>DB: UPDATE balance U1->>DB: COMMIT (Lock released) U2->>DB: Now gets lock Note over U1,DB: OPTIMISTIC LOCKING U1->>DB: SELECT (version=1) U2->>DB: SELECT (version=1) U1->>DB: UPDATE WHERE version=1 (Success, version=2) U2->>DB: UPDATE WHERE version=1 (FAILS - version changed!) U2->>DB: Retry: SELECT (version=2) U2->>DB: UPDATE WHERE version=2 (Success)
Pessimistic Locking Implementation
```go
package locking

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"

	"github.com/lib/pq"
)

var (
	ErrInsufficientFunds = errors.New("insufficient funds")
	ErrAccountNotFound   = errors.New("account not found")
	ErrLockConflict      = errors.New("lock conflict")
)

type Account struct {
	ID      int64
	Balance float64
	Version int64
}

type Job struct {
	ID      int64
	Payload []byte
}

type PessimisticLockingService struct {
	db *sql.DB
}

// Transfer with pessimistic locking - use when conflicts are COMMON
func (s *PessimisticLockingService) Transfer(
	ctx context.Context,
	fromID, toID int64,
	amount float64,
) error {
	tx, err := s.db.BeginTx(ctx, &sql.TxOptions{
		Isolation: sql.LevelSerializable,
	})
	if err != nil {
		return fmt.Errorf("begin transaction: %w", err)
	}
	defer tx.Rollback()

	// Lock accounts in consistent order to prevent deadlocks
	// Always lock lower ID first
	firstID, secondID := fromID, toID
	if fromID > toID {
		firstID, secondID = toID, fromID
	}

	// Lock first account
	var firstBalance float64
	err = tx.QueryRowContext(ctx, `
		SELECT balance FROM accounts WHERE id = $1 FOR UPDATE
	`, firstID).Scan(&firstBalance)
	if err != nil {
		return fmt.Errorf("lock first account: %w", err)
	}

	// Lock second account
	var secondBalance float64
	err = tx.QueryRowContext(ctx, `
		SELECT balance FROM accounts WHERE id = $1 FOR UPDATE
	`, secondID).Scan(&secondBalance)
	if err != nil {
		return fmt.Errorf("lock second account: %w", err)
	}

	// Determine the sender's balance based on lock order
	fromBalance := secondBalance
	if fromID == firstID {
		fromBalance = firstBalance
	}

	// Check sufficient funds
	if fromBalance < amount {
		return ErrInsufficientFunds
	}

	// Perform transfer
	_, err = tx.ExecContext(ctx, `
		UPDATE accounts SET balance = balance - $1 WHERE id = $2
	`, amount, fromID)
	if err != nil {
		return fmt.Errorf("debit: %w", err)
	}

	_, err = tx.ExecContext(ctx, `
		UPDATE accounts SET balance = balance + $1 WHERE id = $2
	`, amount, toID)
	if err != nil {
		return fmt.Errorf("credit: %w", err)
	}

	return tx.Commit()
}

// NOWAIT variant - fail immediately if lock unavailable
func (s *PessimisticLockingService) TransferNoWait(
	ctx context.Context,
	fromID, toID int64,
	amount float64,
) error {
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// Try to acquire lock without waiting
	var balance float64
	err = tx.QueryRowContext(ctx, `
		SELECT balance FROM accounts WHERE id = $1 FOR UPDATE NOWAIT
	`, fromID).Scan(&balance)
	if err != nil {
		// Check if it's a lock conflict
		if strings.Contains(err.Error(), "could not obtain lock") {
			return ErrLockConflict
		}
		return err
	}

	// ... rest of transfer logic

	return tx.Commit()
}

// SKIP LOCKED - process only unlocked rows (useful for job queues)
func (s *PessimisticLockingService) ClaimJobs(ctx context.Context, limit int) ([]Job, error) {
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	rows, err := tx.QueryContext(ctx, `
		SELECT id, payload FROM job_queue
		WHERE status = 'pending'
		ORDER BY created_at
		LIMIT $1
		FOR UPDATE SKIP LOCKED
	`, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var jobs []Job
	var ids []int64
	for rows.Next() {
		var job Job
		if err := rows.Scan(&job.ID, &job.Payload); err != nil {
			return nil, err
		}
		jobs = append(jobs, job)
		ids = append(ids, job.ID)
	}

	if len(ids) > 0 {
		// Mark as processing
		_, err = tx.ExecContext(ctx, `
			UPDATE job_queue
			SET status = 'processing', started_at = NOW()
			WHERE id = ANY($1)
		`, pq.Array(ids))
		if err != nil {
			return nil, err
		}
	}

	return jobs, tx.Commit()
}
```
Optimistic Locking Implementation
```go
// Use when conflicts are RARE - better performance, no blocking
var ErrOptimisticLockConflict = errors.New("optimistic lock conflict")

type OptimisticLockingService struct {
	db *sql.DB
}

func (s *OptimisticLockingService) UpdateAccountBalance(
	ctx context.Context,
	accountID int64,
	newBalance float64,
	expectedVersion int64,
) error {
	result, err := s.db.ExecContext(ctx, `
		UPDATE accounts
		SET balance = $1, version = version + 1, updated_at = NOW()
		WHERE id = $2 AND version = $3
	`, newBalance, accountID, expectedVersion)
	if err != nil {
		return fmt.Errorf("update failed: %w", err)
	}

	rowsAffected, _ := result.RowsAffected()
	if rowsAffected == 0 {
		return ErrOptimisticLockConflict
	}
	return nil
}

// With automatic retry
func (s *OptimisticLockingService) TransferWithRetry(
	ctx context.Context,
	fromID, toID int64,
	amount float64,
	maxRetries int,
) error {
	for attempt := 0; attempt < maxRetries; attempt++ {
		err := s.tryTransfer(ctx, fromID, toID, amount)
		if err == nil {
			return nil
		}
		if !errors.Is(err, ErrOptimisticLockConflict) {
			return err // Non-retryable error
		}

		// Exponential backoff with jitter
		backoff := time.Duration(1<<uint(attempt)) * 10 * time.Millisecond
		jitter := time.Duration(rand.Intn(10)) * time.Millisecond
		time.Sleep(backoff + jitter)
	}
	return fmt.Errorf("max retries exceeded")
}

func (s *OptimisticLockingService) tryTransfer(
	ctx context.Context,
	fromID, toID int64,
	amount float64,
) error {
	// Read current state
	var fromBalance float64
	var fromVersion, toVersion int64

	err := s.db.QueryRowContext(ctx, `
		SELECT balance, version FROM accounts WHERE id = $1
	`, fromID).Scan(&fromBalance, &fromVersion)
	if err != nil {
		return err
	}
	if fromBalance < amount {
		return ErrInsufficientFunds
	}

	err = s.db.QueryRowContext(ctx, `
		SELECT version FROM accounts WHERE id = $1
	`, toID).Scan(&toVersion)
	if err != nil {
		return err
	}

	// Start transaction for atomic update
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// Update with version check
	result, err := tx.ExecContext(ctx, `
		UPDATE accounts
		SET balance = balance - $1, version = version + 1
		WHERE id = $2 AND version = $3
	`, amount, fromID, fromVersion)
	if err != nil {
		return err
	}
	if rows, _ := result.RowsAffected(); rows == 0 {
		return ErrOptimisticLockConflict
	}

	result, err = tx.ExecContext(ctx, `
		UPDATE accounts
		SET balance = balance + $1, version = version + 1
		WHERE id = $2 AND version = $3
	`, amount, toID, toVersion)
	if err != nil {
		return err
	}
	if rows, _ := result.RowsAffected(); rows == 0 {
		return ErrOptimisticLockConflict
	}

	return tx.Commit()
}
```
When to Use Which?
| Scenario | Pessimistic | Optimistic |
|---|---|---|
| High contention (same row updated frequently) | ✅ | ❌ |
| Low contention (rare conflicts) | ❌ | ✅ |
| Long-running transactions | ❌ | ✅ |
| Short transactions | ✅ | ✅ |
| Requires real-time consistency | ✅ | ⚠️ |
| High throughput needed | ❌ | ✅ |
| Banking/Financial | ✅ | ⚠️ |
| E-commerce cart | ❌ | ✅ |
| Inventory management | ✅ | ⚠️ |
| User profile updates | ❌ | ✅ |
Counter Questions & Answers
Q: How do you prevent deadlocks with pessimistic locking?
Always acquire locks in a consistent order (e.g., by ID ascending). Use lock timeouts. Consider `FOR UPDATE NOWAIT` or `SKIP LOCKED`.
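The consistent-ordering rule is tiny to implement but easy to forget. A minimal sketch (`lockOrder` is an illustrative helper, matching the ordering logic used in the `Transfer` example above):

```go
package main

import "fmt"

// lockOrder returns account IDs in the order they must be locked.
// If every transaction locking both rows uses the same order, two
// concurrent transfers A→B and B→A can never each hold one lock
// while waiting for the other, so this deadlock shape is impossible.
func lockOrder(a, b int64) (first, second int64) {
	if a > b {
		return b, a
	}
	return a, b
}

func main() {
	f, s := lockOrder(42, 7)
	fmt.Println(f, s) // 7 42
}
```

Every code path that locks both accounts must go through the same helper; one path that locks in request order is enough to reintroduce the deadlock.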
Q: What's the performance impact of optimistic locking retries?
Each retry costs extra database round trips. If the conflict rate exceeds roughly 10%, pessimistic locking may be more efficient. Monitor retry metrics.
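The retry cost has a simple closed form: if conflicts are independent with probability `p`, the number of attempts is geometric, so the expected count is `1/(1-p)`. A sketch (`expectedAttempts` is an illustrative helper):

```go
package main

import "fmt"

// expectedAttempts: with independent conflicts at rate p, each optimistic
// update is a geometric trial, so the mean attempt count is 1/(1-p).
// Every extra attempt is another read-plus-write round trip.
func expectedAttempts(p float64) float64 {
	return 1 / (1 - p)
}

func main() {
	fmt.Printf("%.2f\n", expectedAttempts(0.10)) // 1.11
	fmt.Printf("%.2f\n", expectedAttempts(0.50)) // 2.00
}
```

At 10% conflicts the overhead is about 11% extra round trips; at 50% it has doubled, and the curve blows up as `p` approaches 1, which is why high-contention rows belong under pessimistic locks.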
Q: Can you combine both approaches?
Yes! Use optimistic for most operations, switch to pessimistic for high-contention scenarios detected at runtime.
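One hedged sketch of that runtime switch, using the ~10% rule of thumb from the previous answer (the threshold and function names are illustrative, not from any library):

```go
package main

import "fmt"

// chooseStrategy picks a locking mode from recently observed conflicts.
// Below ~10% conflict rate, optimistic retries are cheap; above it,
// blocking on a pessimistic lock tends to cost less than retry storms.
func chooseStrategy(conflicts, attempts int) string {
	if attempts == 0 {
		return "optimistic" // no data yet: default to the non-blocking mode
	}
	if float64(conflicts)/float64(attempts) > 0.10 {
		return "pessimistic"
	}
	return "optimistic"
}

func main() {
	fmt.Println(chooseStrategy(2, 100))  // optimistic
	fmt.Println(chooseStrategy(25, 100)) // pessimistic
}
```

In practice the conflict counters would come from the retry metrics mentioned above, tracked per hot key or per table over a sliding window.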
8. Database Indexing Strategies
Index Types and When to Use Them
mermaidgraph TB subgraph "B-Tree Index" BT[Best for: =, <, >, BETWEEN, ORDER BY] BT --> BTE1[Equality: WHERE id = 5] BT --> BTE2[Range: WHERE date > '2024-01-01'] BT --> BTE3[Sorting: ORDER BY created_at] end subgraph "Hash Index" HI[Best for: = only] HI --> HIE1[Exact match: WHERE email = 'x@y.com'] end subgraph "GIN Index" GI[Best for: Arrays, JSONB, Full-text] GI --> GIE1[JSONB: WHERE data @> '{"key": "val"}'] GI --> GIE2[Array: WHERE tags @> ARRAY['go']] end subgraph "BRIN Index" BR[Best for: Large tables, natural ordering] BR --> BRE1[Time-series: WHERE timestamp > x] BR --> BRE2[Sequential: WHERE id BETWEEN x AND y] end
Practical Indexing Examples
```go
package indexing

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
)

// Query patterns determine index design
type TransactionQueries struct {
	db *sql.DB
}

/*
Schema:

CREATE TABLE transactions (
    id UUID PRIMARY KEY,
    account_id BIGINT NOT NULL,
    amount DECIMAL(18, 2) NOT NULL,
    status VARCHAR(20) NOT NULL,
    transaction_type VARCHAR(20) NOT NULL,
    reference_id VARCHAR(64) NOT NULL,
    metadata JSONB,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL
);

Index Strategy Based on Query Patterns:
*/

// Query 1: Get transactions by account (MOST COMMON)
// SELECT * FROM transactions WHERE account_id = ? ORDER BY created_at DESC LIMIT 20
// Index: CREATE INDEX idx_txn_account_date ON transactions(account_id, created_at DESC);

// Query 2: Find by reference ID (UNIQUE LOOKUP)
// SELECT * FROM transactions WHERE reference_id = ?
// Index: CREATE UNIQUE INDEX idx_txn_reference ON transactions(reference_id);

// Query 3: Filter by status and date range (REPORTS)
// SELECT * FROM transactions WHERE status = 'PENDING' AND created_at > ?
// Index: CREATE INDEX idx_txn_status_date ON transactions(status, created_at) WHERE status != 'COMPLETED';

// Query 4: Search in JSONB metadata
// SELECT * FROM transactions WHERE metadata @> '{"merchant_id": "123"}'
// Index: CREATE INDEX idx_txn_metadata ON transactions USING GIN(metadata);

// Query 5: Full-text search on description (if added)
// SELECT * FROM transactions WHERE to_tsvector('english', description) @@ to_tsquery('payment & refund')
// Index: CREATE INDEX idx_txn_fts ON transactions USING GIN(to_tsvector('english', description));

func (tq *TransactionQueries) CreateOptimalIndexes(ctx context.Context) error {
	indexes := []string{
		// Composite index for common account queries
		`CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_txn_account_date
		 ON transactions(account_id, created_at DESC)`,

		// Unique index for reference lookup
		`CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS idx_txn_reference
		 ON transactions(reference_id)`,

		// Partial index for pending transactions (much smaller)
		`CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_txn_pending
		 ON transactions(account_id, created_at DESC)
		 WHERE status = 'PENDING'`,

		// GIN index for JSONB queries
		`CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_txn_metadata
		 ON transactions USING GIN(metadata jsonb_path_ops)`,

		// BRIN index for time-series queries on large tables
		`CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_txn_created_brin
		 ON transactions USING BRIN(created_at) WITH (pages_per_range = 128)`,
	}

	for _, idx := range indexes {
		if _, err := tq.db.ExecContext(ctx, idx); err != nil {
			return fmt.Errorf("create index failed: %w", err)
		}
	}
	return nil
}

type QueryAnalysis struct {
	Plan        string
	UsesIndex   bool
	SeqScan     bool
	Suggestions []string
}

// Analyze query to suggest indexes
func (tq *TransactionQueries) AnalyzeQuery(ctx context.Context, query string) (*QueryAnalysis, error) {
	var analysis QueryAnalysis

	// Get query plan
	rows, err := tq.db.QueryContext(ctx, "EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) "+query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var planJSON string
	if rows.Next() {
		if err := rows.Scan(&planJSON); err != nil {
			return nil, err
		}
	}

	// Parse and analyze
	analysis.Plan = planJSON
	analysis.UsesIndex = strings.Contains(planJSON, "Index")
	analysis.SeqScan = strings.Contains(planJSON, "Seq Scan")

	if analysis.SeqScan {
		analysis.Suggestions = append(analysis.Suggestions,
			"Consider adding an index to avoid sequential scan")
	}

	return &analysis, nil
}
```
Covering Indexes (Index-Only Scans)
```sql
-- Instead of:
CREATE INDEX idx_account ON transactions(account_id);
-- Query: SELECT amount, created_at FROM transactions WHERE account_id = ?
-- This requires: Index lookup → Table lookup (extra I/O)

-- Use a covering index:
CREATE INDEX idx_account_covering ON transactions(account_id)
INCLUDE (amount, created_at, status);
-- The query can be satisfied entirely from the index (no table lookup)
```
Index Maintenance
```go
type IndexMaintenance struct {
	db *sql.DB
}

type IndexStat struct {
	Schema, Table, Index             string
	Scans, TuplesRead, TuplesFetched int64
	Size                             string
}

type DuplicateIndex struct {
	Index1, Index2, Table string
}

func (im *IndexMaintenance) GetIndexStats(ctx context.Context) ([]IndexStat, error) {
	rows, err := im.db.QueryContext(ctx, `
		SELECT
			schemaname,
			relname AS tablename,
			indexrelname AS indexname,
			idx_scan,
			idx_tup_read,
			idx_tup_fetch,
			pg_size_pretty(pg_relation_size(indexrelid)) AS index_size
		FROM pg_stat_user_indexes
		ORDER BY idx_scan DESC
	`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var stats []IndexStat
	for rows.Next() {
		var s IndexStat
		if err := rows.Scan(&s.Schema, &s.Table, &s.Index,
			&s.Scans, &s.TuplesRead, &s.TuplesFetched, &s.Size); err != nil {
			return nil, err
		}
		stats = append(stats, s)
	}
	return stats, rows.Err()
}

func (im *IndexMaintenance) FindUnusedIndexes(ctx context.Context) ([]string, error) {
	rows, err := im.db.QueryContext(ctx, `
		SELECT indexrelname
		FROM pg_stat_user_indexes
		WHERE idx_scan = 0
		  AND indexrelname NOT LIKE '%pkey%'
		  AND indexrelname NOT LIKE '%unique%'
	`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var unused []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, err
		}
		unused = append(unused, name)
	}
	return unused, rows.Err()
}

func (im *IndexMaintenance) FindDuplicateIndexes(ctx context.Context) ([]DuplicateIndex, error) {
	rows, err := im.db.QueryContext(ctx, `
		SELECT
			a.indexrelname AS index1,
			b.indexrelname AS index2,
			a.relname AS tablename
		FROM pg_stat_user_indexes a
		JOIN pg_stat_user_indexes b ON a.relname = b.relname
		JOIN pg_index ai ON a.indexrelid = ai.indexrelid
		JOIN pg_index bi ON b.indexrelid = bi.indexrelid
		WHERE a.indexrelid != b.indexrelid
		  AND ai.indkey::text LIKE bi.indkey::text || '%'
	`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var dups []DuplicateIndex
	for rows.Next() {
		var d DuplicateIndex
		if err := rows.Scan(&d.Index1, &d.Index2, &d.Table); err != nil {
			return nil, err
		}
		dups = append(dups, d)
	}
	return dups, rows.Err()
}

// Requires the pgstattuple extension for pgstatindex()
func (im *IndexMaintenance) ReindexBloatedIndexes(ctx context.Context, bloatThreshold float64) error {
	// Find bloated indexes
	rows, err := im.db.QueryContext(ctx, `
		SELECT
			schemaname || '.' || indexrelname AS index_name,
			pg_relation_size(indexrelid) AS current_size,
			pg_relation_size(indexrelid) * (1 - avg_leaf_density/100) AS estimated_bloat
		FROM pg_stat_user_indexes
		JOIN pgstatindex(indexrelid::regclass::text) ON true
		WHERE avg_leaf_density < $1
	`, (1-bloatThreshold)*100)
	if err != nil {
		return err
	}
	defer rows.Close()

	var indexesToReindex []string
	for rows.Next() {
		var name string
		var size int64
		var bloat float64
		if err := rows.Scan(&name, &size, &bloat); err != nil {
			return err
		}
		if bloat/float64(size) > bloatThreshold {
			indexesToReindex = append(indexesToReindex, name)
		}
	}

	// Reindex concurrently (non-blocking)
	for _, idx := range indexesToReindex {
		_, err := im.db.ExecContext(ctx, "REINDEX INDEX CONCURRENTLY "+idx)
		if err != nil {
			log.Printf("Failed to reindex %s: %v", idx, err)
		}
	}
	return nil
}
```
9. Write-Ahead Logging (WAL) and Recovery
How WAL Works
mermaidsequenceDiagram participant App participant Buffer as Buffer Pool participant WAL as WAL Buffer participant Disk as WAL Disk participant Data as Data Disk App->>Buffer: UPDATE row Buffer->>Buffer: Modify in memory Buffer->>WAL: Write change record WAL->>Disk: Sync WAL (fsync) Note over Disk: Transaction committed! Note over Buffer,Data: Later (checkpoint) Buffer->>Data: Flush dirty pages
Understanding WAL in PostgreSQL
gopackage wal import ( "context" "fmt" "github.com/jackc/pgx/v5/pgxpool" ) type WALMonitor struct { pool *pgxpool.Pool } // Get current WAL status func (wm *WALMonitor) GetWALStatus(ctx context.Context) (*WALStatus, error) { var status WALStatus err := wm.pool.QueryRow(ctx, ` SELECT pg_current_wal_lsn() as current_lsn, pg_walfile_name(pg_current_wal_lsn()) as current_file, pg_wal_lsn_diff(pg_current_wal_lsn(), '0/0') as total_wal_bytes, (SELECT setting FROM pg_settings WHERE name = 'wal_level') as wal_level, (SELECT setting FROM pg_settings WHERE name = 'synchronous_commit') as sync_commit `).Scan( &status.CurrentLSN, &status.CurrentFile, &status.TotalWALBytes, &status.WALLevel, &status.SyncCommit, ) return &status, err } // Check replication lag func (wm *WALMonitor) GetReplicationLag(ctx context.Context) ([]ReplicaLag, error) { rows, err := wm.pool.Query(ctx, ` SELECT client_addr, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, pg_wal_lsn_diff(sent_lsn, replay_lsn) as lag_bytes, now() - (replay_lag) as replay_lag FROM pg_stat_replication `) if err != nil { return nil, err } defer rows.Close() var lags []ReplicaLag for rows.Next() { var lag ReplicaLag err := rows.Scan( &lag.ClientAddr, &lag.State, &lag.SentLSN, &lag.WriteLSN, &lag.FlushLSN, &lag.ReplayLSN, &lag.LagBytes, &lag.ReplayLag, ) if err != nil { return nil, err } lags = append(lags, lag) } return lags, nil } // WAL archiving status func (wm *WALMonitor) GetArchiveStatus(ctx context.Context) (*ArchiveStatus, error) { var status ArchiveStatus err := wm.pool.QueryRow(ctx, ` SELECT archived_count, failed_count, last_archived_wal, last_archived_time, last_failed_wal, last_failed_time FROM pg_stat_archiver `).Scan( &status.ArchivedCount, &status.FailedCount, &status.LastArchivedWAL, &status.LastArchivedTime, &status.LastFailedWAL, &status.LastFailedTime, ) return &status, err }
Point-in-Time Recovery (PITR)
go// Recovery configuration type RecoveryConfig struct { // Restore to specific point in time TargetTime *time.Time // Or restore to specific transaction TargetXID *uint64 // Or restore to named restore point TargetName string // Recovery action Action string // "pause", "promote", "shutdown" } func (wm *WALMonitor) PrepareRecoveryConfig(cfg RecoveryConfig) string { config := ` # Recovery configuration restore_command = 'cp /archive/%f %p' recovery_target_action = '` + cfg.Action + `' ` if cfg.TargetTime != nil { config += fmt.Sprintf("recovery_target_time = '%s'\n", cfg.TargetTime.Format("2006-01-02 15:04:05")) } else if cfg.TargetXID != nil { config += fmt.Sprintf("recovery_target_xid = '%d'\n", *cfg.TargetXID) } else if cfg.TargetName != "" { config += fmt.Sprintf("recovery_target_name = '%s'\n", cfg.TargetName) } return config } // Create a named restore point before major operations func (wm *WALMonitor) CreateRestorePoint(ctx context.Context, name string) error { _, err := wm.pool.Exec(ctx, "SELECT pg_create_restore_point($1)", name) return err } // Usage example for deployment func DeployWithRestorePoint(ctx context.Context, wm *WALMonitor) error { // Create restore point before migration if err := wm.CreateRestorePoint(ctx, "before_v2_migration"); err != nil { return err } // Run migration if err := runMigration(ctx); err != nil { // If migration fails, we can recover to restore point return fmt.Errorf("migration failed, recover to 'before_v2_migration': %w", err) } // Create another restore point after successful migration return wm.CreateRestorePoint(ctx, "after_v2_migration") }
10. ACID Properties in Distributed Systems
The Challenge
mermaidgraph TB subgraph "Single Database ACID" TX[Transaction] --> A[Atomicity: All or nothing] TX --> C[Consistency: Valid state] TX --> I[Isolation: No interference] TX --> D[Durability: Persisted] end subgraph "Distributed System Challenges" DT[Distributed TX] --> N1[Node 1] DT --> N2[Node 2] DT --> N3[Node 3] N1 -.->|Network partition?| N2 N2 -.->|Node crash?| N3 N3 -.->|Timeout?| N1 end
Two-Phase Commit (2PC)
```go
package distributed

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type TwoPhaseCommit struct {
	coordinator  string
	participants []Participant
	timeout      time.Duration
}

type Participant interface {
	Prepare(ctx context.Context, txID string) error
	Commit(ctx context.Context, txID string) error
	Rollback(ctx context.Context, txID string) error
}

func (tpc *TwoPhaseCommit) Execute(ctx context.Context, txID string, operation func(Participant) error) error {
	// Phase 1: Prepare
	prepareCtx, cancel := context.WithTimeout(ctx, tpc.timeout)
	defer cancel()

	var mu sync.Mutex
	var prepareErrors []error
	var wg sync.WaitGroup

	for _, p := range tpc.participants {
		wg.Add(1)
		go func(participant Participant) {
			defer wg.Done()

			// Execute the operation
			if err := operation(participant); err != nil {
				mu.Lock()
				prepareErrors = append(prepareErrors, err)
				mu.Unlock()
				return
			}

			// Ask to prepare
			if err := participant.Prepare(prepareCtx, txID); err != nil {
				mu.Lock()
				prepareErrors = append(prepareErrors, err)
				mu.Unlock()
			}
		}(p)
	}
	wg.Wait()

	// If any prepare failed, rollback all
	if len(prepareErrors) > 0 {
		tpc.rollbackAll(ctx, txID)
		return fmt.Errorf("prepare phase failed: %v", prepareErrors)
	}

	// Phase 2: Commit
	commitCtx, cancel := context.WithTimeout(ctx, tpc.timeout)
	defer cancel()

	var commitErrors []error
	for _, p := range tpc.participants {
		wg.Add(1)
		go func(participant Participant) {
			defer wg.Done()
			if err := participant.Commit(commitCtx, txID); err != nil {
				mu.Lock()
				commitErrors = append(commitErrors, err)
				mu.Unlock()
			}
		}(p)
	}
	wg.Wait()

	if len(commitErrors) > 0 {
		// This is bad - a partial commit
		// Needs manual intervention or a recovery process
		return fmt.Errorf("CRITICAL: partial commit, errors: %v", commitErrors)
	}
	return nil
}

func (tpc *TwoPhaseCommit) rollbackAll(ctx context.Context, txID string) {
	var wg sync.WaitGroup
	for _, p := range tpc.participants {
		wg.Add(1)
		go func(participant Participant) {
			defer wg.Done()
			// Best effort rollback
			participant.Rollback(ctx, txID)
		}(p)
	}
	wg.Wait()
}
```
Saga Pattern (Better for Long-Running Transactions)
```go
// Saga with compensating transactions
type Saga struct {
	steps []SagaStep
}

type SagaStep struct {
	Name       string
	Execute    func(ctx context.Context) error
	Compensate func(ctx context.Context) error
}

func (s *Saga) Run(ctx context.Context) error {
	var completedSteps []int

	for i, step := range s.steps {
		log.Printf("Executing step: %s", step.Name)

		if err := step.Execute(ctx); err != nil {
			log.Printf("Step %s failed: %v, compensating...", step.Name, err)

			// Compensate in reverse order
			for j := len(completedSteps) - 1; j >= 0; j-- {
				stepIdx := completedSteps[j]
				compensateStep := s.steps[stepIdx]
				log.Printf("Compensating step: %s", compensateStep.Name)
				if compErr := compensateStep.Compensate(ctx); compErr != nil {
					// Log but continue compensating other steps
					log.Printf("Compensation failed for %s: %v", compensateStep.Name, compErr)
				}
			}
			return fmt.Errorf("saga failed at step %s: %w", step.Name, err)
		}
		completedSteps = append(completedSteps, i)
	}
	return nil
}

// Example: Order placement saga
func CreateOrderSaga(orderID string, userID int64, items []Item, paymentInfo PaymentInfo) *Saga {
	return &Saga{
		steps: []SagaStep{
			{
				Name: "reserve_inventory",
				Execute: func(ctx context.Context) error {
					return inventoryService.Reserve(ctx, orderID, items)
				},
				Compensate: func(ctx context.Context) error {
					return inventoryService.ReleaseReservation(ctx, orderID)
				},
			},
			{
				Name: "create_order",
				Execute: func(ctx context.Context) error {
					return orderService.Create(ctx, orderID, userID, items)
				},
				Compensate: func(ctx context.Context) error {
					return orderService.Cancel(ctx, orderID)
				},
			},
			{
				Name: "process_payment",
				Execute: func(ctx context.Context) error {
					return paymentService.Charge(ctx, orderID, paymentInfo)
				},
				Compensate: func(ctx context.Context) error {
					return paymentService.Refund(ctx, orderID)
				},
			},
			{
				Name: "confirm_inventory",
				Execute: func(ctx context.Context) error {
					return inventoryService.ConfirmReservation(ctx, orderID)
				},
				Compensate: func(ctx context.Context) error {
					// Already compensated by the reserve step
					return nil
				},
			},
			{
				Name: "notify_user",
				Execute: func(ctx context.Context) error {
					return notificationService.SendOrderConfirmation(ctx, userID, orderID)
				},
				Compensate: func(ctx context.Context) error {
					// Optional: send cancellation notification
					return notificationService.SendOrderCancellation(ctx, userID, orderID)
				},
			},
		},
	}
}
```
Outbox Pattern for Reliable Messaging
go// Ensures database update and message publish are atomic type OutboxPublisher struct { db *sql.DB topic string } func (op *OutboxPublisher) PublishWithOutbox(ctx context.Context, tx *sql.Tx, event Event) error { // Insert into outbox table (same transaction as business logic) eventData, _ := json.Marshal(event) _, err := tx.ExecContext(ctx, ` INSERT INTO outbox ( id, aggregate_type, aggregate_id, event_type, payload, created_at ) VALUES ($1, $2, $3, $4, $5, NOW()) `, event.ID, event.AggregateType, event.AggregateID, event.Type, eventData) return err } // Background worker that publishes outbox events type OutboxWorker struct { db *sql.DB publisher MessagePublisher batchSize int } func (ow *OutboxWorker) Run(ctx context.Context) { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: ow.processBatch(ctx) } } } func (ow *OutboxWorker) processBatch(ctx context.Context) error { tx, err := ow.db.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() // Select and lock unpublished events rows, err := tx.QueryContext(ctx, ` SELECT id, aggregate_type, aggregate_id, event_type, payload FROM outbox WHERE published_at IS NULL ORDER BY created_at LIMIT $1 FOR UPDATE SKIP LOCKED `, ow.batchSize) if err != nil { return err } defer rows.Close() var publishedIDs []string for rows.Next() { var id, aggType, aggID, eventType string var payload []byte if err := rows.Scan(&id, &aggType, &aggID, &eventType, &payload); err != nil { return err } // Publish to message broker if err := ow.publisher.Publish(ctx, eventType, payload); err != nil { // Log error but continue with other events log.Printf("Failed to publish event %s: %v", id, err) continue } publishedIDs = append(publishedIDs, id) } if len(publishedIDs) > 0 { // Mark as published _, err = tx.ExecContext(ctx, ` UPDATE outbox SET published_at = NOW() WHERE id = ANY($1) `, pq.Array(publishedIDs)) if err != nil { return err } } return tx.Commit() }
Summary: Database & Storage Design Principles
Quick Reference Decision Tree
```
Need to store data?
├── Structured + Relations + ACID  → PostgreSQL
├── Semi-structured + Flexible     → MongoDB/DynamoDB
├── Key-value + Fast access        → Redis/Aerospike
├── Time-series + Analytics        → TimescaleDB/InfluxDB
├── Wide-column + Write-heavy      → Cassandra/ScyllaDB
└── Search + Full-text             → Elasticsearch

Need to scale?
├── Read-heavy  → Add read replicas
├── Write-heavy → Shard by key
├── Both        → Combination of above
└── Globally    → Multi-region with conflict resolution

Need transactions across services?
├── Short-lived    → 2PC
├── Long-running   → Saga pattern
└── With messaging → Outbox pattern
```
Key Metrics to Monitor
| Metric | Warning Threshold | Action |
|---|---|---|
| Connection pool utilization | >80% | Increase pool size |
| Query latency p99 | >100ms | Add indexes, optimize queries |
| Replica lag | >5 seconds | Check network, increase bandwidth |
| Index bloat | >30% | REINDEX CONCURRENTLY |
| Table bloat | >20% | VACUUM FULL (maintenance window) |
| Lock wait time | >1 second | Review locking strategy |
| Deadlocks/hour | >1 | Fix lock ordering |
This guide covers the essential database and storage concepts you'll encounter in system design interviews. Each pattern has trade-offs - the key is understanding when to apply which pattern based on your specific requirements.