python-json-logger

A JSON output formatter for Python's standard library logging. It emits log entries as structured JSON, making integration with log aggregation systems straightforward, and supports modern log management infrastructure while remaining fully compatible with the standard library.

Logging, Python, JSON, Structured Logs, Formatter, Standard Library

GitHub Overview

madzak/python-json-logger

Json Formatter for the standard python logger

Stars: 1,765
Watchers: 26
Forks: 231
Created: December 27, 2011
Language: Python
License: BSD 2-Clause "Simplified" License

Topics

logging, python

Star History

Data as of: 7/17/2025, 11:24 PM

Overview

python-json-logger (pythonjsonlogger) is a lightweight, practical library that adds JSON-formatted log output to Python's standard library logging package while remaining fully compatible with it. It can be adopted without modifying existing logging configuration, and its machine-readable JSON output greatly simplifies ingestion and analysis in log aggregation tools. As of 2025 the library records over 46 million downloads per month, making it one of the most widely used and proven JSON logging solutions for Python.

Details

The 2025 version of python-json-logger is optimized specifically for JSON output while remaining fully compatible with the Python standard library. It supports multiple JSON encoders, including json, orjson, and msgspec, so the encoder can be chosen to match performance requirements. Output fields are fully customizable, with control over required, excluded, and static fields, and custom attributes are picked up automatically from LogRecord objects. Types that are usually awkward to serialize to JSON, such as UUID, bytes, and Enum, are converted to appropriate representations automatically, so arbitrary inputs can be logged safely.
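
Because the formatter is a drop-in replacement for logging.Formatter, it can be wired in through configuration alone. The sketch below uses logging.config.dictConfig with the long-standing pythonjsonlogger.jsonlogger.JsonFormatter import path; recent 3.x releases also expose pythonjsonlogger.json.JsonFormatter, so adjust the path to your installed version.

import logging
import logging.config

# Switch existing logging to JSON output purely via configuration
logging.config.dictConfig({
    "version": 1,
    "formatters": {
        "json": {
            "()": "pythonjsonlogger.jsonlogger.JsonFormatter",
            "format": "%(asctime)s %(name)s %(levelname)s %(message)s",
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "json",
        }
    },
    "root": {"handlers": ["console"], "level": "INFO"},
})

logging.getLogger(__name__).info("now emitted as JSON")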

Key Features

  • Complete Standard Library Compatibility: Drops into existing logging configurations without code changes
  • Multiple JSON Encoder Support: Choose between json, orjson, and msgspec to match the use case (see the sketch after this list)
  • Fully Customizable Fields: Fine-grained control over required, excluded, and static fields
  • Arbitrary Type Encoding: Automatic conversion of types such as UUID, bytes, and Enum
  • High Safety: Robust design that can log arbitrary input without raising serialization errors
  • Lightweight & High Performance: Fast JSON output with minimal overhead
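
A minimal sketch of the encoder choice, assuming python-json-logger 3.x, where the orjson- and msgspec-backed formatters live in optional modules that can only be imported when the corresponding packages are installed:

import logging

# Prefer the fastest formatter that is importable, falling back to the stdlib json one
# (python-json-logger 3.x module layout; adjust imports for older 2.x releases)
try:
    from pythonjsonlogger.orjson import OrjsonFormatter as PreferredFormatter
except ImportError:
    try:
        from pythonjsonlogger.msgspec import MsgspecFormatter as PreferredFormatter
    except ImportError:
        from pythonjsonlogger.json import JsonFormatter as PreferredFormatter

handler = logging.StreamHandler()
handler.setFormatter(PreferredFormatter("%(asctime)s %(levelname)s %(message)s"))
logging.getLogger().addHandler(handler)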

Pros and Cons

Pros

  • Full compatibility with the Python standard library keeps the learning curve minimal
  • Existing logging setups can switch to JSON simply by swapping in the formatter
  • Integrates well with log aggregation systems (ELK Stack, Splunk, etc.)
  • Multiple JSON encoder options allow tuning for performance requirements
  • Proven at scale, with roughly 46 million downloads per month
  • Lightweight, simple implementation that is easy to maintain

Cons

  • Focused on basic JSON output; advanced structuring features are limited
  • Not suited to dynamic log message transformation or complex processing
  • Lacks the rich processor-chain functionality of structlog
  • Little support for pretty, human-readable non-JSON output during development
  • May be insufficient for very large structured-logging systems
  • Field control is configuration-driven, which makes per-record dynamic control harder


Usage Examples

Basic Installation and Configuration

# Installation
# pip install python-json-logger

import logging
from pythonjsonlogger import jsonlogger

# Basic JSON formatter configuration
json_handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
json_handler.setFormatter(formatter)

# Logger configuration
logger = logging.getLogger()
logger.addHandler(json_handler)
logger.setLevel(logging.INFO)

# Basic log output
logger.info("Application started")
logger.debug("Debug information", extra={"user_id": 12345})
logger.warning("Warning message", extra={"error_code": "W001"})
logger.error("Error occurred", extra={"exception_type": "ValueError"})

# Detailed logging with structured data
logger.info("User login", extra={
    "user_id": 67890,
    "ip_address": "192.168.1.100",
    "user_agent": "Mozilla/5.0...",
    "session_id": "sess_abc123"
})

# Log with exception information
try:
    result = 10 / 0
except ZeroDivisionError as e:
    logger.exception("Calculation error", extra={
        "operation": "division",
        "dividend": 10,
        "divisor": 0,
        "error_details": str(e)
    })
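
For reference, each call above produces one JSON object per line. With the default JsonFormatter() (no format string) only the message plus any values passed through extra appear; the exact keys and ordering depend on the fmt string, rename options, and library version, so the line below is illustrative rather than exact:

{"message": "User login", "user_id": 67890, "ip_address": "192.168.1.100", "user_agent": "Mozilla/5.0...", "session_id": "sess_abc123"}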

Custom Formatter and Output Field Control

import logging
from pythonjsonlogger import jsonlogger
import uuid
from datetime import datetime
from enum import Enum

class UserRole(Enum):
    ADMIN = "admin"
    USER = "user"
    GUEST = "guest"

def setup_custom_json_logging():
    """Customized JSON log configuration"""
    
    # Custom format string
    custom_format = '%(asctime)s %(name)s %(levelname)s %(message)s'
    
    # Advanced customization options
    custom_formatter = jsonlogger.JsonFormatter(
        # Fields named in the format string are always included in the output;
        # anything not listed here (e.g. module, funcName) is simply omitted
        fmt=custom_format,
        # Static fields (fixed values added to every record)
        static_fields={"service": "user-api", "version": "1.0.0"},
        # Field renaming
        rename_fields={"levelname": "level", "name": "logger"},
        # Default values, used when a field is missing (python-json-logger 3.x)
        defaults={"environment": "production", "region": "us-west-2"}
    )
    
    # Handler configuration
    handler = logging.StreamHandler()
    handler.setFormatter(custom_formatter)
    
    # Logger configuration
    logger = logging.getLogger("CustomApp")
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
    
    return logger

# Custom logger usage example
custom_logger = setup_custom_json_logging()

class UserService:
    def __init__(self):
        self.logger = custom_logger
    
    def create_user(self, user_data):
        user_id = uuid.uuid4()
        creation_time = datetime.now()
        user_role = UserRole.ADMIN
        
        self.logger.info("User creation started", extra={
            "user_id": user_id,          # UUID type (automatically converted to string)
            "creation_time": creation_time,  # datetime type (converted to ISO format string)
            "user_role": user_role,      # Enum type (value automatically extracted)
            "user_data": user_data,
            "request_size": len(str(user_data).encode()),  # bytes calculation
            "metadata": {
                "source": "api",
                "client_version": "2.1.0"
            }
        })
        
        try:
            # User creation processing simulation
            processed_data = self._process_user_data(user_data, user_id)
            
            self.logger.info("User creation completed", extra={
                "user_id": user_id,
                "email": user_data.get("email"),
                "role": user_role,
                "processing_time_ms": 150,
                "created_fields": list(processed_data.keys())
            })
            
            return str(user_id)
            
        except Exception as e:
            self.logger.error("User creation error", extra={
                "user_id": user_id,
                "error_type": type(e).__name__,
                "error_message": str(e),
                "input_data": user_data,
                "stack_trace": True  # Traceback automatically included
            }, exc_info=True)
            raise
    
    def _process_user_data(self, user_data, user_id):
        # Processing simulation
        return {
            "id": user_id,
            "processed_at": datetime.now(),
            **user_data
        }

# Usage example
service = UserService()
service.create_user({
    "email": "[email protected]",
    "name": "John Doe",
    "age": 30,
    "preferences": {"theme": "dark", "language": "en"}
})

Multiple JSON Encoders and Performance Optimization

import logging
from pythonjsonlogger import jsonlogger
import time
import json
from typing import Any, Dict

def setup_performance_optimized_logging():
    """Performance-optimized JSON log configuration"""
    
    # Fast JSON encoder selection
    encoders = []
    
    # orjson-backed formatter (fastest; available in python-json-logger 3.x when orjson is installed)
    try:
        from pythonjsonlogger.orjson import OrjsonFormatter
        encoders.append(("orjson", OrjsonFormatter()))
    except ImportError:
        pass
    
    # msgspec-backed formatter (available in python-json-logger 3.x when msgspec is installed)
    try:
        from pythonjsonlogger.msgspec import MsgspecFormatter
        encoders.append(("msgspec", MsgspecFormatter()))
    except ImportError:
        pass
    
    # Standard library json encoder (always available fallback)
    encoders.append(("json", jsonlogger.JsonFormatter(
        json_default=str,          # fallback for non-serializable objects
        json_ensure_ascii=False    # keep non-ASCII text (e.g. Japanese) readable
    )))
    
    # Select the highest performance encoder
    encoder_name, best_formatter = encoders[0]
    
    # Log handler configuration
    handler = logging.StreamHandler()
    handler.setFormatter(best_formatter)
    
    logger = logging.getLogger("PerformanceApp")
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    
    # Log encoder setup
    logger.info(f"JSON encoder configuration completed", extra={
        "selected_encoder": encoder_name,
        "available_encoders": [name for name, _ in encoders]
    })
    
    return logger, encoder_name

class PerformanceBenchmark:
    def __init__(self):
        self.logger, self.encoder = setup_performance_optimized_logging()
    
    def benchmark_logging_performance(self, iterations=10000):
        """Log output performance benchmark"""
        
        # Test data preparation
        test_data = {
            "iteration": 0,
            "timestamp": time.time(),
            "user_data": {
                "id": 12345,
                "name": "Performance Test",
                "metadata": {
                    "tags": ["test", "performance", "json"],
                    "scores": [95, 87, 92, 88, 91]
                }
            },
            "request_info": {
                "method": "POST",
                "path": "/api/benchmark",
                "headers": {"Content-Type": "application/json"},
                "size": 1024
            }
        }
        
        # Benchmark execution
        start_time = time.perf_counter()
        
        for i in range(iterations):
            test_data["iteration"] = i
            test_data["timestamp"] = time.time()
            
            if i % 1000 == 0:
                # Progress log (with detailed data)
                self.logger.info("Benchmark progress", extra={
                    **test_data,
                    "progress_percent": round((i / iterations) * 100, 1),
                    "encoder": self.encoder
                })
            else:
                # Lightweight log (DEBUG level; dropped by the INFO-level logger before formatting)
                self.logger.debug("Benchmark execution", extra={
                    "iteration": i,
                    "encoder": self.encoder
                })
        
        end_time = time.perf_counter()
        duration = end_time - start_time
        
        # Result log
        self.logger.info("Benchmark completed", extra={
            "total_iterations": iterations,
            "duration_seconds": round(duration, 3),
            "logs_per_second": round(iterations / duration, 0),
            "encoder": self.encoder,
            "average_time_per_log_ms": round((duration / iterations) * 1000, 4)
        })
        
        return duration
    
    def compare_json_encoders(self):
        """Performance comparison of multiple JSON encoders"""
        
        test_data = {
            "complex_data": {
                "nested": {"deeply": {"nested": {"data": "test"}}},
                "list": list(range(100)),
                "unicode": "Japanese test data🚀",
                "numbers": [1.23456789, 987654321, 0.000001]
            }
        }
        
        encoders_config = [
            ("standard_json", json.dumps),
            ("compact_json", lambda obj: json.dumps(obj, separators=(',', ':'))),
            ("ensure_ascii_false", lambda obj: json.dumps(obj, ensure_ascii=False)),
        ]
        
        # If orjson is available
        try:
            import orjson
            encoders_config.append(("orjson", orjson.dumps))
        except ImportError:
            pass
        
        # If msgspec is available
        try:
            import msgspec
            msgspec_encoder = msgspec.json.Encoder()
            encoders_config.append(("msgspec", msgspec_encoder.encode))
        except ImportError:
            pass
        
        results = {}
        iterations = 5000
        
        for encoder_name, encoder_func in encoders_config:
            start_time = time.perf_counter()
            
            for _ in range(iterations):
                try:
                    encoded = encoder_func(test_data)
                except Exception as e:
                    self.logger.error(f"Encoder error: {encoder_name}", extra={
                        "encoder": encoder_name,
                        "error": str(e)
                    })
                    continue
            
            duration = time.perf_counter() - start_time
            results[encoder_name] = {
                "duration": round(duration, 4),
                "ops_per_second": round(iterations / duration, 0)
            }
        
        # Comparison result log
        self.logger.info("JSON encoder performance comparison", extra={
            "test_iterations": iterations,
            "results": results,
            "fastest_encoder": min(results.keys(), key=lambda k: results[k]["duration"]),
            "test_data_complexity": "nested_objects_arrays_unicode"
        })
        
        return results

# Performance test execution
benchmark = PerformanceBenchmark()

# Log output performance test
print("=== Log Output Performance Test ===")
duration = benchmark.benchmark_logging_performance(1000)
print(f"Completion time: {duration:.3f}s")

# JSON encoder comparison
print("\n=== JSON Encoder Performance Comparison ===")
encoder_results = benchmark.compare_json_encoders()
for encoder, result in encoder_results.items():
    print(f"{encoder}: {result['ops_per_second']} ops/sec")

File Output and Multiple Handler Integration

import logging
from pythonjsonlogger import jsonlogger
import logging.handlers
import os
from datetime import datetime
import gzip
import shutil

def setup_comprehensive_file_logging():
    """Comprehensive file log configuration"""
    
    # Create log directory
    log_dir = "logs"
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    
    # Multiple JSON formatters
    formatters = {
        # Detailed log (all fields)
        "detailed": jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(funcName)s %(lineno)d %(message)s',
            static_fields={"service": "web-api", "environment": "production"},
            rename_fields={"levelname": "level", "name": "logger", "funcName": "function", "lineno": "line"}
        ),
        
        # Compact log (only the fields named in fmt are emitted)
        "compact": jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(levelname)s %(message)s',
            static_fields={"service": "web-api"}
        ),
        
        # Error-focused (exception details are appended when exc_info is set on the record)
        "error_focused": jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(funcName)s %(message)s',
            rename_fields={"levelname": "level", "name": "logger", "funcName": "function"},
            static_fields={"alert": True, "requires_attention": True}
        )
    }
    
    # Main logger configuration
    main_logger = logging.getLogger("MainApp")
    main_logger.setLevel(logging.DEBUG)
    
    # 1. Console output (for development)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatters["compact"])
    console_handler.setLevel(logging.INFO)
    main_logger.addHandler(console_handler)
    
    # 2. Daily rotation file (detailed log)
    today = datetime.now().strftime('%Y-%m-%d')
    daily_handler = logging.FileHandler(
        f"{log_dir}/app-{today}.log", 
        encoding='utf-8'
    )
    daily_handler.setFormatter(formatters["detailed"])
    daily_handler.setLevel(logging.DEBUG)
    main_logger.addHandler(daily_handler)
    
    # 3. Size-based rotation (general log)
    rotating_handler = logging.handlers.RotatingFileHandler(
        f"{log_dir}/app-rotating.log",
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5,
        encoding='utf-8'
    )
    rotating_handler.setFormatter(formatters["compact"])
    rotating_handler.setLevel(logging.INFO)
    main_logger.addHandler(rotating_handler)
    
    # 4. Error-only file
    error_handler = logging.FileHandler(
        f"{log_dir}/errors.log",
        encoding='utf-8'
    )
    error_handler.setFormatter(formatters["error_focused"])
    error_handler.setLevel(logging.ERROR)
    main_logger.addHandler(error_handler)
    
    # 5. Time-based rotation (for production)
    timed_handler = logging.handlers.TimedRotatingFileHandler(
        f"{log_dir}/app-timed.log",
        when='midnight',
        interval=1,
        backupCount=30,  # Keep for 30 days
        encoding='utf-8'
    )
    timed_handler.setFormatter(formatters["detailed"])
    timed_handler.setLevel(logging.INFO)
    main_logger.addHandler(timed_handler)
    
    return main_logger

class FileLoggingApplication:
    def __init__(self):
        self.logger = setup_comprehensive_file_logging()
        self.logger.info("Application initialization completed", extra={
            "startup_time": datetime.now().isoformat(),
            "log_handlers": len(self.logger.handlers),
            "config": "comprehensive_file_logging"
        })
    
    def simulate_application_operations(self):
        """Application operation simulation"""
        
        # Normal operation log
        self.logger.info("User session started", extra={
            "user_id": 12345,
            "session_id": "sess_abc123",
            "ip_address": "192.168.1.100",
            "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
        })
        
        # Debug information (detailed log file only)
        self.logger.debug("Database connection established", extra={
            "connection_pool": "primary",
            "connection_count": 5,
            "max_connections": 20,
            "database": "userdb"
        })
        
        # Warning log
        self.logger.warning("Approaching API limits", extra={
            "current_requests": 980,
            "limit": 1000,
            "time_window": "1 hour",
            "user_id": 12345
        })
        
        # Error log (also output to error-only file)
        try:
            # Intentional error
            raise ValueError("Invalid input data")
        except ValueError as e:
            self.logger.error("Input data validation error", extra={
                "error_type": "validation",
                "input_data": {"field1": "invalid_value"},
                "validation_rules": ["required", "format", "length"],
                "user_id": 12345
            }, exc_info=True)
        
        # Important information log
        self.logger.info("Data processing completed", extra={
            "processed_records": 1500,
            "processing_time_seconds": 45.6,
            "success_rate": 98.7,
            "failed_records": 19,
            "batch_id": "batch_20250624_001"
        })
    
    def demonstrate_different_log_levels(self):
        """Output examples for each log level"""
        
        operations = [
            (logging.DEBUG, "Detailed debug information", {"debug_level": "verbose"}),
            (logging.INFO, "General information", {"operation": "normal"}),
            (logging.WARNING, "Attention required", {"warning_type": "performance"}),
            (logging.ERROR, "Error occurred", {"error_severity": "medium"}),
            (logging.CRITICAL, "Critical error", {"alert_level": "critical"})
        ]
        
        for level, message, extra_data in operations:
            self.logger.log(level, message, extra={
                **extra_data,
                "demonstration": True,
                "timestamp": datetime.now().isoformat()
            })
    
    def compress_old_logs(self):
        """Compress old log files"""
        log_dir = "logs"
        
        for filename in os.listdir(log_dir):
            if filename.endswith('.log') and 'rotating' not in filename:
                file_path = os.path.join(log_dir, filename)
                
                # File size check (compress if over 1MB)
                if os.path.getsize(file_path) > 1024*1024:
                    compressed_path = f"{file_path}.gz"
                    
                    with open(file_path, 'rb') as f_in:
                        with gzip.open(compressed_path, 'wb') as f_out:
                            shutil.copyfileobj(f_in, f_out)
                    
                    self.logger.info("Log file compression completed", extra={
                        "original_file": filename,
                        "compressed_file": f"{filename}.gz",
                        "original_size_mb": round(os.path.getsize(file_path) / 1024 / 1024, 2),
                        "compressed_size_mb": round(os.path.getsize(compressed_path) / 1024 / 1024, 2)
                    })

# File logging application execution example
app = FileLoggingApplication()

print("=== Application Operation Simulation ===")
app.simulate_application_operations()

print("\n=== Each Log Level Output Example ===")
app.demonstrate_different_log_levels()

print("\n=== Log File Compression Processing ===")
app.compress_old_logs()

print("\n=== Log File Verification ===")
log_files = os.listdir("logs")
for log_file in sorted(log_files):
    file_path = os.path.join("logs", log_file)
    size_kb = round(os.path.getsize(file_path) / 1024, 1)
    print(f"{log_file}: {size_kb} KB")

Log Aggregation System Integration

import logging
from pythonjsonlogger import jsonlogger
import socket
import json
import time
from typing import Dict, Any
import threading
import queue

class LogAggregationIntegration:
    """Integration with log aggregation systems"""
    
    def __init__(self):
        self.setup_aggregation_loggers()
    
    def setup_aggregation_loggers(self):
        """Configuration for various log aggregation systems"""
        
        # ELK Stack configuration
        self.elk_logger = self._setup_elk_logger()
        
        # Fluentd configuration
        self.fluentd_logger = self._setup_fluentd_logger()
        
        # Splunk configuration
        self.splunk_logger = self._setup_splunk_logger()
        
        # Custom log aggregation configuration
        self.custom_logger = self._setup_custom_aggregation_logger()
    
    def _setup_elk_logger(self):
        """Optimized configuration for ELK Stack"""
        
        # Format optimized for Elasticsearch
        elk_formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
            static_fields={
                "@version": "1",
                "service": "web-api",
                "environment": "production",
                "host": socket.gethostname()
            },
            rename_fields={
                "asctime": "@timestamp",
                "levelname": "level",
                "name": "logger"
            },
            # Fallback serialization for types Elasticsearch mappings can't take natively
            json_default=str
        )
        
        # File handler (collected by Filebeat)
        elk_handler = logging.FileHandler('logs/elk-formatted.log', encoding='utf-8')
        elk_handler.setFormatter(elk_formatter)
        
        elk_logger = logging.getLogger('ELKLogger')
        elk_logger.addHandler(elk_handler)
        elk_logger.setLevel(logging.INFO)
        
        return elk_logger
    
    def _setup_fluentd_logger(self):
        """Configuration for Fluentd"""
        
        fluentd_formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
            static_fields={
                "tag": "web-api.application",
                "source": "python-application",
                "datacenter": "us-west-2"
            },
            # Keep non-ASCII text readable for the Fluentd JSON parser
            json_default=str,
            json_ensure_ascii=False
        )
        
        # Output to Fluentd monitoring directory
        fluentd_handler = logging.FileHandler('logs/fluentd.log', encoding='utf-8')
        fluentd_handler.setFormatter(fluentd_formatter)
        
        fluentd_logger = logging.getLogger('FluentdLogger')
        fluentd_logger.addHandler(fluentd_handler)
        fluentd_logger.setLevel(logging.INFO)
        
        return fluentd_logger
    
    def _setup_splunk_logger(self):
        """Configuration for Splunk"""
        
        splunk_formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(funcName)s %(message)s',
            static_fields={
                "index": "application-logs",
                "sourcetype": "python:json",
                "source": "web-api"
            },
            rename_fields={
                "asctime": "time",
                "levelname": "severity",
                "name": "component",
                "funcName": "function"
            }
        )
        
        splunk_handler = logging.FileHandler('logs/splunk.log', encoding='utf-8')
        splunk_handler.setFormatter(splunk_formatter)
        
        splunk_logger = logging.getLogger('SplunkLogger')
        splunk_logger.addHandler(splunk_handler)
        splunk_logger.setLevel(logging.INFO)
        
        return splunk_logger
    
    def _setup_custom_aggregation_logger(self):
        """Configuration for custom log aggregation system"""
        
        custom_formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
            static_fields={
                "schema_version": "2.0",
                "application": "web-api",
                "team": "backend",
                "criticality": "high"
            },
            required_fields=["timestamp", "level", "logger", "message", "correlation_id"],
            defaults={"correlation_id": "unknown", "trace_id": "N/A"}
        )
        
        custom_handler = logging.FileHandler('logs/custom-aggregation.log', encoding='utf-8')
        custom_handler.setFormatter(custom_formatter)
        
        custom_logger = logging.getLogger('CustomAggregation')
        custom_logger.addHandler(custom_handler)
        custom_logger.setLevel(logging.DEBUG)
        
        return custom_logger
    
    def log_application_event(self, event_type: str, event_data: Dict[str, Any]):
        """Unified event sending to various log aggregation systems"""
        
        # Common event data
        common_data = {
            "event_type": event_type,
            "timestamp": time.time(),
            "correlation_id": f"corr_{int(time.time() * 1000)}",
            **event_data
        }
        
        # Send to ELK Stack
        self.elk_logger.info(f"Application Event: {event_type}", extra={
            **common_data,
            "elastic_compatible": True
        })
        
        # Send to Fluentd
        self.fluentd_logger.info(f"Event: {event_type}", extra={
            **common_data,
            "fluentd_tag": f"app.event.{event_type}"
        })
        
        # Send to Splunk
        self.splunk_logger.info(f"{event_type} Event", extra={
            **common_data,
            "splunk_index": "app-events"
        })
        
        # Send to custom aggregation system
        self.custom_logger.info(f"EVENT: {event_type}", extra={
            **common_data,
            "trace_id": f"trace_{int(time.time())}"
        })

class RealTimeLogProcessor:
    """Real-time log processing and alerts"""
    
    def __init__(self):
        self.alert_queue = queue.Queue()
        self.processor_thread = threading.Thread(target=self._process_alerts)
        self.processor_thread.daemon = True
        self.processor_thread.start()
        
        self.setup_alert_logger()
    
    def setup_alert_logger(self):
        """Alert logger configuration"""
        
        alert_formatter = jsonlogger.JsonFormatter(
            fmt='%(asctime)s %(levelname)s %(message)s',
            static_fields={
                "alert": True,
                "notification_required": True,
                "priority": "high"
            },
            required_fields=["timestamp", "level", "message", "alert_type", "severity"]
        )
        
        # Alert-only handler
        alert_handler = logging.FileHandler('logs/alerts.log', encoding='utf-8')
        alert_handler.setFormatter(alert_formatter)
        
        self.alert_logger = logging.getLogger('AlertSystem')
        self.alert_logger.addHandler(alert_handler)
        self.alert_logger.setLevel(logging.WARNING)
    
    def send_alert(self, alert_type: str, severity: str, message: str, details: Dict[str, Any]):
        """Send alert"""
        
        alert_data = {
            "alert_type": alert_type,
            "severity": severity,
            "details": details,
            "requires_action": severity in ["critical", "high"],
            "alert_id": f"alert_{int(time.time() * 1000)}"
        }
        
        # Record alert log
        if severity == "critical":
            self.alert_logger.critical(message, extra=alert_data)
        elif severity == "high":
            self.alert_logger.error(message, extra=alert_data)
        else:
            self.alert_logger.warning(message, extra=alert_data)
        
        # Add to real-time processing queue
        self.alert_queue.put({
            "message": message,
            "data": alert_data,
            "timestamp": time.time()
        })
    
    def _process_alerts(self):
        """Real-time alert processing"""
        while True:
            try:
                alert = self.alert_queue.get(timeout=1)
                self._handle_alert(alert)
                self.alert_queue.task_done()
            except queue.Empty:
                continue
            except Exception as e:
                # Error handling
                self.alert_logger.error("Alert processing error", extra={
                    "error": str(e),
                    "alert_type": "system_error"
                })
    
    def _handle_alert(self, alert):
        """Handle individual alert"""
        
        severity = alert["data"]["severity"]
        
        # Processing based on severity
        if severity == "critical":
            self._send_immediate_notification(alert)
        elif severity == "high":
            self._escalate_to_team(alert)
        else:
            self._log_for_review(alert)
    
    def _send_immediate_notification(self, alert):
        """Send immediate notification"""
        # In actual implementation, send to Slack, email, SMS, etc.
        print(f"🚨 CRITICAL ALERT: {alert['message']}")
    
    def _escalate_to_team(self, alert):
        """Team escalation"""
        print(f"⚠️  HIGH PRIORITY: {alert['message']}")
    
    def _log_for_review(self, alert):
        """Log for review"""
        print(f"📝 REVIEW REQUIRED: {alert['message']}")

# System integration usage example
def demonstrate_system_integration():
    # Initialize log aggregation integration
    aggregation = LogAggregationIntegration()
    
    # Initialize real-time processing
    alert_processor = RealTimeLogProcessor()
    
    # Various application events
    events = [
        ("user_login", {"user_id": 12345, "method": "oauth", "success": True}),
        ("payment_processed", {"amount": 150.00, "currency": "USD", "payment_id": "pay_123"}),
        ("api_error", {"endpoint": "/api/users", "status_code": 500, "error": "database_timeout"}),
        ("security_violation", {"ip": "192.168.1.100", "attempt": "unauthorized_access"}),
        ("performance_degradation", {"response_time": 5000, "threshold": 3000, "endpoint": "/api/search"})
    ]
    
    # Send event logs
    for event_type, event_data in events:
        aggregation.log_application_event(event_type, event_data)
        
        # Send alerts for important events
        if event_type in ["api_error", "security_violation", "performance_degradation"]:
            severity_map = {
                "api_error": "high",
                "security_violation": "critical", 
                "performance_degradation": "medium"
            }
            
            alert_processor.send_alert(
                alert_type=event_type,
                severity=severity_map[event_type],
                message=f"{event_type.replace('_', ' ').title()} detected",
                details=event_data
            )
    
    # Wait a bit to check alert processing
    time.sleep(2)
    
    print("Log aggregation and alert system operation completed")

if __name__ == "__main__":
    demonstrate_system_integration()