データベース
InfluxDB
概要
InfluxDBは、時系列データに特化したオープンソースのNoSQLデータベースです。メトリクス、イベント、リアルタイム分析のためのスケーラブルなデータストアとして設計されており、IoTセンサーデータ、アプリケーション監視、ビジネスメトリクスなどの時系列データを効率的に保存・クエリできます。
詳細
InfluxDBは2013年にInfluxDataによって開発されました。時系列データの特性(時間順序、高い書き込み頻度、範囲クエリ中心)に最適化された設計で、TICKスタック(Telegraf、InfluxDB、Chronograf、Kapacitor)の中核コンポーネントとして機能します。SQL風のクエリ言語「Flux」とHTTP APIを提供し、高いパフォーマンスを実現しています。
InfluxDBの主な特徴:
- 時系列データ専用設計
- 高速な書き込み・読み取り性能
- SQL風クエリ言語(Flux)
- スキーマレス設計
- 自動データ保持期間管理
- 高精度タイムスタンプ
- タグとフィールドによるデータモデル
- 水平スケーリング(クラスター版)
- リアルタイム集約・ダウンサンプリング
- RESTful HTTP API
メリット・デメリット
メリット
- 高性能: 時系列データに最適化された高速な書き込み・読み取り
- 使いやすさ: SQL風のクエリ言語で学習コストが低い
- 自動管理: データ保持期間やダウンサンプリングの自動化
- 豊富な機能: 統計関数、時間窓集約、予測機能
- エコシステム: TICKスタックによる完全なソリューション
- APIs: HTTP APIによる言語非依存のアクセス
- 可視化: Grafana等の可視化ツールとの高い親和性
デメリット
- 専用性: 時系列データ以外の用途には向かない
- メモリ使用量: 大量のメモリを消費する場合がある
- 複雑性: 高度な機能の設定が複雑
- 学習コスト: Fluxクエリ言語の習得が必要
- ライセンス: エンタープライズ機能は有償
主要リンク
書き方の例
インストール・セットアップ
# Docker での実行(推奨)
docker run -d --name influxdb \
-p 8086:8086 \
-v influxdb-storage:/var/lib/influxdb2 \
-e DOCKER_INFLUXDB_INIT_MODE=setup \
-e DOCKER_INFLUXDB_INIT_USERNAME=admin \
-e DOCKER_INFLUXDB_INIT_PASSWORD=password123 \
-e DOCKER_INFLUXDB_INIT_ORG=myorg \
-e DOCKER_INFLUXDB_INIT_BUCKET=mybucket \
influxdb:2.7
# Ubuntu/Debian
wget -qO- https://repos.influxdata.com/influxdata-archive_compat.key | sudo apt-key add -
echo "deb https://repos.influxdata.com/ubuntu stable main" | sudo tee /etc/apt/sources.list.d/influxdb.list
sudo apt update && sudo apt install influxdb2
# Red Hat/CentOS
cat > /etc/yum.repos.d/influxdb.repo << EOF
[influxdb]
name = InfluxDB Repository - RHEL \$releasever
baseurl = https://repos.influxdata.com/rhel/\$releasever/\$basearch/stable
enabled = 1
gpgcheck = 1
gpgkey = https://repos.influxdata.com/influxdata-archive_compat.key
EOF
sudo yum install influxdb2
# macOS (Homebrew)
brew install influxdb
# サービス起動
sudo systemctl start influxdb
sudo systemctl enable influxdb
# 初期セットアップ
influx setup
基本操作(HTTP API)
# 組織とバケット作成
curl -X POST "http://localhost:8086/api/v2/orgs" \
-H "Authorization: Token YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "myorg"
}'
curl -X POST "http://localhost:8086/api/v2/buckets" \
-H "Authorization: Token YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "sensors",
"orgID": "YOUR_ORG_ID",
"retentionRules": [
{
"type": "expire",
"everySeconds": 2592000
}
]
}'
# データ書き込み(Line Protocol)
curl -X POST "http://localhost:8086/api/v2/write?org=myorg&bucket=sensors" \
-H "Authorization: Token YOUR_TOKEN" \
-H "Content-Type: text/plain" \
-d 'temperature,location=room1,sensor=DHT22 value=23.5 1640995200000000000
humidity,location=room1,sensor=DHT22 value=65.2 1640995200000000000
cpu_usage,host=server1,region=us-east value=85.3 1640995260000000000'
# データ読み取り(Flux クエリ)
curl -X POST "http://localhost:8086/api/v2/query?org=myorg" \
-H "Authorization: Token YOUR_TOKEN" \
-H "Content-Type: application/vnd.flux" \
-d 'from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r.location == "room1")'
CLI操作
# InfluxDB CLI設定
influx config create \
--config-name myconfig \
--host-url http://localhost:8086 \
--org myorg \
--token YOUR_TOKEN \
--active
# データ書き込み
influx write \
--bucket sensors \
--precision s \
'temperature,location=room2 value=24.1 1640995320'
# ファイルからデータ書き込み
cat > data.txt << EOF
temperature,location=room1 value=23.5 1640995200
temperature,location=room2 value=24.1 1640995260
humidity,location=room1 value=65.2 1640995200
humidity,location=room2 value=67.8 1640995260
EOF
influx write --bucket sensors --file data.txt
# Fluxクエリ実行
influx query 'from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> mean()'
# バケット一覧
influx bucket list
# 組織一覧
influx org list
Fluxクエリ言語
// 基本的な範囲クエリ
from(bucket: "sensors")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r.location == "room1")
// 集約とグループ化
from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu_usage")
|> group(columns: ["host"])
|> mean()
// 時間窓での集約
from(bucket: "sensors")
|> range(start: -6h)
|> filter(fn: (r) => r._measurement == "temperature")
|> aggregateWindow(every: 10m, fn: mean)
// 複数測定値の結合
temp = from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
humidity = from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "humidity")
join(tables: {temp: temp, humidity: humidity}, on: ["_time", "location"])
// 統計関数
from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu_usage")
|> group(columns: ["host"])
|> aggregateWindow(every: 5m, fn: mean)
|> percentile(percentile: 0.95)
// データ変換
from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> map(fn: (r) => ({
r with
_value: (r._value * 9.0 / 5.0) + 32.0,
unit: "°F"
}))
データ保持とダウンサンプリング
# タスクによる自動ダウンサンプリング
influx task create --file - << EOF
option task = {name: "downsample-5m", every: 1h}
from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> aggregateWindow(every: 5m, fn: mean)
|> to(bucket: "sensors_5m")
EOF
# データ保持期間設定
influx bucket update \
--id YOUR_BUCKET_ID \
--retention 720h # 30日間
# 古いデータの削除
influx delete \
--bucket sensors \
--start 2023-01-01T00:00:00Z \
--stop 2023-01-31T23:59:59Z \
--predicate '_measurement="old_data"'
実用例
// IoTセンサーデータの監視
from(bucket: "iot")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r._field == "temperature")
|> group(columns: ["device_id"])
|> aggregateWindow(every: 1m, fn: last)
|> map(fn: (r) => ({
r with
alert: if r._value > 30.0 then "HIGH" else "NORMAL"
}))
// アプリケーション性能監視
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "http_requests")
|> filter(fn: (r) => r._field == "response_time")
|> group(columns: ["endpoint", "status_code"])
|> aggregateWindow(every: 5m, fn: mean)
|> filter(fn: (r) => r._value > 1000.0) // 1秒以上のレスポンス
// ビジネスメトリクス分析
sales = from(bucket: "business")
|> range(start: -30d)
|> filter(fn: (r) => r._measurement == "sales")
|> filter(fn: (r) => r._field == "amount")
|> aggregateWindow(every: 1d, fn: sum)
revenue = from(bucket: "business")
|> range(start: -30d)
|> filter(fn: (r) => r._measurement == "revenue")
|> filter(fn: (r) => r._field == "total")
|> aggregateWindow(every: 1d, fn: sum)
join(tables: {sales: sales, revenue: revenue}, on: ["_time"])
|> map(fn: (r) => ({
_time: r._time,
avg_order_value: r.revenue_total / r.sales_amount
}))
// 予測分析
from(bucket: "sensors")
|> range(start: -7d)
|> filter(fn: (r) => r._measurement == "energy_consumption")
|> aggregateWindow(every: 1h, fn: mean)
|> holtWinters(n: 24, seasonality: 24) // 24時間先の予測
Pythonクライアント
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime
# クライアント接続
client = InfluxDBClient(
url="http://localhost:8086",
token="YOUR_TOKEN",
org="myorg"
)
# データ書き込み
write_api = client.write_api(write_options=SYNCHRONOUS)
# ポイント作成と書き込み
point = Point("temperature") \
.tag("location", "room1") \
.tag("sensor", "DHT22") \
.field("value", 23.5) \
.time(datetime.datetime.utcnow())
write_api.write(bucket="sensors", record=point)
# バルク書き込み
points = []
for i in range(100):
point = Point("cpu_usage") \
.tag("host", f"server{i%5}") \
.field("value", 50 + i % 40) \
.time(datetime.datetime.utcnow() - datetime.timedelta(minutes=i))
points.append(point)
write_api.write(bucket="metrics", record=points)
# データ読み取り
query_api = client.query_api()
query = '''
from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r.location == "room1")
'''
result = query_api.query(query)
for table in result:
for record in table.records:
print(f"Time: {record.get_time()}, Value: {record.get_value()}")
# パンダス DataFrame への変換
df = query_api.query_data_frame(query)
print(df.head())
# クライアント終了
client.close()
設定・最適化
# influxdb.conf の主要設定
[http]
bind-address = ":8086"
auth-enabled = true
[meta]
dir = "/var/lib/influxdb/meta"
retention-autocreate = true
[data]
dir = "/var/lib/influxdb/data"
wal-dir = "/var/lib/influxdb/wal"
series-id-set-cache-size = 100
[cluster]
shard-writer-timeout = "5s"
write-timeout = "10s"
[retention]
enabled = true
check-interval = "30m"
[shard-precreation]
enabled = true
check-interval = "10m"
advance-period = "30m"
[monitor]
store-enabled = true
store-database = "_internal"
[admin]
enabled = true
bind-address = ":8083"
[subscriber]
enabled = true
http-timeout = "30s"
[continuous_queries]
enabled = true
log-enabled = true
run-interval = "1s"
監視とメンテナンス
# システム統計
influx query 'from(bucket: "_monitoring")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "influxdb_database")
|> last()'
# パフォーマンス監視
curl "http://localhost:8086/metrics"
# バックアップ
influx backup /path/to/backup
# リストア
influx restore /path/to/backup
# データ整合性チェック
influx inspect verify-seriesfile /var/lib/influxdb/data
# TSMファイル情報
influx inspect dump-tsm /var/lib/influxdb/data/mydb/autogen/1/000000001-000000001.tsm