Fluentd
統合ログ収集レイヤーを提供するオープンソースデータコレクター。700以上のプラグインエコシステム、ゼロダウンタイム再起動機能を提供。CNCFの卒業プロジェクト。
監視サーバー
Fluentd
概要
Fluentdは統合ログ収集レイヤーを提供するオープンソースデータコレクターです。700以上のプラグインエコシステム、ゼロダウンタイム再起動機能を提供し、CNCFの卒業プロジェクトとして5,000以上の企業で採用されています。Kubernetesログ収集の事実上の標準として、v1.18.0でゼロダウンタイム再起動機能を追加し、クラウドネイティブ環境での信頼性を向上させています。
詳細
Fluentdは2011年にTreasure Dataによって開発が開始され、現在では5,000以上の企業で採用されるCNCF卒業プロジェクトです。Kubernetesログ収集の事実上の標準として位置づけられ、v1.18.0でゼロダウンタイム再起動機能を追加しました。700以上のプラグインエコシステムにより、多様なデータソースとの統合が可能で、統合ログ収集における業界標準として確立されています。
主要な技術的特徴
- 統合ログ収集: 異なるログフォーマットの統一
- 豊富なプラグイン: 700以上の入力・出力プラグイン
- メモリバッファリング: 高効率なメモリ管理
- ゼロダウンタイム再起動: サービス中断なしの設定変更
- JSON構造化: ログの構造化とパース機能
用途
- Kubernetesログ収集
- マイクロサービスログ統合
- クラウドネイティブロギング
- ログルーティングとフィルタリング
- リアルタイムログストリーミング
メリット・デメリット
メリット
- CNCF卒業プロジェクト: 高い信頼性と業界標準
- 豊富なプラグイン: 700以上のプラグインエコシステム
- Kubernetes統合: ネイティブなK8s環境サポート
- 軽量設計: 低リソース消費での高効率処理
- ゼロダウンタイム: サービス中断なしの運用
- オープンソース: 無料で高機能なログコレクター
デメリット
- Ruby依存: Rubyランタイムへの依存
- メモリ制限: 大量データ処理時のメモリ制約
- 設定複雑性: 高度な設定の学習コスト
- デバッグ困難: プラグイン間の問題特定が複雑
- パフォーマンス: 非常に高スループット要求時の制限
参考ページ
書き方の例
基本的なFluentd設定
# fluent.conf
<system>
log_level info
process_name fluentd
workers 4
<log>
format json
time_format %Y-%m-%d %H:%M:%S %z
</log>
</system>
# 入力設定
<source>
@type tail
path /var/log/nginx/access.log
pos_file /var/log/fluentd/nginx-access.log.pos
tag nginx.access
format nginx
refresh_interval 5
</source>
<source>
@type tail
path /var/log/app/*.log
pos_file /var/log/fluentd/app.log.pos
tag app.logs
format json
time_key timestamp
time_format %Y-%m-%d %H:%M:%S
read_from_head true
</source>
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
<source>
@type http
port 8888
bind 0.0.0.0
body_size_limit 32m
keepalive_timeout 10s
</source>
# フィルター設定
<filter nginx.access>
@type parser
key_name message
reserve_data true
<parse>
@type nginx
</parse>
</filter>
<filter app.logs>
@type record_transformer
<record>
hostname ${hostname}
service_name myapp
environment production
</record>
</filter>
<filter **>
@type grep
<exclude>
key level
pattern /DEBUG/
</exclude>
</filter>
# 出力設定
<match nginx.**>
@type elasticsearch
host elasticsearch
port 9200
index_name nginx-logs
type_name nginx
logstash_format true
logstash_prefix nginx
<buffer>
@type file
path /var/log/fluentd/nginx.buffer
flush_mode interval
flush_interval 10s
flush_thread_count 2
</buffer>
</match>
<match app.**>
@type elasticsearch
host elasticsearch
port 9200
index_name app-logs
type_name application
logstash_format true
logstash_prefix application
<buffer>
@type memory
flush_mode immediate
</buffer>
</match>
<match **>
@type stdout
</match>
Kubernetes環境設定
# fluentd-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluentd
namespace: kube-system
labels:
k8s-app: fluentd-logging
version: v1
spec:
selector:
matchLabels:
k8s-app: fluentd-logging
version: v1
template:
metadata:
labels:
k8s-app: fluentd-logging
version: v1
spec:
serviceAccount: fluentd
serviceAccountName: fluentd
tolerations:
- key: node-role.kubernetes.io/master
effect: NoSchedule
containers:
- name: fluentd
image: fluent/fluentd-kubernetes-daemonset:v1.16-debian-elasticsearch7-1
env:
- name: FLUENTD_SYSTEMD_CONF
value: "disable"
- name: FLUENTD_PROMETHEUS_CONF
value: "disable"
- name: K8S_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
resources:
limits:
memory: 500Mi
cpu: 500m
requests:
cpu: 100m
memory: 200Mi
volumeMounts:
- name: varlog
mountPath: /var/log
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
- name: config-volume
mountPath: /fluentd/etc
terminationGracePeriodSeconds: 30
volumes:
- name: varlog
hostPath:
path: /var/log
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers
- name: config-volume
configMap:
name: fluentd-config
---
apiVersion: v1
kind: ConfigMap
metadata:
name: fluentd-config
namespace: kube-system
data:
fluent.conf: |
<source>
@type tail
@id in_tail_container_logs
path /var/log/containers/*.log
pos_file /var/log/fluentd-containers.log.pos
tag kubernetes.*
read_from_head true
<parse>
@type json
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</source>
<filter kubernetes.**>
@type kubernetes_metadata
</filter>
<match **>
@type elasticsearch
host elasticsearch.default.svc.cluster.local
port 9200
logstash_format true
logstash_prefix k8s
<buffer>
@type file
path /var/log/fluentd-buffers/kubernetes.system.buffer
flush_mode interval
retry_type exponential_backoff
flush_thread_count 2
flush_interval 5s
retry_forever
retry_max_interval 30
chunk_limit_size 2M
queue_limit_length 8
overflow_action block
</buffer>
</match>
マルチワーカー設定
# fluent.conf
<system>
workers 4
root_dir /tmp/fluentd
</system>
<worker 0>
<source>
@type tail
path /var/log/nginx/access.log
pos_file /var/log/fluentd/nginx-access.log.pos.0
tag nginx.access
format nginx
</source>
</worker>
<worker 1>
<source>
@type tail
path /var/log/app/app1.log
pos_file /var/log/fluentd/app1.log.pos.1
tag app.app1
format json
</source>
</worker>
<worker 2>
<source>
@type tail
path /var/log/app/app2.log
pos_file /var/log/fluentd/app2.log.pos.2
tag app.app2
format json
</source>
</worker>
<worker 3>
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
</worker>
# 共通フィルターと出力
<filter **>
@type record_transformer
<record>
worker_id ${worker_id}
hostname ${hostname}
</record>
</filter>
<match **>
@type elasticsearch
host elasticsearch
port 9200
logstash_format true
<buffer>
@type file
path /var/log/fluentd/buffer
flush_mode interval
flush_interval 10s
flush_thread_count 2
</buffer>
</match>
高可用性設定
# fluent.conf (Primary)
<system>
log_level info
suppress_config_dump
</system>
# フォワード入力
<source>
@type forward
port 24224
bind 0.0.0.0
# セキュリティ設定
<security>
self_hostname primary-fluentd
shared_key your_shared_key_here
</security>
</source>
# プライマリからセカンダリへの複製
<match **>
@type copy
<store>
@type elasticsearch
host elasticsearch-primary
port 9200
index_name primary-logs
<buffer>
@type file
path /var/log/fluentd/primary.buffer
flush_mode interval
flush_interval 5s
</buffer>
</store>
<store>
@type forward
send_timeout 60s
recover_wait 10s
heartbeat_interval 1s
phi_threshold 16
hard_timeout 60s
<server>
name secondary-fluentd
host secondary-fluentd
port 24224
weight 60
</server>
<secondary>
@type file
path /var/log/fluentd/failed_records
</secondary>
<buffer>
@type file
path /var/log/fluentd/forward.buffer
flush_mode interval
flush_interval 5s
retry_forever true
</buffer>
</store>
</match>
カスタムプラグイン開発
# lib/fluent/plugin/filter_custom_enrichment.rb
require 'fluent/plugin/filter'
module Fluent
module Plugin
class CustomEnrichmentFilter < Fluent::Plugin::Filter
Fluent::Plugin.register_filter('custom_enrichment', self)
config_param :enrich_key, :string, default: 'level'
config_param :enrich_map, :hash, default: {}
config_param :default_value, :string, default: 'unknown'
def configure(conf)
super
@enrichment_map = @enrich_map
end
def filter(tag, time, record)
key_value = record[@enrich_key]
if key_value && @enrichment_map.has_key?(key_value)
record['enriched_data'] = @enrichment_map[key_value]
else
record['enriched_data'] = @default_value
end
# タイムスタンプ追加
record['processed_at'] = Time.now.to_f
# タグ情報追加
record['fluentd_tag'] = tag
record
end
end
end
end
パフォーマンス監視設定
# fluent.conf
<system>
enable_input_metrics true
enable_size_metrics true
<metrics>
@type prometheus
port 24231
bind 0.0.0.0
</metrics>
</system>
<source>
@type prometheus
port 24231
</source>
<source>
@type prometheus_monitor
interval 5
<labels>
host ${hostname}
</labels>
</source>
<source>
@type prometheus_output_monitor
interval 5
<labels>
host ${hostname}
</labels>
</source>
# メトリクス出力
<filter **>
@type prometheus
<metric>
name fluentd_input_status_num_records_total
type counter
desc The total number of incoming records
<labels>
tag ${tag}
hostname ${hostname}
</labels>
</metric>
</filter>
<match **>
@type copy
<store>
@type elasticsearch
host elasticsearch
port 9200
<buffer>
@type file
path /var/log/fluentd/elasticsearch.buffer
flush_mode interval
flush_interval 10s
# メトリクス有効化
enable_size_metrics true
</buffer>
</store>
<store>
@type prometheus
<metric>
name fluentd_output_status_num_records_total
type counter
desc The total number of outgoing records
<labels>
tag ${tag}
hostname ${hostname}
</labels>
</metric>
</store>
</match>
Docker環境設定
# docker-compose.yml
version: '3.8'
services:
fluentd:
image: fluent/fluentd:v1.16-debian-1
container_name: fluentd
restart: unless-stopped
environment:
- FLUENTD_CONF=fluent.conf
- FLUENTD_OPT=-v
volumes:
- ./fluent.conf:/fluentd/etc/fluent.conf:ro
- ./plugins:/fluentd/plugins:ro
- /var/log:/var/log:ro
- fluentd-buffer:/var/log/fluentd
ports:
- "24224:24224"
- "24224:24224/udp"
- "8888:8888"
- "24231:24231"
networks:
- logging
depends_on:
- elasticsearch
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.16.0
container_name: elasticsearch
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms1g -Xmx1g"
- xpack.security.enabled=false
volumes:
- elasticsearch-data:/usr/share/elasticsearch/data
ports:
- "9200:9200"
networks:
- logging
kibana:
image: docker.elastic.co/kibana/kibana:8.16.0
container_name: kibana
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
ports:
- "5601:5601"
networks:
- logging
depends_on:
- elasticsearch
volumes:
fluentd-buffer:
elasticsearch-data:
networks:
logging:
driver: bridge
ログ解析とパース設定
# fluent.conf
<source>
@type tail
path /var/log/application/*.log
pos_file /var/log/fluentd/application.log.pos
tag application.logs
<parse>
@type multiline
format_firstline /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}/
format1 /^(?<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3}) \[(?<level>[^\]]+)\] \[(?<thread>[^\]]+)\] (?<class>[^ ]+) - (?<message>.*)/
</parse>
</source>
<filter application.logs>
@type parser
key_name message
reserve_data true
<parse>
@type grok
grok_pattern %{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] \[%{DATA:thread}\] %{DATA:class} - %{GREEDYDATA:message}
</parse>
</filter>
<filter application.logs>
@type record_transformer
enable_ruby true
<record>
# タイムスタンプ正規化
normalized_timestamp ${Time.parse(record["timestamp"]).to_f}
# ログレベル正規化
normalized_level ${record["level"].upcase}
# 構造化データ抽出
structured_data ${
begin
if record["message"].start_with?("{")
JSON.parse(record["message"])
else
{"raw_message" => record["message"]}
end
rescue
{"parse_error" => true, "raw_message" => record["message"]}
end
}
</record>
</filter>
<filter application.logs>
@type grep
<regexp>
key level
pattern /^(ERROR|WARN|INFO)$/
</regexp>
</filter>
<match application.logs>
@type elasticsearch
host elasticsearch
port 9200
logstash_format true
logstash_prefix application
type_name application_log
<buffer>
@type file
path /var/log/fluentd/application.buffer
flush_mode interval
flush_interval 30s
flush_thread_count 2
chunk_limit_size 8MB
queue_limit_length 32
retry_max_interval 30
retry_forever true
</buffer>
</match>