Logstash

オープンソースのサーバーサイドデータ処理パイプライン。複数のソースからデータを取得し、変換・加工して任意の保存先に送信。200以上のプラグインで高い拡張性を提供。

監視サーバーデータ処理ELKスタックログ収集データパイプラインETLリアルタイム処理

監視サーバー

Logstash

概要

Logstashはオープンソースのサーバーサイドデータ処理パイプラインです。複数のソースからデータを取得し、変換・加工して任意の保存先に送信します。200以上のプラグインで高い拡張性を提供し、ELKスタックの中核データ処理エンジンとして、マルチパイプラインアーキテクチャ、クラウドネイティブ展開、リアルタイム処理機能で継続的な需要を獲得しています。

詳細

Logstashは2009年にJordan Sisselによって開発が開始され、現在ではELKスタックの中核データ処理エンジンとして位置づけられています。マルチパイプラインアーキテクチャ、クラウドネイティブ展開、リアルタイム処理機能で継続的な需要を維持し、バージョン8.x系列では更なる機能強化が図られています。200以上のプラグインエコシステムにより、多様なデータソースとの統合が可能です。

主要な技術的特徴

  • Input-Filter-Output架構: 明確な3段階データ処理パイプライン
  • 豊富なプラグイン: 200以上の入力・フィルター・出力プラグイン
  • マルチパイプライン: 複数の独立したパイプライン並行実行
  • リアルタイム処理: ストリーミングデータの即座な変換
  • スケーラビリティ: 水平・垂直スケーリング対応

用途

  • ログ収集と正規化
  • データ変換とETL処理
  • リアルタイムデータストリーミング
  • システム統合とデータルーティング
  • セキュリティログ分析

メリット・デメリット

メリット

  • 豊富なプラグイン: 200以上のプラグインエコシステム
  • 柔軟なデータ変換: 強力なフィルター機能
  • ELKスタック統合: Elasticsearch、Kibanaとのシームレス連携
  • 高い拡張性: カスタムプラグイン開発可能
  • リアルタイム処理: 低レイテンシーデータ処理
  • オープンソース: 無料で高機能なデータ処理エンジン

デメリット

  • リソース消費: JRubyベースによる高いメモリ使用量
  • パフォーマンス: 複雑なフィルター処理時の性能低下
  • 設定複雑性: 高度な設定の学習コスト
  • JVMチューニング: Java仮想マシンの最適化が必要
  • デバッグ困難: パイプライン問題の特定が複雑

参考ページ

書き方の例

基本的なLogstash設定

# logstash.conf
input {
  beats {
    port => 5044
  }
  
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
    codec => "plain"
  }
  
  http {
    port => 8080
    codec => "json"
  }
  
  tcp {
    port => 5000
    codec => "json_lines"
  }
  
  syslog {
    port => 514
  }
}

filter {
  if [fileset][module] == "nginx" {
    if [fileset][name] == "access" {
      grok {
        match => { 
          "message" => "%{NGINXACCESS}" 
        }
      }
      
      date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      }
      
      mutate {
        convert => { 
          "response" => "integer"
          "bytes" => "integer"
          "responsetime" => "float"
        }
      }
      
      geoip {
        source => "clientip"
        target => "geoip"
      }
    }
  }
  
  if [level] == "ERROR" {
    mutate {
      add_tag => [ "error" ]
    }
  }
  
  ruby {
    code => "event.set('processed_at', Time.now)"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logs-%{+YYYY.MM.dd}"
    template_name => "logstash"
    template => "/etc/logstash/templates/logstash.json"
    template_overwrite => true
  }
  
  if "error" in [tags] {
    slack {
      url => "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
      channel => "#alerts"
      username => "Logstash"
      format => "Error detected: %{message}"
    }
  }
  
  stdout {
    codec => rubydebug
  }
}

複雑なログ解析設定

# advanced-parsing.conf
input {
  beats {
    port => 5044
  }
}

