InfluxDB

時系列データに特化したオープンソースデータベース。IoTメトリクス、アプリケーション監視、リアルタイム分析に最適化。高速書き込み・クエリ性能。

データベースサーバー時系列データベース分散システムリアルタイムアナリティクス高性能監視IoTメトリクス分析Go言語実装

データベースサーバー

InfluxDB

概要

InfluxDBは、メトリクス、イベント、リアルタイム分析に特化した高性能な時系列データベースです。2013年にInfluxData社によって開発され、Go言語で実装された現代的な時系列データ管理プラットフォームとして、IoT、DevOps、アプリケーション監視、リアルタイム分析の分野で広く採用されています。SQLライクなクエリ言語(InfluxQL・Flux)による直感的なデータ操作、自動データ圧縮とダウンサンプリング、分散クラスタリング機能により、数百万ポイント/秒の高速データ取り込みと効率的なストレージ管理を実現します。タイムスタンプを持つデータの保存・検索に最適化された設計により、従来のRDBMSでは困難だった大規模時系列データの高速処理を可能にし、現代のデータドリブン組織における重要なインフラコンポーネントとして機能します。

詳細

InfluxDB 2025年版は、InfluxDB 3.0 Coreの登場により、Apache Arrow、Apache Parquet、Apache DataFusion技術を基盤とした第三世代アーキテクチャとして大幅に進化しています。最新版では、オブジェクトストレージ(S3、GCS、Azure)ネイティブサポート、インメモリクエリエンジンの高速化、SQLとInfluxQLの完全統合、クラウドネイティブ設計による無制限スケーラビリティを実現。InfluxDB Cloudでは、InfluxDB Serverless、InfluxDB Dedicated、InfluxDB Clusteredの3つのデプロイメントオプションを提供し、スタートアップから大企業まで幅広いニーズに対応します。Python処理エンジン統合により、時系列データに対するカスタム分析・機械学習パイプラインの直接実行が可能となり、データサイエンティストとエンジニアの生産性を大幅に向上。TelegrafエージェントとKapacitorによる包括的なETLとアラート機能、充実したエコシステムにより、エンドツーエンドの時系列データ管理ソリューションを提供しています。

主な特徴

  • 時系列最適化: タイムスタンプインデックスと圧縮技術による超高速時系列データ処理
  • 高性能取り込み: 毎秒数百万ポイントの並列データ取り込み性能
  • クラウドネイティブ: オブジェクトストレージベースのモダンアーキテクチャ
  • SQLサポート: 標準SQLとInfluxQLによる柔軟なクエリ機能
  • 自動ダウンサンプリング: データ保持ポリシーによる自動データ生成と圧縮
  • 分散アーキテクチャ: 水平スケーリングによる無制限の拡張性

メリット・デメリット

メリット

  • 時系列データに特化した世界最高レベルの取り込み・クエリ性能
  • 自動データ圧縮と保持ポリシーによる効率的なストレージ運用
  • SQL標準対応により既存データベースエンジニアの学習コスト最小化
  • Telegrafエージェントによる200+データソース対応の包括的なデータ収集
  • InfluxDB CloudでのフルマネージドSaaSによる運用負荷ゼロ選択肢
  • Go言語実装による高いメモリ効率とクロスプラットフォーム対応

デメリット

  • 時系列データ以外の汎用的なワークロードには不向き
  • 複雑なJOIN操作や正規化されたリレーショナルデータ構造の制限
  • 最新のInfluxDB 3.0での一部機能変更による既存環境からの移行コスト
  • 高可用性構成でのライセンス費用とインフラコストの増大
  • Flux言語の習得が必要な高度なデータ変換処理
  • ACID特性よりも性能を重視した設計による一部の整合性制約

参考ページ

書き方の例

インストールと基本セットアップ

# Docker Composeを使用したInfluxDB環境構築
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
  influxdb:
    image: influxdb:2.7-alpine
    container_name: influxdb
    restart: unless-stopped
    ports:
      - "8086:8086"
    volumes:
      - influxdb_data:/var/lib/influxdb2
      - influxdb_config:/etc/influxdb2
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=adminpassword
      - DOCKER_INFLUXDB_INIT_ORG=myorg
      - DOCKER_INFLUXDB_INIT_BUCKET=mybucket
      - DOCKER_INFLUXDB_INIT_RETENTION=30d
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=mytoken123456789
    
  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    restart: unless-stopped
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=grafanaadmin
    depends_on:
      - influxdb

