python-json-logger

Python標準ライブラリloggingのJSON出力フォーマッター。ログエントリを構造化JSON形式で出力し、ログ集約システムとの統合を容易にする。標準ライブラリとの完全互換性を保ちながら、現代的なログ管理インフラに対応。

ロギングPythonJSON構造化ログフォーマッター標準ライブラリ

GitHub概要

madzak/python-json-logger

Json Formatter for the standard python logger

スター1,765
ウォッチ26
フォーク231
作成日:2011年12月27日
言語:Python
ライセンス:BSD 2-Clause "Simplified" License

トピックス

loggingpython

スター履歴

madzak/python-json-logger Star History
データ取得日時: 2025/7/17 23:24

ライブラリ

python-json-logger

概要

python-json-logger(pythonjsonlogger)は、Python標準ライブラリのloggingパッケージと完全に互換性を保ちながら、JSON形式でのログ出力を可能にする軽量で実用的なライブラリです。既存のログ設定を変更することなく導入でき、機械可読なJSON形式でのログ出力によりログ集約ツールでの解析と取り込みを劇的に簡素化します。2025年版では月間4600万回以上のダウンロードを誇る、Python JSON ログ分野の最も信頼され実績のあるソリューションです。

詳細

python-json-logger 2025年版は、Python標準ライブラリとの完全な互換性を保ちながら、JSON出力に特化した最適化を実現しています。json、orjson、msgspecなど複数のJSONエンコーダーをサポートし、パフォーマンス要件に応じた選択が可能。完全にカスタマイズ可能な出力フィールド、必須・除外・静的フィールドの制御、LogRecordオブジェクトの自動カスタム属性取得機能を提供。UUID、bytes、Enumなど通常はJSON化困難な型も自動的に適切な形式に変換し、どんな入力でも安全にログ出力できる堅牢性を備えています。

主な特徴

  • 標準ライブラリ完全互換: 既存ログ設定を変更せずに導入可能
  • 複数JSONエンコーダー対応: json、orjson、msgspecを用途に応じて選択
  • 完全カスタマイズ可能フィールド: 必須・除外・静的フィールドの細かい制御
  • 任意型エンコーディング: UUID、bytes、Enumなど複雑な型も自動変換
  • 高い安全性: どんな入力でも適切にログ出力できる堅牢な設計
  • 軽量・高性能: 最小限のオーバーヘッドでの高速JSON出力

メリット・デメリット

メリット

  • Python標準ライブラリとの完璧な互換性で学習コストが最小
  • 既存のログ設定にフォーマッターを変更するだけで即座にJSON化
  • ログ集約システム(ELK Stack、Splunk等)との優れた統合性
  • 複数JSONエンコーダー対応でパフォーマンス要件に柔軟対応
  • 月間4600万ダウンロードの実績による高い信頼性
  • 軽量でシンプルな実装による保守性の高さ

デメリット

  • 基本的なJSON出力に特化しており高度な構造化機能は限定的
  • ログメッセージの動的な変換や複雑なプロセッシングには不向き
  • structlogのような豊富なプロセッサーチェーン機能はなし
  • 非JSON形式での美しい出力(開発時の可読性)機能が限定的
  • 大規模な構造化ログシステムでは機能が物足りない場合がある
  • カスタムフィールド制御は設定に依存し、動的な制御が困難

参考ページ

書き方の例

基本的なインストールと設定

# インストール
# pip install python-json-logger

import logging
from pythonjsonlogger import jsonlogger

# 基本的なJSONフォーマッター設定
json_handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
json_handler.setFormatter(formatter)

# ロガーの設定
logger = logging.getLogger()
logger.addHandler(json_handler)
logger.setLevel(logging.INFO)

# 基本的なログ出力
logger.info("アプリケーション開始")
logger.debug("デバッグ情報", extra={"user_id": 12345})
logger.warning("警告メッセージ", extra={"error_code": "W001"})
logger.error("エラーが発生", extra={"exception_type": "ValueError"})

# 構造化データでの詳細ログ
logger.info("ユーザーログイン", extra={
    "user_id": 67890,
    "ip_address": "192.168.1.100",
    "user_agent": "Mozilla/5.0...",
    "session_id": "sess_abc123"
})

# 例外情報付きログ
try:
    result = 10 / 0
