Modern Distributed Computing Patterns: From Theory to Practice
Explore distributed systems architecture patterns including MapReduce, actor models, event sourcing, and CQRS with real-world implementation examples.
Introduction: The Evolution of Distributed Systems
Distributed systems have evolved from academic curiosities in the 1970s to the foundation of modern internet-scale applications. Today, virtually every web application, mobile app, and cloud service relies on distributed computing patterns to achieve scalability, reliability, and performance.
This article explores the fundamental patterns that power systems at companies like Netflix, Amazon, Google, and Uber. We'll cover MapReduce, the Actor model, Event Sourcing, and CQRS, with real-world implementations you can deploy today.
Pattern 1: MapReduce - Divide and Conquer at Scale
MapReduce Principles:
1. Map Phase: Transform input data into key-value pairs in parallel
2. Shuffle Phase: Group all values by key across nodes
3. Reduce Phase: Aggregate values for each key in parallel
When to use MapReduce:
- Large-scale batch processing (logs, analytics, ETL)
- Embarrassingly parallel problems
- Datasets too large for a single machine's memory
Modern Alternatives: Apache Spark (up to 100x faster for iterative, in-memory workloads)
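Before the Hadoop version below, it helps to see the whole pipeline in miniature. Here is a single-process sketch of the three phases in plain Python (illustrative only; a real framework executes each phase across many machines, and the function names are ours, not any library's):

# Minimal single-process sketch of the three MapReduce phases
# (illustrative only; a real framework runs these across many nodes)
from collections import defaultdict
from itertools import chain

def map_phase(record):
    # Map: transform one input record into (key, value) pairs
    return [(word, 1) for word in record.split()]

def shuffle_phase(pairs):
    # Shuffle: group all values by key (the framework does this
    # across the network in a real cluster)
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(key, values):
    # Reduce: aggregate all values for one key
    return key, sum(values)

records = ["the quick brown fox", "the lazy dog", "the fox"]
mapped = chain.from_iterable(map_phase(r) for r in records)
grouped = shuffle_phase(mapped)
print([reduce_phase(k, v) for k, v in grouped.items()])
# [('the', 3), ('quick', 1), ('brown', 1), ('fox', 2), ('lazy', 1), ('dog', 1)]

The Hadoop Streaming version below splits the same logic into two standalone scripts; Hadoop itself supplies the shuffle between them.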
MapReduce Implementation: Word Count
# Classic MapReduce: Word Count in Python (Hadoop Streaming)
# mapper.py
import sys

def mapper():
    """Emit (word, 1) for each word on stdin."""
    for line in sys.stdin:
        # Remove leading/trailing whitespace
        line = line.strip()
        # Split into words
        words = line.split()
        # Emit key-value pairs; Hadoop Streaming's default
        # key/value separator is a tab
        for word in words:
            print(f"{word}\t1")

if __name__ == "__main__":
    mapper()
# reducer.py
import sys

def reducer():
    """Sum counts for each word.

    Relies on the shuffle phase: Hadoop sorts mapper output by key,
    so all lines for a given word arrive consecutively.
    """
    current_word = None
    current_count = 0
    for line in sys.stdin:
        # Parse tab-separated (word, count) pairs
        word, count = line.strip().split("\t", 1)
        count = int(count)
        # Check if we're still on the same word
        if current_word == word:
            current_count += count
        else:
            # New word: emit the previous one
            if current_word is not None:
                print(f"{current_word}\t{current_count}")
            current_word = word
            current_count = count
    # Emit the last word
    if current_word is not None:
        print(f"{current_word}\t{current_count}")

if __name__ == "__main__":
    reducer()

# Test locally (sort stands in for the shuffle phase):
# cat input.txt | python mapper.py | sort | python reducer.py

# Run on Hadoop:
# hadoop jar hadoop-streaming.jar \
#   -input /data/logs/*.txt \
#   -output /data/wordcount-output \
#   -mapper mapper.py \
#   -reducer reducer.py
# Modern Spark equivalent (roughly 10x less code):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WordCount").getOrCreate()

# Read text files (one row per line, in a column named "value")
lines = spark.read.text("/data/logs/*.txt")

# Word count in one chained expression; Spark handles the
# shuffle and parallelism automatically
word_counts = (lines
    .selectExpr("explode(split(value, ' ')) as word")
    .groupBy("word")
    .count()
    .orderBy("count", ascending=False))

word_counts.write.parquet("/data/wordcount-output")
Pattern 2: Actor Model - Message-Passing Concurrency
Actor Model Benefits:
1. No Race Conditions: Each actor processes messages sequentially
2. Location Transparency: Actors can be local or remote
3. Fault Isolation: Failed actors don't crash the system
4. Natural Scalability: Adding more actors adds more capacity
Common use cases:
- Chat systems (WhatsApp runs on Erlang's actor-based runtime)
- Game servers (one actor per player)
- IoT device management
- Financial trading systems
Frameworks: Akka (JVM), Orleans (.NET), Erlang/Elixir, Ray (Python)
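All of these frameworks share one underlying mechanism: each actor owns a mailbox (a queue) and a single loop that drains it, so the actor's private state is only ever touched by one thread. Here is that mechanism reduced to plain Python (a sketch for intuition, not how any of the frameworks above are actually implemented):

# Minimal mailbox actor in plain Python (illustrative sketch only;
# real frameworks add supervision, remoting, and scheduling)
import queue
import threading

class CounterActor:
    """An actor: private state plus a mailbox drained by one loop."""
    def __init__(self):
        self._mailbox = queue.Queue()
        self._count = 0  # private state, touched only by the actor loop
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def _run(self):
        # The actor loop: handle one message at a time, in arrival order
        while True:
            message = self._mailbox.get()
            if message == "stop":
                print(f"Final count: {self._count}")  # prints 100
                break
            self._count += message  # no lock needed: single consumer

    def send(self, message):
        # Senders never touch state directly; they only enqueue messages
        self._mailbox.put(message)

    def join(self):
        self._thread.join()

actor = CounterActor()
for _ in range(100):
    actor.send(1)  # safe from any thread: Queue is thread-safe
actor.send("stop")
actor.join()

Ray, used in the next example, provides the same guarantee at cluster scale: each actor's methods run one at a time, so actor state needs no locks.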
Actor Model Implementation: Order Processing
# Actor Model in Python using Ray
import asyncio
import ray
from typing import List, Dict
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Order:
    order_id: str
    user_id: str
    items: List[Dict]
    total: float
    timestamp: datetime

# Initialize Ray
ray.init()

@ray.remote
class OrderProcessorActor:
    """Actor that processes orders sequentially"""
    def __init__(self, processor_id: str):
        self.processor_id = processor_id
        self.processed_orders = []
        self.total_revenue = 0.0

    def process_order(self, order: Order) -> Dict:
        """Process a single order (no shared state, no locks!)"""
        try:
            # Validate order
            if order.total <= 0:
                return {
                    "status": "rejected",
                    "order_id": order.order_id,
                    "reason": "Invalid total"
                }
            # Process payment (simulate)
            payment_success = self._charge_payment(order)
            if not payment_success:
                return {
                    "status": "payment_failed",
                    "order_id": order.order_id
                }
            # Update inventory
            inventory_updated = self._update_inventory(order)
            if not inventory_updated:
                # Refund payment
                self._refund_payment(order)
                return {
                    "status": "out_of_stock",
                    "order_id": order.order_id
                }
            # Success!
            self.processed_orders.append(order.order_id)
            self.total_revenue += order.total
            return {
                "status": "success",
                "order_id": order.order_id,
                "processed_by": self.processor_id,
                "total": order.total
            }
        except Exception as e:
            return {
                "status": "error",
                "order_id": order.order_id,
                "error": str(e)
            }

    def _charge_payment(self, order: Order) -> bool:
        # Payment gateway call (stubbed)
        return True

    def _update_inventory(self, order: Order) -> bool:
        # Inventory service call (stubbed)
        return True

    def _refund_payment(self, order: Order) -> bool:
        # Refund call (stubbed)
        return True

    def get_stats(self) -> Dict:
        """Get actor statistics"""
        return {
            "processor_id": self.processor_id,
            "orders_processed": len(self.processed_orders),
            "total_revenue": self.total_revenue
        }

@ray.remote
class OrderCoordinatorActor:
    """Supervisor actor that distributes work"""
    def __init__(self, num_processors: int = 4):
        # Create worker actors
        self.processors = [
            OrderProcessorActor.remote(f"processor-{i}")
            for i in range(num_processors)
        ]
        self.round_robin_idx = 0

    async def submit_order(self, order: Order) -> Dict:
        """Distribute order to the next processor (round robin)"""
        processor = self.processors[self.round_robin_idx]
        self.round_robin_idx = (self.round_robin_idx + 1) % len(self.processors)
        # Send message to actor (async, non-blocking)
        result = await processor.process_order.remote(order)
        return result

    async def get_system_stats(self) -> List[Dict]:
        """Get stats from all processors"""
        stats_futures = [
            processor.get_stats.remote()
            for processor in self.processors
        ]
        # Await the object refs instead of blocking on ray.get()
        return await asyncio.gather(*stats_futures)

# Usage
async def main():
    # Create coordinator with 4 worker actors
    coordinator = OrderCoordinatorActor.remote(num_processors=4)
    # Submit 1000 orders concurrently
    orders = [
        Order(
            order_id=f"order-{i}",
            user_id=f"user-{i % 100}",
            items=[{"product": "widget", "qty": 1}],
            total=99.99,
            timestamp=datetime.now()
        )
        for i in range(1000)
    ]
    # Process all orders (distributed across 4 actors);
    # Ray object refs are awaitable, so asyncio.gather works here
    results = await asyncio.gather(*[
        coordinator.submit_order.remote(order)
        for order in orders
    ])
    # Get system stats
    stats = await coordinator.get_system_stats.remote()
    for stat in stats:
        revenue = round(stat['total_revenue'], 2)
        print(f"Processor {stat['processor_id']}: "
              f"{stat['orders_processed']} orders, "
              f"${revenue} revenue")

# Run
asyncio.run(main())

# Benefits:
# - No locks, no race conditions
# - Fault isolated (one actor crash doesn't affect others)
# - Scales horizontally (add more actors)
# - Location transparent (actors can be on different machines)
Pattern 3: Event Sourcing - Immutable Event Log
Event Sourcing Principles:
Instead of storing current state, store the sequence of events that led to that state.
Benefits:
1. Perfect Audit Trail: Every change is recorded
2. Time Travel: Replay to any point in history
3. Debug Production: Reproduce the exact state that caused a bug
4. Event-Driven: Natural fit for microservices
5. Analytics: Rich data for business intelligence
Challenges:
1. Complexity: More moving parts than CRUD
2. Schema Evolution: Old events must still be readable
3. Snapshots: Periodic snapshots are needed for performance (see the sketch after the implementation below)
4. Eventual Consistency: Projections lag behind events
When to use Event Sourcing:
- Financial systems (audit requirements)
- Collaborative editing (version history)
- Fraud detection (need full history)
- Complex domains with many state transitions
Event Sourcing Implementation
# Event Sourcing Pattern in Python
from typing import List, Dict
from dataclasses import dataclass, asdict
from datetime import datetime

# Base Event class
@dataclass
class Event:
    aggregate_id: str
    event_id: str
    timestamp: datetime
    version: int

    def to_dict(self) -> Dict:
        return {
            **asdict(self),
            'event_type': self.__class__.__name__,
            'timestamp': self.timestamp.isoformat()
        }

# Domain Events
@dataclass
class AccountCreated(Event):
    initial_balance: float
    currency: str = "USD"

@dataclass
class MoneyDeposited(Event):
    amount: float
    source: str

@dataclass
class MoneyWithdrawn(Event):
    amount: float
    destination: str

@dataclass
class AccountClosed(Event):
    reason: str

# Aggregate (entity reconstructed from events)
class BankAccount:
    def __init__(self, account_id: str):
        self.account_id = account_id
        self.balance = 0.0
        self.is_active = False
        self.version = 0
        self.uncommitted_events: List[Event] = []

    # Command handlers (produce events)
    def create_account(self, initial_balance: float = 0.0) -> None:
        if self.is_active:
            raise ValueError("Account already exists")
        event = AccountCreated(
            aggregate_id=self.account_id,
            event_id=f"evt-{self.version}",
            timestamp=datetime.now(),
            version=self.version,
            initial_balance=initial_balance
        )
        self._apply_event(event)
        self.uncommitted_events.append(event)

    def deposit(self, amount: float, source: str) -> None:
        if not self.is_active:
            raise ValueError("Account is not active")
        if amount <= 0:
            raise ValueError("Amount must be positive")
        event = MoneyDeposited(
            aggregate_id=self.account_id,
            event_id=f"evt-{self.version}",
            timestamp=datetime.now(),
            version=self.version,
            amount=amount,
            source=source
        )
        self._apply_event(event)
        self.uncommitted_events.append(event)

    def withdraw(self, amount: float, destination: str) -> None:
        if not self.is_active:
            raise ValueError("Account is not active")
        if amount <= 0:
            raise ValueError("Amount must be positive")
        if amount > self.balance:
            raise ValueError("Insufficient funds")
        event = MoneyWithdrawn(
            aggregate_id=self.account_id,
            event_id=f"evt-{self.version}",
            timestamp=datetime.now(),
            version=self.version,
            amount=amount,
            destination=destination
        )
        self._apply_event(event)
        self.uncommitted_events.append(event)

    # Event handlers (mutate state)
    def _apply_event(self, event: Event) -> None:
        """Apply event to current state"""
        if isinstance(event, AccountCreated):
            self.balance = event.initial_balance
            self.is_active = True
        elif isinstance(event, MoneyDeposited):
            self.balance += event.amount
        elif isinstance(event, MoneyWithdrawn):
            self.balance -= event.amount
        elif isinstance(event, AccountClosed):
            self.is_active = False
        self.version += 1

    def get_uncommitted_events(self) -> List[Event]:
        """Get events that haven't been saved yet"""
        return self.uncommitted_events.copy()

    def mark_events_committed(self) -> None:
        """Clear uncommitted events after saving"""
        self.uncommitted_events.clear()

    @classmethod
    def from_events(cls, account_id: str, events: List[Event]) -> 'BankAccount':
        """Reconstruct aggregate from event history"""
        account = cls(account_id)
        for event in events:
            account._apply_event(event)
        return account

# Event Store
class EventStore:
    def __init__(self):
        self.events: Dict[str, List[Event]] = {}

    def save_events(self, aggregate_id: str, events: List[Event]) -> None:
        """Append events to the store"""
        if aggregate_id not in self.events:
            self.events[aggregate_id] = []
        self.events[aggregate_id].extend(events)
        print(f"Saved {len(events)} event(s) for {aggregate_id}")

    def get_events(self, aggregate_id: str) -> List[Event]:
        """Get all events for an aggregate"""
        return self.events.get(aggregate_id, [])

    def get_events_after_version(
        self,
        aggregate_id: str,
        version: int
    ) -> List[Event]:
        """Get events after a specific version (for incremental updates)"""
        all_events = self.get_events(aggregate_id)
        return [e for e in all_events if e.version > version]

# Repository (loads aggregates from events)
class BankAccountRepository:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store

    def get(self, account_id: str) -> BankAccount:
        """Load account by replaying events"""
        events = self.event_store.get_events(account_id)
        if not events:
            raise ValueError(f"Account {account_id} not found")
        return BankAccount.from_events(account_id, events)

    def save(self, account: BankAccount) -> None:
        """Save uncommitted events"""
        uncommitted = account.get_uncommitted_events()
        if uncommitted:
            self.event_store.save_events(account.account_id, uncommitted)
            account.mark_events_committed()

# Usage Example
def main():
    # Setup
    event_store = EventStore()
    repo = BankAccountRepository(event_store)

    # Create new account
    account = BankAccount("ACC-12345")
    account.create_account(initial_balance=100.0)
    repo.save(account)

    # Load account from events
    loaded_account = repo.get("ACC-12345")
    print(f"Balance after creation: ${loaded_account.balance}")

    # Perform transactions
    loaded_account.deposit(500.0, "Paycheck")
    loaded_account.withdraw(200.0, "Rent")
    loaded_account.deposit(50.0, "Refund")
    repo.save(loaded_account)

    # Reload from events
    final_account = repo.get("ACC-12345")
    print(f"Final balance: ${final_account.balance}")
    print(f"Total events: {final_account.version}")

    # View event history
    all_events = event_store.get_events("ACC-12345")
    print("\nEvent History:")
    for event in all_events:
        print(f"  {event.event_id}: {event.__class__.__name__} "
              f"at {event.timestamp}")

if __name__ == "__main__":
    main()

# Output:
# Saved 1 event(s) for ACC-12345
# Balance after creation: $100.0
# Saved 3 event(s) for ACC-12345
# Final balance: $450.0
# Total events: 4
# Event History:
#   evt-0: AccountCreated at 2025-01-15 10:30:00
#   evt-1: MoneyDeposited at 2025-01-15 10:30:01
#   evt-2: MoneyWithdrawn at 2025-01-15 10:30:02
#   evt-3: MoneyDeposited at 2025-01-15 10:30:03
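The Snapshots challenge flagged earlier has a standard fix: replaying every event gets slow once an aggregate has accumulated thousands of them, so you checkpoint state periodically and replay only what came after. Here is a minimal sketch built on the classes above (the SnapshotStore and load_with_snapshot names are ours for illustration, not from any library):

# Snapshot optimization sketch (hypothetical SnapshotStore; builds on
# the BankAccount / EventStore classes defined above)
from typing import Dict, Optional, Tuple

class SnapshotStore:
    """Stores a (state, version) checkpoint per aggregate."""
    def __init__(self):
        self.snapshots: Dict[str, Tuple[Dict, int]] = {}

    def save(self, account: 'BankAccount') -> None:
        # Persist a point-in-time copy of the aggregate's state
        state = {"balance": account.balance, "is_active": account.is_active}
        self.snapshots[account.account_id] = (state, account.version)

    def load(self, account_id: str) -> Optional[Tuple[Dict, int]]:
        return self.snapshots.get(account_id)

def load_with_snapshot(account_id: str,
                       event_store: 'EventStore',
                       snapshot_store: SnapshotStore) -> 'BankAccount':
    """Restore from the latest snapshot, then replay only newer events."""
    account = BankAccount(account_id)
    version = 0
    snapshot = snapshot_store.load(account_id)
    if snapshot:
        state, version = snapshot
        account.balance = state["balance"]
        account.is_active = state["is_active"]
        account.version = version
    # Events carry the pre-apply version counter, so a snapshot at
    # version N is followed by events with version >= N
    for event in event_store.get_events_after_version(account_id, version - 1):
        account._apply_event(event)
    return account

A common policy is to write a snapshot every N events; the event log remains the source of truth, and snapshots are disposable caches that can always be rebuilt by replaying from zero.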
Pattern 4: CQRS (Command Query Responsibility Segregation)
CQRS Separates Reads and Writes:
Instead of a single model for both reading and writing data, use separate models optimized for each:
- Command Model: Optimized for writes (normalization, transactions, consistency)
- Query Model: Optimized for reads (denormalization, caching, speed)
Benefits:
1. Performance: Optimize each side independently
2. Scalability: Scale reads and writes separately
3. Flexibility: Different databases for different needs
4. Maintainability: Clear separation of concerns
When to use CQRS:
- High read:write ratios (10:1 or higher)
- Complex queries that don't fit the transactional model
- Different scaling needs for reads vs. writes
- Event sourcing (a natural fit)
Example: a product catalog might split like this:
- Write side: PostgreSQL (product updates, inventory)
- Read side: Elasticsearch (fast search) + Redis (popular products)
- Eventual consistency between the two models
Trade-offs:
- More complex architecture
- Data synchronization overhead
A minimal sketch of the pattern follows.
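To make the split concrete, here is CQRS reduced to an in-memory sketch (all class names are illustrative; in production, the direct publish call would be a message bus such as Kafka, and each side would own its own datastore):

# Minimal CQRS sketch: one write model, one denormalized read model,
# synchronized by events (names illustrative; in production the
# "event bus" is a message broker and the stores are real databases)
from dataclasses import dataclass
from typing import Callable, Dict, List

@dataclass
class ProductPriceChanged:
    product_id: str
    new_price: float

class CommandModel:
    """Write side: validates commands, then emits events."""
    def __init__(self, publish: Callable):
        self.prices: Dict[str, float] = {}  # normalized source of truth
        self.publish = publish

    def change_price(self, product_id: str, new_price: float) -> None:
        if new_price <= 0:
            raise ValueError("Price must be positive")
        self.prices[product_id] = new_price
        # Publish the change; the read side updates from this event
        self.publish(ProductPriceChanged(product_id, new_price))

class QueryModel:
    """Read side: denormalized view, rebuilt from events."""
    def __init__(self):
        self.prices: Dict[str, float] = {}
        self.cheap_products: List[str] = []  # precomputed query result

    def on_event(self, event: ProductPriceChanged) -> None:
        # Projection: keep a ready-to-serve answer up to date
        self.prices[event.product_id] = event.new_price
        self.cheap_products = [
            pid for pid, price in self.prices.items() if price < 10.0
        ]

# Wire the two sides together through an "event bus" (a direct call
# here; a real broker makes the read model eventually consistent)
read_model = QueryModel()
write_model = CommandModel(publish=read_model.on_event)

write_model.change_price("widget", 5.0)
write_model.change_price("gadget", 25.0)
print(read_model.cheap_products)  # ['widget']

Because the read model is updated after the write commits, queries can briefly return stale data; that lag is exactly the eventual consistency trade-off listed above.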
Key Takeaways: Distributed Patterns
1. MapReduce: Batch process massive datasets in parallel across clusters. Modern alternative: Apache Spark for 10-100x better performance.
2. Actor Model: Message-passing concurrency without shared state. Perfect for chat systems, games, IoT. Frameworks: Akka, Erlang, Ray.
3. Event Sourcing: Store events, not state. Perfect audit trail and time travel. Adds complexity but essential for financial/compliance systems.
4. CQRS: Separate read and write models for independent optimization. Combine with event sourcing for powerful architectures.
5. Pattern Selection: Match the pattern to the problem:
- Data processing → MapReduce/Spark
- Concurrent systems → Actor Model
- Audit requirements → Event Sourcing
- Read-heavy systems → CQRS
6. Production Reality: Most systems use multiple patterns. Example: Netflix uses actors for user sessions, event sourcing for billing, CQRS for recommendations, and MapReduce for analytics.
The key is understanding trade-offs and applying the right pattern to each part of your system.