structlog

Python library specialized in structured log output (2,500+ GitHub stars). It emits structured logs in JSON and logfmt formats, renders readable console output in development environments, and produces machine-readable logs in production. Supports asyncio, context variables, and type hints.

Logging · Python · Structured Logging · JSON · High Performance · Async Support

Overview

structlog is a high-performance library for structured logging in Python. Moving beyond traditional string-based log messages, it produces expressive log output as structured key-value data. In continuous production use since 2013, it keeps pace with modern Python features including asyncio, context variables, and type hints. The 2025 release (25.4.0) is the leading library for structured logging in Python, with proven performance and optimizations aimed at large-scale systems.

Details

The 2025 release of structlog is a mature, production-ready structured logging solution. It integrates fully with the standard library's logging module, ships dedicated support for the Twisted framework, and lets you build flexible log transformation pipelines from chains of processors. It natively renders JSON, logfmt, and readable console output, so the same pipeline can serve multiple output formats. Its design emphasizes asynchronous processing, context variables, and type safety, which keeps logging fast and maintainable in modern Python applications.
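
A minimal sketch of this renderer-swapping idea, assuming a hypothetical LOG_FORMAT environment variable: the same processor chain can end in a JSONRenderer, LogfmtRenderer, or ConsoleRenderer, and only that final processor determines the output format.

import os
import structlog

# Shared processors that run for every log call
shared_processors = [
    structlog.contextvars.merge_contextvars,
    structlog.processors.add_log_level,
    structlog.processors.TimeStamper(fmt="iso", utc=True),
]

# Only the final renderer changes between formats (LOG_FORMAT is illustrative)
log_format = os.environ.get("LOG_FORMAT", "console")
if log_format == "json":
    renderer = structlog.processors.JSONRenderer()
elif log_format == "logfmt":
    renderer = structlog.processors.LogfmtRenderer()
else:
    renderer = structlog.dev.ConsoleRenderer()

structlog.configure(processors=shared_processors + [renderer])

structlog.get_logger().info("order_created", order_id=42, total=19.99)
# json   -> {"order_id": 42, "total": 19.99, "event": "order_created", "level": "info", "timestamp": "..."}
# logfmt -> order_id=42 total=19.99 event=order_created level=info timestamp=...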

Key Features

  • Structured Data: Expressive log representation in key-value format
  • High-Performance Processors: Customizable log transformation pipeline
  • Complete Standard Library Integration: Seamless cooperation with Python standard logging
  • Async Support: Optimized log processing in asyncio environments
  • Multiple Output Formats: Flexible switching between JSON, logfmt, and console display
  • Context Variables: Context management that is safe across threads and asyncio tasks (see the sketch after this list)
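
A minimal sketch of the context-variable support, using only structlog's documented contextvars helpers (merge_contextvars is part of the default processor chain in recent versions): values bound for the current thread or asyncio task are merged into every subsequent log call.

import structlog

log = structlog.get_logger()

# Bind per-request fields once; every log call in this thread/task includes them
structlog.contextvars.bind_contextvars(request_id="req-123")
log.info("request_started")              # carries request_id="req-123"

# Temporarily add a field; it is removed again when the block exits
with structlog.contextvars.bound_contextvars(user_id=42):
    log.info("user_loaded")              # carries request_id and user_id

structlog.contextvars.clear_contextvars()
log.info("request_finished")             # no bound context anymore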

Pros and Cons

Pros

  • Structured logs dramatically improve log analysis and debugging efficiency
  • Flexible, highly extensible log transformation through processor chains
  • Full compatibility with the standard library allows gradual adoption in existing code
  • High-performance log output in asynchronous environments
  • JSON output integrates cleanly with log aggregation systems
  • Rich documentation and more than 10 years of production use

Cons

  • Somewhat steep initial learning curve; the processor concept must be understood first
  • Over-structuring can add complexity to simple logs
  • Slightly more overhead than the standard library under very high log volume
  • Misconfigured processor chains can cause unexpected behavior
  • Some features are limited in non-JSON output formats
  • Complex configurations can be difficult to debug

Usage Examples

Basic Installation and Setup

# Installation
# pip install structlog

import structlog
import logging

# Simplest configuration
structlog.configure()
log = structlog.get_logger()

# Basic log output
log.info("Application started")
log.debug("Debug information", user_id=12345)
log.warning("Warning message", error_code="W001")
log.error("Error occurred", exception_type="ValueError")

# Detailed logging with structured data
log.info("User login", 
         user_id=67890, 
         ip_address="192.168.1.100", 
         user_agent="Mozilla/5.0...",
         session_id="sess_abc123")

# Context information binding
user_log = log.bind(user_id=12345, session="abc123")
user_log.info("Page view", page="/dashboard")
user_log.info("Data retrieval", table="users", count=150)

# Log with exception information
try:
    result = 10 / 0
except ZeroDivisionError:
    log.exception("Calculation error", operation="division", dividend=10, divisor=0)

Advanced Production Environment Configuration

import structlog
import logging
import sys
import time

import orjson

def setup_production_logging():
    """Optimized configuration for production environment"""
    
    # Configuration branching based on environment
    if sys.stderr.isatty():
        # Development environment: beautiful console output
        processors = [
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.StackInfoRenderer(),
            structlog.dev.set_exc_info,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.dev.ConsoleRenderer(colors=True),
        ]
        # ConsoleRenderer emits str, so use a str-based logger factory
        logger_factory = structlog.WriteLoggerFactory()
    else:
        # Production environment: high-speed JSON output
        processors = [
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.dict_tracebacks,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.processors.JSONRenderer(serializer=orjson.dumps),
        ]
        # orjson.dumps returns bytes, so pair it with BytesLoggerFactory
        logger_factory = structlog.BytesLoggerFactory()
    
    # High-performance configuration
    structlog.configure(
        cache_logger_on_first_use=True,
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        processors=processors,
        logger_factory=logger_factory,
    )

# Usage example
setup_production_logging()

# Placeholder exceptions so the example below is self-contained
class ValidationError(Exception):
    def __init__(self, code="invalid", fields=None):
        super().__init__(code)
        self.code = code
        self.fields = fields or []

class DatabaseError(Exception):
    pass


class UserService:
    def __init__(self):
        self.log = structlog.get_logger(self.__class__.__name__)
    
    def create_user(self, user_data):
        request_id = f"req_{int(time.time())}"
        log = self.log.bind(request_id=request_id, operation="create_user")
        
        log.info("User creation started", input_data_keys=list(user_data.keys()))
        
        try:
            # Validation
            self._validate_user_data(user_data, log)
            
            # Database save
            user_id = self._save_to_database(user_data, log)
            
            # Success log
            log.info("User creation completed", 
                    user_id=user_id,
                    email=user_data.get("email"),
                    registration_method="direct")
            
            return user_id
            
        except ValidationError as e:
            log.warning("Validation error",
                       error_code=e.code,
                       error_fields=e.fields,
                       input_data=user_data)
            raise
            
        except DatabaseError as e:
            log.error("Database error",
                     error_type="database",
                     error_message=str(e),
                     table="users",
                     severity="high")
            raise
    
    def _validate_user_data(self, user_data, log):
        log.debug("Data validation started")
        # Validation processing
        pass
    
    def _save_to_database(self, user_data, log):
        log.debug("Database save started", table="users")
        # Database processing
        return 12345

# Usage example
service = UserService()
service.create_user({
    "email": "[email protected]",
    "name": "John Doe",
    "age": 30
})

Standard Library logging Integration

import logging
import logging.config
import os
import time

import structlog

def setup_integrated_logging():
    """Complete integration configuration of standard logging and structlog"""
    
    # Timestamp configuration
    timestamper = structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S")
    
    # Shared processors
    shared_processors = [
        structlog.stdlib.add_log_level,
        structlog.stdlib.ExtraAdder(),
        timestamper,
    ]
    
    # Custom processor: extract process and thread information
    def extract_from_record(_, __, event_dict):
        record = event_dict["_record"]
        event_dict["thread_name"] = record.threadName
        event_dict["process_name"] = record.processName
        event_dict["module"] = record.module
        event_dict["function"] = record.funcName
        event_dict["line"] = record.lineno
        return event_dict
    
    # Ensure the log directory exists before the file handlers are created
    os.makedirs("logs", exist_ok=True)
    
    # Detailed configuration with logging.config.dictConfig
    logging.config.dictConfig({
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "json_file": {
                "()": structlog.stdlib.ProcessorFormatter,
                "processors": [
                    extract_from_record,
                    structlog.stdlib.ProcessorFormatter.remove_processors_meta,
                    structlog.processors.JSONRenderer(),
                ],
                "foreign_pre_chain": shared_processors,
            },
            "colored_console": {
                "()": structlog.stdlib.ProcessorFormatter,
                "processors": [
                    extract_from_record,
                    structlog.stdlib.ProcessorFormatter.remove_processors_meta,
                    structlog.dev.ConsoleRenderer(colors=True),
                ],
                "foreign_pre_chain": shared_processors,
            },
            "plain_console": {
                "()": structlog.stdlib.ProcessorFormatter,
                "processors": [
                    structlog.stdlib.ProcessorFormatter.remove_processors_meta,
                    structlog.dev.ConsoleRenderer(colors=False),
                ],
                "foreign_pre_chain": shared_processors,
            }
        },
        "handlers": {
            "console": {
                "level": "DEBUG",
                "class": "logging.StreamHandler",
                "formatter": "colored_console",
                "stream": "ext://sys.stdout"
            },
            "file_info": {
                "level": "INFO",
                "class": "logging.handlers.RotatingFileHandler",
                "filename": "logs/app.log",
                "formatter": "json_file",
                "maxBytes": 10485760,  # 10MB
                "backupCount": 5,
                "encoding": "utf-8"
            },
            "file_error": {
                "level": "ERROR",
                "class": "logging.handlers.RotatingFileHandler",
                "filename": "logs/error.log",
                "formatter": "json_file",
                "maxBytes": 10485760,
                "backupCount": 10,
                "encoding": "utf-8"
            }
        },
        "loggers": {
            "": {  # Root logger
                "handlers": ["console", "file_info", "file_error"],
                "level": "DEBUG",
                "propagate": False,
            },
        },
    })
    
    # structlog configuration
    structlog.configure(
        processors=[
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            timestamper,
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

# Integrated log usage example
setup_integrated_logging()

class WebApplication:
    def __init__(self):
        # structlog logger
        self.struct_log = structlog.get_logger("WebApp")
        # Standard logger
        self.std_log = logging.getLogger("WebApp.Standard")
    
    def handle_request(self, request_data):
        request_id = f"req_{int(time.time() * 1000)}"
        
        # Structured logging with structlog
        self.struct_log.info("Request received",
                           request_id=request_id,
                           method=request_data["method"],
                           path=request_data["path"],
                           user_agent=request_data.get("user_agent"),
                           content_length=len(request_data.get("body", "")))
        
        # Standard logging log (can coexist with structlog)
        self.std_log.info(f"Standard log: Processing request {request_id}")
        
        try:
            result = self._process_request(request_data, request_id)
            
            self.struct_log.info("Request processing completed",
                               request_id=request_id,
                               response_size=len(str(result)),
                               processing_time_ms=45)
            
            return result
            
        except Exception as e:
            # Log with exception information
            self.struct_log.exception("Request processing error",
                                    request_id=request_id,
                                    error_type=type(e).__name__,
                                    request_path=request_data["path"])
            raise
    
    def _process_request(self, request_data, request_id):
        # Processing simulation
        return {"status": "success", "request_id": request_id}

# Usage example
app = WebApplication()
app.handle_request({
    "method": "POST",
    "path": "/api/users",
    "user_agent": "Mozilla/5.0...",
    "body": '{"name": "John Doe"}'
})

Async Processing and Context Variables Usage

import asyncio
import structlog
import contextvars
import time
from typing import Optional

# Context Variables configuration
request_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar('request_id', default=None)
user_id_var: contextvars.ContextVar[Optional[int]] = contextvars.ContextVar('user_id', default=None)

def setup_async_logging():
    """structlog configuration for async processing"""
    
    # Custom processor: automatically add Context Variables
    def add_context_vars(logger, method_name, event_dict):
        # Automatic retrieval from Context Variables
        if request_id := request_id_var.get(None):
            event_dict["request_id"] = request_id
        if user_id := user_id_var.get(None):
            event_dict["user_id"] = user_id
        return event_dict
    
    structlog.configure(
        processors=[
            # Context Variables integration
            structlog.contextvars.merge_contextvars,
            add_context_vars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(20),  # INFO and above
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

setup_async_logging()

class AsyncAPIService:
    def __init__(self):
        self.log = structlog.get_logger(self.__class__.__name__)
    
    async def handle_async_request(self, request_data):
        """Async request processing"""
        request_id = f"async_req_{int(time.time() * 1000)}"
        user_id = request_data.get("user_id")
        
        # Set in Context Variables
        request_id_var.set(request_id)
        user_id_var.set(user_id)
        
        self.log.info("Async request started",
                     method=request_data["method"],
                     endpoint=request_data["endpoint"])
        
        try:
            # Execute multiple async tasks in parallel
            tasks = [
                self._validate_async(request_data),
                self._fetch_user_data_async(user_id),
                self._check_permissions_async(user_id, request_data["endpoint"])
            ]
            
            validation_result, user_data, permissions = await asyncio.gather(*tasks)
            
            # Main processing
            result = await self._process_main_logic_async(
                request_data, user_data, permissions
            )
            
            self.log.info("Async request completed",
                         result_size=len(str(result)),
                         tasks_completed=len(tasks))
            
            return result
            
        except Exception as e:
            self.log.exception("Async processing error",
                             error_context="main_handler",
                             request_method=request_data["method"])
            raise
        finally:
            # Clear Context Variables
            request_id_var.set(None)
            user_id_var.set(None)
    
    async def _validate_async(self, request_data):
        self.log.debug("Async validation started")
        await asyncio.sleep(0.1)  # Validation processing simulation
        
        if not request_data.get("required_field"):
            self.log.warning("Validation warning", 
                           missing_field="required_field")
            raise ValueError("Required field missing")
        
        self.log.debug("Async validation completed", 
                      validated_fields=list(request_data.keys()))
        return True
    
    async def _fetch_user_data_async(self, user_id):
        self.log.debug("User data retrieval started", target_user_id=user_id)
        await asyncio.sleep(0.2)  # Database access simulation
        
        user_data = {
            "id": user_id,
            "name": "John Doe",
            "role": "admin",
            "permissions": ["read", "write", "delete"]
        }
        
        self.log.debug("User data retrieval completed",
                      user_role=user_data["role"],
                      permission_count=len(user_data["permissions"]))
        return user_data
    
    async def _check_permissions_async(self, user_id, endpoint):
        self.log.debug("Permission check started", endpoint=endpoint)
        await asyncio.sleep(0.05)  # Permission check simulation
        
        # Permission check logic
        allowed = endpoint.startswith("/api/user/")
        
        self.log.debug("Permission check completed",
                      endpoint=endpoint,
                      access_allowed=allowed)
        return allowed
    
    async def _process_main_logic_async(self, request_data, user_data, permissions):
        self.log.info("Main logic processing started")
        
        if not permissions:
            self.log.error("No access permission",
                          user_role=user_data["role"],
                          requested_endpoint=request_data["endpoint"])
            raise PermissionError("No access permission")
        
        # Main processing simulation
        await asyncio.sleep(0.3)
        
        result = {
            "status": "success",
            "processed_at": time.time(),
            "user_name": user_data["name"],
            "data": {"message": "Processing completed"}
        }
        
        self.log.info("Main logic processing completed",
                     result_status=result["status"],
                     processing_user=user_data["name"])
        
        return result

# Async processing usage example
async def main():
    service = AsyncAPIService()
    
    # Simultaneous processing of multiple async requests
    request_data_list = [
        {
            "method": "POST",
            "endpoint": "/api/user/create",
            "user_id": 1001,
            "required_field": "value1"
        },
        {
            "method": "GET", 
            "endpoint": "/api/user/profile",
            "user_id": 1002,
            "required_field": "value2"
        },
        {
            "method": "PUT",
            "endpoint": "/api/user/update",
            "user_id": 1003,
            "required_field": "value3"
        }
    ]
    
    # Parallel processing of multiple requests
    tasks = [
        service.handle_async_request(request_data) 
        for request_data in request_data_list
    ]
    
    try:
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Request {i} error: {result}")
            else:
                print(f"Request {i} success: {result['status']}")
                
    except Exception as e:
        print(f"Overall error: {e}")

# Execution
if __name__ == "__main__":
    asyncio.run(main())

Custom Processors and Performance Optimization

import structlog
import time
import json
import gzip
from typing import Any, Dict
import logging

class PerformanceProcessor:
    """Performance measurement processor"""
    
    def __init__(self):
        self.start_times = {}
    
    def __call__(self, logger, method_name, event_dict):
        # Start performance measurement
        if event_dict.get("_performance_start"):
            operation_id = event_dict.get("operation_id", "unknown")
            self.start_times[operation_id] = time.perf_counter()
            event_dict.pop("_performance_start", None)
        
        # End performance measurement
        if event_dict.get("_performance_end"):
            operation_id = event_dict.get("operation_id", "unknown")
            if operation_id in self.start_times:
                duration = time.perf_counter() - self.start_times[operation_id]
                event_dict["performance_ms"] = round(duration * 1000, 3)
                del self.start_times[operation_id]
            event_dict.pop("_performance_end", None)
        
        return event_dict

class SensitiveDataProcessor:
    """Sensitive data masking processor"""
    
    SENSITIVE_KEYS = {
        "password", "token", "secret", "key", "credential",
        "ssn", "credit_card", "email", "phone", "address"
    }
    
    def __call__(self, logger, method_name, event_dict):
        return self._mask_sensitive_data(event_dict)
    
    def _mask_sensitive_data(self, data):
        if isinstance(data, dict):
            masked = {}
            for key, value in data.items():
                if any(sensitive in key.lower() for sensitive in self.SENSITIVE_KEYS):
                    # Sensitive data masking
                    if isinstance(value, str) and len(value) > 4:
                        masked[key] = f"{value[:2]}***{value[-2:]}"
                    else:
                        masked[key] = "***"
                else:
                    # Recursive masking
                    masked[key] = self._mask_sensitive_data(value)
            return masked
        elif isinstance(data, (list, tuple)):
            return [self._mask_sensitive_data(item) for item in data]
        else:
            return data

class CompressionProcessor:
    """Large data compression processor"""
    
    def __init__(self, threshold_bytes=1024):
        self.threshold_bytes = threshold_bytes
    
    def __call__(self, logger, method_name, event_dict):
        # Compress large data
        for key, value in event_dict.items():
            if isinstance(value, (str, bytes)):
                value_bytes = value.encode() if isinstance(value, str) else value
                if len(value_bytes) > self.threshold_bytes:
                    compressed = gzip.compress(value_bytes)
                    event_dict[key] = {
                        "_compressed": True,
                        "_original_size": len(value_bytes),
                        "_compressed_size": len(compressed),
                        "_data": compressed.hex()
                    }
        
        return event_dict

def setup_high_performance_logging():
    """High-performance, high-functionality logging configuration"""
    
    # Custom processor instances
    performance_processor = PerformanceProcessor()
    sensitive_processor = SensitiveDataProcessor()
    compression_processor = CompressionProcessor(threshold_bytes=512)
    
    # Fast JSON serializer configuration
    try:
        import orjson
        json_serializer = orjson.dumps
    except ImportError:
        # BytesLoggerFactory expects bytes, so encode the stdlib json output
        def json_serializer(obj, **kwargs):
            return json.dumps(obj, **kwargs).encode("utf-8")
    
    structlog.configure(
        cache_logger_on_first_use=True,
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        processors=[
            # Context Variables integration
            structlog.contextvars.merge_contextvars,
            
            # Custom processors
            performance_processor,
            sensitive_processor,
            compression_processor,
            
            # Standard processors
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.processors.format_exc_info,
            
            # Fast JSON output
            structlog.processors.JSONRenderer(serializer=json_serializer),
        ],
        logger_factory=structlog.BytesLoggerFactory(),
    )

# High-performance logging usage example
setup_high_performance_logging()

class OptimizedService:
    def __init__(self):
        self.log = structlog.get_logger(self.__class__.__name__)
    
    def process_large_dataset(self, data):
        operation_id = f"op_{int(time.time() * 1000)}"
        
        # Start performance measurement
        self.log.info("Large dataset processing started",
                     operation_id=operation_id,
                     data_size=len(data),
                     _performance_start=True)
        
        try:
            # Processing with sensitive data
            user_data = {
                "user_id": 12345,
                "email": "[email protected]",  # Masking target
                "password": "secret123",       # Masking target
                "name": "John Doe",
                "preferences": data[:100]  # Data sample
            }
            
            self.log.debug("User data processing",
                          operation_id=operation_id,
                          user_info=user_data)
            
            # Large data processing simulation
            processed_results = []
            for i, item in enumerate(data):
                if i % 1000 == 0:
                    self.log.debug("Processing progress",
                                  operation_id=operation_id,
                                  progress_percent=round((i / len(data)) * 100, 1),
                                  current_index=i)
                
                processed_results.append(f"processed_{item}")
            
            # Large result data (compression target)
            large_result = {
                "results": processed_results,
                "large_data": "x" * 2000,  # Large data (will be compressed)
                "metadata": {
                    "processing_method": "optimized",
                    "total_items": len(data)
                }
            }
            
            # End performance measurement
            self.log.info("Large dataset processing completed",
                         operation_id=operation_id,
                         processed_count=len(processed_results),
                         result_data=large_result,
                         _performance_end=True)
            
            return processed_results
            
        except Exception as e:
            self.log.exception("Data processing error",
                             operation_id=operation_id,
                             error_type=type(e).__name__)
            raise
    
    def batch_processing_with_metrics(self, batch_data):
        """Batch processing with metrics"""
        batch_id = f"batch_{int(time.time())}"
        
        self.log.info("Batch processing started",
                     batch_id=batch_id,
                     batch_size=len(batch_data),
                     _performance_start=True)
        
        success_count = 0
        error_count = 0
        
        for i, item in enumerate(batch_data):
            try:
                # Individual item processing
                result = self._process_single_item(item, batch_id, i)
                success_count += 1
                
                if i % 100 == 0:  # Output metrics every 100 items
                    self.log.info("Batch progress",
                                 batch_id=batch_id,
                                 progress_ratio=f"{i}/{len(batch_data)}",
                                 success_count=success_count,
                                 error_count=error_count)
                
            except Exception as e:
                error_count += 1
                self.log.warning("Item processing error",
                               batch_id=batch_id,
                               item_index=i,
                               error_message=str(e))
        
        # Final result
        self.log.info("Batch processing completed",
                     batch_id=batch_id,
                     total_processed=len(batch_data),
                     success_count=success_count,
                     error_count=error_count,
                     success_rate=round((success_count / len(batch_data)) * 100, 2),
                     _performance_end=True)
        
        return {
            "batch_id": batch_id,
            "success_count": success_count,
            "error_count": error_count
        }
    
    def _process_single_item(self, item, batch_id, index):
        # Individual processing simulation
        if index % 50 == 0:  # 1 in 50 errors
            raise ValueError(f"Processing error: {item}")
        return f"processed_{item}"

# Usage example and performance test
def run_performance_test():
    service = OptimizedService()
    
    # Large data processing test
    large_data = list(range(10000))
    print("=== Large Data Processing Test ===")
    service.process_large_dataset(large_data)
    
    # Batch processing test  
    batch_data = [f"item_{i}" for i in range(500)]
    print("\n=== Batch Processing Test ===")
    service.batch_processing_with_metrics(batch_data)

if __name__ == "__main__":
    run_performance_test()

Log Capture and Assertions in Test Environment

import pytest
import structlog
from structlog.testing import LogCapture, capture_logs
import json

# Optional structlog configuration for tests that need fully rendered JSON output;
# the rendered calls are recorded by CapturingLoggerFactory
def setup_test_logging():
    structlog.configure(
        processors=[
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.processors.JSONRenderer(),
        ],
        logger_factory=structlog.testing.CapturingLoggerFactory(),
        cache_logger_on_first_use=False,
    )

# Pytest fixtures
@pytest.fixture(name="log_capture")
def fixture_log_capture():
    return LogCapture()

@pytest.fixture(autouse=True)
def fixture_configure_structlog_for_tests(log_capture):
    # Route every log call through the LogCapture processor; add_log_level
    # keeps a "level" key in each captured entry for the assertions below.
    structlog.configure(
        processors=[structlog.processors.add_log_level, log_capture],
        cache_logger_on_first_use=False,
    )

class TestableService:
    def __init__(self):
        self.log = structlog.get_logger(self.__class__.__name__)
    
    def create_user(self, user_data):
        if not user_data.get("email"):
            self.log.error("Email required", validation_error="missing_email")
            raise ValueError("Email is required")
        
        if "@" not in user_data["email"]:
            self.log.warning("Email format warning", 
                           email=user_data["email"],
                           validation_warning="invalid_format")
            
        self.log.info("User creation started", 
                     email=user_data["email"],
                     user_data_keys=list(user_data.keys()))
        
        # User creation processing simulation
        user_id = 12345
        
        self.log.info("User creation completed",
                     user_id=user_id,
                     email=user_data["email"],
                     creation_method="api")
        
        return user_id
    
    def process_batch(self, items):
        self.log.info("Batch processing started", batch_size=len(items))
        
        results = []
        for i, item in enumerate(items):
            try:
                if item == "error_item":
                    raise ValueError("Processing error")
                
                result = f"processed_{item}"
                results.append(result)
                
                self.log.debug("Item processing completed",
                             item_index=i,
                             item_value=item,
                             result=result)
                
            except Exception as e:
                self.log.error("Item processing error",
                             item_index=i,
                             item_value=item,
                             error_message=str(e))
                
        self.log.info("Batch processing completed",
                     total_items=len(items),
                     successful_items=len(results),
                     failed_items=len(items) - len(results))
        
        return results

# Test class
class TestStructlogIntegration:
    
    def test_successful_user_creation(self, log_capture):
        """Log test for successful user creation"""
        service = TestableService()
        
        user_data = {
            "email": "[email protected]",
            "name": "Test User",
            "age": 25
        }
        
        result = service.create_user(user_data)
        
        # Basic assertion
        assert result == 12345
        
        # Log entries verification
        assert len(log_capture.entries) == 2
        
        # Start log verification
        start_log = log_capture.entries[0]
        assert start_log["event"] == "User creation started"
        assert start_log["email"] == "[email protected]"
        assert "user_data_keys" in start_log
        assert start_log["level"] == "info"
        
        # Completion log verification
        end_log = log_capture.entries[1]
        assert end_log["event"] == "User creation completed"
        assert end_log["user_id"] == 12345
        assert end_log["email"] == "[email protected]"
        assert end_log["creation_method"] == "api"
    
    def test_user_creation_with_invalid_email(self, log_capture):
        """Warning log test with invalid email address"""
        service = TestableService()
        
        user_data = {
            "email": "invalid_email",
            "name": "Test User"
        }
        
        result = service.create_user(user_data)
        
        # Verify warning log is output
        warning_logs = [
            entry for entry in log_capture.entries 
            if entry["level"] == "warning"
        ]
        
        assert len(warning_logs) == 1
        warning_log = warning_logs[0]
        assert warning_log["event"] == "Email format warning"
        assert warning_log["email"] == "invalid_email"
        assert warning_log["validation_warning"] == "invalid_format"
    
    def test_user_creation_missing_email_error(self, log_capture):
        """Error log test for missing required field"""
        service = TestableService()
        
        user_data = {"name": "Test User"}
        
        with pytest.raises(ValueError, match="Email is required"):
            service.create_user(user_data)
        
        # Error log verification
        error_logs = [
            entry for entry in log_capture.entries 
            if entry["level"] == "error"
        ]
        
        assert len(error_logs) == 1
        error_log = error_logs[0]
        assert error_log["event"] == "Email required"
        assert error_log["validation_error"] == "missing_email"
    
    def test_batch_processing_with_mixed_results(self, log_capture):
        """Log test for batch processing with mixed success/failure"""
        service = TestableService()
        
        items = ["item1", "item2", "error_item", "item3"]
        results = service.process_batch(items)
        
        # Result verification
        assert len(results) == 3
        assert "processed_item1" in results
        assert "processed_item2" in results
        assert "processed_item3" in results
        
        # Log level verification
        info_logs = [e for e in log_capture.entries if e["level"] == "info"]
        debug_logs = [e for e in log_capture.entries if e["level"] == "debug"]  
        error_logs = [e for e in log_capture.entries if e["level"] == "error"]
        
        # INFO: start and completion logs
        assert len(info_logs) == 2
        assert info_logs[0]["event"] == "Batch processing started"
        assert info_logs[0]["batch_size"] == 4
        
        assert info_logs[1]["event"] == "Batch processing completed"
        assert info_logs[1]["total_items"] == 4
        assert info_logs[1]["successful_items"] == 3
        assert info_logs[1]["failed_items"] == 1
        
        # DEBUG: individual item processing logs (successful only)
        assert len(debug_logs) == 3
        
        # ERROR: failed item logs
        assert len(error_logs) == 1
        assert error_logs[0]["event"] == "Item processing error"
        assert error_logs[0]["item_value"] == "error_item"
    
    def test_log_context_binding(self, log_capture):
        """Log context binding test"""
        base_log = structlog.get_logger("TestLogger")
        
        # Context binding
        bound_log = base_log.bind(
            session_id="sess_12345",
            user_id=67890,
            operation="test_operation"
        )
        
        bound_log.info("Log with context", additional_data="test_value")
        
        # Verify context is correctly included
        assert len(log_capture.entries) == 1
        log_entry = log_capture.entries[0]
        
        assert log_entry["session_id"] == "sess_12345"
        assert log_entry["user_id"] == 67890
        assert log_entry["operation"] == "test_operation"
        assert log_entry["additional_data"] == "test_value"
        assert log_entry["event"] == "Log with context"

# Log capture test using context manager
class TestLogCaptureContext:
    
    def test_with_capture_logs_context_manager(self):
        """Test using capture_logs context manager"""
        service = TestableService()
        
        with capture_logs() as captured_logs:
            service.create_user({
                "email": "[email protected]",
                "name": "Context Test"
            })
        
        # Captured log verification
        assert len(captured_logs) == 2
        
        # capture_logs() records raw event dicts before any renderer runs,
        # so assert on the captured keys rather than rendered JSON
        start_log = captured_logs[0]
        assert start_log["log_level"] == "info"
        assert start_log["event"] == "User creation started"
        assert start_log["email"] == "[email protected]"
        
        end_log = captured_logs[1]
        assert end_log["event"] == "User creation completed"
        assert end_log["user_id"] == 12345

# Test execution example
if __name__ == "__main__":
    # Manual test execution
    pytest.main([__file__, "-v"])