volumes:
  influxdb_data:
  influxdb_config:
  grafana_data:
EOF

# サービス起動
docker-compose up -d

# 動作確認
curl http://localhost:8086/health

# Linux環境でのネイティブインストール
# InfluxData公式リポジトリ追加
wget -q https://repos.influxdata.com/influxdata-archive_compat.key
echo '393e8779c89ac8d958f81f942f9ad7fb82a25e133faddaf92e15b16e6ac9ce4c influxdata-archive_compat.key' | sha256sum -c && cat influxdata-archive_compat.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/influxdata-archive_compat.gpg > /dev/null
echo 'deb [signed-by=/etc/apt/trusted.gpg.d/influxdata-archive_compat.gpg] https://repos.influxdata.com/debian stable main' | sudo tee /etc/apt/sources.list.d/influxdata.list

# InfluxDBインストール
sudo apt update
sudo apt install influxdb2

# サービス開始
sudo systemctl enable influxdb
sudo systemctl start influxdb
sudo systemctl status influxdb

# 初期セットアップ(Webベース)
# http://localhost:8086 にアクセス

# CLIでの初期セットアップ
influx setup \
  --org myorg \
  --bucket mybucket \
  --username admin \
  --password adminpassword \
  --retention 30d \
  --force

# 設定確認
influx config list

基本的なデータ書き込みと読み込み

# Line Protocolを使用したデータ書き込み
curl -X POST "http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket" \
  -H "Authorization: Token mytoken123456789" \
  -H "Content-Type: text/plain" \
  --data-raw '
temperature,host=server01,region=tokyo value=25.6 1642680000000000000
temperature,host=server02,region=osaka value=22.1 1642680000000000000
humidity,host=server01,region=tokyo value=65.2 1642680000000000000
humidity,host=server02,region=osaka value=58.7 1642680000000000000
cpu_usage,host=server01,region=tokyo,cpu=cpu0 value=85.2 1642680000000000000
cpu_usage,host=server01,region=tokyo,cpu=cpu1 value=72.4 1642680000000000000
memory_usage,host=server01,region=tokyo value=76.8,available=7680000000 1642680000000000000
disk_usage,host=server01,region=tokyo,device=/dev/sda1 value=45.2,free=550000000000 1642680000000000000
'

# 複数ポイントの一括書き込み
cat > sample_data.txt << 'EOF'
temperature,host=server01,region=tokyo value=26.1 1642683600000000000
temperature,host=server01,region=tokyo value=25.8 1642687200000000000
temperature,host=server01,region=tokyo value=24.9 1642690800000000000
temperature,host=server02,region=osaka value=23.2 1642683600000000000
temperature,host=server02,region=osaka value=22.8 1642687200000000000
temperature,host=server02,region=osaka value=21.5 1642690800000000000
EOF

curl -X POST "http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket" \
  -H "Authorization: Token mytoken123456789" \
  -H "Content-Type: text/plain" \
  --data-binary @sample_data.txt

# Fluxクエリによるデータ読み込み
curl -X POST "http://localhost:8086/api/v2/query?org=myorg" \
  -H "Authorization: Token mytoken123456789" \
  -H "Content-Type: application/vnd.flux" \
  --data 'from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r._field == "value")
  |> sort(columns: ["_time"], desc: false)'

# InfluxDB CLIを使用したクエリ
influx query '
from(bucket: "mybucket")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "cpu_usage")
  |> filter(fn: (r) => r._field == "value")
  |> group(columns: ["host"])
  |> mean()' \
  --org myorg

# SQLクエリの実行(InfluxDB 3.0)
influx query --format=table '
SELECT 
  time, 
  host, 
  region, 
  AVG(value) as avg_temp
FROM temperature 
WHERE time >= now() - INTERVAL '1 hour'
GROUP BY time(10m), host, region
ORDER BY time DESC' \
  --org myorg

