redis

Rust向けの高機能Redisクライアント。同期・非同期APIの両方をサポートし、クラスタ、トランザクション、Pub/Sub機能を提供。

GitHub概要

redis-rs/redis-rs

Redis library for rust

スター4,050
ウォッチ36
フォーク652
作成日:2013年12月29日
言語:Rust
ライセンス:Other

トピックス

なし

スター履歴

redis-rs/redis-rs Star History
データ取得日時: 2025/10/22 09:23

概要

redis-rsは、Redis、Valkey、およびその他のRESP互換データベース向けの高レベルRustクライアントライブラリです。同期・非同期の両方のAPIを提供し、Tokio、async-std、smolなどの主要な非同期ランタイムをサポートしています。クラスタ、トランザクション、Pub/Subなど、Redisの高度な機能にも対応しています。

主な特徴

  • 同期・非同期対応: 両方のAPIスタイルをサポート
  • マルチプレキシング: 単一接続で複数の並行リクエストを処理
  • クラスタサポート: Redis Clusterの自動ルーティングとトポロジー変更対応
  • トランザクション: MULTI/EXECによるアトミック操作
  • Pub/Sub: 非同期メッセージングのフルサポート
  • TLS対応: セキュアな接続をサポート

インストール

[dependencies]
# 基本的な同期API
redis = "0.25"

# Tokio非同期サポート
redis = { version = "0.25", features = ["tokio-comp"] }

# async-std非同期サポート
redis = { version = "0.25", features = ["async-std-comp"] }

# 接続プール(同期)
redis = { version = "0.25", features = ["r2d2"] }

# 接続マネージャー(自動再接続)
redis = { version = "0.25", features = ["connection-manager"] }

# クラスタサポート
redis = { version = "0.25", features = ["cluster", "cluster-async"] }

# TLSサポート
redis = { version = "0.25", features = ["tls-native-tls"] }

基本的な使い方

同期API

use redis::Commands;

fn main() -> redis::RedisResult<()> {
    // クライアントの作成
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut con = client.get_connection()?;

    // 文字列の操作
    let _: () = con.set("my_key", "Hello, Redis!")?;
    let value: String = con.get("my_key")?;
    println!("Value: {}", value);

    // 数値のインクリメント
    let _: () = con.set("counter", 0)?;
    let new_val: i32 = con.incr("counter", 1)?;
    println!("Counter: {}", new_val);

    // リストの操作
    let _: () = con.lpush("my_list", vec!["item1", "item2", "item3"])?;
    let list_items: Vec<String> = con.lrange("my_list", 0, -1)?;
    println!("List: {:?}", list_items);

    Ok(())
}

非同期API(Tokio)

use redis::AsyncCommands;

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1/")?;
    
    // マルチプレキシング接続(推奨)
    let mut con = client.get_multiplexed_async_connection().await?;

    // 非同期操作
    let _: () = con.set("async_key", "async value").await?;
    let value: String = con.get("async_key").await?;
    println!("Async value: {}", value);

    // パイプライン
    let (k1, k2): (String, String) = redis::pipe()
        .atomic()
        .set("key1", "value1")
        .set("key2", "value2")
        .ignore()
        .get("key1")
        .get("key2")
        .query_async(&mut con)
        .await?;
    
    println!("Pipeline results: {}, {}", k1, k2);

    Ok(())
}

高度な使い方

接続プールの設定(同期)

use redis::Commands;
use r2d2::Pool;

fn setup_pool() -> redis::RedisResult<Pool<redis::Client>> {
    let client = redis::Client::open("redis://127.0.0.1/")?;
    
    let pool = r2d2::Pool::builder()
        .max_size(15)
        .min_idle(Some(5))
        .connection_timeout(std::time::Duration::from_secs(10))
        .build(client)
        .expect("Failed to create pool");

    Ok(pool)
}

fn use_pooled_connection(pool: &Pool<redis::Client>) -> redis::RedisResult<()> {
    let mut conn = pool.get().expect("Failed to get connection from pool");
    
    // 接続を使用
    let _: () = conn.set("pooled_key", "pooled_value")?;
    let value: String = conn.get("pooled_key")?;
    println!("Pooled value: {}", value);
    
    Ok(())
}

ConnectionManager(自動再接続)

use redis::{AsyncCommands, ConnectionManager};

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut manager = ConnectionManager::new(client).await?;

    // 接続が切断されても自動的に再接続される
    loop {
        match manager.set("key", "value").await {
            Ok(_) => println!("Set successful"),
            Err(e) => {
                eprintln!("Error: {}, retrying...", e);
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            }
        }
        
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
}

トランザクション

use redis::{Commands, transaction};

fn increment_atomically(con: &mut redis::Connection, key: &str) -> redis::RedisResult<i32> {
    let (new_val,): (i32,) = transaction(con, &[key], |con, pipe| {
        let old_val: i32 = con.get(key).unwrap_or(0);
        pipe
            .set(key, old_val + 1)
            .ignore()
            .get(key)
            .query(con)
    })?;
    
    Ok(new_val)
}

