redis-py
GitHub Overview
Topics
Star History
Cache Library
redis-py
Overview
redis-py is the official Python client library that provides an interface between the Redis key-value store and Python applications, offering both synchronous and asynchronous APIs, comprehensive Redis functionality, and advanced clustering capabilities.
Details
redis-py is the official Python client library that provides an interface between Redis and Python applications. It comprehensively supports basic Redis command execution, connection management, and advanced features such as pipelining, Pub/Sub, distributed locks, Lua scripting, and monitoring. The main interface is provided by the Redis class, which implements Python interfaces for all Redis commands and the Redis protocol. It uses connection pools for efficient connection management and achieves thread safety. When the hiredis library is installed, it automatically uses it as a compiled response parser for improved performance. It supports Redis's latest features including RESP3 protocol support, Redis Cluster, Redis Sentinel, and Redis Modules. Client version 5.0 and later can enable the RESP3 protocol with the protocol=3 setting. It provides both synchronous and asynchronous APIs to meet the needs of modern Python applications. The library is actively maintained and continues to be the most feature-complete and reliable choice for Python developers working with Redis.
Pros and Cons
Pros
- Official Support: Redis's official Python client library
- Comprehensive Features: Support for all Redis features from basic to advanced
- High Performance: Fast response processing with hiredis support
- Thread Safe: Redis client instances can be safely shared across threads
- Latest Features: Full support for RESP3, Redis Modules, and Cluster
- Flexible API: Both synchronous and asynchronous APIs provided
- Rich Ecosystem: Extensive community support and documentation
Cons
- PubSub Limitations: PubSub objects are not thread-safe
- Learning Curve: Advanced features (Cluster, Sentinel, etc.) take time to master
- Connection Management: Proper connection pool configuration and error handling needed
- Redis Dependency: Strongly dependent on Redis-specific features
- Version Compatibility: Managing Redis server and client version compatibility
Key Links
- redis-py GitHub Repository
- redis-py PyPI Page
- Redis Official Documentation
- redis-py Official Documentation
- Redis Python Development Guide
Example Usage
Installation and Basic Connection
# Installation:
#   pip install redis
# With the optional C parser for faster response decoding:
#   pip install "redis[hiredis]"
import redis

# Connect with explicit host/port/database number
r = redis.Redis(host='localhost', port=6379, db=0)

# Connect from a connection URL instead
r = redis.from_url('redis://localhost:6379/0')

# decode_responses=True makes commands return str instead of bytes
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Round-trip a PING to verify the server is reachable
print(r.ping())  # True
Basic Key-Value Operations
import redis

r = redis.Redis(decode_responses=True)

# Plain string set/get
r.set('name', 'Alice')
name = r.get('name')
print(name)  # Alice

# SETEX: store a value together with a TTL
r.setex('session:123', 3600, 'session_data')  # Expires in 1 hour

# INCR is an atomic server-side increment
r.set('counter', 0)
count = r.incr('counter')
print(count)  # 1

# MSET/MGET touch several keys in a single round trip
r.mset({'key1': 'value1', 'key2': 'value2', 'key3': 'value3'})
values = r.mget(['key1', 'key2', 'key3'])
print(values)  # ['value1', 'value2', 'value3']

# EXISTS returns the number of keys found (truthy when present)
if r.exists('name'):
    print('Key exists')

# Remove a key
r.delete('name')
List, Set, and Hash Operations
# Lists: LPUSH prepends, RPUSH appends
r.lpush('tasks', 'task1', 'task2', 'task3')
r.rpush('tasks', 'task4')

# LRANGE 0 -1 returns the whole list
tasks = r.lrange('tasks', 0, -1)
print(tasks)  # ['task3', 'task2', 'task1', 'task4']

# LPOP removes and returns the head element
task = r.lpop('tasks')
print(task)  # task3

# Sets: unordered collections of unique members
r.sadd('tags', 'python', 'redis', 'cache')
members = r.smembers('tags')
print(members)  # {'python', 'redis', 'cache'}

# Set algebra: SINTER computes the intersection server-side
r.sadd('set1', 'a', 'b', 'c')
r.sadd('set2', 'b', 'c', 'd')
intersection = r.sinter('set1', 'set2')
print(intersection)  # {'b', 'c'}

# Hashes: field/value maps stored under a single key
r.hset('user:1', mapping={
    'name': 'John',
    'email': '[email protected]',
    'age': 30,
})
user_data = r.hgetall('user:1')
print(user_data)  # {'name': 'John', 'email': '[email protected]', 'age': '30'}
name = r.hget('user:1', 'name')
print(name)  # John
Pipelining
# A pipeline buffers commands client-side and sends them in one round trip.
pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.get('key1')
pipe.get('key2')
pipe.incr('counter')

# execute() flushes the buffer and returns one result per queued command
results = pipe.execute()
print(results)  # [True, True, 'value1', 'value2', 1]

# transaction=True (the default) already wraps the batch in MULTI/EXEC so
# the commands run atomically — no explicit pipe.multi() call is needed;
# calling it as well is redundant.
pipe = r.pipeline(transaction=True)
pipe.set('account:1:balance', 100)
pipe.set('account:2:balance', 200)
pipe.decrby('account:1:balance', 50)
pipe.incrby('account:2:balance', 50)
results = pipe.execute()
Pub/Sub Feature
import threading
import time

def publisher():
    """Publish five messages to the 'notifications' channel."""
    time.sleep(1)  # Give the subscriber time to SUBSCRIBE first
    for i in range(5):
        r.publish('notifications', f'Message {i}')
        time.sleep(1)

