redis-py
GitHub概要
スター13,292
ウォッチ323
フォーク2,638
作成日:2009年11月6日
言語:Python
ライセンス:MIT License
トピックス
pythonredisredis-clientredis-clusterredis-py
スター履歴
データ取得日時: 2025/10/22 08:07
キャッシュライブラリ
redis-py
概要
redis-pyは、RedisキーバリューストアとPythonアプリケーション間のインターフェースを提供する公式Pythonクライアントライブラリで、同期・非同期の両方のAPI、包括的なRedis機能、高度なクラスタリング機能を提供します。
詳細
redis-py(レディス・パイ)は、RedisとPythonアプリケーション間のインターフェースを提供する公式Pythonクライアントライブラリです。基本的なRedisコマンドの実行、接続管理、パイプライニング、Pub/Sub、分散ロック、Luaスクリプティング、モニター機能などの高度な機能を包括的にサポートしています。主要なインターフェースはRedisクラスによって提供され、すべてのRedisコマンドに対するPythonインターフェースとRedisプロトコルの実装を提供します。効率的な接続管理のために接続プールを使用し、スレッドセーフティを実現しています。hiredisライブラリがインストールされている場合、コンパイルされたレスポンスパーサーとして自動的に使用され、パフォーマンスが向上します。RESP3プロトコルサポート、Redis Cluster、Redis Sentinel、Redis ModulesサポートなどのRedisの最新機能にも対応しています。クライアントバージョン5.0以降ではprotocol=3設定によりRESP3プロトコルを有効にできます。同期・非同期両方のAPIを提供し、現代的なPythonアプリケーションのニーズに対応しています。
メリット・デメリット
メリット
- 公式サポート: Redisの公式Pythonクライアントライブラリ
- 包括的機能: 基本操作から高度な機能まで全てのRedis機能をサポート
- 高性能: hiredisサポートによる高速レスポンス処理
- スレッドセーフ: Redisクライアントインスタンスのスレッドセーフ設計
- 最新機能: RESP3、Redis Modules、Clusterの完全サポート
- 柔軟なAPI: 同期・非同期の両方のAPIを提供
- 豊富なエコシステム: 広範囲なコミュニティサポートとドキュメント
デメリット
- PubSub制約: PubSubオブジェクトはスレッドセーフではない
- 学習コスト: 高度な機能(Cluster、Sentinel等)の習得が必要
- 接続管理: 適切な接続プール設定とエラーハンドリングが必要
- Redis依存: Redis固有の機能に強く依存した設計
- バージョン互換: Redisサーバーとクライアントのバージョン互換性管理
主要リンク
書き方の例
インストールと基本接続
# インストール
# pip install redis
# または高速化のため
# pip install "redis[hiredis]"
import redis
# 基本接続
r = redis.Redis(host='localhost', port=6379, db=0)
# URL接続
r = redis.from_url('redis://localhost:6379/0')
# デコード設定(文字列で結果を受け取る)
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 接続テスト
print(r.ping()) # True
基本的なキー・バリュー操作
import redis
r = redis.Redis(decode_responses=True)
# 文字列の設定と取得
r.set('name', 'Alice')
name = r.get('name')
print(name) # Alice
# 期限付きキー設定
r.setex('session:123', 3600, 'session_data') # 1時間後に期限切れ
# 数値の操作
r.set('counter', 0)
count = r.incr('counter')
print(count) # 1
# 複数キーの一括操作
r.mset({'key1': 'value1', 'key2': 'value2', 'key3': 'value3'})
values = r.mget(['key1', 'key2', 'key3'])
print(values) # ['value1', 'value2', 'value3']
# キーの存在確認
if r.exists('name'):
print('Key exists')
# キーの削除
r.delete('name')
リスト・セット・ハッシュ操作
# リスト操作
r.lpush('tasks', 'task1', 'task2', 'task3')
r.rpush('tasks', 'task4')
# リストの内容を取得
tasks = r.lrange('tasks', 0, -1)
print(tasks) # ['task3', 'task2', 'task1', 'task4']
# リストから要素を取得
task = r.lpop('tasks')
print(task) # task3
# セット操作
r.sadd('tags', 'python', 'redis', 'cache')
members = r.smembers('tags')
print(members) # {'python', 'redis', 'cache'}
# セットの演算
r.sadd('set1', 'a', 'b', 'c')
r.sadd('set2', 'b', 'c', 'd')
intersection = r.sinter('set1', 'set2')
print(intersection) # {'b', 'c'}
# ハッシュ操作
r.hset('user:1', mapping={
'name': 'John',
'email': '[email protected]',
'age': 30
})
user_data = r.hgetall('user:1')
print(user_data) # {'name': 'John', 'email': '[email protected]', 'age': '30'}
name = r.hget('user:1', 'name')
print(name) # John
パイプライニング
# パイプラインで複数コマンドを効率的に実行
pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.get('key1')
pipe.get('key2')
pipe.incr('counter')
# 一括実行
results = pipe.execute()
print(results) # [True, True, 'value1', 'value2', 1]
# トランザクション付きパイプライン(デフォルト)
pipe = r.pipeline(transaction=True)
pipe.multi()
pipe.set('account:1:balance', 100)
pipe.set('account:2:balance', 200)
pipe.decrby('account:1:balance', 50)
pipe.incrby('account:2:balance', 50)
results = pipe.execute()
Pub/Sub機能
import threading
import time
# Publisher側
def publisher():
time.sleep(1) # Subscriberの準備を待つ
for i in range(5):
r.publish('notifications', f'Message {i}')
time.sleep(1)
# Subscriber側
def subscriber():
pubsub = r.pubsub()
pubsub.subscribe('notifications')
for message in pubsub.listen():
if message['type'] == 'message':
print(f"Received: {message['data']}")
elif message['type'] == 'subscribe':
print(f"Subscribed to {message['channel']}")
# 別スレッドでPublisher実行
pub_thread = threading.Thread(target=publisher)
pub_thread.start()
# Subscriber実行
subscriber()
接続プール
# 接続プールの使用
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, max_connections=20)
r = redis.Redis(connection_pool=pool)
# 設定付き接続プール
pool = redis.ConnectionPool(
host='localhost',
port=6379,
db=0,
max_connections=50,
retry_on_timeout=True,
socket_timeout=5,
socket_connect_timeout=5
)
r = redis.Redis(connection_pool=pool)
エラーハンドリングとリトライ
from redis.exceptions import ConnectionError, TimeoutError, ResponseError
from redis.retry import Retry
from redis.backoff import ExponentialBackoff
# リトライ機能付きクライアント
retry = Retry(ExponentialBackoff(), 3)
r = redis.Redis(
host='localhost',
port=6379,
retry=retry,
retry_on_timeout=True
)
# エラーハンドリング
def safe_redis_operation():
try:
r.set('test_key', 'test_value')
value = r.get('test_key')
return value
except ConnectionError:
print("Redis connection failed")
return None
except TimeoutError:
print("Redis operation timed out")
return None
except ResponseError as e:
print(f"Redis response error: {e}")
return None
Redis Cluster
from redis.cluster import RedisCluster
# Redis Cluster接続
startup_nodes = [
{"host": "127.0.0.1", "port": "7000"},
{"host": "127.0.0.1", "port": "7001"},
{"host": "127.0.0.1", "port": "7002"}
]
rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)
# Clusterでの操作
rc.set('name', 'Alice')
name = rc.get('name')
print(name) # Alice
# クラスター情報
nodes = rc.get_nodes()
print(f"Cluster has {len(nodes)} nodes")
非同期操作(redis.asyncio)
import asyncio
import redis.asyncio as redis
async def async_redis_operations():
# 非同期Redis接続
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
try:
# 基本操作
await r.set('async_key', 'async_value')
value = await r.get('async_key')
print(f"Async value: {value}")
# パイプライン
pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.get('key1')
results = await pipe.execute()
print(f"Pipeline results: {results}")
finally:
await r.close()
# 実行
asyncio.run(async_redis_operations())
Luaスクリプト
# Luaスクリプトの定義と実行
increment_script = """
local key = KEYS[1]
local increment = ARGV[1]
local current = redis.call('GET', key)
if current == false then
current = 0
else
current = tonumber(current)
end
local new_value = current + tonumber(increment)
redis.call('SET', key, new_value)
return new_value
"""
# スクリプトの登録
script = r.register_script(increment_script)
# スクリプトの実行
result = script(keys=['my_counter'], args=[5])
print(f"New counter value: {result}")
# 再実行
result = script(keys=['my_counter'], args=[3])
print(f"New counter value: {result}")
高度なキャッシングパターン
import json
import time
from functools import wraps
def redis_cache(expiration=3600):
"""デコレータベースのキャッシング"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# キャッシュキーの生成
cache_key = f"{func.__name__}:{hash(str(args) + str(kwargs))}"
# キャッシュから取得を試行
cached_result = r.get(cache_key)
if cached_result:
return json.loads(cached_result)
# キャッシュにない場合は実行
result = func(*args, **kwargs)
# 結果をキャッシュに保存
r.setex(cache_key, expiration, json.dumps(result))
return result
return wrapper
return decorator
# 使用例
@redis_cache(expiration=1800) # 30分キャッシュ
def expensive_computation(n):
"""重い計算処理"""
time.sleep(2) # 重い処理をシミュレート
return sum(range(n))
# 実行
result = expensive_computation(100000) # 初回は2秒かかる
result = expensive_computation(100000) # 2回目は即座に返る