Beaker

Pythonキャッシュライブラリセッション管理WSGIWeb開発ミドルウェア

GitHub概要

bbangert/beaker

WSGI middleware for sessions and caching

スター537
ウォッチ22
フォーク145
作成日:2011年10月16日
言語:Python
ライセンス:Other

トピックス

なし

スター履歴

bbangert/beaker Star History
データ取得日時: 2025/10/22 08:07

ライブラリ

Beaker

概要

Beakerは、Pythonで書かれたWebアプリケーション向けのセッション管理とキャッシングライブラリです。

詳細

Beakerは、Webアプリケーションやスタンドアロンの Python スクリプトとアプリケーション用の包括的なキャッシングとセッションライブラリです。WSGI ミドルウェアとして動作し、セッション実装の詳細を抽象化してシンプルなセッション管理を提供します。すべてのクッキーは改ざんを防ぐためにHMAC署名で保護され、セッションオブジェクトが実際にアクセスされた時のみファイルシステムからロードされるため、セッションを使用しないページでのパフォーマンス低下を防ぎます。複数のバックエンドストレージ(file、dbm、memory、memcached、database、google、mongodb、redis)をサポートし、柔軟な設定が可能です。セッション用には追加でcookieタイプが利用でき、すべてのセッションデータをクッキー自体に保存できます(4096バイトの制限あり)。Pylonsフレームワークでの使用で広く知られていますが、任意のWSGI対応フレームワークで利用可能です。

メリット・デメリット

メリット

  • WSGI統合: 標準的なWSGIミドルウェアとして簡単に統合
  • 豊富なバックエンド: 8つの異なるストレージバックエンドをサポート
  • セキュリティ: HMAC署名によるクッキー改ざん防止
  • パフォーマンス: 遅延ロードによる効率的なセッション管理
  • 柔軟性: キャッシュとセッション両方の機能を提供
  • 設定簡単: 直感的な設定オプション
  • 軽量: 最小限の依存関係

デメリット

  • 古い設計: 比較的古いライブラリで現代的な機能が限定的
  • メンテナンス: アクティブな開発が減少している
  • セキュリティ警告: pickleモジュール使用による潜在的なセキュリティリスク
  • Python2時代: Python3への移行で一部機能が古い
  • 代替ライブラリ: より現代的なキャッシング・セッションライブラリが存在

主要リンク

書き方の例

インストール

pip install Beaker

基本的なWSGIミドルウェアセットアップ

from beaker.middleware import SessionMiddleware

# WSGIアプリケーションの定義
def simple_app(environ, start_response):
    session = environ['beaker.session']
    
    # セッションからデータを取得
    count = session.get('count', 0)
    count += 1
    
    # セッションにデータを保存
    session['count'] = count
    session.save()
    
    response_body = f'あなたは{count}回目の訪問です'
    status = '200 OK'
    headers = [('Content-Type', 'text/plain; charset=utf-8')]
    
    start_response(status, headers)
    return [response_body.encode('utf-8')]

# セッションミドルウェアの設定
session_opts = {
    'session.type': 'file',
    'session.cookie_expires': 300,
    'session.data_dir': './data',
    'session.auto': True
}

app = SessionMiddleware(simple_app, session_opts)

Flaskとの統合

from flask import Flask, session, request
from beaker.middleware import SessionMiddleware

app = Flask(__name__)

# Beakerセッション設定
session_opts = {
    'session.type': 'ext:redis',
    'session.url': 'redis://localhost:6379',
    'session.cookie_expires': 3600,
    'session.auto': True,
    'session.encrypt_key': 'your-secret-key-here',
    'session.validate_key': 'your-validation-key-here'
}

@app.route('/')
def index():
    beaker_session = request.environ['beaker.session']
    visits = beaker_session.get('visits', 0)
    visits += 1
    beaker_session['visits'] = visits
    beaker_session.save()
    
    return f'訪問回数: {visits}'

@app.route('/user/<username>')
def user_profile(username):
    beaker_session = request.environ['beaker.session']
    beaker_session['current_user'] = username
    beaker_session.save()
    
    return f'ようこそ、{username}さん!'

@app.route('/logout')
def logout():
    beaker_session = request.environ['beaker.session']
    beaker_session.delete()
    return 'ログアウトしました'

# Beakerミドルウェアでアプリケーションをラップ
app.wsgi_app = SessionMiddleware(app.wsgi_app, session_opts)

if __name__ == '__main__':
    app.run(debug=True)

キャッシュ機能の使用

from beaker.cache import CacheManager
from beaker.util import parse_cache_config_options

# キャッシュ設定
cache_opts = {
    'cache.type': 'file',
    'cache.data_dir': '/tmp/cache/data',
    'cache.lock_dir': '/tmp/cache/lock'
}

cache = CacheManager(**parse_cache_config_options(cache_opts))

# 基本的なキャッシュ操作
def expensive_function(param):
    """重い計算処理をシミュレート"""
    import time
    time.sleep(2)  # 2秒の処理
    return f"計算結果: {param * 2}"

# キャッシュデコレータの使用
@cache.cache('expensive_func', expire=300)  # 5分間キャッシュ
def cached_expensive_function(param):
    return expensive_function(param)

# 手動キャッシュ操作
def manual_cache_example():
    # キャッシュ名前空間を作成
    mycache = cache.get_cache('manual_cache', expire=600)
    
    # キーが存在するかチェック
    key = 'user_data_123'
    if key in mycache:
        return mycache[key]
    
    # データを計算してキャッシュに保存
    data = fetch_user_data_from_db(123)
    mycache[key] = data
    return data

