structlog

構造化ログ出力に特化したPythonライブラリ(2,500+ GitHub スター)。JSON形式やLogfmt形式での構造化出力をサポートし、開発環境では美しいコンソール出力、本番環境では機械読み取り可能なログを生成。asyncio、コンテキスト変数、型ヒントに対応。

ロギングPython構造化ログJSON高性能非同期サポート

GitHub概要

hynek/structlog

Simple, powerful, and fast logging for Python.

スター4,093
ウォッチ33
フォーク245
作成日:2013年8月13日
言語:Python
ライセンス:Other

トピックス

loggingpythonstructured-logging

スター履歴

hynek/structlog Star History
データ取得日時: 2025/7/17 08:20

ライブラリ

structlog

概要

structlogは、Pythonにおける構造化ログの決定版として位置づけられている高性能ロギングライブラリです。従来の文字列ベースのログから脱却し、キー・バリュー形式の構造化データによる表現力豊かなログ出力を実現します。2013年から本格的な本番環境で使用され続け、asyncio、Context Variables、型ヒントなど最新のPython機能への対応を重視。2025年版では、バージョン25.4.0として大規模システムでの実績とパフォーマンス最適化を追求した、Python構造化ログ分野のリーディングライブラリです。

詳細

structlog 2025年版は、プロダクションレディな構造化ログソリューションとして完全に確立されています。標準ライブラリのloggingとの完全統合、Twisted框架への専用サポート、カスタムプロセッサーチェーンによる柔軟なログ変換パイプラインを提供。JSON、logfmt、美しいコンソール出力をネイティブサポートし、複数の出力形式に対応。非同期処理、Context Variables活用、型安全性を重視した設計により、モダンPythonアプリケーションでの高いパフォーマンスとメンテナンス性を実現しています。

主な特徴

  • 構造化データ: キー・バリュー形式による表現力豊かなログ表現
  • 高性能プロセッサー: カスタマイズ可能なログ変換パイプライン
  • 完全な標準ライブラリ統合: Python標準loggingとのシームレス連携
  • 非同期サポート: asyncio環境での最適化されたログ処理
  • 複数出力形式: JSON、logfmt、コンソール表示の柔軟な切り替え
  • Context Variables: スレッド安全なコンテキスト管理

メリット・デメリット

メリット

  • 構造化ログによる圧倒的な解析性とデバッグ効率の向上
  • プロセッサーチェーンによる柔軟で拡張性の高いログ変換
  • 標準ライブラリとの完全互換性で既存コードへの段階的導入が可能
  • 非同期処理環境での高性能ログ出力を実現
  • JSON出力によるログ集約システムとの優れた統合性
  • 豊富なドキュメントと10年以上の本番運用実績

デメリット

  • 初期学習コストがやや高く、プロセッサー概念の理解が必要
  • 構造化しすぎることで単純なログには複雑性が増す場合がある
  • 大量ログ出力時は標準ライブラリよりもわずかにオーバーヘッドが大きい
  • プロセッサーチェーンの設定ミスによる予期しない動作
  • JSON以外の出力形式では一部機能が制限される
  • 複雑な設定時のデバッグがやや困難

参考ページ

書き方の例

基本的なインストールと設定

# インストール
# pip install structlog

import structlog
import logging

# 最もシンプルな設定
structlog.configure()
log = structlog.get_logger()

# 基本的なログ出力
log.info("アプリケーション開始")
log.debug("デバッグ情報", user_id=12345)
log.warning("警告メッセージ", error_code="W001")
log.error("エラーが発生", exception_type="ValueError")

# 構造化データでの詳細ログ
log.info("ユーザーログイン", 
         user_id=67890, 
         ip_address="192.168.1.100", 
         user_agent="Mozilla/5.0...",
         session_id="sess_abc123")

# コンテキスト情報の結合
user_log = log.bind(user_id=12345, session="abc123")
user_log.info("ページ閲覧", page="/dashboard")
user_log.info("データ取得", table="users", count=150)

