Databases
Apache Druid
Overview
Apache Druid is a real-time analytics database management system. It combines high-speed ingestion of streaming data with immediate query execution, and is optimized for analyzing time-series and event data. It uses column-oriented storage and delivers low-latency interactive analytics and high-throughput ingestion at the same time.
Details
Apache Druid was originally developed at Metamarkets, was donated to the Apache Software Foundation in 2018, and later graduated to a top-level project. It is a columnar database designed for real-time analytics with the following characteristics:
Architectural characteristics
- Hybrid storage: hot data is served from memory, cold data from disk
- Pre-aggregation: metrics specified in the ingestion spec are pre-computed at ingestion time (rollup; see the illustration after this list)
- Segment-oriented: data is split into and managed as time-based segments
- Distributed architecture: scales out across multiple nodes
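To make the pre-aggregation (rollup) idea concrete, here is a minimal illustration with hypothetical data; page and country stand in for dimensions and events for a count metric defined at ingestion time.
-- Illustration only: with rollup enabled and queryGranularity = HOUR,
-- two raw events that share the same hour and the same dimension values
--   2024-01-15T10:02:11Z, /home, JP
--   2024-01-15T10:45:30Z, /home, JP
-- are stored as a single pre-aggregated row
--   2024-01-15T10:00:00Z, /home, JP, events = 2
-- so a query over that hour reads far fewer rows than were ingested:
SELECT __time, page, country, events
FROM web_events
WHERE __time >= TIMESTAMP '2024-01-15 10:00:00'
AND __time < TIMESTAMP '2024-01-15 11:00:00';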
Main components
- Coordinator: manages segment distribution and monitors cluster state
- Overlord: manages data ingestion tasks
- Historical: stores historical data and serves queries against it
- MiddleManager: runs ingestion tasks, including real-time ingestion
- Broker: routes queries and merges results
- Router: entry point for the HTTP API (a quick status check is sketched after this list)
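Every Druid process exposes a /status endpoint over HTTP, which is a quick way to confirm that the cluster is up; a minimal sketch assuming the default quickstart ports (adjust hosts and ports to your deployment):
# GET /status returns version and runtime information for each process
curl http://localhost:8888/status   # Router
curl http://localhost:8082/status   # Broker
curl http://localhost:8081/status   # Coordinator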
Technical features
- Real-time ingestion: streaming ingestion from Kafka, Kinesis, and similar sources
- Batch ingestion: bulk loading from S3, HDFS, and similar sources
- Fast queries: sub-second interactive analytics (a small SQL-over-HTTP example follows this list)
- Auto-scaling: resources can be adjusted to match demand
- Rollup: pre-aggregation that compacts data and speeds up queries
- Multidimensional analysis: a query engine optimized for OLAP-style operations
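As a small example of the SQL interface, a Druid SQL statement can be POSTed as JSON to the /druid/v2/sql endpoint (shown here through the Router; web_events is the example datasource used later on this page):
# Run a Druid SQL query over the HTTP API
curl -X POST http://localhost:8888/druid/v2/sql \
  -H 'Content-Type: application/json' \
  -d '{"query": "SELECT __time, page FROM web_events LIMIT 5"}'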
Pros and cons
Pros
- Very low latency: query response times in the millisecond range
- Real-time: data becomes queryable within seconds of ingestion
- High availability: fault-tolerant distributed architecture
- Scalability: horizontal scaling to handle large data volumes
- Flexible ingestion: supports both batch and streaming
- Time-series focus: optimized for analyzing time-series data
- Automatic optimization: indexing and data compression are automated
- SQL compatibility: queries can be written in a syntax close to standard SQL
Cons
- Complexity: the many components make operations complex
- Limited updates: updating or deleting individual records is difficult
- Memory consumption: fast processing requires large amounts of memory
- Learning curve: a distinctive architecture and configuration have to be learned
- Join limitations: joins between large tables are inefficient
- Storage cost: depending on the data, pre-aggregation can increase storage size
Key links
- Official site: https://druid.apache.org/
- Documentation: https://druid.apache.org/docs/latest/
- GitHub: https://github.com/apache/druid
Usage examples
Installation and setup
# Start with Docker Compose (recommended)
curl -O https://raw.githubusercontent.com/apache/druid/master/distribution/docker/docker-compose.yml
docker-compose up -d
# Start a single Docker container (the image runs one Druid process per container; here the Router)
docker run -p 8888:8888 -d apache/druid:0.23.0 router
# Install from the release tarball
wget https://archive.apache.org/dist/druid/0.23.0/apache-druid-0.23.0-bin.tar.gz
tar -xzf apache-druid-0.23.0-bin.tar.gz
cd apache-druid-0.23.0
# Start on a single machine (for development)
./bin/start-micro-quickstart
# Deploy on Kubernetes (the Helm chart is bundled in the apache/druid source repository under helm/druid)
git clone https://github.com/apache/druid.git
helm dependency update druid/helm/druid
helm install druid druid/helm/druid
# Web console access
# http://localhost:8888/
# REST API access
curl -X GET http://localhost:8888/status
Basic operations (data definition and manipulation)
-- Creating a datasource (batch ingestion spec; a submission example follows the JSON)
{
"type": "index_parallel",
"spec": {
"dataSchema": {
"dataSource": "web_events",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"userId",
"sessionId",
"page",
"country",
"device",
"browser"
]
},
"metricsSpec": [
{"type": "count", "name": "events"},
{"type": "longSum", "name": "duration", "fieldName": "duration"},
{"type": "longSum", "name": "bytes", "fieldName": "bytes"}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "HOUR",
"rollup": true
}
},
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "s3",
"uris": ["s3://bucket/events/2024/01/15/events.json"]
},
"inputFormat": {
"type": "json"
}
},
"tuningConfig": {
"type": "index_parallel",
"maxRowsPerSegment": 1000000,
"maxRowsInMemory": 100000
}
}
}
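To actually run the batch spec above, POST it to the Overlord task API (proxied by the Router below); ingestion-spec.json is simply a file containing the JSON shown above.
# Submit the batch ingestion spec to the task API
curl -X POST http://localhost:8888/druid/indexer/v1/task \
  -H 'Content-Type: application/json' \
  -d @ingestion-spec.json
# Progress can then be followed in the web console or via sys.tasks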
-- Real-time ingestion from Kafka (supervisor spec)
{
"type": "kafka",
"spec": {
"dataSchema": {
"dataSource": "realtime_events",
"timestampSpec": {
"column": "timestamp",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": ["userId", "action", "page"]
},
"metricsSpec": [
{"type": "count", "name": "count"},
{"type": "doubleSum", "name": "value", "fieldName": "value"}
]
},
"ioConfig": {
"topic": "events",
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H"
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": 100000
}
}
}
-- Inspect datasource columns
SELECT COLUMN_NAME, DATA_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'druid' AND TABLE_NAME = 'web_events';
-- Inspect segment information
SELECT
"datasource",
"start",
"end",
"size",
"num_rows"
FROM sys.segments
WHERE "datasource" = 'web_events'
ORDER BY "start" DESC;
Queries and analysis
-- Basic time-series query
SELECT
TIME_FLOOR(__time, 'PT1H') AS hour,
COUNT(*) AS events,
COUNT(DISTINCT userId) AS unique_users,
SUM(duration) AS total_duration,
AVG(duration) AS avg_duration
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '24' HOUR
GROUP BY 1
ORDER BY 1;
-- Top-N query
SELECT
page,
COUNT(*) AS page_views,
COUNT(DISTINCT userId) AS unique_visitors
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
GROUP BY page
ORDER BY page_views DESC
LIMIT 10;
-- Multidimensional analysis
SELECT
country,
device,
browser,
TIME_FLOOR(__time, 'P1D') AS day,
COUNT(*) AS events,
SUM(bytes) AS total_bytes,
AVG(duration) AS avg_duration
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '30' DAY
GROUP BY 1, 2, 3, 4
ORDER BY day DESC, events DESC;
-- Filtering and drill-down
SELECT
TIME_FLOOR(__time, 'PT1H') AS hour,
COUNT(*) FILTER (WHERE page = '/home') AS home_views,
COUNT(*) FILTER (WHERE page = '/products') AS product_views,
COUNT(*) FILTER (WHERE page = '/checkout') AS checkout_views,
COUNT(DISTINCT userId) AS unique_users
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '24' HOUR
AND country = 'JP'
AND device = 'mobile'
GROUP BY 1
ORDER BY 1;
Advanced analytics
-- Analysis using window functions
SELECT
userId,
__time,
page,
LAG(page) OVER (PARTITION BY userId ORDER BY __time) AS previous_page,
LEAD(page) OVER (PARTITION BY userId ORDER BY __time) AS next_page,
ROW_NUMBER() OVER (PARTITION BY userId ORDER BY __time) AS page_sequence
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY
AND userId = '12345'
ORDER BY __time;
-- Cohort analysis
WITH user_first_visit AS (
SELECT
userId,
TIME_FLOOR(MIN(__time), 'P1D') AS first_visit_date
FROM web_events
GROUP BY userId
),
user_activity AS (
SELECT
e.userId,
ufv.first_visit_date,
TIME_FLOOR(e.__time, 'P1D') AS activity_date,
TIMESTAMPDIFF(DAY, ufv.first_visit_date, e.__time) AS days_since_first
FROM web_events e
JOIN user_first_visit ufv ON e.userId = ufv.userId
)
SELECT
first_visit_date,
days_since_first,
COUNT(DISTINCT userId) AS active_users
FROM user_activity
WHERE first_visit_date >= CURRENT_TIMESTAMP - INTERVAL '30' DAY
GROUP BY 1, 2
ORDER BY 1, 2;
-- Funnel analysis
SELECT
step,
COUNT(DISTINCT userId) AS users,
LAG(COUNT(DISTINCT userId)) OVER (ORDER BY step) AS prev_step_users,
ROUND(
COUNT(DISTINCT userId) * 100.0 /
LAG(COUNT(DISTINCT userId)) OVER (ORDER BY step), 2
) AS conversion_rate
FROM (
SELECT
userId,
CASE
WHEN page = '/home' THEN 1
WHEN page = '/products' THEN 2
WHEN page = '/cart' THEN 3
WHEN page = '/checkout' THEN 4
WHEN page = '/success' THEN 5
END AS step
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
AND page IN ('/home', '/products', '/cart', '/checkout', '/success')
) funnel_data
WHERE step IS NOT NULL
GROUP BY step
ORDER BY step;
-- Approximate counting (HyperLogLog)
SELECT
TIME_FLOOR(__time, 'P1D') AS day,
APPROX_COUNT_DISTINCT_DS_HLL(userId) AS approx_unique_users,
COUNT(DISTINCT userId) AS exact_unique_users
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '30' DAY
GROUP BY 1
ORDER BY 1;
Optimization and performance
-- Datasource compression and indexing settings (dataSchema excerpt)
{
"dataSchema": {
"dataSource": "optimized_events",
"dimensionsSpec": {
"dimensions": [
{
"name": "country",
"type": "string",
"multiValueHandling": "SORTED_ARRAY",
"createBitmapIndex": true
},
{
"name": "userId",
"type": "string",
"createBitmapIndex": false
}
]
},
"metricsSpec": [
{
"type": "thetaSketch",
"name": "unique_users",
"fieldName": "userId"
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "HOUR",
"rollup": true
}
}
}
-- Check the query execution plan
EXPLAIN PLAN FOR
SELECT
country,
COUNT(*) AS events
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY
GROUP BY country;
-- Check segment counts and sizes
SELECT
"datasource",
COUNT(*) AS segment_count,
SUM("size") AS total_size_bytes,
SUM("num_rows") AS total_rows,
MIN("start") AS earliest_data,
MAX("end") AS latest_data
FROM sys.segments
WHERE "datasource" = 'web_events'
GROUP BY "datasource";
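When the segment count grows large, many small segments slow queries down; auto-compaction lets the Coordinator merge them periodically. A minimal sketch using the Coordinator compaction-config API (the skipOffsetFromLatest value is illustrative and available fields may vary by Druid version):
# Enable auto-compaction for a datasource
curl -X POST http://localhost:8888/druid/coordinator/v1/config/compaction \
  -H 'Content-Type: application/json' \
  -d '{"dataSource": "web_events", "skipOffsetFromLatest": "P1D"}'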
-- Per-segment details (Druid exposes no sys table for per-dimension index sizes,
-- so look at segment-level sizes and row counts in sys.segments instead)
SELECT
"segment_id",
"start",
"end",
"size",
"num_rows"
FROM sys.segments
WHERE "datasource" = 'web_events'
ORDER BY "size" DESC
LIMIT 10;
Operations and monitoring
-- Check task execution status
SELECT
"id",
"type",
"status",
"created_time",
"location"
FROM sys.tasks
ORDER BY "created_time" DESC
LIMIT 10;
-- Per-datasource statistics (Druid has no sys.datasources table; aggregate sys.segments instead)
SELECT
"datasource",
COUNT(*) AS num_segments,
SUM("num_rows") AS num_rows,
SUM("size") AS size
FROM sys.segments
GROUP BY "datasource";
-- Check server status
SELECT
"server",
"server_type",
"tier",
"curr_size",
"max_size"
FROM sys.servers;
-- Monitoring running queries: Druid has no sys.queries table; running queries are observed in the
-- web console or via the query metrics emitted by each process.
-- Streaming ingestion health, however, can be checked through sys.supervisors:
SELECT
"supervisor_id",
"state",
"detailed_state",
"healthy",
"type",
"source"
FROM sys.supervisors;
-- Resource usage monitoring: metrics such as query/cpu/time, query/count and query/bytes are not
-- queryable through a sys table; they are emitted via metric emitters (HTTP, statsd, Prometheus,
-- etc.) and collected by an external monitoring system.
-- Basic process health can be checked over HTTP:
-- curl http://localhost:8888/status/health
Real-time ingestion configuration
# Example Kafka supervisor spec
# supervisor-spec.json
{
"type": "kafka",
"spec": {
"dataSchema": {
"dataSource": "live_events",
"timestampSpec": {
"column": "timestamp",
"format": "millis"
},
"dimensionsSpec": {
"dimensions": [
"eventType",
"userId",
"sessionId",
"deviceType",
"country"
]
},
"metricsSpec": [
{"type": "count", "name": "count"},
{"type": "longSum", "name": "value", "fieldName": "value"},
{
"type": "thetaSketch",
"name": "unique_users",
"fieldName": "userId"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "MINUTE"
}
},
"ioConfig": {
"topic": "events",
"consumerProperties": {
"bootstrap.servers": "localhost:9092",
"group.id": "druid-console-consumer"
},
"taskCount": 3,
"replicas": 2,
"taskDuration": "PT1H",
"useEarliestOffset": false
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": 100000,
"maxBytesInMemory": 50000000,
"intermediatePersistPeriod": "PT10M",
"maxPendingPersists": 0
}
}
}
# Start the supervisor
curl -X POST \
http://localhost:8888/druid/indexer/v1/supervisor \
-H 'Content-Type: application/json' \
-d @supervisor-spec.json
# Check supervisor status
curl -X GET http://localhost:8888/druid/indexer/v1/supervisor
# Terminate the supervisor
curl -X POST http://localhost:8888/druid/indexer/v1/supervisor/live_events/terminate
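For day-to-day operation, the same API also provides per-supervisor detail and suspend/resume endpoints (live_events matches the dataSource defined above):
# Detailed status of a specific supervisor
curl -X GET http://localhost:8888/druid/indexer/v1/supervisor/live_events/status
# Pause and later resume ingestion
curl -X POST http://localhost:8888/druid/indexer/v1/supervisor/live_events/suspend
curl -X POST http://localhost:8888/druid/indexer/v1/supervisor/live_events/resume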
Practical examples
-- Real-time dashboard
SELECT
TIME_FLOOR(__time, 'PT1M') AS minute,
COUNT(*) AS events_per_minute,
COUNT(DISTINCT userId) AS active_users,
SUM(value) AS total_value,
AVG(value) AS avg_value,
COUNT(*) FILTER (WHERE eventType = 'purchase') AS purchases
FROM live_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR
GROUP BY 1
ORDER BY 1 DESC;
-- A/B test analysis
WITH experiment_assignment AS (
SELECT
userId,
CASE WHEN MOD(CAST(userId AS BIGINT), 2) = 0 THEN 'A' ELSE 'B' END AS variant
FROM (
SELECT DISTINCT userId FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
) recent_users
)
SELECT
ea.variant,
COUNT(DISTINCT we.userId) AS unique_users,
COUNT(*) AS total_events,
COUNT(*) FILTER (WHERE we.page = '/purchase') AS conversions,
ROUND(
COUNT(*) FILTER (WHERE we.page = '/purchase') * 100.0 /
COUNT(DISTINCT we.userId), 2
) AS conversion_rate
FROM web_events we
JOIN experiment_assignment ea ON we.userId = ea.userId
WHERE we.__time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
GROUP BY ea.variant;
-- Anomaly detection
WITH hourly_metrics AS (
SELECT
TIME_FLOOR(__time, 'PT1H') AS hour,
COUNT(*) AS events,
COUNT(DISTINCT userId) AS unique_users,
AVG(duration) AS avg_duration
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
GROUP BY 1
),
metrics_with_stats AS (
SELECT
hour,
events,
unique_users,
avg_duration,
AVG(events) OVER (
ORDER BY hour
ROWS BETWEEN 23 PRECEDING AND 1 PRECEDING
) AS avg_events_24h,
STDDEV(events) OVER (
ORDER BY hour
ROWS BETWEEN 23 PRECEDING AND 1 PRECEDING
) AS stddev_events_24h
FROM hourly_metrics
)
SELECT
hour,
events,
avg_events_24h,
ABS(events - avg_events_24h) / stddev_events_24h AS z_score
FROM metrics_with_stats
WHERE ABS(events - avg_events_24h) / stddev_events_24h > 2
AND hour >= CURRENT_TIMESTAMP - INTERVAL '24' HOUR
ORDER BY hour DESC;