// より複雑なトランザクション例
fn transfer_funds(
    con: &mut redis::Connection,
    from: &str,
    to: &str,
    amount: i32,
) -> redis::RedisResult<()> {
    let result: redis::RedisResult<()> = transaction(con, &[from, to], |con, pipe| {
        let from_balance: i32 = con.get(from)?;
        let to_balance: i32 = con.get(to)?;
        
        if from_balance < amount {
            return Err(redis::RedisError::from((
                redis::ErrorKind::ExtensionError,
                "Insufficient funds",
            )));
        }
        
        pipe
            .set(from, from_balance - amount)
            .ignore()
            .set(to, to_balance + amount)
            .ignore()
            .query(con)
    });
    
    result
}

Pub/Sub(非同期)

use redis::aio::PubSub;
use futures_util::StreamExt;

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut pubsub = client.get_async_pubsub().await?;

    // チャンネルの購読
    pubsub.subscribe("channel1").await?;
    pubsub.psubscribe("channel:*").await?;

    // メッセージの受信
    let mut pubsub_stream = pubsub.on_message();
    
    tokio::spawn(async move {
        while let Some(msg) = pubsub_stream.next().await {
            let channel = msg.get_channel_name();
            let payload: String = msg.get_payload().unwrap();
            println!("Received on {}: {}", channel, payload);
        }
    });

    // 別の接続でメッセージを送信
    let mut con = client.get_multiplexed_async_connection().await?;
    redis::cmd("PUBLISH")
        .arg("channel1")
        .arg("Hello, subscribers!")
        .exec_async(&mut con)
        .await?;

    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    Ok(())
}

実践的な例

Redis Clusterの使用

use redis::cluster::{ClusterClient, ClusterClientBuilder};
use redis::AsyncCommands;

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    // クラスタノードのリスト
    let nodes = vec![
        "redis://127.0.0.1:7000/",
        "redis://127.0.0.1:7001/",
        "redis://127.0.0.1:7002/",
    ];

    // クラスタクライアントの作成
    let client = ClusterClientBuilder::new(nodes)
        .password("cluster_password".to_string())
        .retries(3)
        .build()?;

    let mut con = client.get_async_connection().await?;

    // クラスタへの操作は通常の接続と同じ
    let _: () = con.set("cluster_key", "cluster_value").await?;
    let value: String = con.get("cluster_key").await?;
    println!("Cluster value: {}", value);

    // ハッシュタグを使用して同じノードに配置
    let _: () = con.set("{user:123}:name", "Alice").await?;
    let _: () = con.set("{user:123}:age", "30").await?;
    
    // トランザクションも可能(同じハッシュタグ内で)
    let results: Vec<String> = redis::pipe()
        .atomic()
        .get("{user:123}:name")
        .get("{user:123}:age")
        .query_async(&mut con)
        .await?;
    
    println!("User data: {:?}", results);

    Ok(())
}

セッション管理システム

use redis::{AsyncCommands, aio::MultiplexedConnection};
use serde::{Serialize, Deserialize};
use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Serialize, Deserialize, Debug)]
struct Session {
    user_id: u64,
    username: String,
    created_at: u64,
    last_accessed: u64,
}

struct SessionManager {
    redis: MultiplexedConnection,
    ttl: u64, // セッションの有効期限(秒)
}

impl SessionManager {
    async fn new(redis_url: &str, ttl: u64) -> redis::RedisResult<Self> {
        let client = redis::Client::open(redis_url)?;
        let redis = client.get_multiplexed_async_connection().await?;
        
        Ok(Self { redis, ttl })
    }

    async fn create_session(&mut self, user_id: u64, username: String) -> redis::RedisResult<String> {
        let session_id = uuid::Uuid::new_v4().to_string();
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();

        let session = Session {
            user_id,
            username,
            created_at: now,
            last_accessed: now,
        };

        let session_json = serde_json::to_string(&session).unwrap();
        let key = format!("session:{}", session_id);

        // セッションを保存(TTL付き)
        self.redis.set_ex(&key, session_json, self.ttl).await?;

        Ok(session_id)
    }

    async fn get_session(&mut self, session_id: &str) -> redis::RedisResult<Option<Session>> {
        let key = format!("session:{}", session_id);
        let session_json: Option<String> = self.redis.get(&key).await?;

        match session_json {
            Some(json) => {
                let mut session: Session = serde_json::from_str(&json).unwrap();
                
                // 最終アクセス時刻を更新
                session.last_accessed = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs();
                
                let updated_json = serde_json::to_string(&session).unwrap();
                
                // TTLを更新
                self.redis.set_ex(&key, updated_json, self.ttl).await?;
                
                Ok(Some(session))
            }
            None => Ok(None),
        }
    }

