Database

Apache Druid

Overview

Apache Druid is a real-time analytics database designed for high-throughput ingestion and low-latency queries over streaming and batch data. It is optimized for time-series and event data analysis, combining columnar storage with sub-second interactive analytics at scale.

Details

Apache Druid was originally developed at Metamarkets, entered the Apache Incubator in 2018, and graduated to an Apache Software Foundation top-level project in 2019. As a columnar database designed for real-time analytics, it offers the following characteristics:

Architecture Features

  • Tiered Storage: Segments are persisted in deep storage and memory-mapped locally on Historicals, with optional hot/cold data tiers
  • Pre-aggregation: Metrics are rolled up (pre-computed) during ingestion (illustrated after this list)
  • Segment-oriented: Data is partitioned into time-based segments
  • Distributed Architecture: Horizontal scaling across multiple nodes
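
A quick way to see the effect of pre-aggregation (rollup): with rollup enabled, each stored row stands in for many raw events, and the ingestion-time count metric recovers the original event count. A small Druid SQL example, assuming the web_events datasource and its "events" count metric defined in the ingestion spec further down this page:

-- Stored (rolled-up) rows vs. raw ingested events
SELECT
  COUNT(*) AS stored_rows,                        -- physical rows after rollup
  SUM("events") AS raw_events,                    -- ingestion-time count metric
  SUM("events") * 1.0 / COUNT(*) AS rollup_ratio  -- compression gained from pre-aggregation
FROM web_events;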

Key Components

  • Coordinator: Manages segment availability, balancing, and retention across Historicals
  • Overlord: Accepts ingestion tasks, assigns them to MiddleManagers, and coordinates segment publishing
  • Historical: Stores immutable historical segments and serves queries over them
  • MiddleManager: Spawns worker processes that run ingestion tasks (streaming and batch) and serve queries on in-flight data
  • Broker: Receives queries, fans them out to data nodes, and merges the results
  • Router (optional): Single HTTP entry point that proxies API requests and serves the web console (these processes are visible from SQL; see the query after this list)
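
Once a cluster is running, these processes can be observed from SQL as well. A brief example, assuming SQL access through the Router on the default quickstart port 8888:

-- Which process types are serving the cluster, and in which tier
SELECT
  "server_type",
  "tier",
  COUNT(*) AS process_count
FROM sys.servers
GROUP BY "server_type", "tier"
ORDER BY "server_type";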

Technical Features

  • Real-time Ingestion: Streaming ingestion from Kafka, Kinesis, etc.
  • Batch Ingestion: Bulk data processing from S3, HDFS, etc.
  • Fast Queries: Sub-second interactive analytics, often tens to hundreds of milliseconds
  • Auto-scaling: Optional autoscaling of ingestion workers based on load
  • Rollup: Ingestion-time pre-aggregation that shrinks storage and speeds up queries
  • Multi-dimensional Analysis: Query engine optimized for OLAP operations

Pros and Cons

Pros

  • Ultra-low Latency: Sub-second query responses, often in the tens of milliseconds
  • Real-time Processing: Seconds from data ingestion to analysis
  • High Availability: Fault-tolerant distributed architecture
  • Scalability: Horizontal scaling for large-scale data
  • Flexible Ingestion: Supports both batch and streaming
  • Time-series Optimized: Specialized for time-series data analysis
  • Auto-optimization: Automatic bitmap indexing, column compression, and segment compaction
  • SQL Compatible: Query with near-standard SQL syntax

Cons

  • Complexity: Multi-process architecture that takes effort to deploy, tune, and operate
  • Update Limitations: No row-level UPDATE/DELETE; corrections require rewriting whole time intervals (see the sketch after this list)
  • Memory Consumption: Requires large amounts of memory for fast processing
  • Learning Curve: Unique architecture and configuration requirements
  • Join Limitations: Inefficient joins between large tables
  • Storage Overhead: Dictionaries, bitmap indexes, and sketch columns add per-segment overhead, and high-cardinality dimensions limit rollup savings
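
What the update limitation means in practice: since there is no row-level UPDATE or DELETE, corrections are applied by rewriting an entire time interval. A hedged sketch using SQL-based ingestion (Druid 24.0+ with the multi-stage query engine); the corrected_events staging datasource is hypothetical:

-- Rewrite one day of web_events from a corrected copy of the data
REPLACE INTO "web_events"
OVERWRITE WHERE __time >= TIMESTAMP '2024-01-15' AND __time < TIMESTAMP '2024-01-16'
SELECT *
FROM "corrected_events"
WHERE __time >= TIMESTAMP '2024-01-15' AND __time < TIMESTAMP '2024-01-16'
PARTITIONED BY DAY;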

Key Links
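
  • Official site: https://druid.apache.org/
  • Documentation: https://druid.apache.org/docs/latest/
  • Source code: https://github.com/apache/druid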

Code Examples

Installation & Setup

# Start with Docker Compose (Recommended)
curl -O https://raw.githubusercontent.com/apache/druid/master/distribution/docker/docker-compose.yml
curl -O https://raw.githubusercontent.com/apache/druid/master/distribution/docker/environment
docker-compose up -d

# Start a single Druid process in Docker (each container runs one process; use Compose for a full stack)
docker run -p 8888:8888 -d apache/druid:0.23.0 router

# Installation from tar file
wget https://archive.apache.org/dist/druid/0.23.0/apache-druid-0.23.0-bin.tar.gz
tar -xzf apache-druid-0.23.0-bin.tar.gz
cd apache-druid-0.23.0

# Start single machine (development); newer releases use ./bin/start-druid instead
./bin/start-micro-quickstart

# Deploy with Kubernetes: a Helm chart has shipped in the Druid source tree (helm/druid);
# check the docs for the currently recommended approach (e.g. the druid-operator)
git clone https://github.com/apache/druid.git
helm dependency update druid/helm/druid
helm install druid ./druid/helm/druid

# Access Web Console
# http://localhost:8888/

# Access REST API
curl -X GET http://localhost:8888/status

Basic Operations (Data Definition & Manipulation)

-- Native batch ingestion spec: creates the web_events datasource (submit via POST /druid/indexer/v1/task)
{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "web_events",
      "timestampSpec": {
        "column": "timestamp",
        "format": "auto"
      },
      "dimensionsSpec": {
        "dimensions": [
          "userId",
          "sessionId", 
          "page",
          "country",
          "device",
          "browser"
        ]
      },
      "metricsSpec": [
        {"type": "count", "name": "events"},
        {"type": "longSum", "name": "duration", "fieldName": "duration"},
        {"type": "longSum", "name": "bytes", "fieldName": "bytes"}
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "HOUR",
        "rollup": true
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "s3",
        "uris": ["s3://bucket/events/2024/01/15/events.json"]
      },
      "inputFormat": {
        "type": "json"
      }
    },
    "tuningConfig": {
      "type": "index_parallel",
      "maxRowsPerSegment": 1000000,
      "maxRowsInMemory": 100000
    }
  }
}
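
Newer Druid releases (24.0 and later) can also express batch ingestion as SQL and run it on the multi-stage query (MSQ) task engine, as an alternative to the native spec above. A minimal, hedged sketch that loads the same hypothetical S3 file into web_events, without the rollup metrics of the native spec:

-- SQL-based batch ingestion (multi-stage query engine)
-- OVERWRITE ALL replaces any existing data in the target datasource
REPLACE INTO "web_events" OVERWRITE ALL
SELECT
  TIME_PARSE("timestamp") AS __time,
  "userId",
  "sessionId",
  "page",
  "country",
  "device",
  "browser",
  "duration",
  "bytes"
FROM TABLE(
  EXTERN(
    '{"type":"s3","uris":["s3://bucket/events/2024/01/15/events.json"]}',
    '{"type":"json"}',
    '[{"name":"timestamp","type":"string"},{"name":"userId","type":"string"},{"name":"sessionId","type":"string"},{"name":"page","type":"string"},{"name":"country","type":"string"},{"name":"device","type":"string"},{"name":"browser","type":"string"},{"name":"duration","type":"long"},{"name":"bytes","type":"long"}]'
  )
)
PARTITIONED BY DAY;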

-- Kafka supervisor spec for real-time ingestion (submit via POST /druid/indexer/v1/supervisor)
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "realtime_events",
      "timestampSpec": {
        "column": "timestamp",
        "format": "iso"
      },
      "dimensionsSpec": {
        "dimensions": ["userId", "action", "page"]
      },
      "metricsSpec": [
        {"type": "count", "name": "count"},
        {"type": "doubleSum", "name": "value", "fieldName": "value"}
      ]
    },
    "ioConfig": {
      "topic": "events",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "taskCount": 1,
      "replicas": 1,
      "taskDuration": "PT1H"
    },
    "tuningConfig": {
      "type": "kafka",
      "maxRowsInMemory": 100000
    }
  }
}

-- Check datasource information
SELECT COLUMN_NAME, DATA_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'druid' AND TABLE_NAME = 'web_events';

-- Check segment information
SELECT
  "datasource",
  "start",
  "end", 
  "size",
  "num_rows"
FROM sys.segments
WHERE "datasource" = 'web_events'
ORDER BY "start" DESC;

Querying & Analytics

-- Basic time series query
SELECT
  TIME_FLOOR(__time, 'PT1H') AS hour,
  COUNT(*) AS events,
  COUNT(DISTINCT userId) AS unique_users,
  SUM(duration) AS total_duration,
  AVG(duration) AS avg_duration
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '24' HOUR
GROUP BY 1
ORDER BY 1;

-- TOP N query
SELECT
  page,
  COUNT(*) AS page_views,
  COUNT(DISTINCT userId) AS unique_visitors
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
GROUP BY page
ORDER BY page_views DESC
LIMIT 10;

-- Multi-dimensional analysis
SELECT
  country,
  device,
  browser,
  TIME_FLOOR(__time, 'P1D') AS day,
  COUNT(*) AS events,
  SUM(bytes) AS total_bytes,
  AVG(duration) AS avg_duration
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '30' DAY
GROUP BY 1, 2, 3, 4
ORDER BY day DESC, events DESC;

-- Filtering and drill-down
SELECT
  TIME_FLOOR(__time, 'PT1H') AS hour,
  COUNT(*) FILTER (WHERE page = '/home') AS home_views,
  COUNT(*) FILTER (WHERE page = '/products') AS product_views,
  COUNT(*) FILTER (WHERE page = '/checkout') AS checkout_views,
  COUNT(DISTINCT userId) AS unique_users
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '24' HOUR
  AND country = 'US'
  AND device = 'mobile'
GROUP BY 1
ORDER BY 1;

Advanced Analytics

-- Window function analysis (window functions require a recent Druid release; experimental in earlier ones)
SELECT
  userId,
  __time,
  page,
  LAG(page) OVER (PARTITION BY userId ORDER BY __time) AS previous_page,
  LEAD(page) OVER (PARTITION BY userId ORDER BY __time) AS next_page,
  ROW_NUMBER() OVER (PARTITION BY userId ORDER BY __time) AS page_sequence
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY
  AND userId = '12345'
ORDER BY __time;

-- Cohort analysis
WITH user_first_visit AS (
  SELECT
    userId,
    TIME_FLOOR(MIN(__time), 'P1D') AS first_visit_date
  FROM web_events
  GROUP BY userId
),
user_activity AS (
  SELECT
    e.userId,
    ufv.first_visit_date,
    TIME_FLOOR(e.__time, 'P1D') AS activity_date,
    TIMESTAMPDIFF(DAY, ufv.first_visit_date, e.__time) AS days_since_first
  FROM web_events e
  JOIN user_first_visit ufv ON e.userId = ufv.userId
)
SELECT
  first_visit_date,
  days_since_first,
  COUNT(DISTINCT userId) AS active_users
FROM user_activity
WHERE first_visit_date >= CURRENT_TIMESTAMP - INTERVAL '30' DAY
GROUP BY 1, 2
ORDER BY 1, 2;

-- Funnel analysis
SELECT
  step,
  COUNT(DISTINCT userId) AS users,
  LAG(COUNT(DISTINCT userId)) OVER (ORDER BY step) AS prev_step_users,
  ROUND(
    COUNT(DISTINCT userId) * 100.0 / 
    LAG(COUNT(DISTINCT userId)) OVER (ORDER BY step), 2
  ) AS conversion_rate
FROM (
  SELECT
    userId,
    CASE
      WHEN page = '/home' THEN 1
      WHEN page = '/products' THEN 2  
      WHEN page = '/cart' THEN 3
      WHEN page = '/checkout' THEN 4
      WHEN page = '/success' THEN 5
    END AS step
  FROM web_events
  WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
    AND page IN ('/home', '/products', '/cart', '/checkout', '/success')
) funnel_data
WHERE step IS NOT NULL
GROUP BY step
ORDER BY step;

-- Approximate calculations with HyperLogLog (requires the druid-datasketches extension)
SELECT
  TIME_FLOOR(__time, 'P1D') AS day,
  APPROX_COUNT_DISTINCT_DS_HLL(userId) AS approx_unique_users,
  COUNT(DISTINCT userId) AS exact_unique_users
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '30' DAY
GROUP BY 1
ORDER BY 1;

Optimization & Performance

-- Datasource compression settings
{
  "dataSchema": {
    "dataSource": "optimized_events",
    "dimensionsSpec": {
      "dimensions": [
        {
          "name": "country",
          "type": "string",
          "multiValueHandling": "SORTED_ARRAY",
          "createBitmapIndex": true
        },
        {
          "name": "userId", 
          "type": "string",
          "createBitmapIndex": false
        }
      ]
    },
    "metricsSpec": [
      {
        "type": "thetaSketch",
        "name": "unique_users",
        "fieldName": "userId"
      }
    ],
    "granularitySpec": {
      "segmentGranularity": "DAY",
      "queryGranularity": "HOUR",
      "rollup": true
    }
  }
}
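
A note on querying the thetaSketch column defined above: a sketch metric is read with the corresponding sketch aggregator rather than COUNT(DISTINCT). A brief example, assuming the optimized_events datasource from the spec and the druid-datasketches extension being loaded:

-- Approximate distinct users from the stored theta sketch column
SELECT
  TIME_FLOOR(__time, 'P1D') AS day,
  APPROX_COUNT_DISTINCT_DS_THETA(unique_users) AS approx_unique_users
FROM optimized_events
GROUP BY 1
ORDER BY 1;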

-- Check query execution plan
EXPLAIN PLAN FOR
SELECT
  country,
  COUNT(*) AS events
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY
GROUP BY country;

-- Check segment information and size
SELECT
  "datasource",
  COUNT(*) AS segment_count,
  SUM("size") AS total_size_bytes,
  SUM("num_rows") AS total_rows,
  MIN("start") AS earliest_data,
  MAX("end") AS latest_data
FROM sys.segments
WHERE "datasource" = 'web_events'
GROUP BY "datasource";

-- Per-segment detail (Druid exposes no per-dimension index sizes via SQL; sys.segments covers size, rows, and state)
SELECT
  "segment_id",
  "num_rows",
  "size",
  "is_available",
  "is_realtime"
FROM sys.segments
WHERE "datasource" = 'web_events'
ORDER BY "size" DESC
LIMIT 10;

Operations & Monitoring

-- Check task execution status
SELECT
  "id",
  "type", 
  "status",
  "created_time",
  "location"
FROM sys.tasks
ORDER BY "created_time" DESC
LIMIT 10;

-- Datasource statistics (aggregated from sys.segments)
SELECT
  "datasource",
  COUNT(*) AS num_segments,
  SUM("num_rows") AS total_rows,
  SUM("size") AS total_size_bytes
FROM sys.segments
GROUP BY "datasource";

-- Server status check
SELECT
  "server",
  "server_type",
  "tier",
  "curr_size",
  "max_size"
FROM sys.servers;

-- Druid has no sys table for in-flight queries (use the web console or broker metrics);
-- streaming supervisors can be monitored from SQL instead
SELECT
  "supervisor_id",
  "state",
  "detailed_state",
  "healthy"
FROM sys.supervisors;

-- Resource metrics (query/cpu/time, query/count, ...) are emitted via metric emitters
-- (Prometheus, statsd, HTTP) rather than a sys table; segment distribution per server is queryable:
SELECT
  "server",
  COUNT(*) AS segments
FROM sys.server_segments
GROUP BY "server"
ORDER BY segments DESC;

Real-time Ingestion Configuration

# Kafka supervisor spec (production-oriented example)
# supervisor-spec.json
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "live_events",
      "timestampSpec": {
        "column": "timestamp",
        "format": "millis"
      },
      "dimensionsSpec": {
        "dimensions": [
          "eventType",
          "userId", 
          "sessionId",
          "deviceType",
          "country"
        ]
      },
      "metricsSpec": [
        {"type": "count", "name": "count"},
        {"type": "longSum", "name": "value", "fieldName": "value"},
        {
          "type": "thetaSketch",
          "name": "unique_users",
          "fieldName": "userId"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "MINUTE"
      }
    },
    "ioConfig": {
      "topic": "events",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092",
        "group.id": "druid-console-consumer"
      },
      "taskCount": 3,
      "replicas": 2,
      "taskDuration": "PT1H",
      "useEarliestOffset": false
    },
    "tuningConfig": {
      "type": "kafka",
      "maxRowsInMemory": 100000,
      "maxBytesInMemory": 50000000,
      "intermediatePersistPeriod": "PT10M",
      "maxPendingPersists": 0
    }
  }
}

# Start supervisor
curl -X POST \
  http://localhost:8888/druid/indexer/v1/supervisor \
  -H 'Content-Type: application/json' \
  -d @supervisor-spec.json

# Check supervisor status
curl -X GET http://localhost:8888/druid/indexer/v1/supervisor

# Stop supervisor
curl -X POST http://localhost:8888/druid/indexer/v1/supervisor/live_events/terminate

Practical Examples

-- Real-time dashboard
SELECT
  TIME_FLOOR(__time, 'PT1M') AS minute,
  COUNT(*) AS events_per_minute,
  COUNT(DISTINCT userId) AS active_users,
  SUM(value) AS total_value,
  AVG(value) AS avg_value,
  COUNT(*) FILTER (WHERE eventType = 'purchase') AS purchases
FROM live_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR
GROUP BY 1
ORDER BY 1 DESC;

-- A/B test analysis
WITH experiment_assignment AS (
  SELECT
    userId,
    CASE WHEN MOD(CAST(userId AS BIGINT), 2) = 0 THEN 'A' ELSE 'B' END AS variant
  FROM (
    SELECT DISTINCT userId FROM web_events
    WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
  ) AS recent_users
)
SELECT
  ea.variant,
  COUNT(DISTINCT we.userId) AS unique_users,
  COUNT(*) AS total_events,
  COUNT(*) FILTER (WHERE we.page = '/purchase') AS conversions,
  ROUND(
    COUNT(*) FILTER (WHERE we.page = '/purchase') * 100.0 / 
    COUNT(DISTINCT we.userId), 2
  ) AS conversion_rate
FROM web_events we
JOIN experiment_assignment ea ON we.userId = ea.userId  
WHERE we.__time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
GROUP BY ea.variant;

-- Anomaly detection (STDDEV comes from the druid-stats extension)
WITH hourly_metrics AS (
  SELECT
    TIME_FLOOR(__time, 'PT1H') AS hour,
    COUNT(*) AS events,
    COUNT(DISTINCT userId) AS unique_users,
    AVG(duration) AS avg_duration
  FROM web_events
  WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
  GROUP BY 1
),
metrics_with_stats AS (
  SELECT
    hour,
    events,
    unique_users,
    avg_duration,
    AVG(events) OVER (
      ORDER BY hour 
      ROWS BETWEEN 23 PRECEDING AND 1 PRECEDING
    ) AS avg_events_24h,
    STDDEV(events) OVER (
      ORDER BY hour
      ROWS BETWEEN 23 PRECEDING AND 1 PRECEDING  
    ) AS stddev_events_24h
  FROM hourly_metrics
)
SELECT
  hour,
  events,
  avg_events_24h,
  ABS(events - avg_events_24h) / stddev_events_24h AS z_score
FROM metrics_with_stats
WHERE ABS(events - avg_events_24h) / stddev_events_24h > 2
  AND hour >= CURRENT_TIMESTAMP - INTERVAL '24' HOUR
ORDER BY hour DESC;