データベース

InfluxDB

概要

InfluxDBは、時系列データに特化したオープンソースのNoSQLデータベースです。メトリクス、イベント、リアルタイム分析のためのスケーラブルなデータストアとして設計されており、IoTセンサーデータ、アプリケーション監視、ビジネスメトリクスなどの時系列データを効率的に保存・クエリできます。

詳細

InfluxDBは2013年にInfluxDataによって開発されました。時系列データの特性(時間順序、高い書き込み頻度、範囲クエリ中心)に最適化された設計で、TICKスタック(Telegraf、InfluxDB、Chronograf、Kapacitor)の中核コンポーネントとして機能します。SQL風のクエリ言語「Flux」とHTTP APIを提供し、高いパフォーマンスを実現しています。

InfluxDBの主な特徴:

  • 時系列データ専用設計
  • 高速な書き込み・読み取り性能
  • SQL風クエリ言語(Flux)
  • スキーマレス設計
  • 自動データ保持期間管理
  • 高精度タイムスタンプ
  • タグとフィールドによるデータモデル
  • 水平スケーリング(クラスター版)
  • リアルタイム集約・ダウンサンプリング
  • RESTful HTTP API

メリット・デメリット

メリット

  • 高性能: 時系列データに最適化された高速な書き込み・読み取り
  • 使いやすさ: SQL風のクエリ言語で学習コストが低い
  • 自動管理: データ保持期間やダウンサンプリングの自動化
  • 豊富な機能: 統計関数、時間窓集約、予測機能
  • エコシステム: TICKスタックによる完全なソリューション
  • APIs: HTTP APIによる言語非依存のアクセス
  • 可視化: Grafana等の可視化ツールとの高い親和性

デメリット

  • 専用性: 時系列データ以外の用途には向かない
  • メモリ使用量: 大量のメモリを消費する場合がある
  • 複雑性: 高度な機能の設定が複雑
  • 学習コスト: Fluxクエリ言語の習得が必要
  • ライセンス: エンタープライズ機能は有償

主要リンク

書き方の例

インストール・セットアップ

# Docker での実行(推奨)
docker run -d --name influxdb \
  -p 8086:8086 \
  -v influxdb-storage:/var/lib/influxdb2 \
  -e DOCKER_INFLUXDB_INIT_MODE=setup \
  -e DOCKER_INFLUXDB_INIT_USERNAME=admin \
  -e DOCKER_INFLUXDB_INIT_PASSWORD=password123 \
  -e DOCKER_INFLUXDB_INIT_ORG=myorg \
  -e DOCKER_INFLUXDB_INIT_BUCKET=mybucket \
  influxdb:2.7

# Ubuntu/Debian
wget -qO- https://repos.influxdata.com/influxdata-archive_compat.key | sudo apt-key add -
echo "deb https://repos.influxdata.com/ubuntu stable main" | sudo tee /etc/apt/sources.list.d/influxdb.list
sudo apt update && sudo apt install influxdb2

# Red Hat/CentOS
cat > /etc/yum.repos.d/influxdb.repo << EOF
[influxdb]
name = InfluxDB Repository - RHEL \$releasever
baseurl = https://repos.influxdata.com/rhel/\$releasever/\$basearch/stable
enabled = 1
gpgcheck = 1
gpgkey = https://repos.influxdata.com/influxdata-archive_compat.key
EOF
sudo yum install influxdb2

# macOS (Homebrew)
brew install influxdb

# サービス起動
sudo systemctl start influxdb
sudo systemctl enable influxdb

# 初期セットアップ
influx setup

基本操作(HTTP API)

# 組織とバケット作成
curl -X POST "http://localhost:8086/api/v2/orgs" \
  -H "Authorization: Token YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "myorg"
  }'

curl -X POST "http://localhost:8086/api/v2/buckets" \
  -H "Authorization: Token YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "sensors",
    "orgID": "YOUR_ORG_ID",
    "retentionRules": [
      {
        "type": "expire",
        "everySeconds": 2592000
      }
    ]
  }'

# データ書き込み(Line Protocol)
curl -X POST "http://localhost:8086/api/v2/write?org=myorg&bucket=sensors" \
  -H "Authorization: Token YOUR_TOKEN" \
  -H "Content-Type: text/plain" \
  -d 'temperature,location=room1,sensor=DHT22 value=23.5 1640995200000000000
humidity,location=room1,sensor=DHT22 value=65.2 1640995200000000000
cpu_usage,host=server1,region=us-east value=85.3 1640995260000000000'

# データ読み取り(Flux クエリ)
curl -X POST "http://localhost:8086/api/v2/query?org=myorg" \
  -H "Authorization: Token YOUR_TOKEN" \
  -H "Content-Type: application/vnd.flux" \
  -d 'from(bucket: "sensors")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "temperature")
    |> filter(fn: (r) => r.location == "room1")'

CLI操作

# InfluxDB CLI設定
influx config create \
  --config-name myconfig \
  --host-url http://localhost:8086 \
  --org myorg \
  --token YOUR_TOKEN \
  --active

# データ書き込み
influx write \
  --bucket sensors \
  --precision s \
  'temperature,location=room2 value=24.1 1640995320'

# ファイルからデータ書き込み
cat > data.txt << EOF
temperature,location=room1 value=23.5 1640995200
temperature,location=room2 value=24.1 1640995260
humidity,location=room1 value=65.2 1640995200
humidity,location=room2 value=67.8 1640995260
EOF

influx write --bucket sensors --file data.txt

# Fluxクエリ実行
influx query 'from(bucket: "sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> mean()'

# バケット一覧
influx bucket list

# 組織一覧
influx org list

Fluxクエリ言語

// 基本的な範囲クエリ
from(bucket: "sensors")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r.location == "room1")

