redis-py

PythonキャッシュライブラリRedisクライアントNoSQL

GitHub概要

redis/redis-py

Redis Python client

スター13,292
ウォッチ323
フォーク2,638
作成日:2009年11月6日
言語:Python
ライセンス:MIT License

トピックス

pythonredisredis-clientredis-clusterredis-py

スター履歴

redis/redis-py Star History
データ取得日時: 2025/10/22 08:07

キャッシュライブラリ

redis-py

概要

redis-pyは、RedisキーバリューストアとPythonアプリケーション間のインターフェースを提供する公式Pythonクライアントライブラリで、同期・非同期の両方のAPI、包括的なRedis機能、高度なクラスタリング機能を提供します。

詳細

redis-py(レディス・パイ)は、RedisとPythonアプリケーション間のインターフェースを提供する公式Pythonクライアントライブラリです。基本的なRedisコマンドの実行、接続管理、パイプライニング、Pub/Sub、分散ロック、Luaスクリプティング、モニター機能などの高度な機能を包括的にサポートしています。主要なインターフェースはRedisクラスによって提供され、すべてのRedisコマンドに対するPythonインターフェースとRedisプロトコルの実装を提供します。効率的な接続管理のために接続プールを使用し、スレッドセーフティを実現しています。hiredisライブラリがインストールされている場合、コンパイルされたレスポンスパーサーとして自動的に使用され、パフォーマンスが向上します。RESP3プロトコルサポート、Redis Cluster、Redis Sentinel、Redis ModulesサポートなどのRedisの最新機能にも対応しています。クライアントバージョン5.0以降ではprotocol=3設定によりRESP3プロトコルを有効にできます。同期・非同期両方のAPIを提供し、現代的なPythonアプリケーションのニーズに対応しています。

メリット・デメリット

メリット

  • 公式サポート: Redisの公式Pythonクライアントライブラリ
  • 包括的機能: 基本操作から高度な機能まで全てのRedis機能をサポート
  • 高性能: hiredisサポートによる高速レスポンス処理
  • スレッドセーフ: Redisクライアントインスタンスのスレッドセーフ設計
  • 最新機能: RESP3、Redis Modules、Clusterの完全サポート
  • 柔軟なAPI: 同期・非同期の両方のAPIを提供
  • 豊富なエコシステム: 広範囲なコミュニティサポートとドキュメント

デメリット

  • PubSub制約: PubSubオブジェクトはスレッドセーフではない
  • 学習コスト: 高度な機能(Cluster、Sentinel等)の習得が必要
  • 接続管理: 適切な接続プール設定とエラーハンドリングが必要
  • Redis依存: Redis固有の機能に強く依存した設計
  • バージョン互換: Redisサーバーとクライアントのバージョン互換性管理

主要リンク

書き方の例

インストールと基本接続

# インストール
# pip install redis
# または高速化のため
# pip install "redis[hiredis]"

import redis

# 基本接続
r = redis.Redis(host='localhost', port=6379, db=0)

# URL接続
r = redis.from_url('redis://localhost:6379/0')

# デコード設定(文字列で結果を受け取る)
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# 接続テスト
print(r.ping())  # True

基本的なキー・バリュー操作

import redis

r = redis.Redis(decode_responses=True)

# 文字列の設定と取得
r.set('name', 'Alice')
name = r.get('name')
print(name)  # Alice

# 期限付きキー設定
r.setex('session:123', 3600, 'session_data')  # 1時間後に期限切れ

# 数値の操作
r.set('counter', 0)
count = r.incr('counter')
print(count)  # 1

# 複数キーの一括操作
r.mset({'key1': 'value1', 'key2': 'value2', 'key3': 'value3'})
values = r.mget(['key1', 'key2', 'key3'])
print(values)  # ['value1', 'value2', 'value3']

# キーの存在確認
if r.exists('name'):
    print('Key exists')

# キーの削除
r.delete('name')

リスト・セット・ハッシュ操作

# リスト操作
r.lpush('tasks', 'task1', 'task2', 'task3')
r.rpush('tasks', 'task4')

# リストの内容を取得
tasks = r.lrange('tasks', 0, -1)
print(tasks)  # ['task3', 'task2', 'task1', 'task4']

# リストから要素を取得
task = r.lpop('tasks')
print(task)  # task3

# セット操作
r.sadd('tags', 'python', 'redis', 'cache')
members = r.smembers('tags')
print(members)  # {'python', 'redis', 'cache'}

# セットの演算
r.sadd('set1', 'a', 'b', 'c')
r.sadd('set2', 'b', 'c', 'd')
intersection = r.sinter('set1', 'set2')
print(intersection)  # {'b', 'c'}

# ハッシュ操作
r.hset('user:1', mapping={
    'name': 'John',
    'email': '[email protected]',
    'age': 30
})

user_data = r.hgetall('user:1')
print(user_data)  # {'name': 'John', 'email': '[email protected]', 'age': '30'}

name = r.hget('user:1', 'name')
print(name)  # John