except ZeroDivisionError as e:
    logger.exception("計算エラー", extra={
        "operation": "division",
        "dividend": 10,
        "divisor": 0,
        "error_details": str(e)
    })

カスタムフォーマッターと出力フィールド制御

import logging
from pythonjsonlogger import jsonlogger
import uuid
from datetime import datetime
from enum import Enum

class UserRole(Enum):
    ADMIN = "admin"
    USER = "user"
    GUEST = "guest"

def setup_custom_json_logging():
    """カスタマイズされたJSONログ設定"""
    
    # カスタムフォーマット文字列
    custom_format = '%(asctime)s %(name)s %(levelname)s %(message)s'
    
    # 高度なカスタマイズオプション
    custom_formatter = jsonlogger.JsonFormatter(
        fmt=custom_format,
        # 必須フィールド(常に出力)
        required_fields=["timestamp", "level", "logger", "message"],
        # 除外フィールド(出力しない)
        exclude_fields=["module", "funcName"],
        # 静的フィールド(固定値を追加)
        static_fields={"service": "user-api", "version": "1.0.0"},
        # フィールド名のリネーム
        rename_fields={"levelname": "level", "name": "logger"},
        # デフォルト値(存在しない場合に設定)
        defaults={"environment": "production", "region": "us-west-2"}
    )
    
    # ハンドラーの設定
    handler = logging.StreamHandler()
    handler.setFormatter(custom_formatter)
    
    # ロガーの設定
    logger = logging.getLogger("CustomApp")
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
    
    return logger

# カスタムロガーの使用例
custom_logger = setup_custom_json_logging()

class UserService:
    def __init__(self):
        self.logger = custom_logger
    
    def create_user(self, user_data):
        user_id = uuid.uuid4()
        creation_time = datetime.now()
        user_role = UserRole.ADMIN
        
        self.logger.info("ユーザー作成開始", extra={
            "user_id": user_id,          # UUID型(自動的に文字列変換)
            "creation_time": creation_time,  # datetime型(ISO形式文字列に変換)
            "user_role": user_role,      # Enum型(値が自動抽出)
            "user_data": user_data,
            "request_size": len(str(user_data).encode()),  # bytes計算
            "metadata": {
                "source": "api",
                "client_version": "2.1.0"
            }
        })
        
        try:
            # ユーザー作成処理のシミュレーション
            processed_data = self._process_user_data(user_data, user_id)
            
            self.logger.info("ユーザー作成完了", extra={
                "user_id": user_id,
                "email": user_data.get("email"),
                "role": user_role,
                "processing_time_ms": 150,
                "created_fields": list(processed_data.keys())
            })
            
            return str(user_id)
            
        except Exception as e:
            self.logger.error("ユーザー作成エラー", extra={
                "user_id": user_id,
                "error_type": type(e).__name__,
                "error_message": str(e),
                "input_data": user_data,
                "stack_trace": True  # Tracebackが自動的に含まれる
            }, exc_info=True)
            raise
    
    def _process_user_data(self, user_data, user_id):
        # 処理シミュレーション
        return {
            "id": user_id,
            "processed_at": datetime.now(),
            **user_data
        }

# 使用例
service = UserService()
service.create_user({
    "email": "[email protected]",
    "name": "田中太郎",
    "age": 30,
    "preferences": {"theme": "dark", "language": "ja"}
})

複数のJSONエンコーダーとパフォーマンス最適化

import logging
from pythonjsonlogger import jsonlogger
import time
import json
from typing import Any, Dict

def setup_performance_optimized_logging():
    """パフォーマンス最適化されたJSONログ設定"""
    
    # 高速JSONエンコーダーの選択
    encoders = []
    
    # orjsonエンコーダー(最高性能)
    try:
        import orjson
        encoders.append(("orjson", jsonlogger.JsonFormatter(
            json_encoder=orjson.dumps,
            json_default=lambda obj: str(obj),  # fallback for non-serializable objects
        )))
    except ImportError:
        pass
    
    # msgspecエンコーダー(高性能)
    try:
        import msgspec
        msgspec_encoder = msgspec.json.Encoder()
        encoders.append(("msgspec", jsonlogger.JsonFormatter(
            json_encoder=msgspec_encoder.encode,
            json_default=lambda obj: str(obj),
        )))
    except ImportError:
        pass
    
    # 標準jsonエンコーダー(フォールバック)
    encoders.append(("json", jsonlogger.JsonFormatter(
        json_encoder=json.dumps,
        json_default=str,
        ensure_ascii=False  # 日本語対応
    )))
    
    # 最も高性能なエンコーダーを選択
    encoder_name, best_formatter = encoders[0]
    
    # ログハンドラーの設定
    handler = logging.StreamHandler()
    handler.setFormatter(best_formatter)
    
    logger = logging.getLogger("PerformanceApp")
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    
    # 使用エンコーダーをログ出力
    logger.info(f"JSONエンコーダー設定完了", extra={
        "selected_encoder": encoder_name,
        "available_encoders": [name for name, _ in encoders]
    })
    
    return logger, encoder_name

