Fluentd

統合ログ収集レイヤーを提供するオープンソースデータコレクター。700以上のプラグインエコシステム、ゼロダウンタイム再起動機能を提供。CNCFの卒業プロジェクト。

監視サーバーログ収集データコレクター統合ログCNCFRubyオープンソース

監視サーバー

Fluentd

概要

Fluentdは統合ログ収集レイヤーを提供するオープンソースデータコレクターです。700以上のプラグインエコシステム、ゼロダウンタイム再起動機能を提供し、CNCFの卒業プロジェクトとして5,000以上の企業で採用されています。Kubernetesログ収集の事実上の標準として、v1.18.0でゼロダウンタイム再起動機能を追加し、クラウドネイティブ環境での信頼性を向上させています。

詳細

Fluentdは2011年にTreasure Dataによって開発が開始され、現在では5,000以上の企業で採用されるCNCF卒業プロジェクトです。Kubernetesログ収集の事実上の標準として位置づけられ、v1.18.0でゼロダウンタイム再起動機能を追加しました。700以上のプラグインエコシステムにより、多様なデータソースとの統合が可能で、統合ログ収集における業界標準として確立されています。

主要な技術的特徴

  • 統合ログ収集: 異なるログフォーマットの統一
  • 豊富なプラグイン: 700以上の入力・出力プラグイン
  • メモリバッファリング: 高効率なメモリ管理
  • ゼロダウンタイム再起動: サービス中断なしの設定変更
  • JSON構造化: ログの構造化とパース機能

用途

  • Kubernetesログ収集
  • マイクロサービスログ統合
  • クラウドネイティブロギング
  • ログルーティングとフィルタリング
  • リアルタイムログストリーミング

メリット・デメリット

メリット

  • CNCF卒業プロジェクト: 高い信頼性と業界標準
  • 豊富なプラグイン: 700以上のプラグインエコシステム
  • Kubernetes統合: ネイティブなK8s環境サポート
  • 軽量設計: 低リソース消費での高効率処理
  • ゼロダウンタイム: サービス中断なしの運用
  • オープンソース: 無料で高機能なログコレクター

デメリット

  • Ruby依存: Rubyランタイムへの依存
  • メモリ制限: 大量データ処理時のメモリ制約
  • 設定複雑性: 高度な設定の学習コスト
  • デバッグ困難: プラグイン間の問題特定が複雑
  • パフォーマンス: 非常に高スループット要求時の制限

参考ページ

書き方の例

基本的なFluentd設定

# fluent.conf
<system>
  log_level info
  process_name fluentd
  workers 4
  
  <log>
    format json
    time_format %Y-%m-%d %H:%M:%S %z
  </log>
</system>

# 入力設定
<source>
  @type tail
  path /var/log/nginx/access.log
  pos_file /var/log/fluentd/nginx-access.log.pos
  tag nginx.access
  format nginx
  refresh_interval 5
</source>