# 例外情報付きログ
try:
    result = 10 / 0
except ZeroDivisionError:
    log.exception("計算エラー", operation="division", dividend=10, divisor=0)

プロダクション環境向け高度な設定

import structlog
import logging
import sys
import orjson

def setup_production_logging():
    """プロダクション環境用の最適化された設定"""
    
    # 環境に応じた設定分岐
    if sys.stderr.isatty():
        # 開発環境: 美しいコンソール出力
        processors = [
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.StackInfoRenderer(),
            structlog.dev.set_exc_info,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.dev.ConsoleRenderer(colors=True),
        ]
    else:
        # 本番環境: 高速JSON出力
        processors = [
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.format_exc_info,
            structlog.processors.dict_tracebacks,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.processors.JSONRenderer(serializer=orjson.dumps),
        ]
    
    # 高性能設定
    structlog.configure(
        cache_logger_on_first_use=True,
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        processors=processors,
        logger_factory=structlog.BytesLoggerFactory(),
    )

# 使用例
setup_production_logging()

class UserService:
    def __init__(self):
        self.log = structlog.get_logger(self.__class__.__name__)
    
    def create_user(self, user_data):
        request_id = f"req_{int(time.time())}"
        log = self.log.bind(request_id=request_id, operation="create_user")
        
        log.info("ユーザー作成開始", input_data_keys=list(user_data.keys()))
        
        try:
            # バリデーション
            self._validate_user_data(user_data, log)
            
            # データベース保存
            user_id = self._save_to_database(user_data, log)
            
            # 成功ログ
            log.info("ユーザー作成完了", 
                    user_id=user_id,
                    email=user_data.get("email"),
                    registration_method="direct")
            
            return user_id
            
        except ValidationError as e:
            log.warning("バリデーションエラー",
                       error_code=e.code,
                       error_fields=e.fields,
                       input_data=user_data)
            raise
            
        except DatabaseError as e:
            log.error("データベースエラー",
                     error_type="database",
                     error_message=str(e),
                     table="users",
                     severity="high")
            raise
    
    def _validate_user_data(self, user_data, log):
        log.debug("データバリデーション開始")
        # バリデーション処理
        pass
    
    def _save_to_database(self, user_data, log):
        log.debug("データベース保存開始", table="users")
        # データベース処理
        return 12345

# 使用例
service = UserService()
service.create_user({
    "email": "[email protected]",
    "name": "田中太郎",
    "age": 30
})

標準ライブラリloggingとの統合

import logging
import structlog
import logging.config