パイプライニング

# パイプラインで複数コマンドを効率的に実行
pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.get('key1')
pipe.get('key2')
pipe.incr('counter')

# 一括実行
results = pipe.execute()
print(results)  # [True, True, 'value1', 'value2', 1]

# トランザクション付きパイプライン(デフォルト)
pipe = r.pipeline(transaction=True)
pipe.multi()
pipe.set('account:1:balance', 100)
pipe.set('account:2:balance', 200)
pipe.decrby('account:1:balance', 50)
pipe.incrby('account:2:balance', 50)
results = pipe.execute()

Pub/Sub機能

import threading
import time

# Publisher側
def publisher():
    time.sleep(1)  # Subscriberの準備を待つ
    for i in range(5):
        r.publish('notifications', f'Message {i}')
        time.sleep(1)

# Subscriber側
def subscriber():
    pubsub = r.pubsub()
    pubsub.subscribe('notifications')
    
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"Received: {message['data']}")
        elif message['type'] == 'subscribe':
            print(f"Subscribed to {message['channel']}")

# 別スレッドでPublisher実行
pub_thread = threading.Thread(target=publisher)
pub_thread.start()

# Subscriber実行
subscriber()

接続プール

# 接続プールの使用
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, max_connections=20)
r = redis.Redis(connection_pool=pool)

# 設定付き接続プール
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=50,
    retry_on_timeout=True,
    socket_timeout=5,
    socket_connect_timeout=5
)
r = redis.Redis(connection_pool=pool)

エラーハンドリングとリトライ

from redis.exceptions import ConnectionError, TimeoutError, ResponseError
from redis.retry import Retry
from redis.backoff import ExponentialBackoff

# リトライ機能付きクライアント
retry = Retry(ExponentialBackoff(), 3)
r = redis.Redis(
    host='localhost',
    port=6379,
    retry=retry,
    retry_on_timeout=True
)

# エラーハンドリング
def safe_redis_operation():
    try:
        r.set('test_key', 'test_value')
        value = r.get('test_key')
        return value
        
    except ConnectionError:
        print("Redis connection failed")
        return None
        
    except TimeoutError:
        print("Redis operation timed out")
        return None
        
    except ResponseError as e:
        print(f"Redis response error: {e}")
        return None

Redis Cluster

from redis.cluster import RedisCluster

# Redis Cluster接続
startup_nodes = [
    {"host": "127.0.0.1", "port": "7000"},
    {"host": "127.0.0.1", "port": "7001"},
    {"host": "127.0.0.1", "port": "7002"}
]

rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)

# Clusterでの操作
rc.set('name', 'Alice')
name = rc.get('name')
print(name)  # Alice

# クラスター情報
nodes = rc.get_nodes()
print(f"Cluster has {len(nodes)} nodes")

非同期操作(redis.asyncio)

import asyncio
import redis.asyncio as redis

async def async_redis_operations():
    # 非同期Redis接続
    r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    
    try:
        # 基本操作
        await r.set('async_key', 'async_value')
        value = await r.get('async_key')
        print(f"Async value: {value}")
        
        # パイプライン
        pipe = r.pipeline()
        pipe.set('key1', 'value1')
        pipe.set('key2', 'value2')
        pipe.get('key1')
        results = await pipe.execute()
        print(f"Pipeline results: {results}")
        
    finally:
        await r.close()

# 実行
asyncio.run(async_redis_operations())

Luaスクリプト

# Luaスクリプトの定義と実行
increment_script = """
local key = KEYS[1]
local increment = ARGV[1]
local current = redis.call('GET', key)
if current == false then
    current = 0
else
    current = tonumber(current)
end
local new_value = current + tonumber(increment)
redis.call('SET', key, new_value)
return new_value
"""

# スクリプトの登録
script = r.register_script(increment_script)

# スクリプトの実行
result = script(keys=['my_counter'], args=[5])
print(f"New counter value: {result}")

# 再実行
result = script(keys=['my_counter'], args=[3])
print(f"New counter value: {result}")

高度なキャッシングパターン

import json
import time
from functools import wraps

def redis_cache(expiration=3600):
    """デコレータベースのキャッシング"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # キャッシュキーの生成
            cache_key = f"{func.__name__}:{hash(str(args) + str(kwargs))}"
            
            # キャッシュから取得を試行
            cached_result = r.get(cache_key)
            if cached_result:
                return json.loads(cached_result)
            
            # キャッシュにない場合は実行
            result = func(*args, **kwargs)
            
            # 結果をキャッシュに保存
            r.setex(cache_key, expiration, json.dumps(result))
            return result
        return wrapper
    return decorator

# 使用例
@redis_cache(expiration=1800)  # 30分キャッシュ
def expensive_computation(n):
    """重い計算処理"""
    time.sleep(2)  # 重い処理をシミュレート
    return sum(range(n))

# 実行
result = expensive_computation(100000)  # 初回は2秒かかる
result = expensive_computation(100000)  # 2回目は即座に返る