<source>
  @type tail
  path /var/log/app/*.log
  pos_file /var/log/fluentd/app.log.pos
  tag app.logs
  format json
  time_key timestamp
  time_format %Y-%m-%d %H:%M:%S
  read_from_head true
</source>

<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

<source>
  @type http
  port 8888
  bind 0.0.0.0
  body_size_limit 32m
  keepalive_timeout 10s
</source>

# フィルター設定
<filter nginx.access>
  @type parser
  key_name message
  reserve_data true
  <parse>
    @type nginx
  </parse>
</filter>

<filter app.logs>
  @type record_transformer
  <record>
    hostname ${hostname}
    service_name myapp
    environment production
  </record>
</filter>

<filter **>
  @type grep
  <exclude>
    key level
    pattern /DEBUG/
  </exclude>
</filter>

# 出力設定
<match nginx.**>
  @type elasticsearch
  host elasticsearch
  port 9200
  index_name nginx-logs
  type_name nginx
  logstash_format true
  logstash_prefix nginx
  
  <buffer>
    @type file
    path /var/log/fluentd/nginx.buffer
    flush_mode interval
    flush_interval 10s
    flush_thread_count 2
  </buffer>
</match>

<match app.**>
  @type elasticsearch
  host elasticsearch
  port 9200
  index_name app-logs
  type_name application
  logstash_format true
  logstash_prefix application
  
  <buffer>
    @type memory
    flush_mode immediate
  </buffer>
</match>

<match **>
  @type stdout
</match>

Kubernetes環境設定

# fluentd-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: kube-system
  labels:
    k8s-app: fluentd-logging
    version: v1
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-logging
      version: v1
  template:
    metadata:
      labels:
        k8s-app: fluentd-logging
        version: v1
    spec:
      serviceAccount: fluentd
      serviceAccountName: fluentd
      tolerations:
      - key: node-role.kubernetes.io/master
        effect: NoSchedule
      containers:
      - name: fluentd
        image: fluent/fluentd-kubernetes-daemonset:v1.16-debian-elasticsearch7-1
        env:
          - name: FLUENTD_SYSTEMD_CONF
            value: "disable"
          - name: FLUENTD_PROMETHEUS_CONF
            value: "disable"
          - name: K8S_NODE_NAME
            valueFrom:
              fieldRef:
                fieldPath: spec.nodeName
        resources:
          limits:
            memory: 500Mi
            cpu: 500m
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: config-volume
          mountPath: /fluentd/etc
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: config-volume
        configMap:
          name: fluentd-config

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-config
  namespace: kube-system
data:
  fluent.conf: |
    <source>
      @type tail
      @id in_tail_container_logs
      path /var/log/containers/*.log
      pos_file /var/log/fluentd-containers.log.pos
      tag kubernetes.*
      read_from_head true
      <parse>
        @type json
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>

    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>

    <match **>
      @type elasticsearch
      host elasticsearch.default.svc.cluster.local
      port 9200
      logstash_format true
      logstash_prefix k8s
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
    </match>

マルチワーカー設定

# fluent.conf
<system>
  workers 4
  root_dir /tmp/fluentd
</system>

<worker 0>
  <source>
    @type tail
    path /var/log/nginx/access.log
    pos_file /var/log/fluentd/nginx-access.log.pos.0
    tag nginx.access
    format nginx
  </source>
</worker>

<worker 1>
  <source>
    @type tail
    path /var/log/app/app1.log
    pos_file /var/log/fluentd/app1.log.pos.1
    tag app.app1
    format json
  </source>
</worker>

<worker 2>
  <source>
    @type tail
    path /var/log/app/app2.log
    pos_file /var/log/fluentd/app2.log.pos.2
    tag app.app2
    format json
  </source>
</worker>

<worker 3>
  <source>
    @type forward
    port 24224
    bind 0.0.0.0
  </source>
</worker>

# 共通フィルターと出力
<filter **>
  @type record_transformer
  <record>
    worker_id ${worker_id}
    hostname ${hostname}
  </record>
</filter>

<match **>
  @type elasticsearch
  host elasticsearch
  port 9200
  logstash_format true
  
  <buffer>
    @type file
    path /var/log/fluentd/buffer
    flush_mode interval
    flush_interval 10s
    flush_thread_count 2
  </buffer>
</match>

高可用性設定

# fluent.conf (Primary)
<system>
  log_level info
  suppress_config_dump
</system>

# フォワード入力
<source>
  @type forward
  port 24224
  bind 0.0.0.0
  
  # セキュリティ設定
  <security>
    self_hostname primary-fluentd
    shared_key your_shared_key_here
  </security>
</source>

# プライマリからセカンダリへの複製
<match **>
  @type copy
  
  <store>
    @type elasticsearch
    host elasticsearch-primary
    port 9200
    index_name primary-logs
    
    <buffer>
      @type file
      path /var/log/fluentd/primary.buffer
      flush_mode interval
      flush_interval 5s
    </buffer>
  </store>
  
  <store>
    @type forward
    send_timeout 60s
    recover_wait 10s
    heartbeat_interval 1s
    phi_threshold 16
    hard_timeout 60s
    
    <server>
      name secondary-fluentd
      host secondary-fluentd
      port 24224
      weight 60
    </server>
    
    <secondary>
      @type file
      path /var/log/fluentd/failed_records
    </secondary>
    
    <buffer>
      @type file
      path /var/log/fluentd/forward.buffer
      flush_mode interval
      flush_interval 5s
      retry_forever true
    </buffer>
  </store>
</match>

カスタムプラグイン開発

# lib/fluent/plugin/filter_custom_enrichment.rb
require 'fluent/plugin/filter'

module Fluent
  module Plugin
    class CustomEnrichmentFilter < Fluent::Plugin::Filter
      Fluent::Plugin.register_filter('custom_enrichment', self)

      config_param :enrich_key, :string, default: 'level'
      config_param :enrich_map, :hash, default: {}
      config_param :default_value, :string, default: 'unknown'

      def configure(conf)
        super
        @enrichment_map = @enrich_map
      end

      def filter(tag, time, record)
        key_value = record[@enrich_key]
        
        if key_value && @enrichment_map.has_key?(key_value)
          record['enriched_data'] = @enrichment_map[key_value]
        else
          record['enriched_data'] = @default_value
        end

        # タイムスタンプ追加
        record['processed_at'] = Time.now.to_f
        
        # タグ情報追加
        record['fluentd_tag'] = tag
        
        record
      end
    end
  end
end

パフォーマンス監視設定

# fluent.conf
<system>
  enable_input_metrics true
  enable_size_metrics true
  
  <metrics>
    @type prometheus
    port 24231
    bind 0.0.0.0
  </metrics>
</system>

<source>
  @type prometheus
  port 24231
</source>

<source>
  @type prometheus_monitor
  interval 5
  <labels>
    host ${hostname}
  </labels>
</source>

<source>
  @type prometheus_output_monitor
  interval 5
  <labels>
    host ${hostname}
  </labels>
</source>

# メトリクス出力
<filter **>
  @type prometheus
  <metric>
    name fluentd_input_status_num_records_total
    type counter
    desc The total number of incoming records
    <labels>
      tag ${tag}
      hostname ${hostname}
    </labels>
  </metric>
</filter>

<match **>
  @type copy
  
  <store>
    @type elasticsearch
    host elasticsearch
    port 9200
    
    <buffer>
      @type file
      path /var/log/fluentd/elasticsearch.buffer
      flush_mode interval
      flush_interval 10s
      
      # メトリクス有効化
      enable_size_metrics true
    </buffer>
  </store>
  
  <store>
    @type prometheus
    <metric>
      name fluentd_output_status_num_records_total
      type counter
      desc The total number of outgoing records
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </store>
</match>

Docker環境設定

# docker-compose.yml
version: '3.8'

services:
  fluentd:
    image: fluent/fluentd:v1.16-debian-1
    container_name: fluentd
    restart: unless-stopped
    environment:
      - FLUENTD_CONF=fluent.conf
      - FLUENTD_OPT=-v
    volumes:
      - ./fluent.conf:/fluentd/etc/fluent.conf:ro
      - ./plugins:/fluentd/plugins:ro
      - /var/log:/var/log:ro
      - fluentd-buffer:/var/log/fluentd
    ports:
      - "24224:24224"
      - "24224:24224/udp"
      - "8888:8888"
      - "24231:24231"
    networks:
      - logging
    depends_on:
      - elasticsearch

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.16.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
      - xpack.security.enabled=false
    volumes:
      - elasticsearch-data:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
    networks:
      - logging

  kibana:
    image: docker.elastic.co/kibana/kibana:8.16.0
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    networks:
      - logging
    depends_on:
      - elasticsearch

volumes:
  fluentd-buffer:
  elasticsearch-data:

networks:
  logging:
    driver: bridge

ログ解析とパース設定

# fluent.conf
<source>
  @type tail
  path /var/log/application/*.log
  pos_file /var/log/fluentd/application.log.pos
  tag application.logs
  
  <parse>
    @type multiline
    format_firstline /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}/
    format1 /^(?<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3}) \[(?<level>[^\]]+)\] \[(?<thread>[^\]]+)\] (?<class>[^ ]+) - (?<message>.*)/
  </parse>
</source>

<filter application.logs>
  @type parser
  key_name message
  reserve_data true
  
  <parse>
    @type grok
    grok_pattern %{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] \[%{DATA:thread}\] %{DATA:class} - %{GREEDYDATA:message}
  </parse>
</filter>

<filter application.logs>
  @type record_transformer
  enable_ruby true
  
  <record>
    # タイムスタンプ正規化
    normalized_timestamp ${Time.parse(record["timestamp"]).to_f}
    
    # ログレベル正規化
    normalized_level ${record["level"].upcase}
    
    # 構造化データ抽出
    structured_data ${
      begin
        if record["message"].start_with?("{")
          JSON.parse(record["message"])
        else
          {"raw_message" => record["message"]}
        end
      rescue
        {"parse_error" => true, "raw_message" => record["message"]}
      end
    }
  </record>
</filter>

<filter application.logs>
  @type grep
  <regexp>
    key level
    pattern /^(ERROR|WARN|INFO)$/
  </regexp>
</filter>

<match application.logs>
  @type elasticsearch
  host elasticsearch
  port 9200
  logstash_format true
  logstash_prefix application
  type_name application_log
  
  <buffer>
    @type file
    path /var/log/fluentd/application.buffer
    flush_mode interval
    flush_interval 30s
    flush_thread_count 2
    chunk_limit_size 8MB
    queue_limit_length 32
    retry_max_interval 30
    retry_forever true
  </buffer>
</match>