class PerformanceBenchmark:
    def __init__(self):
        self.logger, self.encoder = setup_performance_optimized_logging()
    
    def benchmark_logging_performance(self, iterations=10000):
        """ログ出力パフォーマンスベンチマーク"""
        
        # テストデータの準備
        test_data = {
            "iteration": 0,
            "timestamp": time.time(),
            "user_data": {
                "id": 12345,
                "name": "パフォーマンステスト",
                "metadata": {
                    "tags": ["test", "performance", "json"],
                    "scores": [95, 87, 92, 88, 91]
                }
            },
            "request_info": {
                "method": "POST",
                "path": "/api/benchmark",
                "headers": {"Content-Type": "application/json"},
                "size": 1024
            }
        }
        
        # ベンチマーク実行
        start_time = time.perf_counter()
        
        for i in range(iterations):
            test_data["iteration"] = i
            test_data["timestamp"] = time.time()
            
            if i % 1000 == 0:
                # 進捗ログ(詳細データ付き)
                self.logger.info("ベンチマーク進捗", extra={
                    **test_data,
                    "progress_percent": round((i / iterations) * 100, 1),
                    "encoder": self.encoder
                })
            else:
                # 通常ログ(軽量)
                self.logger.debug("ベンチマーク実行", extra={
                    "iteration": i,
                    "encoder": self.encoder
                })
        
        end_time = time.perf_counter()
        duration = end_time - start_time
        
        # 結果ログ
        self.logger.info("ベンチマーク完了", extra={
            "total_iterations": iterations,
            "duration_seconds": round(duration, 3),
            "logs_per_second": round(iterations / duration, 0),
            "encoder": self.encoder,
            "average_time_per_log_ms": round((duration / iterations) * 1000, 4)
        })
        
        return duration
    
    def compare_json_encoders(self):
        """複数JSONエンコーダーの性能比較"""
        
        test_data = {
            "complex_data": {
                "nested": {"deeply": {"nested": {"data": "test"}}},
                "list": list(range(100)),
                "unicode": "日本語テストデータ🚀",
                "numbers": [1.23456789, 987654321, 0.000001]
            }
        }
        
        encoders_config = [
            ("standard_json", json.dumps),
            ("compact_json", lambda obj: json.dumps(obj, separators=(',', ':'))),
            ("ensure_ascii_false", lambda obj: json.dumps(obj, ensure_ascii=False)),
        ]
        
        # orjsonが利用可能な場合
        try:
            import orjson
            encoders_config.append(("orjson", orjson.dumps))
        except ImportError:
            pass
        
        # msgspecが利用可能な場合
        try:
            import msgspec
            msgspec_encoder = msgspec.json.Encoder()
            encoders_config.append(("msgspec", msgspec_encoder.encode))
        except ImportError:
            pass
        
        results = {}
        iterations = 5000
        
        for encoder_name, encoder_func in encoders_config:
            start_time = time.perf_counter()
            
            for _ in range(iterations):
                try:
                    encoded = encoder_func(test_data)
                except Exception as e:
                    self.logger.error(f"エンコーダーエラー: {encoder_name}", extra={
                        "encoder": encoder_name,
                        "error": str(e)
                    })
                    continue
            
            duration = time.perf_counter() - start_time
            results[encoder_name] = {
                "duration": round(duration, 4),
                "ops_per_second": round(iterations / duration, 0)
            }
        
        # 比較結果ログ
        self.logger.info("JSONエンコーダー性能比較", extra={
            "test_iterations": iterations,
            "results": results,
            "fastest_encoder": min(results.keys(), key=lambda k: results[k]["duration"]),
            "test_data_complexity": "nested_objects_arrays_unicode"
        })
        
        return results

