InfluxDB
時系列データに特化したオープンソースデータベース。IoTメトリクス、アプリケーション監視、リアルタイム分析に最適化。高速書き込み・クエリ性能。
データベースサーバー
InfluxDB
概要
InfluxDBは、メトリクス、イベント、リアルタイム分析に特化した高性能な時系列データベースです。2013年にInfluxData社によって開発され、Go言語で実装された現代的な時系列データ管理プラットフォームとして、IoT、DevOps、アプリケーション監視、リアルタイム分析の分野で広く採用されています。SQLライクなクエリ言語(InfluxQL・Flux)による直感的なデータ操作、自動データ圧縮とダウンサンプリング、分散クラスタリング機能により、数百万ポイント/秒の高速データ取り込みと効率的なストレージ管理を実現します。タイムスタンプを持つデータの保存・検索に最適化された設計により、従来のRDBMSでは困難だった大規模時系列データの高速処理を可能にし、現代のデータドリブン組織における重要なインフラコンポーネントとして機能します。
詳細
InfluxDB 2025年版は、InfluxDB 3.0 Coreの登場により、Apache Arrow、Apache Parquet、Apache DataFusion技術を基盤とした第三世代アーキテクチャとして大幅に進化しています。最新版では、オブジェクトストレージ(S3、GCS、Azure)ネイティブサポート、インメモリクエリエンジンの高速化、SQLとInfluxQLの完全統合、クラウドネイティブ設計による無制限スケーラビリティを実現。InfluxDB Cloudでは、InfluxDB Serverless、InfluxDB Dedicated、InfluxDB Clusteredの3つのデプロイメントオプションを提供し、スタートアップから大企業まで幅広いニーズに対応します。Python処理エンジン統合により、時系列データに対するカスタム分析・機械学習パイプラインの直接実行が可能となり、データサイエンティストとエンジニアの生産性を大幅に向上。TelegrafエージェントとKapacitorによる包括的なETLとアラート機能、充実したエコシステムにより、エンドツーエンドの時系列データ管理ソリューションを提供しています。
主な特徴
- 時系列最適化: タイムスタンプインデックスと圧縮技術による超高速時系列データ処理
- 高性能取り込み: 毎秒数百万ポイントの並列データ取り込み性能
- クラウドネイティブ: オブジェクトストレージベースのモダンアーキテクチャ
- SQLサポート: 標準SQLとInfluxQLによる柔軟なクエリ機能
- 自動ダウンサンプリング: データ保持ポリシーによる自動データ生成と圧縮
- 分散アーキテクチャ: 水平スケーリングによる無制限の拡張性
メリット・デメリット
メリット
- 時系列データに特化した世界最高レベルの取り込み・クエリ性能
- 自動データ圧縮と保持ポリシーによる効率的なストレージ運用
- SQL標準対応により既存データベースエンジニアの学習コスト最小化
- Telegrafエージェントによる200+データソース対応の包括的なデータ収集
- InfluxDB CloudでのフルマネージドSaaSによる運用負荷ゼロ選択肢
- Go言語実装による高いメモリ効率とクロスプラットフォーム対応
デメリット
- 時系列データ以外の汎用的なワークロードには不向き
- 複雑なJOIN操作や正規化されたリレーショナルデータ構造の制限
- 最新のInfluxDB 3.0での一部機能変更による既存環境からの移行コスト
- 高可用性構成でのライセンス費用とインフラコストの増大
- Flux言語の習得が必要な高度なデータ変換処理
- ACID特性よりも性能を重視した設計による一部の整合性制約
参考ページ
書き方の例
インストールと基本セットアップ
# Docker Composeを使用したInfluxDB環境構築
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
influxdb:
image: influxdb:2.7-alpine
container_name: influxdb
restart: unless-stopped
ports:
- "8086:8086"
volumes:
- influxdb_data:/var/lib/influxdb2
- influxdb_config:/etc/influxdb2
environment:
- DOCKER_INFLUXDB_INIT_MODE=setup
- DOCKER_INFLUXDB_INIT_USERNAME=admin
- DOCKER_INFLUXDB_INIT_PASSWORD=adminpassword
- DOCKER_INFLUXDB_INIT_ORG=myorg
- DOCKER_INFLUXDB_INIT_BUCKET=mybucket
- DOCKER_INFLUXDB_INIT_RETENTION=30d
- DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=mytoken123456789
grafana:
image: grafana/grafana:latest
container_name: grafana
restart: unless-stopped
ports:
- "3000:3000"
volumes:
- grafana_data:/var/lib/grafana
environment:
- GF_SECURITY_ADMIN_PASSWORD=grafanaadmin
depends_on:
- influxdb
volumes:
influxdb_data:
influxdb_config:
grafana_data:
EOF
# サービス起動
docker-compose up -d
# 動作確認
curl http://localhost:8086/health
# Linux環境でのネイティブインストール
# InfluxData公式リポジトリ追加
wget -q https://repos.influxdata.com/influxdata-archive_compat.key
echo '393e8779c89ac8d958f81f942f9ad7fb82a25e133faddaf92e15b16e6ac9ce4c influxdata-archive_compat.key' | sha256sum -c && cat influxdata-archive_compat.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/influxdata-archive_compat.gpg > /dev/null
echo 'deb [signed-by=/etc/apt/trusted.gpg.d/influxdata-archive_compat.gpg] https://repos.influxdata.com/debian stable main' | sudo tee /etc/apt/sources.list.d/influxdata.list
# InfluxDBインストール
sudo apt update
sudo apt install influxdb2
# サービス開始
sudo systemctl enable influxdb
sudo systemctl start influxdb
sudo systemctl status influxdb
# 初期セットアップ(Webベース)
# http://localhost:8086 にアクセス
# CLIでの初期セットアップ
influx setup \
--org myorg \
--bucket mybucket \
--username admin \
--password adminpassword \
--retention 30d \
--force
# 設定確認
influx config list
基本的なデータ書き込みと読み込み
# Line Protocolを使用したデータ書き込み
curl -X POST "http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket" \
-H "Authorization: Token mytoken123456789" \
-H "Content-Type: text/plain" \
--data-raw '
temperature,host=server01,region=tokyo value=25.6 1642680000000000000
temperature,host=server02,region=osaka value=22.1 1642680000000000000
humidity,host=server01,region=tokyo value=65.2 1642680000000000000
humidity,host=server02,region=osaka value=58.7 1642680000000000000
cpu_usage,host=server01,region=tokyo,cpu=cpu0 value=85.2 1642680000000000000
cpu_usage,host=server01,region=tokyo,cpu=cpu1 value=72.4 1642680000000000000
memory_usage,host=server01,region=tokyo value=76.8,available=7680000000 1642680000000000000
disk_usage,host=server01,region=tokyo,device=/dev/sda1 value=45.2,free=550000000000 1642680000000000000
'
# 複数ポイントの一括書き込み
cat > sample_data.txt << 'EOF'
temperature,host=server01,region=tokyo value=26.1 1642683600000000000
temperature,host=server01,region=tokyo value=25.8 1642687200000000000
temperature,host=server01,region=tokyo value=24.9 1642690800000000000
temperature,host=server02,region=osaka value=23.2 1642683600000000000
temperature,host=server02,region=osaka value=22.8 1642687200000000000
temperature,host=server02,region=osaka value=21.5 1642690800000000000
EOF
curl -X POST "http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket" \
-H "Authorization: Token mytoken123456789" \
-H "Content-Type: text/plain" \
--data-binary @sample_data.txt
# Fluxクエリによるデータ読み込み
curl -X POST "http://localhost:8086/api/v2/query?org=myorg" \
-H "Authorization: Token mytoken123456789" \
-H "Content-Type: application/vnd.flux" \
--data 'from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r._field == "value")
|> sort(columns: ["_time"], desc: false)'
# InfluxDB CLIを使用したクエリ
influx query '
from(bucket: "mybucket")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "cpu_usage")
|> filter(fn: (r) => r._field == "value")
|> group(columns: ["host"])
|> mean()' \
--org myorg
# SQLクエリの実行(InfluxDB 3.0)
influx query --format=table '
SELECT
time,
host,
region,
AVG(value) as avg_temp
FROM temperature
WHERE time >= now() - INTERVAL '1 hour'
GROUP BY time(10m), host, region
ORDER BY time DESC' \
--org myorg
高度なクエリとデータ変換
# Fluxによる高度なデータ分析
curl -X POST "http://localhost:8086/api/v2/query?org=myorg" \
-H "Authorization: Token mytoken123456789" \
-H "Content-Type: application/vnd.flux" \
--data '
// 温度データの統計分析
temperature_stats = from(bucket: "mybucket")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r._field == "value")
|> group(columns: ["host", "region"])
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|> yield(name: "hourly_avg")
// 異常値検知
temperature_anomalies = from(bucket: "mybucket")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r._field == "value")
|> group(columns: ["host"])
|> aggregateWindow(every: 10m, fn: mean)
|> map(fn: (r) => ({
r with
anomaly: if r._value > 30.0 or r._value < 10.0 then "high" else "normal"
}))
|> filter(fn: (r) => r.anomaly == "high")
|> yield(name: "anomalies")
// マルチメトリクス結合分析
combined_metrics = from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu_usage" or r._measurement == "memory_usage")
|> filter(fn: (r) => r._field == "value")
|> group(columns: ["host", "_measurement"])
|> aggregateWindow(every: 5m, fn: mean)
|> pivot(rowKey: ["_time", "host"], columnKey: ["_measurement"], valueColumn: "_value")
|> map(fn: (r) => ({
r with
performance_score: (100.0 - r.cpu_usage) * (100.0 - r.memory_usage) / 100.0
}))
|> yield(name: "performance")
'
# 時系列予測とトレンド分析
curl -X POST "http://localhost:8086/api/v2/query?org=myorg" \
-H "Authorization: Token mytoken123456789" \
-H "Content-Type: application/vnd.flux" \
--data '
// ローリング平均とトレンド計算
from(bucket: "mybucket")
|> range(start: -7d)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r._field == "value")
|> group(columns: ["host"])
|> aggregateWindow(every: 1h, fn: mean)
|> movingAverage(n: 24) // 24時間移動平均
|> derivative(unit: 1h, nonNegative: false)
|> map(fn: (r) => ({
r with
trend: if r._value > 0.1 then "increasing"
else if r._value < -0.1 then "decreasing"
else "stable"
}))
'
# ダウンサンプリングとデータ集約
curl -X POST "http://localhost:8086/api/v2/query?org=myorg" \
-H "Authorization: Token mytoken123456789" \
-H "Content-Type: application/vnd.flux" \
--data '
// 時間別統計サマリー
from(bucket: "mybucket")
|> range(start: -30d)
|> filter(fn: (r) => r._measurement == "cpu_usage")
|> filter(fn: (r) => r._field == "value")
|> group(columns: ["host"])
|> aggregateWindow(
every: 1d,
fn: (column, tables=<-) => tables
|> max(column: "_value")
|> set(key: "_field", value: "daily_max")
|> union(tables: tables |> min(column: "_value") |> set(key: "_field", value: "daily_min"))
|> union(tables: tables |> mean(column: "_value") |> set(key: "_field", value: "daily_avg"))
|> union(tables: tables |> stddev(column: "_value") |> set(key: "_field", value: "daily_stddev"))
)
|> sort(columns: ["_time", "_field"])
'
Telegrafによるデータ収集設定
# Telegrafインストール
sudo apt install telegraf
# Telegraf設定ファイル作成
sudo tee /etc/telegraf/telegraf.conf << 'EOF'
# Telegraf Configuration
# Global tags
[global_tags]
environment = "production"
datacenter = "tokyo"
# Agent設定
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 10000
collection_jitter = "0s"
flush_interval = "10s"
flush_jitter = "0s"
precision = ""
hostname = ""
omit_hostname = false
# Output plugins
[[outputs.influxdb_v2]]
urls = ["http://localhost:8086"]
token = "mytoken123456789"
organization = "myorg"
bucket = "mybucket"
# Input plugins
[[inputs.cpu]]
percpu = true
totalcpu = true
collect_cpu_time = false
report_active = false
[[inputs.disk]]
ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"]
[[inputs.diskio]]
[[inputs.kernel]]
[[inputs.mem]]
[[inputs.processes]]
[[inputs.swap]]
[[inputs.system]]
[[inputs.net]]
[[inputs.netstat]]
# Docker metrics
[[inputs.docker]]
endpoint = "unix:///var/run/docker.sock"
gather_services = false
container_names = []
source_tag = false
container_name_include = []
container_name_exclude = []
timeout = "5s"
perdevice = true
total = false
docker_label_include = []
docker_label_exclude = []
# HTTP response monitoring
[[inputs.http_response]]
urls = [
"http://localhost:8086/health",
"https://example.com",
"https://api.github.com"
]
response_timeout = "5s"
method = "GET"
follow_redirects = false
# PostgreSQL monitoring
[[inputs.postgresql]]
address = "host=localhost user=postgres sslmode=disable"
databases = ["postgres"]
# Nginx monitoring
[[inputs.nginx]]
urls = ["http://localhost/nginx_status"]
response_timeout = "5s"
EOF
# Telegrafサービス開始
sudo systemctl enable telegraf
sudo systemctl start telegraf
sudo systemctl status telegraf
# Telegraf設定テスト
telegraf --config /etc/telegraf/telegraf.conf --test
# カスタムメトリクス収集スクリプト
cat > custom_metrics.sh << 'EOF'
#!/bin/bash
# カスタムアプリケーションメトリクス収集
while true; do
# アプリケーション固有のメトリクス
app_connections=$(netstat -an | grep :8080 | grep ESTABLISHED | wc -l)
app_memory=$(ps aux | grep myapp | awk '{sum+=$6} END {print sum}')
app_cpu=$(ps aux | grep myapp | awk '{sum+=$3} END {print sum}')
# InfluxDBにデータ送信
curl -X POST "http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket" \
-H "Authorization: Token mytoken123456789" \
-H "Content-Type: text/plain" \
--data "myapp,host=$HOSTNAME connections=$app_connections,memory=$app_memory,cpu=$app_cpu $(date +%s)000000000"
sleep 30
done
EOF
chmod +x custom_metrics.sh
データ保持ポリシーとダウンサンプリング
# バケットとデータ保持ポリシーの管理
# 短期データ用バケット作成(1日保持)
influx bucket create \
--name realtime \
--org myorg \
--retention 24h
# 中期データ用バケット作成(30日保持)
influx bucket create \
--name daily \
--org myorg \
--retention 720h
# 長期データ用バケット作成(1年保持)
influx bucket create \
--name monthly \
--org myorg \
--retention 8760h
# データ保持ポリシー確認
influx bucket list --org myorg
# Taskによる自動ダウンサンプリング設定
influx task create --org myorg << 'EOF'
option task = {name: "Daily Downsampling", every: 1h}
from(bucket: "realtime")
|> range(start: -2h, stop: -1h)
|> filter(fn: (r) => r._measurement == "temperature" or r._measurement == "humidity")
|> aggregateWindow(every: 1h, fn: mean)
|> to(bucket: "daily", org: "myorg")
EOF
# 月次集約タスク作成
influx task create --org myorg << 'EOF'
option task = {name: "Monthly Aggregation", cron: "0 0 1 * *"} // 毎月1日実行
from(bucket: "daily")
|> range(start: -30d)
|> filter(fn: (r) => r._measurement == "temperature")
|> aggregateWindow(every: 1d, fn: mean)
|> map(fn: (r) => ({r with _measurement: "temperature_daily_avg"}))
|> to(bucket: "monthly", org: "myorg")
EOF
# タスク一覧確認
influx task list --org myorg
# タスクログ確認
influx task log list --task-id <task-id> --org myorg
# 手動でのデータダウンサンプリング実行例
influx query '
from(bucket: "mybucket")
|> range(start: -7d)
|> filter(fn: (r) => r._measurement == "cpu_usage")
|> aggregateWindow(every: 1h, fn: mean)
|> to(bucket: "daily", org: "myorg")
' --org myorg
パフォーマンス最適化と監視
# InfluxDB設定最適化
sudo tee -a /etc/influxdb/config.toml << 'EOF'
# パフォーマンス設定
http-bind-address = ":8086"
http-max-body-size = 25000000
http-max-concurrent-write-limit = 0
http-max-enqueued-write-limit = 0
# ストレージエンジン設定
store = "tsi1"
engine-path = "/var/lib/influxdb/engine"
wal-dir = "/var/lib/influxdb/wal"
wal-fsync-delay = "0s"
max-series-per-database = 1000000
max-values-per-tag = 100000
# キャッシュ設定
query-timeout = "0s"
log-queries-after = "0s"
max-select-point = 0
max-select-series = 0
max-select-buckets = 0
EOF
# システムリソース監視
# InfluxDB固有のメトリクス確認
curl -X POST "http://localhost:8086/api/v2/query?org=myorg" \
-H "Authorization: Token mytoken123456789" \
-H "Content-Type: application/vnd.flux" \
--data '
from(bucket: "_monitoring")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "influxdb_httpd")
|> group(columns: ["_field"])
|> last()
'
# クエリパフォーマンス分析
influx query --profiler-mode=query '
from(bucket: "mybucket")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "temperature")
|> group(columns: ["host"])
|> mean()
' --org myorg
# バケット統計情報
influx bucket list --org myorg --json | jq '.[] | {name: .name, retentionRules: .retentionRules}'
# インデックス統計
influx query '
import "influxdata/influxdb/monitor"
monitor.from(
start: -1h,
fn: (r) => r._measurement == "qc_all_duration_seconds"
)
|> mean()
' --org myorg
# ディスク使用量監視
du -sh /var/lib/influxdb2/
df -h | grep influxdb
# プロセス監視
ps aux | grep influxd
netstat -tlnp | grep 8086
# ログ監視
sudo journalctl -u influxdb -f
tail -f /var/log/influxdb/influxd.log
アプリケーション統合例
# Python InfluxDBクライアント
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime, timezone
import time
import random
class InfluxDBManager:
def __init__(self, url, token, org, bucket):
"""InfluxDBクライアント初期化"""
self.client = InfluxDBClient(
url=url,
token=token,
org=org,
timeout=30000,
enable_gzip=True
)
self.org = org
self.bucket = bucket
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.query_api = self.client.query_api()
def write_point(self, measurement, tags, fields, timestamp=None):
"""単一ポイント書き込み"""
try:
point = Point(measurement)
# タグ追加
for key, value in tags.items():
point = point.tag(key, value)
# フィールド追加
for key, value in fields.items():
point = point.field(key, value)
# タイムスタンプ設定
if timestamp:
point = point.time(timestamp, WritePrecision.NS)
else:
point = point.time(datetime.now(timezone.utc), WritePrecision.NS)
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
return True
except Exception as e:
print(f"書き込みエラー: {e}")
return False
def write_batch(self, points_data):
"""バッチ書き込み"""
try:
points = []
for data in points_data:
point = Point(data['measurement'])
for key, value in data['tags'].items():
point = point.tag(key, value)
for key, value in data['fields'].items():
point = point.field(key, value)
timestamp = data.get('timestamp', datetime.now(timezone.utc))
point = point.time(timestamp, WritePrecision.NS)
points.append(point)
self.write_api.write(bucket=self.bucket, org=self.org, record=points)
print(f"バッチ書き込み完了: {len(points)}ポイント")
return True
except Exception as e:
print(f"バッチ書き込みエラー: {e}")
return False
def query_flux(self, flux_query):
"""Fluxクエリ実行"""
try:
result = self.query_api.query(org=self.org, query=flux_query)
data = []
for table in result:
for record in table.records:
data.append({
'time': record.get_time(),
'measurement': record.get_measurement(),
'field': record.get_field(),
'value': record.get_value(),
'tags': {k: v for k, v in record.values.items()
if k not in ['_time', '_measurement', '_field', '_value']}
})
return data
except Exception as e:
print(f"クエリエラー: {e}")
return None
def get_latest_values(self, measurement, time_range="-1h"):
"""最新値取得"""
flux_query = f'''
from(bucket: "{self.bucket}")
|> range(start: {time_range})
|> filter(fn: (r) => r._measurement == "{measurement}")
|> last()
'''
return self.query_flux(flux_query)
def get_aggregated_data(self, measurement, window="1h", func="mean", time_range="-24h"):
"""集約データ取得"""
flux_query = f'''
from(bucket: "{self.bucket}")
|> range(start: {time_range})
|> filter(fn: (r) => r._measurement == "{measurement}")
|> aggregateWindow(every: {window}, fn: {func})
|> yield(name: "{func}")
'''
return self.query_flux(flux_query)
def close(self):
"""接続クローズ"""
self.client.close()
# 使用例とモニタリングアプリケーション
def generate_sample_metrics(influx_manager):
"""サンプルメトリクス生成"""
hosts = ['server01', 'server02', 'server03']
regions = ['tokyo', 'osaka', 'nagoya']
while True:
try:
# CPU使用率データ生成
cpu_points = []
for host in hosts:
for cpu_id in range(4): # 4コア想定
cpu_points.append({
'measurement': 'cpu_usage',
'tags': {
'host': host,
'region': random.choice(regions),
'cpu': f'cpu{cpu_id}'
},
'fields': {
'usage_percent': random.uniform(10, 90),
'user': random.uniform(5, 40),
'system': random.uniform(2, 20),
'idle': random.uniform(20, 80)
}
})
# メモリ使用率データ生成
memory_points = []
for host in hosts:
total_memory = 16 * 1024 * 1024 * 1024 # 16GB
used = random.uniform(0.3, 0.8) * total_memory
memory_points.append({
'measurement': 'memory_usage',
'tags': {
'host': host,
'region': random.choice(regions)
},
'fields': {
'used_bytes': used,
'available_bytes': total_memory - used,
'used_percent': (used / total_memory) * 100,
'cached_bytes': random.uniform(0.1, 0.3) * total_memory
}
})
# 温度データ生成
temperature_points = []
for host in hosts:
temperature_points.append({
'measurement': 'temperature',
'tags': {
'host': host,
'region': random.choice(regions),
'sensor': 'cpu'
},
'fields': {
'celsius': random.uniform(30, 75),
'fahrenheit': random.uniform(86, 167)
}
})
# 全ポイントをバッチ書き込み
all_points = cpu_points + memory_points + temperature_points
influx_manager.write_batch(all_points)
print(f"メトリクス送信完了: {len(all_points)}ポイント - {datetime.now()}")
time.sleep(10) # 10秒間隔
except KeyboardInterrupt:
print("\nメトリクス生成を停止します")
break
except Exception as e:
print(f"メトリクス生成エラー: {e}")
time.sleep(5)
if __name__ == "__main__":
# InfluxDB接続設定
influx_config = {
'url': 'http://localhost:8086',
'token': 'mytoken123456789',
'org': 'myorg',
'bucket': 'mybucket'
}
# InfluxDBマネージャー初期化
influx_manager = InfluxDBManager(**influx_config)
try:
# 最新のCPU使用率取得
latest_cpu = influx_manager.get_latest_values('cpu_usage')
print(f"最新CPU使用率: {latest_cpu}")
# 過去24時間の平均温度取得
avg_temperature = influx_manager.get_aggregated_data(
'temperature',
window='1h',
func='mean',
time_range='-24h'
)
print(f"平均温度データ数: {len(avg_temperature) if avg_temperature else 0}")
# サンプルメトリクス生成開始
print("サンプルメトリクス生成を開始します (Ctrl+Cで停止)")
generate_sample_metrics(influx_manager)
except Exception as e:
print(f"エラー: {e}")
finally:
influx_manager.close()