filter {
  # Apache Combined Log Format
  if [fields][logtype] == "apache" {
    grok {
      match => {
        "message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) "%{DATA:referrer}" "%{DATA:agent}"'
      }
    }
    
    date {
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    
    if [response] >= 400 {
      mutate {
        add_tag => [ "error_response" ]
      }
    }
    
    useragent {
      source => "agent"
      target => "user_agent"
    }
    
    geoip {
      source => "clientip"
      target => "geoip"
    }
  }
  
  # JSON Application Logs
  if [fields][logtype] == "application" {
    json {
      source => "message"
    }
    
    date {
      match => [ "timestamp", "ISO8601" ]
    }
    
    if [level] in ["ERROR", "FATAL"] {
      mutate {
        add_tag => [ "application_error" ]
      }
    }
    
    # Extract stack trace
    if [exception] {
      mutate {
        add_field => { "has_exception" => "true" }
      }
    }
  }
  
  # System Logs
  if [fields][logtype] == "system" {
    grok {
      match => {
        "message" => "%{SYSLOGTIMESTAMP:timestamp} %{IPORHOST:server} %{DATA:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:log_message}"
      }
    }
    
    date {
      match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
  
  # Common enrichment
  mutate {
    add_field => {
      "[@metadata][index_prefix]" => "logs"
      "[@metadata][document_type]" => "%{[fields][logtype]}"
    }
  }
  
  # Remove unnecessary fields
  mutate {
    remove_field => [ "message", "fields", "host", "beat", "prospector" ]
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][index_prefix]}-%{[@metadata][document_type]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][document_type]}"
  }
}

マルチパイプライン設定

# pipelines.yml
- pipeline.id: web-logs
  path.config: "/etc/logstash/conf.d/web-logs.conf"
  pipeline.workers: 4
  pipeline.batch.size: 1000
  pipeline.batch.delay: 50

- pipeline.id: application-logs
  path.config: "/etc/logstash/conf.d/application-logs.conf"
  pipeline.workers: 2
  pipeline.batch.size: 500

- pipeline.id: security-logs
  path.config: "/etc/logstash/conf.d/security-logs.conf"
  pipeline.workers: 1
  pipeline.batch.size: 250
  queue.type: persisted
  queue.max_bytes: 1gb

パフォーマンスチューニング設定

# logstash.yml
node.name: logstash-node-1

# パフォーマンス設定
pipeline.workers: 4
pipeline.batch.size: 1000
pipeline.batch.delay: 50

# メモリ設定
pipeline.ecs_compatibility: disabled

# 永続キュー設定
queue.type: persisted
queue.max_bytes: 2gb
queue.checkpoint.writes: 1024

# JVM設定
jvm.options: |
  -Xms2g
  -Xmx2g
  -XX:+UseG1GC
  -XX:G1HeapRegionSize=16m
  -XX:+UseGCLogFileRotation
  -XX:NumberOfGCLogFiles=32
  -XX:GCLogFileSize=64m

# 監視設定
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://elasticsearch:9200"]

# セキュリティ設定
xpack.security.enabled: false

# ログ設定
log.level: info
path.logs: /var/log/logstash

# 設定の自動リロード
config.reload.automatic: true
config.reload.interval: 3s

# API設定
api.http.host: 0.0.0.0
api.http.port: 9600

カスタムプラグイン例

# custom_filter_plugin.rb
require "logstash/filters/base"
require "logstash/namespace"

class LogStash::Filters::CustomFilter < LogStash::Filters::Base
  config_name "custom_filter"
  
  config :field, :validate => :string, :required => true
  config :target, :validate => :string, :default => "processed"
  config :pattern, :validate => :string, :required => true

  def register
    @regex = Regexp.compile(@pattern)
  end

  def filter(event)
    value = event.get(@field)
    return unless value
    
    if match = @regex.match(value)
      result = {
        "original" => value,
        "extracted" => match.captures,
        "timestamp" => Time.now.to_f
      }
      
      event.set(@target, result)
      event.tag("custom_processed")
    else
      event.tag("custom_processing_failed")
    end
    
    filter_matched(event)
  end
end

Docker Compose設定

version: '3.8'

