Distributed Systems: Why, How, and When — A Deep Dive for Engineers
A practical guide from someone who's debugged 3 AM production incidents across 47 servers
It's 2:47 AM. Your phone buzzes. The payment service is down. Users are angry. Twitter is on fire.
You SSH into the server. Everything looks fine. CPU is at 12%. Memory is okay. Logs show nothing unusual.
Then you realize—the problem isn't on this server. It's somewhere in the mesh of 23 microservices, 4 databases, 3 message queues, and 2 caching layers that make up your "simple" payment system.
Welcome to distributed systems.
This isn't a textbook. This is a guide written by someone who's lived through the chaos, celebrated the wins, and learned (often painfully) what works and what doesn't when you spread your system across multiple machines.
Let's dive in.
Table of Contents
- WHY Distributed Systems?
- WHAT is a Distributed System?
- HOW Distributed Systems Work
- WHEN Should You Use Distributed Systems?
- Core Challenges
- Real-World Example: Design a Payment System
- Advanced Concepts
- Production Best Practices
- Summary and Key Takeaways
1. WHY Distributed Systems?
The Single Machine Dream
In an ideal world, we'd run everything on one really powerful machine. No network calls. No coordination. No "it works on my machine but fails in production."
But reality has other plans.
Think of it like a restaurant:
A single chef can cook for a family dinner. But when you're serving 10,000 customers during a cricket match final, one chef—no matter how talented—simply cannot keep up. You need multiple chefs, multiple kitchens, coordination, and systems to ensure everyone gets the right order.
What Breaks First?
When your application starts hitting limits, something will break. Let's understand what:
```
SINGLE SERVER LIMITS

USERS ──▶ SERVER ──▶ CPU / MEMORY / DISK / NETWORK

LIMITS:
• CPU:     ~128 cores max on high-end servers
• Memory:  ~12TB max (and that's expensive)
• Disk:    ~100K IOPS for SSD
• Network: ~100 Gbps
```
CPU hits the wall first when you're doing heavy computation—video encoding, ML inference, complex calculations.
Memory becomes the bottleneck when you're caching everything, running in-memory databases, or handling millions of concurrent connections (each connection consumes memory).
Disk I/O caps out when you're writing millions of log entries, storing massive datasets, or running database-heavy workloads.
Network saturates when you're serving large files, streaming video, or handling extreme request volumes.
Real-World Breaking Points
Let me tell you about some systems I've seen hit their limits:
Banking System During Salary Day:
```
Normal day:  50,000 transactions/hour
Salary day:  2,000,000 transactions in 2 hours
Single DB:   Melted like butter in summer
```
A major Indian bank I worked with saw their single PostgreSQL instance hit 100% CPU at 9:01 AM on the 1st of every month. The solution? They didn't just need a bigger machine—they needed to distribute the load across multiple database shards, each handling different account ranges.
Netflix During Stranger Things Premiere:
```
Normal evening:    50 million concurrent streams
Premiere night:    100+ million concurrent streams
Single datacenter: Would require a nuclear power plant to run
```
Netflix runs across 3 AWS regions, uses thousands of EC2 instances, and serves content from CDN nodes literally embedded inside ISP networks. A single server couldn't handle even 0.1% of their load.
UPI During IPL Final:
```
Normal transactions:    1-2 million/hour
IPL final (ad breaks):  20+ million transactions in 5 minutes
Single payment gateway: Would need a time machine to process them
```
When MS Dhoni hits a six and everyone rushes to order food or bet on the next ball, UPI handles peaks that would make most systems weep. This requires distributed processing across multiple banks, payment processors, and routing layers.
Amazon Black Friday:
```
Normal orders:     100,000/hour (~28/second)
Black Friday peak: 600+ orders per SECOND
```
That's more than 20x normal load sustained for hours. No single server on Earth can handle that while maintaining sub-second response times.
The Scaled Solution
```
DISTRIBUTED ARCHITECTURE

USERS ──▶ LOAD BALANCER ──▶ S1 / S2 / S3
                            (each with its own CPU, RAM, Disk)

BENEFITS:
• 3x CPU capacity
• 3x Memory capacity
• 3x Disk I/O
• Fault tolerance (one dies, two remain)
• Can add S4, S5, S6... as needed
```
The fundamental reason for distributed systems: single machines have hard physical limits that cannot be overcome no matter how much money you throw at them.
You can't buy a server with 1 million CPU cores. You can't buy 1 PB of RAM in one machine. You can't get 1 Tbps network on a single NIC.
But you CAN have 10,000 servers each with 100 cores, 1TB RAM, and 10 Gbps network.
2. WHAT is a Distributed System?
The Official Definition
A distributed system is a collection of independent computers that appears to users as a single coherent system.
But let me give you the real definition:
A distributed system is a system where a machine you've never heard of can cause your application to fail.
Core Characteristics
```
DISTRIBUTED SYSTEM CHARACTERISTICS

1. INDEPENDENT NODES
   Each machine runs its own processes
   Has its own memory, disk, CPU
   Can fail independently

2. NETWORK COMMUNICATION
   Nodes talk over the network
   Network is unreliable (can fail, delay, duplicate)
   Messages can arrive out of order

3. SHARED GOAL
   All nodes work together for a common purpose
   Serve users, process data, store information

4. NO GLOBAL CLOCK
   Each machine has its own clock
   Clocks drift (can differ by milliseconds to seconds)
   "What time is it?" has no single answer

5. PARTIAL FAILURES
   Some parts can fail while others work
   You might not even know something failed
```
A Typical Distributed Architecture
```
DISTRIBUTED ARCHITECTURE

CLIENT ──▶ GATEWAY (API) ──▶ SERVICE A / SERVICE B / SERVICE C
                                  │           │           │
                                  ▼           ▼           ▼
                                DB 1        DB 2        CACHE
                             (Postgres)    (Mongo)     (Redis)
```
The Eight Fallacies of Distributed Computing
In 1994, L. Peter Deutsch catalogued the assumptions that developers make about distributed systems; James Gosling added the eighth a few years later. All of them are dangerously wrong:
```
THE EIGHT FALLACIES YOU WILL LEARN THE HARD WAY

1. The network is reliable
   Reality: Packets get lost. Connections drop. Always.

2. Latency is zero
   Reality: Cross-datacenter calls take 50-200ms

3. Bandwidth is infinite
   Reality: You'll hit limits, especially during peaks

4. The network is secure
   Reality: Everything can be intercepted

5. Topology doesn't change
   Reality: Servers come and go constantly

6. There is one administrator
   Reality: Multiple teams, multiple configs, chaos

7. Transport cost is zero
   Reality: Serialization, encryption, routing all cost

8. The network is homogeneous
   Reality: Different protocols, versions, hardware
```
Think of it like this: Imagine managing a restaurant where the kitchen, storage, and dining area are in different buildings across the city. The phone lines between them sometimes go dead. The clocks in each building show different times. And occasionally, a whole building just... disappears for 10 minutes.
That's distributed systems.
The Four Types of Transparency
A well-designed distributed system hides its complexity through transparency:
| Transparency Type | What It Hides | Example |
|---|---|---|
| Location | Where resources are | DNS hides server IP addresses |
| Replication | Multiple copies exist | Read from any Redis replica |
| Failure | When things break | Load balancer routes around dead nodes |
| Concurrency | Parallel access | Database handles multiple writers |
3. HOW Distributed Systems Work
This is where things get interesting. Let's break down the core mechanisms that make distributed systems function.
A. Communication Patterns
In a distributed system, components must talk to each other. There are several patterns for this:
Synchronous Communication (Request-Response)
```
SYNCHRONOUS COMMUNICATION FLOW

Client ──HTTP POST──▶ Service A ──gRPC Call──▶ Service B ──SQL Query──▶ Database
Client ◀─JSON resp─── Service A ◀──Response─── Service B ◀─Result Set── Database

Total latency = network₁ + network₂ + network₃ + processing
```
REST - Human-readable, stateless, widely supported. Great for public APIs.
```
GET /users/123
Content-Type: application/json

{"id": 123, "name": "Rahul", "email": "rahul@example.com"}
```
gRPC - Binary protocol, strongly typed, efficient. Great for internal service-to-service.
```protobuf
service UserService {
  rpc GetUser (UserRequest) returns (UserResponse);
}
```
When to use what:
- REST: Public APIs, browser clients, simplicity needed
- gRPC: Internal services, high performance required, streaming needed
Asynchronous Communication (Event-Driven)
```
ASYNCHRONOUS COMMUNICATION FLOW

Producer ──Publish Event──▶ Message Queue ──Poll/Push──▶ Consumer 1
Producer ◀──────ACK──────── Message Queue ──Poll/Push──▶ Consumer 2
(Producer done, doesn't wait)              (Consumers process async)

Benefits:
• Producer is decoupled from consumers
• Consumers can process at their own pace
• Queue acts as buffer during traffic spikes
• Easy to add new consumers
```
Message Queues (RabbitMQ, SQS):
- Point-to-point or pub-sub
- Message is consumed once
- Good for task distribution
Event Streaming (Kafka, Pulsar):
- Append-only log
- Messages retained for replay
- Good for event sourcing, analytics
Think of it like this:
- Synchronous = Phone call (you wait for response)
- Asynchronous = Email (you send and continue working)
B. Data Consistency
This is where distributed systems get tricky. When data lives on multiple machines, keeping it consistent is hard.
The CAP Theorem
In 2000, Eric Brewer proposed that a distributed system can only guarantee two of three properties:
```
CAP THEOREM

                 CONSISTENCY
                     ▲
                    / \
                  CA   CP
                  /     \
                 ▼──AP───▼
       AVAILABILITY     PARTITION TOLERANCE

CA: Traditional RDBMS (single node)
    - Not really distributed

CP: MongoDB, HBase, Redis Cluster
    - Consistent but may reject writes during partition

AP: Cassandra, DynamoDB, CouchDB
    - Always available but may return stale data
```
But here's the thing most people miss:
CAP is about what happens during a network partition. In normal operation, you can have all three. The question is: when the network splits, do you:
- Refuse writes to stay consistent (CP)
- Accept writes knowing they might conflict (AP)
Real-world examples:
Banking (CP): When the network between Mumbai and Delhi datacenters breaks, the bank stops processing inter-city transfers rather than risk inconsistent balances. Money is too important to get wrong.
Social Media Likes (AP): If Instagram's systems partition, they'll still show you likes—even if the count is slightly off. Nobody dies if a like count is temporarily wrong.
Strong vs. Eventual Consistency
```
CONSISTENCY SPECTRUM

STRONG ◀────────────────────────────────────────▶ EVENTUAL

Bank        Order       Inventory   Social      View
Balance     Mgmt                    Feed        Count
"Must be    "Should     "Can be     "Usually    "Eventually
 perfect"    be right"   close"      right"      right"

Trade-off: Latency ◀──────────────────▶ Availability
```
Strong Consistency Example (Bank Transfer):
```go
// This MUST be consistent
func TransferMoney(from, to string, amount int64) error {
	tx, err := db.BeginSerializable() // strongest isolation
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// Lock both accounts
	fromBalance := tx.GetForUpdate("SELECT balance FROM accounts WHERE id = ?", from)
	tx.GetForUpdate("SELECT balance FROM accounts WHERE id = ?", to)

	if fromBalance < amount {
		return ErrInsufficientFunds
	}

	tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, from)
	tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, to)

	return tx.Commit() // both sides visible atomically after commit
}
```
Eventual Consistency Example (Like Counter):
```go
// This can be eventually consistent
func AddLike(postID string) {
	// Write to the local shard first
	localShard.Increment(postID)

	// Async replication to other shards
	go func() {
		for _, shard := range otherShards {
			shard.AsyncReplicate(postID, "increment")
		}
	}()

	// Return immediately - the count might be slightly off
	// for a few hundred milliseconds
}
```
C. Consensus: Getting Nodes to Agree
When multiple nodes need to agree on something (who's the leader? what's the committed value?), you need consensus algorithms.
The Leader Election Problem
```
LEADER ELECTION (RAFT)

SCENARIO: Three database nodes need to elect a leader

Step 1: Initial state (all followers)
  [Node 1: Follower]   [Node 2: Follower]   [Node 3: Follower]

Step 2: Timeout expires, Node 2 becomes candidate and requests votes
  [Node 1: Follower] ◀─vote─ [Node 2: Candidate] ─vote▶ [Node 3: Follower]

Step 3: Majority votes received, Node 2 becomes leader
  [Node 1: Follower] ◀────── [Node 2: LEADER] ──────▶ [Node 3: Follower]
                      (sends heartbeats to maintain leadership)
```
Raft Consensus Algorithm (simplified):
- Leader Election: One node becomes leader, others are followers
- Log Replication: Leader receives writes, replicates to followers
- Safety: Only committed entries are applied
Split Brain Problem:
```
SPLIT BRAIN SCENARIO

Network partition splits the cluster:

  Partition A           ║   Partition B
  [Node 1: Leader?]     ║   [Node 3: Leader?]
  [Node 2: Follower]    ║

DANGER: Two leaders accepting different writes!

SOLUTION (Quorum): Need majority (2/3) to accept writes
- Partition A: Has 2 nodes, CAN accept writes
- Partition B: Has 1 node, CANNOT accept writes (read-only)
```
D. Replication: Keeping Copies in Sync
```
REPLICATION PATTERNS

1. SINGLE-LEADER (Most common)

   Writes ──▶ PRIMARY ──replication──▶ REPLICA1 / REPLICA2 / REPLICA3 ◀── Reads

2. MULTI-LEADER (Cross-datacenter)

   DC1: LEADER1 ◀──sync──▶ LEADER2 :DC2
        ├─ Repl 1               ├─ Repl 3
        └─ Repl 2               └─ Repl 4

3. QUORUM (Dynamo-style)

   Write to W nodes, read from R nodes,
   where W + R > N (total nodes)

   [1 ✓] [2 ✓] [3 ✓] [4  ] [5  ]

   N=5, W=3, R=3: Write succeeds with 3, read sees latest
```
E. Failure Handling
In distributed systems, failures aren't exceptional—they're normal. You must design for them.
```
FAILURE HANDLING PATTERNS

1. RETRY WITH EXPONENTIAL BACKOFF

   Attempt 1: Wait 100ms
   Attempt 2: Wait 200ms
   Attempt 3: Wait 400ms
   Attempt 4: Wait 800ms + jitter

   Time ──▶   R1 ×   R2 ×   R3 ×   R4 ✓

2. CIRCUIT BREAKER

   CLOSED ──failures──▶ OPEN ──timeout──▶ HALF-OPEN
      ▲                                       │
      └───────────────── success ─────────────┘

   States:
   - CLOSED: Normal operation, requests go through
   - OPEN: Too many failures, requests fail immediately
   - HALF-OPEN: Testing if service recovered

3. BULKHEAD (Isolation)

   Thread Pool A   │ Thread Pool B │ Pool C
   (Payment API)   │ (User API)    │ (Search)
   ▓▓▓▓▓▓░░░░      │ ▓▓▓░░░░░░░    │ ▓▓▓▓▓░░
   (6/10 used)     │ (3/10 used)   │ (5/7)

   If Payment API fails, User API still works!
```
Retry with Jitter (Go example):
```go
func CallWithRetry(ctx context.Context, fn func() error) error {
	maxRetries := 5
	baseDelay := 100 * time.Millisecond

	for attempt := 0; attempt < maxRetries; attempt++ {
		err := fn()
		if err == nil {
			return nil
		}
		if attempt == maxRetries-1 {
			return err
		}

		// Exponential backoff with jitter
		delay := baseDelay * time.Duration(1<<attempt)
		jitter := time.Duration(rand.Int63n(int64(delay / 2)))

		select {
		case <-time.After(delay + jitter):
			continue
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}
```
4. WHEN Should You Use Distributed Systems?
This is the question that separates senior engineers from juniors. Knowing when to distribute is as important as knowing how.
When You SHOULD Distribute
```
SIGNS YOU NEED DISTRIBUTION

1. VERTICAL SCALING EXHAUSTED
   You're already on the biggest machine AWS offers
   And it's still not enough

2. UPTIME REQUIREMENTS > 99.9%
   Single machine = single point of failure
   99.99% uptime needs redundancy

3. GLOBAL USER BASE
   Users in India, US, Europe
   Single datacenter = 200ms+ latency for some

4. DATA DOESN'T FIT ON ONE MACHINE
   Petabytes of data
   Billions of rows

5. THROUGHPUT EXCEEDS SINGLE-NODE LIMITS
   > 100K requests/second
   > 10K database writes/second

6. ISOLATION REQUIREMENTS
   One team's bad deployment shouldn't affect others
   Different scaling needs for different components
```
When You SHOULD NOT Distribute
```
SIGNS YOU DON'T NEED DISTRIBUTION

1. YOU HAVEN'T TRIED VERTICAL SCALING
   A bigger machine is often cheaper than distribution
   $500/month for 128GB RAM vs months of engineering

2. YOU HAVE < 10 DEVELOPERS
   Microservices need teams per service
   Small teams struggle with distributed overhead

3. YOUR LOAD IS < 1000 RPS
   A single well-optimized server handles this easily

4. YOU'RE STILL FIGURING OUT YOUR DOMAIN
   Distributed = hard to change boundaries
   Get it wrong, and you're stuck

5. CONSISTENCY IS CRITICAL EVERYWHERE
   Banking core? Maybe single-region is safer
```
The Decision Matrix
| Scenario | Monolith | Distributed | Why |
|---|---|---|---|
| Startup MVP | Yes | No | Speed matters more than scale |
| 100 users | Yes | No | Overkill, adds complexity |
| 10K users | Maybe | Maybe | Depends on traffic patterns |
| 1M users | No | Yes | Need horizontal scale |
| 99.9% uptime | Maybe | Yes | Need redundancy |
| 99.99% uptime | No | Yes | Multi-region required |
| Single region | Yes | Maybe | Simpler architecture |
| Global users | No | Yes | Latency requirements |
| 5 developers | Yes | No | Team can't support distributed |
| 50 developers | Maybe | Yes | Teams need boundaries |
| Unclear domain | Yes | No | Need flexibility to change |
| Mature domain | Maybe | Yes | Boundaries are known |
My Rule of Thumb:
"Start with a monolith. Extract services when the pain of not doing so exceeds the pain of distribution."
The Bezos Lesson:
Amazon started as a monolith. They only moved to services when:
- Teams were stepping on each other
- Deployments took hours
- A bug in search could crash checkout
Not because "microservices are cool."
5. Core Challenges
Let me share the challenges that keep distributed systems engineers up at night.
Network Latency: The Hidden Tax
```
LATENCY COMPARISON

Operation                           Time
─────────────────────────────────────────────────────────
L1 cache reference                  0.5 ns
L2 cache reference                  7 ns
Main memory reference               100 ns
SSD random read                     150,000 ns (150 μs)
Network round-trip (same DC)        500,000 ns (0.5 ms)
Network round-trip (cross-region)   150,000,000 ns (150 ms)

VISUALIZATION:
If L1 cache = 1 second, then:
- Main memory = 3 minutes
- SSD read = 3.5 days
- Network (same DC) = 11.5 days
- Network (cross-region) = 9.5 YEARS
```
Real-world pain: I once debugged a checkout flow that took 3 seconds. The code looked fine. Turns out, it made 47 sequential service calls. Each call was 50ms. 47 × 50ms = 2.35 seconds, plus processing.
Solution: Batch calls, parallelize where possible, cache aggressively.
Partial Failures: The Distributed Nightmare
In a monolith, either everything works or nothing works.
In distributed systems, some things can fail while others work. This is worse.
```
PARTIAL FAILURE SCENARIO

User: "Transfer $100 from Account A to Account B"

  Debit A (-$100) ──OK──▶ Credit B (+$100) ──???──▶ Confirm Email
        ✓                        ?                      ?

WHAT HAPPENED?
- Debit succeeded
- Credit service timed out (but did it succeed?)
- We don't know if Credit happened

THREE POSSIBILITIES:
1. Credit succeeded, response lost → Money transferred OK
2. Credit never happened → Money disappeared!
3. Credit is still processing → Unknown state
```
This is why idempotency is critical (more in Advanced Concepts).
Clock Drift: When Time is Relative
Each machine has its own clock. They drift apart.
```
CLOCK DRIFT PROBLEM

Server A time: 10:00:00.000
Server B time: 10:00:00.150 (150ms ahead)
Server C time: 09:59:59.800 (200ms behind)

PROBLEM SCENARIO:
1. User updates profile on Server B at "10:00:00.150"
2. User reads profile on Server C
3. Server C sees update timestamp as "in the future"
4. Server C returns old data!

OR WORSE:
1. Event A happens on Server A at 10:00:00.000
2. Event B (caused by A) happens on Server C, whose clock is behind,
   and gets logged at 09:59:59.850
3. Log shows B happened BEFORE A
4. Debugging becomes impossible
```
Solutions:
- NTP (Network Time Protocol) - keeps clocks within ~10ms
- Logical clocks (Lamport timestamps, Vector clocks) - track causality, not wall time
- TrueTime (Google Spanner) - uses GPS + atomic clocks for bounded uncertainty
Debugging: Finding a Needle in a Distributed Haystack
Story time: We had a bug where 0.1% of orders were failing silently. No errors in logs. The order would just... not appear.
After two days of debugging:
- The issue was in Service D
- Which was called by Service C
- Which was called by Service B
- Which was called by Service A
- The error was swallowed in Service C
- Metrics showed Service D was healthy (99.9% success!)
- The 0.1% failures were in a specific code path that only triggered for orders with gift wrapping
Without distributed tracing, we'd still be looking.
```
OBSERVABILITY: THE THREE PILLARS

1. LOGS: What happened
   {"level":"error", "service":"payment",
    "trace_id":"abc123", "msg":"card declined"}

2. METRICS: How much/how fast
   payment_requests_total{status="success"} 1000000
   payment_requests_total{status="failure"} 150
   payment_latency_p99_ms 245

3. TRACES: The journey of a request
   Trace ID: abc123

   Gateway ──┬── Auth ──┬── Payment ──── DB
     10ms    │   5ms    │   200ms       50ms
                             ↑
                         BOTTLENECK!
```
6. Real-World Example: Design a Payment System
Let's put everything together and design a real payment system.
Requirements:
- Handle 10,000 payments per second
- 99.99% uptime
- Exactly-once payment processing
- Global availability
- PCI-DSS compliant
Architecture Overview
```
PAYMENT SYSTEM ARCHITECTURE

CLIENTS: App / Web / POS
          │
          ▼
API GATEWAY (Kong/Envoy)        Rate limiting, Auth, Routing
          │
          ▼
SERVICES:
  PAYMENT SERVICE ──▶ FRAUD SERVICE
          │
          ▼
  LEDGER SERVICE ──▶ SETTLEMENT SERVICE
          │                 │
          ▼                 ▼
  KAFKA (Events) ──▶ NOTIFICATION SERVICE

DATA LAYER:
  PostgreSQL (Primary) ──▶ Replica / Replica
  Redis (Cache)
```
Component Deep Dive
1. API Gateway:
```go
// Rate limiting per merchant
func RateLimitMiddleware(next http.Handler) http.Handler {
	limiter := redis.NewRateLimiter()

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		merchantID := r.Header.Get("X-Merchant-ID")

		allowed, remaining := limiter.Allow(merchantID, 1000, time.Second)
		w.Header().Set("X-RateLimit-Remaining", strconv.Itoa(remaining))

		if !allowed {
			http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
			return
		}
		next.ServeHTTP(w, r)
	})
}
```
2. Payment Service (with idempotency):
```go
func (s *PaymentService) ProcessPayment(ctx context.Context, req *PaymentRequest) (*PaymentResponse, error) {
	// Check idempotency key: a retried request returns the cached response
	existing, err := s.idempotencyStore.Get(ctx, req.IdempotencyKey)
	if err == nil && existing != nil {
		return existing, nil
	}

	// Start distributed transaction
	saga := s.sagaOrchestrator.NewSaga(req.ID)

	// Step 1: Reserve funds (with compensation)
	saga.AddStep("reserve_funds",
		func() error { return s.ledger.ReserveFunds(ctx, req.FromAccount, req.Amount) },
		func() error { return s.ledger.ReleaseReservation(ctx, req.FromAccount, req.Amount) },
	)

	// Step 2: Fraud check (no compensation needed)
	saga.AddStep("fraud_check",
		func() error { return s.fraudService.Check(ctx, req) },
		nil,
	)

	// Step 3: Execute transfer (with compensation)
	saga.AddStep("execute_transfer",
		func() error { return s.ledger.Transfer(ctx, req.FromAccount, req.ToAccount, req.Amount) },
		func() error { return s.ledger.ReverseTransfer(ctx, req.ToAccount, req.FromAccount, req.Amount) },
	)

	// Execute saga (compensations run automatically on failure)
	result, err := saga.Execute(ctx)
	if err != nil {
		return nil, err
	}

	// Store for idempotency
	s.idempotencyStore.Set(ctx, req.IdempotencyKey, result, 24*time.Hour)

	// Publish event
	s.eventBus.Publish(ctx, PaymentCompletedEvent{
		PaymentID: req.ID,
		Amount:    req.Amount,
		Timestamp: time.Now(),
	})

	return result, nil
}
```
3. Ledger Service (double-entry bookkeeping):
```go
func (l *LedgerService) Transfer(ctx context.Context, from, to string, amount int64) error {
	tx, err := l.db.BeginSerializableTx(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// Generate unique transaction ID
	txnID := uuid.New().String()

	// Double-entry: Debit from source
	_, err = tx.ExecContext(ctx, `
		INSERT INTO ledger_entries (id, account_id, amount, type, reference_id, created_at)
		VALUES ($1, $2, $3, 'DEBIT', $4, NOW())
	`, uuid.New().String(), from, amount, txnID)
	if err != nil {
		return err
	}

	// Double-entry: Credit to destination
	_, err = tx.ExecContext(ctx, `
		INSERT INTO ledger_entries (id, account_id, amount, type, reference_id, created_at)
		VALUES ($1, $2, $3, 'CREDIT', $4, NOW())
	`, uuid.New().String(), to, amount, txnID)
	if err != nil {
		return err
	}

	// Update balances. The WHERE clause guards against overdraft,
	// so we must check rows affected: a silent no-op here would
	// credit the destination without debiting the source.
	res, err := tx.ExecContext(ctx, `
		UPDATE accounts SET balance = balance - $1
		WHERE id = $2 AND balance >= $1
	`, amount, from)
	if err != nil {
		return err
	}
	if n, _ := res.RowsAffected(); n == 0 {
		return ErrInsufficientFunds // package-level sentinel error
	}

	_, err = tx.ExecContext(ctx, `
		UPDATE accounts SET balance = balance + $1
		WHERE id = $2
	`, amount, to)
	if err != nil {
		return err
	}

	return tx.Commit()
}
```
How It Handles Failures
```
┌─────────────────────────────────────────────────────────────────┐
│                       FAILURE SCENARIOS                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  SCENARIO 1: Payment service crashes mid-transaction            │
│  ───────────────────────────────────────────────────            │
│  Solution: Saga rolls back all completed steps                  │
│   - Funds reserved?  →  Release reservation                     │
│   - Transfer done?   →  Reverse transfer                        │
│                                                                 │
│  SCENARIO 2: Database becomes unavailable                       │
│  ─────────────────────────────────────────                      │
│  Solution: Circuit breaker opens, returns 503                   │
│            Client retries with same idempotency key             │
│                                                                 │
│  SCENARIO 3: Kafka is down                                      │
│  ─────────────────────────                                      │
│  Solution: Write to outbox table, process later                 │
│                                                                 │
│  SCENARIO 4: Duplicate request (network retry)                  │
│  ─────────────────────────────────────────────                  │
│  Solution: Idempotency key returns cached response              │
│                                                                 │
│  SCENARIO 5: Network partition between services                 │
│  ──────────────────────────────────────────────                 │
│  Solution: Timeouts + saga compensation                         │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```
7. Advanced Concepts
Let's cover patterns that separate production systems from tutorial projects.
Idempotency: The Safety Net
Problem: User clicks "Pay" twice. Network retries the request. You don't want to charge them twice.
Solution: Every mutating request includes an idempotency key.
```go
// Client sends:
// POST /payments
// Idempotency-Key: uuid-12345
// { "amount": 100 }

func (s *Service) ProcessPayment(key string, amount int64) (*Result, error) {
	// Check if we've seen this key before
	result, found := s.cache.Get(key)
	if found {
		return result, nil // Same response as before
	}

	// Process the payment
	result, err := s.doPayment(amount)
	if err != nil {
		return nil, err
	}

	// Cache the result for 24 hours
	s.cache.Set(key, result, 24*time.Hour)
	return result, nil
}
```
Saga Pattern: Distributed Transactions
Traditional transactions don't work across services. Sagas break a transaction into steps, each with a compensation action.
```
┌─────────────────────────────────────────────────────────────────┐
│                          SAGA PATTERN                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ORDER SAGA:                                                   │
│                                                                 │
│   Step 1        Step 2        Step 3        Step 4              │
│   ┌───────┐     ┌───────┐     ┌───────┐     ┌───────┐           │
│   │Reserve│ ───▶│ Charge│ ───▶│ Ship  │ ───▶│Confirm│           │
│   │ Stock │     │ Card  │     │ Order │     │ Order │           │
│   └───────┘     └───────┘     └───────┘     └───────┘           │
│                                                                 │
│   IF STEP 3 FAILS:                                              │
│                                                                 │
│   Step 1        Step 2        Step 3                            │
│   ┌───────┐     ┌───────┐     ┌───────┐                         │
│   │Release│ ◀───│Refund │ ◀───│   ✗   │                         │
│   │ Stock │     │ Card  │     │ FAIL  │                         │
│   └───────┘     └───────┘     └───────┘                         │
│                                                                 │
│   Compensation runs in reverse order!                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```
Event Sourcing: Never Lose Data
Instead of storing current state, store all events that led to that state.
```go
// Traditional: Store current balance
type Account struct {
	ID      string
	Balance int64 // Current balance
}

// Event Sourcing: Store all events
type AccountEvent struct {
	ID        string
	AccountID string
	Type      string // "DEPOSIT", "WITHDRAWAL", "TRANSFER"
	Amount    int64
	Timestamp time.Time
}

// To get current balance: replay all events
func GetBalance(accountID string) int64 {
	events := eventStore.GetEvents(accountID)
	balance := int64(0)
	for _, event := range events {
		switch event.Type {
		case "DEPOSIT":
			balance += event.Amount
		case "WITHDRAWAL":
			balance -= event.Amount
		}
	}
	return balance
}

// Benefits:
// - Complete audit trail
// - Can replay to any point in time
// - Easy to add new read models
// - Perfect for compliance (banking, healthcare)
```
CQRS: Separate Read and Write
Command Query Responsibility Segregation splits your system into:
- Commands: Write operations (change state)
- Queries: Read operations (return data)
```
┌─────────────────────────────────────────────────────────────────┐
│                          CQRS PATTERN                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│                  ┌─────────────┐                                │
│                  │   CLIENT    │                                │
│                  └──────┬──────┘                                │
│                         │                                       │
│            ┌────────────┴────────────┐                          │
│            │                         │                          │
│            ▼                         ▼                          │
│     ┌────────────┐            ┌────────────┐                    │
│     │  COMMAND   │            │   QUERY    │                    │
│     │  SERVICE   │            │  SERVICE   │                    │
│     └─────┬──────┘            └─────┬──────┘                    │
│           │                         │                           │
│           ▼                         ▼                           │
│     ┌────────────┐            ┌───────────────┐                 │
│     │  WRITE DB  │───────────▶│    READ DB    │                 │
│     │ (Postgres) │ async sync │(Elasticsearch)│                 │
│     │ Normalized │            │ Denormalized  │                 │
│     └────────────┘            └───────────────┘                 │
│                                                                 │
│   BENEFITS:                                                     │
│   - Scale reads and writes independently                        │
│   - Optimize read model for queries                             │
│   - Write model stays normalized                                │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```
Backpressure: Don't Drown in Traffic
When a system is overwhelmed, it should push back rather than crash.
```go
// Worker pool with backpressure
type WorkerPool struct {
	jobs    chan Job
	workers int
}

func NewWorkerPool(workers, queueSize int) *WorkerPool {
	pool := &WorkerPool{
		jobs:    make(chan Job, queueSize), // Bounded queue!
		workers: workers,
	}
	for i := 0; i < workers; i++ {
		go pool.worker()
	}
	return pool
}

func (p *WorkerPool) Submit(job Job) error {
	select {
	case p.jobs <- job:
		return nil
	default:
		// Queue is full - backpressure!
		return ErrSystemOverloaded
	}
}

func (p *WorkerPool) worker() {
	for job := range p.jobs {
		job.Process()
	}
}
```
8. Production Best Practices
These are the practices that have saved me countless times.
1. Timeouts Everywhere
Every external call needs a timeout. No exceptions.
```go
// Bad: Can hang forever
resp, err := http.Get("http://external-service/api")

// Good: Bounded time
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

req, _ := http.NewRequestWithContext(ctx, "GET", "http://external-service/api", nil)
resp, err := http.DefaultClient.Do(req)
```
2. Retries with Jitter
Jitter prevents thundering herd when many clients retry simultaneously.
```go
func RetryWithJitter(fn func() error, maxRetries int) error {
	var err error
	for i := 0; i < maxRetries; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i == maxRetries-1 {
			break // don't sleep after the final attempt
		}
		// Exponential backoff: 100ms, 200ms, 400ms, 800ms...
		baseDelay := 100 * time.Millisecond * time.Duration(1<<i)
		// Add jitter: random 0-50% of delay
		jitter := time.Duration(rand.Int63n(int64(baseDelay / 2)))
		time.Sleep(baseDelay + jitter)
	}
	return fmt.Errorf("max retries exceeded: %w", err)
}
```
3. Circuit Breakers
Stop calling a failing service. Let it recover.
```go
// Using sony/gobreaker
var cb *gobreaker.CircuitBreaker

cb = gobreaker.NewCircuitBreaker(gobreaker.Settings{
	Name:        "external-api",
	MaxRequests: 5,                // Requests allowed in half-open state
	Interval:    10 * time.Second, // Reset counters after
	Timeout:     30 * time.Second, // Time in open state
	ReadyToTrip: func(counts gobreaker.Counts) bool {
		// Open the circuit after more than 5 consecutive failures
		return counts.ConsecutiveFailures > 5
	},
})

func CallExternalAPI() (Response, error) {
	result, err := cb.Execute(func() (interface{}, error) {
		return externalAPI.Call()
	})
	if err != nil {
		if err == gobreaker.ErrOpenState {
			return fallbackResponse(), nil
		}
		return nil, err
	}
	return result.(Response), nil
}
```
4. Observability: The Three Pillars
```go
// Structured logging with context
func ProcessOrder(ctx context.Context, orderID string) error {
	logger := log.With().
		Str("trace_id", GetTraceID(ctx)).
		Str("order_id", orderID).
		Logger()

	logger.Info().Msg("Starting order processing")

	// Metrics
	orderProcessingStarted.Inc()
	timer := prometheus.NewTimer(orderProcessingDuration)
	defer timer.ObserveDuration()

	// Tracing
	ctx, span := tracer.Start(ctx, "ProcessOrder")
	defer span.End()
	span.SetAttributes(attribute.String("order.id", orderID))

	// ... actual processing

	logger.Info().Msg("Order processing completed")
	orderProcessingCompleted.Inc()
	return nil
}
```
5. Graceful Degradation
When parts fail, degrade gracefully instead of dying completely.
```go
func GetProductDetails(ctx context.Context, productID string) (*Product, error) {
	product, err := productService.Get(ctx, productID)
	if err != nil {
		log.Warn().Err(err).Msg("Product service unavailable")
		// Return cached/stale data instead of an error
		return cache.GetStale(productID)
	}

	// Try to get recommendations (non-critical)
	recommendations, err := recommendationService.Get(ctx, productID)
	if err != nil {
		log.Warn().Err(err).Msg("Recommendations unavailable")
		// Continue without recommendations
		product.Recommendations = []string{}
	} else {
		product.Recommendations = recommendations
	}

	return product, nil
}
```
6. Chaos Testing
Don't wait for production to find problems.
```bash
# Using Chaos Monkey / Litmus / Gremlin

# Kill random pods
kubectl delete pod -l app=payment-service --field-selector=status.phase=Running

# Add network latency
tc qdisc add dev eth0 root netem delay 200ms 50ms

# Fill disk
dd if=/dev/zero of=/tmp/fill bs=1M count=10000
```
9. Summary and Key Takeaways
The Journey
We've covered a lot. Let's recap:
WHY distributed systems:
- Single machines have hard limits
- Scale, reliability, and global reach require distribution
WHAT they are:
- Independent nodes, communicating over unreliable networks
- No global clock, partial failures are normal
HOW they work:
- Communication: Sync (REST, gRPC) and Async (Kafka, queues)
- Consistency: CAP theorem, eventual vs strong
- Consensus: Raft, Paxos for agreement
- Replication: Primary-replica, quorum-based
- Failure handling: Retries, circuit breakers, bulkheads
WHEN to use them:
- When you've exhausted vertical scaling
- When you need high availability
- When you need global presence
- NOT when you're still a small team figuring things out
Key Takeaways
```
┌─────────────────────────────────────────────────────────────────┐
│                         KEY TAKEAWAYS                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Distribution is not free                                    │
│     Every network call can fail, be slow, or return stale data  │
│                                                                 │
│  2. Start simple, distribute when necessary                     │
│     Monolith first, microservices when the pain is real         │
│                                                                 │
│  3. Embrace failure                                             │
│     It's not "if" but "when" - design for it                    │
│                                                                 │
│  4. Observability is not optional                               │
│     You can't fix what you can't see                            │
│                                                                 │
│  5. Idempotency saves lives                                     │
│     Make operations safe to retry                               │
│                                                                 │
│  6. Timeouts everywhere                                         │
│     Unbounded waits = cascading failures                        │
│                                                                 │
│  7. Test failures in development                                │
│     Chaos testing beats 3 AM debugging                          │
│                                                                 │
│  8. CAP is about tradeoffs, not absolutes                       │
│     Know what you're trading and why                            │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```
Interview Cheat Sheet
When asked about distributed systems in interviews:
| Topic | Key Points to Mention |
|---|---|
| CAP Theorem | Trade-offs during partition, CP vs AP examples |
| Consistency | Strong vs eventual, use case differences |
| Consensus | Raft (explain leader election), quorum |
| Replication | Primary-replica, multi-leader, conflict resolution |
| Failures | Partial failure, timeout, retry with backoff |
| Patterns | Circuit breaker, bulkhead, saga, idempotency |
| Observability | Logs, metrics, traces - and why all three |
| Scaling | Horizontal vs vertical, when to use each |
Recommended Learning Path
1. Foundation:
   - "Designing Data-Intensive Applications" by Martin Kleppmann (THE Bible)
   - MIT 6.824 Distributed Systems course (free on YouTube)
2. Practice:
   - Build a distributed key-value store
   - Implement Raft consensus
   - Set up a multi-region database
3. Production Experience:
   - Run chaos experiments
   - Debug real distributed issues
   - Go on-call for distributed systems
4. Deep Dives:
   - Google Spanner paper
   - Amazon Dynamo paper
   - Raft paper (understandable!)
Final thought: Distributed systems are hard. They're complex. They fail in ways you can't imagine. But they also enable experiences that single machines never could—global applications, real-time collaboration, systems that serve billions.
The engineers who master distributed systems don't memorize patterns. They understand the fundamental tradeoffs and apply judgment based on context.
Now go build something that can't fit on one machine. And when it fails at 3 AM, you'll at least know where to start looking.
Have questions? Found an error? Building something interesting? The best way to learn distributed systems is to discuss them. Keep learning, keep building, and remember: the network is never reliable.