データベース

QuestDB

概要

QuestDBは、高性能な時系列データに特化したオープンソースSQLデータベースです。PostgreSQL Wire Protocolとの互換性を提供し、既存のPostgreSQLクライアントライブラリを使用してアクセス可能です。純粋なカラム型アーキテクチャを採用し、金融、IoT、監視分野で優れたパフォーマンスを発揮します。

詳細

主要特徴

  • 超高速インジェスト: InfluxDB Line Protocolサポートにより毎秒数百万レコードの取り込みが可能
  • PostgreSQL互換性: PostgreSQL Wire Protocolサポートによる既存ツールとの高い互換性
  • 純粋カラム型: 時系列分析に最適化されたカラム型ストレージエンジン
  • SQL拡張: 時系列専用のSQL関数(SAMPLE BY、LATEST ON)を提供
  • ゼロコピー: メモリマップドファイルによる効率的なデータアクセス
  • ZFS圧縮: 2024年v8.0で追加、6倍のストレージ効率化とクエリ性能向上

アーキテクチャ

  • スキーマレス取り込み: 動的テーブル作成とカラム追加
  • パーティション: 自動的な時間ベースパーティショニング
  • インデックス: SYMBOLデータ型による高効率インデックス
  • 並列処理: マルチコアCPUを活用した並列クエリ実行
  • WAL(Write-Ahead Log): データ耐久性とレプリケーション機能

メリット・デメリット

メリット

  • 圧倒的なパフォーマンス: TimescaleDBの10〜150倍、InfluxDBの6.5倍のインジェスト性能
  • PostgreSQL互換: 既存のPostgreSQLアプリケーションから移行が容易
  • 学習コストの低さ: 標準SQLに時系列専用関数を追加した直感的な操作
  • 軽量運用: 複雑な設定不要で高性能を実現
  • 豊富なクライアント: Java、Python、Node.js、C#、Go、Rust等の対応
  • リアルタイム分析: 超高速クエリによるリアルタイムダッシュボード構築

デメリット

  • PostgreSQL機能の制限: 一部のPostgreSQL固有機能は非対応
  • トランザクション制約: 複雑なトランザクションパターンに制限あり
  • データ型制限: PostgreSQLの全データ型には対応していない
  • メタデータクエリ: 一部のDBツール用メタデータクエリが動作しない場合がある

主要リンク

書き方の例

インストール・セットアップ

# Dockerで起動
docker run -p 9000:9000 -p 9009:9009 -p 8812:8812 questdb/questdb

# HomebrewでmacOS
brew install questdb
questdb start

# Linux(.tar.gz)
wget https://github.com/questdb/questdb/releases/download/7.4.2/questdb-7.4.2-rt-linux-amd64.tar.gz
tar -xzf questdb-*.tar.gz
cd questdb-*
./questdb.sh start

基本操作(データ挿入・クエリ)

-- テーブル作成
CREATE TABLE cpu_metrics (
    timestamp TIMESTAMP,
    cpu_id SYMBOL,
    usage DOUBLE,
    temperature DOUBLE
) timestamp(timestamp) PARTITION BY DAY;

-- データ挿入(PostgreSQL Wire Protocol)
INSERT INTO cpu_metrics VALUES 
    (now(), 'cpu-001', 45.2, 68.5),
    (now(), 'cpu-002', 52.1, 71.2);

-- 時系列クエリ(SAMPLE BY)
SELECT timestamp, cpu_id, avg(usage) as avg_usage
FROM cpu_metrics 
WHERE timestamp >= dateadd('h', -24, now())
SAMPLE BY 1h;

-- 最新値取得(LATEST ON)
SELECT * FROM cpu_metrics 
LATEST ON timestamp PARTITION BY cpu_id;

データモデリング

-- IoT센서 데이터 테이블
CREATE TABLE sensor_readings (
    ts TIMESTAMP,
    sensor_id SYMBOL CAPACITY 10000 CACHE,
    location SYMBOL,
    temperature DOUBLE,
    humidity DOUBLE,
    pressure DOUBLE
) timestamp(ts) PARTITION BY DAY;

-- 金融データモデル
CREATE TABLE trades (
    timestamp TIMESTAMP,
    symbol SYMBOL,
    price DOUBLE,
    volume LONG,
    side SYMBOL
) timestamp(timestamp) PARTITION BY DAY;

-- インデックス追加
CREATE INDEX trades_symbol_ts ON trades (symbol, timestamp);

パフォーマンス最適化

-- SYMBOLタイプの最適化
CREATE TABLE optimized_table (
    ts TIMESTAMP,
    device SYMBOL CAPACITY 1000000 NOCACHE,  -- 大量ユニーク値用
    status SYMBOL CACHE,                     -- 少数値用
    value DOUBLE
) timestamp(ts) PARTITION BY DAY;

-- バッチ挿入の最適化
INSERT BATCH 10000 INTO metrics 
SELECT * FROM source_table;

-- O3(Out of Order)設定
ALTER TABLE metrics SET PARAM o3MaxLag = '10s';

実用例

// Java(PostgreSQL JDBC)
import java.sql.*;
import java.util.Properties;

public class QuestDBExample {
    public static void main(String[] args) throws SQLException {
        String url = "jdbc:postgresql://localhost:8812/qdb";
        Properties props = new Properties();
        props.setProperty("user", "admin");
        props.setProperty("password", "quest");
        
        try (Connection conn = DriverManager.getConnection(url, props)) {
            // 時系列データクエリ
            String query = """
                SELECT ts, symbol, avg(price) as avg_price 
                FROM trades 
                WHERE ts >= dateadd('d', -7, now()) 
                SAMPLE BY 1h
                """;
            
            try (PreparedStatement ps = conn.prepareStatement(query);
                 ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    System.out.printf("Time: %s, Symbol: %s, Price: %.2f%n",
                        rs.getTimestamp("ts"), 
                        rs.getString("symbol"),
                        rs.getDouble("avg_price"));
                }
            }
        }
    }
}
# Python(psycopg3)
import psycopg

async def fetch_metrics():
    async with await psycopg.AsyncConnection.connect(
        host='localhost', port=8812, user='admin', 
        password='quest', dbname='qdb'
    ) as conn:
        async with conn.cursor() as cur:
            await cur.execute("""
                SELECT ts, sensor_id, avg(temperature) as avg_temp
                FROM sensor_readings 
                WHERE ts >= now() - INTERVAL '1 DAY'
                SAMPLE BY 10m
            """)
            async for row in cur:
                print(f"Sensor: {row[1]}, Avg Temp: {row[2]:.1f}°C")

ベストプラクティス

-- 1. パーティション戦略
-- 高頻度データは日次、低頻度は月次
CREATE TABLE high_freq_data (...) PARTITION BY DAY;
CREATE TABLE low_freq_data (...) PARTITION BY MONTH;

-- 2. SYMBOL最適化
-- 予想されるユニーク値数を事前設定
CREATE TABLE events (
    ts TIMESTAMP,
    event_type SYMBOL CAPACITY 100 CACHE,      -- 少数の種類
    user_id SYMBOL CAPACITY 1000000 NOCACHE,   -- 大量のユーザー
    data VARCHAR
) timestamp(ts);

-- 3. クエリパフォーマンス
-- WHERE句で時間範囲を必ず指定
SELECT * FROM metrics 
WHERE ts BETWEEN '2024-01-01' AND '2024-01-02'
AND sensor_id = 'temp-001';

-- 4. 一括操作の活用
-- COPY文でCSVファイルからの高速インポート
COPY trades FROM '/path/to/trades.csv' WITH HEADER true;