Loguru
Python向けの最も人気なサードパーティロギングライブラリ(15,000+ GitHub スター)。シンプルで直感的なAPIと事前設定された豊富な機能により、少ないコードで複雑なロギング設定が可能。カラフルなコンソール出力と詳細なスタックトレース表示を提供。
GitHub概要
スター22,195
ウォッチ140
フォーク740
作成日:2017年8月15日
言語:Python
ライセンス:MIT License
トピックス
logloggerloggingpython
スター履歴
データ取得日時: 2025/7/17 10:32
ライブラリ
Loguru
概要
Loguruは「stupidly simple(愚かなほど簡単)」をコンセプトにしたPython向けの革新的なロギングライブラリです。従来のPython標準ライブラリloggingモジュールの複雑な設定プロセスを排除し、設定不要でそのまま使える利便性を提供します。自動的なJSON変換、構造化ログ、カラー出力、自動ローテーション、例外トレース、非同期ロギングを標準装備し、開発者が即座に高品質なログを出力できる環境を実現します。
詳細
Loguru 0.7.2は2025年現在、Pythonエコシステムで急速に注目を集める次世代ロギングライブラリです。標準ライブラリのloggingモジュールが抱える設定の複雑さ、ハンドラー管理の煩雑さ、フォーマッター設定の冗長性を根本的に解決し、「import → 即座に使用開始」の開発体験を実現。自動的なJSONシリアライゼーション、構造化データのサポート、リッチな例外トレース、ファイル自動ローテーション機能により、モダンな可観測性要件に完全対応します。
主な特徴
- Zero-Config Ready: インポート後、設定不要で即座にリッチなログを出力開始
- 自動構造化ログ: JSON形式での自動出力と構造化データの直感的なサポート
- Sinkベースアーキテクチャ: ファイル、コンソール、ネットワーク、カスタム出力先への柔軟な対応
- リッチ例外処理: backtraceとdiagnoseによる詳細なエラー情報の自動取得
- スマートローテーション: サイズ、時間、保持ポリシーによる自動ファイル管理
- パフォーマンス最適化: enqueue=Trueによる非同期ログ、lazy evaluationサポート
メリット・デメリット
メリット
- Python標準libraryのloggingモジュールと比較して95%以上の設定工数削減を実現
- 1行のimportで即座に本格的なログシステムが利用可能
- バックトレースと診断情報による圧倒的なデバッグ効率の向上
- 自動JSON出力により現代的な可観測性ツール(ELK Stack、Grafana等)との即座の統合
- bind()メソッドによる直感的なコンテキスト情報の追加
- 非同期ログとワーカースレッドによる高性能な大量ログ処理
デメリット
- 標準ライブラリではないため追加の依存関係が必要(pip install loguru)
- 既存のloggingモジュールベースのコードベースでは移行コストが発生
- カスタムハンドラーが豊富なloggingエコシステムとの直接互換性制限
- diagnose=Trueでの詳細情報出力時のセキュリティリスク(本番環境での注意要)
- 極めてシンプルなログニーズの場合はオーバースペックの可能性
- 企業環境での標準ライブラリ優先ポリシーとの競合
参考ページ
書き方の例
インストールと基本セットアップ
# Loguruのインストール
pip install loguru
# 開発環境用(オプションの依存関係含む)
pip install "loguru[dev]"
# 最もシンプルなログ開始 - 設定不要!
from loguru import logger
logger.debug("That's it, beautiful and simple logging!")
logger.info("Hello, World!")
logger.warning("Watch out!")
logger.error("Something went wrong")
logger.critical("This is critical!")
基本的なロギング操作(レベル、フォーマット)
import sys
from loguru import logger
# デフォルトハンドラーをカスタマイズ
logger.remove() # 既存のハンドラーをすべて削除
logger.add(sys.stderr, format="{time} - {level} - {message}")
# ファイル出力の追加
logger.add("app.log", level="INFO", rotation="500 MB")
# 異なるレベルでのログ出力
logger.add("debug.log", level="DEBUG", format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}")
logger.add("error.log", level="ERROR", retention="10 days", compression="zip")
# レベル別ログの実行
logger.debug("Detailed debugging information")
logger.info("General information about application progress")
logger.warning("Something unexpected happened")
logger.error("An error occurred but application continues")
logger.critical("A critical error occurred")
# 現代的なフォーマット記法(ブレース)のサポート
logger.info("Processing user {} with ID {}", "john_doe", 12345)
logger.info("Python version: {version}, feature: {feature}", version=3.9, feature="f-strings")
高度な設定とカスタマイズ(Sink、フィルター等)
from loguru import logger
import sys
# 複数Sinkの設定
logger.remove()
logger.add(
"application.log",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} | {message}",
level="INFO",
rotation="12:00", # 毎日正午にローテーション
retention="30 days", # 30日間保持
compression="zip", # 圧縮保存
enqueue=True # 非同期処理
)
# エラー専用ログファイル
logger.add(
"errors.log",
level="ERROR",
format="{time} | {level} | {extra[user_id]} | {message}",
filter=lambda record: record["level"].no >= 40 # ERROR以上のみ
)
# カスタムフィルター関数
def important_only(record):
return "important" in record["extra"]
logger.add("important.log", filter=important_only)
# 条件付きロギング
logger.add(
sys.stdout,
format="<green>{time}</green> | <level>{level}</level> | <blue>{message}</blue>",
level="DEBUG",
filter=lambda record: record["extra"].get("environment") == "development"
)
# カスタムSink関数
def database_sink(message):
record = message.record
# データベースへの保存処理
print(f"Saving to DB: {record['level']} - {record['message']}")
logger.add(database_sink, level="WARNING")
構造化ログと現代的な可観測性対応
from loguru import logger
import json
from datetime import datetime
# JSON形式での構造化ログ設定
logger.add("structured.log", serialize=True, level="INFO")
# bind()を使用したコンテキスト情報の追加
context_logger = logger.bind(
user_id="user_123",
session_id="sess_456",
service="web-api"
)
context_logger.info("User login successful")
context_logger.bind(action="purchase").info("Order completed", order_id=789)
# contextualize()による一時的なコンテキスト
with logger.contextualize(request_id="req_001"):
logger.info("Processing request")
# 複数の処理...
logger.info("Request completed")
# patch()によるグローバルな情報追加
logger = logger.patch(lambda record: record["extra"].update(
timestamp=datetime.utcnow().isoformat(),
hostname="web-server-01",
version="1.2.3"
))
# カスタムシリアライザーでのオブジェクト処理
def custom_serializer(obj):
if isinstance(obj, datetime):
return obj.isoformat()
raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
# ELK Stack向けの構造化ログ
def elk_stack_format(record):
return json.dumps({
"@timestamp": record["time"].isoformat(),
"level": record["level"].name,
"logger": record["name"],
"message": record["message"],
"extra": record["extra"],
"function": record["function"],
"line": record["line"]
}, default=custom_serializer) + "\n"
logger.add("elk-logs.jsonl", format=elk_stack_format, level="INFO")
エラーハンドリングとパフォーマンス最適化
from loguru import logger
import sys
# 高性能非同期ログ設定
logger.add(
"performance.log",
enqueue=True, # キューを使用した非同期処理
level="INFO"
)
# 例外処理とバックトレース
logger.add("debug.log", backtrace=True, diagnose=True)
def divide_numbers(a, b):
try:
result = a / b
logger.info("Division successful: {} / {} = {}", a, b, result)
return result
except ZeroDivisionError:
logger.exception("Division by zero error occurred")
raise
except Exception as e:
logger.error("Unexpected error in division: {}", str(e))
raise
# Lazy evaluation によるパフォーマンス最適化
def expensive_operation():
# 重い計算処理
return "expensive_result"
# expensive_operation()はDEBUGレベルが有効な場合のみ実行される
logger.opt(lazy=True).debug("Debug info: {}", expensive_operation)
# opt()メソッドの活用
logger.opt(colors=True).info("Success: <green>Operation completed</green>")
logger.opt(raw=True).info("Raw message without formatting\n")
logger.opt(depth=1).info("Parent stack context")
logger.opt(exception=True).info("Include exception traceback")
# リトライ機能付きロギング
import time
import random
async def retry_with_logging(operation, max_retries=3):
for attempt in range(max_retries):
try:
logger.bind(attempt=attempt + 1).info("Attempting operation")
result = await operation()
logger.success("Operation succeeded on attempt {}", attempt + 1)
return result
except Exception as e:
logger.bind(attempt=attempt + 1).warning(
"Operation failed: {} (attempt {}/{})",
str(e), attempt + 1, max_retries
)
if attempt == max_retries - 1:
logger.error("Operation failed after {} attempts", max_retries)
raise
time.sleep(2 ** attempt) # 指数バックオフ
フレームワーク統合と実用例
from loguru import logger
import sys
from pathlib import Path
# FastAPI統合例
def setup_logging_for_fastapi():
"""FastAPI用のLoguruセットアップ"""
# FastAPI起動時の設定
logger.remove()
# 開発環境用設定
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
"<level>{level: <8}</level> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | "
"{message}",
level="DEBUG"
)
# 本番環境用設定
logger.add(
"logs/app.log",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} | {message}",
level="INFO",
rotation="100 MB",
retention="30 days",
compression="zip",
enqueue=True
)
# Django統合例
def setup_logging_for_django():
"""Django用のLoguruセットアップ"""
from django.conf import settings
# Djangoの標準ロギングを無効化
import logging
logging.disable(logging.CRITICAL)
logger.remove()
# Djangoログの取得
class DjangoLoguruInterceptHandler(logging.Handler):
def emit(self, record):
logger_opt = logger.opt(depth=6, exception=record.exc_info)
logger_opt.log(record.levelname, record.getMessage())
logging.basicConfig(handlers=[DjangoLoguruInterceptHandler()], level=0)
# Loguru設定
logger.add(
settings.BASE_DIR / "logs" / "django.log",
level="INFO",
format="{time} | {level} | {name}:{function}:{line} | {message}",
rotation="50 MB",
serialize=True
)
# Flask統合例
def setup_logging_for_flask(app):
"""Flask用のLoguruセットアップ"""
import logging
# Flaskのロガーをinterceptする
class InterceptHandler(logging.Handler):
def emit(self, record):
logger_opt = logger.opt(depth=6, exception=record.exc_info)
logger_opt.log(record.levelno, record.getMessage())
app.logger.addHandler(InterceptHandler())
# Werkzeugログも取得
logging.getLogger("werkzeug").addHandler(InterceptHandler())
logger.add(
"flask_app.log",
level="INFO",
format="{time} | {level} | {extra[ip]} | {message}",
filter=lambda record: "ip" in record["extra"]
)
# ログパーシングと分析
def parse_log_files():
"""ログファイルの解析例"""
import re
from datetime import datetime
# ログパターンの定義
pattern = r"(?P<time>.*?) \| (?P<level>.*?) \| (?P<message>.*)"
# カスタムキャスタ
def parse_time(time_str):
return datetime.fromisoformat(time_str.replace("Z", "+00:00"))
caster_dict = dict(time=parse_time, level=str)
# ログ解析の実行
for groups in logger.parse("application.log", pattern, cast=caster_dict):
if groups["level"] == "ERROR":
logger.warning("Found error log: {}", groups["message"])
# 使用例
if __name__ == "__main__":
# 基本使用
setup_logging_for_fastapi()
logger.info("Application started")
# コンテキスト付きログ
with logger.contextualize(user_id="user_001"):
logger.info("User operation started")
divide_numbers(10, 2)
logger.info("User operation completed")
logger.success("Application setup completed")
カスタムレベルとアドバンス機能
from loguru import logger
# カスタムログレベルの定義
logger.level("AUDIT", no=35, color="<yellow>", icon="📋")
logger.level("BUSINESS", no=25, color="<blue>", icon="💼")
# カスタムレベルの使用
logger.log("AUDIT", "User action recorded")
logger.log("BUSINESS", "Business metric updated")
# プログレスバー的なログ
def log_progress_inline():
logger.bind(end="").debug("Progress: ")
for i in range(5):
logger.opt(raw=True).debug(".")
time.sleep(0.5)
logger.opt(raw=True).debug(" Done!\n")
# 通知機能の統合(Apprise使用例)
try:
import apprise
# Discord通知の設定
WEBHOOK_ID = "123456790"
WEBHOOK_TOKEN = "abc123def456"
notifier = apprise.Apprise()
notifier.add(f"discord://{WEBHOOK_ID}/{WEBHOOK_TOKEN}")
# エラー時の自動通知
logger.add(
notifier.notify,
level="ERROR",
filter=lambda record: "apprise" not in record["name"]
)
except ImportError:
logger.warning("Apprise not available, notifications disabled")
# ログのルーティング
def setup_log_routing():
"""コンテキストベースのログルーティング"""
# サービス別ログファイル
logger.add(
"service_a.log",
filter=lambda record: record["extra"].get("service") == "A"
)
logger.add(
"service_b.log",
filter=lambda record: record["extra"].get("service") == "B"
)
# 使用例
logger_a = logger.bind(service="A")
logger_b = logger.bind(service="B")
logger_a.info("Service A operation")
logger_b.info("Service B operation")
# メモリ効率的なログローテーション
logger.add(
"memory_efficient.log",
rotation="10 MB",
retention=5, # 最大5ファイル保持
compression="gz",
enqueue=True,
catch=True # ログエラーを抑制
)