python-json-logger
JSON output formatter for Python's standard library logging. It emits log entries as structured JSON, which simplifies integration with log aggregation systems, while remaining fully compatible with the standard library.
GitHub Overview
madzak/python-json-logger
Json Formatter for the standard python logger
Overview
python-json-logger (imported as pythonjsonlogger) is a lightweight library that emits logs as JSON while remaining fully compatible with Python's standard library logging package. Because it is implemented as a formatter, it can be added to an existing logging setup by swapping the formatter on a handler, and its machine-readable output simplifies ingestion and analysis in log aggregation tools. As of 2025 it records over 46 million downloads per month, making it one of the most widely used JSON logging solutions for Python.
Details
As of 2025, python-json-logger focuses on JSON output while remaining compatible with the Python standard library. It supports multiple JSON encoders (json, orjson, and msgspec), so the encoder can be chosen to match performance requirements. Output fields are customizable, with control over required, excluded, and static fields, and custom attributes attached to a LogRecord (for example through the extra argument) are picked up automatically. Types that the json module cannot serialize directly, such as UUID, bytes, and Enum, are converted to JSON-friendly representations so that arbitrary input can be logged safely.
Key Features
- Complete Standard Library Compatibility: Can be introduced without modifying existing log configurations
- Multiple JSON Encoder Support: Choose between json, orjson, msgspec based on use case
- Fully Customizable Fields: Fine control over required/excluded/static fields
- Arbitrary Type Encoding: Automatic conversion of complex types like UUID, bytes, Enum
- High Safety: Robust design that can appropriately log any input
- Lightweight & High Performance: High-speed JSON output with minimal overhead
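The compatibility described above follows from the standard library's formatter hook: a handler calls its formatter's format() method for every LogRecord, so a JSON formatter only has to turn the record into a JSON string. The following is a minimal stdlib-only sketch of that mechanism, not python-json-logger's actual implementation; the class name MiniJsonFormatter and its choice of fields are assumptions made for illustration.

```python
import io
import json
import logging
import uuid

# Attribute names found on every LogRecord; anything else came from `extra`.
_STANDARD_ATTRS = set(vars(logging.makeLogRecord({}))) | {"message", "asctime"}


class MiniJsonFormatter(logging.Formatter):
    """Hypothetical, simplified JSON formatter (illustration only)."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "name": record.name,
            "levelname": record.levelname,
            "message": record.getMessage(),
        }
        # Copy custom attributes that were passed through `extra`.
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS:
                entry[key] = value
        # default=str converts values json cannot encode (a UUID here) to strings.
        return json.dumps(entry, default=str)


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(MiniJsonFormatter())

demo_logger = logging.getLogger("mini_json_demo")
demo_logger.addHandler(handler)
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False

demo_logger.info("User %s created", "alice", extra={"user_id": uuid.UUID(int=1)})
entry = json.loads(stream.getvalue())
print(entry)
```

Nothing on the logger side changes: the same logger.info call with extra works with any formatter, which is why swapping in a JSON formatter does not require touching application code.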
Pros and Cons
Pros
- Full compatibility with the Python standard library keeps the learning cost low
- Existing logging configurations switch to JSON output by changing only the formatter
- Integrates well with log aggregation systems (ELK Stack, Splunk, etc.)
- Multiple JSON encoders let performance be tuned to the workload
- Widely adopted, with over 46 million monthly downloads
- Lightweight, simple implementation that is easy to maintain
Cons
- Focused on basic JSON output, with few advanced structuring features
- Not suited to dynamic log message transformation or complex processing
- Lacks a rich processor chain like the one structlog provides
- Little support for human-readable (non-JSON) output during development
- May be insufficient for large-scale structured logging systems
- Field control is set at configuration time, which makes per-call dynamic control difficult
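Because field control lives in the formatter configuration, per-request values are usually attached to the record before it reaches the formatter. One common standard-library approach, sketched below, is a logging.Filter that copies a contextvars value onto every LogRecord; a formatter that serializes extra record attributes would then emit it as a field. The names request_id_var, RequestContextFilter, and ListHandler are hypothetical, and the collecting handler merely stands in for a handler with a JSON formatter.

```python
import contextvars
import logging

# Hypothetical context variable holding the current request's identifier.
request_id_var = contextvars.ContextVar("request_id", default="unknown")


class RequestContextFilter(logging.Filter):
    """Attach the current request id to every record that passes through."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True  # never drop the record, only enrich it


class ListHandler(logging.Handler):
    """Stand-in handler that keeps records in memory for inspection."""

    def __init__(self) -> None:
        super().__init__()
        self.records = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


collector = ListHandler()
collector.addFilter(RequestContextFilter())

ctx_logger = logging.getLogger("context_demo")
ctx_logger.addHandler(collector)
ctx_logger.setLevel(logging.INFO)
ctx_logger.propagate = False

ctx_logger.info("before request")
token = request_id_var.set("req-42")
ctx_logger.info("handling request")
request_id_var.reset(token)

print([r.request_id for r in collector.records])
```

This keeps call sites free of repeated extra arguments while the formatter configuration itself stays static.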
Reference Pages
- python-json-logger Official Documentation
- python-json-logger GitHub Repository
- python-json-logger PyPI
Usage Examples
Basic Installation and Configuration
# Installation
# pip install python-json-logger
import logging
from pythonjsonlogger import jsonlogger

# Basic JSON formatter configuration
json_handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
json_handler.setFormatter(formatter)

# Logger configuration
logger = logging.getLogger()
logger.addHandler(json_handler)
logger.setLevel(logging.INFO)

# Basic log output
logger.info("Application started")
# Not emitted: DEBUG is below the INFO level configured above
logger.debug("Debug information", extra={"user_id": 12345})
logger.warning("Warning message", extra={"error_code": "W001"})
logger.error("Error occurred", extra={"exception_type": "ValueError"})

# Detailed logging with structured data
logger.info("User login", extra={
    "user_id": 67890,
    "ip_address": "192.168.1.100",
    "user_agent": "Mozilla/5.0...",
    "session_id": "sess_abc123"
})

# Log with exception information (logger.exception adds the traceback)
try:
    result = 10 / 0
except ZeroDivisionError as e:
    logger.exception("Calculation error", extra={
        "operation": "division",
        "dividend": 10,
        "divisor": 0,
        "error_details": str(e)
    })
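Projects that configure logging declaratively can make the same switch in configuration only. The sketch below uses the standard library's dictConfig schema, where the "()" key names a factory to instantiate and the remaining keys are passed to it as keyword arguments. The formatter path mirrors the import used in the example above; the handler and formatter names ("console", "json") are arbitrary choices for illustration.

```python
import logging
import logging.config

# dictConfig schema: only the "formatters" entry is specific to JSON output.
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "json": {
            # "()" tells dictConfig to call this factory with the other keys
            "()": "pythonjsonlogger.jsonlogger.JsonFormatter",
            "fmt": "%(asctime)s %(name)s %(levelname)s %(message)s",
        },
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "json",
        },
    },
    "root": {"level": "INFO", "handlers": ["console"]},
}


def configure_logging() -> None:
    """Apply the configuration; requires python-json-logger to be installed."""
    logging.config.dictConfig(LOGGING)
```

Calling configure_logging() once at startup is enough; existing logging.getLogger(...) call sites stay unchanged.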
Custom Formatter and Output Field Control
import logging
from pythonjsonlogger import jsonlogger
import uuid
from datetime import datetime
from enum import Enum
class UserRole(Enum):
ADMIN = "admin"
USER = "user"
GUEST = "guest"
def setup_custom_json_logging():
    """Customized JSON log configuration"""
    # The format string selects the LogRecord attributes that are always output.
    # Attributes not listed here (module, funcName, ...) are not output.
    custom_format = '%(asctime)s %(name)s %(levelname)s %(message)s'
    # Customization options
    custom_formatter = jsonlogger.JsonFormatter(
        fmt=custom_format,
        # Static fields (add fixed values)
        static_fields={"service": "user-api", "version": "1.0.0"},
        # Field renaming
        rename_fields={"levelname": "level", "name": "logger"},
        # Default values (used when a record does not set the field);
        # needs a 3.x release that supports the defaults argument
        defaults={"environment": "production", "region": "us-west-2"}
    )
    # Handler configuration
    handler = logging.StreamHandler()
    handler.setFormatter(custom_formatter)
    # Logger configuration
    logger = logging.getLogger("CustomApp")
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
    return logger
# Custom logger usage example
custom_logger = setup_custom_json_logging()
class UserService:
def __init__(self):
self.logger = custom_logger
def create_user(self, user_data):
user_id = uuid.uuid4()
creation_time = datetime.now()
user_role = UserRole.ADMIN
self.logger.info("User creation started", extra={
"user_id": user_id, # UUID type (automatically converted to string)
"creation_time": creation_time, # datetime type (converted to ISO format string)
"user_role": user_role, # Enum type (value automatically extracted)
"user_data": user_data,
"request_size": len(str(user_data).encode()), # bytes calculation
"metadata": {
"source": "api",
"client_version": "2.1.0"
}
})
try:
# User creation processing simulation
processed_data = self._process_user_data(user_data, user_id)
self.logger.info("User creation completed", extra={
"user_id": user_id,
"email": user_data.get("email"),
"role": user_role,
"processing_time_ms": 150,
"created_fields": list(processed_data.keys())
})
return str(user_id)
except Exception as e:
self.logger.error("User creation error", extra={
"user_id": user_id,
"error_type": type(e).__name__,
"error_message": str(e),
"input_data": user_data,
"stack_trace": True # Traceback automatically included
}, exc_info=True)
raise
def _process_user_data(self, user_data, user_id):
# Processing simulation
return {
"id": user_id,
"processed_at": datetime.now(),
**user_data
}
# Usage example
service = UserService()
service.create_user({
"email": "[email protected]",
"name": "John Doe",
"age": 30,
"preferences": {"theme": "dark", "language": "en"}
})
Multiple JSON Encoders and Performance Optimization
import logging
from pythonjsonlogger import jsonlogger
import time
import json
from typing import Any, Dict
def setup_performance_optimized_logging():
    """Performance-optimized JSON log configuration"""
    # Candidate formatters, fastest first
    encoders = []
    # orjson-based formatter (python-json-logger 3.x with orjson installed)
    try:
        from pythonjsonlogger.orjson import OrjsonFormatter
        encoders.append(("orjson", OrjsonFormatter(
            json_default=str,  # fallback for non-serializable objects
        )))
    except ImportError:
        pass
    # msgspec-based formatter (python-json-logger 3.x with msgspec installed)
    try:
        from pythonjsonlogger.msgspec import MsgspecFormatter
        encoders.append(("msgspec", MsgspecFormatter(
            json_default=str,
        )))
    except ImportError:
        pass
    # Standard json formatter (always available fallback)
    encoders.append(("json", jsonlogger.JsonFormatter(
        json_default=str,
        json_ensure_ascii=False  # keep non-ASCII characters unescaped
    )))
    # Select the first (fastest available) formatter
    encoder_name, best_formatter = encoders[0]
    # Log handler configuration
    handler = logging.StreamHandler()
    handler.setFormatter(best_formatter)
    logger = logging.getLogger("PerformanceApp")
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    # Log encoder setup
    logger.info("JSON encoder configuration completed", extra={
        "selected_encoder": encoder_name,
        "available_encoders": [name for name, _ in encoders]
    })
    return logger, encoder_name
class PerformanceBenchmark:
def __init__(self):
self.logger, self.encoder = setup_performance_optimized_logging()
def benchmark_logging_performance(self, iterations=10000):
"""Log output performance benchmark"""
# Test data preparation
test_data = {
"iteration": 0,
"timestamp": time.time(),
"user_data": {
"id": 12345,
"name": "Performance Test",
"metadata": {
"tags": ["test", "performance", "json"],
"scores": [95, 87, 92, 88, 91]
}
},
"request_info": {
"method": "POST",
"path": "/api/benchmark",
"headers": {"Content-Type": "application/json"},
"size": 1024
}
}
# Benchmark execution
start_time = time.perf_counter()
for i in range(iterations):
test_data["iteration"] = i
test_data["timestamp"] = time.time()
if i % 1000 == 0:
# Progress log (with detailed data)
self.logger.info("Benchmark progress", extra={
**test_data,
"progress_percent": round((i / iterations) * 100, 1),
"encoder": self.encoder
})
else:
# Normal log (lightweight)
self.logger.debug("Benchmark execution", extra={
"iteration": i,
"encoder": self.encoder
})
end_time = time.perf_counter()
duration = end_time - start_time
# Result log
self.logger.info("Benchmark completed", extra={
"total_iterations": iterations,
"duration_seconds": round(duration, 3),
"logs_per_second": round(iterations / duration, 0),
"encoder": self.encoder,
"average_time_per_log_ms": round((duration / iterations) * 1000, 4)
})
return duration
def compare_json_encoders(self):
"""Performance comparison of multiple JSON encoders"""
test_data = {
"complex_data": {
"nested": {"deeply": {"nested": {"data": "test"}}},
"list": list(range(100)),
"unicode": "Japanese test data🚀",
"numbers": [1.23456789, 987654321, 0.000001]
}
}
encoders_config = [
("standard_json", json.dumps),
("compact_json", lambda obj: json.dumps(obj, separators=(',', ':'))),
("ensure_ascii_false", lambda obj: json.dumps(obj, ensure_ascii=False)),
]
# If orjson is available
try:
import orjson
encoders_config.append(("orjson", orjson.dumps))
except ImportError:
pass
# If msgspec is available
try:
import msgspec
msgspec_encoder = msgspec.json.Encoder()
encoders_config.append(("msgspec", msgspec_encoder.encode))
except ImportError:
pass
results = {}
iterations = 5000
for encoder_name, encoder_func in encoders_config:
start_time = time.perf_counter()
for _ in range(iterations):
try:
encoded = encoder_func(test_data)
except Exception as e:
self.logger.error(f"Encoder error: {encoder_name}", extra={
"encoder": encoder_name,
"error": str(e)
})
continue
duration = time.perf_counter() - start_time
results[encoder_name] = {
"duration": round(duration, 4),
"ops_per_second": round(iterations / duration, 0)
}
# Comparison result log
self.logger.info("JSON encoder performance comparison", extra={
"test_iterations": iterations,
"results": results,
"fastest_encoder": min(results.keys(), key=lambda k: results[k]["duration"]),
"test_data_complexity": "nested_objects_arrays_unicode"
})
return results
# Performance test execution
benchmark = PerformanceBenchmark()
# Log output performance test
print("=== Log Output Performance Test ===")
duration = benchmark.benchmark_logging_performance(1000)
print(f"Completion time: {duration:.3f}s")
# JSON encoder comparison
print("\n=== JSON Encoder Performance Comparison ===")
encoder_results = benchmark.compare_json_encoders()
for encoder, result in encoder_results.items():
print(f"{encoder}: {result['ops_per_second']} ops/sec")
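The json_default fallback used in this section plays the same role as the default hook of the standard library's json.dumps: it is called only for objects the encoder cannot serialize natively. The stdlib-only sketch below shows that behaviour in isolation; fallback_default is a hypothetical helper, not part of python-json-logger.

```python
import datetime
import decimal
import json
import uuid


def fallback_default(obj):
    """Hypothetical fallback: ISO 8601 for dates and datetimes, str() otherwise."""
    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()
    return str(obj)


payload = {
    "user_id": uuid.UUID(int=7),
    "created": datetime.datetime(2025, 1, 2, 3, 4, 5),
    "price": decimal.Decimal("9.99"),
    "count": 3,  # natively serializable, so the hook is not called for it
}

encoded = json.dumps(payload, default=fallback_default)
decoded = json.loads(encoded)
print(decoded)
```

A plain str fallback is the simplest choice, but a dedicated function like this one makes the output format of each type explicit, which matters when downstream systems parse the values.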
File Output and Multiple Handler Integration
import logging
from pythonjsonlogger import jsonlogger
import logging.handlers
import os
from datetime import datetime
import gzip
import shutil
def setup_comprehensive_file_logging():
"""Comprehensive file log configuration"""
# Create log directory
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
    # Multiple JSON formatters
    formatters = {
        # Detailed log (logger name, function, and line number included)
        "detailed": jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(funcName)s %(lineno)d %(message)s',
            static_fields={"service": "web-api", "environment": "production"},
            rename_fields={"levelname": "level", "name": "logger", "funcName": "function", "lineno": "line"}
        ),
        # Compact log (minimum required); module, pathname, and filename are
        # not in the format string, so they are not output
        "compact": jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(levelname)s %(message)s',
            static_fields={"service": "web-api"}
        ),
        # Error-focused; exc_info is added automatically when a record carries
        # exception information (exc_info=True or logger.exception)
        "error_focused": jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(funcName)s %(message)s',
            static_fields={"alert": True, "requires_attention": True}
        )
    }
# Main logger configuration
main_logger = logging.getLogger("MainApp")
main_logger.setLevel(logging.DEBUG)
# 1. Console output (for development)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatters["compact"])
console_handler.setLevel(logging.INFO)
main_logger.addHandler(console_handler)
# 2. Dated file (detailed log); the date is fixed at startup, so this file does not rotate while the process runs
today = datetime.now().strftime('%Y-%m-%d')
daily_handler = logging.FileHandler(
f"{log_dir}/app-{today}.log",
encoding='utf-8'
)
daily_handler.setFormatter(formatters["detailed"])
daily_handler.setLevel(logging.DEBUG)
main_logger.addHandler(daily_handler)
# 3. Size-based rotation (general log)
rotating_handler = logging.handlers.RotatingFileHandler(
f"{log_dir}/app-rotating.log",
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
rotating_handler.setFormatter(formatters["compact"])
rotating_handler.setLevel(logging.INFO)
main_logger.addHandler(rotating_handler)
# 4. Error-only file
error_handler = logging.FileHandler(
f"{log_dir}/errors.log",
encoding='utf-8'
)
error_handler.setFormatter(formatters["error_focused"])
error_handler.setLevel(logging.ERROR)
main_logger.addHandler(error_handler)
# 5. Time-based rotation (for production)
timed_handler = logging.handlers.TimedRotatingFileHandler(
f"{log_dir}/app-timed.log",
when='midnight',
interval=1,
backupCount=30, # Keep for 30 days
encoding='utf-8'
)
timed_handler.setFormatter(formatters["detailed"])
timed_handler.setLevel(logging.INFO)
main_logger.addHandler(timed_handler)
return main_logger
class FileLoggingApplication:
def __init__(self):
self.logger = setup_comprehensive_file_logging()
self.logger.info("Application initialization completed", extra={
"startup_time": datetime.now().isoformat(),
"log_handlers": len(self.logger.handlers),
"config": "comprehensive_file_logging"
})
def simulate_application_operations(self):
"""Application operation simulation"""
# Normal operation log
self.logger.info("User session started", extra={
"user_id": 12345,
"session_id": "sess_abc123",
"ip_address": "192.168.1.100",
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
})
# Debug information (detailed log file only)
self.logger.debug("Database connection established", extra={
"connection_pool": "primary",
"connection_count": 5,
"max_connections": 20,
"database": "userdb"
})
# Warning log
self.logger.warning("Approaching API limits", extra={
"current_requests": 980,
"limit": 1000,
"time_window": "1 hour",
"user_id": 12345
})
# Error log (also output to error-only file)
try:
# Intentional error
raise ValueError("Invalid input data")
except ValueError as e:
self.logger.error("Input data validation error", extra={
"error_type": "validation",
"input_data": {"field1": "invalid_value"},
"validation_rules": ["required", "format", "length"],
"user_id": 12345
}, exc_info=True)
# Important information log
self.logger.info("Data processing completed", extra={
"processed_records": 1500,
"processing_time_seconds": 45.6,
"success_rate": 98.7,
"failed_records": 19,
"batch_id": "batch_20250624_001"
})
def demonstrate_different_log_levels(self):
"""Output examples for each log level"""
operations = [
(logging.DEBUG, "Detailed debug information", {"debug_level": "verbose"}),
(logging.INFO, "General information", {"operation": "normal"}),
(logging.WARNING, "Attention required", {"warning_type": "performance"}),
(logging.ERROR, "Error occurred", {"error_severity": "medium"}),
(logging.CRITICAL, "Critical error", {"alert_level": "critical"})
]
for level, message, extra_data in operations:
self.logger.log(level, message, extra={
**extra_data,
"demonstration": True,
"timestamp": datetime.now().isoformat()
})
def compress_old_logs(self):
"""Compress old log files"""
log_dir = "logs"
for filename in os.listdir(log_dir):
if filename.endswith('.log') and 'rotating' not in filename:
file_path = os.path.join(log_dir, filename)
# File size check (compress if over 1MB)
if os.path.getsize(file_path) > 1024*1024:
compressed_path = f"{file_path}.gz"
with open(file_path, 'rb') as f_in:
with gzip.open(compressed_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
self.logger.info("Log file compression completed", extra={
"original_file": filename,
"compressed_file": f"{filename}.gz",
"original_size_mb": round(os.path.getsize(file_path) / 1024 / 1024, 2),
"compressed_size_mb": round(os.path.getsize(compressed_path) / 1024 / 1024, 2)
})
# File logging application execution example
app = FileLoggingApplication()
print("=== Application Operation Simulation ===")
app.simulate_application_operations()
print("\n=== Each Log Level Output Example ===")
app.demonstrate_different_log_levels()
print("\n=== Log File Compression Processing ===")
app.compress_old_logs()
print("\n=== Log File Verification ===")
log_files = os.listdir("logs")
for log_file in sorted(log_files):
file_path = os.path.join("logs", log_file)
size_kb = round(os.path.getsize(file_path) / 1024, 1)
print(f"{log_file}: {size_kb} KB")
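As an alternative to compressing files in a separate pass, the standard library's rotating handlers expose namer and rotator attributes that run at rollover time. The sketch below compresses each rotated file with gzip as it is rotated; the helper names gzip_namer and gzip_rotator, the temporary directory, and the deliberately tiny maxBytes value are assumptions chosen so the demo rotates quickly.

```python
import gzip
import logging
import logging.handlers
import os
import shutil
import tempfile


def gzip_namer(default_name: str) -> str:
    """Rotated files get a .gz suffix (app.log.1 becomes app.log.1.gz)."""
    return default_name + ".gz"


def gzip_rotator(source: str, dest: str) -> None:
    """Compress the closed log file into dest, then remove the original."""
    with open(source, "rb") as f_in, gzip.open(dest, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    os.remove(source)


log_dir = tempfile.mkdtemp(prefix="json_logs_")
rotating = logging.handlers.RotatingFileHandler(
    os.path.join(log_dir, "app.log"),
    maxBytes=200,      # tiny limit so the demo rotates after a few records
    backupCount=3,
    encoding="utf-8",
)
rotating.namer = gzip_namer
rotating.rotator = gzip_rotator

rot_logger = logging.getLogger("gzip_rotation_demo")
rot_logger.addHandler(rotating)
rot_logger.setLevel(logging.INFO)
rot_logger.propagate = False

for i in range(20):
    rot_logger.info("record number %d with some padding text", i)
rotating.close()

compressed = sorted(name for name in os.listdir(log_dir) if name.endswith(".gz"))
print(compressed)
```

Because compression happens inside the handler at rollover, the active log file is never compressed while it is still being written, and a JSON formatter can be set on the handler exactly as in the examples above.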
System Integration and Log Aggregation System Integration
import logging
from pythonjsonlogger import jsonlogger
import os
import socket
import json
import time
from typing import Dict, Any
import threading
import queue


class LogAggregationIntegration:
    """Integration with log aggregation systems"""

    def __init__(self):
        # The file handlers below write into logs/, so make sure it exists
        os.makedirs("logs", exist_ok=True)
        self.setup_aggregation_loggers()
def setup_aggregation_loggers(self):
"""Configuration for various log aggregation systems"""
# ELK Stack configuration
self.elk_logger = self._setup_elk_logger()
# Fluentd configuration
self.fluentd_logger = self._setup_fluentd_logger()
# Splunk configuration
self.splunk_logger = self._setup_splunk_logger()
# Custom log aggregation configuration
self.custom_logger = self._setup_custom_aggregation_logger()
def _setup_elk_logger(self):
"""Optimized configuration for ELK Stack"""
        # Format suited to Elasticsearch
        elk_formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
            static_fields={
                "@version": "1",
                "service": "web-api",
                "environment": "production",
                "host": socket.gethostname()
            },
            rename_fields={
                "asctime": "@timestamp",
                "levelname": "level",
                "name": "logger"
            },
            # Fallback for values the JSON encoder cannot serialize
            json_default=str
        )
# File handler (collected by Filebeat)
elk_handler = logging.FileHandler('logs/elk-formatted.log', encoding='utf-8')
elk_handler.setFormatter(elk_formatter)
elk_logger = logging.getLogger('ELKLogger')
elk_logger.addHandler(elk_handler)
elk_logger.setLevel(logging.INFO)
return elk_logger
def _setup_fluentd_logger(self):
"""Configuration for Fluentd"""
        fluentd_formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
            static_fields={
                "tag": "web-api.application",
                "source": "python-application",
                "datacenter": "us-west-2"
            },
            # Fallback for values the JSON encoder cannot serialize
            json_default=str,
            # Keep non-ASCII characters unescaped
            json_ensure_ascii=False
        )
# Output to Fluentd monitoring directory
fluentd_handler = logging.FileHandler('logs/fluentd.log', encoding='utf-8')
fluentd_handler.setFormatter(fluentd_formatter)
fluentd_logger = logging.getLogger('FluentdLogger')
fluentd_logger.addHandler(fluentd_handler)
fluentd_logger.setLevel(logging.INFO)
return fluentd_logger
def _setup_splunk_logger(self):
"""Configuration for Splunk"""
splunk_formatter = jsonlogger.JsonFormatter(
fmt='%(asctime)s %(name)s %(levelname)s %(funcName)s %(message)s',
static_fields={
"index": "application-logs",
"sourcetype": "python:json",
"source": "web-api"
},
rename_fields={
"asctime": "time",
"levelname": "severity",
"name": "component",
"funcName": "function"
}
)
splunk_handler = logging.FileHandler('logs/splunk.log', encoding='utf-8')
splunk_handler.setFormatter(splunk_formatter)
splunk_logger = logging.getLogger('SplunkLogger')
splunk_logger.addHandler(splunk_handler)
splunk_logger.setLevel(logging.INFO)
return splunk_logger
def _setup_custom_aggregation_logger(self):
"""Configuration for custom log aggregation system"""
        custom_formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
            static_fields={
                "schema_version": "2.0",
                "application": "web-api",
                "team": "backend",
                "criticality": "high"
            },
            # Defaults keep correlation_id and trace_id present on every entry;
            # needs a 3.x release that supports the defaults argument
            defaults={"correlation_id": "unknown", "trace_id": "N/A"}
        )
custom_handler = logging.FileHandler('logs/custom-aggregation.log', encoding='utf-8')
custom_handler.setFormatter(custom_formatter)
custom_logger = logging.getLogger('CustomAggregation')
custom_logger.addHandler(custom_handler)
custom_logger.setLevel(logging.DEBUG)
return custom_logger
def log_application_event(self, event_type: str, event_data: Dict[str, Any]):
"""Unified event sending to various log aggregation systems"""
# Common event data
common_data = {
"event_type": event_type,
"timestamp": time.time(),
"correlation_id": f"corr_{int(time.time() * 1000)}",
**event_data
}
# Send to ELK Stack
self.elk_logger.info(f"Application Event: {event_type}", extra={
**common_data,
"elastic_compatible": True
})
# Send to Fluentd
self.fluentd_logger.info(f"Event: {event_type}", extra={
**common_data,
"fluentd_tag": f"app.event.{event_type}"
})
# Send to Splunk
self.splunk_logger.info(f"{event_type} Event", extra={
**common_data,
"splunk_index": "app-events"
})
# Send to custom aggregation system
self.custom_logger.info(f"EVENT: {event_type}", extra={
**common_data,
"trace_id": f"trace_{int(time.time())}"
})
class RealTimeLogProcessor:
"""Real-time log processing and alerts"""
    def __init__(self):
        self.alert_queue = queue.Queue()
        # Configure the logger before starting the worker thread that uses it
        self.setup_alert_logger()
        self.processor_thread = threading.Thread(target=self._process_alerts)
        self.processor_thread.daemon = True
        self.processor_thread.start()

    def setup_alert_logger(self):
        """Alert logger configuration"""
        # alert_type and severity are supplied per call through extra
        alert_formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(levelname)s %(message)s',
            static_fields={
                "alert": True,
                "notification_required": True,
                "priority": "high"
            }
        )
# Alert-only handler
alert_handler = logging.FileHandler('logs/alerts.log', encoding='utf-8')
alert_handler.setFormatter(alert_formatter)
self.alert_logger = logging.getLogger('AlertSystem')
self.alert_logger.addHandler(alert_handler)
self.alert_logger.setLevel(logging.WARNING)
def send_alert(self, alert_type: str, severity: str, message: str, details: Dict[str, Any]):
"""Send alert"""
alert_data = {
"alert_type": alert_type,
"severity": severity,
"details": details,
"requires_action": severity in ["critical", "high"],
"alert_id": f"alert_{int(time.time() * 1000)}"
}
# Record alert log
if severity == "critical":
self.alert_logger.critical(message, extra=alert_data)
elif severity == "high":
self.alert_logger.error(message, extra=alert_data)
else:
self.alert_logger.warning(message, extra=alert_data)
# Add to real-time processing queue
self.alert_queue.put({
"message": message,
"data": alert_data,
"timestamp": time.time()
})
def _process_alerts(self):
"""Real-time alert processing"""
while True:
try:
alert = self.alert_queue.get(timeout=1)
self._handle_alert(alert)
self.alert_queue.task_done()
except queue.Empty:
continue
except Exception as e:
# Error handling
self.alert_logger.error("Alert processing error", extra={
"error": str(e),
"alert_type": "system_error"
})
def _handle_alert(self, alert):
"""Handle individual alert"""
severity = alert["data"]["severity"]
# Processing based on severity
if severity == "critical":
self._send_immediate_notification(alert)
elif severity == "high":
self._escalate_to_team(alert)
else:
self._log_for_review(alert)
def _send_immediate_notification(self, alert):
"""Send immediate notification"""
# In actual implementation, send to Slack, email, SMS, etc.
print(f"🚨 CRITICAL ALERT: {alert['message']}")
def _escalate_to_team(self, alert):
"""Team escalation"""
print(f"⚠️ HIGH PRIORITY: {alert['message']}")
def _log_for_review(self, alert):
"""Log for review"""
print(f"📝 REVIEW REQUIRED: {alert['message']}")
# System integration usage example
def demonstrate_system_integration():
# Initialize log aggregation integration
aggregation = LogAggregationIntegration()
# Initialize real-time processing
alert_processor = RealTimeLogProcessor()
# Various application events
events = [
("user_login", {"user_id": 12345, "method": "oauth", "success": True}),
("payment_processed", {"amount": 150.00, "currency": "USD", "payment_id": "pay_123"}),
("api_error", {"endpoint": "/api/users", "status_code": 500, "error": "database_timeout"}),
("security_violation", {"ip": "192.168.1.100", "attempt": "unauthorized_access"}),
("performance_degradation", {"response_time": 5000, "threshold": 3000, "endpoint": "/api/search"})
]
# Send event logs
for event_type, event_data in events:
aggregation.log_application_event(event_type, event_data)
# Send alerts for important events
if event_type in ["api_error", "security_violation", "performance_degradation"]:
severity_map = {
"api_error": "high",
"security_violation": "critical",
"performance_degradation": "medium"
}
alert_processor.send_alert(
alert_type=event_type,
severity=severity_map[event_type],
message=f"{event_type.replace('_', ' ').title()} detected",
details=event_data
)
# Wait a bit to check alert processing
time.sleep(2)
print("Log aggregation and alert system operation completed")
if __name__ == "__main__":
demonstrate_system_integration()
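The alert processor above hands work to a background thread through a queue. The standard library offers the same pattern for log records themselves through logging.handlers.QueueHandler and QueueListener, which can keep slow outputs such as files or network shippers off the calling thread. A minimal sketch, assuming a StringIO-backed handler as a stand-in for the real destination and a plain text format for brevity:

```python
import io
import logging
import logging.handlers
import queue

log_queue = queue.Queue(-1)  # unbounded queue between producers and the listener

# Stand-in for a slow destination (file, socket, HTTP shipper, ...).
sink = io.StringIO()
slow_handler = logging.StreamHandler(sink)
slow_handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

# The listener drains the queue on its own thread and calls the real handler.
listener = logging.handlers.QueueListener(log_queue, slow_handler)
listener.start()

queued_logger = logging.getLogger("queue_demo")
queued_logger.addHandler(logging.handlers.QueueHandler(log_queue))
queued_logger.setLevel(logging.INFO)
queued_logger.propagate = False

queued_logger.info("payment processed")
queued_logger.warning("api error")

listener.stop()  # processes the remaining records, then joins the thread
output_lines = sink.getvalue().splitlines()
print(output_lines)
```

In a real deployment the formatter on the destination handler would be the JSON formatter, and listener.stop() would be called during shutdown so queued records are not lost.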