Dogpile Cache
Overview
Dogpile Cache is a Python caching API that provides a generic interface to caching backends of any variety. Developed as part of the SQLAlchemy Project, it's designed as a replacement for the Beaker caching system.
Details
Dogpile Cache (dogpile.cache) is a caching library developed under the SQLAlchemy Project that addresses performance problems inherent in the Beaker caching system, most notably the "double-fetching" problem in which values were frequently retrieved from the cache twice. At its core is a vastly simplified, improved, and better-tested dogpile lock engine.
It supports a rich set of cache backends, including memcached (python-memcached, pylibmc, bmemcached), Redis, anydbm, and dictionary-based options. Key generation is fully customizable through pluggable key generators and an optional key mangler, giving fine-grained control over how cache keys correspond to function calls. Each backend can also implement its own version of a "distributed" lock matched to the backend's storage system, so the dogpile lock works across processes.
Integration with SQLAlchemy enables efficient caching at the ORM level: the ORMCache, FromCache, and RelationshipCache recipes from SQLAlchemy's examples/dogpile_caching example support query result caching and lazy loading optimization.
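At the center of the API is CacheRegion.get_or_create(), which wraps value creation in the dogpile lock. A minimal sketch using the in-memory backend (the key name and creator function are illustrative):
from dogpile.cache import make_region

region = make_region().configure('dogpile.cache.memory', expiration_time=60)

def load_report():
    # Runs at most once per expiration window; while one caller
    # regenerates the value, concurrent callers receive the existing one.
    return {'rows': 42}

report = region.get_or_create('daily_report', load_report)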
Pros and Cons
Pros
- Performance Improvement: Fundamentally resolves Beaker's "double-fetching" problem
- SQLAlchemy Integration: Dedicated classes like ORMCache, FromCache, RelationshipCache
- Flexible Key Management: Pluggable key generators and key mangler functionality
- Distributed Locking: Backend-specific distributed lock implementation to avoid stampedes
- Rich Backends: Diverse options including memcached, Redis, anydbm, dictionary
- Advanced Caching Strategies: Stale value return capability and background processing support
- Error Handling: Proper handling for deserialization failures
Cons
- Learning Curve: High initial learning cost due to advanced features
- Configuration Complexity: Fine-grained control can lead to complex configurations
- SQLAlchemy Dependency: Many SQLAlchemy-specific features, less beneficial for other ORMs
- Backend Dependencies: Dependent on limitations and characteristics of chosen backend
- Debugging Difficulty: Distributed locks and cache layers can make debugging challenging
- Minimum Python Requirement: Requires Python 3.8+ (incompatible with older Python versions)
Key Links
- Dogpile Cache Official Documentation
- GitHub Repository
- PyPI Package
- SQLAlchemy Integration Guide
- Usage Guide
- API Reference
Code Examples
Basic Configuration and Usage
from dogpile.cache import make_region
import time

# Create a cache region
cache_region = make_region().configure(
    'dogpile.cache.memory',   # in-memory cache backend
    expiration_time=3600,     # 1 hour expiration
)

# Basic cache operations
@cache_region.cache_on_arguments()
def get_user_data(user_id):
    """Cache the result of heavy processing."""
    # Database access or external API calls would go here
    time.sleep(1)  # simulate heavy processing
    return f"User data for ID: {user_id}"

# First call executes the function; subsequent calls hit the cache
user_data = get_user_data(123)
cached_data = get_user_data(123)  # fast retrieval from cache

# Manual cache operations
cache_region.set('manual_key', 'manual_value')
value = cache_region.get('manual_key')
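cache_on_arguments() also attaches management methods to the decorated function, which is the usual way to invalidate or refresh a specific entry:
get_user_data.invalidate(123)        # drop the cached entry for these arguments
get_user_data.refresh(123)           # re-run the function and store the new value
get_user_data.set('Override', 123)   # write a value directly for these arguments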
Redis Backend Configuration
from dogpile.cache import make_region
import redis

# Cache region using the Redis backend
redis_region = make_region().configure(
    'dogpile.cache.redis',
    arguments={
        'host': 'localhost',
        'port': 6379,
        'db': 0,
        'redis_expiration_time': 3600,  # Redis-side TTL in seconds
        'distributed_lock': True,       # enable distributed locking
    }
)

# Or reuse an existing Redis connection pool (the backend accepts a
# connection_pool argument rather than a client instance)
pool = redis.ConnectionPool(host='localhost', port=6379, db=1)
redis_region_custom = make_region().configure(
    'dogpile.cache.redis',
    arguments={
        'connection_pool': pool,
        'distributed_lock': True,
    }
)

@redis_region.cache_on_arguments(expiration_time=1800)
def expensive_computation(param1, param2):
    """Heavy computational work."""
    result = param1 ** param2
    return result

# Usage example
result = expensive_computation(2, 100)         # first call runs the computation
cached_result = expensive_computation(2, 100)  # served from the cache
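The memcached family of backends is configured the same way. A brief sketch using the pylibmc driver, assuming a memcached server running on localhost:
from dogpile.cache import make_region

memcached_region = make_region().configure(
    'dogpile.cache.pylibmc',
    expiration_time=3600,
    arguments={
        'url': ['127.0.0.1'],      # list of memcached servers
        'distributed_lock': True,  # distributed lock via memcached
    }
)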
SQLAlchemy Integration
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from dogpile.cache import make_region

# Database configuration
engine = create_engine('sqlite:///example.db')
Base = declarative_base()
Session = sessionmaker(bind=engine)
# Model definition
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(50))
email = Column(String(100))
# Cache region configuration
cache_region = make_region().configure(
    'dogpile.cache.memory',
    expiration_time=1800
)

# FromCache and RelationshipCache are not part of dogpile.cache itself;
# they come from the caching_query module in SQLAlchemy's
# examples/dogpile_caching recipe. That recipe registers regions by name
# with an ORMCache object hooked onto the Session (see the example's
# environment.py); "default" below refers to such a registered region.
from caching_query import FromCache, RelationshipCache

regions = {'default': cache_region}

def get_user_with_cache(session, user_id):
    """Get a user, with the query result cached."""
    user = (
        session.query(User)
        .options(FromCache('default', cache_key=f'user:{user_id}'))
        .filter(User.id == user_id)
        .first()
    )
    return user
# Usage example
session = Session()
user1 = get_user_with_cache(session, 1)  # hits the database
user2 = get_user_with_cache(session, 1)  # served from the cache

# Relationship cache: also cache the lazy load of User.posts
def get_user_with_posts(session, user_id):
    """Get a user with posts, using the relationship cache."""
    user = (
        session.query(User)
        .options(
            FromCache('default', cache_key=f'user:{user_id}'),
            RelationshipCache(User.posts, 'default'),
        )
        .filter(User.id == user_id)
        .first()
    )
    return user
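When the full recipe is more than a project needs, a simpler pattern (a sketch, not part of the recipe) is to cache plain, detached data with the ordinary decorator, since live ORM instances remain bound to a Session:
@cache_region.cache_on_arguments()
def get_user_email(user_id):
    # Cache a plain tuple rather than a live ORM object, so the
    # cached value is independent of any Session.
    session = Session()
    try:
        row = session.query(User.name, User.email).filter(User.id == user_id).first()
        return tuple(row) if row is not None else None
    finally:
        session.close()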
Cache Stampede Prevention
from dogpile.cache import make_region
import threading
import time

# Cache region with stampede prevention (the dogpile lock is built in)
stampede_region = make_region().configure(
    'dogpile.cache.memory',
    expiration_time=300,  # 5 minutes
)
@stampede_region.cache_on_arguments()
def slow_function(param):
    """Heavy, time-consuming processing."""
    print(f"Executing slow function with param: {param}")
    time.sleep(2)  # simulate heavy processing
    return f"Result for {param}"
# Stale-value fallback is built into the region itself: when an entry
# expires, the dogpile lock lets exactly one caller run the creator while
# concurrent callers are served the existing (stale) value.
def cached_with_stale_fallback(param):
    """Serve a stale value while one worker regenerates."""
    def creator():
        print(f"Creating fresh value for {param}")
        time.sleep(2)
        return f"Fresh value for {param}"
    # should_cache_fn prevents caching unusable results such as None
    return stampede_region.get_or_create(
        f"stale_fallback:{param}",
        creator,
        should_cache_fn=lambda value: value is not None,
    )
# Stampede prevention through background processing: dogpile.cache
# supports this natively via the async_creation_runner hook, which
# replaces hand-rolled thread management and probabilistic refresh.
def async_creation_runner(cache, somekey, creator, mutex):
    """Run the creator in a background thread, then release the dogpile lock."""
    def runner():
        try:
            value = creator()
            cache.set(somekey, value)
        finally:
            mutex.release()
    thread = threading.Thread(target=runner)
    thread.daemon = True
    thread.start()

# The docs recommend pairing this with a distributed lock in production
background_region = make_region(
    async_creation_runner=async_creation_runner,
).configure(
    'dogpile.cache.memory',
    expiration_time=300,
)

@background_region.cache_on_arguments()
def smart_cached_function(param):
    """Once a value exists, expired entries are returned immediately
    while a background thread regenerates them."""
    time.sleep(1)
    return f"Smart result for {param}"
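A quick usage note: the first call populates the cache synchronously; after the entry expires, callers keep receiving the stale value immediately while the daemon thread refreshes it.
print(smart_cached_function('a'))  # synchronous on the first call
print(smart_cached_function('a'))  # cached; after expiry: stale value + background refresh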
Custom Key Generators and Manglers
from dogpile.cache import make_region
import hashlib
import json

# Custom key generator: dogpile.cache calls this once per decorated
# function, and it must return a callable that builds a key from the
# actual call arguments.
def custom_key_generator(namespace, fn, to_str=str):
    """Custom key generation logic."""
    fn_name = fn.__name__
    prefix = namespace or 'default'
    def generate_key(*args, **kwargs):
        # Serialize complex arguments deterministically
        serialized = json.dumps(
            {'args': args, 'kwargs': kwargs}, sort_keys=True, default=str
        )
        # Hash and shorten
        hashed = hashlib.md5(serialized.encode()).hexdigest()[:12]
        return f"{prefix}:{fn_name}:{hashed}"
    return generate_key

# Custom key mangler: applied to every key before it reaches the backend
def custom_key_mangler(key):
    """Transform and normalize keys."""
    # Replace characters some backends dislike
    key = key.replace(':', '_').replace(' ', '_')
    # Length limitation (memcached caps keys at 250 bytes)
    if len(key) > 200:
        key = key[:190] + hashlib.md5(key.encode()).hexdigest()[:10]
    return key.lower()
# Region with custom configuration
custom_region = make_region(
    key_mangler=custom_key_mangler
).configure(
    'dogpile.cache.memory',
    expiration_time=3600
)

@custom_region.cache_on_arguments(
    function_key_generator=custom_key_generator
)
def complex_function(user_obj, config_dict, timestamp):
    """Function with complex arguments."""
    return f"Processed {user_obj} with {config_dict} at {timestamp}"

# Usage example
from datetime import datetime
user = {'id': 123, 'name': 'Alice'}
config = {'debug': True, 'timeout': 30}
result = complex_function(user, config, datetime.now())
# Note: each distinct timestamp yields a distinct cache key, so this exact
# call never re-hits the cache; pass a coarser value to share entries.
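dogpile.cache also ships a ready-made mangler: dogpile.cache.util.sha1_mangle_key hashes every key, which sidesteps length and character restrictions entirely:
from dogpile.cache.util import sha1_mangle_key

sha1_region = make_region(key_mangler=sha1_mangle_key).configure(
    'dogpile.cache.memory',
    expiration_time=3600
)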
Error Handling and Deserialization
from dogpile.cache import make_region
from dogpile.cache.api import NO_VALUE, CantDeserializeException
from datetime import datetime
import json

class SafeJsonRegion:
    """Cache wrapper with safe JSON serialization."""
    def __init__(self, region):
        self.region = region

    def get_or_create(self, key, creator_func):
        """Safe get_or_create implementation."""
        try:
            # Try to get from cache; a miss returns the NO_VALUE
            # sentinel, not None
            cached_value = self.region.get(key)
            if cached_value is not NO_VALUE:
                return json.loads(cached_value)
        except (json.JSONDecodeError, TypeError, CantDeserializeException):
            # Fall through and regenerate on deserialization failure
            pass
        # Generate a new value
        fresh_value = creator_func()
        # Store in the cache as JSON
        try:
            serialized_value = json.dumps(fresh_value, default=str)
            # CacheRegion.set() takes no per-call TTL; expiration comes
            # from the region's expiration_time configuration
            self.region.set(key, serialized_value)
        except (TypeError, ValueError):
            # Don't cache on serialization failure
            pass
        return fresh_value

# Usage example
error_region = make_region().configure(
    'dogpile.cache.memory',
    expiration_time=1800
)
safe_cache = SafeJsonRegion(error_region)
def get_user_profile(user_id):
    """Safe user profile retrieval."""
    def creator():
        # A real application would fetch this from the database
        return {
            'id': user_id,
            'name': f'User {user_id}',
            'created_at': datetime.now(),
            'preferences': {'theme': 'dark', 'language': 'en'}
        }
    return safe_cache.get_or_create(f'user_profile:{user_id}', creator)
# Graceful cache invalidation
def invalidate_with_grace(region, key, creator_func):
    """Refresh a key, falling back to the old value on failure."""
    old_value = region.get(key)
    try:
        # Generate the new value
        new_value = creator_func()
        region.set(key, new_value)
        return new_value
    except Exception as e:
        # Return the old value if generating the new one fails
        # (region.get() returns the NO_VALUE sentinel on a miss)
        if old_value is not NO_VALUE:
            print(f"Using stale value due to error: {e}")
            return old_value
        raise
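As a closing note, dogpile.cache 1.1+ can attach serialization at the region level, where CantDeserializeException is the documented way for a deserializer to turn a corrupt payload into a cache miss. A sketch against the DBM file backend (which, unlike the pure-memory backend, stores bytes and therefore exercises the serializer); the filename is illustrative:
def safe_json_loads(payload):
    try:
        return json.loads(payload)
    except json.JSONDecodeError:
        # Tells dogpile.cache to treat the entry as a miss and
        # regenerate the value rather than raise
        raise CantDeserializeException()

json_region = make_region(
    serializer=lambda value: json.dumps(value, default=str).encode('utf-8'),
    deserializer=safe_json_loads,
).configure(
    'dogpile.cache.dbm',
    expiration_time=1800,
    arguments={'filename': '/tmp/dogpile_cache.dbm'},
)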