Fluentd
Open-source data collector that provides a unified logging layer. Features a 700+ plugin ecosystem and zero-downtime restart capability. CNCF graduated project.
Overview
Fluentd is an open-source data collector that provides a unified logging layer between data sources and backend systems. A CNCF graduated project adopted by more than 5,000 companies, it is the de facto standard for Kubernetes log collection, ships a 700+ plugin ecosystem, and gained zero-downtime restart capability in v1.18.0.
Details
Fluentd is adopted by over 5,000 companies and, as a CNCF graduated project, serves as the de facto standard for Kubernetes log collection. Version 1.18.0 added zero-downtime restart capability, enhancing operational reliability. Implemented in Ruby with performance-critical parts in C, it combines flexibility with performance and offers a rich ecosystem of 700+ plugins. The unified logging layer treats every log event as a tagged JSON record, which simplifies complex processing pipelines and enables real-time data processing across diverse environments.
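The unified layer is easiest to see in miniature: every event is a (tag, time, record) triple, and any input can be routed to any output. A minimal sketch — port, tag, and the curl call are illustrative, not part of any standard setup:
# minimal.conf
<source>
  @type http        # accept events over HTTP; the URL path becomes the tag
  port 8888
</source>
<match app.**>
  @type stdout      # print each event as tag, time, and JSON record
</match>
# Send one event:
#   curl -X POST -d 'json={"user":"alice"}' http://localhost:8888/app.login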
Key Technical Features
- Unified Logging Layer: Centralized log collection and processing
- 700+ Plugin Ecosystem: Extensive input, output, and filter plugins
- Zero-downtime Restart: Graceful restart without data loss
- High Performance: Memory-efficient processing with buffering
- Flexible Routing: Tag-based routing and data transformation (see the sketch after this list)
- Cloud Native: First-class Kubernetes and container support
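Tag-based routing in practice: <match> patterns are evaluated top to bottom, so more specific patterns come first. A sketch with illustrative tags, paths, and hosts:
<match app.db.**>
  @type file                      # database events land in local files
  path /var/log/fluentd/db
</match>
<match app.**>
  @type forward                   # everything else is relayed to an aggregator
  <server>
    host aggregator.example.com   # illustrative host
    port 24224
  </server>
</match>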
Use Cases
- Centralized log aggregation
- Real-time log processing and analysis
- Data pipeline for analytics
- Container and Kubernetes logging
- Event streaming and data routing
- Log forwarding to multiple destinations
Pros and Cons
Pros
- CNCF Graduated: Enterprise-grade reliability and community support
- Rich Plugin Ecosystem: 700+ plugins for diverse data sources
- Zero-downtime Operations: Seamless configuration updates
- Memory Efficient: Optimized for high-throughput processing
- Flexible Configuration: Declarative config format with embedded Ruby for complex routing
- Cloud Native Ready: Excellent Kubernetes integration
Cons
- Ruby Dependency: Requires Ruby runtime environment
- Configuration Complexity: Learning curve for advanced setups
- Memory Usage: Can consume significant memory under high load
- Plugin Quality Variance: Inconsistent quality across plugins
- Performance Bottlenecks: Single-threaded processing limitations
- Debugging Difficulty: Complex troubleshooting for pipeline issues
Code Examples
Installation and Basic Setup
# Install via gem
gem install fluentd
# Install via package manager (Ubuntu/Debian)
curl -fsSL https://toolbelt.treasuredata.com/sh/install-ubuntu-focal-td-agent4.sh | sh
# Install via package manager (CentOS/RHEL)
curl -fsSL https://toolbelt.treasuredata.com/sh/install-redhat-td-agent4.sh | sh
# Docker installation
docker pull fluent/fluentd:v1.18-1
# Verify installation
fluentd --version
# Generate configuration file
fluentd --setup ./fluent
# Run Fluentd with the generated configuration
fluentd -c ./fluent/fluent.conf -v
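To run the pulled Docker image against a local configuration (the official image reads /fluentd/etc/fluent.conf by default; ports and paths here are illustrative):
docker run -d --name fluentd \
  -p 24224:24224 -p 24224:24224/udp \
  -v $(pwd)/fluent.conf:/fluentd/etc/fluent.conf \
  fluent/fluentd:v1.18-1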
Basic Configuration
# fluent.conf
<system>
  workers 4
  root_dir /var/log/fluentd
  log_level info
  suppress_repeated_stacktrace true
  emit_error_log_interval 30s
  suppress_config_dump true
</system>
# Input sources
<source>
@type tail
@id input_tail
@label @mainstream
path /var/log/nginx/access.log
pos_file /var/log/fluentd/nginx-access.log.pos
tag nginx.access
<parse>
  @type nginx
</parse>
</source>
<source>
@type tail
@id input_error_tail
@label @mainstream
path /var/log/nginx/error.log
pos_file /var/log/fluentd/nginx-error.log.pos
tag nginx.error
<parse>
  @type multiline
  format_firstline /^\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}/
  format1 /^(?<time>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}) \[(?<log_level>\w+)\] (?<pid>\d+)#(?<tid>\d+): (?<message>.*)/
</parse>
</source>
<source>
@type forward
@id input_forward
@label @mainstream
port 24224
bind 0.0.0.0
<security>
self_hostname "#{Socket.gethostname}"
shared_key fluentd_secret_key
</security>
</source>
<source>
@type http
@id input_http
@label @mainstream
port 8888
bind 0.0.0.0
body_size_limit 32m
keepalive_timeout 10s
cors_allow_origins ["*"]
</source>
# Filters and transformations
<label @mainstream>
<filter nginx.**>
@type record_transformer
<record>
hostname "#{Socket.gethostname}"
tag ${tag}
timestamp ${time}
</record>
</filter>
<filter nginx.access>
@type parser
key_name message
reserve_data true
inject_key_prefix parsed_
<parse>
@type nginx
</parse>
</filter>
<filter **>
@type grep
<exclude>
key message
pattern /health_check/
</exclude>
</filter>
# Output configurations
<match nginx.access>
@type elasticsearch
@id output_elasticsearch
host elasticsearch.example.com
port 9200
index_name nginx-access
type_name _doc
include_tag_key true
tag_key @log_name
<buffer>
@type file
path /var/log/fluentd/buffers/elasticsearch
flush_mode interval
retry_type exponential_backoff
flush_thread_count 2
flush_interval 5s
retry_forever
retry_max_interval 30
chunk_limit_size 2M
queue_limit_length 8
overflow_action block
</buffer>
</match>
<match nginx.error>
@type copy
<store>
@type elasticsearch
@id output_elasticsearch_error
host elasticsearch.example.com
port 9200
index_name nginx-error
type_name _doc
</store>
<store>
@type slack
webhook_url https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
channel "#alerts"
username "fluentd"
color danger
message "Nginx Error: %s"
message_keys ["message"]
</store>
</match>
<match **>
  @type file
  @id output_file
  path /var/log/fluentd/data.*.log
  symlink_path /var/log/fluentd/data.log
  append true
  <format>
    @type out_file
    time_format %Y%m%dT%H%M%S%z
  </format>
  <buffer time>
    @type file
    path /var/log/fluentd/buffer
    timekey 1d
    timekey_wait 10m
    flush_mode interval
    flush_interval 30s
  </buffer>
</match>
</label>
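With this configuration running, the HTTP input can be exercised directly, and the catch-all file output confirms delivery (a tag like test.tag falls through to the final <match **>; allow for the 30s flush interval):
# Send a test event (the URL path becomes the tag)
curl -X POST -d 'json={"message":"hello"}' http://localhost:8888/test.tag
# Watch the catch-all output via its symlink
tail -f /var/log/fluentd/data.log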
Kubernetes Integration
# fluentd-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluentd
namespace: kube-system
labels:
k8s-app: fluentd-logging
version: v1
spec:
selector:
matchLabels:
k8s-app: fluentd-logging
version: v1
template:
metadata:
labels:
k8s-app: fluentd-logging
version: v1
spec:
serviceAccount: fluentd
serviceAccountName: fluentd
      tolerations:
      - key: node-role.kubernetes.io/control-plane
        effect: NoSchedule
      - key: node-role.kubernetes.io/master
        effect: NoSchedule
containers:
- name: fluentd
image: fluent/fluentd-kubernetes-daemonset:v1.18-debian-elasticsearch7-1
env:
- name: FLUENT_ELASTICSEARCH_HOST
value: "elasticsearch.logging.svc.cluster.local"
- name: FLUENT_ELASTICSEARCH_PORT
value: "9200"
- name: FLUENT_ELASTICSEARCH_SCHEME
value: "http"
- name: FLUENTD_SYSTEMD_CONF
value: disable
- name: FLUENT_CONTAINER_TAIL_EXCLUDE_PATH
value: /var/log/containers/fluent*
- name: FLUENT_CONTAINER_TAIL_PARSER_TYPE
value: /^(?<time>.+) (?<stream>stdout|stderr)( (?<logtag>.))? (?<log>.*)$/
resources:
limits:
memory: 500Mi
requests:
cpu: 100m
memory: 200Mi
volumeMounts:
- name: varlog
mountPath: /var/log
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
- name: config-volume
mountPath: /fluentd/etc
terminationGracePeriodSeconds: 30
volumes:
- name: varlog
hostPath:
path: /var/log
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers
- name: config-volume
configMap:
name: fluentd-config
---
apiVersion: v1
kind: ConfigMap
metadata:
name: fluentd-config
namespace: kube-system
data:
fluent.conf: |
<source>
@type tail
@id in_tail_container_logs
path /var/log/containers/*.log
pos_file /var/log/fluentd-containers.log.pos
tag raw.kubernetes.*
read_from_head true
<parse>
@type multi_format
<pattern>
format json
time_key time
time_format %Y-%m-%dT%H:%M:%S.%NZ
</pattern>
<pattern>
format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
time_format %Y-%m-%dT%H:%M:%S.%N%:z
</pattern>
</parse>
</source>
<filter raw.kubernetes.**>
@type kubernetes_metadata
</filter>
<match raw.kubernetes.**>
@type elasticsearch
@id out_es
@log_level info
include_tag_key true
host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
ssl_verify "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERIFY'] || 'true'}"
ssl_version "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERSION'] || 'TLSv1_2'}"
reload_connections false
reconnect_on_error true
reload_on_failure true
log_es_400_reason false
logstash_prefix "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_PREFIX'] || 'logstash'}"
logstash_format true
index_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_INDEX_NAME'] || 'logstash'}"
type_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_TYPE_NAME'] || 'fluentd'}"
<buffer>
flush_thread_count "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_THREAD_COUNT'] || '8'}"
flush_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_INTERVAL'] || '5s'}"
chunk_limit_size "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_CHUNK_LIMIT_SIZE'] || '2M'}"
queue_limit_length "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_QUEUE_LIMIT_LENGTH'] || '32'}"
retry_max_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_RETRY_MAX_INTERVAL'] || '30'}"
retry_forever true
</buffer>
</match>
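The DaemonSet above references a fluentd ServiceAccount, and the kubernetes_metadata filter needs read access to pods and namespaces. A minimal RBAC sketch (names must match the DaemonSet; the file names in the apply command are illustrative):
# fluentd-rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluentd
rules:
- apiGroups: [""]
  resources: ["pods", "namespaces"]
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: fluentd
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: fluentd
subjects:
- kind: ServiceAccount
  name: fluentd
  namespace: kube-system
# Deploy and verify
kubectl apply -f fluentd-rbac.yaml -f fluentd-daemonset.yaml
kubectl -n kube-system get pods -l k8s-app=fluentd-logging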
Custom Plugin Development
# fluent-plugin-custom/lib/fluent/plugin/out_custom.rb
require 'fluent/plugin/output'
module Fluent
module Plugin
class CustomOutput < Fluent::Plugin::Output
Fluent::Plugin.register_output('custom', self)
# Configuration parameters
config_param :endpoint, :string
config_param :api_key, :string, secret: true
config_param :timeout, :integer, default: 30
config_param :retry_limit, :integer, default: 3
def configure(conf)
super
@http_client = setup_http_client
end
def start
super
log.info "Custom output plugin started"
end
def shutdown
super
@http_client&.close
log.info "Custom output plugin stopped"
end
def process(tag, es)
es.each do |time, record|
send_record(tag, time, record)
end
end
private
def setup_http_client
  require 'net/http'
  require 'uri'
  require 'json' # Hash#to_json
  require 'time' # Time#iso8601

  @uri = URI.parse(@endpoint)
  http = Net::HTTP.new(@uri.host, @uri.port)
  http.use_ssl = (@uri.scheme == 'https')
  http.read_timeout = @timeout
  http
end
def send_record(tag, time, record)
retries = 0
begin
payload = {
tag: tag,
timestamp: Time.at(time).iso8601,
record: record
}.to_json
request = Net::HTTP::Post.new(@uri.request_uri)
request['Content-Type'] = 'application/json'
request['Authorization'] = "Bearer #{@api_key}"
request.body = payload
response = @http_client.request(request)
unless response.is_a?(Net::HTTPSuccess)
raise "HTTP #{response.code}: #{response.body}"
end
log.debug "Record sent successfully: #{tag}"
rescue => e
retries += 1
if retries <= @retry_limit
log.warn "Failed to send record (attempt #{retries}): #{e.message}"
sleep(2 ** retries)
retry
else
log.error "Failed to send record after #{@retry_limit} attempts: #{e.message}"
raise
end
end
end
end
end
end
# Plugin gemspec
# fluent-plugin-custom.gemspec
Gem::Specification.new do |spec|
spec.name = "fluent-plugin-custom"
spec.version = "1.0.0"
spec.authors = ["Your Name"]
spec.email = ["[email protected]"]
spec.summary = "Custom Fluentd output plugin"
spec.description = "A custom output plugin for Fluentd"
spec.homepage = "https://github.com/yourname/fluent-plugin-custom"
spec.license = "MIT"
spec.files = Dir["lib/**/*", "README.md", "LICENSE"]
spec.require_paths = ["lib"]
spec.add_runtime_dependency "fluentd", ">= 1.0", "< 2"
spec.add_development_dependency "test-unit", ">= 3.0"
end
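Building the gem locally and wiring the plugin into a pipeline might look like this (endpoint and key are placeholders; the registered name 'custom' becomes the @type):
# Build and install the gem
gem build fluent-plugin-custom.gemspec
gem install ./fluent-plugin-custom-1.0.0.gem
# fluent.conf -- use the plugin as an output
<match app.**>
  @type custom
  endpoint https://logs.example.com/ingest   # placeholder endpoint
  api_key YOUR_API_KEY                       # placeholder; marked secret in config_param
  timeout 30
</match>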
Monitoring and Health Checks
# monitoring.conf
<system>
  enable_input_metrics true
  enable_size_metrics true
  <metrics>
    @type prometheus
  </metrics>
</system>
# HTTP exposition endpoint (fluent-plugin-prometheus)
<source>
  @type prometheus
  bind 0.0.0.0
  port 24231
  metrics_path /metrics
</source>
<source>
@type monitor_agent
bind 0.0.0.0
port 24220
include_config false
include_retry false
</source>
<source>
@type prometheus_monitor
<labels>
host ${hostname}
service fluentd
</labels>
</source>
<source>
@type prometheus_output_monitor
<labels>
host ${hostname}
service fluentd
</labels>
</source>
# Health check endpoint
<source>
@type http
port 9880
bind 0.0.0.0
<parse>
@type json
</parse>
</source>
<match health.check>
@type null
</match>
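On the Prometheus side, a scrape job pointed at the exporter port configured above might look like this (the target host is illustrative):
# prometheus.yml (excerpt)
scrape_configs:
  - job_name: 'fluentd'
    metrics_path: /metrics
    static_configs:
      - targets: ['fluentd.example.com:24231']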
Performance Tuning
# performance.conf
<system>
  workers 4
  root_dir /tmp/fluentd-buffers/
  log_level warn
  suppress_repeated_stacktrace true
  emit_error_log_interval 30s
  suppress_config_dump true
</system>
# Note: per-worker plugin assignment uses a top-level <worker N> directive
# (e.g. wrap a <source> in <worker 0>...</worker>); it cannot be nested in <system>.
# High-performance input
<source>
@type forward
port 24224
bind 0.0.0.0
chunk_size_limit 16MB
skip_invalid_event true
source_hostname_key hostname
<security>
  self_hostname "#{Socket.gethostname}"
  shared_key your_shared_key_here
  # Restrict which hosts may connect; <client> nests inside <security>
  <client>
    host 192.168.1.10
  </client>
</security>
</source>
# Optimized buffering
<match **>
@type elasticsearch
@id output_elasticsearch
host elasticsearch-cluster.example.com
port 9200
# Connection pooling
reload_connections false
reconnect_on_error true
reload_on_failure true
# Performance settings
bulk_message_request_threshold 1024
request_timeout 30s
<buffer tag,time>
  @type file
  path /tmp/fluentd-buffers/elasticsearch
  timekey 3600   # required when "time" is a chunk key (1h chunks)
# Buffer settings for high throughput
flush_mode interval
flush_interval 5s
flush_thread_count 8
# Memory settings
chunk_limit_size 16MB
chunk_limit_records 10000
total_limit_size 2GB
queue_limit_length 256
# Retry settings
retry_type exponential_backoff
retry_max_interval 60s
retry_forever true
# Overflow handling
overflow_action drop_oldest_chunk
# Compression
compress gzip
</buffer>
</match>
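To validate these buffer settings under load, the bundled dummy input can generate synthetic traffic at a fixed rate (rate and record shape are arbitrary; newer Fluentd releases also provide this as @type sample):
<source>
  @type dummy
  tag load.test
  rate 5000                        # events per second
  dummy {"message":"load test"}    # synthetic record body
</source>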
Troubleshooting
# Check Fluentd status
sudo systemctl status td-agent
# View logs
sudo tail -f /var/log/td-agent/td-agent.log
# Validate configuration without starting (also catches syntax errors)
fluentd --dry-run -c /etc/td-agent/td-agent.conf
# Monitor buffer status
curl http://localhost:24220/api/plugins.json | jq
# Check plugin configuration parameters
fluent-plugin-config-format output elasticsearch
# Performance monitoring
curl http://localhost:24231/metrics
# Memory usage analysis
ps aux | grep fluentd
cat /proc/$(pgrep -o fluentd)/status   # -o: oldest matching PID (the supervisor)
# Buffer directory analysis
ls -la /var/log/td-agent/buffer/
du -sh /var/log/td-agent/buffer/
# Network connectivity test
telnet elasticsearch.example.com 9200
# Plugin installation
sudo td-agent-gem install fluent-plugin-elasticsearch
# Send test event (in_http expects a json= form field, or a raw JSON body with Content-Type: application/json)
curl -X POST -d 'json={"test":"message"}' http://localhost:8888/test.tag