Fluentd

An open-source data collector that provides a unified logging layer. Features a 700+ plugin ecosystem and zero-downtime restart capability. CNCF graduated project.

Monitoring Server · Log Collector · Unified Logging · CNCF · Data Pipeline · Real-time Processing · Event Streaming


Overview

Fluentd is an open-source data collector that provides a unified logging layer between data sources and backend systems. A CNCF graduated project with a 700+ plugin ecosystem, it serves as the de facto standard for Kubernetes log collection; version 1.18.0 added zero-downtime restart capability.

Details

Adopted by more than 5,000 companies, Fluentd is implemented in Ruby with performance-critical parts written in C, balancing flexibility and speed. Its ecosystem of 700+ input, output, and filter plugins lets a single unified logging layer replace ad-hoc log shippers, simplifying complex log processing pipelines and enabling real-time processing across diverse environments.

Key Technical Features

  • Unified Logging Layer: Centralized log collection and processing
  • 700+ Plugin Ecosystem: Extensive input, output, and filter plugins
  • Zero-downtime Restart: Graceful restart without data loss
  • High Performance: Memory-efficient processing with buffering
  • Flexible Routing: Tag-based routing and data transformation (see the sketch after this list)
  • Cloud Native: First-class Kubernetes and container support
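
Tag-based routing is Fluentd's core abstraction: every event carries a tag, and <match> and <filter> patterns select events by tag. A minimal sketch (the app.* tags and destinations are illustrative):

# Ingest events over HTTP; the URL path becomes the tag
<source>
  @type http
  port 8888
</source>

# Events tagged app.db are written to their own file...
<match app.db>
  @type file
  path /var/log/fluentd/db
</match>

# ...and any remaining app.* events fall through to stdout
<match app.*>
  @type stdout
</match>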

Use Cases

  • Centralized log aggregation
  • Real-time log processing and analysis
  • Data pipeline for analytics
  • Container and Kubernetes logging
  • Event streaming and data routing
  • Log forwarding to multiple destinations

Pros and Cons

Pros

  • CNCF Graduated: Enterprise-grade reliability and community support
  • Rich Plugin Ecosystem: 700+ plugins for diverse data sources
  • Zero-downtime Operations: Seamless configuration updates
  • Memory Efficient: Optimized for high-throughput processing
  • Flexible Configuration: Ruby-based DSL for complex routing
  • Cloud Native Ready: Excellent Kubernetes integration

Cons

  • Ruby Dependency: Requires Ruby runtime environment
  • Configuration Complexity: Learning curve for advanced setups
  • Memory Usage: Can consume significant memory under high load
  • Plugin Quality Variance: Inconsistent quality across plugins
  • Performance Bottlenecks: Single-threaded processing limitations
  • Debugging Difficulty: Complex troubleshooting for pipeline issues


Code Examples

Installation and Basic Setup

# Install via gem
gem install fluentd

# Install via package manager (Ubuntu/Debian)
curl -fsSL https://toolbelt.treasuredata.com/sh/install-ubuntu-focal-td-agent4.sh | sh

# Install via package manager (CentOS/RHEL)
curl -fsSL https://toolbelt.treasuredata.com/sh/install-redhat-td-agent4.sh | sh

# Docker installation
docker pull fluent/fluentd:v1.18-1

# Verify installation
fluentd --version

# Generate configuration file
fluentd --setup ./fluent

# Run Fluentd
fluentd -c ./fluent/fluent.conf -v
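
With Fluentd running, the fluent-cat utility bundled with the gem can push a test event through the forward protocol (this assumes the generated config keeps the default forward input on port 24224):

# Send a JSON event tagged debug.test to the local forward input
echo '{"message": "hello"}' | fluent-cat debug.test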

Basic Configuration

# fluent.conf
<system>
  workers 4
  root_dir /var/log/fluentd
  log_level info
  suppress_repeated_stacktrace true
  emit_error_log_interval 30s
  suppress_config_dump
</system>

# Input sources
<source>
  @type tail
  @id input_tail
  @label @mainstream
  path /var/log/nginx/access.log
  pos_file /var/log/fluentd/nginx-access.log.pos
  tag nginx.access
  # Emit the raw line under the `message` key; parsing is done by the
  # parser filter in the @mainstream label below
  <parse>
    @type none
  </parse>
</source>

<source>
  @type tail
  @id input_error_tail
  @label @mainstream
  path /var/log/nginx/error.log
  pos_file /var/log/fluentd/nginx-error.log.pos
  tag nginx.error
  <parse>
    @type multiline
    format_firstline /^\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}/
    format1 /^(?<time>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}) \[(?<log_level>\w+)\] (?<pid>\d+)#(?<tid>\d+): (?<message>.*)/
  </parse>
</source>

<source>
  @type forward
  @id input_forward
  @label @mainstream
  port 24224
  bind 0.0.0.0
  <security>
    self_hostname "#{Socket.gethostname}"
    shared_key fluentd_secret_key
  </security>
</source>

<source>
  @type http
  @id input_http
  @label @mainstream
  port 8888
  bind 0.0.0.0
  body_size_limit 32m
  keepalive_timeout 10s
  cors_allow_origins ["*"]
</source>

# Filters and transformations
<label @mainstream>
  <filter nginx.**>
    @type record_transformer
    <record>
      hostname "#{Socket.gethostname}"
      tag ${tag}
      timestamp ${time}
    </record>
  </filter>

  <filter nginx.access>
    @type parser
    key_name message
    reserve_data true
    inject_key_prefix parsed_
    <parse>
      @type nginx
    </parse>
  </filter>

  <filter **>
    @type grep
    <exclude>
      key message
      pattern /health_check/
    </exclude>
  </filter>

  # Output configurations
  <match nginx.access>
    @type elasticsearch
    @id output_elasticsearch
    host elasticsearch.example.com
    port 9200
    index_name nginx-access
    type_name _doc
    include_tag_key true
    tag_key @log_name
    <buffer>
      @type file
      path /var/log/fluentd/buffers/elasticsearch
      flush_mode interval
      retry_type exponential_backoff
      flush_thread_count 2
      flush_interval 5s
      retry_forever
      retry_max_interval 30
      chunk_limit_size 2M
      queue_limit_length 8
      overflow_action block
    </buffer>
  </match>

  <match nginx.error>
    @type copy
    <store>
      @type elasticsearch
      @id output_elasticsearch_error
      host elasticsearch.example.com
      port 9200
      index_name nginx-error
      type_name _doc
    </store>
    <store>
      @type slack
      webhook_url https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK
      channel "#alerts"
      username "fluentd"
      color danger
      message "Nginx Error: %s"
      message_keys message
    </store>
  </match>

  <match **>
    @type file
    @id output_file
    path /var/log/fluentd/data.%Y%m%d.log
    symlink_path /var/log/fluentd/data.log
    append true
    <format>
      @type out_file
      time_format %Y%m%dT%H%M%S%z
    </format>
    <buffer time>
      @type file
      path /var/log/fluentd/buffer
      timekey 1d
      timekey_wait 10m
    </buffer>
  </match>
</label>
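
A configuration like this can be reloaded without stopping the process: sending SIGUSR2 to the supervisor triggers a graceful configuration reload, and the v1.18 zero-downtime restart capability extends this kind of graceful handover to full restarts. A sketch (the pid-file path is an assumption; use your deployment's actual path):

# Gracefully reload fluent.conf without dropping in-flight events
kill -SIGUSR2 $(cat /var/run/fluentd/fluentd.pid)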

Kubernetes Integration

# fluentd-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: kube-system
  labels:
    k8s-app: fluentd-logging
    version: v1
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-logging
      version: v1
  template:
    metadata:
      labels:
        k8s-app: fluentd-logging
        version: v1
    spec:
      serviceAccountName: fluentd
      tolerations:
      - key: node-role.kubernetes.io/control-plane
        effect: NoSchedule
      - key: node-role.kubernetes.io/master
        effect: NoSchedule
      containers:
      - name: fluentd
        image: fluent/fluentd-kubernetes-daemonset:v1.18-debian-elasticsearch7-1
        env:
        - name: FLUENT_ELASTICSEARCH_HOST
          value: "elasticsearch.logging.svc.cluster.local"
        - name: FLUENT_ELASTICSEARCH_PORT
          value: "9200"
        - name: FLUENT_ELASTICSEARCH_SCHEME
          value: "http"
        - name: FLUENTD_SYSTEMD_CONF
          value: disable
        - name: FLUENT_CONTAINER_TAIL_EXCLUDE_PATH
          value: /var/log/containers/fluent*
        - name: FLUENT_CONTAINER_TAIL_PARSER_TYPE
          value: /^(?<time>.+) (?<stream>stdout|stderr)( (?<logtag>.))? (?<log>.*)$/
        resources:
          limits:
            memory: 500Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: config-volume
          mountPath: /fluentd/etc
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: config-volume
        configMap:
          name: fluentd-config

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-config
  namespace: kube-system
data:
  fluent.conf: |
    <source>
      @type tail
      @id in_tail_container_logs
      path /var/log/containers/*.log
      pos_file /var/log/fluentd-containers.log.pos
      tag raw.kubernetes.*
      read_from_head true
      <parse>
        @type multi_format
        <pattern>
          format json
          time_key time
          time_format %Y-%m-%dT%H:%M:%S.%NZ
        </pattern>
        <pattern>
          format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
          time_format %Y-%m-%dT%H:%M:%S.%N%:z
        </pattern>
      </parse>
    </source>

    <filter raw.kubernetes.**>
      @type kubernetes_metadata
    </filter>

    <match raw.kubernetes.**>
      @type elasticsearch
      @id out_es
      @log_level info
      include_tag_key true
      host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
      port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
      scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
      ssl_verify "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERIFY'] || 'true'}"
      ssl_version "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERSION'] || 'TLSv1_2'}"
      reload_connections false
      reconnect_on_error true
      reload_on_failure true
      log_es_400_reason false
      logstash_prefix "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_PREFIX'] || 'logstash'}"
      logstash_format true
      index_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_INDEX_NAME'] || 'logstash'}"
      type_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_TYPE_NAME'] || 'fluentd'}"
      <buffer>
        flush_thread_count "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_THREAD_COUNT'] || '8'}"
        flush_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_INTERVAL'] || '5s'}"
        chunk_limit_size "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_CHUNK_LIMIT_SIZE'] || '2M'}"
        queue_limit_length "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_QUEUE_LIMIT_LENGTH'] || '32'}"
        retry_max_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_RETRY_MAX_INTERVAL'] || '30'}"
        retry_forever true
      </buffer>
    </match>
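
The DaemonSet above runs under a fluentd service account, and the kubernetes_metadata filter needs read access to pod and namespace metadata. A minimal RBAC sketch to pair with it (names assumed to match the manifests above):

apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluentd
rules:
- apiGroups: [""]
  resources: ["pods", "namespaces"]
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: fluentd
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: fluentd
subjects:
- kind: ServiceAccount
  name: fluentd
  namespace: kube-system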

Custom Plugin Development

# fluent-plugin-custom/lib/fluent/plugin/out_custom.rb
require 'fluent/plugin/output'
require 'net/http'
require 'uri'
require 'json'
require 'time'

module Fluent
  module Plugin
    class CustomOutput < Fluent::Plugin::Output
      Fluent::Plugin.register_output('custom', self)

      # Configuration parameters
      config_param :endpoint, :string
      config_param :api_key, :string, secret: true
      config_param :timeout, :integer, default: 30
      config_param :retry_limit, :integer, default: 3

      def configure(conf)
        super
        @http_client = setup_http_client
      end

      def start
        super
        log.info "Custom output plugin started"
      end

      def shutdown
        super
        # Net::HTTP sessions are closed with finish, and only if started
        @http_client.finish if @http_client&.started?
        log.info "Custom output plugin stopped"
      end

      def process(tag, es)
        es.each do |time, record|
          send_record(tag, time, record)
        end
      end

      private

      def setup_http_client
        @uri = URI.parse(@endpoint)
        http = Net::HTTP.new(@uri.host, @uri.port)
        http.use_ssl = (@uri.scheme == 'https')
        http.read_timeout = @timeout
        http
      end

      def send_record(tag, time, record)
        retries = 0
        
        begin
          payload = {
            tag: tag,
            timestamp: Time.at(time).iso8601,
            record: record
          }.to_json

          request = Net::HTTP::Post.new(@uri.request_uri)
          request['Content-Type'] = 'application/json'
          request['Authorization'] = "Bearer #{@api_key}"
          request.body = payload

          response = @http_client.request(request)
          
          unless response.is_a?(Net::HTTPSuccess)
            raise "HTTP #{response.code}: #{response.body}"
          end

          log.debug "Record sent successfully: #{tag}"

        rescue => e
          retries += 1
          if retries <= @retry_limit
            log.warn "Failed to send record (attempt #{retries}): #{e.message}"
            sleep(2 ** retries)
            retry
          else
            log.error "Failed to send record after #{@retry_limit} attempts: #{e.message}"
            raise
          end
        end
      end
    end
  end
end

# Plugin gemspec
# fluent-plugin-custom.gemspec
Gem::Specification.new do |spec|
  spec.name          = "fluent-plugin-custom"
  spec.version       = "1.0.0"
  spec.authors       = ["Your Name"]
  spec.email         = ["[email protected]"]
  spec.summary       = "Custom Fluentd output plugin"
  spec.description   = "A custom output plugin for Fluentd"
  spec.homepage      = "https://github.com/yourname/fluent-plugin-custom"
  spec.license       = "MIT"

  spec.files         = Dir["lib/**/*", "README.md", "LICENSE"]
  spec.require_paths = ["lib"]

  spec.add_runtime_dependency "fluentd", ">= 1.0", "< 2"
  spec.add_development_dependency "test-unit", ">= 3.0"
end
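
A minimal test sketch for the plugin, using Fluentd's bundled test driver and the test-unit dependency declared in the gemspec (endpoint and key values are placeholders):

# test/plugin/test_out_custom.rb
require 'fluent/test'
require 'fluent/test/driver/output'
require 'fluent/test/helpers'
require 'fluent/plugin/out_custom'

class CustomOutputTest < Test::Unit::TestCase
  include Fluent::Test::Helpers

  setup do
    Fluent::Test.setup
  end

  CONFIG = %[
    endpoint http://localhost:9200/ingest
    api_key test-key
  ]

  def create_driver(conf = CONFIG)
    Fluent::Test::Driver::Output.new(Fluent::Plugin::CustomOutput).configure(conf)
  end

  test 'configure reads parameters and applies defaults' do
    d = create_driver
    assert_equal 'http://localhost:9200/ingest', d.instance.endpoint
    assert_equal 30, d.instance.timeout
  end
end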

Monitoring and Health Checks

# monitoring.conf
<system>
  enable_input_metrics true
  enable_size_metrics true
</system>

# Expose metrics in Prometheus format (requires fluent-plugin-prometheus)
<source>
  @type prometheus
  bind 0.0.0.0
  port 24231
  metrics_path /metrics
</source>

<source>
  @type monitor_agent
  bind 0.0.0.0
  port 24220
  include_config false
  include_retry false
</source>

<source>
  @type prometheus_monitor
  <labels>
    host ${hostname}
    service fluentd
  </labels>
</source>

<source>
  @type prometheus_output_monitor
  <labels>
    host ${hostname}
    service fluentd
  </labels>
</source>

# Health check endpoint
<source>
  @type http
  port 9880
  bind 0.0.0.0
  <parse>
    @type json
  </parse>
</source>

<match health.check>
  @type null
</match>
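
The health-check input above can be exercised with a plain HTTP request; in_http also accepts the record as a json= form parameter, so a probe is a one-liner:

# Post a ping event; the health.check match discards it via @type null
curl -s -X POST -d 'json={"ping":"pong"}' http://localhost:9880/health.check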

Performance Tuning

# performance.conf
<system>
  workers 4
  root_dir /tmp/fluentd-buffers/
  log_level warn
  suppress_repeated_stacktrace true
  emit_error_log_interval 30s
  suppress_config_dump
</system>

# Per-worker directives live at top level, not inside <system>.
# For example, run the monitoring agent on worker 0 only:
<worker 0>
  <source>
    @type monitor_agent
    port 24220
  </source>
</worker>

# High-performance input
<source>
  @type forward
  port 24224
  bind 0.0.0.0
  chunk_size_limit 16MB
  skip_invalid_event true
  source_hostname_key hostname
  
  <security>
    self_hostname "#{Socket.gethostname}"
    shared_key your_shared_key_here
    # Client restrictions belong inside <security>
    <client>
      host 192.168.1.10
    </client>
  </security>
</source>

# Optimized buffering
<match **>
  @type elasticsearch
  @id output_elasticsearch
  host elasticsearch-cluster.example.com
  port 9200
  
  # Connection pooling
  reload_connections false
  reconnect_on_error true
  reload_on_failure true
  
  # Performance settings
  bulk_message_request_threshold 1024
  request_timeout 30s
  
  <buffer tag,time>
    @type file
    path /tmp/fluentd-buffers/elasticsearch
    timekey 1h            # required when `time` is a chunk key
    timekey_wait 10m
    
    # Buffer settings for high throughput
    flush_mode interval
    flush_interval 5s
    flush_thread_count 8
    
    # Memory settings
    chunk_limit_size 16MB
    chunk_limit_records 10000
    total_limit_size 2GB
    queue_limit_length 256
    
    # Retry settings
    retry_type exponential_backoff
    retry_max_interval 60s
    retry_forever true
    
    # Overflow handling
    overflow_action drop_oldest_chunk
    
    # Compression
    compress gzip
  </buffer>
</match>

Troubleshooting

# Check Fluentd status
sudo systemctl status td-agent

# View logs
sudo tail -f /var/log/td-agent/td-agent.log

# Validate configuration without starting inputs/outputs
fluentd --dry-run -c /etc/td-agent/td-agent.conf

# Monitor buffer status
curl http://localhost:24220/api/plugins.json | jq

# Show a plugin's configuration parameters
fluent-plugin-config-format output elasticsearch

# Performance monitoring
curl http://localhost:24231/metrics

# Memory usage analysis
ps aux | grep fluentd
cat /proc/$(pgrep -o fluentd)/status  # -o picks the supervisor (oldest) process

# Buffer directory analysis
ls -la /var/log/td-agent/buffer/
du -sh /var/log/td-agent/buffer/

# Network connectivity test
telnet elasticsearch.example.com 9200

# Plugin installation
sudo td-agent-gem install fluent-plugin-elasticsearch

# Send test event
echo '{"test": "message"}' | curl -X POST -d @- http://localhost:8888/test.tag