高度なクエリとデータ変換

# Fluxによる高度なデータ分析
curl -X POST "http://localhost:8086/api/v2/query?org=myorg" \
  -H "Authorization: Token mytoken123456789" \
  -H "Content-Type: application/vnd.flux" \
  --data '
// 温度データの統計分析
temperature_stats = from(bucket: "mybucket")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r._field == "value")
  |> group(columns: ["host", "region"])
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> yield(name: "hourly_avg")

// 異常値検知
temperature_anomalies = from(bucket: "mybucket")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r._field == "value")
  |> group(columns: ["host"])
  |> aggregateWindow(every: 10m, fn: mean)
  |> map(fn: (r) => ({
      r with
      anomaly: if r._value > 30.0 or r._value < 10.0 then "high" else "normal"
    }))
  |> filter(fn: (r) => r.anomaly == "high")
  |> yield(name: "anomalies")

// マルチメトリクス結合分析
combined_metrics = from(bucket: "mybucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu_usage" or r._measurement == "memory_usage")
  |> filter(fn: (r) => r._field == "value")
  |> group(columns: ["host", "_measurement"])
  |> aggregateWindow(every: 5m, fn: mean)
  |> pivot(rowKey: ["_time", "host"], columnKey: ["_measurement"], valueColumn: "_value")
  |> map(fn: (r) => ({
      r with
      performance_score: (100.0 - r.cpu_usage) * (100.0 - r.memory_usage) / 100.0
    }))
  |> yield(name: "performance")
'

# 時系列予測とトレンド分析
curl -X POST "http://localhost:8086/api/v2/query?org=myorg" \
  -H "Authorization: Token mytoken123456789" \
  -H "Content-Type: application/vnd.flux" \
  --data '
// ローリング平均とトレンド計算
from(bucket: "mybucket")
  |> range(start: -7d)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r._field == "value")
  |> group(columns: ["host"])
  |> aggregateWindow(every: 1h, fn: mean)
  |> movingAverage(n: 24)  // 24時間移動平均
  |> derivative(unit: 1h, nonNegative: false)
  |> map(fn: (r) => ({
      r with
      trend: if r._value > 0.1 then "increasing" 
             else if r._value < -0.1 then "decreasing" 
             else "stable"
    }))
'

# ダウンサンプリングとデータ集約
curl -X POST "http://localhost:8086/api/v2/query?org=myorg" \
  -H "Authorization: Token mytoken123456789" \
  -H "Content-Type: application/vnd.flux" \
  --data '
// 時間別統計サマリー
from(bucket: "mybucket")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "cpu_usage")
  |> filter(fn: (r) => r._field == "value")
  |> group(columns: ["host"])
  |> aggregateWindow(
      every: 1d,
      fn: (column, tables=<-) => tables
        |> max(column: "_value")
        |> set(key: "_field", value: "daily_max")
        |> union(tables: tables |> min(column: "_value") |> set(key: "_field", value: "daily_min"))
        |> union(tables: tables |> mean(column: "_value") |> set(key: "_field", value: "daily_avg"))
        |> union(tables: tables |> stddev(column: "_value") |> set(key: "_field", value: "daily_stddev"))
    )
  |> sort(columns: ["_time", "_field"])
'

Telegrafによるデータ収集設定

# Telegrafインストール
sudo apt install telegraf

# Telegraf設定ファイル作成
sudo tee /etc/telegraf/telegraf.conf << 'EOF'
# Telegraf Configuration

# Global tags
[global_tags]
  environment = "production"
  datacenter = "tokyo"

