Database
ClickHouse
Overview
ClickHouse is a real-time analytical database management system. It uses a column-oriented storage architecture to deliver fast query execution over large datasets. Purpose-built for OLAP (Online Analytical Processing), it can return query results in seconds even on tables with billions of rows.
Details
ClickHouse was developed at Yandex and open-sourced in 2016; it is now developed and maintained by ClickHouse, Inc. Column-oriented storage, a vectorized execution engine, and aggressive data compression enable fast analytics that are difficult to achieve with conventional OLTP databases. It supports a distributed architecture and scales horizontally to handle very large data-processing workloads.
Key features of ClickHouse:
- Column-oriented storage engine
- Vectorized query execution
- Aggressive data compression
- Distributed processing and sharding
- SQL-like query language
- High-speed data ingestion
- Real-time analytics
- Support for a wide range of data types
- Rich set of aggregate functions
- HTTP/TCP interfaces
Pros & Cons
Pros
- Very fast: column-oriented storage enables rapid query execution
- High compression: efficient data compression reduces storage costs
- Scalability: horizontal scaling handles very large data volumes
- Real-time: inserted data is available for analysis immediately
- SQL compatible: syntax close to standard SQL keeps the learning curve low
- Rich functionality: statistical functions, time-series analysis, approximate computation
- Open source: the Apache 2.0 license permits commercial use
Cons
- Update restrictions: poorly suited to frequent updates and deletes
- Not for OLTP: unsuitable for transactional workloads
- Complexity: distributed setups are complex to configure and operate
- Memory consumption: large queries can consume large amounts of memory
- Join limitations: joins between large tables are expensive
Key Links
- Official site: https://clickhouse.com/
- Documentation: https://clickhouse.com/docs
- GitHub: https://github.com/ClickHouse/ClickHouse
Usage Examples
Installation & Setup
# Run with Docker (recommended)
docker run -d --name clickhouse-server \
  --ulimit nofile=262144:262144 \
  -p 8123:8123 -p 9000:9000 \
  -v clickhouse-data:/var/lib/clickhouse \
  clickhouse/clickhouse-server
# Ubuntu/Debian
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 8919F6BD2B48D754
echo "deb https://packages.clickhouse.com/deb stable main" | sudo tee /etc/apt/sources.list.d/clickhouse.list
sudo apt update && sudo apt install -y clickhouse-server clickhouse-client
# Red Hat/CentOS
sudo yum install -y yum-utils
sudo yum-config-manager --add-repo https://packages.clickhouse.com/rpm/clickhouse.repo
sudo yum install -y clickhouse-server clickhouse-client
# macOS (Homebrew)
brew install clickhouse
# Start the service
sudo systemctl start clickhouse-server
sudo systemctl enable clickhouse-server
# Connect with the client
clickhouse-client
# Web interface
# http://localhost:8123/play
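-- Once connected with clickhouse-client, a quick sanity check (a minimal
-- example; SELECT 1 and version() work on any recent server):
SELECT 1;
SELECT version();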
Basic Operations (DDL/DML)
-- Create a database
CREATE DATABASE analytics;
USE analytics;
-- Create a table (MergeTree engine)
CREATE TABLE events
(
    event_time DateTime,
    user_id UInt32,
    session_id String,
    page_url String,
    referrer String,
    user_agent String,
    country_code FixedString(2),
    device_type Enum('desktop' = 1, 'mobile' = 2, 'tablet' = 3),
    duration UInt32,
    bytes_downloaded UInt64
)
ENGINE = MergeTree()
ORDER BY (event_time, user_id)
PARTITION BY toYYYYMM(event_time);
-- Insert data
INSERT INTO events VALUES
    ('2024-01-15 10:30:00', 1001, 'sess001', '/home', 'https://google.com', 'Mozilla/5.0...', 'JP', 'desktop', 120, 204800),
    ('2024-01-15 10:31:00', 1002, 'sess002', '/products', 'https://google.com', 'Mozilla/5.0...', 'US', 'mobile', 45, 51200),
    ('2024-01-15 10:32:00', 1001, 'sess001', '/about', '/home', 'Mozilla/5.0...', 'JP', 'desktop', 90, 153600);
-- Bulk insert (from a CSV file)
INSERT INTO events
SELECT *
FROM file('/path/to/events.csv', 'CSV',
    'event_time DateTime, user_id UInt32, session_id String, page_url String, referrer String, user_agent String, country_code FixedString(2), device_type String, duration UInt32, bytes_downloaded UInt64');
-- Update data (ALTER UPDATE runs as an asynchronous mutation)
ALTER TABLE events UPDATE device_type = 'mobile' WHERE user_agent LIKE '%Mobile%';
-- Delete data (ALTER DELETE is also an asynchronous mutation)
ALTER TABLE events DELETE WHERE event_time < '2024-01-01';
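-- Recent versions (22.8+) also support lightweight deletes, which mark rows
-- as deleted instead of rewriting whole parts the way ALTER ... DELETE does
-- (a sketch; verify availability on your server version):
DELETE FROM events WHERE event_time < '2024-01-01';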
Queries & Analytics
-- Basic aggregation
SELECT
    toDate(event_time) AS date,
    country_code,
    device_type,
    count() AS page_views,
    uniq(user_id) AS unique_users,
    avg(duration) AS avg_duration,
    sum(bytes_downloaded) AS total_bytes
FROM events
WHERE event_time >= '2024-01-01'
GROUP BY date, country_code, device_type
ORDER BY date DESC, page_views DESC;
-- Time-series analysis
SELECT
    toStartOfHour(event_time) AS hour,
    count() AS events,
    uniq(user_id) AS unique_users,
    quantile(0.5)(duration) AS median_duration,
    quantile(0.95)(duration) AS p95_duration
FROM events
WHERE event_time >= today() - 7
GROUP BY hour
ORDER BY hour;
-- User behavior analysis
SELECT
    user_id,
    count() AS page_views,
    sum(duration) AS total_time,
    groupArray(page_url) AS visited_pages,
    min(event_time) AS first_visit,
    max(event_time) AS last_visit
FROM events
WHERE toDate(event_time) = today()
GROUP BY user_id
HAVING page_views > 5
ORDER BY total_time DESC
LIMIT 100;
-- Cohort analysis
SELECT
    toYYYYMM(min_event_time) AS cohort_month,
    dateDiff('month', min_event_time, event_time) AS period_number,
    count(DISTINCT user_id) AS users
FROM (
    SELECT
        user_id,
        event_time,
        min(event_time) OVER (PARTITION BY user_id) AS min_event_time
    FROM events
)
GROUP BY cohort_month, period_number
ORDER BY cohort_month, period_number;
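-- Funnel analysis fits naturally here as well: windowFunnel counts how far
-- each user progresses through an ordered chain of conditions within a time
-- window (3600 s below; the page URLs are illustrative):
SELECT
    level,
    count() AS users
FROM (
    SELECT
        user_id,
        windowFunnel(3600)(
            event_time,
            page_url = '/home',
            page_url = '/products',
            page_url = '/checkout'
        ) AS level
    FROM events
    GROUP BY user_id
)
GROUP BY level
ORDER BY level;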
Advanced Analytics Features
-- Window functions (lagInFrame requires an explicit frame in ClickHouse)
SELECT
    user_id,
    event_time,
    page_url,
    lagInFrame(page_url) OVER (
        PARTITION BY user_id
        ORDER BY event_time
        ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
    ) AS previous_page,
    row_number() OVER (
        PARTITION BY user_id
        ORDER BY event_time
    ) AS page_sequence
FROM events
ORDER BY user_id, event_time;
-- Array functions (note: groupArray order is only deterministic if the
-- input rows arrive sorted, e.g. from a sorted subquery)
SELECT
    user_id,
    groupArray(page_url) AS user_journey,
    arrayStringConcat(groupArray(page_url), ' -> ') AS journey_path,
    arrayElement(groupArray(page_url), 1) AS landing_page,
    arrayElement(groupArray(page_url), -1) AS exit_page
FROM events
GROUP BY user_id
HAVING length(user_journey) > 3;
-- Approximate calculation (HyperLogLog); note uniq() is itself approximate,
-- so uniqExact() is used for the exact baseline
SELECT
    country_code,
    uniqExact(user_id) AS exact_unique_users,
    uniqHLL12(user_id) AS approx_unique_users
FROM events
GROUP BY country_code;
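-- topK is another approximate aggregate: it returns roughly the N most
-- frequent values without maintaining exact counts for every value:
SELECT topK(5)(page_url) AS top_pages
FROM events;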
-- Processing JSON data
CREATE TABLE user_events
(
    user_id UInt32,
    event_time DateTime,
    properties String
)
ENGINE = MergeTree()
ORDER BY (event_time, user_id);
INSERT INTO user_events VALUES
    (1001, '2024-01-15 10:30:00', '{"product_id": 123, "category": "electronics", "price": 299.99}');
SELECT
    user_id,
    JSONExtractString(properties, 'category') AS category,
    JSONExtractFloat(properties, 'price') AS price
FROM user_events
WHERE JSONHas(properties, 'price');
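-- JSONExtract can also extract with an explicit target type, passed as the
-- last argument:
SELECT JSONExtract(properties, 'product_id', 'UInt64') AS product_id
FROM user_events;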
Performance Optimization
-- Table compression settings (per-column codecs)
CREATE TABLE compressed_events
(
    event_time DateTime CODEC(DoubleDelta),
    user_id UInt32 CODEC(Delta, LZ4),
    page_url String CODEC(LZ4),
    duration UInt32 CODEC(Delta, LZ4)
)
ENGINE = MergeTree()
ORDER BY (event_time, user_id)
PARTITION BY toYYYYMM(event_time);
-- Projection (pre-aggregation)
ALTER TABLE events
ADD PROJECTION daily_stats
(
    SELECT
        toDate(event_time) AS date,
        country_code,
        count(),
        uniq(user_id)
    GROUP BY date, country_code
);
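-- A projection added with ALTER only applies to newly written parts; to
-- build it for existing data, materialize it explicitly:
ALTER TABLE events MATERIALIZE PROJECTION daily_stats;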
-- Materialized view
CREATE MATERIALIZED VIEW daily_summary
ENGINE = MergeTree()
ORDER BY (date, country_code)
AS SELECT
    toDate(event_time) AS date,
    country_code,
    device_type,
    count() AS events,
    uniq(user_id) AS unique_users,
    avg(duration) AS avg_duration
FROM events
GROUP BY date, country_code, device_type;
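-- Caveat: with a plain MergeTree target, the view stores partial aggregates
-- per insert block, so rows for the same (date, country_code, device_type)
-- can repeat. A common pattern for exact rollups (a sketch; the table name
-- is illustrative) stores aggregate states in an AggregatingMergeTree and
-- finalizes them with the -Merge combinators at query time:
CREATE MATERIALIZED VIEW daily_summary_agg
ENGINE = AggregatingMergeTree()
ORDER BY (date, country_code, device_type)
AS SELECT
    toDate(event_time) AS date,
    country_code,
    device_type,
    countState() AS events,
    uniqState(user_id) AS unique_users,
    avgState(duration) AS avg_duration
FROM events
GROUP BY date, country_code, device_type;
SELECT
    date,
    country_code,
    countMerge(events) AS events,
    uniqMerge(unique_users) AS unique_users,
    avgMerge(avg_duration) AS avg_duration
FROM daily_summary_agg
GROUP BY date, country_code;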
-- Inspect the execution plan
EXPLAIN PIPELINE
SELECT count(), avg(duration)
FROM events
WHERE event_time >= '2024-01-01'
    AND country_code = 'JP';
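-- EXPLAIN with indexes = 1 additionally shows how the partition key and
-- primary key prune parts and granules for a query:
EXPLAIN indexes = 1
SELECT count()
FROM events
WHERE event_time >= '2024-01-01';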
Distributed Tables
-- Replicated table
CREATE TABLE events_replica ON CLUSTER cluster_name
(
    event_time DateTime,
    user_id UInt32,
    page_url String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events', '{replica}')
ORDER BY (event_time, user_id)
PARTITION BY toYYYYMM(event_time);
-- Distributed table
CREATE TABLE events_distributed ON CLUSTER cluster_name
AS events_replica
ENGINE = Distributed(cluster_name, analytics, events_replica, rand());
-- Check cluster information
SELECT * FROM system.clusters;
-- Distributed query
SELECT
    count() AS total_events,
    uniq(user_id) AS unique_users
FROM events_distributed
WHERE event_time >= today() - 30;
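-- To see how rows are spread across shards: hostName() is evaluated on each
-- shard that serves the distributed query:
SELECT hostName() AS host, count() AS rows
FROM events_distributed
GROUP BY host;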
Practical Examples
-- Query for a real-time dashboard
SELECT
    toStartOfMinute(event_time) AS minute,
    count() AS requests_per_minute,
    countIf(page_url = '/checkout') AS checkout_views,
    uniq(user_id) AS active_users,
    avg(duration) AS avg_page_load_time
FROM events
WHERE event_time >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute DESC
LIMIT 60;
-- A/B test analysis
WITH experiment_users AS (
    SELECT user_id, if(user_id % 2 = 0, 'A', 'B') AS variant
    FROM (SELECT DISTINCT user_id FROM events)
)
SELECT
    e.variant,
    count() AS page_views,
    uniq(ev.user_id) AS unique_users,
    countIf(ev.page_url = '/purchase') AS conversions,
    conversions / unique_users AS conversion_rate
FROM events ev
JOIN experiment_users e ON ev.user_id = e.user_id
WHERE ev.event_time >= '2024-01-01'
GROUP BY e.variant;
-- Anomaly detection (window results are filtered in an outer query, since
-- HAVING cannot reference window functions)
SELECT *
FROM (
    SELECT
        toStartOfHour(event_time) AS hour,
        count() AS events,
        avg(count()) OVER (
            ORDER BY hour
            ROWS BETWEEN 23 PRECEDING AND 1 PRECEDING
        ) AS avg_previous_24h,
        events / avg_previous_24h AS anomaly_ratio
    FROM events
    WHERE event_time >= now() - INTERVAL 48 HOUR
    GROUP BY hour
)
WHERE anomaly_ratio > 2 OR anomaly_ratio < 0.5
ORDER BY hour DESC;
External Data Integration
-- Read data from S3
SELECT count()
FROM s3('https://bucket.s3.amazonaws.com/events/*.csv.gz',
    'AccessKeyId', 'SecretAccessKey',
    'CSV', 'event_time DateTime, user_id UInt32, action String');
-- MySQL engine table
CREATE TABLE mysql_users
(
    id UInt32,
    name String,
    email String
)
ENGINE = MySQL('mysql_server:3306', 'database', 'users', 'username', 'password');
-- Fetch data from a URL
SELECT *
FROM url('https://api.example.com/data.json', 'JSONEachRow')
LIMIT 10;
-- Kafka integration
CREATE TABLE kafka_events
(
    event_time DateTime,
    user_id UInt32,
    action String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'clickhouse_consumer',
    kafka_format = 'JSONEachRow';
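-- The Kafka engine table is only a streaming consumer; to persist messages,
-- the usual pattern is a MergeTree target plus a materialized view that
-- moves rows into it (table names here are illustrative):
CREATE TABLE kafka_events_store
(
    event_time DateTime,
    user_id UInt32,
    action String
)
ENGINE = MergeTree()
ORDER BY (event_time, user_id);
CREATE MATERIALIZED VIEW kafka_events_mv TO kafka_events_store
AS SELECT event_time, user_id, action FROM kafka_events;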
System Administration & Monitoring
-- Check table sizes
SELECT
    table,
    formatReadableSize(sum(bytes)) AS size,
    sum(rows) AS rows,
    count() AS parts
FROM system.parts
WHERE active
GROUP BY table
ORDER BY sum(bytes) DESC;
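-- Compression ratio per table (compressed vs. uncompressed bytes on disk):
SELECT
    table,
    formatReadableSize(sum(data_compressed_bytes)) AS compressed,
    formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed,
    round(sum(data_uncompressed_bytes) / sum(data_compressed_bytes), 2) AS ratio
FROM system.parts
WHERE active
GROUP BY table
ORDER BY sum(data_compressed_bytes) DESC;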
-- Check query history
SELECT
    type,
    event_time,
    query_duration_ms,
    formatReadableSize(memory_usage) AS memory,
    read_rows,
    substring(query, 1, 100) AS query_preview
FROM system.query_log
WHERE event_time >= now() - INTERVAL 1 HOUR
    AND type = 'QueryFinish'
ORDER BY query_duration_ms DESC
LIMIT 10;
-- Profiling (in recent versions ProfileEvents is a Map(String, UInt64)
-- column; replace YOUR_QUERY_ID with an id taken from system.query_log)
SELECT ProfileEvents
FROM system.query_log
WHERE query_id = 'YOUR_QUERY_ID'
    AND type = 'QueryFinish'
FORMAT Vertical;
-- Monitor background processes
SELECT * FROM system.metrics
WHERE metric LIKE 'Background%';
-- Replication status
SELECT * FROM system.replicas;
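-- Currently running queries; a long-running one can be stopped with
-- KILL QUERY WHERE query_id = '...':
SELECT
    query_id,
    user,
    elapsed,
    read_rows,
    substring(query, 1, 100) AS query_preview
FROM system.processes;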
Configuration & Tuning
<!-- Key settings in config.xml -->
<clickhouse>
    <max_connections>4096</max_connections>
    <keep_alive_timeout>3</keep_alive_timeout>
    <max_concurrent_queries>100</max_concurrent_queries>
    <merge_tree>
        <max_suspicious_broken_parts>5</max_suspicious_broken_parts>
        <parts_to_delay_insert>150</parts_to_delay_insert>
        <parts_to_throw_insert>300</parts_to_throw_insert>
    </merge_tree>
    <distributed_ddl>
        <path>/clickhouse/task_queue/ddl</path>
    </distributed_ddl>
    <compression>
        <case>
            <min_part_size>10000000000</min_part_size>
            <min_part_size_ratio>0.01</min_part_size_ratio>
            <method>lz4hc</method>
        </case>
    </compression>
</clickhouse>
-- Session settings
SET max_memory_usage = 10000000000;
SET max_execution_time = 300;
SET optimize_use_projections = 1;
SET allow_experimental_analyzer = 1;
-- Check global settings
SELECT name, value FROM system.settings
WHERE name LIKE '%memory%'
ORDER BY name;
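-- Settings can also be scoped to a single query with a trailing SETTINGS
-- clause instead of SET:
SELECT count()
FROM events
SETTINGS max_threads = 4, max_memory_usage = 10000000000;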