services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.16.0
    container_name: logstash
    restart: unless-stopped
    environment:
      - "LS_JAVA_OPTS=-Xmx2g -Xms2g"
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    volumes:
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
      - ./logstash/pipeline:/usr/share/logstash/pipeline:ro
      - ./logstash/patterns:/usr/share/logstash/patterns:ro
      - ./logstash/templates:/usr/share/logstash/templates:ro
      - logstash-data:/usr/share/logstash/data
    ports:
      - "5044:5044"
      - "9600:9600"
      - "5000:5000/tcp"
      - "5000:5000/udp"
    networks:
      - elastic
    depends_on:
      - elasticsearch

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.16.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
      - xpack.security.enabled=false
    volumes:
      - elasticsearch-data:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
    networks:
      - elastic

  kibana:
    image: docker.elastic.co/kibana/kibana:8.16.0
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    networks:
      - elastic
    depends_on:
      - elasticsearch

volumes:
  logstash-data:
  elasticsearch-data:

networks:
  elastic:
    driver: bridge

Beats統合設定

# filebeat.yml (Logstashへの送信)
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/*.log
  fields:
    logtype: nginx
  fields_under_root: true

- type: log
  enabled: true
  paths:
    - /var/log/app/*.log
  fields:
    logtype: application
  fields_under_root: true
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
  multiline.negate: true
  multiline.match: after

output.logstash:
  hosts: ["logstash:5044"]

processors:
- add_host_metadata:
    when.not.contains.tags: forwarded
- add_docker_metadata: ~
- add_kubernetes_metadata: ~

監視とメトリクス設定

# monitoring.conf
input {
  http {
    port => 8080
    type => "monitoring"
  }
}

filter {
  if [type] == "monitoring" {
    json {
      source => "message"
    }
    
    # メトリクス処理
    if [metrics] {
      ruby {
        code => '
          metrics = event.get("[metrics]")
          metrics.each do |key, value|
            event.set("[metric][#{key}]", value)
          end
        '
      }
    }
    
    # アラート条件チェック
    if [metric][cpu_usage] and [metric][cpu_usage] > 80 {
      mutate {
        add_tag => [ "high_cpu_alert" ]
      }
    }
    
    if [metric][memory_usage] and [metric][memory_usage] > 90 {
      mutate {
        add_tag => [ "high_memory_alert" ]
      }
    }
  }
}

output {
  if "high_cpu_alert" in [tags] or "high_memory_alert" in [tags] {
    http {
      url => "https://hooks.slack.com/services/YOUR/WEBHOOK"
      http_method => "post"
      format => "json"
      mapping => {
        "text" => "Alert: %{tags} - Host: %{host}"
      }
    }
  }
  
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "monitoring-%{+YYYY.MM.dd}"
  }
}

セキュリティログ処理

# security-logs.conf
input {
  beats {
    port => 5045
    type => "security"
  }
}

filter {
  if [type] == "security" {
    # SSH ログ解析
    if [source] =~ /auth\.log/ {
      grok {
        match => {
          "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} sshd\[%{POSINT:pid}\]: %{GREEDYDATA:ssh_message}"
        }
      }
      
      if [ssh_message] =~ /Failed password/ {
        grok {
          match => {
            "ssh_message" => "Failed password for %{USERNAME:username} from %{IP:source_ip} port %{POSINT:source_port}"
          }
        }
        mutate {
          add_tag => [ "failed_login" ]
        }
      }
      
      if [ssh_message] =~ /Accepted password/ {
        grok {
          match => {
            "ssh_message" => "Accepted password for %{USERNAME:username} from %{IP:source_ip} port %{POSINT:source_port}"
          }
        }
        mutate {
          add_tag => [ "successful_login" ]
        }
      }
    }
    
    # GeoIP情報追加
    if [source_ip] {
      geoip {
        source => "source_ip"
        target => "geoip"
      }
    }
    
    # 脅威インテリジェンス
    if [source_ip] {
      translate {
        field => "source_ip"
        destination => "threat_intel"
        dictionary_path => "/etc/logstash/threat_intel.yml"
        fallback => "clean"
      }
    }
  }
}

output {
  if "failed_login" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "security-alerts-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "security-logs-%{+YYYY.MM.dd}"
    }
  }
}