Modern Distributed Computing Patterns: From Theory to Practice

February 25, 2025
By Architecture Team

Explore distributed systems architecture patterns including MapReduce, actor models, event sourcing, and CQRS with real-world implementation examples.

Introduction: The Evolution of Distributed Systems

Distributed systems have evolved from academic curiosities in the 1970s to the foundation of modern internet-scale applications. Today, virtually every web application, mobile app, and cloud service relies on distributed computing patterns to achieve scalability, reliability, and performance.

This article explores the fundamental patterns that power systems at companies like Netflix, Amazon, Google, and Uber. We'll cover MapReduce, Actor models, Event Sourcing, CQRS, Saga patterns, and more, with real-world implementation examples you can adapt.

Pattern 1: MapReduce - Divide and Conquer at Scale

MapReduce Principles:

  1. Map Phase: Transform input data into key-value pairs in parallel
  2. Shuffle Phase: Group all values by key across nodes
  3. Reduce Phase: Aggregate values for each key in parallel

When to Use:

  • Large-scale batch processing (logs, analytics, ETL)
  • Embarrassingly parallel problems
  • Datasets larger than the memory of a single machine
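The three phases can be sketched in a single process before involving a cluster. The following is a minimal illustration, not how Hadoop itself is implemented: the helper names (`map_phase`, `shuffle_phase`, `reduce_phase`, `word_mapper`, `count_reducer`) are hypothetical, and everything runs sequentially in memory.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Tuple

Pair = Tuple[str, int]


def map_phase(records: Iterable[str],
              mapper: Callable[[str], List[Pair]]) -> List[Pair]:
    """Apply the mapper to every record and collect the emitted pairs."""
    pairs: List[Pair] = []
    for record in records:
        pairs.extend(mapper(record))
    return pairs


def shuffle_phase(pairs: Iterable[Pair]) -> Dict[str, List[int]]:
    """Group all values that share a key (done across nodes on a real cluster)."""
    grouped: Dict[str, List[int]] = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def reduce_phase(grouped: Dict[str, List[int]],
                 reducer: Callable[[str, List[int]], int]) -> Dict[str, int]:
    """Aggregate the grouped values for each key."""
    return {key: reducer(key, values) for key, values in grouped.items()}


def word_mapper(line: str) -> List[Pair]:
    # Emit (word, 1) for every word in the line
    return [(word, 1) for word in line.split()]


def count_reducer(word: str, counts: List[int]) -> int:
    # Sum the ones emitted for this word
    return sum(counts)


lines = ["the quick brown fox", "the lazy dog", "the fox"]
counts = reduce_phase(shuffle_phase(map_phase(lines, word_mapper)), count_reducer)
print(counts["the"], counts["fox"], counts["dog"])  # 3 2 1
```

On a cluster, the map and reduce calls run on many machines at once and the shuffle moves data over the network; the data flow is the same.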

Modern Alternatives: Apache Spark (up to 100x faster than Hadoop MapReduce on in-memory, iterative workloads, according to the Spark project's own benchmarks)

MapReduce Implementation: Word Count

Python
# Classic MapReduce: Word Count in Python (Hadoop Streaming)

# mapper.py
import sys

def mapper():
    """Emit (word, 1) for each word in input"""
    for line in sys.stdin:
        # Remove leading/trailing whitespace
        line = line.strip()
        # Split into words
        words = line.split()
        # Emit key-value pairs
        for word in words:
            # Output: word<TAB>1 (Hadoop Streaming expects tab-separated key and value)
            print(f"{word}\t1")

if __name__ == "__main__":
    mapper()

# reducer.py
import sys

def reducer():
    """Sum counts for each word.

    Relies on the shuffle phase delivering input sorted by key,
    so all lines for a given word arrive consecutively.
    """
    current_word = None
    current_count = 0

    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        # Parse input (word<TAB>count)
        word, count = line.split('\t', 1)
        count = int(count)

        # Check if we're still on the same word
        if current_word == word:
            current_count += count
        else:
            # New word, emit previous
            if current_word is not None:
                print(f"{current_word}\t{current_count}")
            current_word = word
            current_count = count

    # Emit last word
    if current_word is not None:
        print(f"{current_word}\t{current_count}")

if __name__ == "__main__":
    reducer()

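# Run on Hadoop (-files ships both scripts to the worker nodes;
# adjust the streaming jar path for your installation):
# hadoop jar hadoop-streaming.jar \
#   -files mapper.py,reducer.py \
#   -input /data/logs/*.txt \
#   -output /data/wordcount-output \
#   -mapper "python3 mapper.py" \
#   -reducer "python3 reducer.py"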

# Modern Spark equivalent (far fewer lines of code):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WordCount").getOrCreate()

# Read text files
lines = spark.read.text("/data/logs/*.txt")

# Word count as a single chained expression
word_counts = (lines
    .selectExpr("explode(split(value, ' ')) as word")
    .where("word != ''")  # repeated spaces produce empty strings; drop them
    .groupBy("word")
    .count()
    .orderBy("count", ascending=False))

word_counts.write.parquet("/data/wordcount-output")

Pattern 2: Actor Model - Message-Passing Concurrency

Actor Model Benefits:

  1. No Data Races on Actor State: Each actor processes its messages one at a time
  2. Location Transparency: Actors can be local or remote
  3. Fault Isolation: A failed actor can be restarted without crashing the rest of the system
  4. Natural Scalability: Add more actors to add more capacity

Use Cases:

  • Chat systems (WhatsApp uses Erlang actors)
  • Game servers (player = actor)
  • IoT device management
  • Financial trading systems

Frameworks: Akka (JVM), Orleans (.NET), Erlang/Elixir, Ray (Python)
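To make the "one message at a time" idea concrete, here is a minimal sketch that uses only the Python standard library. It is not how Akka, Orleans, Erlang, or Ray implement actors; `CounterActor` and its message format are hypothetical. The point it illustrates is that many threads can send messages concurrently while only the actor's own thread ever touches its state.

```python
import queue
import threading


class CounterActor:
    """Hypothetical actor: private state, one mailbox, one worker thread."""

    _STOP = object()  # sentinel message that ends the actor's loop

    def __init__(self) -> None:
        self._mailbox: queue.Queue = queue.Queue()
        self._count = 0  # only ever touched by the actor's own thread
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def send(self, message) -> None:
        """Fire-and-forget: enqueue a message and return immediately."""
        self._mailbox.put(message)

    def _run(self) -> None:
        # Messages are handled strictly one after another, in arrival order
        while True:
            message = self._mailbox.get()
            if message is self._STOP:
                break
            kind, payload = message
            if kind == "add":
                self._count += payload
            elif kind == "get":
                payload.put(self._count)  # payload is a reply queue

    def ask_count(self) -> int:
        """Request/reply: send a 'get' message and wait for the answer."""
        reply: queue.Queue = queue.Queue(maxsize=1)
        self.send(("get", reply))
        return reply.get(timeout=5)

    def stop(self) -> None:
        self._mailbox.put(self._STOP)
        self._thread.join(timeout=5)


def send_many(actor: CounterActor, n: int) -> None:
    for _ in range(n):
        actor.send(("add", 1))


actor = CounterActor()
senders = [threading.Thread(target=send_many, args=(actor, 1000)) for _ in range(8)]
for t in senders:
    t.start()
for t in senders:
    t.join()

total = actor.ask_count()
actor.stop()
print(total)  # 8000
```

No lock protects `_count`, yet the total is exact, because the mailbox serializes every update.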

Actor Model Implementation: Order Processing

Python
# Actor Model in Python using Ray

import ray
from typing import List, Dict
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Order:
    order_id: str
    user_id: str
    items: List[Dict]
    total: float
    timestamp: datetime

# Initialize Ray
ray.init()

@ray.remote
class OrderProcessorActor:
    """Actor that processes orders sequentially"""
    
    def __init__(self, processor_id: str):
        self.processor_id = processor_id
        self.processed_orders = []
        self.total_revenue = 0.0
    
    def process_order(self, order: Order) -> Dict:
        """Process a single order (no shared state, no locks!)"""
        try:
            # Validate order
            if order.total <= 0:
                return {
                    "status": "rejected",
                    "order_id": order.order_id,
                    "reason": "Invalid total"
                }
            
            # Process payment (simulate)
            payment_success = self._charge_payment(order)
            if not payment_success:
                return {
                    "status": "payment_failed",
                    "order_id": order.order_id
                }
            
            # Update inventory
            inventory_updated = self._update_inventory(order)
            if not inventory_updated:
                # Refund payment
                self._refund_payment(order)
                return {
                    "status": "out_of_stock",
                    "order_id": order.order_id
                }
            
            # Success!
            self.processed_orders.append(order.order_id)
            self.total_revenue += order.total
            
            return {
                "status": "success",
                "order_id": order.order_id,
                "processed_by": self.processor_id,
                "total": order.total
            }
        
        except Exception as e:
            return {
                "status": "error",
                "order_id": order.order_id,
                "error": str(e)
            }
    
    def _charge_payment(self, order: Order) -> bool:
        # Payment gateway call
        return True
    
    def _update_inventory(self, order: Order) -> bool:
        # Inventory service call
        return True
    
    def _refund_payment(self, order: Order) -> bool:
        # Refund call
        return True
    
    def get_stats(self) -> Dict:
        """Get actor statistics"""
        return {
            "processor_id": self.processor_id,
            "orders_processed": len(self.processed_orders),
            "total_revenue": self.total_revenue
        }

@ray.remote
class OrderCoordinatorActor:
    """Supervisor actor that distributes work"""
    
    def __init__(self, num_processors: int = 4):
        # Create worker actors
        self.processors = [
            OrderProcessorActor.remote(f"processor-{i}")
            for i in range(num_processors)
        ]
        self.round_robin_idx = 0
    
    async def submit_order(self, order: Order) -> Dict:
        """Distribute order to available processor"""
        processor = self.processors[self.round_robin_idx]
        self.round_robin_idx = (self.round_robin_idx + 1) % len(self.processors)
        
        # Send message to actor (async, non-blocking)
        result = await processor.process_order.remote(order)
        return result
    
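    async def get_system_stats(self) -> List[Dict]:
        """Get stats from all processors"""
        # Submit every request first so the processors work concurrently
        stats_refs = [
            processor.get_stats.remote()
            for processor in self.processors
        ]
        # Ray ObjectRefs are awaitable; ray.get() returns a plain list,
        # which cannot be awaited and would block this actor's event loop
        return [await ref for ref in stats_refs]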

# Usage
async def main():
    # Create coordinator with 4 worker actors
    coordinator = OrderCoordinatorActor.remote(num_processors=4)
    
    # Submit 1000 orders concurrently
    orders = [
        Order(
            order_id=f"order-{i}",
            user_id=f"user-{i % 100}",
            items=[{"product": "widget", "qty": 1}],
            total=99.99,
            timestamp=datetime.now()
        )
        for i in range(1000)
    ]
    
    # Process all orders (distributed across 4 actors)
    results = await asyncio.gather(*[
        coordinator.submit_order.remote(order)
        for order in orders
    ])
    
    # Get system stats
    stats = await coordinator.get_system_stats.remote()
    for stat in stats:
        print(f"Processor {stat['processor_id']}: "
              f"{stat['orders_processed']} orders, "
              f"${stat['total_revenue']:.2f} revenue")

# Run
import asyncio
asyncio.run(main())

# Benefits:
# - No locks, no race conditions
# - Fault isolated (one actor crash doesn't affect others)
# - Scales horizontally (add more actors)
# - Location transparent (actors can be on different machines)
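The fault-isolation benefit usually depends on a supervisor that replaces a crashed actor with a fresh one. The sketch below shows that idea in plain, synchronous Python under simplifying assumptions; `FlakyWorker` and `Supervisor` are hypothetical names, and real frameworks add restart limits, backoff, and asynchronous delivery.

```python
from typing import Callable, List


class FlakyWorker:
    """Hypothetical worker whose private state is lost when it crashes."""

    def __init__(self) -> None:
        self.handled: List[str] = []

    def handle(self, message: str) -> None:
        if message == "boom":
            raise RuntimeError("simulated failure")
        self.handled.append(message)


class Supervisor:
    """Delivers messages to a worker and replaces the worker if it fails."""

    def __init__(self, factory: Callable[[], FlakyWorker]) -> None:
        self._factory = factory
        self.worker = factory()
        self.restarts = 0

    def deliver(self, message: str) -> bool:
        """Return True if handled, False if the worker crashed and was replaced."""
        try:
            self.worker.handle(message)
            return True
        except Exception:
            # "Let it crash": discard the failed worker and start a clean one
            self.worker = self._factory()
            self.restarts += 1
            return False


supervisor = Supervisor(FlakyWorker)
outcomes = [supervisor.deliver(m) for m in ["order-1", "boom", "order-2"]]
print(outcomes, supervisor.restarts, supervisor.worker.handled)
# [True, False, True] 1 ['order-2']
```

The replacement worker starts with empty state, which is why actor systems that need durability pair restarts with persisted state or an event log.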

Pattern 3: Event Sourcing - Immutable Event Log

Event Sourcing Principles:

Instead of storing current state, store the sequence of events that led to that state.

Benefits:

  1. Perfect Audit Trail: Every change is recorded
  2. Time Travel: Replay to any point in history
  3. Debug Production: Reproduce exact state that caused bugs
  4. Event-Driven: Natural fit for microservices
  5. Analytics: Rich data for business intelligence

Challenges:

  1. Complexity: More moving parts than CRUD
  2. Schema Evolution: Old events must still be readable
  3. Snapshots: Need periodic snapshots for performance
  4. Eventual Consistency: Projections lag behind events

When to Use:

  • Financial systems (audit requirements)
  • Collaborative editing (version history)
  • Fraud detection (need full history)
  • Complex domains with many state transitions
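"Time travel" amounts to folding the event log only up to a chosen point. As a minimal sketch, assuming a simplified event shape (`LedgerEvent` and `balance_at` are hypothetical names, not part of any library):

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class LedgerEvent:
    version: int   # position of the event in the log, starting at 0
    kind: str      # "deposited" or "withdrawn"
    amount: float


def balance_at(events: List[LedgerEvent], up_to_version: int) -> float:
    """Rebuild the balance as it was after the event with the given version."""
    balance = 0.0
    for event in events:  # events are assumed to be ordered by version
        if event.version > up_to_version:
            break
        if event.kind == "deposited":
            balance += event.amount
        elif event.kind == "withdrawn":
            balance -= event.amount
    return balance


history = [
    LedgerEvent(0, "deposited", 100.0),
    LedgerEvent(1, "deposited", 500.0),
    LedgerEvent(2, "withdrawn", 200.0),
    LedgerEvent(3, "deposited", 50.0),
]

print(balance_at(history, 1))  # 600.0
print(balance_at(history, 3))  # 450.0
```

Because events are never modified, the same call always returns the same historical state, which is what makes audits and bug reproduction reliable.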

Event Sourcing Implementation

Python
# Event Sourcing Pattern in Python

from typing import Dict, List
from dataclasses import dataclass, asdict
from datetime import datetime
from abc import ABC

# Base Event class
@dataclass
class Event(ABC):
    aggregate_id: str
    event_id: str
    timestamp: datetime
    version: int
    
    def to_dict(self) -> Dict:
        return {
            **asdict(self),
            'event_type': self.__class__.__name__,
            'timestamp': self.timestamp.isoformat()
        }

# Domain Events
@dataclass
class AccountCreated(Event):
    initial_balance: float
    currency: str = "USD"

@dataclass
class MoneyDeposited(Event):
    amount: float
    source: str

@dataclass
class MoneyWithdrawn(Event):
    amount: float
    destination: str

@dataclass
class AccountClosed(Event):
    reason: str

# Aggregate (Entity reconstructed from events)
class BankAccount:
    def __init__(self, account_id: str):
        self.account_id = account_id
        self.balance = 0.0
        self.is_active = False
        self.version = 0
        self.uncommitted_events: List[Event] = []
    
    # Command handlers (produce events)
    def create_account(self, initial_balance: float = 0.0) -> None:
        if self.is_active:
            raise ValueError("Account already exists")
        
        event = AccountCreated(
            aggregate_id=self.account_id,
            event_id=f"evt-{self.version}",
            timestamp=datetime.now(),
            version=self.version,
            initial_balance=initial_balance
        )
        self._apply_event(event)
        self.uncommitted_events.append(event)
    
    def deposit(self, amount: float, source: str) -> None:
        if not self.is_active:
            raise ValueError("Account is not active")
        if amount <= 0:
            raise ValueError("Amount must be positive")
        
        event = MoneyDeposited(
            aggregate_id=self.account_id,
            event_id=f"evt-{self.version}",
            timestamp=datetime.now(),
            version=self.version,
            amount=amount,
            source=source
        )
        self._apply_event(event)
        self.uncommitted_events.append(event)
    
    def withdraw(self, amount: float, destination: str) -> None:
        if not self.is_active:
            raise ValueError("Account is not active")
        if amount <= 0:
            raise ValueError("Amount must be positive")
        if amount > self.balance:
            raise ValueError("Insufficient funds")
        
        event = MoneyWithdrawn(
            aggregate_id=self.account_id,
            event_id=f"evt-{self.version}",
            timestamp=datetime.now(),
            version=self.version,
            amount=amount,
            destination=destination
        )
        self._apply_event(event)
        self.uncommitted_events.append(event)
    
    # Event handlers (mutate state)
    def _apply_event(self, event: Event) -> None:
        """Apply event to current state"""
        if isinstance(event, AccountCreated):
            self.balance = event.initial_balance
            self.is_active = True
        elif isinstance(event, MoneyDeposited):
            self.balance += event.amount
        elif isinstance(event, MoneyWithdrawn):
            self.balance -= event.amount
        elif isinstance(event, AccountClosed):
            self.is_active = False
        
        self.version += 1
    
    def get_uncommitted_events(self) -> List[Event]:
        """Get events that haven't been saved yet"""
        return self.uncommitted_events.copy()
    
    def mark_events_committed(self) -> None:
        """Clear uncommitted events after saving"""
        self.uncommitted_events.clear()
    
    @classmethod
    def from_events(cls, account_id: str, events: List[Event]) -> 'BankAccount':
        """Reconstruct aggregate from event history"""
        account = cls(account_id)
        for event in events:
            account._apply_event(event)
        return account

# Event Store
class EventStore:
    def __init__(self):
        self.events: Dict[str, List[Event]] = {}
    
    def save_events(self, aggregate_id: str, events: List[Event]) -> None:
        """Append events to the store"""
        if aggregate_id not in self.events:
            self.events[aggregate_id] = []
        
        self.events[aggregate_id].extend(events)
        print(f"Saved {len(events)} events for {aggregate_id}")
    
    def get_events(self, aggregate_id: str) -> List[Event]:
        """Get all events for an aggregate"""
        return self.events.get(aggregate_id, [])
    
    def get_events_after_version(
        self,
        aggregate_id: str,
        version: int
    ) -> List[Event]:
        """Get events after a specific version (for incremental updates)"""
        all_events = self.get_events(aggregate_id)
        return [e for e in all_events if e.version > version]

# Repository (loads aggregates from events)
class BankAccountRepository:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
    
    def get(self, account_id: str) -> BankAccount:
        """Load account by replaying events"""
        events = self.event_store.get_events(account_id)
        if not events:
            raise ValueError(f"Account {account_id} not found")
        
        return BankAccount.from_events(account_id, events)
    
    def save(self, account: BankAccount) -> None:
        """Save uncommitted events"""
        uncommitted = account.get_uncommitted_events()
        if uncommitted:
            self.event_store.save_events(account.account_id, uncommitted)
            account.mark_events_committed()

# Usage Example
def main():
    # Setup
    event_store = EventStore()
    repo = BankAccountRepository(event_store)
    
    # Create new account
    account = BankAccount("ACC-12345")
    account.create_account(initial_balance=100.0)
    repo.save(account)
    
    # Load account from events
    loaded_account = repo.get("ACC-12345")
    print(f"Balance after creation: ${loaded_account.balance}")
    
    # Perform transactions
    loaded_account.deposit(500.0, "Paycheck")
    loaded_account.withdraw(200.0, "Rent")
    loaded_account.deposit(50.0, "Refund")
    repo.save(loaded_account)
    
    # Reload from events
    final_account = repo.get("ACC-12345")
    print(f"Final balance: ${final_account.balance}")
    print(f"Total events: {final_account.version}")
    
    # View event history
    all_events = event_store.get_events("ACC-12345")
    print("\nEvent History:")
    for event in all_events:
        print(f"  {event.event_id}: {event.__class__.__name__} "
              f"at {event.timestamp}")

if __name__ == "__main__":
    main()

# Output (timestamps are illustrative; real ones include microseconds):
# Saved 1 events for ACC-12345
# Balance after creation: $100.0
# Saved 3 events for ACC-12345
# Final balance: $450.0
# Total events: 4
# Event History:
#   evt-0: AccountCreated at 2025-01-15 10:30:00
#   evt-1: MoneyDeposited at 2025-01-15 10:30:01
#   evt-2: MoneyWithdrawn at 2025-01-15 10:30:02
#   evt-3: MoneyDeposited at 2025-01-15 10:30:03
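Replaying every event on each load becomes slow as the log grows, which is where snapshots come in. The following self-contained sketch shows the idea under simplifying assumptions: events are reduced to `(version, signed_amount)` tuples, and `Snapshot`, `take_snapshot`, and `load_balance` are hypothetical names rather than part of the classes above.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

LogEvent = Tuple[int, float]  # (version, signed amount); a simplified event


@dataclass(frozen=True)
class Snapshot:
    version: int    # version of the last event folded into the snapshot
    balance: float


def take_snapshot(events: List[LogEvent]) -> Snapshot:
    """Fold a prefix of the log into a single saved state."""
    version, balance = -1, 0.0
    for event_version, delta in events:
        version = event_version
        balance += delta
    return Snapshot(version, balance)


def load_balance(events: List[LogEvent],
                 snapshot: Optional[Snapshot] = None) -> Tuple[float, int]:
    """Return (balance, number_of_events_replayed)."""
    balance = snapshot.balance if snapshot is not None else 0.0
    start = snapshot.version if snapshot is not None else -1
    # Only events newer than the snapshot need to be replayed
    tail = [event for event in events if event[0] > start]
    for _, delta in tail:
        balance += delta
    return balance, len(tail)


events = [(version, 1.0) for version in range(1000)]
snapshot = take_snapshot(events[:990])

full = load_balance(events)
fast = load_balance(events, snapshot)
print(full, fast)  # (1000.0, 1000) (1000.0, 10)
```

Both loads produce the same balance, but the second replays 10 events instead of 1000. In the implementation above, `get_events_after_version` is the natural hook for fetching only the tail.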

Pattern 4: CQRS (Command Query Responsibility Segregation)

CQRS Separates Reads and Writes:

Instead of a single model for both reading and writing data, use separate models optimized for each:

  • Command Model: Optimized for writes (normalization, transactions, consistency)
  • Query Model: Optimized for reads (denormalization, caching, speed)

Benefits:

  1. Performance: Optimize each side independently
  2. Scalability: Scale reads and writes separately
  3. Flexibility: Different databases for different needs
  4. Maintainability: Clear separation of concerns

When to Use:

  • High read:write ratios (10:1 or higher)
  • Complex queries that don't fit the transactional model
  • Reads and writes need to scale differently
  • Event sourcing (natural fit)

Example Architecture:

  • Write: PostgreSQL (product updates, inventory)
  • Read: Elasticsearch (fast search) + Redis (popular products)

Trade-offs:

  • Eventual consistency between models
  • More complex architecture
  • Data synchronization overhead
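The split between the two models, and the eventual consistency between them, can be illustrated with an in-memory sketch. All names here (`ProductPriceChanged`, `ProductCommandHandler`, `ProductSearchView`, `sync`) are hypothetical, and plain dictionaries stand in for the write and read databases.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class ProductPriceChanged:
    product_id: str
    name: str
    price: float


class ProductCommandHandler:
    """Write side: validates commands and records the resulting events."""

    def __init__(self) -> None:
        self._prices: Dict[str, float] = {}          # stand-in for the write database
        self.outbox: List[ProductPriceChanged] = []  # events awaiting projection

    def set_price(self, product_id: str, name: str, price: float) -> None:
        if price <= 0:
            raise ValueError("Price must be positive")
        self._prices[product_id] = price
        self.outbox.append(ProductPriceChanged(product_id, name, price))


class ProductSearchView:
    """Read side: a denormalized view shaped for one kind of query."""

    def __init__(self) -> None:
        self._by_id: Dict[str, Dict] = {}

    def apply(self, event: ProductPriceChanged) -> None:
        self._by_id[event.product_id] = {"name": event.name, "price": event.price}

    def cheaper_than(self, limit: float) -> List[str]:
        return sorted(row["name"] for row in self._by_id.values()
                      if row["price"] < limit)


def sync(handler: ProductCommandHandler, view: ProductSearchView) -> int:
    """Project pending events into the read model; returns how many were applied."""
    applied = len(handler.outbox)
    for event in handler.outbox:
        view.apply(event)
    handler.outbox.clear()
    return applied


commands = ProductCommandHandler()
view = ProductSearchView()
commands.set_price("p1", "widget", 9.99)
commands.set_price("p2", "gadget", 49.00)

stale = view.cheaper_than(20)  # the read model has not caught up yet
sync(commands, view)
fresh = view.cheaper_than(20)
print(stale, fresh)  # [] ['widget']
```

The empty first result is the eventual-consistency trade-off in miniature: reads lag behind writes until the projection runs.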

Key Takeaways: Distributed Patterns

1. MapReduce: Batch-process massive datasets in parallel across clusters. Modern alternative: Apache Spark, which can be 10-100x faster on workloads that suit its in-memory model.

2. Actor Model: Message-passing concurrency without shared state. Perfect for chat systems, games, IoT. Frameworks: Akka, Erlang, Ray.

3. Event Sourcing: Store events, not state. Perfect audit trail and time travel. Adds complexity but essential for financial/compliance systems.

4. CQRS: Separate read and write models for independent optimization. Combine with event sourcing for powerful architectures.

5. Pattern Selection: Match the pattern to the problem:

  • Data processing → MapReduce/Spark
  • Concurrent systems → Actor Model
  • Audit requirements → Event Sourcing
  • Read-heavy systems → CQRS

6. Production Reality: Most systems combine several patterns. A streaming service, for example, might use actors for user sessions, event sourcing for billing, CQRS for recommendations, and MapReduce-style batch jobs for analytics.

The key is understanding trade-offs and applying the right pattern to each part of your system.