# Agent設定
[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  hostname = ""
  omit_hostname = false

# Output plugins
[[outputs.influxdb_v2]]
  urls = ["http://localhost:8086"]
  token = "mytoken123456789"
  organization = "myorg"
  bucket = "mybucket"
  
# Input plugins
[[inputs.cpu]]
  percpu = true
  totalcpu = true
  collect_cpu_time = false
  report_active = false

[[inputs.disk]]
  ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"]

[[inputs.diskio]]

[[inputs.kernel]]

[[inputs.mem]]

[[inputs.processes]]

[[inputs.swap]]

[[inputs.system]]

[[inputs.net]]

[[inputs.netstat]]

# Docker metrics
[[inputs.docker]]
  endpoint = "unix:///var/run/docker.sock"
  gather_services = false
  container_names = []
  source_tag = false
  container_name_include = []
  container_name_exclude = []
  timeout = "5s"
  perdevice = true
  total = false
  docker_label_include = []
  docker_label_exclude = []

# HTTP response monitoring
[[inputs.http_response]]
  urls = [
    "http://localhost:8086/health",
    "https://example.com",
    "https://api.github.com"
  ]
  response_timeout = "5s"
  method = "GET"
  follow_redirects = false

# PostgreSQL monitoring
[[inputs.postgresql]]
  address = "host=localhost user=postgres sslmode=disable"
  databases = ["postgres"]

# Nginx monitoring
[[inputs.nginx]]
  urls = ["http://localhost/nginx_status"]
  response_timeout = "5s"
EOF

# Telegrafサービス開始
sudo systemctl enable telegraf
sudo systemctl start telegraf
sudo systemctl status telegraf

# Telegraf設定テスト
telegraf --config /etc/telegraf/telegraf.conf --test

# カスタムメトリクス収集スクリプト
cat > custom_metrics.sh << 'EOF'
#!/bin/bash

# カスタムアプリケーションメトリクス収集
while true; do
  # アプリケーション固有のメトリクス
  app_connections=$(netstat -an | grep :8080 | grep ESTABLISHED | wc -l)
  app_memory=$(ps aux | grep myapp | awk '{sum+=$6} END {print sum}')
  app_cpu=$(ps aux | grep myapp | awk '{sum+=$3} END {print sum}')
  
  # InfluxDBにデータ送信
  curl -X POST "http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket" \
    -H "Authorization: Token mytoken123456789" \
    -H "Content-Type: text/plain" \
    --data "myapp,host=$HOSTNAME connections=$app_connections,memory=$app_memory,cpu=$app_cpu $(date +%s)000000000"
  
  sleep 30
done
EOF

chmod +x custom_metrics.sh

データ保持ポリシーとダウンサンプリング

# バケットとデータ保持ポリシーの管理
# 短期データ用バケット作成(1日保持)
influx bucket create \
  --name realtime \
  --org myorg \
  --retention 24h

# 中期データ用バケット作成(30日保持)
influx bucket create \
  --name daily \
  --org myorg \
  --retention 720h

# 長期データ用バケット作成(1年保持)
influx bucket create \
  --name monthly \
  --org myorg \
  --retention 8760h

# データ保持ポリシー確認
influx bucket list --org myorg

# Taskによる自動ダウンサンプリング設定
influx task create --org myorg << 'EOF'
option task = {name: "Daily Downsampling", every: 1h}

from(bucket: "realtime")
  |> range(start: -2h, stop: -1h)
  |> filter(fn: (r) => r._measurement == "temperature" or r._measurement == "humidity")
  |> aggregateWindow(every: 1h, fn: mean)
  |> to(bucket: "daily", org: "myorg")
EOF

# 月次集約タスク作成
influx task create --org myorg << 'EOF'
option task = {name: "Monthly Aggregation", cron: "0 0 1 * *"}  // 毎月1日実行

from(bucket: "daily")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> aggregateWindow(every: 1d, fn: mean)
  |> map(fn: (r) => ({r with _measurement: "temperature_daily_avg"}))
  |> to(bucket: "monthly", org: "myorg")
EOF

# タスク一覧確認
influx task list --org myorg

# タスクログ確認
influx task log list --task-id <task-id> --org myorg

# 手動でのデータダウンサンプリング実行例
influx query '
from(bucket: "mybucket")
  |> range(start: -7d)
  |> filter(fn: (r) => r._measurement == "cpu_usage")
  |> aggregateWindow(every: 1h, fn: mean)
  |> to(bucket: "daily", org: "myorg")
' --org myorg

パフォーマンス最適化と監視

# InfluxDB設定最適化
sudo tee -a /etc/influxdb/config.toml << 'EOF'
# パフォーマンス設定
http-bind-address = ":8086"
http-max-body-size = 25000000
http-max-concurrent-write-limit = 0
http-max-enqueued-write-limit = 0

# ストレージエンジン設定
store = "tsi1"
engine-path = "/var/lib/influxdb/engine"
wal-dir = "/var/lib/influxdb/wal"
wal-fsync-delay = "0s"
max-series-per-database = 1000000
max-values-per-tag = 100000

# キャッシュ設定
query-timeout = "0s"
log-queries-after = "0s"
max-select-point = 0
max-select-series = 0
max-select-buckets = 0
EOF

# システムリソース監視
# InfluxDB固有のメトリクス確認
curl -X POST "http://localhost:8086/api/v2/query?org=myorg" \
  -H "Authorization: Token mytoken123456789" \
  -H "Content-Type: application/vnd.flux" \
  --data '
from(bucket: "_monitoring")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "influxdb_httpd")
  |> group(columns: ["_field"])
  |> last()