# パフォーマンステストの実行
benchmark = PerformanceBenchmark()

# ログ出力性能テスト
print("=== ログ出力パフォーマンステスト ===")
duration = benchmark.benchmark_logging_performance(1000)
print(f"完了時間: {duration:.3f}秒")

# JSONエンコーダー比較
print("\n=== JSONエンコーダー性能比較 ===")
encoder_results = benchmark.compare_json_encoders()
for encoder, result in encoder_results.items():
    print(f"{encoder}: {result['ops_per_second']} ops/sec")

ファイル出力と複数ハンドラー統合

import logging
from pythonjsonlogger import jsonlogger
import logging.handlers
import os
from datetime import datetime
import gzip
import shutil

def setup_comprehensive_file_logging():
    """包括的なファイルログ設定"""
    
    # ログディレクトリの作成
    log_dir = "logs"
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    
    # 複数のJSONフォーマッター
    formatters = {
        # 詳細ログ用(全フィールド)
        "detailed": jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(funcName)s %(lineno)d %(message)s',
            static_fields={"service": "web-api", "environment": "production"},
            rename_fields={"levelname": "level", "name": "logger", "funcName": "function", "lineno": "line"}
        ),
        
        # 簡潔ログ用(必要最小限)
        "compact": jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(levelname)s %(message)s',
            exclude_fields=["module", "pathname", "filename"],
            static_fields={"service": "web-api"}
        ),
        
        # エラー専用(例外情報重視)
        "error_focused": jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(funcName)s %(message)s',
            required_fields=["timestamp", "logger", "level", "function", "message", "exc_info"],
            static_fields={"alert": True, "requires_attention": True}
        )
    }
    
    # メインロガーの設定
    main_logger = logging.getLogger("MainApp")
    main_logger.setLevel(logging.DEBUG)
    
    # 1. コンソール出力(開発用)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatters["compact"])
    console_handler.setLevel(logging.INFO)
    main_logger.addHandler(console_handler)
    
    # 2. 日別ローテーションファイル(詳細ログ)
    today = datetime.now().strftime('%Y-%m-%d')
    daily_handler = logging.FileHandler(
        f"{log_dir}/app-{today}.log", 
        encoding='utf-8'
    )
    daily_handler.setFormatter(formatters["detailed"])
    daily_handler.setLevel(logging.DEBUG)
    main_logger.addHandler(daily_handler)
    
    # 3. サイズベースローテーション(一般ログ)
    rotating_handler = logging.handlers.RotatingFileHandler(
        f"{log_dir}/app-rotating.log",
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5,
        encoding='utf-8'
    )
    rotating_handler.setFormatter(formatters["compact"])
    rotating_handler.setLevel(logging.INFO)
    main_logger.addHandler(rotating_handler)
    
    # 4. エラー専用ファイル
    error_handler = logging.FileHandler(
        f"{log_dir}/errors.log",
        encoding='utf-8'
    )
    error_handler.setFormatter(formatters["error_focused"])
    error_handler.setLevel(logging.ERROR)
    main_logger.addHandler(error_handler)
    
    # 5. 時間ベースローテーション(本番運用向け)
    timed_handler = logging.handlers.TimedRotatingFileHandler(
        f"{log_dir}/app-timed.log",
        when='midnight',
        interval=1,
        backupCount=30,  # 30日保持
        encoding='utf-8'
    )
    timed_handler.setFormatter(formatters["detailed"])
    timed_handler.setLevel(logging.INFO)
    main_logger.addHandler(timed_handler)
    
    return main_logger

