python-json-logger
Python標準ライブラリloggingのJSON出力フォーマッター。ログエントリを構造化JSON形式で出力し、ログ集約システムとの統合を容易にする。標準ライブラリとの完全互換性を保ちながら、現代的なログ管理インフラに対応。
GitHub概要
madzak/python-json-logger
Json Formatter for the standard python logger
スター1,765
ウォッチ26
フォーク231
作成日:2011年12月27日
言語:Python
ライセンス:BSD 2-Clause "Simplified" License
トピックス
loggingpython
スター履歴
データ取得日時: 2025/7/17 23:24
ライブラリ
python-json-logger
概要
python-json-logger(pythonjsonlogger)は、Python標準ライブラリのloggingパッケージと完全に互換性を保ちながら、JSON形式でのログ出力を可能にする軽量で実用的なライブラリです。既存のログ設定を変更することなく導入でき、機械可読なJSON形式でのログ出力によりログ集約ツールでの解析と取り込みを劇的に簡素化します。2025年版では月間4600万回以上のダウンロードを誇る、Python JSON ログ分野の最も信頼され実績のあるソリューションです。
詳細
python-json-logger 2025年版は、Python標準ライブラリとの完全な互換性を保ちながら、JSON出力に特化した最適化を実現しています。json、orjson、msgspecなど複数のJSONエンコーダーをサポートし、パフォーマンス要件に応じた選択が可能。完全にカスタマイズ可能な出力フィールド、必須・除外・静的フィールドの制御、LogRecordオブジェクトの自動カスタム属性取得機能を提供。UUID、bytes、Enumなど通常はJSON化困難な型も自動的に適切な形式に変換し、どんな入力でも安全にログ出力できる堅牢性を備えています。
主な特徴
- 標準ライブラリ完全互換: 既存ログ設定を変更せずに導入可能
- 複数JSONエンコーダー対応: json、orjson、msgspecを用途に応じて選択
- 完全カスタマイズ可能フィールド: 必須・除外・静的フィールドの細かい制御
- 任意型エンコーディング: UUID、bytes、Enumなど複雑な型も自動変換
- 高い安全性: どんな入力でも適切にログ出力できる堅牢な設計
- 軽量・高性能: 最小限のオーバーヘッドでの高速JSON出力
メリット・デメリット
メリット
- Python標準ライブラリとの完璧な互換性で学習コストが最小
- 既存のログ設定にフォーマッターを変更するだけで即座にJSON化
- ログ集約システム(ELK Stack、Splunk等)との優れた統合性
- 複数JSONエンコーダー対応でパフォーマンス要件に柔軟対応
- 月間4600万ダウンロードの実績による高い信頼性
- 軽量でシンプルな実装による保守性の高さ
デメリット
- 基本的なJSON出力に特化しており高度な構造化機能は限定的
- ログメッセージの動的な変換や複雑なプロセッシングには不向き
- structlogのような豊富なプロセッサーチェーン機能はなし
- 非JSON形式での美しい出力(開発時の可読性)機能が限定的
- 大規模な構造化ログシステムでは機能が物足りない場合がある
- カスタムフィールド制御は設定に依存し、動的な制御が困難
参考ページ
書き方の例
基本的なインストールと設定
# インストール
# pip install python-json-logger
import logging
from pythonjsonlogger import jsonlogger
# 基本的なJSONフォーマッター設定
json_handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
json_handler.setFormatter(formatter)
# ロガーの設定
logger = logging.getLogger()
logger.addHandler(json_handler)
logger.setLevel(logging.INFO)
# 基本的なログ出力
logger.info("アプリケーション開始")
logger.debug("デバッグ情報", extra={"user_id": 12345})
logger.warning("警告メッセージ", extra={"error_code": "W001"})
logger.error("エラーが発生", extra={"exception_type": "ValueError"})
# 構造化データでの詳細ログ
logger.info("ユーザーログイン", extra={
"user_id": 67890,
"ip_address": "192.168.1.100",
"user_agent": "Mozilla/5.0...",
"session_id": "sess_abc123"
})
# 例外情報付きログ
try:
result = 10 / 0
except ZeroDivisionError as e:
logger.exception("計算エラー", extra={
"operation": "division",
"dividend": 10,
"divisor": 0,
"error_details": str(e)
})
カスタムフォーマッターと出力フィールド制御
import logging
from pythonjsonlogger import jsonlogger
import uuid
from datetime import datetime
from enum import Enum
class UserRole(Enum):
ADMIN = "admin"
USER = "user"
GUEST = "guest"
def setup_custom_json_logging():
"""カスタマイズされたJSONログ設定"""
# カスタムフォーマット文字列
custom_format = '%(asctime)s %(name)s %(levelname)s %(message)s'
# 高度なカスタマイズオプション
custom_formatter = jsonlogger.JsonFormatter(
fmt=custom_format,
# 必須フィールド(常に出力)
required_fields=["timestamp", "level", "logger", "message"],
# 除外フィールド(出力しない)
exclude_fields=["module", "funcName"],
# 静的フィールド(固定値を追加)
static_fields={"service": "user-api", "version": "1.0.0"},
# フィールド名のリネーム
rename_fields={"levelname": "level", "name": "logger"},
# デフォルト値(存在しない場合に設定)
defaults={"environment": "production", "region": "us-west-2"}
)
# ハンドラーの設定
handler = logging.StreamHandler()
handler.setFormatter(custom_formatter)
# ロガーの設定
logger = logging.getLogger("CustomApp")
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
return logger
# カスタムロガーの使用例
custom_logger = setup_custom_json_logging()
class UserService:
def __init__(self):
self.logger = custom_logger
def create_user(self, user_data):
user_id = uuid.uuid4()
creation_time = datetime.now()
user_role = UserRole.ADMIN
self.logger.info("ユーザー作成開始", extra={
"user_id": user_id, # UUID型(自動的に文字列変換)
"creation_time": creation_time, # datetime型(ISO形式文字列に変換)
"user_role": user_role, # Enum型(値が自動抽出)
"user_data": user_data,
"request_size": len(str(user_data).encode()), # bytes計算
"metadata": {
"source": "api",
"client_version": "2.1.0"
}
})
try:
# ユーザー作成処理のシミュレーション
processed_data = self._process_user_data(user_data, user_id)
self.logger.info("ユーザー作成完了", extra={
"user_id": user_id,
"email": user_data.get("email"),
"role": user_role,
"processing_time_ms": 150,
"created_fields": list(processed_data.keys())
})
return str(user_id)
except Exception as e:
self.logger.error("ユーザー作成エラー", extra={
"user_id": user_id,
"error_type": type(e).__name__,
"error_message": str(e),
"input_data": user_data,
"stack_trace": True # Tracebackが自動的に含まれる
}, exc_info=True)
raise
def _process_user_data(self, user_data, user_id):
# 処理シミュレーション
return {
"id": user_id,
"processed_at": datetime.now(),
**user_data
}
# 使用例
service = UserService()
service.create_user({
"email": "[email protected]",
"name": "田中太郎",
"age": 30,
"preferences": {"theme": "dark", "language": "ja"}
})
複数のJSONエンコーダーとパフォーマンス最適化
import logging
from pythonjsonlogger import jsonlogger
import time
import json
from typing import Any, Dict
def setup_performance_optimized_logging():
"""パフォーマンス最適化されたJSONログ設定"""
# 高速JSONエンコーダーの選択
encoders = []
# orjsonエンコーダー(最高性能)
try:
import orjson
encoders.append(("orjson", jsonlogger.JsonFormatter(
json_encoder=orjson.dumps,
json_default=lambda obj: str(obj), # fallback for non-serializable objects
)))
except ImportError:
pass
# msgspecエンコーダー(高性能)
try:
import msgspec
msgspec_encoder = msgspec.json.Encoder()
encoders.append(("msgspec", jsonlogger.JsonFormatter(
json_encoder=msgspec_encoder.encode,
json_default=lambda obj: str(obj),
)))
except ImportError:
pass
# 標準jsonエンコーダー(フォールバック)
encoders.append(("json", jsonlogger.JsonFormatter(
json_encoder=json.dumps,
json_default=str,
ensure_ascii=False # 日本語対応
)))
# 最も高性能なエンコーダーを選択
encoder_name, best_formatter = encoders[0]
# ログハンドラーの設定
handler = logging.StreamHandler()
handler.setFormatter(best_formatter)
logger = logging.getLogger("PerformanceApp")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# 使用エンコーダーをログ出力
logger.info(f"JSONエンコーダー設定完了", extra={
"selected_encoder": encoder_name,
"available_encoders": [name for name, _ in encoders]
})
return logger, encoder_name
class PerformanceBenchmark:
def __init__(self):
self.logger, self.encoder = setup_performance_optimized_logging()
def benchmark_logging_performance(self, iterations=10000):
"""ログ出力パフォーマンスベンチマーク"""
# テストデータの準備
test_data = {
"iteration": 0,
"timestamp": time.time(),
"user_data": {
"id": 12345,
"name": "パフォーマンステスト",
"metadata": {
"tags": ["test", "performance", "json"],
"scores": [95, 87, 92, 88, 91]
}
},
"request_info": {
"method": "POST",
"path": "/api/benchmark",
"headers": {"Content-Type": "application/json"},
"size": 1024
}
}
# ベンチマーク実行
start_time = time.perf_counter()
for i in range(iterations):
test_data["iteration"] = i
test_data["timestamp"] = time.time()
if i % 1000 == 0:
# 進捗ログ(詳細データ付き)
self.logger.info("ベンチマーク進捗", extra={
**test_data,
"progress_percent": round((i / iterations) * 100, 1),
"encoder": self.encoder
})
else:
# 通常ログ(軽量)
self.logger.debug("ベンチマーク実行", extra={
"iteration": i,
"encoder": self.encoder
})
end_time = time.perf_counter()
duration = end_time - start_time
# 結果ログ
self.logger.info("ベンチマーク完了", extra={
"total_iterations": iterations,
"duration_seconds": round(duration, 3),
"logs_per_second": round(iterations / duration, 0),
"encoder": self.encoder,
"average_time_per_log_ms": round((duration / iterations) * 1000, 4)
})
return duration
def compare_json_encoders(self):
"""複数JSONエンコーダーの性能比較"""
test_data = {
"complex_data": {
"nested": {"deeply": {"nested": {"data": "test"}}},
"list": list(range(100)),
"unicode": "日本語テストデータ🚀",
"numbers": [1.23456789, 987654321, 0.000001]
}
}
encoders_config = [
("standard_json", json.dumps),
("compact_json", lambda obj: json.dumps(obj, separators=(',', ':'))),
("ensure_ascii_false", lambda obj: json.dumps(obj, ensure_ascii=False)),
]
# orjsonが利用可能な場合
try:
import orjson
encoders_config.append(("orjson", orjson.dumps))
except ImportError:
pass
# msgspecが利用可能な場合
try:
import msgspec
msgspec_encoder = msgspec.json.Encoder()
encoders_config.append(("msgspec", msgspec_encoder.encode))
except ImportError:
pass
results = {}
iterations = 5000
for encoder_name, encoder_func in encoders_config:
start_time = time.perf_counter()
for _ in range(iterations):
try:
encoded = encoder_func(test_data)
except Exception as e:
self.logger.error(f"エンコーダーエラー: {encoder_name}", extra={
"encoder": encoder_name,
"error": str(e)
})
continue
duration = time.perf_counter() - start_time
results[encoder_name] = {
"duration": round(duration, 4),
"ops_per_second": round(iterations / duration, 0)
}
# 比較結果ログ
self.logger.info("JSONエンコーダー性能比較", extra={
"test_iterations": iterations,
"results": results,
"fastest_encoder": min(results.keys(), key=lambda k: results[k]["duration"]),
"test_data_complexity": "nested_objects_arrays_unicode"
})
return results
# パフォーマンステストの実行
benchmark = PerformanceBenchmark()
# ログ出力性能テスト
print("=== ログ出力パフォーマンステスト ===")
duration = benchmark.benchmark_logging_performance(1000)
print(f"完了時間: {duration:.3f}秒")
# JSONエンコーダー比較
print("\n=== JSONエンコーダー性能比較 ===")
encoder_results = benchmark.compare_json_encoders()
for encoder, result in encoder_results.items():
print(f"{encoder}: {result['ops_per_second']} ops/sec")
ファイル出力と複数ハンドラー統合
import logging
from pythonjsonlogger import jsonlogger
import logging.handlers
import os
from datetime import datetime
import gzip
import shutil
def setup_comprehensive_file_logging():
"""包括的なファイルログ設定"""
# ログディレクトリの作成
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 複数のJSONフォーマッター
formatters = {
# 詳細ログ用(全フィールド)
"detailed": jsonlogger.JsonFormatter(
fmt='%(asctime)s %(name)s %(levelname)s %(funcName)s %(lineno)d %(message)s',
static_fields={"service": "web-api", "environment": "production"},
rename_fields={"levelname": "level", "name": "logger", "funcName": "function", "lineno": "line"}
),
# 簡潔ログ用(必要最小限)
"compact": jsonlogger.JsonFormatter(
fmt='%(asctime)s %(levelname)s %(message)s',
exclude_fields=["module", "pathname", "filename"],
static_fields={"service": "web-api"}
),
# エラー専用(例外情報重視)
"error_focused": jsonlogger.JsonFormatter(
fmt='%(asctime)s %(name)s %(levelname)s %(funcName)s %(message)s',
required_fields=["timestamp", "logger", "level", "function", "message", "exc_info"],
static_fields={"alert": True, "requires_attention": True}
)
}
# メインロガーの設定
main_logger = logging.getLogger("MainApp")
main_logger.setLevel(logging.DEBUG)
# 1. コンソール出力(開発用)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatters["compact"])
console_handler.setLevel(logging.INFO)
main_logger.addHandler(console_handler)
# 2. 日別ローテーションファイル(詳細ログ)
today = datetime.now().strftime('%Y-%m-%d')
daily_handler = logging.FileHandler(
f"{log_dir}/app-{today}.log",
encoding='utf-8'
)
daily_handler.setFormatter(formatters["detailed"])
daily_handler.setLevel(logging.DEBUG)
main_logger.addHandler(daily_handler)
# 3. サイズベースローテーション(一般ログ)
rotating_handler = logging.handlers.RotatingFileHandler(
f"{log_dir}/app-rotating.log",
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
rotating_handler.setFormatter(formatters["compact"])
rotating_handler.setLevel(logging.INFO)
main_logger.addHandler(rotating_handler)
# 4. エラー専用ファイル
error_handler = logging.FileHandler(
f"{log_dir}/errors.log",
encoding='utf-8'
)
error_handler.setFormatter(formatters["error_focused"])
error_handler.setLevel(logging.ERROR)
main_logger.addHandler(error_handler)
# 5. 時間ベースローテーション(本番運用向け)
timed_handler = logging.handlers.TimedRotatingFileHandler(
f"{log_dir}/app-timed.log",
when='midnight',
interval=1,
backupCount=30, # 30日保持
encoding='utf-8'
)
timed_handler.setFormatter(formatters["detailed"])
timed_handler.setLevel(logging.INFO)
main_logger.addHandler(timed_handler)
return main_logger
class FileLoggingApplication:
def __init__(self):
self.logger = setup_comprehensive_file_logging()
self.logger.info("アプリケーション初期化完了", extra={
"startup_time": datetime.now().isoformat(),
"log_handlers": len(self.logger.handlers),
"config": "comprehensive_file_logging"
})
def simulate_application_operations(self):
"""アプリケーション動作のシミュレーション"""
# 正常な操作ログ
self.logger.info("ユーザーセッション開始", extra={
"user_id": 12345,
"session_id": "sess_abc123",
"ip_address": "192.168.1.100",
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
})
# デバッグ情報(詳細ログファイルのみ)
self.logger.debug("データベース接続確立", extra={
"connection_pool": "primary",
"connection_count": 5,
"max_connections": 20,
"database": "userdb"
})
# 警告ログ
self.logger.warning("API制限に近づいています", extra={
"current_requests": 980,
"limit": 1000,
"time_window": "1時間",
"user_id": 12345
})
# エラーログ(エラー専用ファイルにも出力)
try:
# 意図的なエラー
raise ValueError("無効な入力データです")
except ValueError as e:
self.logger.error("入力データ検証エラー", extra={
"error_type": "validation",
"input_data": {"field1": "invalid_value"},
"validation_rules": ["required", "format", "length"],
"user_id": 12345
}, exc_info=True)
# 重要な情報ログ
self.logger.info("データ処理完了", extra={
"processed_records": 1500,
"processing_time_seconds": 45.6,
"success_rate": 98.7,
"failed_records": 19,
"batch_id": "batch_20250624_001"
})
def demonstrate_different_log_levels(self):
"""各ログレベルでの出力例"""
operations = [
(logging.DEBUG, "詳細デバッグ情報", {"debug_level": "verbose"}),
(logging.INFO, "一般情報", {"operation": "normal"}),
(logging.WARNING, "注意が必要", {"warning_type": "performance"}),
(logging.ERROR, "エラーが発生", {"error_severity": "medium"}),
(logging.CRITICAL, "重大なエラー", {"alert_level": "critical"})
]
for level, message, extra_data in operations:
self.logger.log(level, message, extra={
**extra_data,
"demonstration": True,
"timestamp": datetime.now().isoformat()
})
def compress_old_logs(self):
"""古いログファイルの圧縮"""
log_dir = "logs"
for filename in os.listdir(log_dir):
if filename.endswith('.log') and 'rotating' not in filename:
file_path = os.path.join(log_dir, filename)
# ファイルサイズチェック(1MB以上で圧縮)
if os.path.getsize(file_path) > 1024*1024:
compressed_path = f"{file_path}.gz"
with open(file_path, 'rb') as f_in:
with gzip.open(compressed_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
self.logger.info("ログファイル圧縮完了", extra={
"original_file": filename,
"compressed_file": f"{filename}.gz",
"original_size_mb": round(os.path.getsize(file_path) / 1024 / 1024, 2),
"compressed_size_mb": round(os.path.getsize(compressed_path) / 1024 / 1024, 2)
})
# ファイルロギングアプリケーションの実行例
app = FileLoggingApplication()
print("=== アプリケーション動作シミュレーション ===")
app.simulate_application_operations()
print("\n=== 各ログレベル出力例 ===")
app.demonstrate_different_log_levels()
print("\n=== ログファイル圧縮処理 ===")
app.compress_old_logs()
print("\n=== ログファイル確認 ===")
log_files = os.listdir("logs")
for log_file in sorted(log_files):
file_path = os.path.join("logs", log_file)
size_kb = round(os.path.getsize(file_path) / 1024, 1)
print(f"{log_file}: {size_kb} KB")
システム統合とログ集約システム連携
import logging
from pythonjsonlogger import jsonlogger
import socket
import json
import requests
import time
from typing import Dict, Any
import threading
import queue
class LogAggregationIntegration:
"""ログ集約システムとの統合"""
def __init__(self):
self.setup_aggregation_loggers()
def setup_aggregation_loggers(self):
"""各種ログ集約システム向けの設定"""
# ELK Stack向け設定
self.elk_logger = self._setup_elk_logger()
# Fluentd向け設定
self.fluentd_logger = self._setup_fluentd_logger()
# Splunk向け設定
self.splunk_logger = self._setup_splunk_logger()
# カスタムログ集約向け設定
self.custom_logger = self._setup_custom_aggregation_logger()
def _setup_elk_logger(self):
"""ELK Stack向けの最適化設定"""
# Elasticsearchに最適化されたフォーマット
elk_formatter = jsonlogger.JsonFormatter(
fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
static_fields={
"@version": "1",
"service": "web-api",
"environment": "production",
"host": socket.gethostname()
},
rename_fields={
"asctime": "@timestamp",
"levelname": "level",
"name": "logger"
},
# Elasticsearchのマッピングに最適化
json_default=str,
json_encoder=json.dumps
)
# ファイルハンドラー(Filebeatで収集される)
elk_handler = logging.FileHandler('logs/elk-formatted.log', encoding='utf-8')
elk_handler.setFormatter(elk_formatter)
elk_logger = logging.getLogger('ELKLogger')
elk_logger.addHandler(elk_handler)
elk_logger.setLevel(logging.INFO)
return elk_logger
def _setup_fluentd_logger(self):
"""Fluentd向けの設定"""
fluentd_formatter = jsonlogger.JsonFormatter(
fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
static_fields={
"tag": "web-api.application",
"source": "python-application",
"datacenter": "us-west-2"
},
# FluentdのJSONパーサーに最適化
json_default=lambda obj: str(obj),
ensure_ascii=False
)
# Fluentdの監視ディレクトリに出力
fluentd_handler = logging.FileHandler('logs/fluentd.log', encoding='utf-8')
fluentd_handler.setFormatter(fluentd_formatter)
fluentd_logger = logging.getLogger('FluentdLogger')
fluentd_logger.addHandler(fluentd_handler)
fluentd_logger.setLevel(logging.INFO)
return fluentd_logger
def _setup_splunk_logger(self):
"""Splunk向けの設定"""
splunk_formatter = jsonlogger.JsonFormatter(
fmt='%(asctime)s %(name)s %(levelname)s %(funcName)s %(message)s',
static_fields={
"index": "application-logs",
"sourcetype": "python:json",
"source": "web-api"
},
rename_fields={
"asctime": "time",
"levelname": "severity",
"name": "component",
"funcName": "function"
}
)
splunk_handler = logging.FileHandler('logs/splunk.log', encoding='utf-8')
splunk_handler.setFormatter(splunk_formatter)
splunk_logger = logging.getLogger('SplunkLogger')
splunk_logger.addHandler(splunk_handler)
splunk_logger.setLevel(logging.INFO)
return splunk_logger
def _setup_custom_aggregation_logger(self):
"""カスタムログ集約システム向け設定"""
custom_formatter = jsonlogger.JsonFormatter(
fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
static_fields={
"schema_version": "2.0",
"application": "web-api",
"team": "backend",
"criticality": "high"
},
required_fields=["timestamp", "level", "logger", "message", "correlation_id"],
defaults={"correlation_id": "unknown", "trace_id": "N/A"}
)
custom_handler = logging.FileHandler('logs/custom-aggregation.log', encoding='utf-8')
custom_handler.setFormatter(custom_formatter)
custom_logger = logging.getLogger('CustomAggregation')
custom_logger.addHandler(custom_handler)
custom_logger.setLevel(logging.DEBUG)
return custom_logger
def log_application_event(self, event_type: str, event_data: Dict[str, Any]):
"""各種ログ集約システムへの統一イベント送信"""
# 共通のイベントデータ
common_data = {
"event_type": event_type,
"timestamp": time.time(),
"correlation_id": f"corr_{int(time.time() * 1000)}",
**event_data
}
# ELK Stackへの送信
self.elk_logger.info(f"Application Event: {event_type}", extra={
**common_data,
"elastic_compatible": True
})
# Fluentdへの送信
self.fluentd_logger.info(f"Event: {event_type}", extra={
**common_data,
"fluentd_tag": f"app.event.{event_type}"
})
# Splunkへの送信
self.splunk_logger.info(f"{event_type} Event", extra={
**common_data,
"splunk_index": "app-events"
})
# カスタム集約システムへの送信
self.custom_logger.info(f"EVENT: {event_type}", extra={
**common_data,
"trace_id": f"trace_{int(time.time())}"
})
class RealTimeLogProcessor:
"""リアルタイムログ処理とアラート"""
def __init__(self):
self.alert_queue = queue.Queue()
self.processor_thread = threading.Thread(target=self._process_alerts)
self.processor_thread.daemon = True
self.processor_thread.start()
self.setup_alert_logger()
def setup_alert_logger(self):
"""アラート用ロガーの設定"""
alert_formatter = jsonlogger.JsonFormatter(
fmt='%(asctime)s %(levelname)s %(message)s',
static_fields={
"alert": True,
"notification_required": True,
"priority": "high"
},
required_fields=["timestamp", "level", "message", "alert_type", "severity"]
)
# アラート専用ハンドラー
alert_handler = logging.FileHandler('logs/alerts.log', encoding='utf-8')
alert_handler.setFormatter(alert_formatter)
self.alert_logger = logging.getLogger('AlertSystem')
self.alert_logger.addHandler(alert_handler)
self.alert_logger.setLevel(logging.WARNING)
def send_alert(self, alert_type: str, severity: str, message: str, details: Dict[str, Any]):
"""アラートの送信"""
alert_data = {
"alert_type": alert_type,
"severity": severity,
"details": details,
"requires_action": severity in ["critical", "high"],
"alert_id": f"alert_{int(time.time() * 1000)}"
}
# アラートログの記録
if severity == "critical":
self.alert_logger.critical(message, extra=alert_data)
elif severity == "high":
self.alert_logger.error(message, extra=alert_data)
else:
self.alert_logger.warning(message, extra=alert_data)
# リアルタイム処理キューに追加
self.alert_queue.put({
"message": message,
"data": alert_data,
"timestamp": time.time()
})
def _process_alerts(self):
"""アラートのリアルタイム処理"""
while True:
try:
alert = self.alert_queue.get(timeout=1)
self._handle_alert(alert)
self.alert_queue.task_done()
except queue.Empty:
continue
except Exception as e:
# エラーハンドリング
self.alert_logger.error("アラート処理エラー", extra={
"error": str(e),
"alert_type": "system_error"
})
def _handle_alert(self, alert):
"""個別アラートの処理"""
severity = alert["data"]["severity"]
# 重大度に応じた処理
if severity == "critical":
self._send_immediate_notification(alert)
elif severity == "high":
self._escalate_to_team(alert)
else:
self._log_for_review(alert)
def _send_immediate_notification(self, alert):
"""即座の通知送信"""
# 実際の実装では、Slack、メール、SMS等に送信
print(f"🚨 CRITICAL ALERT: {alert['message']}")
def _escalate_to_team(self, alert):
"""チームエスカレーション"""
print(f"⚠️ HIGH PRIORITY: {alert['message']}")
def _log_for_review(self, alert):
"""レビュー用ログ"""
print(f"📝 REVIEW REQUIRED: {alert['message']}")
# システム統合の使用例
def demonstrate_system_integration():
# ログ集約統合の初期化
aggregation = LogAggregationIntegration()
# リアルタイム処理の初期化
alert_processor = RealTimeLogProcessor()
# 様々なアプリケーションイベント
events = [
("user_login", {"user_id": 12345, "method": "oauth", "success": True}),
("payment_processed", {"amount": 150.00, "currency": "USD", "payment_id": "pay_123"}),
("api_error", {"endpoint": "/api/users", "status_code": 500, "error": "database_timeout"}),
("security_violation", {"ip": "192.168.1.100", "attempt": "unauthorized_access"}),
("performance_degradation", {"response_time": 5000, "threshold": 3000, "endpoint": "/api/search"})
]
# イベントログの送信
for event_type, event_data in events:
aggregation.log_application_event(event_type, event_data)
# 重要なイベントはアラートも送信
if event_type in ["api_error", "security_violation", "performance_degradation"]:
severity_map = {
"api_error": "high",
"security_violation": "critical",
"performance_degradation": "medium"
}
alert_processor.send_alert(
alert_type=event_type,
severity=severity_map[event_type],
message=f"{event_type.replace('_', ' ').title()} detected",
details=event_data
)
# しばらく待ってアラート処理を確認
time.sleep(2)
print("ログ集約とアラートシステムの動作完了")
if __name__ == "__main__":
demonstrate_system_integration()