structlog
A Python library specialized in structured log output (2,500+ GitHub stars). It emits structured output in JSON and Logfmt formats, rendering pretty console output during development and machine-readable logs in production. Supports asyncio, context variables, and type hints.
Library
structlog
Overview
structlog is a mature, high-performance library for structured logging in Python. Instead of traditional string-based log messages, it records events as key-value data, which makes log output far more expressive and machine-friendly. In continuous production use since 2013, it embraces modern Python features such as asyncio, context variables, and type hints. The 2025 release (25.4.0) is the leading structured-logging library in the Python ecosystem, with proven performance and optimizations for large-scale systems.
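As a quick illustration of the difference, compare a conventional interpolated message with the equivalent key-value event; the field names below are arbitrary examples, not part of any structlog API.
import logging
import structlog
# Traditional logging: values are baked into a free-form message string
logging.getLogger(__name__).warning("Login failed for user 42 from 10.0.0.5")
# structlog: the same event as structured key-value data, easy to filter and aggregate
structlog.get_logger().warning("login_failed", user_id=42, source_ip="10.0.0.5")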
Details
The current release is firmly established as a production-ready structured-logging solution. It integrates fully with the standard library's logging module, ships dedicated support for the Twisted framework, and builds flexible log-transformation pipelines from chains of processors, including custom ones. JSON, logfmt, and pretty console output are supported natively, so the same logging code can target multiple output formats. Its emphasis on asynchronous processing, context variables, and type safety gives modern Python applications both high performance and maintainability.
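As a minimal sketch of switching output formats, the snippet below runs the same shared processors through three interchangeable final renderers; only the renderer changes between console, JSON, and logfmt output. The processor selection and event fields are illustrative, not a recommended production setup.
import structlog
shared = [
    structlog.processors.add_log_level,
    structlog.processors.TimeStamper(fmt="iso", utc=True),
]
# Pick exactly one renderer as the last processor in the chain:
renderer = structlog.dev.ConsoleRenderer()          # pretty, colored development output
# renderer = structlog.processors.JSONRenderer()    # one JSON object per line
# renderer = structlog.processors.LogfmtRenderer()  # key=value logfmt output
structlog.configure(processors=shared + [renderer])
structlog.get_logger().info("payment_processed", amount=42, currency="EUR")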
Key Features
- Structured Data: Expressive log representation in key-value format
- High-Performance Processors: Customizable log transformation pipeline
- Complete Standard Library Integration: Seamless cooperation with Python standard logging
- Async Support: Optimized log processing in asyncio environments
- Multiple Output Formats: Flexible switching between JSON, logfmt, and console display
- Context Variables: context-local (thread- and asyncio-task-safe) binding of log context (see the sketch after this list)
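The context-variable feature works through structlog.contextvars: values bound with bind_contextvars are attached to every subsequent log call in the current thread or asyncio task, as long as merge_contextvars is in the processor chain. A minimal sketch, with an arbitrary request_id value:
import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # pulls bound context into every event
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer(),
    ]
)
clear_contextvars()                     # start each request with a clean slate
bind_contextvars(request_id="req-123")  # bound once, attached to all later log calls
structlog.get_logger().info("order_created", order_id=42)
# -> {"request_id": "req-123", "order_id": 42, "event": "order_created", "level": "info"}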
Pros and Cons
Pros
- Structured logs are dramatically easier to search, aggregate, and debug than free-form text
- Processor chains make log transformation flexible and highly extensible
- Full compatibility with the standard library allows gradual adoption in existing code
- High-performance log output in asynchronous environments
- JSON output integrates cleanly with log aggregation systems
- Rich documentation and more than ten years of production use
Cons
- Non-trivial initial learning curve; the processor concept has to be understood first
- Over-structuring adds complexity for simple logging needs
- Slightly more overhead than the standard library under very high log volume
- Misconfigured processor chains can produce surprising behavior
- Some features are limited in non-JSON output formats
- Complex configurations can be hard to debug
Reference Pages
- structlog Official Documentation
- structlog GitHub Repository
- structlog Standard Library Integration Guide
Usage Examples
Basic Installation and Setup
# Installation
# pip install structlog
import structlog
import logging
# Simplest configuration
structlog.configure()
log = structlog.get_logger()
# Basic log output
log.info("Application started")
log.debug("Debug information", user_id=12345)
log.warning("Warning message", error_code="W001")
log.error("Error occurred", exception_type="ValueError")
# Detailed logging with structured data
log.info("User login",
user_id=67890,
ip_address="192.168.1.100",
user_agent="Mozilla/5.0...",
session_id="sess_abc123")
# Context information binding
user_log = log.bind(user_id=12345, session="abc123")
user_log.info("Page view", page="/dashboard")
user_log.info("Data retrieval", table="users", count=150)
# Log with exception information
try:
result = 10 / 0
except ZeroDivisionError:
log.exception("Calculation error", operation="division", dividend=10, divisor=0)
Advanced Production Environment Configuration
import structlog
import logging
import sys
import time
import orjson
def setup_production_logging():
    """Optimized configuration for production environments"""
    # Branch the configuration based on whether stderr is a terminal
    if sys.stderr.isatty():
        # Development environment: pretty console output
        processors = [
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.StackInfoRenderer(),
            structlog.dev.set_exc_info,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.dev.ConsoleRenderer(colors=True),
        ]
        # ConsoleRenderer emits str, so use a str-based logger factory
        logger_factory = structlog.WriteLoggerFactory()
    else:
        # Production environment: high-speed JSON output
        processors = [
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.dict_tracebacks,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.processors.JSONRenderer(serializer=orjson.dumps),
        ]
        # orjson.dumps returns bytes, so pair it with BytesLoggerFactory
        logger_factory = structlog.BytesLoggerFactory()
    # High-performance configuration
    structlog.configure(
        cache_logger_on_first_use=True,
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        processors=processors,
        logger_factory=logger_factory,
    )
# Usage example
setup_production_logging()
# Application-specific exceptions assumed by the example below
class ValidationError(Exception):
    def __init__(self, code="V001", fields=None):
        super().__init__(code)
        self.code = code
        self.fields = fields or []
class DatabaseError(Exception):
    pass
class UserService:
def __init__(self):
self.log = structlog.get_logger(self.__class__.__name__)
def create_user(self, user_data):
request_id = f"req_{int(time.time())}"
log = self.log.bind(request_id=request_id, operation="create_user")
log.info("User creation started", input_data_keys=list(user_data.keys()))
try:
# Validation
self._validate_user_data(user_data, log)
# Database save
user_id = self._save_to_database(user_data, log)
# Success log
log.info("User creation completed",
user_id=user_id,
email=user_data.get("email"),
registration_method="direct")
return user_id
except ValidationError as e:
log.warning("Validation error",
error_code=e.code,
error_fields=e.fields,
input_data=user_data)
raise
except DatabaseError as e:
log.error("Database error",
error_type="database",
error_message=str(e),
table="users",
severity="high")
raise
def _validate_user_data(self, user_data, log):
log.debug("Data validation started")
# Validation processing
pass
def _save_to_database(self, user_data, log):
log.debug("Database save started", table="users")
# Database processing
return 12345
# Usage example
service = UserService()
service.create_user({
"email": "[email protected]",
"name": "John Doe",
"age": 30
})
Standard Library logging Integration
import logging
import structlog
import logging.config
import time
def setup_integrated_logging():
"""Complete integration configuration of standard logging and structlog"""
# Timestamp configuration
timestamper = structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S")
# Shared processors
shared_processors = [
structlog.stdlib.add_log_level,
structlog.stdlib.ExtraAdder(),
timestamper,
]
# Custom processor: extract process and thread information
def extract_from_record(_, __, event_dict):
record = event_dict["_record"]
event_dict["thread_name"] = record.threadName
event_dict["process_name"] = record.processName
event_dict["module"] = record.module
event_dict["function"] = record.funcName
event_dict["line"] = record.lineno
return event_dict
# Detailed configuration with logging.config.dictConfig
logging.config.dictConfig({
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"json_file": {
"()": structlog.stdlib.ProcessorFormatter,
"processors": [
extract_from_record,
structlog.stdlib.ProcessorFormatter.remove_processors_meta,
structlog.processors.JSONRenderer(),
],
"foreign_pre_chain": shared_processors,
},
"colored_console": {
"()": structlog.stdlib.ProcessorFormatter,
"processors": [
extract_from_record,
structlog.stdlib.ProcessorFormatter.remove_processors_meta,
structlog.dev.ConsoleRenderer(colors=True),
],
"foreign_pre_chain": shared_processors,
},
"plain_console": {
"()": structlog.stdlib.ProcessorFormatter,
"processors": [
structlog.stdlib.ProcessorFormatter.remove_processors_meta,
structlog.dev.ConsoleRenderer(colors=False),
],
"foreign_pre_chain": shared_processors,
}
},
"handlers": {
"console": {
"level": "DEBUG",
"class": "logging.StreamHandler",
"formatter": "colored_console",
"stream": "ext://sys.stdout"
},
"file_info": {
"level": "INFO",
"class": "logging.handlers.RotatingFileHandler",
"filename": "logs/app.log",
"formatter": "json_file",
"maxBytes": 10485760, # 10MB
"backupCount": 5,
"encoding": "utf-8"
},
"file_error": {
"level": "ERROR",
"class": "logging.handlers.RotatingFileHandler",
"filename": "logs/error.log",
"formatter": "json_file",
"maxBytes": 10485760,
"backupCount": 10,
"encoding": "utf-8"
}
},
"loggers": {
"": { # Root logger
"handlers": ["console", "file_info", "file_error"],
"level": "DEBUG",
"propagate": False,
},
},
})
# structlog configuration
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
timestamper,
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
# Integrated log usage example
setup_integrated_logging()
class WebApplication:
def __init__(self):
# structlog logger
self.struct_log = structlog.get_logger("WebApp")
# Standard logger
self.std_log = logging.getLogger("WebApp.Standard")
def handle_request(self, request_data):
request_id = f"req_{int(time.time() * 1000)}"
# Structured logging with structlog
self.struct_log.info("Request received",
request_id=request_id,
method=request_data["method"],
path=request_data["path"],
user_agent=request_data.get("user_agent"),
content_length=len(request_data.get("body", "")))
# Standard logging log (can coexist with structlog)
self.std_log.info(f"Standard log: Processing request {request_id}")
try:
result = self._process_request(request_data, request_id)
self.struct_log.info("Request processing completed",
request_id=request_id,
response_size=len(str(result)),
processing_time_ms=45)
return result
except Exception as e:
# Log with exception information
self.struct_log.exception("Request processing error",
request_id=request_id,
error_type=type(e).__name__,
request_path=request_data["path"])
raise
def _process_request(self, request_data, request_id):
# Processing simulation
return {"status": "success", "request_id": request_id}
# Usage example
app = WebApplication()
app.handle_request({
"method": "POST",
"path": "/api/users",
"user_agent": "Mozilla/5.0...",
"body": '{"name": "John Doe"}'
})
Async Processing and Context Variables Usage
import asyncio
import structlog
import contextvars
import time
from typing import Optional
# Context Variables configuration
request_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar('request_id', default=None)
user_id_var: contextvars.ContextVar[Optional[int]] = contextvars.ContextVar('user_id', default=None)
def setup_async_logging():
"""structlog configuration for async processing"""
# Custom processor: automatically add Context Variables
def add_context_vars(logger, method_name, event_dict):
# Automatic retrieval from Context Variables
if request_id := request_id_var.get(None):
event_dict["request_id"] = request_id
if user_id := user_id_var.get(None):
event_dict["user_id"] = user_id
return event_dict
structlog.configure(
processors=[
# Context Variables integration
structlog.contextvars.merge_contextvars,
add_context_vars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(20), # INFO and above
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
setup_async_logging()
class AsyncAPIService:
def __init__(self):
self.log = structlog.get_logger(self.__class__.__name__)
async def handle_async_request(self, request_data):
"""Async request processing"""
request_id = f"async_req_{int(time.time() * 1000)}"
user_id = request_data.get("user_id")
# Set in Context Variables
request_id_var.set(request_id)
user_id_var.set(user_id)
self.log.info("Async request started",
method=request_data["method"],
endpoint=request_data["endpoint"])
try:
# Execute multiple async tasks in parallel
tasks = [
self._validate_async(request_data),
self._fetch_user_data_async(user_id),
self._check_permissions_async(user_id, request_data["endpoint"])
]
validation_result, user_data, permissions = await asyncio.gather(*tasks)
# Main processing
result = await self._process_main_logic_async(
request_data, user_data, permissions
)
self.log.info("Async request completed",
result_size=len(str(result)),
tasks_completed=len(tasks))
return result
except Exception as e:
self.log.exception("Async processing error",
error_context="main_handler",
request_method=request_data["method"])
raise
finally:
# Clear Context Variables
request_id_var.set(None)
user_id_var.set(None)
async def _validate_async(self, request_data):
self.log.debug("Async validation started")
await asyncio.sleep(0.1) # Validation processing simulation
if not request_data.get("required_field"):
self.log.warning("Validation warning",
missing_field="required_field")
raise ValueError("Required field missing")
self.log.debug("Async validation completed",
validated_fields=list(request_data.keys()))
return True
async def _fetch_user_data_async(self, user_id):
self.log.debug("User data retrieval started", target_user_id=user_id)
await asyncio.sleep(0.2) # Database access simulation
user_data = {
"id": user_id,
"name": "John Doe",
"role": "admin",
"permissions": ["read", "write", "delete"]
}
self.log.debug("User data retrieval completed",
user_role=user_data["role"],
permission_count=len(user_data["permissions"]))
return user_data
async def _check_permissions_async(self, user_id, endpoint):
self.log.debug("Permission check started", endpoint=endpoint)
await asyncio.sleep(0.05) # Permission check simulation
# Permission check logic
allowed = endpoint.startswith("/api/user/")
self.log.debug("Permission check completed",
endpoint=endpoint,
access_allowed=allowed)
return allowed
async def _process_main_logic_async(self, request_data, user_data, permissions):
self.log.info("Main logic processing started")
if not permissions:
self.log.error("No access permission",
user_role=user_data["role"],
requested_endpoint=request_data["endpoint"])
raise PermissionError("No access permission")
# Main processing simulation
await asyncio.sleep(0.3)
result = {
"status": "success",
"processed_at": time.time(),
"user_name": user_data["name"],
"data": {"message": "Processing completed"}
}
self.log.info("Main logic processing completed",
result_status=result["status"],
processing_user=user_data["name"])
return result
# Async processing usage example
async def main():
service = AsyncAPIService()
# Simultaneous processing of multiple async requests
request_data_list = [
{
"method": "POST",
"endpoint": "/api/user/create",
"user_id": 1001,
"required_field": "value1"
},
{
"method": "GET",
"endpoint": "/api/user/profile",
"user_id": 1002,
"required_field": "value2"
},
{
"method": "PUT",
"endpoint": "/api/user/update",
"user_id": 1003,
"required_field": "value3"
}
]
# Parallel processing of multiple requests
tasks = [
service.handle_async_request(request_data)
for request_data in request_data_list
]
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Request {i} error: {result}")
else:
print(f"Request {i} success: {result['status']}")
except Exception as e:
print(f"Overall error: {e}")
# Execution
if __name__ == "__main__":
asyncio.run(main())
Custom Processors and Performance Optimization
import structlog
import time
import json
import gzip
from typing import Any, Dict
import logging
class PerformanceProcessor:
"""Performance measurement processor"""
def __init__(self):
self.start_times = {}
def __call__(self, logger, method_name, event_dict):
# Start performance measurement
if event_dict.get("_performance_start"):
operation_id = event_dict.get("operation_id", "unknown")
self.start_times[operation_id] = time.perf_counter()
event_dict.pop("_performance_start", None)
# End performance measurement
if event_dict.get("_performance_end"):
operation_id = event_dict.get("operation_id", "unknown")
if operation_id in self.start_times:
duration = time.perf_counter() - self.start_times[operation_id]
event_dict["performance_ms"] = round(duration * 1000, 3)
del self.start_times[operation_id]
event_dict.pop("_performance_end", None)
return event_dict
class SensitiveDataProcessor:
"""Sensitive data masking processor"""
SENSITIVE_KEYS = {
"password", "token", "secret", "key", "credential",
"ssn", "credit_card", "email", "phone", "address"
}
def __call__(self, logger, method_name, event_dict):
return self._mask_sensitive_data(event_dict)
def _mask_sensitive_data(self, data):
if isinstance(data, dict):
masked = {}
for key, value in data.items():
if any(sensitive in key.lower() for sensitive in self.SENSITIVE_KEYS):
# Sensitive data masking
if isinstance(value, str) and len(value) > 4:
masked[key] = f"{value[:2]}***{value[-2:]}"
else:
masked[key] = "***"
else:
# Recursive masking
masked[key] = self._mask_sensitive_data(value)
return masked
elif isinstance(data, (list, tuple)):
return [self._mask_sensitive_data(item) for item in data]
else:
return data
class CompressionProcessor:
"""Large data compression processor"""
def __init__(self, threshold_bytes=1024):
self.threshold_bytes = threshold_bytes
def __call__(self, logger, method_name, event_dict):
# Compress large data
for key, value in event_dict.items():
if isinstance(value, (str, bytes)):
value_bytes = value.encode() if isinstance(value, str) else value
if len(value_bytes) > self.threshold_bytes:
compressed = gzip.compress(value_bytes)
event_dict[key] = {
"_compressed": True,
"_original_size": len(value_bytes),
"_compressed_size": len(compressed),
"_data": compressed.hex()
}
return event_dict
def setup_high_performance_logging():
"""High-performance, high-functionality logging configuration"""
# Custom processor instances
performance_processor = PerformanceProcessor()
sensitive_processor = SensitiveDataProcessor()
compression_processor = CompressionProcessor(threshold_bytes=512)
    # Fast JSON serializer configuration (BytesLoggerFactory below expects bytes output)
    try:
        import orjson
        json_serializer = orjson.dumps
    except ImportError:
        # Fall back to the standard library, encoding so the renderer still emits bytes
        def json_serializer(obj, **kwargs):
            return json.dumps(obj, **kwargs).encode("utf-8")
structlog.configure(
cache_logger_on_first_use=True,
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
processors=[
# Context Variables integration
structlog.contextvars.merge_contextvars,
# Custom processors
performance_processor,
sensitive_processor,
compression_processor,
# Standard processors
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.format_exc_info,
# Fast JSON output
structlog.processors.JSONRenderer(serializer=json_serializer),
],
logger_factory=structlog.BytesLoggerFactory(),
)
# High-performance logging usage example
setup_high_performance_logging()
class OptimizedService:
def __init__(self):
self.log = structlog.get_logger(self.__class__.__name__)
def process_large_dataset(self, data):
operation_id = f"op_{int(time.time() * 1000)}"
# Start performance measurement
self.log.info("Large dataset processing started",
operation_id=operation_id,
data_size=len(data),
_performance_start=True)
try:
# Processing with sensitive data
user_data = {
"user_id": 12345,
"email": "[email protected]", # Masking target
"password": "secret123", # Masking target
"name": "John Doe",
"preferences": data[:100] # Data sample
}
self.log.debug("User data processing",
operation_id=operation_id,
user_info=user_data)
# Large data processing simulation
processed_results = []
for i, item in enumerate(data):
if i % 1000 == 0:
self.log.debug("Processing progress",
operation_id=operation_id,
progress_percent=round((i / len(data)) * 100, 1),
current_index=i)
processed_results.append(f"processed_{item}")
            # Large result data; CompressionProcessor only inspects top-level
            # values, so log the big payload as its own keyword to have it compressed
            large_result = {
                "results": processed_results,
                "large_data": "x" * 2000,  # large data (compression target)
                "metadata": {
                    "processing_method": "optimized",
                    "total_items": len(data)
                }
            }
            # End performance measurement
            self.log.info("Large dataset processing completed",
                          operation_id=operation_id,
                          processed_count=len(processed_results),
                          large_payload=large_result["large_data"],
                          result_metadata=large_result["metadata"],
                          _performance_end=True)
return processed_results
except Exception as e:
self.log.exception("Data processing error",
operation_id=operation_id,
error_type=type(e).__name__)
raise
def batch_processing_with_metrics(self, batch_data):
"""Batch processing with metrics"""
batch_id = f"batch_{int(time.time())}"
self.log.info("Batch processing started",
batch_id=batch_id,
batch_size=len(batch_data),
_performance_start=True)
success_count = 0
error_count = 0
for i, item in enumerate(batch_data):
try:
# Individual item processing
result = self._process_single_item(item, batch_id, i)
success_count += 1
if i % 100 == 0: # Output metrics every 100 items
self.log.info("Batch progress",
batch_id=batch_id,
progress_ratio=f"{i}/{len(batch_data)}",
success_count=success_count,
error_count=error_count)
except Exception as e:
error_count += 1
self.log.warning("Item processing error",
batch_id=batch_id,
item_index=i,
error_message=str(e))
# Final result
self.log.info("Batch processing completed",
batch_id=batch_id,
total_processed=len(batch_data),
success_count=success_count,
error_count=error_count,
success_rate=round((success_count / len(batch_data)) * 100, 2),
_performance_end=True)
return {
"batch_id": batch_id,
"success_count": success_count,
"error_count": error_count
}
def _process_single_item(self, item, batch_id, index):
# Individual processing simulation
if index % 50 == 0: # 1 in 50 errors
raise ValueError(f"Processing error: {item}")
return f"processed_{item}"
# Usage example and performance test
def run_performance_test():
service = OptimizedService()
# Large data processing test
large_data = list(range(10000))
print("=== Large Data Processing Test ===")
service.process_large_dataset(large_data)
# Batch processing test
batch_data = [f"item_{i}" for i in range(500)]
print("\n=== Batch Processing Test ===")
service.batch_processing_with_metrics(batch_data)
if __name__ == "__main__":
run_performance_test()
Log Capture and Assertions in Test Environment
import pytest
import structlog
from structlog.testing import LogCapture, capture_logs
import json
# structlog configuration for testing (for reference; the autouse fixture below overrides it)
def setup_test_logging():
structlog.configure(
processors=[
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.stdlib.BoundLogger,
logger_factory=structlog.testing.TestingLoggerFactory(),
cache_logger_on_first_use=False,
)
# Pytest fixtures
@pytest.fixture(name="log_capture")
def fixture_log_capture():
return LogCapture()
@pytest.fixture(autouse=True)
def fixture_configure_structlog_for_tests(log_capture):
    structlog.configure(
        processors=[
            structlog.processors.add_log_level,  # adds the "level" key asserted on below
            log_capture,
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        logger_factory=structlog.testing.TestingLoggerFactory(),
        cache_logger_on_first_use=False,
    )
class TestableService:
def __init__(self):
self.log = structlog.get_logger(self.__class__.__name__)
def create_user(self, user_data):
if not user_data.get("email"):
self.log.error("Email required", validation_error="missing_email")
raise ValueError("Email is required")
if "@" not in user_data["email"]:
self.log.warning("Email format warning",
email=user_data["email"],
validation_warning="invalid_format")
self.log.info("User creation started",
email=user_data["email"],
user_data_keys=list(user_data.keys()))
# User creation processing simulation
user_id = 12345
self.log.info("User creation completed",
user_id=user_id,
email=user_data["email"],
creation_method="api")
return user_id
def process_batch(self, items):
self.log.info("Batch processing started", batch_size=len(items))
results = []
for i, item in enumerate(items):
try:
if item == "error_item":
raise ValueError("Processing error")
result = f"processed_{item}"
results.append(result)
self.log.debug("Item processing completed",
item_index=i,
item_value=item,
result=result)
except Exception as e:
self.log.error("Item processing error",
item_index=i,
item_value=item,
error_message=str(e))
self.log.info("Batch processing completed",
total_items=len(items),
successful_items=len(results),
failed_items=len(items) - len(results))
return results
# Test class
class TestStructlogIntegration:
def test_successful_user_creation(self, log_capture):
"""Log test for successful user creation"""
service = TestableService()
user_data = {
"email": "[email protected]",
"name": "Test User",
"age": 25
}
result = service.create_user(user_data)
# Basic assertion
assert result == 12345
# Log entries verification
assert len(log_capture.entries) == 2
# Start log verification
start_log = log_capture.entries[0]
assert start_log["event"] == "User creation started"
assert start_log["email"] == "[email protected]"
assert "user_data_keys" in start_log
assert start_log["level"] == "info"
# Completion log verification
end_log = log_capture.entries[1]
assert end_log["event"] == "User creation completed"
assert end_log["user_id"] == 12345
assert end_log["email"] == "[email protected]"
assert end_log["creation_method"] == "api"
def test_user_creation_with_invalid_email(self, log_capture):
"""Warning log test with invalid email address"""
service = TestableService()
user_data = {
"email": "invalid_email",
"name": "Test User"
}
result = service.create_user(user_data)
# Verify warning log is output
warning_logs = [
entry for entry in log_capture.entries
if entry["level"] == "warning"
]
assert len(warning_logs) == 1
warning_log = warning_logs[0]
assert warning_log["event"] == "Email format warning"
assert warning_log["email"] == "invalid_email"
assert warning_log["validation_warning"] == "invalid_format"
def test_user_creation_missing_email_error(self, log_capture):
"""Error log test for missing required field"""
service = TestableService()
user_data = {"name": "Test User"}
with pytest.raises(ValueError, match="Email is required"):
service.create_user(user_data)
# Error log verification
error_logs = [
entry for entry in log_capture.entries
if entry["level"] == "error"
]
assert len(error_logs) == 1
error_log = error_logs[0]
assert error_log["event"] == "Email required"
assert error_log["validation_error"] == "missing_email"
def test_batch_processing_with_mixed_results(self, log_capture):
"""Log test for batch processing with mixed success/failure"""
service = TestableService()
items = ["item1", "item2", "error_item", "item3"]
results = service.process_batch(items)
# Result verification
assert len(results) == 3
assert "processed_item1" in results
assert "processed_item2" in results
assert "processed_item3" in results
# Log level verification
info_logs = [e for e in log_capture.entries if e["level"] == "info"]
debug_logs = [e for e in log_capture.entries if e["level"] == "debug"]
error_logs = [e for e in log_capture.entries if e["level"] == "error"]
# INFO: start and completion logs
assert len(info_logs) == 2
assert info_logs[0]["event"] == "Batch processing started"
assert info_logs[0]["batch_size"] == 4
assert info_logs[1]["event"] == "Batch processing completed"
assert info_logs[1]["total_items"] == 4
assert info_logs[1]["successful_items"] == 3
assert info_logs[1]["failed_items"] == 1
# DEBUG: individual item processing logs (successful only)
assert len(debug_logs) == 3
# ERROR: failed item logs
assert len(error_logs) == 1
assert error_logs[0]["event"] == "Item processing error"
assert error_logs[0]["item_value"] == "error_item"
def test_log_context_binding(self, log_capture):
"""Log context binding test"""
base_log = structlog.get_logger("TestLogger")
# Context binding
bound_log = base_log.bind(
session_id="sess_12345",
user_id=67890,
operation="test_operation"
)
bound_log.info("Log with context", additional_data="test_value")
# Verify context is correctly included
assert len(log_capture.entries) == 1
log_entry = log_capture.entries[0]
assert log_entry["session_id"] == "sess_12345"
assert log_entry["user_id"] == 67890
assert log_entry["operation"] == "test_operation"
assert log_entry["additional_data"] == "test_value"
assert log_entry["event"] == "Log with context"
# Log capture test using context manager
class TestLogCaptureContext:
def test_with_capture_logs_context_manager(self):
"""Test using capture_logs context manager"""
service = TestableService()
with capture_logs() as captured_logs:
service.create_user({
"email": "[email protected]",
"name": "Context Test"
})
# Captured log verification
assert len(captured_logs) == 2
        # capture_logs bypasses the configured processors, so entries carry the
        # raw event dict plus a "log_level" key (no timestamp is added)
        start_log = captured_logs[0]
        assert start_log["log_level"] == "info"
assert start_log["event"] == "User creation started"
assert start_log["email"] == "[email protected]"
end_log = captured_logs[1]
assert end_log["event"] == "User creation completed"
assert end_log["user_id"] == 12345
# Test execution example
if __name__ == "__main__":
# Manual test execution
pytest.main([__file__, "-v"])