'

# クエリパフォーマンス分析
influx query --profiler-mode=query '
from(bucket: "mybucket")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> group(columns: ["host"])
  |> mean()
' --org myorg

# バケット統計情報
influx bucket list --org myorg --json | jq '.[] | {name: .name, retentionRules: .retentionRules}'

# インデックス統計
influx query '
import "influxdata/influxdb/monitor"

monitor.from(
  start: -1h,
  fn: (r) => r._measurement == "qc_all_duration_seconds"
)
  |> mean()
' --org myorg

# ディスク使用量監視
du -sh /var/lib/influxdb2/
df -h | grep influxdb

# プロセス監視
ps aux | grep influxd
netstat -tlnp | grep 8086

# ログ監視
sudo journalctl -u influxdb -f
tail -f /var/log/influxdb/influxd.log

アプリケーション統合例

# Python InfluxDBクライアント
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime, timezone
import time
import random

class InfluxDBManager:
    def __init__(self, url, token, org, bucket):
        """InfluxDBクライアント初期化"""
        self.client = InfluxDBClient(
            url=url,
            token=token,
            org=org,
            timeout=30000,
            enable_gzip=True
        )
        self.org = org
        self.bucket = bucket
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
    
    def write_point(self, measurement, tags, fields, timestamp=None):
        """単一ポイント書き込み"""
        try:
            point = Point(measurement)
            
            # タグ追加
            for key, value in tags.items():
                point = point.tag(key, value)
            
            # フィールド追加
            for key, value in fields.items():
                point = point.field(key, value)
            
            # タイムスタンプ設定
            if timestamp:
                point = point.time(timestamp, WritePrecision.NS)
            else:
                point = point.time(datetime.now(timezone.utc), WritePrecision.NS)
            
            self.write_api.write(bucket=self.bucket, org=self.org, record=point)
            return True
            
        except Exception as e:
            print(f"書き込みエラー: {e}")
            return False
    
    def write_batch(self, points_data):
        """バッチ書き込み"""
        try:
            points = []
            for data in points_data:
                point = Point(data['measurement'])
                
                for key, value in data['tags'].items():
                    point = point.tag(key, value)
                
                for key, value in data['fields'].items():
                    point = point.field(key, value)
                
                timestamp = data.get('timestamp', datetime.now(timezone.utc))
                point = point.time(timestamp, WritePrecision.NS)
                points.append(point)
            
            self.write_api.write(bucket=self.bucket, org=self.org, record=points)
            print(f"バッチ書き込み完了: {len(points)}ポイント")
            return True
            
        except Exception as e:
            print(f"バッチ書き込みエラー: {e}")
            return False
    
    def query_flux(self, flux_query):
        """Fluxクエリ実行"""
        try:
            result = self.query_api.query(org=self.org, query=flux_query)
            
            data = []
            for table in result:
                for record in table.records:
                    data.append({
                        'time': record.get_time(),
                        'measurement': record.get_measurement(),
                        'field': record.get_field(),
                        'value': record.get_value(),
                        'tags': {k: v for k, v in record.values.items() 
                                if k not in ['_time', '_measurement', '_field', '_value']}
                    })
            
            return data
            
        except Exception as e:
            print(f"クエリエラー: {e}")
            return None
    
    def get_latest_values(self, measurement, time_range="-1h"):
        """最新値取得"""
        flux_query = f'''
        from(bucket: "{self.bucket}")
        |> range(start: {time_range})
        |> filter(fn: (r) => r._measurement == "{measurement}")
        |> last()
        '''
        return self.query_flux(flux_query)
    
    def get_aggregated_data(self, measurement, window="1h", func="mean", time_range="-24h"):
        """集約データ取得"""
        flux_query = f'''
        from(bucket: "{self.bucket}")
        |> range(start: {time_range})
        |> filter(fn: (r) => r._measurement == "{measurement}")
        |> aggregateWindow(every: {window}, fn: {func})
        |> yield(name: "{func}")
        '''
        return self.query_flux(flux_query)
    
    def close(self):
        """接続クローズ"""
        self.client.close()