class FileLoggingApplication:
    def __init__(self):
        self.logger = setup_comprehensive_file_logging()
        self.logger.info("アプリケーション初期化完了", extra={
            "startup_time": datetime.now().isoformat(),
            "log_handlers": len(self.logger.handlers),
            "config": "comprehensive_file_logging"
        })
    
    def simulate_application_operations(self):
        """アプリケーション動作のシミュレーション"""
        
        # 正常な操作ログ
        self.logger.info("ユーザーセッション開始", extra={
            "user_id": 12345,
            "session_id": "sess_abc123",
            "ip_address": "192.168.1.100",
            "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
        })
        
        # デバッグ情報(詳細ログファイルのみ)
        self.logger.debug("データベース接続確立", extra={
            "connection_pool": "primary",
            "connection_count": 5,
            "max_connections": 20,
            "database": "userdb"
        })
        
        # 警告ログ
        self.logger.warning("API制限に近づいています", extra={
            "current_requests": 980,
            "limit": 1000,
            "time_window": "1時間",
            "user_id": 12345
        })
        
        # エラーログ(エラー専用ファイルにも出力)
        try:
            # 意図的なエラー
            raise ValueError("無効な入力データです")
        except ValueError as e:
            self.logger.error("入力データ検証エラー", extra={
                "error_type": "validation",
                "input_data": {"field1": "invalid_value"},
                "validation_rules": ["required", "format", "length"],
                "user_id": 12345
            }, exc_info=True)
        
        # 重要な情報ログ
        self.logger.info("データ処理完了", extra={
            "processed_records": 1500,
            "processing_time_seconds": 45.6,
            "success_rate": 98.7,
            "failed_records": 19,
            "batch_id": "batch_20250624_001"
        })
    
    def demonstrate_different_log_levels(self):
        """各ログレベルでの出力例"""
        
        operations = [
            (logging.DEBUG, "詳細デバッグ情報", {"debug_level": "verbose"}),
            (logging.INFO, "一般情報", {"operation": "normal"}),
            (logging.WARNING, "注意が必要", {"warning_type": "performance"}),
            (logging.ERROR, "エラーが発生", {"error_severity": "medium"}),
            (logging.CRITICAL, "重大なエラー", {"alert_level": "critical"})
        ]
        
        for level, message, extra_data in operations:
            self.logger.log(level, message, extra={
                **extra_data,
                "demonstration": True,
                "timestamp": datetime.now().isoformat()
            })
    
    def compress_old_logs(self):
        """古いログファイルの圧縮"""
        log_dir = "logs"
        
        for filename in os.listdir(log_dir):
            if filename.endswith('.log') and 'rotating' not in filename:
                file_path = os.path.join(log_dir, filename)
                
                # ファイルサイズチェック(1MB以上で圧縮)
                if os.path.getsize(file_path) > 1024*1024:
                    compressed_path = f"{file_path}.gz"
                    
                    with open(file_path, 'rb') as f_in:
                        with gzip.open(compressed_path, 'wb') as f_out:
                            shutil.copyfileobj(f_in, f_out)
                    
                    self.logger.info("ログファイル圧縮完了", extra={
                        "original_file": filename,
                        "compressed_file": f"{filename}.gz",
                        "original_size_mb": round(os.path.getsize(file_path) / 1024 / 1024, 2),
                        "compressed_size_mb": round(os.path.getsize(compressed_path) / 1024 / 1024, 2)
                    })

# ファイルロギングアプリケーションの実行例
app = FileLoggingApplication()

print("=== アプリケーション動作シミュレーション ===")
app.simulate_application_operations()

print("\n=== 各ログレベル出力例 ===")
app.demonstrate_different_log_levels()

print("\n=== ログファイル圧縮処理 ===")
app.compress_old_logs()

print("\n=== ログファイル確認 ===")
log_files = os.listdir("logs")
for log_file in sorted(log_files):
    file_path = os.path.join("logs", log_file)
    size_kb = round(os.path.getsize(file_path) / 1024, 1)
    print(f"{log_file}: {size_kb} KB")

システム統合とログ集約システム連携

import logging
from pythonjsonlogger import jsonlogger
import socket
import json
import requests
import time
from typing import Dict, Any
import threading
import queue