def subscriber():
    """Consume the five published messages, then return.

    NOTE: PubSub objects are not thread-safe — keep each one on a single
    thread. listen() blocks forever, so we count delivered messages and
    break out; the original example never terminated.
    """
    pubsub = r.pubsub()
    pubsub.subscribe('notifications')
    received = 0
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"Received: {message['data']}")
            received += 1
            if received == 5:
                break
        elif message['type'] == 'subscribe':
            print(f"Subscribed to {message['channel']}")
    pubsub.close()  # Release the dedicated pub/sub connection

# Run publisher in a separate thread; subscribe on the main thread
pub_thread = threading.Thread(target=publisher)
pub_thread.start()
subscriber()
pub_thread.join()  # Don't leave the publisher thread dangling
Connection Pool
# Share a bounded pool of connections between client instances
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, max_connections=20)
r = redis.Redis(connection_pool=pool)

# A pool with explicit sizing, timeout, and retry settings
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=50,
    retry_on_timeout=True,
    socket_timeout=5,
    socket_connect_timeout=5,
)
r = redis.Redis(connection_pool=pool)
Error Handling and Retry
from redis.exceptions import ConnectionError, TimeoutError, ResponseError
from redis.retry import Retry
from redis.backoff import ExponentialBackoff

# Retry transient failures up to 3 times with exponential backoff
retry = Retry(ExponentialBackoff(), 3)
r = redis.Redis(
    host='localhost',
    port=6379,
    retry=retry,
    retry_on_timeout=True
)

def safe_redis_operation():
    """Round-trip a test key, mapping each failure mode to None.

    Returns:
        The stored value on success, or None on any Redis error.
    """
    try:
        r.set('test_key', 'test_value')
        value = r.get('test_key')
        return value
    # Catch TimeoutError before ConnectionError: in some redis-py
    # versions TimeoutError subclasses ConnectionError, and the broader
    # clause would otherwise make this branch unreachable.
    except TimeoutError:
        print("Redis operation timed out")
        return None
    except ConnectionError:
        print("Redis connection failed")
        return None
    except ResponseError as e:
        print(f"Redis response error: {e}")
        return None
Redis Cluster
from redis.cluster import RedisCluster, ClusterNode

# Official redis-py expects ClusterNode objects (with integer ports) as
# startup_nodes; plain {"host": ..., "port": "7000"} dicts were the API
# of the older redis-py-cluster package.
startup_nodes = [
    ClusterNode("127.0.0.1", 7000),
    ClusterNode("127.0.0.1", 7001),
    ClusterNode("127.0.0.1", 7002),
]
rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)

# Commands are routed to the correct shard automatically
rc.set('name', 'Alice')
name = rc.get('name')
print(name)  # Alice

# Inspect the discovered cluster topology
nodes = rc.get_nodes()
print(f"Cluster has {len(nodes)} nodes")
Asynchronous Operations (redis.asyncio)
import asyncio
import redis.asyncio as redis

async def async_redis_operations():
    """Demonstrate basic async commands and an async pipeline."""
    r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    try:
        # Basic operations — every command is awaitable
        await r.set('async_key', 'async_value')
        value = await r.get('async_key')
        print(f"Async value: {value}")

        # Pipeline: queue commands synchronously, await execute() once
        pipe = r.pipeline()
        pipe.set('key1', 'value1')
        pipe.set('key2', 'value2')
        pipe.get('key1')
        results = await pipe.execute()
        print(f"Pipeline results: {results}")
    finally:
        # aclose() is the supported way to release the connection pool;
        # close() has been deprecated since redis-py 5.0.1.
        await r.aclose()

# Execute
asyncio.run(async_redis_operations())
Lua Scripts
# Lua scripts run atomically on the server: read, add, write back, return.
increment_script = """
local key = KEYS[1]
local increment = ARGV[1]
local current = redis.call('GET', key)
if current == false then
current = 0
else
current = tonumber(current)
end
local new_value = current + tonumber(increment)
redis.call('SET', key, new_value)
return new_value
"""

# register_script() returns a Script object that caches the script's SHA
# and transparently falls back to EVAL when it isn't loaded yet.
counter_script = r.register_script(increment_script)

# First execution
result = counter_script(keys=['my_counter'], args=[5])
print(f"New counter value: {result}")

# Subsequent executions reuse the cached script via EVALSHA
result = counter_script(keys=['my_counter'], args=[3])
print(f"New counter value: {result}")
Advanced Caching Patterns
import hashlib
import json
import time
from functools import wraps

def redis_cache(expiration=3600):
    """Decorator that caches a function's JSON-serializable result in Redis.

    Args:
        expiration: cache TTL in seconds (default 1 hour).

    The cache key combines the function name with a SHA-256 digest of the
    arguments. Python's built-in hash() is deliberately avoided: it is
    randomized per process (PYTHONHASHSEED), so hash()-based keys would
    not survive a restart and could not be shared between processes.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Deterministic key: sort kwargs so call order doesn't matter
            arg_repr = repr(args) + repr(sorted(kwargs.items()))
            digest = hashlib.sha256(arg_repr.encode('utf-8')).hexdigest()
            cache_key = f"{func.__name__}:{digest}"

            # Compare against None so falsy cached results ('', 0, [])
            # still count as cache hits
            cached_result = r.get(cache_key)
            if cached_result is not None:
                return json.loads(cached_result)

            # Miss: compute, then store with the configured TTL
            result = func(*args, **kwargs)
            r.setex(cache_key, expiration, json.dumps(result))
            return result
        return wrapper
    return decorator

# Usage example
@redis_cache(expiration=1800)  # 30-minute cache
def expensive_computation(n):
    """Heavy computation process"""
    time.sleep(2)  # Simulate heavy processing
    return sum(range(n))

# Execute
result = expensive_computation(100000)  # Takes 2 seconds first time
result = expensive_computation(100000)  # Returns immediately second time