# 使用例とモニタリングアプリケーション
def generate_sample_metrics(influx_manager):
    """サンプルメトリクス生成"""
    hosts = ['server01', 'server02', 'server03']
    regions = ['tokyo', 'osaka', 'nagoya']
    
    while True:
        try:
            # CPU使用率データ生成
            cpu_points = []
            for host in hosts:
                for cpu_id in range(4):  # 4コア想定
                    cpu_points.append({
                        'measurement': 'cpu_usage',
                        'tags': {
                            'host': host,
                            'region': random.choice(regions),
                            'cpu': f'cpu{cpu_id}'
                        },
                        'fields': {
                            'usage_percent': random.uniform(10, 90),
                            'user': random.uniform(5, 40),
                            'system': random.uniform(2, 20),
                            'idle': random.uniform(20, 80)
                        }
                    })
            
            # メモリ使用率データ生成
            memory_points = []
            for host in hosts:
                total_memory = 16 * 1024 * 1024 * 1024  # 16GB
                used = random.uniform(0.3, 0.8) * total_memory
                memory_points.append({
                    'measurement': 'memory_usage',
                    'tags': {
                        'host': host,
                        'region': random.choice(regions)
                    },
                    'fields': {
                        'used_bytes': used,
                        'available_bytes': total_memory - used,
                        'used_percent': (used / total_memory) * 100,
                        'cached_bytes': random.uniform(0.1, 0.3) * total_memory
                    }
                })
            
            # 温度データ生成
            temperature_points = []
            for host in hosts:
                temperature_points.append({
                    'measurement': 'temperature',
                    'tags': {
                        'host': host,
                        'region': random.choice(regions),
                        'sensor': 'cpu'
                    },
                    'fields': {
                        'celsius': random.uniform(30, 75),
                        'fahrenheit': random.uniform(86, 167)
                    }
                })
            
            # 全ポイントをバッチ書き込み
            all_points = cpu_points + memory_points + temperature_points
            influx_manager.write_batch(all_points)
            
            print(f"メトリクス送信完了: {len(all_points)}ポイント - {datetime.now()}")
            time.sleep(10)  # 10秒間隔
            
        except KeyboardInterrupt:
            print("\nメトリクス生成を停止します")
            break
        except Exception as e:
            print(f"メトリクス生成エラー: {e}")
            time.sleep(5)

if __name__ == "__main__":
    # InfluxDB接続設定
    influx_config = {
        'url': 'http://localhost:8086',
        'token': 'mytoken123456789',
        'org': 'myorg',
        'bucket': 'mybucket'
    }
    
    # InfluxDBマネージャー初期化
    influx_manager = InfluxDBManager(**influx_config)
    
    try:
        # 最新のCPU使用率取得
        latest_cpu = influx_manager.get_latest_values('cpu_usage')
        print(f"最新CPU使用率: {latest_cpu}")
        
        # 過去24時間の平均温度取得
        avg_temperature = influx_manager.get_aggregated_data(
            'temperature', 
            window='1h', 
            func='mean', 
            time_range='-24h'
        )
        print(f"平均温度データ数: {len(avg_temperature) if avg_temperature else 0}")
        
        # サンプルメトリクス生成開始
        print("サンプルメトリクス生成を開始します (Ctrl+Cで停止)")
        generate_sample_metrics(influx_manager)
        
    except Exception as e:
        print(f"エラー: {e}")
    
    finally:
        influx_manager.close()