Logstash
オープンソースのサーバーサイドデータ処理パイプライン。複数のソースからデータを取得し、変換・加工して任意の保存先に送信。200以上のプラグインで高い拡張性を提供。
監視サーバー
Logstash
概要
Logstashはオープンソースのサーバーサイドデータ処理パイプラインです。複数のソースからデータを取得し、変換・加工して任意の保存先に送信します。200以上のプラグインで高い拡張性を提供し、ELKスタックの中核データ処理エンジンとして、マルチパイプラインアーキテクチャ、クラウドネイティブ展開、リアルタイム処理機能で継続的な需要を獲得しています。
詳細
Logstashは2009年にJordan Sisselによって開発が開始され、現在ではELKスタックの中核データ処理エンジンとして位置づけられています。マルチパイプラインアーキテクチャ、クラウドネイティブ展開、リアルタイム処理機能で継続的な需要を維持し、バージョン8.x系列では更なる機能強化が図られています。200以上のプラグインエコシステムにより、多様なデータソースとの統合が可能です。
主要な技術的特徴
- Input-Filter-Output架構: 明確な3段階データ処理パイプライン
- 豊富なプラグイン: 200以上の入力・フィルター・出力プラグイン
- マルチパイプライン: 複数の独立したパイプライン並行実行
- リアルタイム処理: ストリーミングデータの即座な変換
- スケーラビリティ: 水平・垂直スケーリング対応
用途
- ログ収集と正規化
- データ変換とETL処理
- リアルタイムデータストリーミング
- システム統合とデータルーティング
- セキュリティログ分析
メリット・デメリット
メリット
- 豊富なプラグイン: 200以上のプラグインエコシステム
- 柔軟なデータ変換: 強力なフィルター機能
- ELKスタック統合: Elasticsearch、Kibanaとのシームレス連携
- 高い拡張性: カスタムプラグイン開発可能
- リアルタイム処理: 低レイテンシーデータ処理
- オープンソース: 無料で高機能なデータ処理エンジン
デメリット
- リソース消費: JRubyベースによる高いメモリ使用量
- パフォーマンス: 複雑なフィルター処理時の性能低下
- 設定複雑性: 高度な設定の学習コスト
- JVMチューニング: Java仮想マシンの最適化が必要
- デバッグ困難: パイプライン問題の特定が複雑
参考ページ
書き方の例
基本的なLogstash設定
# logstash.conf
input {
beats {
port => 5044
}
file {
path => "/var/log/nginx/access.log"
start_position => "beginning"
codec => "plain"
}
http {
port => 8080
codec => "json"
}
tcp {
port => 5000
codec => "json_lines"
}
syslog {
port => 514
}
}
filter {
if [fileset][module] == "nginx" {
if [fileset][name] == "access" {
grok {
match => {
"message" => "%{NGINXACCESS}"
}
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
mutate {
convert => {
"response" => "integer"
"bytes" => "integer"
"responsetime" => "float"
}
}
geoip {
source => "clientip"
target => "geoip"
}
}
}
if [level] == "ERROR" {
mutate {
add_tag => [ "error" ]
}
}
ruby {
code => "event.set('processed_at', Time.now)"
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "logs-%{+YYYY.MM.dd}"
template_name => "logstash"
template => "/etc/logstash/templates/logstash.json"
template_overwrite => true
}
if "error" in [tags] {
slack {
url => "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
channel => "#alerts"
username => "Logstash"
format => "Error detected: %{message}"
}
}
stdout {
codec => rubydebug
}
}
複雑なログ解析設定
# advanced-parsing.conf
input {
beats {
port => 5044
}
}
filter {
# Apache Combined Log Format
if [fields][logtype] == "apache" {
grok {
match => {
"message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) "%{DATA:referrer}" "%{DATA:agent}"'
}
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
if [response] >= 400 {
mutate {
add_tag => [ "error_response" ]
}
}
useragent {
source => "agent"
target => "user_agent"
}
geoip {
source => "clientip"
target => "geoip"
}
}
# JSON Application Logs
if [fields][logtype] == "application" {
json {
source => "message"
}
date {
match => [ "timestamp", "ISO8601" ]
}
if [level] in ["ERROR", "FATAL"] {
mutate {
add_tag => [ "application_error" ]
}
}
# Extract stack trace
if [exception] {
mutate {
add_field => { "has_exception" => "true" }
}
}
}
# System Logs
if [fields][logtype] == "system" {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:timestamp} %{IPORHOST:server} %{DATA:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:log_message}"
}
}
date {
match => [ "timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
# Common enrichment
mutate {
add_field => {
"[@metadata][index_prefix]" => "logs"
"[@metadata][document_type]" => "%{[fields][logtype]}"
}
}
# Remove unnecessary fields
mutate {
remove_field => [ "message", "fields", "host", "beat", "prospector" ]
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "%{[@metadata][index_prefix]}-%{[@metadata][document_type]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][document_type]}"
}
}
マルチパイプライン設定
# pipelines.yml
- pipeline.id: web-logs
path.config: "/etc/logstash/conf.d/web-logs.conf"
pipeline.workers: 4
pipeline.batch.size: 1000
pipeline.batch.delay: 50
- pipeline.id: application-logs
path.config: "/etc/logstash/conf.d/application-logs.conf"
pipeline.workers: 2
pipeline.batch.size: 500
- pipeline.id: security-logs
path.config: "/etc/logstash/conf.d/security-logs.conf"
pipeline.workers: 1
pipeline.batch.size: 250
queue.type: persisted
queue.max_bytes: 1gb
パフォーマンスチューニング設定
# logstash.yml
node.name: logstash-node-1
# パフォーマンス設定
pipeline.workers: 4
pipeline.batch.size: 1000
pipeline.batch.delay: 50
# メモリ設定
pipeline.ecs_compatibility: disabled
# 永続キュー設定
queue.type: persisted
queue.max_bytes: 2gb
queue.checkpoint.writes: 1024
# JVM設定
jvm.options: |
-Xms2g
-Xmx2g
-XX:+UseG1GC
-XX:G1HeapRegionSize=16m
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=32
-XX:GCLogFileSize=64m
# 監視設定
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://elasticsearch:9200"]
# セキュリティ設定
xpack.security.enabled: false
# ログ設定
log.level: info
path.logs: /var/log/logstash
# 設定の自動リロード
config.reload.automatic: true
config.reload.interval: 3s
# API設定
api.http.host: 0.0.0.0
api.http.port: 9600
カスタムプラグイン例
# custom_filter_plugin.rb
require "logstash/filters/base"
require "logstash/namespace"
class LogStash::Filters::CustomFilter < LogStash::Filters::Base
config_name "custom_filter"
config :field, :validate => :string, :required => true
config :target, :validate => :string, :default => "processed"
config :pattern, :validate => :string, :required => true
def register
@regex = Regexp.compile(@pattern)
end
def filter(event)
value = event.get(@field)
return unless value
if match = @regex.match(value)
result = {
"original" => value,
"extracted" => match.captures,
"timestamp" => Time.now.to_f
}
event.set(@target, result)
event.tag("custom_processed")
else
event.tag("custom_processing_failed")
end
filter_matched(event)
end
end
Docker Compose設定
version: '3.8'
services:
logstash:
image: docker.elastic.co/logstash/logstash:8.16.0
container_name: logstash
restart: unless-stopped
environment:
- "LS_JAVA_OPTS=-Xmx2g -Xms2g"
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
volumes:
- ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
- ./logstash/pipeline:/usr/share/logstash/pipeline:ro
- ./logstash/patterns:/usr/share/logstash/patterns:ro
- ./logstash/templates:/usr/share/logstash/templates:ro
- logstash-data:/usr/share/logstash/data
ports:
- "5044:5044"
- "9600:9600"
- "5000:5000/tcp"
- "5000:5000/udp"
networks:
- elastic
depends_on:
- elasticsearch
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.16.0
container_name: elasticsearch
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms1g -Xmx1g"
- xpack.security.enabled=false
volumes:
- elasticsearch-data:/usr/share/elasticsearch/data
ports:
- "9200:9200"
networks:
- elastic
kibana:
image: docker.elastic.co/kibana/kibana:8.16.0
container_name: kibana
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
ports:
- "5601:5601"
networks:
- elastic
depends_on:
- elasticsearch
volumes:
logstash-data:
elasticsearch-data:
networks:
elastic:
driver: bridge
Beats統合設定
# filebeat.yml (Logstashへの送信)
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/*.log
fields:
logtype: nginx
fields_under_root: true
- type: log
enabled: true
paths:
- /var/log/app/*.log
fields:
logtype: application
fields_under_root: true
multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
multiline.negate: true
multiline.match: after
output.logstash:
hosts: ["logstash:5044"]
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
監視とメトリクス設定
# monitoring.conf
input {
http {
port => 8080
type => "monitoring"
}
}
filter {
if [type] == "monitoring" {
json {
source => "message"
}
# メトリクス処理
if [metrics] {
ruby {
code => '
metrics = event.get("[metrics]")
metrics.each do |key, value|
event.set("[metric][#{key}]", value)
end
'
}
}
# アラート条件チェック
if [metric][cpu_usage] and [metric][cpu_usage] > 80 {
mutate {
add_tag => [ "high_cpu_alert" ]
}
}
if [metric][memory_usage] and [metric][memory_usage] > 90 {
mutate {
add_tag => [ "high_memory_alert" ]
}
}
}
}
output {
if "high_cpu_alert" in [tags] or "high_memory_alert" in [tags] {
http {
url => "https://hooks.slack.com/services/YOUR/WEBHOOK"
http_method => "post"
format => "json"
mapping => {
"text" => "Alert: %{tags} - Host: %{host}"
}
}
}
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "monitoring-%{+YYYY.MM.dd}"
}
}
セキュリティログ処理
# security-logs.conf
input {
beats {
port => 5045
type => "security"
}
}
filter {
if [type] == "security" {
# SSH ログ解析
if [source] =~ /auth\.log/ {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} sshd\[%{POSINT:pid}\]: %{GREEDYDATA:ssh_message}"
}
}
if [ssh_message] =~ /Failed password/ {
grok {
match => {
"ssh_message" => "Failed password for %{USERNAME:username} from %{IP:source_ip} port %{POSINT:source_port}"
}
}
mutate {
add_tag => [ "failed_login" ]
}
}
if [ssh_message] =~ /Accepted password/ {
grok {
match => {
"ssh_message" => "Accepted password for %{USERNAME:username} from %{IP:source_ip} port %{POSINT:source_port}"
}
}
mutate {
add_tag => [ "successful_login" ]
}
}
}
# GeoIP情報追加
if [source_ip] {
geoip {
source => "source_ip"
target => "geoip"
}
}
# 脅威インテリジェンス
if [source_ip] {
translate {
field => "source_ip"
destination => "threat_intel"
dictionary_path => "/etc/logstash/threat_intel.yml"
fallback => "clean"
}
}
}
}
output {
if "failed_login" in [tags] {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "security-alerts-%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "security-logs-%{+YYYY.MM.dd}"
}
}
}