Logstash

Open-source server-side data processing pipeline that ingests data from multiple sources, transforms it, and sends it to various destinations. Offers high extensibility with 200+ plugins.

Tags: Monitoring, Server, Data Processing Pipeline, ELK Stack, ETL, Log Processing, Real-time Processing, Data Transformation

Overview

Logstash is an open-source server-side data processing pipeline that ingests data from multiple sources, transforms it, and sends it to various destinations. It offers high extensibility with 200+ plugins and serves as the core data processing engine of the ELK stack. With continuous demand driven by multi-pipeline architecture, cloud-native deployment, and real-time processing capabilities, it remains essential for modern data processing workflows.

Details

Logstash is the data processing engine of the ELK stack and remains in steady demand in enterprise environments. A pipeline is built from three stages: inputs ingest events, filters parse and enrich them, and outputs ship the results to destinations such as Elasticsearch, Kafka, or object storage. It supports multiple isolated pipelines, persistent queues for buffering, and near-real-time stream processing, backed by an extensive plugin ecosystem covering diverse sources and destinations. Implemented in JRuby on the JVM, it trades higher resource usage for flexibility in complex ETL (Extract, Transform, Load) workflows.
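
The pipeline flow is easiest to see in a minimal configuration. The sketch below is illustrative only (host and index names are assumptions): it reads JSON lines from stdin, adds a field, and writes the result to both stdout and Elasticsearch.

# minimal.conf - illustrative sketch; adjust hosts and index to your environment
input {
  stdin { codec => json_lines }
}

filter {
  mutate { add_field => { "pipeline" => "minimal-demo" } }
}

output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "demo-%{+YYYY.MM.dd}"
  }
}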

Key Technical Features

  • Multi-pipeline Architecture: Parallel processing with isolated pipeline configurations
  • 200+ Plugin Ecosystem: Extensive input, filter, and output plugins
  • Real-time Processing: Stream processing with configurable buffering
  • Data Transformation: Rich filtering and parsing capabilities
  • Horizontal Scaling: Distributed processing across multiple nodes
  • Monitoring Integration: Built-in metrics and monitoring endpoints

Use Cases

  • Log aggregation and centralization
  • Data enrichment and transformation
  • Real-time data processing pipelines
  • ETL operations for analytics (see the sketch after this list)
  • Security event processing
  • IoT data collection and processing
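
To illustrate the ETL use case, the sketch below loads a CSV export into Elasticsearch; the file path, column names, date format, and index are assumptions for the example.

# csv-etl.conf - illustrative ETL sketch; paths, columns, and index are assumptions
input {
  file {
    path => "/data/exports/orders-*.csv"
    start_position => "beginning"
    sincedb_path => "/var/lib/logstash/sincedb_orders"
  }
}

filter {
  csv {
    separator => ","
    columns => ["order_id", "customer_id", "amount", "order_date"]
    skip_header => true
  }
  mutate {
    convert => { "amount" => "float" }
  }
  date {
    match => ["order_date", "yyyy-MM-dd HH:mm:ss"]
    target => "@timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "orders-%{+YYYY.MM}"
  }
}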

Pros and Cons

Pros

  • Rich Plugin Ecosystem: 200+ plugins for diverse integrations
  • Flexible Configuration: Declarative pipeline DSL with conditionals, plus an embedded Ruby filter for custom logic
  • Horizontal Scalability: Multi-node deployment capabilities
  • Real-time Processing: Low-latency stream processing
  • Data Transformation: Powerful filtering and parsing capabilities
  • ELK Stack Integration: Seamless integration with Elasticsearch and Kibana

Cons

  • Resource Intensive: High memory and CPU usage under load
  • Complex Configuration: Learning curve for advanced pipeline setups
  • JVM Dependency: Requires Java runtime environment
  • Performance Bottlenecks: Throughput can be capped by single-threaded stages such as individual input plugin instances
  • Memory Leaks: Potential memory issues with certain plugins
  • Plugin Quality Variance: Inconsistent quality across community plugins

Code Examples

Installation and Basic Setup

# Docker installation
docker pull docker.elastic.co/logstash/logstash:8.16.0

# Run Logstash container
docker run -d \
  --name logstash \
  -p 5044:5044 \
  -p 9600:9600 \
  -v "$(pwd)/logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro" \
  docker.elastic.co/logstash/logstash:8.16.0

# Package installation (Ubuntu/Debian)
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
sudo apt-get update && sudo apt-get install logstash

# Start service
sudo systemctl enable logstash
sudo systemctl start logstash

# Check status
sudo systemctl status logstash
/usr/share/logstash/bin/logstash --version

Basic Configuration

# logstash.conf
input {
  # File input for log files
  file {
    path => ["/var/log/nginx/access.log", "/var/log/nginx/error.log"]
    start_position => "beginning"
    sincedb_path => "/dev/null"
    tags => ["nginx"]
  }
  
  # Beats input for Filebeat
  beats {
    port => 5044
    host => "0.0.0.0"
  }
  
  # Syslog input (listens on both TCP and UDP; port 514 requires elevated privileges)
  syslog {
    port => 514
    tags => ["syslog"]
  }
  
  # HTTP input for webhook data
  http {
    port => 8080
    host => "0.0.0.0"
    tags => ["webhook"]
    codec => json
  }
  
  # TCP input for custom applications
  tcp {
    port => 9999
    codec => json_lines
  }
  
  # Kafka input for high-throughput processing
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topics => ["logs", "metrics", "events"]
    group_id => "logstash-consumer"
    consumer_threads => 4
    auto_offset_reset => "earliest"
  }
}

filter {
  # Parse nginx access logs
  # Note: field references below assume legacy (non-ECS) names; with Logstash 8's
  # default ECS compatibility, set pipeline.ecs_compatibility: disabled or adjust them.
  if "nginx" in [tags] and [path] =~ "access" {
    grok {
      # nginx's default "combined" log format matches this built-in pattern
      match => { 
        "message" => "%{COMBINEDAPACHELOG}" 
      }
      tag_on_failure => ["_grokparsefailure_nginx_access"]
    }
    
    # Parse timestamp
    date {
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      target => "@timestamp"
    }
    
    # Convert response code to integer
    mutate {
      convert => { 
        "response" => "integer"
        "bytes" => "integer" 
      }
    }
    
    # Add response category
    if [response] >= 200 and [response] < 300 {
      mutate { add_field => { "response_category" => "success" } }
    } else if [response] >= 400 and [response] < 500 {
      mutate { add_field => { "response_category" => "client_error" } }
    } else if [response] >= 500 {
      mutate { add_field => { "response_category" => "server_error" } }
    }
  }
  
  # Parse nginx error logs
  if "nginx" in [tags] and [path] =~ "error" {
    grok {
      match => { 
        "message" => "(?<timestamp>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?<severity>\w+)\] (?<pid>\d+)#(?<tid>\d+): (?<error_message>.*)" 
      }
    }
    
    date {
      match => [ "timestamp", "yyyy/MM/dd HH:mm:ss" ]
      target => "@timestamp"
    }
  }
  
  # JSON parsing for structured logs
  if [message] =~ /^\{.*\}$/ {
    json {
      source => "message"
      target => "parsed"
    }
    
    # Promote parsed fields to top level
    ruby {
      code => "
        parsed = event.get('parsed')
        if parsed.is_a?(Hash)
          parsed.each { |key, value| event.set(key, value) }
          event.remove('parsed')
        end
      "
    }
  }
  
  # GeoIP enrichment
  if [clientip] {
    geoip {
      source => "clientip"
      target => "geoip"
      database => "/usr/share/logstash/GeoLite2-City.mmdb"
    }
  }
  
  # User-Agent parsing
  if [agent] {
    useragent {
      source => "agent"
      target => "user_agent"
    }
  }
  
  # DNS lookup
  if [client_ip] {
    dns {
      reverse => ["client_ip"]
      action => "replace"
    }
  }
  
  # Custom field enrichment
  if [service] == "web-api" {
    mutate {
      add_field => { 
        "environment" => "production"
        "team" => "backend"
        "alert_priority" => "high"
      }
    }
  }
  
  # Data anonymization
  if [email] {
    mutate {
      # Mask the local part, keeping the domain (user@example.com -> ***@example.com)
      gsub => [ "email", "^[^@]+", "***" ]
    }
  }
  
  # Remove unwanted fields
  mutate {
    remove_field => [ "host", "path", "message" ]
  }
}

output {
  # Output to Elasticsearch
  if "nginx" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch-01:9200", "elasticsearch-02:9200"]
      index => "nginx-logs-%{+YYYY.MM.dd}"
      template_name => "nginx"
      template => "/etc/logstash/templates/nginx.json"
      template_overwrite => true
      manage_template => true
    }
  }
  
  # Route remaining events by log level (nginx events are already indexed above)
  if [level] == "ERROR" {
    elasticsearch {
      hosts => ["elasticsearch-01:9200", "elasticsearch-02:9200"]
      index => "errors-%{+YYYY.MM.dd}"
    }
  } else if "nginx" not in [tags] {
    elasticsearch {
      hosts => ["elasticsearch-01:9200", "elasticsearch-02:9200"]
      index => "logs-%{+YYYY.MM.dd}"
    }
  }
  
  # Output to Kafka for further processing
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topic_id => "processed-logs"
    codec => json
  }
  
  # Output to file for backup
  file {
    path => "/var/log/logstash/output-%{+YYYY-MM-dd}.log"
    codec => json_lines
  }
  
  # Output to S3 for long-term storage
  s3 {
    access_key_id => "${AWS_ACCESS_KEY_ID}"
    secret_access_key => "${AWS_SECRET_ACCESS_KEY}"
    region => "us-east-1"
    bucket => "log-archive"
    prefix => "logstash/%{+YYYY}/%{+MM}/%{+dd}/"
    codec => json_lines
    time_file => 60
    size_file => 50000000
  }
  
  # Output to Redis for real-time processing
  redis {
    host => "redis.example.com"
    port => 6379
    key => "logstash-output"
    data_type => "list"
  }
  
  # Conditional output based on severity
  if [severity] == "critical" {
    http {
      url => "https://alerts.example.com/webhook"
      http_method => "post"
      format => "json"
      headers => {
        "Authorization" => "Bearer ${ALERT_TOKEN}"
      }
    }
  }
  
  # Debug output (disable in production)
  # stdout { codec => rubydebug }
}
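
The ${AWS_ACCESS_KEY_ID}, ${AWS_SECRET_ACCESS_KEY}, and ${ALERT_TOKEN} references above are resolved from environment variables or from the Logstash keystore. A sketch of managing them with the keystore tool (the --path.settings value matches the package installation used above):

# Store secrets referenced as ${...} in the pipeline configuration
sudo /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash create
sudo /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash add ALERT_TOKEN
sudo /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash list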

Advanced Multi-Pipeline Configuration

# pipelines.yml
- pipeline.id: nginx-pipeline
  path.config: "/etc/logstash/conf.d/nginx.conf"
  pipeline.workers: 4
  pipeline.batch.size: 1000
  pipeline.batch.delay: 50
  queue.type: persisted
  queue.max_bytes: 2gb

- pipeline.id: application-pipeline
  path.config: "/etc/logstash/conf.d/application.conf"
  pipeline.workers: 2
  pipeline.batch.size: 500
  pipeline.batch.delay: 100

- pipeline.id: security-pipeline
  path.config: "/etc/logstash/conf.d/security.conf"
  pipeline.workers: 1
  pipeline.batch.size: 100
  pipeline.batch.delay: 10
  queue.type: memory

- pipeline.id: metrics-pipeline
  path.config: "/etc/logstash/conf.d/metrics.conf"
  pipeline.workers: 8
  pipeline.batch.size: 2000
  pipeline.batch.delay: 25
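
Pipelines defined in pipelines.yml can also be chained with the pipeline input and output plugins (pipeline-to-pipeline communication), for example in a distributor pattern. The addresses below are illustrative:

# distributor pipeline - routes events to downstream pipelines
output {
  if "nginx" in [tags] {
    pipeline { send_to => ["nginx-events"] }
  } else {
    pipeline { send_to => ["application-events"] }
  }
}

# downstream pipeline (e.g. nginx.conf) - receives from the distributor
input {
  pipeline { address => "nginx-events" }
}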

Performance Tuning Configuration

# logstash.yml
node.name: logstash-production-01
path.data: /var/lib/logstash
path.logs: /var/log/logstash
path.settings: /etc/logstash

# Pipeline settings
pipeline.workers: 8
pipeline.batch.size: 1000
pipeline.batch.delay: 50

# Queue settings
queue.type: persisted
queue.max_bytes: 4gb
queue.checkpoint.writes: 1024
queue.drain: true

# Dead letter queue
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1gb

# HTTP API settings
http.host: "0.0.0.0"
http.port: 9600

# Monitoring
monitoring.enabled: true
monitoring.elasticsearch.hosts: ["http://elasticsearch:9200"]
monitoring.elasticsearch.username: logstash_system
monitoring.elasticsearch.password: "${ELASTIC_PASSWORD}"

# Logging
log.level: info
log.format: json

# Config reload
config.reload.automatic: true
config.reload.interval: 3s
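
These pipeline and queue settings interact with the JVM heap size, which is configured in jvm.options rather than logstash.yml. The values below are an illustrative starting point, not a sizing recommendation:

# /etc/logstash/jvm.options (illustrative values)
# Keep initial and maximum heap equal to avoid resize pauses
-Xms4g
-Xmx4g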

Custom Plugin Development

# lib/logstash/filters/custom_enrichment.rb
require "logstash/filters/base"
require "logstash/namespace"

class LogStash::Filters::CustomEnrichment < LogStash::Filters::Base
  config_name "custom_enrichment"
  
  # Configuration parameters
  config :source_field, :validate => :string, :required => true
  config :target_field, :validate => :string, :default => "enriched"
  config :lookup_table, :validate => :hash, :default => {}
  config :default_value, :validate => :string, :default => "unknown"
  
  public
  def register
    @logger.info("Custom enrichment filter initialized")
  end

  public
  def filter(event)
    source_value = event.get(@source_field)
    
    if source_value
      enriched_value = @lookup_table[source_value] || @default_value
      event.set(@target_field, enriched_value)
      
      # Add metadata
      event.set("[@metadata][enrichment_applied]", true)
      event.set("[@metadata][enrichment_timestamp]", Time.now.iso8601)
    end

    # filter_matched applies the common add_field/add_tag options and records the match in metrics
    filter_matched(event)
  end
end

# Plugin gemspec
# logstash-filter-custom_enrichment.gemspec
Gem::Specification.new do |s|
  s.name          = 'logstash-filter-custom_enrichment'
  s.version       = '1.0.0'
  s.licenses      = ['Apache License (2.0)']
  s.summary       = 'Custom enrichment filter for Logstash'
  s.description   = 'This gem provides custom enrichment capabilities'
  s.homepage      = 'https://github.com/yourcompany/logstash-filter-custom_enrichment'
  s.authors       = ['Your Name']
  s.email         = ['[email protected]']
  s.require_paths = ['lib']

  # Required metadata so Logstash recognizes the gem as a plugin
  s.metadata = { 'logstash_plugin' => 'true', 'logstash_group' => 'filter' }

  s.files = Dir['lib/**/*','spec/**/*','vendor/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile','LICENSE','NOTICE.TXT']

  s.test_files = s.files.grep(%r{^(test|spec|features)/})

  s.add_runtime_dependency "logstash-core-plugin-api", "~> 2.0"
  s.add_runtime_dependency "logstash-codec-plain"
  s.add_development_dependency 'logstash-devutils'
end
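
Assuming the filter class is saved under lib/logstash/filters/custom_enrichment.rb and a Ruby toolchain is available, the gem can be built, installed, and smoke-tested roughly as follows (names match the gemspec above):

# Build the gem and install it into Logstash
gem build logstash-filter-custom_enrichment.gemspec
sudo /usr/share/logstash/bin/logstash-plugin install --no-verify logstash-filter-custom_enrichment-1.0.0.gem

# Smoke test with an inline pipeline
echo '{"service":"web-api"}' | /usr/share/logstash/bin/logstash -e '
  input { stdin { codec => json_lines } }
  filter { custom_enrichment { source_field => "service" lookup_table => { "web-api" => "backend" } } }
  output { stdout { codec => rubydebug } }'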

Monitoring and Health Checks

# monitoring.conf
input {
  http_poller {
    urls => {
      "logstash_stats" => {
        method => get
        url => "http://localhost:9600/_node/stats"
        headers => {
          Accept => "application/json"
        }
      }
    }
    request_timeout => 60
    schedule => { every => "30s" }
    codec => "json"
    add_field => { "source" => "logstash_monitoring" }
  }
}

filter {
  if [source] == "logstash_monitoring" {
    # Extract key metrics (the json codec decodes the stats API response at the event root)
    ruby {
      code => "
        # Pipeline metrics
        pipelines = event.get('pipelines')
        if pipelines
          pipelines.each do |pipeline_id, pipeline_stats|
            events = pipeline_stats['events']
            if events
              event.set('pipeline_' + pipeline_id + '_events_in', events['in'])
              event.set('pipeline_' + pipeline_id + '_events_out', events['out'])
              event.set('pipeline_' + pipeline_id + '_events_filtered', events['filtered'])
            end

            reloads = pipeline_stats['reloads']
            if reloads
              event.set('pipeline_' + pipeline_id + '_reloads_successes', reloads['successes'])
              event.set('pipeline_' + pipeline_id + '_reloads_failures', reloads['failures'])
            end
          end
        end

        # JVM metrics
        memory = event.get('[jvm][mem]')
        if memory
          event.set('jvm_heap_used_percent', memory['heap_used_percent'])
          event.set('jvm_heap_committed_in_bytes', memory['heap_committed_in_bytes'])
        end

        gc = event.get('[jvm][gc][collectors]')
        if gc
          event.set('jvm_gc_old_collection_count', gc.dig('old', 'collection_count'))
          event.set('jvm_gc_young_collection_count', gc.dig('young', 'collection_count'))
        end

        # Process metrics
        process = event.get('process')
        if process
          event.set('process_cpu_percent', process.dig('cpu', 'percent'))
          event.set('process_mem_total_virtual_in_bytes', process.dig('mem', 'total_virtual_in_bytes'))
        end
      "
    }
    
    # Health check logic
    if [jvm_heap_used_percent] and [jvm_heap_used_percent] > 85 {
      mutate {
        add_field => { "alert_type" => "high_memory_usage" }
        add_field => { "alert_severity" => "warning" }
      }
    }
    
    if [process_cpu_percent] and [process_cpu_percent] > 80 {
      mutate {
        add_field => { "alert_type" => "high_cpu_usage" }
        add_field => { "alert_severity" => "warning" }
      }
    }
  }
}

output {
  if [alert_type] {
    http {
      url => "https://monitoring.example.com/alerts"
      http_method => "post"
      format => "json"
      headers => {
        "Authorization" => "Bearer ${MONITORING_TOKEN}"
      }
    }
  }
  
  elasticsearch {
    hosts => ["monitoring-elasticsearch:9200"]
    index => "logstash-monitoring-%{+YYYY.MM.dd}"
  }
}

Security Configuration

# security.conf
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
    ssl_verify_mode => "force_peer"
  }
  
  http {
    port => 8443
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_verify_mode => "none"
    
    # Authentication
    user => "logstash_user"
    password => "${HTTP_PASSWORD}"
  }
}

filter {
  # Data masking for sensitive information
  mutate {
    gsub => [
      # Credit card numbers
      "message", "\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b", "****-****-****-****",
      # Social security numbers
      "message", "\b\d{3}-?\d{2}-?\d{4}\b", "***-**-****",
      # Phone numbers
      "message", "\b\d{3}[\s.-]?\d{3}[\s.-]?\d{4}\b", "***-***-****"
    ]
  }
  
  # PII field removal
  if [personally_identifiable] {
    mutate {
      remove_field => ["email", "phone", "address", "ssn"]
    }
  }
  
  # Audit logging for sensitive operations
  if [action] in ["login", "logout", "password_change", "privilege_escalation"] {
    mutate {
      add_field => { "audit_required" => "true" }
      add_field => { "retention_period" => "7_years" }
    }
  }
}

output {
  # Secure output to Elasticsearch
  elasticsearch {
    hosts => ["https://secure-elasticsearch:9200"]
    user => "logstash_writer"
    password => "${ELASTIC_PASSWORD}"
    ssl => true
    ssl_certificate_verification => true
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
    index => "secure-logs-%{+YYYY.MM.dd}"
  }
  
  # Audit trail for sensitive data
  if [audit_required] == "true" {
    file {
      path => "/secure/audit/audit-%{+YYYY-MM-dd}.log"
      codec => json_lines
      file_mode => 0600
    }
  }
}
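
The certificate paths above assume existing key material. For testing, a self-signed certificate can be generated with openssl; note that the beats input expects the private key in PKCS#8 format. The CN and paths are illustrative:

# Generate a self-signed certificate for testing
sudo mkdir -p /etc/logstash/certs
sudo openssl req -x509 -newkey rsa:4096 -nodes -days 365 \
  -subj "/CN=logstash.example.com" \
  -keyout /etc/logstash/certs/logstash.key \
  -out /etc/logstash/certs/logstash.crt

# Convert the key to PKCS#8 for the beats input
sudo openssl pkcs8 -topk8 -nocrypt \
  -in /etc/logstash/certs/logstash.key \
  -out /etc/logstash/certs/logstash.pkcs8.key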

Troubleshooting

# Check Logstash status
sudo systemctl status logstash
curl -X GET "localhost:9600"

# View logs
sudo journalctl -u logstash -f
tail -f /var/log/logstash/logstash-plain.log

# Configuration testing
/usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/

# Reload pipeline configuration (SIGHUP triggers a reload; there is no HTTP reload endpoint)
sudo systemctl kill --signal=SIGHUP logstash

# Check pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines"

# Memory analysis
curl -X GET "localhost:9600/_node/stats/jvm"
jmap -histo $(pgrep -f org.logstash.Logstash | head -1)

# Thread analysis
curl -X GET "localhost:9600/_node/hot_threads"
jstack $(pgrep -f org.logstash.Logstash | head -1)

# Performance monitoring
curl -X GET "localhost:9600/_node/stats" | jq '.pipelines.main.events'

# Plugin management
/usr/share/logstash/bin/logstash-plugin list
/usr/share/logstash/bin/logstash-plugin install logstash-output-slack
/usr/share/logstash/bin/logstash-plugin update

# Dead letter queue analysis (use a separate path.data if the main instance is running)
ls -la /var/lib/logstash/dead_letter_queue/
/usr/share/logstash/bin/logstash --path.data=/tmp/logstash-dlq-read -e 'input { dead_letter_queue { path => "/var/lib/logstash/dead_letter_queue" commit_offsets => false } } output { stdout { } }'

# Configuration validation with resolved config dump
/usr/share/logstash/bin/logstash --config.test_and_exit --config.debug --log.level=debug -f /etc/logstash/conf.d/

# Benchmark testing
echo '{"test": "message"}' | /usr/share/logstash/bin/logstash -e 'input { stdin { } } output { stdout { } }'

# Resource usage analysis
ps aux | grep [l]ogstash
cat /proc/$(pgrep -f org.logstash.Logstash | head -1)/status
lsof -p $(pgrep -f org.logstash.Logstash | head -1) | wc -l
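
Log levels can also be raised temporarily at runtime through the logging API; the logger name below targets the Elasticsearch output as an example.

# Raise the log level of a specific component at runtime
curl -X PUT "localhost:9600/_node/logging?pretty" \
  -H 'Content-Type: application/json' \
  -d '{"logger.logstash.outputs.elasticsearch" : "DEBUG"}'

# Reset runtime logging overrides
curl -X PUT "localhost:9600/_node/logging/reset?pretty"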