structlog
構造化ログ出力に特化したPythonライブラリ(2,500+ GitHub スター)。JSON形式やLogfmt形式での構造化出力をサポートし、開発環境では美しいコンソール出力、本番環境では機械読み取り可能なログを生成。asyncio、コンテキスト変数、型ヒントに対応。
GitHub概要
hynek/structlog
Simple, powerful, and fast logging for Python.
スター4,093
ウォッチ33
フォーク245
作成日:2013年8月13日
言語:Python
ライセンス:Other
トピックス
loggingpythonstructured-logging
スター履歴
データ取得日時: 2025/7/17 08:20
ライブラリ
structlog
概要
structlogは、Pythonにおける構造化ログの決定版として位置づけられている高性能ロギングライブラリです。従来の文字列ベースのログから脱却し、キー・バリュー形式の構造化データによる表現力豊かなログ出力を実現します。2013年から本格的な本番環境で使用され続け、asyncio、Context Variables、型ヒントなど最新のPython機能への対応を重視。2025年版では、バージョン25.4.0として大規模システムでの実績とパフォーマンス最適化を追求した、Python構造化ログ分野のリーディングライブラリです。
詳細
structlog 2025年版は、プロダクションレディな構造化ログソリューションとして完全に確立されています。標準ライブラリのloggingとの完全統合、Twisted框架への専用サポート、カスタムプロセッサーチェーンによる柔軟なログ変換パイプラインを提供。JSON、logfmt、美しいコンソール出力をネイティブサポートし、複数の出力形式に対応。非同期処理、Context Variables活用、型安全性を重視した設計により、モダンPythonアプリケーションでの高いパフォーマンスとメンテナンス性を実現しています。
主な特徴
- 構造化データ: キー・バリュー形式による表現力豊かなログ表現
- 高性能プロセッサー: カスタマイズ可能なログ変換パイプライン
- 完全な標準ライブラリ統合: Python標準loggingとのシームレス連携
- 非同期サポート: asyncio環境での最適化されたログ処理
- 複数出力形式: JSON、logfmt、コンソール表示の柔軟な切り替え
- Context Variables: スレッド安全なコンテキスト管理
メリット・デメリット
メリット
- 構造化ログによる圧倒的な解析性とデバッグ効率の向上
- プロセッサーチェーンによる柔軟で拡張性の高いログ変換
- 標準ライブラリとの完全互換性で既存コードへの段階的導入が可能
- 非同期処理環境での高性能ログ出力を実現
- JSON出力によるログ集約システムとの優れた統合性
- 豊富なドキュメントと10年以上の本番運用実績
デメリット
- 初期学習コストがやや高く、プロセッサー概念の理解が必要
- 構造化しすぎることで単純なログには複雑性が増す場合がある
- 大量ログ出力時は標準ライブラリよりもわずかにオーバーヘッドが大きい
- プロセッサーチェーンの設定ミスによる予期しない動作
- JSON以外の出力形式では一部機能が制限される
- 複雑な設定時のデバッグがやや困難
参考ページ
書き方の例
基本的なインストールと設定
# インストール
# pip install structlog
import structlog
import logging
# 最もシンプルな設定
structlog.configure()
log = structlog.get_logger()
# 基本的なログ出力
log.info("アプリケーション開始")
log.debug("デバッグ情報", user_id=12345)
log.warning("警告メッセージ", error_code="W001")
log.error("エラーが発生", exception_type="ValueError")
# 構造化データでの詳細ログ
log.info("ユーザーログイン",
user_id=67890,
ip_address="192.168.1.100",
user_agent="Mozilla/5.0...",
session_id="sess_abc123")
# コンテキスト情報の結合
user_log = log.bind(user_id=12345, session="abc123")
user_log.info("ページ閲覧", page="/dashboard")
user_log.info("データ取得", table="users", count=150)
# 例外情報付きログ
try:
result = 10 / 0
except ZeroDivisionError:
log.exception("計算エラー", operation="division", dividend=10, divisor=0)
プロダクション環境向け高度な設定
import structlog
import logging
import sys
import orjson
def setup_production_logging():
"""プロダクション環境用の最適化された設定"""
# 環境に応じた設定分岐
if sys.stderr.isatty():
# 開発環境: 美しいコンソール出力
processors = [
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.dev.set_exc_info,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.dev.ConsoleRenderer(colors=True),
]
else:
# 本番環境: 高速JSON出力
processors = [
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.format_exc_info,
structlog.processors.dict_tracebacks,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.JSONRenderer(serializer=orjson.dumps),
]
# 高性能設定
structlog.configure(
cache_logger_on_first_use=True,
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
processors=processors,
logger_factory=structlog.BytesLoggerFactory(),
)
# 使用例
setup_production_logging()
class UserService:
def __init__(self):
self.log = structlog.get_logger(self.__class__.__name__)
def create_user(self, user_data):
request_id = f"req_{int(time.time())}"
log = self.log.bind(request_id=request_id, operation="create_user")
log.info("ユーザー作成開始", input_data_keys=list(user_data.keys()))
try:
# バリデーション
self._validate_user_data(user_data, log)
# データベース保存
user_id = self._save_to_database(user_data, log)
# 成功ログ
log.info("ユーザー作成完了",
user_id=user_id,
email=user_data.get("email"),
registration_method="direct")
return user_id
except ValidationError as e:
log.warning("バリデーションエラー",
error_code=e.code,
error_fields=e.fields,
input_data=user_data)
raise
except DatabaseError as e:
log.error("データベースエラー",
error_type="database",
error_message=str(e),
table="users",
severity="high")
raise
def _validate_user_data(self, user_data, log):
log.debug("データバリデーション開始")
# バリデーション処理
pass
def _save_to_database(self, user_data, log):
log.debug("データベース保存開始", table="users")
# データベース処理
return 12345
# 使用例
service = UserService()
service.create_user({
"email": "[email protected]",
"name": "田中太郎",
"age": 30
})
標準ライブラリloggingとの統合
import logging
import structlog
import logging.config
def setup_integrated_logging():
"""標準loggingとstructlogの完全統合設定"""
# タイムスタンプ設定
timestamper = structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S")
# 共通プロセッサー
shared_processors = [
structlog.stdlib.add_log_level,
structlog.stdlib.ExtraAdder(),
timestamper,
]
# カスタムプロセッサー: プロセス・スレッド情報抽出
def extract_from_record(_, __, event_dict):
record = event_dict["_record"]
event_dict["thread_name"] = record.threadName
event_dict["process_name"] = record.processName
event_dict["module"] = record.module
event_dict["function"] = record.funcName
event_dict["line"] = record.lineno
return event_dict
# logging.config.dictConfigによる詳細設定
logging.config.dictConfig({
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"json_file": {
"()": structlog.stdlib.ProcessorFormatter,
"processors": [
extract_from_record,
structlog.stdlib.ProcessorFormatter.remove_processors_meta,
structlog.processors.JSONRenderer(),
],
"foreign_pre_chain": shared_processors,
},
"colored_console": {
"()": structlog.stdlib.ProcessorFormatter,
"processors": [
extract_from_record,
structlog.stdlib.ProcessorFormatter.remove_processors_meta,
structlog.dev.ConsoleRenderer(colors=True),
],
"foreign_pre_chain": shared_processors,
},
"plain_console": {
"()": structlog.stdlib.ProcessorFormatter,
"processors": [
structlog.stdlib.ProcessorFormatter.remove_processors_meta,
structlog.dev.ConsoleRenderer(colors=False),
],
"foreign_pre_chain": shared_processors,
}
},
"handlers": {
"console": {
"level": "DEBUG",
"class": "logging.StreamHandler",
"formatter": "colored_console",
"stream": "ext://sys.stdout"
},
"file_info": {
"level": "INFO",
"class": "logging.handlers.RotatingFileHandler",
"filename": "logs/app.log",
"formatter": "json_file",
"maxBytes": 10485760, # 10MB
"backupCount": 5,
"encoding": "utf-8"
},
"file_error": {
"level": "ERROR",
"class": "logging.handlers.RotatingFileHandler",
"filename": "logs/error.log",
"formatter": "json_file",
"maxBytes": 10485760,
"backupCount": 10,
"encoding": "utf-8"
}
},
"loggers": {
"": { # ルートロガー
"handlers": ["console", "file_info", "file_error"],
"level": "DEBUG",
"propagate": False,
},
},
})
# structlog設定
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
timestamper,
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
# 統合ログ使用例
setup_integrated_logging()
class WebApplication:
def __init__(self):
# structlogロガー
self.struct_log = structlog.get_logger("WebApp")
# 標準ロガー
self.std_log = logging.getLogger("WebApp.Standard")
def handle_request(self, request_data):
request_id = f"req_{int(time.time() * 1000)}"
# structlogでの構造化ログ
self.struct_log.info("リクエスト受信",
request_id=request_id,
method=request_data["method"],
path=request_data["path"],
user_agent=request_data.get("user_agent"),
content_length=len(request_data.get("body", "")))
# 標準loggingでのログ(structlogと混在可能)
self.std_log.info(f"標準ログ: リクエスト {request_id} を処理中")
try:
result = self._process_request(request_data, request_id)
self.struct_log.info("リクエスト処理完了",
request_id=request_id,
response_size=len(str(result)),
processing_time_ms=45)
return result
except Exception as e:
# 例外情報付きログ
self.struct_log.exception("リクエスト処理エラー",
request_id=request_id,
error_type=type(e).__name__,
request_path=request_data["path"])
raise
def _process_request(self, request_data, request_id):
# 処理シミュレーション
return {"status": "success", "request_id": request_id}
# 使用例
app = WebApplication()
app.handle_request({
"method": "POST",
"path": "/api/users",
"user_agent": "Mozilla/5.0...",
"body": '{"name": "田中太郎"}'
})
非同期処理とContext Variables活用
import asyncio
import structlog
import contextvars
import time
from typing import Optional
# Context Variablesの設定
request_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar('request_id', default=None)
user_id_var: contextvars.ContextVar[Optional[int]] = contextvars.ContextVar('user_id', default=None)
def setup_async_logging():
"""非同期処理対応のstructlog設定"""
# カスタムプロセッサー: Context Variablesを自動追加
def add_context_vars(logger, method_name, event_dict):
# Context Variablesからの自動取得
if request_id := request_id_var.get(None):
event_dict["request_id"] = request_id
if user_id := user_id_var.get(None):
event_dict["user_id"] = user_id
return event_dict
structlog.configure(
processors=[
# Context Variables統合
structlog.contextvars.merge_contextvars,
add_context_vars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(20), # INFO以上
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
setup_async_logging()
class AsyncAPIService:
def __init__(self):
self.log = structlog.get_logger(self.__class__.__name__)
async def handle_async_request(self, request_data):
"""非同期リクエスト処理"""
request_id = f"async_req_{int(time.time() * 1000)}"
user_id = request_data.get("user_id")
# Context Variablesに設定
request_id_var.set(request_id)
user_id_var.set(user_id)
self.log.info("非同期リクエスト開始",
method=request_data["method"],
endpoint=request_data["endpoint"])
try:
# 複数の非同期タスクを並行実行
tasks = [
self._validate_async(request_data),
self._fetch_user_data_async(user_id),
self._check_permissions_async(user_id, request_data["endpoint"])
]
validation_result, user_data, permissions = await asyncio.gather(*tasks)
# メイン処理
result = await self._process_main_logic_async(
request_data, user_data, permissions
)
self.log.info("非同期リクエスト完了",
result_size=len(str(result)),
tasks_completed=len(tasks))
return result
except Exception as e:
self.log.exception("非同期処理エラー",
error_context="main_handler",
request_method=request_data["method"])
raise
finally:
# Context Variablesクリア
request_id_var.set(None)
user_id_var.set(None)
async def _validate_async(self, request_data):
self.log.debug("非同期バリデーション開始")
await asyncio.sleep(0.1) # バリデーション処理のシミュレーション
if not request_data.get("required_field"):
self.log.warning("バリデーション警告",
missing_field="required_field")
raise ValueError("必須フィールドが不足")
self.log.debug("非同期バリデーション完了",
validated_fields=list(request_data.keys()))
return True
async def _fetch_user_data_async(self, user_id):
self.log.debug("ユーザーデータ取得開始", target_user_id=user_id)
await asyncio.sleep(0.2) # データベースアクセスのシミュレーション
user_data = {
"id": user_id,
"name": "田中太郎",
"role": "admin",
"permissions": ["read", "write", "delete"]
}
self.log.debug("ユーザーデータ取得完了",
user_role=user_data["role"],
permission_count=len(user_data["permissions"]))
return user_data
async def _check_permissions_async(self, user_id, endpoint):
self.log.debug("権限チェック開始", endpoint=endpoint)
await asyncio.sleep(0.05) # 権限チェックのシミュレーション
# 権限チェックロジック
allowed = endpoint.startswith("/api/user/")
self.log.debug("権限チェック完了",
endpoint=endpoint,
access_allowed=allowed)
return allowed
async def _process_main_logic_async(self, request_data, user_data, permissions):
self.log.info("メインロジック処理開始")
if not permissions:
self.log.error("アクセス権限なし",
user_role=user_data["role"],
requested_endpoint=request_data["endpoint"])
raise PermissionError("アクセス権限がありません")
# メイン処理のシミュレーション
await asyncio.sleep(0.3)
result = {
"status": "success",
"processed_at": time.time(),
"user_name": user_data["name"],
"data": {"message": "処理完了"}
}
self.log.info("メインロジック処理完了",
result_status=result["status"],
processing_user=user_data["name"])
return result
# 非同期処理の使用例
async def main():
service = AsyncAPIService()
# 複数の非同期リクエストを同時処理
request_data_list = [
{
"method": "POST",
"endpoint": "/api/user/create",
"user_id": 1001,
"required_field": "value1"
},
{
"method": "GET",
"endpoint": "/api/user/profile",
"user_id": 1002,
"required_field": "value2"
},
{
"method": "PUT",
"endpoint": "/api/user/update",
"user_id": 1003,
"required_field": "value3"
}
]
# 複数リクエストの並行処理
tasks = [
service.handle_async_request(request_data)
for request_data in request_data_list
]
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"リクエスト {i} でエラー: {result}")
else:
print(f"リクエスト {i} 成功: {result['status']}")
except Exception as e:
print(f"全体的なエラー: {e}")
# 実行
if __name__ == "__main__":
asyncio.run(main())
カスタムプロセッサーとパフォーマンス最適化
import structlog
import time
import json
import gzip
from typing import Any, Dict
import logging
class PerformanceProcessor:
"""パフォーマンス測定プロセッサー"""
def __init__(self):
self.start_times = {}
def __call__(self, logger, method_name, event_dict):
# パフォーマンス測定の開始
if event_dict.get("_performance_start"):
operation_id = event_dict.get("operation_id", "unknown")
self.start_times[operation_id] = time.perf_counter()
event_dict.pop("_performance_start", None)
# パフォーマンス測定の終了
if event_dict.get("_performance_end"):
operation_id = event_dict.get("operation_id", "unknown")
if operation_id in self.start_times:
duration = time.perf_counter() - self.start_times[operation_id]
event_dict["performance_ms"] = round(duration * 1000, 3)
del self.start_times[operation_id]
event_dict.pop("_performance_end", None)
return event_dict
class SensitiveDataProcessor:
"""機密データマスキングプロセッサー"""
SENSITIVE_KEYS = {
"password", "token", "secret", "key", "credential",
"ssn", "credit_card", "email", "phone", "address"
}
def __call__(self, logger, method_name, event_dict):
return self._mask_sensitive_data(event_dict)
def _mask_sensitive_data(self, data):
if isinstance(data, dict):
masked = {}
for key, value in data.items():
if any(sensitive in key.lower() for sensitive in self.SENSITIVE_KEYS):
# 機密データのマスキング
if isinstance(value, str) and len(value) > 4:
masked[key] = f"{value[:2]}***{value[-2:]}"
else:
masked[key] = "***"
else:
# 再帰的にマスキング
masked[key] = self._mask_sensitive_data(value)
return masked
elif isinstance(data, (list, tuple)):
return [self._mask_sensitive_data(item) for item in data]
else:
return data
class CompressionProcessor:
"""大量データ圧縮プロセッサー"""
def __init__(self, threshold_bytes=1024):
self.threshold_bytes = threshold_bytes
def __call__(self, logger, method_name, event_dict):
# 大きなデータの圧縮
for key, value in event_dict.items():
if isinstance(value, (str, bytes)):
value_bytes = value.encode() if isinstance(value, str) else value
if len(value_bytes) > self.threshold_bytes:
compressed = gzip.compress(value_bytes)
event_dict[key] = {
"_compressed": True,
"_original_size": len(value_bytes),
"_compressed_size": len(compressed),
"_data": compressed.hex()
}
return event_dict
def setup_high_performance_logging():
"""高性能・高機能logging設定"""
# カスタムプロセッサーのインスタンス
performance_processor = PerformanceProcessor()
sensitive_processor = SensitiveDataProcessor()
compression_processor = CompressionProcessor(threshold_bytes=512)
# 高速JSONシリアライザーの設定
try:
import orjson
json_serializer = orjson.dumps
except ImportError:
json_serializer = json.dumps
structlog.configure(
cache_logger_on_first_use=True,
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
processors=[
# Context Variables統合
structlog.contextvars.merge_contextvars,
# カスタムプロセッサー
performance_processor,
sensitive_processor,
compression_processor,
# 標準プロセッサー
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.format_exc_info,
# 高速JSON出力
structlog.processors.JSONRenderer(serializer=json_serializer),
],
logger_factory=structlog.BytesLoggerFactory(),
)
# 高性能ログの使用例
setup_high_performance_logging()
class OptimizedService:
def __init__(self):
self.log = structlog.get_logger(self.__class__.__name__)
def process_large_dataset(self, data):
operation_id = f"op_{int(time.time() * 1000)}"
# パフォーマンス測定開始
self.log.info("大量データ処理開始",
operation_id=operation_id,
data_size=len(data),
_performance_start=True)
try:
# 機密データを含む処理
user_data = {
"user_id": 12345,
"email": "[email protected]", # マスキング対象
"password": "secret123", # マスキング対象
"name": "田中太郎",
"preferences": data[:100] # データサンプル
}
self.log.debug("ユーザーデータ処理",
operation_id=operation_id,
user_info=user_data)
# 大量データの処理シミュレーション
processed_results = []
for i, item in enumerate(data):
if i % 1000 == 0:
self.log.debug("処理進捗",
operation_id=operation_id,
progress_percent=round((i / len(data)) * 100, 1),
current_index=i)
processed_results.append(f"processed_{item}")
# 大量結果データ(圧縮対象)
large_result = {
"results": processed_results,
"large_data": "x" * 2000, # 大きなデータ(圧縮される)
"metadata": {
"processing_method": "optimized",
"total_items": len(data)
}
}
# パフォーマンス測定終了
self.log.info("大量データ処理完了",
operation_id=operation_id,
processed_count=len(processed_results),
result_data=large_result,
_performance_end=True)
return processed_results
except Exception as e:
self.log.exception("データ処理エラー",
operation_id=operation_id,
error_type=type(e).__name__)
raise
def batch_processing_with_metrics(self, batch_data):
"""メトリクス付きバッチ処理"""
batch_id = f"batch_{int(time.time())}"
self.log.info("バッチ処理開始",
batch_id=batch_id,
batch_size=len(batch_data),
_performance_start=True)
success_count = 0
error_count = 0
for i, item in enumerate(batch_data):
try:
# 個別アイテム処理
result = self._process_single_item(item, batch_id, i)
success_count += 1
if i % 100 == 0: # 100件ごとにメトリクス出力
self.log.info("バッチ進捗",
batch_id=batch_id,
progress_ratio=f"{i}/{len(batch_data)}",
success_count=success_count,
error_count=error_count)
except Exception as e:
error_count += 1
self.log.warning("アイテム処理エラー",
batch_id=batch_id,
item_index=i,
error_message=str(e))
# 最終結果
self.log.info("バッチ処理完了",
batch_id=batch_id,
total_processed=len(batch_data),
success_count=success_count,
error_count=error_count,
success_rate=round((success_count / len(batch_data)) * 100, 2),
_performance_end=True)
return {
"batch_id": batch_id,
"success_count": success_count,
"error_count": error_count
}
def _process_single_item(self, item, batch_id, index):
# 個別処理のシミュレーション
if index % 50 == 0: # 50件に1件エラー
raise ValueError(f"処理エラー: {item}")
return f"processed_{item}"
# 使用例とパフォーマンステスト
def run_performance_test():
service = OptimizedService()
# 大量データ処理テスト
large_data = list(range(10000))
print("=== 大量データ処理テスト ===")
service.process_large_dataset(large_data)
# バッチ処理テスト
batch_data = [f"item_{i}" for i in range(500)]
print("\n=== バッチ処理テスト ===")
service.batch_processing_with_metrics(batch_data)
if __name__ == "__main__":
run_performance_test()
テスト環境でのログキャプチャとアサーション
import pytest
import structlog
from structlog.testing import LogCapture, capture_logs
import json
# テスト用のstructlog設定
def setup_test_logging():
structlog.configure(
processors=[
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.stdlib.BoundLogger,
logger_factory=structlog.testing.TestingLoggerFactory(),
cache_logger_on_first_use=False,
)
# Pytestフィクスチャ
@pytest.fixture(name="log_capture")
def fixture_log_capture():
return LogCapture()
@pytest.fixture(autouse=True)
def fixture_configure_structlog_for_tests(log_capture):
structlog.configure(
processors=[log_capture],
wrapper_class=structlog.stdlib.BoundLogger,
logger_factory=structlog.testing.TestingLoggerFactory(),
cache_logger_on_first_use=False,
)
class TestableService:
def __init__(self):
self.log = structlog.get_logger(self.__class__.__name__)
def create_user(self, user_data):
if not user_data.get("email"):
self.log.error("メール必須", validation_error="missing_email")
raise ValueError("Email is required")
if "@" not in user_data["email"]:
self.log.warning("メール形式警告",
email=user_data["email"],
validation_warning="invalid_format")
self.log.info("ユーザー作成開始",
email=user_data["email"],
user_data_keys=list(user_data.keys()))
# ユーザー作成処理のシミュレーション
user_id = 12345
self.log.info("ユーザー作成完了",
user_id=user_id,
email=user_data["email"],
creation_method="api")
return user_id
def process_batch(self, items):
self.log.info("バッチ処理開始", batch_size=len(items))
results = []
for i, item in enumerate(items):
try:
if item == "error_item":
raise ValueError("処理エラー")
result = f"processed_{item}"
results.append(result)
self.log.debug("アイテム処理完了",
item_index=i,
item_value=item,
result=result)
except Exception as e:
self.log.error("アイテム処理エラー",
item_index=i,
item_value=item,
error_message=str(e))
self.log.info("バッチ処理完了",
total_items=len(items),
successful_items=len(results),
failed_items=len(items) - len(results))
return results
# テストクラス
class TestStructlogIntegration:
def test_successful_user_creation(self, log_capture):
"""正常なユーザー作成のログテスト"""
service = TestableService()
user_data = {
"email": "[email protected]",
"name": "テストユーザー",
"age": 25
}
result = service.create_user(user_data)
# 基本アサーション
assert result == 12345
# ログエントリーの確認
assert len(log_capture.entries) == 2
# 開始ログの確認
start_log = log_capture.entries[0]
assert start_log["event"] == "ユーザー作成開始"
assert start_log["email"] == "[email protected]"
assert "user_data_keys" in start_log
assert start_log["level"] == "info"
# 完了ログの確認
end_log = log_capture.entries[1]
assert end_log["event"] == "ユーザー作成完了"
assert end_log["user_id"] == 12345
assert end_log["email"] == "[email protected]"
assert end_log["creation_method"] == "api"
def test_user_creation_with_invalid_email(self, log_capture):
"""不正なメールアドレスでの警告ログテスト"""
service = TestableService()
user_data = {
"email": "invalid_email",
"name": "テストユーザー"
}
result = service.create_user(user_data)
# 警告ログが出力されることを確認
warning_logs = [
entry for entry in log_capture.entries
if entry["level"] == "warning"
]
assert len(warning_logs) == 1
warning_log = warning_logs[0]
assert warning_log["event"] == "メール形式警告"
assert warning_log["email"] == "invalid_email"
assert warning_log["validation_warning"] == "invalid_format"
def test_user_creation_missing_email_error(self, log_capture):
"""必須フィールド不足でのエラーログテスト"""
service = TestableService()
user_data = {"name": "テストユーザー"}
with pytest.raises(ValueError, match="Email is required"):
service.create_user(user_data)
# エラーログの確認
error_logs = [
entry for entry in log_capture.entries
if entry["level"] == "error"
]
assert len(error_logs) == 1
error_log = error_logs[0]
assert error_log["event"] == "メール必須"
assert error_log["validation_error"] == "missing_email"
def test_batch_processing_with_mixed_results(self, log_capture):
"""成功・失敗混在バッチ処理のログテスト"""
service = TestableService()
items = ["item1", "item2", "error_item", "item3"]
results = service.process_batch(items)
# 結果確認
assert len(results) == 3
assert "processed_item1" in results
assert "processed_item2" in results
assert "processed_item3" in results
# ログレベル別確認
info_logs = [e for e in log_capture.entries if e["level"] == "info"]
debug_logs = [e for e in log_capture.entries if e["level"] == "debug"]
error_logs = [e for e in log_capture.entries if e["level"] == "error"]
# INFO: 開始・完了ログ
assert len(info_logs) == 2
assert info_logs[0]["event"] == "バッチ処理開始"
assert info_logs[0]["batch_size"] == 4
assert info_logs[1]["event"] == "バッチ処理完了"
assert info_logs[1]["total_items"] == 4
assert info_logs[1]["successful_items"] == 3
assert info_logs[1]["failed_items"] == 1
# DEBUG: 個別アイテム処理ログ(成功分のみ)
assert len(debug_logs) == 3
# ERROR: 失敗したアイテムのログ
assert len(error_logs) == 1
assert error_logs[0]["event"] == "アイテム処理エラー"
assert error_logs[0]["item_value"] == "error_item"
def test_log_context_binding(self, log_capture):
"""ログコンテキストバインディングのテスト"""
base_log = structlog.get_logger("TestLogger")
# コンテキストバインディング
bound_log = base_log.bind(
session_id="sess_12345",
user_id=67890,
operation="test_operation"
)
bound_log.info("コンテキスト付きログ", additional_data="test_value")
# コンテキストが正しく含まれているか確認
assert len(log_capture.entries) == 1
log_entry = log_capture.entries[0]
assert log_entry["session_id"] == "sess_12345"
assert log_entry["user_id"] == 67890
assert log_entry["operation"] == "test_operation"
assert log_entry["additional_data"] == "test_value"
assert log_entry["event"] == "コンテキスト付きログ"
# コンテキストマネージャーを使用したログキャプチャテスト
class TestLogCaptureContext:
def test_with_capture_logs_context_manager(self):
"""capture_logsコンテキストマネージャーを使用したテスト"""
service = TestableService()
with capture_logs() as captured_logs:
service.create_user({
"email": "[email protected]",
"name": "コンテキストテスト"
})
# キャプチャされたログの確認
assert len(captured_logs) == 2
# JSON形式での確認(プロセッサー処理後)
start_log = captured_logs[0]
assert "timestamp" in start_log
assert start_log["event"] == "ユーザー作成開始"
assert start_log["email"] == "[email protected]"
end_log = captured_logs[1]
assert end_log["event"] == "ユーザー作成完了"
assert end_log["user_id"] == 12345
# テスト実行例
if __name__ == "__main__":
# 手動テスト実行
pytest.main([__file__, "-v"])