Data Engineering

Building Real-Time Analytics with Kafka, Flink, and ClickHouse

March 28, 2025
17 min read
By Data Engineering Team

Architecture and implementation of streaming data pipelines for real-time analytics, handling millions of events per second with sub-second latency.

Introduction: The Real-Time Analytics Challenge

Modern applications generate millions of events per second - user clicks, sensor data, financial transactions, IoT telemetry. Traditional batch processing (running jobs nightly or hourly) is too slow for use cases that demand sub-second insights: fraud detection, recommendation engines, operational dashboards, and anomaly detection.

Real-time analytics streaming architectures enable processing data as it arrives, providing immediate insights and triggering automated actions. This post covers production-ready streaming architectures using Apache Kafka, Apache Flink, and cloud-native services.

Typical real-time use cases include:

  • Fraud Detection: Analyze transactions in real time to block suspicious activity
  • Personalization: Update recommendations instantly based on user behavior
  • Operational Monitoring: Dashboard metrics with <1 second latency
  • IoT Analytics: Process sensor data from thousands of devices
  • Log Analytics: Real-time error detection and alerting

A typical streaming pipeline is built from five layers:

  1. Stream Ingestion: Kafka, Kinesis, Pub/Sub
  2. Stream Processing: Flink, Spark Streaming, Kafka Streams
  3. State Management: RocksDB, Redis, DynamoDB
  4. Data Sink: Elasticsearch, ClickHouse, TimescaleDB, S3
  5. Monitoring: Prometheus, Grafana, Datadog

Architecture Overview
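The pipeline covered here flows Kafka → Flink → Elasticsearch (with ClickHouse or TimescaleDB as alternative sinks), with Kafka acting as the durable ingestion buffer between producers and the stream processor. As a hedged sketch of the ingestion layer, the snippet below publishes JSON user events to the user-events topic with the kafka-python client; the broker address and field names mirror the Flink source table defined later, while the producer settings and sample event are illustrative assumptions rather than prescriptions.

Python
# Ingestion sketch (kafka-python, illustrative): publish JSON user events to the
# user-events topic consumed by the Flink job later in this post.
import json
import time

from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    acks="all",                 # wait for all in-sync replicas before acking
    compression_type="gzip",    # compress batches on the wire
    linger_ms=20,               # small batching window to improve throughput
)

event = {
    "user_id": "user-123",
    "event_type": "page_view",
    "event_timestamp": int(time.time()),  # seconds since epoch, matches FROM_UNIXTIME later
    "page_url": "/pricing",
    "session_id": "sess-abc",
    "device_type": "mobile",
    "country": "DE",
}

# Key by user_id so all of a user's events land in the same partition (per-user ordering)
producer.send("user-events", key=event["user_id"], value=event)
producer.flush()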

Stream Processing with Apache Flink

Why Flink for Streaming?

Flink provides true event-time processing, exactly-once semantics, and low-latency state management - making it ideal for production real-time analytics.

  1. Event Time Processing: Handle late-arriving data correctly
  2. Stateful Processing: Maintain aggregations, windows, joins
  3. Exactly-Once Semantics: No duplicate processing
  4. High Throughput: Process millions of events/second
  5. Fault Tolerance: Automatic recovery from failures

Common Streaming Patterns:

1. Windowed Aggregations: Count/sum events over time windows
2. Stream Enrichment: Join streaming data with reference tables
3. Pattern Detection: Identify sequences (e.g., user journey analysis)
4. Anomaly Detection: ML models on streaming data
5. Stream-to-Stream Joins: Correlate events from multiple sources

Python
# Flink SQL for Real-Time Analytics
# Example: Real-time user activity dashboard

from pyflink.table import EnvironmentSettings, TableEnvironment

# Setup Flink environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Configure Kafka source
table_env.execute_sql("""
    CREATE TABLE user_events (
        user_id STRING,
        event_type STRING,
        event_timestamp BIGINT,
        page_url STRING,
        session_id STRING,
        device_type STRING,
        country STRING,
        event_time AS TO_TIMESTAMP(FROM_UNIXTIME(event_timestamp)),
        proc_time AS PROCTIME(),  -- processing-time attribute, required for the JDBC lookup join below
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'flink-analytics',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json',
        'json.fail-on-missing-field' = 'false',
        'json.ignore-parse-errors' = 'true'
    )
""")

# Real-time aggregation: Events per minute by device type
# (registered as a temporary view so it can be queried and written to a sink below)
table_env.execute_sql("""
    CREATE TEMPORARY VIEW events_per_minute AS
    SELECT
        window_start,
        window_end,
        device_type,
        COUNT(*) as event_count,
        COUNT(DISTINCT user_id) as unique_users,
        COUNT(DISTINCT session_id) as sessions
    FROM TABLE(
        TUMBLE(TABLE user_events, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
    )
    GROUP BY window_start, window_end, device_type
""")

# Stream enrichment: Join with user profile data
table_env.execute_sql("""
    CREATE TABLE user_profiles (
        user_id STRING,
        plan_type STRING,
        signup_date STRING,
        PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://postgres:5432/users',
        'table-name' = 'user_profiles',
        'lookup.cache.max-rows' = '10000',
        'lookup.cache.ttl' = '1 hour'
    )
""")

# Enriched event stream
enriched_events = table_env.sql_query("""
    SELECT 
        e.user_id,
        e.event_type,
        e.event_time,
        e.page_url,
        e.device_type,
        p.plan_type,
        p.signup_date
    FROM user_events e
    LEFT JOIN user_profiles FOR SYSTEM_TIME AS OF e.proc_time AS p
    ON e.user_id = p.user_id
""")

# Anomaly detection: Users with >100 events in 5 minutes
anomalies = table_env.sql_query("""
    SELECT
        user_id,
        window_start,
        COUNT(*) as event_count
    FROM TABLE(
        TUMBLE(TABLE user_events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)
    )
    GROUP BY window_start, user_id
    HAVING COUNT(*) > 100
""")

# Write to Elasticsearch for real-time dashboards
table_env.execute_sql("""
    CREATE TABLE es_events_per_minute (
        window_start TIMESTAMP(3),
        device_type STRING,
        event_count BIGINT,
        unique_users BIGINT,
        sessions BIGINT,
        PRIMARY KEY (window_start, device_type) NOT ENFORCED
    ) WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = 'http://elasticsearch:9200',
        'index' = 'events-per-minute',
        'document-id.key-delimiter' = '-',
        'sink.flush-on-checkpoint' = 'true',
        'sink.bulk-flush.max-actions' = '1000',
        'sink.bulk-flush.interval' = '1s',
        'format' = 'json'
    )
""")

# Insert aggregated data into Elasticsearch
table_env.execute_sql("""
    INSERT INTO es_events_per_minute
    SELECT window_start, device_type, event_count, unique_users, sessions
    FROM events_per_minute
""")

# Write anomalies to Kafka for alerting
table_env.execute_sql("""
    CREATE TABLE kafka_anomalies (
        user_id STRING,
        window_start TIMESTAMP(3),
        event_count BIGINT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-anomalies',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

table_env.execute_sql("""
    INSERT INTO kafka_anomalies
    SELECT * FROM anomalies
""")

# Each execute_sql("INSERT INTO ...") call above already submits its own Flink job;
# no separate execute() call is needed. To run both sinks as a single job, bundle
# the INSERTs in a statement set instead:
#   stmt_set = table_env.create_statement_set()
#   stmt_set.add_insert_sql("INSERT INTO es_events_per_minute SELECT ... FROM events_per_minute")
#   stmt_set.add_insert_sql("INSERT INTO kafka_anomalies SELECT * FROM anomalies")
#   stmt_set.execute()

State Management and Windowing

Stateful Stream Processing:

  • Running aggregations (count, sum, avg over time windows)
  • Session tracking (user activity within a session)
  • Pattern detection (sequence of events; see the keyed-state sketch after this list)
  • Stream joins (correlating events from multiple streams)
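To make the stateful patterns above concrete, here is a minimal, hedged DataStream API sketch of the "alert after 10 failed logins" idea using keyed ValueState. The (user_id, event_type) tuple input, the class name, and the inline test data are illustrative assumptions; in the pipeline above the input would come from the Kafka source instead.

Python
# Keyed-state sketch: emit an alert once a user reaches 10 consecutive failed logins.
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class FailedLoginAlert(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        # Keyed state: one counter per user_id, checkpointed by Flink
        self.failed_count = runtime_context.get_state(
            ValueStateDescriptor("failed_count", Types.LONG()))

    def process_element(self, value, ctx):
        user_id, event_type = value
        count = self.failed_count.value() or 0
        if event_type == "login_failed":
            count += 1
            self.failed_count.update(count)
            if count == 10:
                yield user_id, count          # alert exactly once at the threshold
        else:
            self.failed_count.clear()         # a successful login resets the streak

env = StreamExecutionEnvironment.get_execution_environment()
events = env.from_collection(
    [("u1", "login_failed")] * 10 + [("u2", "login_ok")],   # illustrative test data
    type_info=Types.TUPLE([Types.STRING(), Types.STRING()]))

alerts = (events
          .key_by(lambda e: e[0], key_type=Types.STRING())
          .process(FailedLoginAlert(),
                   output_type=Types.TUPLE([Types.STRING(), Types.LONG()])))
alerts.print()
env.execute("failed-login-alerts")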

Windowing Strategies:

  • Tumbling windows (fixed-size, non-overlapping)
    Use case: Metrics every 1 minute
    Example: "Events per minute by device type"
  • Sliding / hopping windows (overlapping; see the sketch after this list)
    Use case: Moving averages, trend detection
    Example: "Average response time over last 5 minutes, updated every 30 seconds"
  • Session windows (gap-based)
    Use case: User session analytics
    Example: "User activity within a session (15-min inactivity timeout)"
  • Global windows with custom triggers
    Use case: Custom aggregation logic
    Example: "Alert after 10 failed login attempts"

State Backend Options:

  • RocksDB state backend
    Disk-based, scales to terabytes of state
    Supports incremental checkpoints
    Slightly higher latency than in-memory
  • HashMap (heap) state backend
    Faster access (<1ms)
    Limited by JVM memory
    Use for small state (<1GB)
  • State TTL (time-to-live)
    Auto-cleanup of old state
    Reduces memory footprint
    Example: Keep only the last 7 days of user activity (see the configuration sketch after this list)
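A configuration sketch for the options above, using standard Flink configuration keys set on the TableEnvironment (they can equally live in flink-conf.yaml). The S3 bucket path is a placeholder, and exact key names can vary slightly across Flink versions, so treat this as an assumption to verify against your deployment.

Python
# State backend and state TTL configuration sketch (standard Flink option keys).
config = table_env.get_config()

config.set("state.backend.type", "rocksdb")       # disk-based backend ("state.backend" on older versions)
config.set("state.backend.incremental", "true")   # upload only changed SST files per checkpoint
config.set("state.checkpoints.dir", "s3://my-bucket/flink/checkpoints")  # placeholder bucket, durable storage
config.set("execution.checkpointing.interval", "1 min")

# Idle keyed state in SQL operators is dropped after this TTL (e.g., 7 days of per-user activity)
config.set("table.exec.state.ttl", "7 d")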

Production Considerations

Scalability:

  • Partitioning and parallelism: Increase Kafka partitions and Flink parallelism together. Rule of thumb: 1 Flink task per Kafka partition (12 partitions = 12 parallel Flink subtasks); see the sketch after this list
  • Backpressure: Monitor task queue sizes, add more Flink task managers if backpressure is detected, and consider sampling if processing still can't keep up
  • Bottlenecks: CPU-bound jobs need higher parallelism; memory-bound jobs need a larger heap or RocksDB; network-bound jobs benefit from compact serialization (Avro, Protobuf)
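A sketch of the partition/parallelism rule under the assumption of 12 partitions: create the topic with kafka-python's admin client and set the Table API default parallelism to match. The topic settings are illustrative.

Python
# Keep Kafka partitions and Flink parallelism aligned (here: 12 and 12).
from kafka.admin import KafkaAdminClient, NewTopic  # pip install kafka-python

admin = KafkaAdminClient(bootstrap_servers="kafka:9092")
admin.create_topics([
    NewTopic(name="user-events", num_partitions=12, replication_factor=3)
])

# One Flink subtask per Kafka partition for the Table API job above
table_env.get_config().set("table.exec.resource.default-parallelism", "12")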

Fault Tolerance:

  • Checkpointing: Every 1-5 minutes (trade-off: recovery time vs. overhead), with incremental checkpoints for large state (RocksDB) and checkpoints stored in S3/HDFS (durable storage)
  • End-to-end exactly-once: Requires idempotent sinks or transactional writes. Kafka sink: use a transactional producer. Database sink: use upserts keyed by event ID. A sink sketch follows this list
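As a hedged sketch of a transactional Kafka sink, the table below uses the Kafka SQL connector's delivery-guarantee options (available in recent connector versions; verify against the one you deploy). The transactional-id prefix and timeout are illustrative, and checkpointing must be enabled for the transactions to commit.

Python
# Exactly-once Kafka sink sketch: transactional writes, committed on checkpoint completion.
table_env.execute_sql("""
    CREATE TABLE kafka_anomalies_exactly_once (
        user_id STRING,
        window_start TIMESTAMP(3),
        event_count BIGINT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-anomalies',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json',
        'sink.delivery-guarantee' = 'exactly-once',
        'sink.transactional-id-prefix' = 'analytics-anomalies',
        'properties.transaction.timeout.ms' = '900000'
    )
""")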

Monitoring & Alerting:

Key metrics to track:

  • Throughput: Events/second processed
  • Latency: End-to-end processing time (p50, p95, p99)
  • Backpressure: Task buffer utilization
  • Checkpoint Duration: Time to complete a checkpoint
  • State Size: Growth over time

Alert when (see the health-check sketch after this list):

  • Backpressure > 80% for 5 minutes
  • A checkpoint fails
  • The job restarts
  • Latency p99 > 5 seconds
  • Consumer lag > 1 million messages
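Beyond Prometheus/Grafana dashboards, the Flink JobManager REST API is handy for quick health checks on job state and checkpoint failures. A minimal sketch, assuming the REST endpoint is reachable at jobmanager:8081:

Python
# Poll the Flink REST API for job state and checkpoint failures (port 8081 assumed).
import requests

FLINK_REST = "http://jobmanager:8081"

for job in requests.get(f"{FLINK_REST}/jobs/overview", timeout=5).json()["jobs"]:
    job_id, state = job["jid"], job["state"]
    checkpoints = requests.get(f"{FLINK_REST}/jobs/{job_id}/checkpoints", timeout=5).json()
    failed = checkpoints["counts"]["failed"]
    print(f"job={job_id} state={state} failed_checkpoints={failed}")
    # Page the on-call (PagerDuty/Slack webhook) if state != "RUNNING" or failed > 0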

Cost Optimization:

  • Compute: Use spot instances for non-critical jobs (50-70% savings), right-size task managers (4-8 cores typical), auto-scale based on consumer lag
  • Checkpoint storage: Compress checkpoints (Snappy, ZSTD), use S3 Intelligent-Tiering for old checkpoints, clean up old savepoints
  • Network: Co-locate Kafka and Flink in the same AZ, compress Kafka messages, batch small messages

Reference deployment at this scale:

  • Traffic: 1M events/second
  • Infrastructure: 10 Kafka brokers, 20 Flink task managers
  • Latency: p99 < 500ms
  • Cost: $15K/month (AWS)
  • Availability: 99.9%
Tags: Kafka, Flink, Real-time, Streaming, Analytics
