Logstash
Open-source server-side data processing pipeline that ingests data from multiple sources, transforms it, and sends it to various destinations. Offers high extensibility with 200+ plugins.
Overview
Logstash is an open-source, server-side data processing pipeline that ingests data from multiple sources, transforms it, and sends it to various destinations. Its ecosystem of 200+ plugins makes it highly extensible, and it serves as the core data processing engine of the ELK stack. Multi-pipeline support, cloud-native deployment options, and real-time processing keep it central to modern log and event processing workflows.
Details
Logstash is the data collection and transformation layer of the ELK stack and remains widely deployed in enterprise environments. It can run multiple isolated pipelines in a single process, deploys readily in containerized and cloud-native environments, and processes event streams in real time. Its plugin ecosystem covers a broad range of inputs, filters, and outputs, giving it robust ETL (Extract, Transform, Load) capabilities for logs, metrics, and events. Implemented in JRuby and Java on the JVM, it balances configuration flexibility with performance for complex data transformation requirements.
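Every pipeline follows the same input -> filter -> output structure. The minimal sketch below illustrates the model end to end; the log path, grok pattern, and Elasticsearch host are placeholders rather than part of any specific deployment.
# minimal.conf - illustrative three-stage pipeline (placeholder values)
input {
file { path => "/var/log/app/access.log" }
}
filter {
grok { match => { "message" => "%{HTTPD_COMBINEDLOG}" } }
}
output {
elasticsearch { hosts => ["http://localhost:9200"] }
}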
Key Technical Features
- Multi-pipeline Architecture: Parallel processing with isolated pipeline configurations
- 200+ Plugin Ecosystem: Extensive input, filter, and output plugins
- Real-time Processing: Stream processing with configurable buffering
- Data Transformation: Rich filtering and parsing capabilities
- Horizontal Scaling: Distributed processing across multiple nodes
- Monitoring Integration: Built-in metrics and monitoring API endpoints (see the example after this list)
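The monitoring endpoints mentioned above are served by the HTTP API, on port 9600 by default; a quick query of pipeline statistics looks like this (localhost is an assumption):
# Query pipeline event counters from the built-in node stats API
curl -s "http://localhost:9600/_node/stats/pipelines?pretty"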
Use Cases
- Log aggregation and centralization
- Data enrichment and transformation
- Real-time data processing pipelines
- ETL operations for analytics
- Security event processing
- IoT data collection and processing
Pros and Cons
Pros
- Rich Plugin Ecosystem: 200+ plugins for diverse integrations
- Flexible Configuration: Declarative pipeline DSL plus a ruby filter for custom processing logic
- Horizontal Scalability: Multi-node deployment capabilities
- Real-time Processing: Low-latency stream processing
- Data Transformation: Powerful filtering and parsing capabilities
- ELK Stack Integration: Seamless integration with Elasticsearch and Kibana
Cons
- Resource Intensive: High memory and CPU usage under load
- Complex Configuration: Learning curve for advanced pipeline setups
- JVM Dependency: Requires Java runtime environment
- Performance Bottlenecks: Single-threaded inputs/outputs and expensive grok patterns can limit throughput
- Memory Leaks: Potential memory issues with certain plugins
- Plugin Quality Variance: Inconsistent quality across community plugins
Code Examples
Installation and Basic Setup
# Docker installation
docker pull docker.elastic.co/logstash/logstash:8.16.0
# Run Logstash container
docker run -d \
--name logstash \
-p 5044:5044 \
-p 9600:9600 \
-v "$(pwd)/logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro" \
docker.elastic.co/logstash/logstash:8.16.0
# Package installation (Ubuntu/Debian)
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
sudo apt-get update && sudo apt-get install logstash
# Start service
sudo systemctl enable logstash
sudo systemctl start logstash
# Check status
sudo systemctl status logstash
/usr/share/logstash/bin/logstash --version
Basic Configuration
# logstash.conf
input {
# File input for log files
file {
path => ["/var/log/nginx/access.log", "/var/log/nginx/error.log"]
start_position => "beginning"
sincedb_path => "/dev/null"
# Keep classic field names ([path], [host]) expected by the filters below;
# Logstash 8 defaults to ECS mode, which would use [log][file][path] instead
ecs_compatibility => disabled
tags => ["nginx"]
}
# Beats input for Filebeat
beats {
port => 5044
host => "0.0.0.0"
}
# Syslog input
syslog {
port => 514
protocol => "udp"
tags => ["syslog"]
}
# HTTP input for webhook data
http {
port => 8080
host => "0.0.0.0"
tags => ["webhook"]
codec => json
}
# TCP input for custom applications
tcp {
port => 9999
codec => json_lines
}
# Kafka input for high-throughput processing
kafka {
bootstrap_servers => "kafka1:9092,kafka2:9092"
topics => ["logs", "metrics", "events"]
group_id => "logstash-consumer"
consumer_threads => 4
auto_offset_reset => "earliest"
}
}
filter {
# Parse nginx access logs
if "nginx" in [tags] and [path] =~ "access" {
grok {
match => {
"message" => "%{NGINXACCESS}"
}
tag_on_failure => ["_grokparsefailure_nginx_access"]
}
# Parse timestamp
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
target => "@timestamp"
}
# Convert response code to integer
mutate {
convert => {
"response" => "integer"
"bytes" => "integer"
}
}
# Add response category
if [response] >= 200 and [response] < 300 {
mutate { add_field => { "response_category" => "success" } }
} else if [response] >= 400 and [response] < 500 {
mutate { add_field => { "response_category" => "client_error" } }
} else if [response] >= 500 {
mutate { add_field => { "response_category" => "server_error" } }
}
}
# Parse nginx error logs
if "nginx" in [tags] and [path] =~ "error" {
grok {
match => {
"message" => "(?<timestamp>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?<severity>\w+)\] (?<pid>\d+)#(?<tid>\d+): (?<error_message>.*)"
}
}
date {
match => [ "timestamp", "yyyy/MM/dd HH:mm:ss" ]
target => "@timestamp"
}
}
# JSON parsing for structured logs
if [message] =~ /^\{.*\}$/ {
json {
source => "message"
target => "parsed"
}
# Promote parsed fields to top level
ruby {
code => "
parsed = event.get('parsed')
if parsed.is_a?(Hash)
parsed.each { |key, value| event.set(key, value) }
event.remove('parsed')
end
"
}
}
# GeoIP enrichment
if [clientip] {
geoip {
source => "clientip"
target => "geoip"
database => "/usr/share/logstash/GeoLite2-City.mmdb"
}
}
# User-Agent parsing
if [agent] {
useragent {
source => "agent"
target => "user_agent"
}
}
# DNS lookup
if [client_ip] {
dns {
reverse => ["client_ip"]
action => "replace"
}
}
# Custom field enrichment
if [service] == "web-api" {
mutate {
add_field => {
"environment" => "production"
"team" => "backend"
"alert_priority" => "high"
}
}
}
# Data anonymization
if [email] {
mutate {
gsub => [ "email", "([^@]+)@(.+)", "***@%{[2]}" ]
}
}
# Remove unwanted fields
mutate {
remove_field => [ "host", "path", "message" ]
}
}
output {
# Output to Elasticsearch
if "nginx" in [tags] {
elasticsearch {
hosts => ["elasticsearch-01:9200", "elasticsearch-02:9200"]
index => "nginx-logs-%{+YYYY.MM.dd}"
template_name => "nginx"
template_pattern => "nginx-*"
template => "/etc/logstash/templates/nginx.json"
template_overwrite => true
manage_template => true
}
}
# Output to different index based on log level
if [level] == "ERROR" {
elasticsearch {
hosts => ["elasticsearch-01:9200", "elasticsearch-02:9200"]
index => "errors-%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
hosts => ["elasticsearch-01:9200", "elasticsearch-02:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
}
# Output to Kafka for further processing
kafka {
bootstrap_servers => "kafka1:9092,kafka2:9092"
topic_id => "processed-logs"
codec => json
}
# Output to file for backup
file {
path => "/var/log/logstash/output-%{+YYYY-MM-dd}.log"
codec => json_lines
}
# Output to S3 for long-term storage
s3 {
access_key_id => "${AWS_ACCESS_KEY_ID}"
secret_access_key => "${AWS_SECRET_ACCESS_KEY}"
region => "us-east-1"
bucket => "log-archive"
prefix => "logstash/%{+YYYY}/%{+MM}/%{+dd}/"
codec => json_lines
time_file => 60
size_file => 50000000
}
# Output to Redis for real-time processing
redis {
host => "redis.example.com"
port => 6379
key => "logstash-output"
data_type => "list"
}
# Conditional output based on severity
if [severity] == "critical" {
http {
url => "https://alerts.example.com/webhook"
http_method => "post"
format => "json"
headers => {
"Authorization" => "Bearer ${ALERT_TOKEN}"
}
}
}
# Debug output (disable in production)
# stdout { codec => rubydebug }
}
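A configuration file like the one above can be validated and run straight from the command line; the paths assume the package installation shown earlier.
# Validate the configuration, then run it with automatic reload on changes
/usr/share/logstash/bin/logstash --config.test_and_exit -f logstash.conf
/usr/share/logstash/bin/logstash -f logstash.conf --config.reload.automatic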
Advanced Multi-Pipeline Configuration
# pipelines.yml
- pipeline.id: nginx-pipeline
path.config: "/etc/logstash/conf.d/nginx.conf"
pipeline.workers: 4
pipeline.batch.size: 1000
pipeline.batch.delay: 50
queue.type: persisted
queue.max_bytes: 2gb
- pipeline.id: application-pipeline
path.config: "/etc/logstash/conf.d/application.conf"
pipeline.workers: 2
pipeline.batch.size: 500
pipeline.batch.delay: 100
- pipeline.id: security-pipeline
path.config: "/etc/logstash/conf.d/security.conf"
pipeline.workers: 1
pipeline.batch.size: 100
pipeline.batch.delay: 10
queue.type: memory
- pipeline.id: metrics-pipeline
path.config: "/etc/logstash/conf.d/metrics.conf"
pipeline.workers: 8
pipeline.batch.size: 2000
pipeline.batch.delay: 25
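Pipelines declared in pipelines.yml can also be chained together with the pipeline input and output plugins. The sketch below shows a distributor pattern; the internal addresses and the routing condition are illustrative and not tied to the configurations above.
# distributor pipeline - receives from Beats and routes to downstream pipelines
input { beats { port => 5044 } }
output {
if "nginx" in [tags] {
pipeline { send_to => ["nginx-ingest"] }
} else {
pipeline { send_to => ["app-ingest"] }
}
}
# downstream pipeline - consumes events addressed to it
input { pipeline { address => "nginx-ingest" } }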
Performance Tuning Configuration
# logstash.yml
node.name: logstash-production-01
path.data: /var/lib/logstash
path.logs: /var/log/logstash
path.settings: /etc/logstash
# Pipeline settings
pipeline.workers: 8
pipeline.batch.size: 1000
pipeline.batch.delay: 50
# Queue settings
queue.type: persisted
queue.max_bytes: 4gb
queue.checkpoint.writes: 1024
queue.drain: true
# Dead letter queue
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1gb
# HTTP API settings
api.http.host: "0.0.0.0"
api.http.port: 9600
# Monitoring (legacy self-monitoring; Metricbeat collection is the recommended alternative)
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://elasticsearch:9200"]
xpack.monitoring.elasticsearch.username: logstash_system
xpack.monitoring.elasticsearch.password: "${ELASTIC_PASSWORD}"
# Logging
log.level: info
log.format: json
# Config reload
config.reload.automatic: true
config.reload.interval: 3s
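JVM heap size is configured in jvm.options rather than logstash.yml; a common starting point is to pin the minimum and maximum heap to the same value (4g below is only an example, size it to the workload).
# /etc/logstash/jvm.options (excerpt)
-Xms4g
-Xmx4g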
Custom Plugin Development
# logstash-filter-custom_enrichment.rb
require "logstash/filters/base"
require "logstash/namespace"
class LogStash::Filters::CustomEnrichment < LogStash::Filters::Base
config_name "custom_enrichment"
# Configuration parameters
config :source_field, :validate => :string, :required => true
config :target_field, :validate => :string, :default => "enriched"
config :lookup_table, :validate => :hash, :default => {}
config :default_value, :validate => :string, :default => "unknown"
public
def register
@logger.info("Custom enrichment filter initialized")
end
public
def filter(event)
source_value = event.get(@source_field)
if source_value
enriched_value = @lookup_table[source_value] || @default_value
event.set(@target_field, enriched_value)
# Add metadata
event.set("[@metadata][enrichment_applied]", true)
event.set("[@metadata][enrichment_timestamp]", Time.now.iso8601)
end
# filter_matched applies the add_field/add_tag decorations configured on this filter
filter_matched(event)
end
end
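Once installed, the filter is referenced by its config_name inside a pipeline; the field names and lookup values below are purely illustrative.
# Example usage of the custom filter in a pipeline (illustrative values)
filter {
custom_enrichment {
source_field => "service"
target_field => "service_owner"
lookup_table => { "web-api" => "backend-team" "payments" => "platform-team" }
default_value => "unassigned"
}
}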
# Plugin gemspec
# logstash-filter-custom_enrichment.gemspec
Gem::Specification.new do |s|
s.name = 'logstash-filter-custom_enrichment'
s.version = '1.0.0'
s.licenses = ['Apache License (2.0)']
s.summary = 'Custom enrichment filter for Logstash'
s.description = 'This gem provides custom enrichment capabilities'
s.homepage = 'https://github.com/yourcompany/logstash-filter-custom_enrichment'
s.authors = ['Your Name']
s.email = ['[email protected]']
s.require_paths = ['lib']
s.files = Dir['lib/**/*','spec/**/*','vendor/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile','LICENSE','NOTICE.TXT']
s.test_files = s.files.grep(%r{^(test|spec|features)/})
s.add_runtime_dependency "logstash-core-plugin-api", "~> 2.0"
s.add_runtime_dependency "logstash-codec-plain"
s.add_development_dependency 'logstash-devutils'
end
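With the gemspec in place, the plugin can be built and installed into a local Logstash; the paths assume the package layout used elsewhere on this page.
# Build the gem and install it into Logstash
gem build logstash-filter-custom_enrichment.gemspec
sudo /usr/share/logstash/bin/logstash-plugin install --no-verify logstash-filter-custom_enrichment-1.0.0.gem
# Confirm the plugin is available
/usr/share/logstash/bin/logstash-plugin list | grep custom_enrichment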
Monitoring and Health Checks
# monitoring.conf
input {
http_poller {
urls => {
"logstash_stats" => {
method => get
url => "http://localhost:9600/_node/stats"
headers => {
Accept => "application/json"
}
}
}
request_timeout => 60
schedule => { every => "30s" }
codec => "json"
add_field => { "source" => "logstash_monitoring" }
}
}
filter {
if [source] == "logstash_monitoring" {
# Extract key metrics
ruby {
code => "
# The json codec decodes the node stats response into the event root,
# so the pipeline, JVM, and process sections are read directly from the event
pipelines = event.get('pipelines')
if pipelines.is_a?(Hash)
pipelines.each do |pipeline_id, pipeline_stats|
events = pipeline_stats['events']
if events
event.set('pipeline_' + pipeline_id + '_events_in', events['in'])
event.set('pipeline_' + pipeline_id + '_events_out', events['out'])
event.set('pipeline_' + pipeline_id + '_events_filtered', events['filtered'])
end
reloads = pipeline_stats['reloads']
if reloads
event.set('pipeline_' + pipeline_id + '_reloads_successes', reloads['successes'])
event.set('pipeline_' + pipeline_id + '_reloads_failures', reloads['failures'])
end
end
end
# JVM metrics
jvm = event.get('jvm')
if jvm
memory = jvm['mem']
if memory
event.set('jvm_heap_used_percent', memory['heap_used_percent'])
event.set('jvm_heap_committed_in_bytes', memory['heap_committed_in_bytes'])
end
gc = jvm.dig('gc', 'collectors')
if gc
event.set('jvm_gc_old_collection_count', gc.dig('old', 'collection_count'))
event.set('jvm_gc_young_collection_count', gc.dig('young', 'collection_count'))
end
end
# Process metrics
process = event.get('process')
if process
event.set('process_cpu_percent', process.dig('cpu', 'percent'))
event.set('process_mem_total_virtual_in_bytes', process.dig('mem', 'total_virtual_in_bytes'))
end
"
}
}
# Health check logic
if [jvm_heap_used_percent] and [jvm_heap_used_percent] > 85 {
mutate {
add_field => { "alert_type" => "high_memory_usage" }
add_field => { "alert_severity" => "warning" }
}
}
if [process_cpu_percent] and [process_cpu_percent] > 80 {
mutate {
add_field => { "alert_type" => "high_cpu_usage" }
add_field => { "alert_severity" => "warning" }
}
}
}
}
output {
if [alert_type] {
http {
url => "https://monitoring.example.com/alerts"
http_method => "post"
format => "json"
headers => {
"Authorization" => "Bearer ${MONITORING_TOKEN}"
}
}
}
elasticsearch {
hosts => ["monitoring-elasticsearch:9200"]
index => "logstash-monitoring-%{+YYYY.MM.dd}"
}
}
Security Configuration
# security.conf
input {
beats {
port => 5044
ssl => true
ssl_certificate => "/etc/logstash/certs/logstash.crt"
ssl_key => "/etc/logstash/certs/logstash.key"
ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
ssl_verify_mode => "force_peer"
}
http {
port => 8443
ssl => true
ssl_certificate => "/etc/logstash/certs/logstash.crt"
ssl_key => "/etc/logstash/certs/logstash.key"
ssl_verify_mode => "none"
# Authentication
user => "logstash_user"
password => "${HTTP_PASSWORD}"
}
}
filter {
# Data masking for sensitive information
mutate {
gsub => [
# Credit card numbers
"message", "\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b", "****-****-****-****",
# Social security numbers
"message", "\b\d{3}-?\d{2}-?\d{4}\b", "***-**-****",
# Phone numbers
"message", "\b\d{3}[\s.-]?\d{3}[\s.-]?\d{4}\b", "***-***-****"
]
}
# PII field removal
if [personally_identifiable] {
mutate {
remove_field => ["email", "phone", "address", "ssn"]
}
}
# Audit logging for sensitive operations
if [action] in ["login", "logout", "password_change", "privilege_escalation"] {
mutate {
add_field => { "audit_required" => "true" }
add_field => { "retention_period" => "7_years" }
}
}
}
output {
# Secure output to Elasticsearch
elasticsearch {
hosts => ["https://secure-elasticsearch:9200"]
user => "logstash_writer"
password => "${ELASTIC_PASSWORD}"
ssl => true
ssl_certificate_verification => true
ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
index => "secure-logs-%{+YYYY.MM.dd}"
}
# Audit trail for sensitive data
if [audit_required] == "true" {
file {
path => "/secure/audit/audit-%{+YYYY-MM-dd}.log"
codec => json_lines
file_mode => 0600
}
}
}
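The ${ELASTIC_PASSWORD}-style references in these configurations resolve from environment variables or, preferably, from the Logstash keystore. A minimal keystore setup looks like this; the key names simply match the placeholders used above.
# Create the keystore and add the referenced secrets
sudo /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash create
sudo /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash add ELASTIC_PASSWORD
sudo /usr/share/logstash/bin/logstash-keystore --path.settings /etc/logstash add HTTP_PASSWORD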
Troubleshooting
# Check Logstash status
sudo systemctl status logstash
curl -X GET "localhost:9600"
# View logs
sudo journalctl -u logstash -f
tail -f /var/log/logstash/logstash-plain.log
# Configuration testing
/usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/
# Pipeline reload (the monitoring API is read-only; use config.reload.automatic or send SIGHUP)
sudo systemctl kill -s HUP logstash
# Check pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines"
# Memory analysis
curl -X GET "localhost:9600/_node/stats/jvm"
jmap -histo $(pgrep java | head -1)
# Thread analysis
curl -X GET "localhost:9600/_node/hot_threads"
jstack $(pgrep java | head -1)
# Performance monitoring
curl -X GET "localhost:9600/_node/stats" | jq '.pipelines.main.events'
# Plugin management
/usr/share/logstash/bin/logstash-plugin list
/usr/share/logstash/bin/logstash-plugin install logstash-output-slack
/usr/share/logstash/bin/logstash-plugin update
# Dead letter queue analysis
ls -la /var/lib/logstash/dead_letter_queue/
/usr/share/logstash/bin/logstash --path.data=/var/lib/logstash -e 'input { dead_letter_queue { path => "/var/lib/logstash/dead_letter_queue" } } output { stdout { } }'
# Configuration validation
/usr/share/logstash/bin/logstash --config.debug -f /etc/logstash/conf.d/
# Benchmark testing
echo '{"test": "message"}' | /usr/share/logstash/bin/logstash -e 'input { stdin { } } output { stdout { } }'
# Resource usage analysis
ps aux | grep logstash
cat /proc/$(pgrep java)/status
lsof -p $(pgrep java | head -1) | wc -l