def fetch_user_data_from_db(user_id):
    """データベースからユーザーデータを取得(シミュレート)"""
    import time
    time.sleep(1)
    return {'id': user_id, 'name': f'User {user_id}', 'email': f'user{user_id}@example.com'}

高度なセッション設定

import json
from beaker.middleware import SessionMiddleware

# カスタムシリアライザーの設定
def json_serializer(obj):
    return json.dumps(obj)

def json_deserializer(data):
    return json.loads(data)

# セキュアなセッション設定
secure_session_opts = {
    'session.type': 'ext:redis',
    'session.url': 'redis://localhost:6379/1',
    'session.cookie_expires': True,  # ブラウザ終了時に削除
    'session.cookie_domain': '.example.com',
    'session.cookie_path': '/',
    'session.cookie_secure': True,  # HTTPS必須
    'session.cookie_httponly': True,  # JavaScriptからアクセス不可
    'session.encrypt_key': 'your-very-long-encryption-key',
    'session.validate_key': 'your-validation-key',
    'session.auto': True,
    'session.timeout': 3600,  # 1時間でタイムアウト
    'session.data_serializer': 'json',  # pickleの代わりにJSONを使用
}

class SecureSessionApp:
    def __init__(self):
        self.session_opts = secure_session_opts
        
    def __call__(self, environ, start_response):
        session = environ['beaker.session']
        
        # セッション検証
        if not self.validate_session(session):
            session.delete()
            return self.unauthorized_response(start_response)
        
        # 通常の処理
        return self.handle_request(environ, start_response, session)
    
    def validate_session(self, session):
        """セッションの有効性を検証"""
        if 'user_id' not in session:
            return False
        
        # 最後のアクティビティ時間をチェック
        last_activity = session.get('last_activity')
        if last_activity:
            import time
            if time.time() - last_activity > 3600:  # 1時間
                return False
        
        # アクティビティ時間を更新
        session['last_activity'] = time.time()
        session.save()
        return True
    
    def handle_request(self, environ, start_response, session):
        user_id = session.get('user_id')
        response_body = f'ユーザー {user_id} でログイン中'
        
        status = '200 OK'
        headers = [('Content-Type', 'text/plain; charset=utf-8')]
        start_response(status, headers)
        return [response_body.encode('utf-8')]
    
    def unauthorized_response(self, start_response):
        status = '401 Unauthorized'
        headers = [('Content-Type', 'text/plain')]
        start_response(status, headers)
        return [b'Unauthorized']

# アプリケーションの作成
app = SecureSessionApp()
app = SessionMiddleware(app, secure_session_opts)

マルチストレージ設定

from beaker.cache import CacheManager
from beaker.util import parse_cache_config_options

# 複数のキャッシュストレージ設定
cache_config = {
    # メモリキャッシュ(高速、揮発性)
    'cache.memory.type': 'memory',
    
    # ファイルキャッシュ(永続化)
    'cache.file.type': 'file',
    'cache.file.data_dir': '/tmp/cache/data',
    'cache.file.lock_dir': '/tmp/cache/lock',
    
    # Redisキャッシュ(分散対応)
    'cache.redis.type': 'ext:redis',
    'cache.redis.url': 'redis://localhost:6379/2',
    
    # データベースキャッシュ
    'cache.database.type': 'ext:database',
    'cache.database.url': 'postgresql://user:pass@localhost/cache_db',
    'cache.database.table_name': 'beaker_cache'
}

cache_manager = CacheManager(**parse_cache_config_options(cache_config))

class MultiTierCache:
    def __init__(self, cache_manager):
        self.memory_cache = cache_manager.get_cache('memory_cache', type='memory', expire=300)
        self.file_cache = cache_manager.get_cache('file_cache', type='file', expire=3600)
        self.redis_cache = cache_manager.get_cache('redis_cache', type='ext:redis', expire=7200)
    
    def get(self, key):
        """階層的にキャッシュを確認"""
        # L1: メモリキャッシュ
        if key in self.memory_cache:
            return self.memory_cache[key]
        
        # L2: ファイルキャッシュ
        if key in self.file_cache:
            value = self.file_cache[key]
            self.memory_cache[key] = value  # メモリキャッシュにも保存
            return value
        
        # L3: Redisキャッシュ
        if key in self.redis_cache:
            value = self.redis_cache[key]
            self.file_cache[key] = value    # ファイルキャッシュにも保存
            self.memory_cache[key] = value  # メモリキャッシュにも保存
            return value
        
        return None
    
    def set(self, key, value):
        """すべての階層にデータを保存"""
        self.memory_cache[key] = value
        self.file_cache[key] = value
        self.redis_cache[key] = value
    
    def delete(self, key):
        """すべての階層からデータを削除"""
        self.memory_cache.pop(key, None)
        self.file_cache.pop(key, None)
        self.redis_cache.pop(key, None)

# 使用例
multi_cache = MultiTierCache(cache_manager)

def cached_api_call(api_endpoint):
    """APIコールをマルチ階層キャッシュで最適化"""
    cache_key = f"api:{api_endpoint}"
    
    # キャッシュから取得を試行
    cached_result = multi_cache.get(cache_key)
    if cached_result:
        return cached_result
    
    # APIを実際に呼び出し
    result = make_api_request(api_endpoint)
    
    # すべての階層にキャッシュ
    multi_cache.set(cache_key, result)
    
    return result

def make_api_request(endpoint):
    """実際のAPI呼び出し(シミュレート)"""
    import time
    time.sleep(0.5)  # ネットワーク遅延をシミュレート
    return {'data': f'Response from {endpoint}', 'timestamp': time.time()}