def setup_integrated_logging():
    """標準loggingとstructlogの完全統合設定"""
    
    # タイムスタンプ設定
    timestamper = structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S")
    
    # 共通プロセッサー
    shared_processors = [
        structlog.stdlib.add_log_level,
        structlog.stdlib.ExtraAdder(),
        timestamper,
    ]
    
    # カスタムプロセッサー: プロセス・スレッド情報抽出
    def extract_from_record(_, __, event_dict):
        record = event_dict["_record"]
        event_dict["thread_name"] = record.threadName
        event_dict["process_name"] = record.processName
        event_dict["module"] = record.module
        event_dict["function"] = record.funcName
        event_dict["line"] = record.lineno
        return event_dict
    
    # logging.config.dictConfigによる詳細設定
    logging.config.dictConfig({
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "json_file": {
                "()": structlog.stdlib.ProcessorFormatter,
                "processors": [
                    extract_from_record,
                    structlog.stdlib.ProcessorFormatter.remove_processors_meta,
                    structlog.processors.JSONRenderer(),
                ],
                "foreign_pre_chain": shared_processors,
            },
            "colored_console": {
                "()": structlog.stdlib.ProcessorFormatter,
                "processors": [
                    extract_from_record,
                    structlog.stdlib.ProcessorFormatter.remove_processors_meta,
                    structlog.dev.ConsoleRenderer(colors=True),
                ],
                "foreign_pre_chain": shared_processors,
            },
            "plain_console": {
                "()": structlog.stdlib.ProcessorFormatter,
                "processors": [
                    structlog.stdlib.ProcessorFormatter.remove_processors_meta,
                    structlog.dev.ConsoleRenderer(colors=False),
                ],
                "foreign_pre_chain": shared_processors,
            }
        },
        "handlers": {
            "console": {
                "level": "DEBUG",
                "class": "logging.StreamHandler",
                "formatter": "colored_console",
                "stream": "ext://sys.stdout"
            },
            "file_info": {
                "level": "INFO",
                "class": "logging.handlers.RotatingFileHandler",
                "filename": "logs/app.log",
                "formatter": "json_file",
                "maxBytes": 10485760,  # 10MB
                "backupCount": 5,
                "encoding": "utf-8"
            },
            "file_error": {
                "level": "ERROR",
                "class": "logging.handlers.RotatingFileHandler",
                "filename": "logs/error.log",
                "formatter": "json_file",
                "maxBytes": 10485760,
                "backupCount": 10,
                "encoding": "utf-8"
            }
        },
        "loggers": {
            "": {  # ルートロガー
                "handlers": ["console", "file_info", "file_error"],
                "level": "DEBUG",
                "propagate": False,
            },
        },
    })
    
    # structlog設定
    structlog.configure(
        processors=[
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            timestamper,
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

# 統合ログ使用例
setup_integrated_logging()

class WebApplication:
    def __init__(self):
        # structlogロガー
        self.struct_log = structlog.get_logger("WebApp")
        # 標準ロガー
        self.std_log = logging.getLogger("WebApp.Standard")
    
    def handle_request(self, request_data):
        request_id = f"req_{int(time.time() * 1000)}"
        
        # structlogでの構造化ログ
        self.struct_log.info("リクエスト受信",
                           request_id=request_id,
                           method=request_data["method"],
                           path=request_data["path"],
                           user_agent=request_data.get("user_agent"),
                           content_length=len(request_data.get("body", "")))
        
        # 標準loggingでのログ(structlogと混在可能)
        self.std_log.info(f"標準ログ: リクエスト {request_id} を処理中")
        
        try:
            result = self._process_request(request_data, request_id)
            
            self.struct_log.info("リクエスト処理完了",
                               request_id=request_id,
                               response_size=len(str(result)),
                               processing_time_ms=45)
            
            return result
            
        except Exception as e:
            # 例外情報付きログ
            self.struct_log.exception("リクエスト処理エラー",
                                    request_id=request_id,
                                    error_type=type(e).__name__,
                                    request_path=request_data["path"])
            raise
    
    def _process_request(self, request_data, request_id):
        # 処理シミュレーション
        return {"status": "success", "request_id": request_id}

# 使用例
app = WebApplication()
app.handle_request({
    "method": "POST",
    "path": "/api/users",
    "user_agent": "Mozilla/5.0...",
    "body": '{"name": "田中太郎"}'
})

非同期処理とContext Variables活用

import asyncio
import structlog
import contextvars
import time
from typing import Optional

# Context Variablesの設定
request_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar('request_id', default=None)
user_id_var: contextvars.ContextVar[Optional[int]] = contextvars.ContextVar('user_id', default=None)

def setup_async_logging():
    """非同期処理対応のstructlog設定"""
    
    # カスタムプロセッサー: Context Variablesを自動追加
    def add_context_vars(logger, method_name, event_dict):
        # Context Variablesからの自動取得
        if request_id := request_id_var.get(None):
            event_dict["request_id"] = request_id
        if user_id := user_id_var.get(None):
            event_dict["user_id"] = user_id
        return event_dict
    
    structlog.configure(
        processors=[
            # Context Variables統合
            structlog.contextvars.merge_contextvars,
            add_context_vars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(20),  # INFO以上
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

setup_async_logging()

class AsyncAPIService:
    def __init__(self):
        self.log = structlog.get_logger(self.__class__.__name__)
    
    async def handle_async_request(self, request_data):
        """非同期リクエスト処理"""
        request_id = f"async_req_{int(time.time() * 1000)}"
        user_id = request_data.get("user_id")
        
        # Context Variablesに設定
        request_id_var.set(request_id)
        user_id_var.set(user_id)
        
        self.log.info("非同期リクエスト開始",
                     method=request_data["method"],
                     endpoint=request_data["endpoint"])
        
        try:
            # 複数の非同期タスクを並行実行
            tasks = [
                self._validate_async(request_data),
                self._fetch_user_data_async(user_id),
                self._check_permissions_async(user_id, request_data["endpoint"])
            ]
            
            validation_result, user_data, permissions = await asyncio.gather(*tasks)
            
            # メイン処理
            result = await self._process_main_logic_async(
                request_data, user_data, permissions
            )
            
            self.log.info("非同期リクエスト完了",
                         result_size=len(str(result)),
                         tasks_completed=len(tasks))
            
            return result
            
        except Exception as e:
            self.log.exception("非同期処理エラー",
                             error_context="main_handler",
                             request_method=request_data["method"])
            raise
        finally:
            # Context Variablesクリア
            request_id_var.set(None)
            user_id_var.set(None)
    
    async def _validate_async(self, request_data):
        self.log.debug("非同期バリデーション開始")
        await asyncio.sleep(0.1)  # バリデーション処理のシミュレーション
        
        if not request_data.get("required_field"):
            self.log.warning("バリデーション警告", 
                           missing_field="required_field")
            raise ValueError("必須フィールドが不足")
        
        self.log.debug("非同期バリデーション完了", 
                      validated_fields=list(request_data.keys()))
        return True
    
    async def _fetch_user_data_async(self, user_id):
        self.log.debug("ユーザーデータ取得開始", target_user_id=user_id)
        await asyncio.sleep(0.2)  # データベースアクセスのシミュレーション
        
        user_data = {
            "id": user_id,
            "name": "田中太郎",
            "role": "admin",
            "permissions": ["read", "write", "delete"]
        }
        
        self.log.debug("ユーザーデータ取得完了",
                      user_role=user_data["role"],
                      permission_count=len(user_data["permissions"]))
        return user_data
    
    async def _check_permissions_async(self, user_id, endpoint):
        self.log.debug("権限チェック開始", endpoint=endpoint)
        await asyncio.sleep(0.05)  # 権限チェックのシミュレーション
        
        # 権限チェックロジック
        allowed = endpoint.startswith("/api/user/")
        
        self.log.debug("権限チェック完了",
                      endpoint=endpoint,
                      access_allowed=allowed)
        return allowed
    
    async def _process_main_logic_async(self, request_data, user_data, permissions):
        self.log.info("メインロジック処理開始")
        
        if not permissions:
            self.log.error("アクセス権限なし",
                          user_role=user_data["role"],
                          requested_endpoint=request_data["endpoint"])
            raise PermissionError("アクセス権限がありません")
        
        # メイン処理のシミュレーション
        await asyncio.sleep(0.3)
        
        result = {
            "status": "success",
            "processed_at": time.time(),
            "user_name": user_data["name"],
            "data": {"message": "処理完了"}
        }
        
        self.log.info("メインロジック処理完了",
                     result_status=result["status"],
                     processing_user=user_data["name"])
        
        return result

# 非同期処理の使用例
async def main():
    service = AsyncAPIService()
    
    # 複数の非同期リクエストを同時処理
    request_data_list = [
        {
            "method": "POST",
            "endpoint": "/api/user/create",
            "user_id": 1001,
            "required_field": "value1"
        },
        {
            "method": "GET", 
            "endpoint": "/api/user/profile",
            "user_id": 1002,
            "required_field": "value2"
        },
        {
            "method": "PUT",
            "endpoint": "/api/user/update",
            "user_id": 1003,
            "required_field": "value3"
        }
    ]
    
    # 複数リクエストの並行処理
    tasks = [
        service.handle_async_request(request_data) 
        for request_data in request_data_list
    ]
    
    try:
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"リクエスト {i} でエラー: {result}")
            else:
                print(f"リクエスト {i} 成功: {result['status']}")
                
    except Exception as e:
        print(f"全体的なエラー: {e}")

# 実行
if __name__ == "__main__":
    asyncio.run(main())

カスタムプロセッサーとパフォーマンス最適化

import structlog
import time
import json
import gzip
from typing import Any, Dict
import logging

class PerformanceProcessor:
    """パフォーマンス測定プロセッサー"""
    
    def __init__(self):
        self.start_times = {}
    
    def __call__(self, logger, method_name, event_dict):
        # パフォーマンス測定の開始
        if event_dict.get("_performance_start"):
            operation_id = event_dict.get("operation_id", "unknown")
            self.start_times[operation_id] = time.perf_counter()
            event_dict.pop("_performance_start", None)
        
        # パフォーマンス測定の終了
        if event_dict.get("_performance_end"):
            operation_id = event_dict.get("operation_id", "unknown")
            if operation_id in self.start_times:
                duration = time.perf_counter() - self.start_times[operation_id]
                event_dict["performance_ms"] = round(duration * 1000, 3)
                del self.start_times[operation_id]
            event_dict.pop("_performance_end", None)
        
        return event_dict

class SensitiveDataProcessor:
    """機密データマスキングプロセッサー"""
    
    SENSITIVE_KEYS = {
        "password", "token", "secret", "key", "credential",
        "ssn", "credit_card", "email", "phone", "address"
    }
    
    def __call__(self, logger, method_name, event_dict):
        return self._mask_sensitive_data(event_dict)
    
    def _mask_sensitive_data(self, data):
        if isinstance(data, dict):
            masked = {}
            for key, value in data.items():
                if any(sensitive in key.lower() for sensitive in self.SENSITIVE_KEYS):
                    # 機密データのマスキング
                    if isinstance(value, str) and len(value) > 4:
                        masked[key] = f"{value[:2]}***{value[-2:]}"
                    else:
                        masked[key] = "***"
                else:
                    # 再帰的にマスキング
                    masked[key] = self._mask_sensitive_data(value)
            return masked
        elif isinstance(data, (list, tuple)):
            return [self._mask_sensitive_data(item) for item in data]
        else:
            return data

class CompressionProcessor:
    """大量データ圧縮プロセッサー"""
    
    def __init__(self, threshold_bytes=1024):
        self.threshold_bytes = threshold_bytes
    
    def __call__(self, logger, method_name, event_dict):
        # 大きなデータの圧縮
        for key, value in event_dict.items():
            if isinstance(value, (str, bytes)):
                value_bytes = value.encode() if isinstance(value, str) else value
                if len(value_bytes) > self.threshold_bytes:
                    compressed = gzip.compress(value_bytes)
                    event_dict[key] = {
                        "_compressed": True,
                        "_original_size": len(value_bytes),
                        "_compressed_size": len(compressed),
                        "_data": compressed.hex()
                    }
        
        return event_dict

def setup_high_performance_logging():
    """高性能・高機能logging設定"""
    
    # カスタムプロセッサーのインスタンス
    performance_processor = PerformanceProcessor()
    sensitive_processor = SensitiveDataProcessor()
    compression_processor = CompressionProcessor(threshold_bytes=512)
    
    # 高速JSONシリアライザーの設定
    try:
        import orjson
        json_serializer = orjson.dumps
    except ImportError:
        json_serializer = json.dumps
    
    structlog.configure(
        cache_logger_on_first_use=True,
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        processors=[
            # Context Variables統合
            structlog.contextvars.merge_contextvars,
            
            # カスタムプロセッサー
            performance_processor,
            sensitive_processor,
            compression_processor,
            
            # 標準プロセッサー
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.processors.format_exc_info,
            
            # 高速JSON出力
            structlog.processors.JSONRenderer(serializer=json_serializer),
        ],
        logger_factory=structlog.BytesLoggerFactory(),
    )

# 高性能ログの使用例
setup_high_performance_logging()

class OptimizedService:
    def __init__(self):
        self.log = structlog.get_logger(self.__class__.__name__)
    
    def process_large_dataset(self, data):
        operation_id = f"op_{int(time.time() * 1000)}"
        
        # パフォーマンス測定開始
        self.log.info("大量データ処理開始",
                     operation_id=operation_id,
                     data_size=len(data),
                     _performance_start=True)
        
        try:
            # 機密データを含む処理
            user_data = {
                "user_id": 12345,
                "email": "[email protected]",  # マスキング対象
                "password": "secret123",       # マスキング対象
                "name": "田中太郎",
                "preferences": data[:100]  # データサンプル
            }
            
            self.log.debug("ユーザーデータ処理",
                          operation_id=operation_id,
                          user_info=user_data)
            
            # 大量データの処理シミュレーション
            processed_results = []
            for i, item in enumerate(data):
                if i % 1000 == 0:
                    self.log.debug("処理進捗",
                                  operation_id=operation_id,
                                  progress_percent=round((i / len(data)) * 100, 1),
                                  current_index=i)
                
                processed_results.append(f"processed_{item}")
            
            # 大量結果データ(圧縮対象)
            large_result = {
                "results": processed_results,
                "large_data": "x" * 2000,  # 大きなデータ(圧縮される)
                "metadata": {
                    "processing_method": "optimized",
                    "total_items": len(data)
                }
            }
            
            # パフォーマンス測定終了
            self.log.info("大量データ処理完了",
                         operation_id=operation_id,
                         processed_count=len(processed_results),
                         result_data=large_result,
                         _performance_end=True)
            
            return processed_results
            
        except Exception as e:
            self.log.exception("データ処理エラー",
                             operation_id=operation_id,
                             error_type=type(e).__name__)
            raise
    
    def batch_processing_with_metrics(self, batch_data):
        """メトリクス付きバッチ処理"""
        batch_id = f"batch_{int(time.time())}"
        
        self.log.info("バッチ処理開始",
                     batch_id=batch_id,
                     batch_size=len(batch_data),
                     _performance_start=True)
        
        success_count = 0
        error_count = 0
        
        for i, item in enumerate(batch_data):
            try:
                # 個別アイテム処理
                result = self._process_single_item(item, batch_id, i)
                success_count += 1
                
                if i % 100 == 0:  # 100件ごとにメトリクス出力
                    self.log.info("バッチ進捗",
                                 batch_id=batch_id,
                                 progress_ratio=f"{i}/{len(batch_data)}",
                                 success_count=success_count,
                                 error_count=error_count)
                
            except Exception as e:
                error_count += 1
                self.log.warning("アイテム処理エラー",
                               batch_id=batch_id,
                               item_index=i,
                               error_message=str(e))
        
        # 最終結果
        self.log.info("バッチ処理完了",
                     batch_id=batch_id,
                     total_processed=len(batch_data),
                     success_count=success_count,
                     error_count=error_count,
                     success_rate=round((success_count / len(batch_data)) * 100, 2),
                     _performance_end=True)
        
        return {
            "batch_id": batch_id,
            "success_count": success_count,
            "error_count": error_count
        }
    
    def _process_single_item(self, item, batch_id, index):
        # 個別処理のシミュレーション
        if index % 50 == 0:  # 50件に1件エラー
            raise ValueError(f"処理エラー: {item}")
        return f"processed_{item}"

# 使用例とパフォーマンステスト
def run_performance_test():
    service = OptimizedService()
    
    # 大量データ処理テスト
    large_data = list(range(10000))
    print("=== 大量データ処理テスト ===")
    service.process_large_dataset(large_data)
    
    # バッチ処理テスト  
    batch_data = [f"item_{i}" for i in range(500)]
    print("\n=== バッチ処理テスト ===")
    service.batch_processing_with_metrics(batch_data)

if __name__ == "__main__":
    run_performance_test()

テスト環境でのログキャプチャとアサーション

import pytest
import structlog
from structlog.testing import LogCapture, capture_logs
import json

# テスト用のstructlog設定
def setup_test_logging():
    structlog.configure(
        processors=[
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        logger_factory=structlog.testing.TestingLoggerFactory(),
        cache_logger_on_first_use=False,
    )

# Pytestフィクスチャ
@pytest.fixture(name="log_capture")
def fixture_log_capture():
    return LogCapture()

@pytest.fixture(autouse=True)
def fixture_configure_structlog_for_tests(log_capture):
    structlog.configure(
        processors=[log_capture],
        wrapper_class=structlog.stdlib.BoundLogger,
        logger_factory=structlog.testing.TestingLoggerFactory(),
        cache_logger_on_first_use=False,
    )

class TestableService:
    def __init__(self):
        self.log = structlog.get_logger(self.__class__.__name__)
    
    def create_user(self, user_data):
        if not user_data.get("email"):
            self.log.error("メール必須", validation_error="missing_email")
            raise ValueError("Email is required")
        
        if "@" not in user_data["email"]:
            self.log.warning("メール形式警告", 
                           email=user_data["email"],
                           validation_warning="invalid_format")
            
        self.log.info("ユーザー作成開始", 
                     email=user_data["email"],
                     user_data_keys=list(user_data.keys()))
        
        # ユーザー作成処理のシミュレーション
        user_id = 12345
        
        self.log.info("ユーザー作成完了",
                     user_id=user_id,
                     email=user_data["email"],
                     creation_method="api")
        
        return user_id
    
    def process_batch(self, items):
        self.log.info("バッチ処理開始", batch_size=len(items))
        
        results = []
        for i, item in enumerate(items):
            try:
                if item == "error_item":
                    raise ValueError("処理エラー")
                
                result = f"processed_{item}"
                results.append(result)
                
                self.log.debug("アイテム処理完了",
                             item_index=i,
                             item_value=item,
                             result=result)
                
            except Exception as e:
                self.log.error("アイテム処理エラー",
                             item_index=i,
                             item_value=item,
                             error_message=str(e))
                
        self.log.info("バッチ処理完了",
                     total_items=len(items),
                     successful_items=len(results),
                     failed_items=len(items) - len(results))
        
        return results

# テストクラス
class TestStructlogIntegration:
    
    def test_successful_user_creation(self, log_capture):
        """正常なユーザー作成のログテスト"""
        service = TestableService()
        
        user_data = {
            "email": "[email protected]",
            "name": "テストユーザー",
            "age": 25
        }
        
        result = service.create_user(user_data)
        
        # 基本アサーション
        assert result == 12345
        
        # ログエントリーの確認
        assert len(log_capture.entries) == 2
        
        # 開始ログの確認
        start_log = log_capture.entries[0]
        assert start_log["event"] == "ユーザー作成開始"
        assert start_log["email"] == "[email protected]"
        assert "user_data_keys" in start_log
        assert start_log["level"] == "info"
        
        # 完了ログの確認
        end_log = log_capture.entries[1]
        assert end_log["event"] == "ユーザー作成完了"
        assert end_log["user_id"] == 12345
        assert end_log["email"] == "[email protected]"
        assert end_log["creation_method"] == "api"
    
    def test_user_creation_with_invalid_email(self, log_capture):
        """不正なメールアドレスでの警告ログテスト"""
        service = TestableService()
        
        user_data = {
            "email": "invalid_email",
            "name": "テストユーザー"
        }
        
        result = service.create_user(user_data)
        
        # 警告ログが出力されることを確認
        warning_logs = [
            entry for entry in log_capture.entries 
            if entry["level"] == "warning"
        ]
        
        assert len(warning_logs) == 1
        warning_log = warning_logs[0]
        assert warning_log["event"] == "メール形式警告"
        assert warning_log["email"] == "invalid_email"
        assert warning_log["validation_warning"] == "invalid_format"
    
    def test_user_creation_missing_email_error(self, log_capture):
        """必須フィールド不足でのエラーログテスト"""
        service = TestableService()
        
        user_data = {"name": "テストユーザー"}
        
        with pytest.raises(ValueError, match="Email is required"):
            service.create_user(user_data)
        
        # エラーログの確認
        error_logs = [
            entry for entry in log_capture.entries 
            if entry["level"] == "error"
        ]
        
        assert len(error_logs) == 1
        error_log = error_logs[0]
        assert error_log["event"] == "メール必須"
        assert error_log["validation_error"] == "missing_email"
    
    def test_batch_processing_with_mixed_results(self, log_capture):
        """成功・失敗混在バッチ処理のログテスト"""
        service = TestableService()
        
        items = ["item1", "item2", "error_item", "item3"]
        results = service.process_batch(items)
        
        # 結果確認
        assert len(results) == 3
        assert "processed_item1" in results
        assert "processed_item2" in results
        assert "processed_item3" in results
        
        # ログレベル別確認
        info_logs = [e for e in log_capture.entries if e["level"] == "info"]
        debug_logs = [e for e in log_capture.entries if e["level"] == "debug"]  
        error_logs = [e for e in log_capture.entries if e["level"] == "error"]
        
        # INFO: 開始・完了ログ
        assert len(info_logs) == 2
        assert info_logs[0]["event"] == "バッチ処理開始"
        assert info_logs[0]["batch_size"] == 4
        
        assert info_logs[1]["event"] == "バッチ処理完了"
        assert info_logs[1]["total_items"] == 4
        assert info_logs[1]["successful_items"] == 3
        assert info_logs[1]["failed_items"] == 1
        
        # DEBUG: 個別アイテム処理ログ(成功分のみ)
        assert len(debug_logs) == 3
        
        # ERROR: 失敗したアイテムのログ
        assert len(error_logs) == 1
        assert error_logs[0]["event"] == "アイテム処理エラー"
        assert error_logs[0]["item_value"] == "error_item"
    
    def test_log_context_binding(self, log_capture):
        """ログコンテキストバインディングのテスト"""
        base_log = structlog.get_logger("TestLogger")
        
        # コンテキストバインディング
        bound_log = base_log.bind(
            session_id="sess_12345",
            user_id=67890,
            operation="test_operation"
        )
        
        bound_log.info("コンテキスト付きログ", additional_data="test_value")
        
        # コンテキストが正しく含まれているか確認
        assert len(log_capture.entries) == 1
        log_entry = log_capture.entries[0]
        
        assert log_entry["session_id"] == "sess_12345"
        assert log_entry["user_id"] == 67890
        assert log_entry["operation"] == "test_operation"
        assert log_entry["additional_data"] == "test_value"
        assert log_entry["event"] == "コンテキスト付きログ"

# コンテキストマネージャーを使用したログキャプチャテスト
class TestLogCaptureContext:
    
    def test_with_capture_logs_context_manager(self):
        """capture_logsコンテキストマネージャーを使用したテスト"""
        service = TestableService()
        
        with capture_logs() as captured_logs:
            service.create_user({
                "email": "[email protected]",
                "name": "コンテキストテスト"
            })
        
        # キャプチャされたログの確認
        assert len(captured_logs) == 2
        
        # JSON形式での確認(プロセッサー処理後)
        start_log = captured_logs[0]
        assert "timestamp" in start_log
        assert start_log["event"] == "ユーザー作成開始"
        assert start_log["email"] == "[email protected]"
        
        end_log = captured_logs[1]
        assert end_log["event"] == "ユーザー作成完了"
        assert end_log["user_id"] == 12345

# テスト実行例
if __name__ == "__main__":
    # 手動テスト実行
    pytest.main([__file__, "-v"])