class LogAggregationIntegration:
    """ログ集約システムとの統合"""
    
    def __init__(self):
        self.setup_aggregation_loggers()
    
    def setup_aggregation_loggers(self):
        """各種ログ集約システム向けの設定"""
        
        # ELK Stack向け設定
        self.elk_logger = self._setup_elk_logger()
        
        # Fluentd向け設定
        self.fluentd_logger = self._setup_fluentd_logger()
        
        # Splunk向け設定
        self.splunk_logger = self._setup_splunk_logger()
        
        # カスタムログ集約向け設定
        self.custom_logger = self._setup_custom_aggregation_logger()
    
    def _setup_elk_logger(self):
        """ELK Stack向けの最適化設定"""
        
        # Elasticsearchに最適化されたフォーマット
        elk_formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
            static_fields={
                "@version": "1",
                "service": "web-api",
                "environment": "production",
                "host": socket.gethostname()
            },
            rename_fields={
                "asctime": "@timestamp",
                "levelname": "level",
                "name": "logger"
            },
            # Elasticsearchのマッピングに最適化
            json_default=str,
            json_encoder=json.dumps
        )
        
        # ファイルハンドラー(Filebeatで収集される)
        elk_handler = logging.FileHandler('logs/elk-formatted.log', encoding='utf-8')
        elk_handler.setFormatter(elk_formatter)
        
        elk_logger = logging.getLogger('ELKLogger')
        elk_logger.addHandler(elk_handler)
        elk_logger.setLevel(logging.INFO)
        
        return elk_logger
    
    def _setup_fluentd_logger(self):
        """Fluentd向けの設定"""
        
        fluentd_formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
            static_fields={
                "tag": "web-api.application",
                "source": "python-application",
                "datacenter": "us-west-2"
            },
            # FluentdのJSONパーサーに最適化
            json_default=lambda obj: str(obj),
            ensure_ascii=False
        )
        
        # Fluentdの監視ディレクトリに出力
        fluentd_handler = logging.FileHandler('logs/fluentd.log', encoding='utf-8')
        fluentd_handler.setFormatter(fluentd_formatter)
        
        fluentd_logger = logging.getLogger('FluentdLogger')
        fluentd_logger.addHandler(fluentd_handler)
        fluentd_logger.setLevel(logging.INFO)
        
        return fluentd_logger
    
    def _setup_splunk_logger(self):
        """Splunk向けの設定"""
        
        splunk_formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(funcName)s %(message)s',
            static_fields={
                "index": "application-logs",
                "sourcetype": "python:json",
                "source": "web-api"
            },
            rename_fields={
                "asctime": "time",
                "levelname": "severity",
                "name": "component",
                "funcName": "function"
            }
        )
        
        splunk_handler = logging.FileHandler('logs/splunk.log', encoding='utf-8')
        splunk_handler.setFormatter(splunk_formatter)
        
        splunk_logger = logging.getLogger('SplunkLogger')
        splunk_logger.addHandler(splunk_handler)
        splunk_logger.setLevel(logging.INFO)
        
        return splunk_logger
    
    def _setup_custom_aggregation_logger(self):
        """カスタムログ集約システム向け設定"""
        
        custom_formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
            static_fields={
                "schema_version": "2.0",
                "application": "web-api",
                "team": "backend",
                "criticality": "high"
            },
            required_fields=["timestamp", "level", "logger", "message", "correlation_id"],
            defaults={"correlation_id": "unknown", "trace_id": "N/A"}
        )
        
        custom_handler = logging.FileHandler('logs/custom-aggregation.log', encoding='utf-8')
        custom_handler.setFormatter(custom_formatter)
        
        custom_logger = logging.getLogger('CustomAggregation')
        custom_logger.addHandler(custom_handler)
        custom_logger.setLevel(logging.DEBUG)
        
        return custom_logger
    
    def log_application_event(self, event_type: str, event_data: Dict[str, Any]):
        """各種ログ集約システムへの統一イベント送信"""
        
        # 共通のイベントデータ
        common_data = {
            "event_type": event_type,
            "timestamp": time.time(),
            "correlation_id": f"corr_{int(time.time() * 1000)}",
            **event_data
        }
        
        # ELK Stackへの送信
        self.elk_logger.info(f"Application Event: {event_type}", extra={
            **common_data,
            "elastic_compatible": True
        })
        
        # Fluentdへの送信
        self.fluentd_logger.info(f"Event: {event_type}", extra={
            **common_data,
            "fluentd_tag": f"app.event.{event_type}"
        })
        
        # Splunkへの送信
        self.splunk_logger.info(f"{event_type} Event", extra={
            **common_data,
            "splunk_index": "app-events"
        })
        
        # カスタム集約システムへの送信
        self.custom_logger.info(f"EVENT: {event_type}", extra={
            **common_data,
            "trace_id": f"trace_{int(time.time())}"
        })