// 集約とグループ化
from(bucket: "sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu_usage")
  |> group(columns: ["host"])
  |> mean()

// 時間窓での集約
from(bucket: "sensors")
  |> range(start: -6h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> aggregateWindow(every: 10m, fn: mean)

// 複数測定値の結合
temp = from(bucket: "sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")

humidity = from(bucket: "sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "humidity")

join(tables: {temp: temp, humidity: humidity}, on: ["_time", "location"])

// 統計関数
from(bucket: "sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu_usage")
  |> group(columns: ["host"])
  |> aggregateWindow(every: 5m, fn: mean)
  |> percentile(percentile: 0.95)

// データ変換
from(bucket: "sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> map(fn: (r) => ({
      r with
      _value: (r._value * 9.0 / 5.0) + 32.0,
      unit: "°F"
    }))

データ保持とダウンサンプリング

# タスクによる自動ダウンサンプリング
influx task create --file - << EOF
option task = {name: "downsample-5m", every: 1h}

from(bucket: "sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> aggregateWindow(every: 5m, fn: mean)
  |> to(bucket: "sensors_5m")
EOF

# データ保持期間設定
influx bucket update \
  --id YOUR_BUCKET_ID \
  --retention 720h  # 30日間

# 古いデータの削除
influx delete \
  --bucket sensors \
  --start 2023-01-01T00:00:00Z \
  --stop 2023-01-31T23:59:59Z \
  --predicate '_measurement="old_data"'

実用例

// IoTセンサーデータの監視
from(bucket: "iot")
  |> range(start: -15m)
  |> filter(fn: (r) => r._measurement == "sensor_data")
  |> filter(fn: (r) => r._field == "temperature")
  |> group(columns: ["device_id"])
  |> aggregateWindow(every: 1m, fn: last)
  |> map(fn: (r) => ({
      r with
      alert: if r._value > 30.0 then "HIGH" else "NORMAL"
    }))

// アプリケーション性能監視
from(bucket: "metrics")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "http_requests")
  |> filter(fn: (r) => r._field == "response_time")
  |> group(columns: ["endpoint", "status_code"])
  |> aggregateWindow(every: 5m, fn: mean)
  |> filter(fn: (r) => r._value > 1000.0)  // 1秒以上のレスポンス

// ビジネスメトリクス分析
sales = from(bucket: "business")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "sales")
  |> filter(fn: (r) => r._field == "amount")
  |> aggregateWindow(every: 1d, fn: sum)

revenue = from(bucket: "business")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "revenue")
  |> filter(fn: (r) => r._field == "total")
  |> aggregateWindow(every: 1d, fn: sum)

join(tables: {sales: sales, revenue: revenue}, on: ["_time"])
  |> map(fn: (r) => ({
      _time: r._time,
      avg_order_value: r.revenue_total / r.sales_amount
    }))

// 予測分析
from(bucket: "sensors")
  |> range(start: -7d)
  |> filter(fn: (r) => r._measurement == "energy_consumption")
  |> aggregateWindow(every: 1h, fn: mean)
  |> holtWinters(n: 24, seasonality: 24)  // 24時間先の予測

Pythonクライアント

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime

# クライアント接続
client = InfluxDBClient(
    url="http://localhost:8086",
    token="YOUR_TOKEN",
    org="myorg"
)

# データ書き込み
write_api = client.write_api(write_options=SYNCHRONOUS)

# ポイント作成と書き込み
point = Point("temperature") \
    .tag("location", "room1") \
    .tag("sensor", "DHT22") \
    .field("value", 23.5) \
    .time(datetime.datetime.utcnow())

write_api.write(bucket="sensors", record=point)

# バルク書き込み
points = []
for i in range(100):
    point = Point("cpu_usage") \
        .tag("host", f"server{i%5}") \
        .field("value", 50 + i % 40) \
        .time(datetime.datetime.utcnow() - datetime.timedelta(minutes=i))
    points.append(point)

write_api.write(bucket="metrics", record=points)

# データ読み取り
query_api = client.query_api()
query = '''
from(bucket: "sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r.location == "room1")
'''

result = query_api.query(query)

for table in result:
    for record in table.records:
        print(f"Time: {record.get_time()}, Value: {record.get_value()}")

# パンダス DataFrame への変換
df = query_api.query_data_frame(query)
print(df.head())

# クライアント終了
client.close()

設定・最適化

# influxdb.conf の主要設定
[http]
bind-address = ":8086"
auth-enabled = true

[meta]
dir = "/var/lib/influxdb/meta"
retention-autocreate = true

[data]
dir = "/var/lib/influxdb/data"
wal-dir = "/var/lib/influxdb/wal"
series-id-set-cache-size = 100

[cluster]
shard-writer-timeout = "5s"
write-timeout = "10s"

[retention]
enabled = true
check-interval = "30m"

[shard-precreation]
enabled = true
check-interval = "10m"
advance-period = "30m"

[monitor]
store-enabled = true
store-database = "_internal"

[admin]
enabled = true
bind-address = ":8083"

[subscriber]
enabled = true
http-timeout = "30s"

[continuous_queries]
enabled = true
log-enabled = true
run-interval = "1s"

監視とメンテナンス

# システム統計
influx query 'from(bucket: "_monitoring")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "influxdb_database")
  |> last()'

# パフォーマンス監視
curl "http://localhost:8086/metrics"

# バックアップ
influx backup /path/to/backup

# リストア
influx restore /path/to/backup

# データ整合性チェック
influx inspect verify-seriesfile /var/lib/influxdb/data

# TSMファイル情報
influx inspect dump-tsm /var/lib/influxdb/data/mydb/autogen/1/000000001-000000001.tsm