    async fn delete_session(&mut self, session_id: &str) -> redis::RedisResult<()> {
        let key = format!("session:{}", session_id);
        self.redis.del(&key).await?;
        Ok(())
    }
}

// 使用例
#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    let mut manager = SessionManager::new("redis://127.0.0.1/", 3600).await?;

    // セッション作成
    let session_id = manager.create_session(123, "alice".to_string()).await?;
    println!("Created session: {}", session_id);

    // セッション取得
    if let Some(session) = manager.get_session(&session_id).await? {
        println!("Retrieved session: {:?}", session);
    }

    Ok(())
}

Luaスクリプトの実行

use redis::{AsyncCommands, Script};

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut con = client.get_multiplexed_async_connection().await?;

    // レート制限のLuaスクリプト
    let rate_limit_script = Script::new(r"
        local key = KEYS[1]
        local limit = tonumber(ARGV[1])
        local window = tonumber(ARGV[2])
        local current = redis.call('GET', key)
        
        if current == false then
            redis.call('SETEX', key, window, 1)
            return 1
        elseif tonumber(current) < limit then
            redis.call('INCR', key)
            return 1
        else
            return 0
        end
    ");

    // スクリプトの実行
    let allowed: i32 = rate_limit_script
        .key("rate_limit:user:123")
        .arg(10)  // 1分間に10リクエストまで
        .arg(60)  // 60秒のウィンドウ
        .invoke_async(&mut con)
        .await?;

    if allowed == 1 {
        println!("Request allowed");
    } else {
        println!("Rate limit exceeded");
    }

    Ok(())
}

パフォーマンスのベストプラクティス

1. 接続の再利用

use redis::aio::MultiplexedConnection;
use std::sync::Arc;

// アプリケーション全体で共有する接続
struct AppState {
    redis: Arc<tokio::sync::Mutex<MultiplexedConnection>>,
}

impl AppState {
    async fn new(redis_url: &str) -> redis::RedisResult<Self> {
        let client = redis::Client::open(redis_url)?;
        let con = client.get_multiplexed_async_connection().await?;
        
        Ok(Self {
            redis: Arc::new(tokio::sync::Mutex::new(con)),
        })
    }
}

// マルチプレキシング接続はクローン可能
async fn better_approach(redis_url: &str) -> redis::RedisResult<MultiplexedConnection> {
    let client = redis::Client::open(redis_url)?;
    let con = client.get_multiplexed_async_connection().await?;
    // この接続はクローンして複数のタスクで使用可能
    Ok(con)
}

2. パイプラインの活用

use redis::AsyncCommands;

// 非効率的:複数の往復
async fn inefficient_bulk_set(con: &mut MultiplexedConnection, data: Vec<(String, String)>) 
    -> redis::RedisResult<()> {
    for (key, value) in data {
        let _: () = con.set(key, value).await?;
    }
    Ok(())
}

// 効率的:パイプラインで一度に送信
async fn efficient_bulk_set(con: &mut MultiplexedConnection, data: Vec<(String, String)>) 
    -> redis::RedisResult<()> {
    let mut pipe = redis::pipe();
    
    for (key, value) in data {
        pipe.set(key, value).ignore();
    }
    
    pipe.query_async(con).await?;
    Ok(())
}

3. 適切なシリアライゼーション

use serde::{Serialize, Deserialize};
use bincode;

#[derive(Serialize, Deserialize)]
struct LargeData {
    id: u64,
    values: Vec<f64>,
    metadata: HashMap<String, String>,
}

// バイナリシリアライゼーション(効率的)
async fn save_binary(con: &mut MultiplexedConnection, key: &str, data: &LargeData) 
    -> redis::RedisResult<()> {
    let bytes = bincode::serialize(data).unwrap();
    con.set(key, bytes).await
}

async fn load_binary(con: &mut MultiplexedConnection, key: &str) 
    -> redis::RedisResult<Option<LargeData>> {
    let bytes: Option<Vec<u8>> = con.get(key).await?;
    match bytes {
        Some(b) => Ok(bincode::deserialize(&b).ok()),
        None => Ok(None),
    }
}

他のRedisクライアントとの比較

redis-rs vs fred

特徴redis-rsfred
成熟度
API設計伝統的モダン
パフォーマンス
クラスタサポート
ドキュメント

redis-rs vs bb8-redis

特徴redis-rsbb8-redis
接続プールr2d2bb8
非同期プール不要*
統合の容易さ

*注:redis-rsのマルチプレキシング接続は通常プール不要

まとめ

redis-rsは、Rustエコシステムにおける最も成熟したRedisクライアントです。同期・非同期の両方のAPIを提供し、Redisの全機能をサポートしています。特に、マルチプレキシング接続による効率的な非同期処理、包括的なクラスタサポート、豊富な機能により、小規模から大規模なアプリケーションまで幅広く対応できます。