class RealTimeLogProcessor:
    """リアルタイムログ処理とアラート"""
    
    def __init__(self):
        self.alert_queue = queue.Queue()
        self.processor_thread = threading.Thread(target=self._process_alerts)
        self.processor_thread.daemon = True
        self.processor_thread.start()
        
        self.setup_alert_logger()
    
    def setup_alert_logger(self):
        """アラート用ロガーの設定"""
        
        alert_formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(levelname)s %(message)s',
            static_fields={
                "alert": True,
                "notification_required": True,
                "priority": "high"
            },
            required_fields=["timestamp", "level", "message", "alert_type", "severity"]
        )
        
        # アラート専用ハンドラー
        alert_handler = logging.FileHandler('logs/alerts.log', encoding='utf-8')
        alert_handler.setFormatter(alert_formatter)
        
        self.alert_logger = logging.getLogger('AlertSystem')
        self.alert_logger.addHandler(alert_handler)
        self.alert_logger.setLevel(logging.WARNING)
    
    def send_alert(self, alert_type: str, severity: str, message: str, details: Dict[str, Any]):
        """アラートの送信"""
        
        alert_data = {
            "alert_type": alert_type,
            "severity": severity,
            "details": details,
            "requires_action": severity in ["critical", "high"],
            "alert_id": f"alert_{int(time.time() * 1000)}"
        }
        
        # アラートログの記録
        if severity == "critical":
            self.alert_logger.critical(message, extra=alert_data)
        elif severity == "high":
            self.alert_logger.error(message, extra=alert_data)
        else:
            self.alert_logger.warning(message, extra=alert_data)
        
        # リアルタイム処理キューに追加
        self.alert_queue.put({
            "message": message,
            "data": alert_data,
            "timestamp": time.time()
        })
    
    def _process_alerts(self):
        """アラートのリアルタイム処理"""
        while True:
            try:
                alert = self.alert_queue.get(timeout=1)
                self._handle_alert(alert)
                self.alert_queue.task_done()
            except queue.Empty:
                continue
            except Exception as e:
                # エラーハンドリング
                self.alert_logger.error("アラート処理エラー", extra={
                    "error": str(e),
                    "alert_type": "system_error"
                })
    
    def _handle_alert(self, alert):
        """個別アラートの処理"""
        
        severity = alert["data"]["severity"]
        
        # 重大度に応じた処理
        if severity == "critical":
            self._send_immediate_notification(alert)
        elif severity == "high":
            self._escalate_to_team(alert)
        else:
            self._log_for_review(alert)
    
    def _send_immediate_notification(self, alert):
        """即座の通知送信"""
        # 実際の実装では、Slack、メール、SMS等に送信
        print(f"🚨 CRITICAL ALERT: {alert['message']}")
    
    def _escalate_to_team(self, alert):
        """チームエスカレーション"""
        print(f"⚠️  HIGH PRIORITY: {alert['message']}")
    
    def _log_for_review(self, alert):
        """レビュー用ログ"""
        print(f"📝 REVIEW REQUIRED: {alert['message']}")

# システム統合の使用例
def demonstrate_system_integration():
    # ログ集約統合の初期化
    aggregation = LogAggregationIntegration()
    
    # リアルタイム処理の初期化
    alert_processor = RealTimeLogProcessor()
    
    # 様々なアプリケーションイベント
    events = [
        ("user_login", {"user_id": 12345, "method": "oauth", "success": True}),
        ("payment_processed", {"amount": 150.00, "currency": "USD", "payment_id": "pay_123"}),
        ("api_error", {"endpoint": "/api/users", "status_code": 500, "error": "database_timeout"}),
        ("security_violation", {"ip": "192.168.1.100", "attempt": "unauthorized_access"}),
        ("performance_degradation", {"response_time": 5000, "threshold": 3000, "endpoint": "/api/search"})
    ]
    
    # イベントログの送信
    for event_type, event_data in events:
        aggregation.log_application_event(event_type, event_data)
        
        # 重要なイベントはアラートも送信
        if event_type in ["api_error", "security_violation", "performance_degradation"]:
            severity_map = {
                "api_error": "high",
                "security_violation": "critical", 
                "performance_degradation": "medium"
            }
            
            alert_processor.send_alert(
                alert_type=event_type,
                severity=severity_map[event_type],
                message=f"{event_type.replace('_', ' ').title()} detected",
                details=event_data
            )
    
    # しばらく待ってアラート処理を確認
    time.sleep(2)
    
    print("ログ集約とアラートシステムの動作完了")

if __name__ == "__main__":
    demonstrate_system_integration()