Databases

Apache Druid

Overview

Apache Druid is a real-time analytics database management system. It enables high-speed ingestion of streaming data with immediate query availability, and is optimized for analyzing time-series and event data. By adopting column-oriented storage, it delivers low-latency interactive analytics and high-throughput ingestion at the same time.

Details

Apache Druid was originally developed at Metamarkets, entered the Apache Incubator in 2018, and has since graduated to a top-level Apache Software Foundation project. It is a columnar database designed for real-time analytics, with the following characteristics:

Architectural Features

  • Hybrid storage: freshly ingested data is served from memory on ingestion tasks, while historical data is memory-mapped from local disk and backed by deep storage
  • Pre-aggregation: specified metrics are pre-computed at ingestion time (see the rollup sketch after this list)
  • Segment-oriented: data is partitioned into time-based segments for management
  • Distributed architecture: scales out across multiple nodes
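
A minimal sketch of what rollup means for queries, assuming the rolled-up web_events datasource defined later in this section (with its count metric named events): one stored row can represent many ingested events, so COUNT(*) counts aggregated rows, and the original event count is recovered by summing the count metric.

-- Rollup sketch: COUNT(*) counts stored (rolled-up) rows, while summing the
-- "events" count metric recovers the number of originally ingested events
SELECT
  COUNT(*) AS stored_rows,
  SUM(events) AS ingested_events
FROM web_events;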

Core Components

  • Coordinator: metadata management and monitoring of cluster state
  • Overlord: management of data ingestion tasks
  • Historical: storage and query processing of historical data
  • MiddleManager: execution of real-time ingestion tasks
  • Broker: query routing and result merging
  • Router: entry point for the HTTP API and the web console

Technical Features

  • Real-time ingestion: streaming ingestion from Kafka, Kinesis, and similar sources
  • Batch ingestion: bulk data loading from S3, HDFS, and other storage
  • Fast queries: interactive analysis with millisecond-level latency
  • Automatic scaling: ingestion task autoscaling and automatic segment balancing across the cluster
  • Rollup: pre-aggregation that compresses data and speeds up queries
  • Multidimensional analysis: a query engine optimized for OLAP operations

Pros and Cons

Pros

  • Very low latency: query response times in milliseconds
  • Real-time: seconds from data ingestion to availability for analysis
  • High availability: fault-tolerant distributed architecture
  • Scalability: horizontal scaling handles large data volumes
  • Flexible ingestion: supports both batch and streaming
  • Time-series focus: optimized for analyzing time-series data
  • Automatic optimization: automated indexing and data compression
  • SQL support: queryable with a syntax close to standard SQL

Cons

  • Complexity: operating the many components is involved
  • Update limitations: updating or deleting individual records is difficult
  • Memory consumption: fast processing requires large amounts of memory
  • Learning curve: a distinctive architecture and configuration to master
  • Join limitations: joins between large tables are inefficient
  • Storage cost: stored sketches and pre-computed metrics can increase data size in some cases

Key Links

  • Official site: https://druid.apache.org/
  • Documentation: https://druid.apache.org/docs/latest/
  • Source code: https://github.com/apache/druid

Usage Examples

Installation & Setup

# Start with Docker Compose (recommended)
curl -O https://raw.githubusercontent.com/apache/druid/master/distribution/docker/docker-compose.yml
docker-compose up -d

# Start a single Druid service in Docker (the image runs one service per
# container, named as its argument; a full cluster needs several containers)
docker run -p 8888:8888 -d apache/druid router

# Install from a release tarball
wget https://archive.apache.org/dist/druid/0.23.0/apache-druid-0.23.0-bin.tar.gz
tar -xzf apache-druid-0.23.0-bin.tar.gz
cd apache-druid-0.23.0

# Start on a single machine (for development)
./bin/start-micro-quickstart

# Deploy on Kubernetes (a Helm chart is maintained in the Druid source
# repository under helm/druid)
git clone https://github.com/apache/druid.git
helm dependency update druid/helm/druid
helm install druid druid/helm/druid

# Access the web console
# http://localhost:8888/

# Access the REST API
curl -X GET http://localhost:8888/status

Basic Operations (Data Definition & Manipulation)

# Creating a datasource (native batch ingestion spec; POST to /druid/indexer/v1/task)
{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "web_events",
      "timestampSpec": {
        "column": "timestamp",
        "format": "auto"
      },
      "dimensionsSpec": {
        "dimensions": [
          "userId",
          "sessionId", 
          "page",
          "country",
          "device",
          "browser"
        ]
      },
      "metricsSpec": [
        {"type": "count", "name": "events"},
        {"type": "longSum", "name": "duration", "fieldName": "duration"},
        {"type": "longSum", "name": "bytes", "fieldName": "bytes"}
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "HOUR",
        "rollup": true
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "s3",
        "uris": ["s3://bucket/events/2024/01/15/events.json"]
      },
      "inputFormat": {
        "type": "json"
      }
    },
    "tuningConfig": {
      "type": "index_parallel",
      "maxRowsPerSegment": 1000000,
      "maxRowsInMemory": 100000
    }
  }
}
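
Newer Druid releases (24.0 and later) can also express batch ingestion in SQL through the multi-stage query engine, as an alternative to the native spec above. A minimal, hedged sketch of an equivalent load, assuming the same S3 input and schema:

-- SQL-based batch ingestion (multi-stage query engine, Druid 24.0+).
-- EXTERN takes the input source, input format, and row signature as JSON strings.
INSERT INTO web_events
SELECT
  TIME_PARSE("timestamp") AS __time,
  userId, sessionId, page, country, device, browser,
  duration, bytes
FROM TABLE(
  EXTERN(
    '{"type": "s3", "uris": ["s3://bucket/events/2024/01/15/events.json"]}',
    '{"type": "json"}',
    '[{"name": "timestamp", "type": "string"}, {"name": "userId", "type": "string"}, {"name": "sessionId", "type": "string"}, {"name": "page", "type": "string"}, {"name": "country", "type": "string"}, {"name": "device", "type": "string"}, {"name": "browser", "type": "string"}, {"name": "duration", "type": "long"}, {"name": "bytes", "type": "long"}]'
  )
)
PARTITIONED BY DAY;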

# Real-time ingestion from Kafka (supervisor spec; POST to /druid/indexer/v1/supervisor)
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "realtime_events",
      "timestampSpec": {
        "column": "timestamp",
        "format": "iso"
      },
      "dimensionsSpec": {
        "dimensions": ["userId", "action", "page"]
      },
      "metricsSpec": [
        {"type": "count", "name": "count"},
        {"type": "doubleSum", "name": "value", "fieldName": "value"}
      ]
    },
    "ioConfig": {
      "topic": "events",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "taskCount": 1,
      "replicas": 1,
      "taskDuration": "PT1H"
    },
    "tuningConfig": {
      "type": "kafka",
      "maxRowsInMemory": 100000
    }
  }
}

-- Inspecting datasource columns
SELECT COLUMN_NAME, DATA_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'druid' AND TABLE_NAME = 'web_events';

-- Inspecting segment information
SELECT
  "datasource",
  "start",
  "end", 
  "size",
  "num_rows"
FROM sys.segments
WHERE "datasource" = 'web_events'
ORDER BY "start" DESC;

Queries & Analysis

-- Basic time-series query
SELECT
  TIME_FLOOR(__time, 'PT1H') AS hour,
  COUNT(*) AS events,
  COUNT(DISTINCT userId) AS unique_users,
  SUM(duration) AS total_duration,
  AVG(duration) AS avg_duration
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '24' HOUR
GROUP BY 1
ORDER BY 1;

-- Top-N query
SELECT
  page,
  COUNT(*) AS page_views,
  COUNT(DISTINCT userId) AS unique_visitors
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
GROUP BY page
ORDER BY page_views DESC
LIMIT 10;

-- Multidimensional analysis
SELECT
  country,
  device,
  browser,
  TIME_FLOOR(__time, 'P1D') AS day,
  COUNT(*) AS events,
  SUM(bytes) AS total_bytes,
  AVG(duration) AS avg_duration
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '30' DAY
GROUP BY 1, 2, 3, 4
ORDER BY day DESC, events DESC;

-- Filtering and drill-down
SELECT
  TIME_FLOOR(__time, 'PT1H') AS hour,
  COUNT(*) FILTER (WHERE page = '/home') AS home_views,
  COUNT(*) FILTER (WHERE page = '/products') AS product_views,
  COUNT(*) FILTER (WHERE page = '/checkout') AS checkout_views,
  COUNT(DISTINCT userId) AS unique_users
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '24' HOUR
  AND country = 'JP'
  AND device = 'mobile'
GROUP BY 1
ORDER BY 1;

Advanced Analytics

-- Window-function analysis (window functions require a recent Druid release)
SELECT
  userId,
  __time,
  page,
  LAG(page) OVER (PARTITION BY userId ORDER BY __time) AS previous_page,
  LEAD(page) OVER (PARTITION BY userId ORDER BY __time) AS next_page,
  ROW_NUMBER() OVER (PARTITION BY userId ORDER BY __time) AS page_sequence
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY
  AND userId = '12345'
ORDER BY __time;

-- Cohort analysis
WITH user_first_visit AS (
  SELECT
    userId,
    TIME_FLOOR(MIN(__time), 'P1D') AS first_visit_date
  FROM web_events
  GROUP BY userId
),
user_activity AS (
  SELECT
    e.userId,
    ufv.first_visit_date,
    TIME_FLOOR(e.__time, 'P1D') AS activity_date,
    TIMESTAMPDIFF(DAY, ufv.first_visit_date, e.__time) AS days_since_first
  FROM web_events e
  JOIN user_first_visit ufv ON e.userId = ufv.userId
)
SELECT
  first_visit_date,
  days_since_first,
  COUNT(DISTINCT userId) AS active_users
FROM user_activity
WHERE first_visit_date >= CURRENT_TIMESTAMP - INTERVAL '30' DAY
GROUP BY 1, 2
ORDER BY 1, 2;

-- Funnel analysis
SELECT
  step,
  COUNT(DISTINCT userId) AS users,
  LAG(COUNT(DISTINCT userId)) OVER (ORDER BY step) AS prev_step_users,
  ROUND(
    COUNT(DISTINCT userId) * 100.0 / 
    LAG(COUNT(DISTINCT userId)) OVER (ORDER BY step), 2
  ) AS conversion_rate
FROM (
  SELECT
    userId,
    CASE
      WHEN page = '/home' THEN 1
      WHEN page = '/products' THEN 2  
      WHEN page = '/cart' THEN 3
      WHEN page = '/checkout' THEN 4
      WHEN page = '/success' THEN 5
    END AS step
  FROM web_events
  WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
    AND page IN ('/home', '/products', '/cart', '/checkout', '/success')
) funnel_data
WHERE step IS NOT NULL
GROUP BY step
ORDER BY step;

-- Approximate distinct counts (HyperLogLog; requires the druid-datasketches extension)
SELECT
  TIME_FLOOR(__time, 'P1D') AS day,
  APPROX_COUNT_DISTINCT_DS_HLL(userId) AS approx_unique_users,
  COUNT(DISTINCT userId) AS exact_unique_users -- exact only when useApproximateCountDistinct = false
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '30' DAY
GROUP BY 1
ORDER BY 1;

Optimization & Performance

# Datasource index and sketch settings (thetaSketch requires the druid-datasketches extension)
{
  "dataSchema": {
    "dataSource": "optimized_events",
    "dimensionsSpec": {
      "dimensions": [
        {
          "name": "country",
          "type": "string",
          "multiValueHandling": "SORTED_ARRAY",
          "createBitmapIndex": true
        },
        {
          "name": "userId", 
          "type": "string",
          "createBitmapIndex": false
        }
      ]
    },
    "metricsSpec": [
      {
        "type": "thetaSketch",
        "name": "unique_users",
        "fieldName": "userId"
      }
    ],
    "granularitySpec": {
      "segmentGranularity": "DAY",
      "queryGranularity": "HOUR",
      "rollup": true
    }
  }
}
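
A hedged sketch of how the sketch column above is queried, assuming the optimized_events datasource and the druid-datasketches extension: the ingestion-time theta sketch is passed to the matching approximate-distinct-count function rather than to COUNT(DISTINCT).

-- Querying the pre-aggregated theta sketch column
SELECT
  TIME_FLOOR(__time, 'P1D') AS day,
  APPROX_COUNT_DISTINCT_DS_THETA(unique_users) AS approx_unique_users
FROM optimized_events
GROUP BY 1
ORDER BY 1;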

-- Checking the query execution plan
EXPLAIN PLAN FOR
SELECT
  country,
  COUNT(*) AS events
FROM web_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY
GROUP BY country;

-- Segment counts and sizes
SELECT
  "datasource",
  COUNT(*) AS segment_count,
  SUM("size") AS total_size_bytes,
  SUM("num_rows") AS total_rows,
  MIN("start") AS earliest_data,
  MAX("end") AS latest_data
FROM sys.segments
WHERE "datasource" = 'web_events'
GROUP BY "datasource";

-- Largest segments (per-segment detail also comes from sys.segments;
-- Druid has no separate sys table for index internals)
SELECT
  "segment_id",
  "size",
  "num_rows",
  "is_published",
  "is_available"
FROM sys.segments
WHERE "datasource" = 'web_events'
ORDER BY "size" DESC
LIMIT 10;

Operations & Monitoring

-- Checking ingestion task status
SELECT
  "task_id",
  "type",
  "status",
  "created_time",
  "location"
FROM sys.tasks
ORDER BY "created_time" DESC
LIMIT 10;

-- Datasource statistics (aggregated from sys.segments; there is no
-- sys.datasources table)
SELECT
  "datasource",
  COUNT(*) AS num_segments,
  SUM("num_rows") AS total_rows,
  SUM("size") AS total_size
FROM sys.segments
GROUP BY "datasource";

-- Checking server status
SELECT
  "server",
  "server_type",
  "tier",
  "curr_size",
  "max_size"
FROM sys.servers;

-- Monitoring running ingestion tasks (there is no sys.queries table in Druid;
-- in-flight SQL queries are managed through the Broker's query API instead)
SELECT
  "task_id",
  "datasource",
  "type",
  "created_time"
FROM sys.tasks
WHERE "status" = 'RUNNING';

-- Supervisor health check (service metrics such as query/time are emitted to
-- external systems via metrics emitters, e.g. Prometheus or statsd; they are
-- not exposed as a sys table)
SELECT
  "supervisor_id",
  "state",
  "healthy",
  "type",
  "source"
FROM sys.supervisors;

Real-Time Ingestion Configuration

# Example Kafka supervisor configuration
# supervisor-spec.json
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "live_events",
      "timestampSpec": {
        "column": "timestamp",
        "format": "millis"
      },
      "dimensionsSpec": {
        "dimensions": [
          "eventType",
          "userId", 
          "sessionId",
          "deviceType",
          "country"
        ]
      },
      "metricsSpec": [
        {"type": "count", "name": "count"},
        {"type": "longSum", "name": "value", "fieldName": "value"},
        {
          "type": "thetaSketch",
          "name": "unique_users",
          "fieldName": "userId"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "MINUTE"
      }
    },
    "ioConfig": {
      "topic": "events",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092",
        "group.id": "druid-console-consumer"
      },
      "taskCount": 3,
      "replicas": 2,
      "taskDuration": "PT1H",
      "useEarliestOffset": false
    },
    "tuningConfig": {
      "type": "kafka",
      "maxRowsInMemory": 100000,
      "maxBytesInMemory": 50000000,
      "intermediatePersistPeriod": "PT10M",
      "maxPendingPersists": 0
    }
  }
}

# Starting the supervisor
curl -X POST \
  http://localhost:8888/druid/indexer/v1/supervisor \
  -H 'Content-Type: application/json' \
  -d @supervisor-spec.json

# Checking supervisor status
curl -X GET http://localhost:8888/druid/indexer/v1/supervisor

# Stopping the supervisor
curl -X POST http://localhost:8888/druid/indexer/v1/supervisor/live_events/terminate

Practical Examples

-- Real-time dashboard
SELECT
  TIME_FLOOR(__time, 'PT1M') AS minute,
  COUNT(*) AS events_per_minute,
  COUNT(DISTINCT userId) AS active_users,
  SUM(value) AS total_value,
  AVG(value) AS avg_value,
  COUNT(*) FILTER (WHERE eventType = 'purchase') AS purchases
FROM live_events
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR
GROUP BY 1
ORDER BY 1 DESC;

-- A/B test analysis
WITH experiment_assignment AS (
  SELECT
    userId,
    CASE WHEN MOD(CAST(userId AS BIGINT), 2) = 0 THEN 'A' ELSE 'B' END AS variant
  FROM (
    SELECT DISTINCT userId FROM web_events
    WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
  ) AS u
)
SELECT
  ea.variant,
  COUNT(DISTINCT we.userId) AS unique_users,
  COUNT(*) AS total_events,
  COUNT(*) FILTER (WHERE we.page = '/purchase') AS conversions,
  ROUND(
    COUNT(*) FILTER (WHERE we.page = '/purchase') * 100.0 / 
    COUNT(DISTINCT we.userId), 2
  ) AS conversion_rate
FROM web_events we
JOIN experiment_assignment ea ON we.userId = ea.userId  
WHERE we.__time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
GROUP BY ea.variant;

-- Anomaly detection (STDDEV requires the druid-stats extension)
WITH hourly_metrics AS (
  SELECT
    TIME_FLOOR(__time, 'PT1H') AS hour,
    COUNT(*) AS events,
    COUNT(DISTINCT userId) AS unique_users,
    AVG(duration) AS avg_duration
  FROM web_events
  WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '7' DAY
  GROUP BY 1
),
metrics_with_stats AS (
  SELECT
    hour,
    events,
    unique_users,
    avg_duration,
    AVG(events) OVER (
      ORDER BY hour 
      ROWS BETWEEN 23 PRECEDING AND 1 PRECEDING
    ) AS avg_events_24h,
    STDDEV(events) OVER (
      ORDER BY hour
      ROWS BETWEEN 23 PRECEDING AND 1 PRECEDING  
    ) AS stddev_events_24h
  FROM hourly_metrics
)
SELECT
  hour,
  events,
  avg_events_24h,
  ABS(events - avg_events_24h) / NULLIF(stddev_events_24h, 0) AS z_score
FROM metrics_with_stats
WHERE ABS(events - avg_events_24h) / NULLIF(stddev_events_24h, 0) > 2
  AND hour >= CURRENT_TIMESTAMP - INTERVAL '24' HOUR
ORDER BY hour DESC;