Database
Apache Druid
Overview
Apache Druid is a real-time analytics database designed for fast ingestion and sub-second queries, with streaming data becoming queryable almost immediately after it arrives. It is optimized for time-series and event data analysis, combining columnar storage with low-latency interactive analytics and high-throughput ingestion.
Details
Apache Druid was originally developed at Metamarkets and open-sourced in 2012; it entered the Apache Incubator in 2018 and later graduated to a top-level Apache Software Foundation project. As a columnar database designed for real-time analytics, it offers the following characteristics:
Architecture Features
- Hybrid Storage: Hot data in memory, cold data on disk
- Pre-aggregation: Metrics can be pre-computed (rolled up) during data ingestion (see the query sketch after this list)
- Segment-oriented: Data is partitioned into time-based segments
- Distributed Architecture: Horizontal scaling across multiple nodes
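With rollup enabled, a datasource stores pre-aggregated rows rather than individual events, so event totals come from summing the count metric instead of counting rows. A minimal sketch, assuming the web_events datasource defined later on this page (rolled up to HOUR granularity with an events count metric):
-- Each stored row is already an hourly aggregate per dimension combination
SELECT
  TIME_FLOOR(__time, 'PT1H') AS hour,
  SUM(events) AS raw_events,   -- total ingested events (pre-aggregated count metric)
  COUNT(*) AS stored_rows      -- number of rolled-up rows actually stored
FROM web_events
GROUP BY 1
ORDER BY 1;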
Key Components
- Coordinator: Manages segment availability, balancing, and retention across Historical processes
- Overlord: Manages data ingestion tasks
- Historical: Stores historical data and processes queries
- MiddleManager: Spawns worker processes that execute ingestion tasks (streaming and batch)
- Broker: Routes queries and merges results
- Router: Optional gateway that proxies API requests to Brokers, Coordinators, and Overlords and serves the web console (see the sys.servers sketch after this list)
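Each of these components registers as a server process in a running cluster, so a quick way to see what is actually deployed is the sys.servers system table. A small sketch, assuming the SQL endpoint is reachable through the Router:
-- List the running Druid processes and where they listen
SELECT
  "server",
  "server_type",
  "host",
  "plaintext_port"
FROM sys.servers
ORDER BY "server_type";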
Technical Features
- Real-time Ingestion: Streaming ingestion from Kafka, Kinesis, etc.
- Batch Ingestion: Bulk data processing from S3, HDFS, etc.
- Fast Queries: Sub-second interactive analytics
- Auto-scaling: Lag-based autoscaling of streaming ingestion tasks and, on supported platforms, of MiddleManager workers
- Rollup: Data compression and query acceleration
- Multi-dimensional Analysis: Query engine optimized for OLAP operations
Pros and Cons
Pros
- Ultra-low Latency: Typically sub-second query response times
- Real-time Processing: Seconds from data ingestion to analysis
- High Availability: Fault-tolerant distributed architecture
- Scalability: Horizontal scaling for large-scale data
- Flexible Ingestion: Supports both batch and streaming
- Time-series Optimized: Specialized for time-series data analysis
- Auto-optimization: Automated indexing and data compression
- SQL Compatible: Query with near-standard SQL syntax
Cons
- Complexity: Complex operations due to multi-component architecture
- Update Limitations: Difficult to update or delete individual records; corrections are applied by overwriting whole time chunks (see the REPLACE sketch after this list)
- Memory Consumption: Requires large amounts of memory for fast processing
- Learning Curve: Unique architecture and configuration requirements
- Join Limitations: Inefficient joins between large tables
- Storage Overhead: Pre-aggregation may increase data size
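Because rows cannot be updated in place, the usual workaround is to re-ingest and overwrite the affected time interval. A sketch using SQL-based ingestion (multi-stage query engine, Druid 24.0+); the datasource, interval, and the bot-filter "correction" are illustrative:
-- Rewrite one day of web_events, dropping rows from a hypothetical bot user
REPLACE INTO web_events
OVERWRITE WHERE __time >= TIMESTAMP '2024-01-15' AND __time < TIMESTAMP '2024-01-16'
SELECT *
FROM web_events
WHERE __time >= TIMESTAMP '2024-01-15' AND __time < TIMESTAMP '2024-01-16'
  AND userId <> 'bot-0001'
PARTITIONED BY DAY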
Code Examples
Installation & Setup
# Start with Docker Compose (Recommended)
curl -O https://raw.githubusercontent.com/apache/druid/master/distribution/docker/docker-compose.yml
docker-compose up -d
# Start a single Druid process in Docker (the official image runs one process
# per container and needs a version tag plus a process name; a full local
# cluster is simpler with the docker-compose file above)
docker run -p 8888:8888 -d apache/druid:0.23.0 router
# Installation from tar file
wget https://archive.apache.org/dist/druid/0.23.0/apache-druid-0.23.0-bin.tar.gz
tar -xzf apache-druid-0.23.0-bin.tar.gz
cd apache-druid-0.23.0
# Start single machine (development)
./bin/start-micro-quickstart
# Deploy with Kubernetes (the Helm chart's location varies by version; one
# option is the chart shipped in the apache/druid source repository)
git clone https://github.com/apache/druid.git
helm dependency update druid/helm/druid
helm install druid druid/helm/druid
# Access Web Console
# http://localhost:8888/
# Access REST API
curl -X GET http://localhost:8888/status
Basic Operations (Data Definition & Manipulation)
-- Create a datasource with a native batch ingestion task spec (JSON, submitted to the Overlord)
{
"type": "index_parallel",
"spec": {
"dataSchema": {
"dataSource": "web_events",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"userId",
"sessionId",
"page",
"country",
"device",
"browser"
]
},
"metricsSpec": [
{"type": "count", "name": "events"},
{"type": "longSum", "name": "duration", "fieldName": "duration"},
{"type": "longSum", "name": "bytes", "fieldName": "bytes"}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "HOUR",
"rollup": true
}
},
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "s3",
"uris": ["s3://bucket/events/2024/01/15/events.json"]
},
"inputFormat": {
"type": "json"
}
},
"tuningConfig": {
"type": "index_parallel",
"maxRowsPerSegment": 1000000,
"maxRowsInMemory": 100000
}
}
}
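On recent Druid versions the same batch load can also be written as SQL-based ingestion (multi-stage query engine, Druid 24.0+), without a JSON task spec. A minimal sketch assuming the same S3 file and column names; it loads detail rows, so the rollup configured in the spec above would have to be reproduced with a GROUP BY and aggregate functions:
-- SQL-based batch ingestion; EXTERN takes the inputSource, inputFormat,
-- and row signature as JSON strings
INSERT INTO web_events
SELECT
  TIME_PARSE("timestamp") AS __time,
  userId, sessionId, page, country, device, browser,
  duration, bytes
FROM TABLE(
  EXTERN(
    '{"type":"s3","uris":["s3://bucket/events/2024/01/15/events.json"]}',
    '{"type":"json"}',
    '[{"name":"timestamp","type":"string"},{"name":"userId","type":"string"},{"name":"sessionId","type":"string"},{"name":"page","type":"string"},{"name":"country","type":"string"},{"name":"device","type":"string"},{"name":"browser","type":"string"},{"name":"duration","type":"long"},{"name":"bytes","type":"long"}]'
  )
)
PARTITIONED BY DAY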
-- Real-time ingestion from Kafka
{
"type": "kafka",
"spec": {
"dataSchema": {
"dataSource": "realtime_events",
"timestampSpec": {
"column": "timestamp",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": ["userId", "action", "page"]
},
"metricsSpec": [
{"type": "count", "name": "count"},
{"type": "doubleSum", "name": "value", "fieldName": "value"}
]
},
"ioConfig": {
"topic": "events",
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H"
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": 100000
}
}
}
-- Check datasource information
SELECT COLUMN_NAME, DATA_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'druid' AND TABLE_NAME = 'web_events';
-- Check segment information
SELECT
"datasource",
"start",
"end",
"size",
"num_rows"
FROM sys.segments
WHERE "datasource" = 'web_events'
ORDER BY "start" DESC;
Querying & Analytics
-- Basic time series query
SELECT
TIME_FLOOR(__time, 'PT1H') AS hour,
COUNT(*) AS events,
COUNT(DISTINCT userId) AS unique_users,
SUM(duration) AS total_duration,
AVG(duration) AS avg_duration
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '24' HOUR
GROUP BY 1
ORDER BY 1;
-- TOP N query
SELECT
page,
COUNT(*) AS page_views,
COUNT(DISTINCT userId) AS unique_visitors
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
GROUP BY page
ORDER BY page_views DESC
LIMIT 10;
-- Multi-dimensional analysis
SELECT
country,
device,
browser,
TIME_FLOOR(__time, 'P1D') AS day,
COUNT(*) AS events,
SUM(bytes) AS total_bytes,
AVG(duration) AS avg_duration
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '30' DAY
GROUP BY 1, 2, 3, 4
ORDER BY day DESC, events DESC;
-- Filtering and drill-down
SELECT
TIME_FLOOR(__time, 'PT1H') AS hour,
COUNT(*) FILTER (WHERE page = '/home') AS home_views,
COUNT(*) FILTER (WHERE page = '/products') AS product_views,
COUNT(*) FILTER (WHERE page = '/checkout') AS checkout_views,
COUNT(DISTINCT userId) AS unique_users
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '24' HOUR
AND country = 'US'
AND device = 'mobile'
GROUP BY 1
ORDER BY 1;
Advanced Analytics
-- Window function analysis (window functions require a recent Druid release; experimental in older versions)
SELECT
userId,
__time,
page,
LAG(page) OVER (PARTITION BY userId ORDER BY __time) AS previous_page,
LEAD(page) OVER (PARTITION BY userId ORDER BY __time) AS next_page,
ROW_NUMBER() OVER (PARTITION BY userId ORDER BY __time) AS page_sequence
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY
AND userId = '12345'
ORDER BY __time;
-- Cohort analysis
WITH user_first_visit AS (
SELECT
userId,
TIME_FLOOR(MIN(__time), 'P1D') AS first_visit_date
FROM web_events
GROUP BY userId
),
user_activity AS (
SELECT
e.userId,
ufv.first_visit_date,
TIME_FLOOR(e.__time, 'P1D') AS activity_date,
TIMESTAMPDIFF(DAY, ufv.first_visit_date, e.__time) AS days_since_first
FROM web_events e
JOIN user_first_visit ufv ON e.userId = ufv.userId
)
SELECT
first_visit_date,
days_since_first,
COUNT(DISTINCT userId) AS active_users
FROM user_activity
WHERE first_visit_date >= CURRENT_TIMESTAMP - INTERVAL '30' DAY
GROUP BY 1, 2
ORDER BY 1, 2;
-- Funnel analysis
SELECT
step,
COUNT(DISTINCT userId) AS users,
LAG(COUNT(DISTINCT userId)) OVER (ORDER BY step) AS prev_step_users,
ROUND(
COUNT(DISTINCT userId) * 100.0 /
LAG(COUNT(DISTINCT userId)) OVER (ORDER BY step), 2
) AS conversion_rate
FROM (
SELECT
userId,
CASE
WHEN page = '/home' THEN 1
WHEN page = '/products' THEN 2
WHEN page = '/cart' THEN 3
WHEN page = '/checkout' THEN 4
WHEN page = '/success' THEN 5
END AS step
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
AND page IN ('/home', '/products', '/cart', '/checkout', '/success')
) funnel_data
WHERE step IS NOT NULL
GROUP BY step
ORDER BY step;
-- Approximate calculations (HyperLogLog)
SELECT
TIME_FLOOR(__time, 'P1D') AS day,
APPROX_COUNT_DISTINCT_DS_HLL(userId) AS approx_unique_users,
COUNT(DISTINCT userId) AS exact_unique_users -- approximate by default; set useApproximateCountDistinct=false for an exact count
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '30' DAY
GROUP BY 1
ORDER BY 1;
Optimization & Performance
-- Datasource compression settings
{
"dataSchema": {
"dataSource": "optimized_events",
"dimensionsSpec": {
"dimensions": [
{
"name": "country",
"type": "string",
"multiValueHandling": "SORTED_ARRAY",
"createBitmapIndex": true
},
{
"name": "userId",
"type": "string",
"createBitmapIndex": false
}
]
},
"metricsSpec": [
{
"type": "thetaSketch",
"name": "unique_users",
"fieldName": "userId"
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "HOUR",
"rollup": true
}
}
}
-- Check query execution plan
EXPLAIN PLAN FOR
SELECT
country,
COUNT(*) AS events
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY
GROUP BY country;
-- Check segment information and size
SELECT
"datasource",
COUNT(*) AS segment_count,
SUM("size") AS total_size_bytes,
SUM("num_rows") AS total_rows,
MIN("start") AS earliest_data,
MAX("end") AS latest_data
FROM sys.segments
WHERE "datasource" = 'web_events'
GROUP BY "datasource";
-- Check segment distribution across servers
-- (Druid's sys schema has no per-segment index table; sys.server_segments
-- shows which servers serve which segments)
SELECT
"server",
COUNT(*) AS segments_served
FROM sys.server_segments
WHERE "segment_id" LIKE 'web_events%'
GROUP BY "server"
ORDER BY segments_served DESC;
Operations & Monitoring
-- Check task execution status
SELECT
"id",
"type",
"status",
"created_time",
"location"
FROM sys.tasks
ORDER BY "created_time" DESC
LIMIT 10;
-- Datasource statistics (Druid has no sys.datasources table; aggregate sys.segments)
SELECT
"datasource",
COUNT(*) AS num_segments,
SUM("num_rows") AS num_rows,
SUM("size") AS size_bytes
FROM sys.segments
GROUP BY "datasource";
-- Server status check
SELECT
"server",
"server_type",
"tier",
"curr_size",
"max_size"
FROM sys.servers;
-- Check streaming supervisor status (Druid has no sys.queries table;
-- running queries are observed via emitted metrics or the web console)
SELECT
"supervisor_id",
"type",
"source",
"state",
"healthy"
FROM sys.supervisors;
# Resource usage monitoring: Druid exposes metrics through configured emitters
# (e.g. prometheus-emitter, statsd-emitter) or the metrics log, not a sys table.
# Basic process status and health can be checked over HTTP:
curl -X GET http://localhost:8888/status/health
curl -X GET http://localhost:8888/status
Real-time Ingestion Configuration
# Kafka cluster configuration example
# supervisor-spec.json
{
"type": "kafka",
"spec": {
"dataSchema": {
"dataSource": "live_events",
"timestampSpec": {
"column": "timestamp",
"format": "millis"
},
"dimensionsSpec": {
"dimensions": [
"eventType",
"userId",
"sessionId",
"deviceType",
"country"
]
},
"metricsSpec": [
{"type": "count", "name": "count"},
{"type": "longSum", "name": "value", "fieldName": "value"},
{
"type": "thetaSketch",
"name": "unique_users",
"fieldName": "userId"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "MINUTE"
}
},
"ioConfig": {
"topic": "events",
"consumerProperties": {
"bootstrap.servers": "localhost:9092",
"group.id": "druid-console-consumer"
},
"taskCount": 3,
"replicas": 2,
"taskDuration": "PT1H",
"useEarliestOffset": false
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": 100000,
"maxBytesInMemory": 50000000,
"intermediatePersistPeriod": "PT10M",
"maxPendingPersists": 0
}
}
}
# Start supervisor
curl -X POST \
http://localhost:8888/druid/indexer/v1/supervisor \
-H 'Content-Type: application/json' \
-d @supervisor-spec.json
# Check supervisor status
curl -X GET http://localhost:8888/druid/indexer/v1/supervisor
# Stop supervisor
curl -X POST http://localhost:8888/druid/indexer/v1/supervisor/live_events/terminate
Practical Examples
-- Real-time dashboard
SELECT
TIME_FLOOR(__time, 'PT1M') AS minute,
COUNT(*) AS events_per_minute,
COUNT(DISTINCT userId) AS active_users,
SUM(value) AS total_value,
AVG(value) AS avg_value,
COUNT(*) FILTER (WHERE eventType = 'purchase') AS purchases
FROM live_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR
GROUP BY 1
ORDER BY 1 DESC;
-- A/B test analysis
WITH experiment_assignment AS (
SELECT
userId,
CASE WHEN MOD(CAST(userId AS BIGINT), 2) = 0 THEN 'A' ELSE 'B' END AS variant
FROM (
SELECT DISTINCT userId FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
) distinct_users
)
SELECT
ea.variant,
COUNT(DISTINCT we.userId) AS unique_users,
COUNT(*) AS total_events,
COUNT(*) FILTER (WHERE we.page = '/purchase') AS conversions,
ROUND(
COUNT(*) FILTER (WHERE we.page = '/purchase') * 100.0 /
COUNT(DISTINCT we.userId), 2
) AS conversion_rate
FROM web_events we
JOIN experiment_assignment ea ON we.userId = ea.userId
WHERE we.__time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
GROUP BY ea.variant;
-- Anomaly detection (STDDEV requires the druid-stats extension)
WITH hourly_metrics AS (
SELECT
TIME_FLOOR(__time, 'PT1H') AS hour,
COUNT(*) AS events,
COUNT(DISTINCT userId) AS unique_users,
AVG(duration) AS avg_duration
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
GROUP BY 1
),
metrics_with_stats AS (
SELECT
hour,
events,
unique_users,
avg_duration,
AVG(events) OVER (
ORDER BY hour
ROWS BETWEEN 23 PRECEDING AND 1 PRECEDING
) AS avg_events_24h,
STDDEV(events) OVER (
ORDER BY hour
ROWS BETWEEN 23 PRECEDING AND 1 PRECEDING
) AS stddev_events_24h
FROM hourly_metrics
)
SELECT
hour,
events,
avg_events_24h,
ABS(events - avg_events_24h) / stddev_events_24h AS z_score
FROM metrics_with_stats
WHERE ABS(events - avg_events_24h) / stddev_events_24h > 2
AND hour >= CURRENT_TIMESTAMP - INTERVAL '24' HOUR
ORDER BY hour DESC;