redis
Rust向けの高機能Redisクライアント。同期・非同期APIの両方をサポートし、クラスタ、トランザクション、Pub/Sub機能を提供。
GitHub概要
redis-rs/redis-rs
Redis library for rust
スター4,050
ウォッチ36
フォーク652
作成日:2013年12月29日
言語:Rust
ライセンス:Other
トピックス
なし
スター履歴
データ取得日時: 2025/10/22 09:23
概要
redis-rsは、Redis、Valkey、およびその他のRESP互換データベース向けの高レベルRustクライアントライブラリです。同期・非同期の両方のAPIを提供し、Tokio、async-std、smolなどの主要な非同期ランタイムをサポートしています。クラスタ、トランザクション、Pub/Subなど、Redisの高度な機能にも対応しています。
主な特徴
- 同期・非同期対応: 両方のAPIスタイルをサポート
- マルチプレキシング: 単一接続で複数の並行リクエストを処理
- クラスタサポート: Redis Clusterの自動ルーティングとトポロジー変更対応
- トランザクション: MULTI/EXECによるアトミック操作
- Pub/Sub: 非同期メッセージングのフルサポート
- TLS対応: セキュアな接続をサポート
インストール
[dependencies]
# 基本的な同期API
redis = "0.25"
# Tokio非同期サポート
redis = { version = "0.25", features = ["tokio-comp"] }
# async-std非同期サポート
redis = { version = "0.25", features = ["async-std-comp"] }
# 接続プール(同期)
redis = { version = "0.25", features = ["r2d2"] }
# 接続マネージャー(自動再接続)
redis = { version = "0.25", features = ["connection-manager"] }
# クラスタサポート
redis = { version = "0.25", features = ["cluster", "cluster-async"] }
# TLSサポート
redis = { version = "0.25", features = ["tls-native-tls"] }
基本的な使い方
同期API
use redis::Commands;
fn main() -> redis::RedisResult<()> {
// クライアントの作成
let client = redis::Client::open("redis://127.0.0.1/")?;
let mut con = client.get_connection()?;
// 文字列の操作
let _: () = con.set("my_key", "Hello, Redis!")?;
let value: String = con.get("my_key")?;
println!("Value: {}", value);
// 数値のインクリメント
let _: () = con.set("counter", 0)?;
let new_val: i32 = con.incr("counter", 1)?;
println!("Counter: {}", new_val);
// リストの操作
let _: () = con.lpush("my_list", vec!["item1", "item2", "item3"])?;
let list_items: Vec<String> = con.lrange("my_list", 0, -1)?;
println!("List: {:?}", list_items);
Ok(())
}
非同期API(Tokio)
use redis::AsyncCommands;
#[tokio::main]
async fn main() -> redis::RedisResult<()> {
let client = redis::Client::open("redis://127.0.0.1/")?;
// マルチプレキシング接続(推奨)
let mut con = client.get_multiplexed_async_connection().await?;
// 非同期操作
let _: () = con.set("async_key", "async value").await?;
let value: String = con.get("async_key").await?;
println!("Async value: {}", value);
// パイプライン
let (k1, k2): (String, String) = redis::pipe()
.atomic()
.set("key1", "value1")
.set("key2", "value2")
.ignore()
.get("key1")
.get("key2")
.query_async(&mut con)
.await?;
println!("Pipeline results: {}, {}", k1, k2);
Ok(())
}
高度な使い方
接続プールの設定(同期)
use redis::Commands;
use r2d2::Pool;
fn setup_pool() -> redis::RedisResult<Pool<redis::Client>> {
let client = redis::Client::open("redis://127.0.0.1/")?;
let pool = r2d2::Pool::builder()
.max_size(15)
.min_idle(Some(5))
.connection_timeout(std::time::Duration::from_secs(10))
.build(client)
.expect("Failed to create pool");
Ok(pool)
}
fn use_pooled_connection(pool: &Pool<redis::Client>) -> redis::RedisResult<()> {
let mut conn = pool.get().expect("Failed to get connection from pool");
// 接続を使用
let _: () = conn.set("pooled_key", "pooled_value")?;
let value: String = conn.get("pooled_key")?;
println!("Pooled value: {}", value);
Ok(())
}
ConnectionManager(自動再接続)
use redis::{AsyncCommands, ConnectionManager};
#[tokio::main]
async fn main() -> redis::RedisResult<()> {
let client = redis::Client::open("redis://127.0.0.1/")?;
let mut manager = ConnectionManager::new(client).await?;
// 接続が切断されても自動的に再接続される
loop {
match manager.set("key", "value").await {
Ok(_) => println!("Set successful"),
Err(e) => {
eprintln!("Error: {}, retrying...", e);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}
トランザクション
use redis::{Commands, transaction};
fn increment_atomically(con: &mut redis::Connection, key: &str) -> redis::RedisResult<i32> {
let (new_val,): (i32,) = transaction(con, &[key], |con, pipe| {
let old_val: i32 = con.get(key).unwrap_or(0);
pipe
.set(key, old_val + 1)
.ignore()
.get(key)
.query(con)
})?;
Ok(new_val)
}
// より複雑なトランザクション例
fn transfer_funds(
con: &mut redis::Connection,
from: &str,
to: &str,
amount: i32,
) -> redis::RedisResult<()> {
let result: redis::RedisResult<()> = transaction(con, &[from, to], |con, pipe| {
let from_balance: i32 = con.get(from)?;
let to_balance: i32 = con.get(to)?;
if from_balance < amount {
return Err(redis::RedisError::from((
redis::ErrorKind::ExtensionError,
"Insufficient funds",
)));
}
pipe
.set(from, from_balance - amount)
.ignore()
.set(to, to_balance + amount)
.ignore()
.query(con)
});
result
}
Pub/Sub(非同期)
use redis::aio::PubSub;
use futures_util::StreamExt;
#[tokio::main]
async fn main() -> redis::RedisResult<()> {
let client = redis::Client::open("redis://127.0.0.1/")?;
let mut pubsub = client.get_async_pubsub().await?;
// チャンネルの購読
pubsub.subscribe("channel1").await?;
pubsub.psubscribe("channel:*").await?;
// メッセージの受信
let mut pubsub_stream = pubsub.on_message();
tokio::spawn(async move {
while let Some(msg) = pubsub_stream.next().await {
let channel = msg.get_channel_name();
let payload: String = msg.get_payload().unwrap();
println!("Received on {}: {}", channel, payload);
}
});
// 別の接続でメッセージを送信
let mut con = client.get_multiplexed_async_connection().await?;
redis::cmd("PUBLISH")
.arg("channel1")
.arg("Hello, subscribers!")
.exec_async(&mut con)
.await?;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
Ok(())
}
実践的な例
Redis Clusterの使用
use redis::cluster::{ClusterClient, ClusterClientBuilder};
use redis::AsyncCommands;
#[tokio::main]
async fn main() -> redis::RedisResult<()> {
// クラスタノードのリスト
let nodes = vec![
"redis://127.0.0.1:7000/",
"redis://127.0.0.1:7001/",
"redis://127.0.0.1:7002/",
];
// クラスタクライアントの作成
let client = ClusterClientBuilder::new(nodes)
.password("cluster_password".to_string())
.retries(3)
.build()?;
let mut con = client.get_async_connection().await?;
// クラスタへの操作は通常の接続と同じ
let _: () = con.set("cluster_key", "cluster_value").await?;
let value: String = con.get("cluster_key").await?;
println!("Cluster value: {}", value);
// ハッシュタグを使用して同じノードに配置
let _: () = con.set("{user:123}:name", "Alice").await?;
let _: () = con.set("{user:123}:age", "30").await?;
// トランザクションも可能(同じハッシュタグ内で)
let results: Vec<String> = redis::pipe()
.atomic()
.get("{user:123}:name")
.get("{user:123}:age")
.query_async(&mut con)
.await?;
println!("User data: {:?}", results);
Ok(())
}
セッション管理システム
use redis::{AsyncCommands, aio::MultiplexedConnection};
use serde::{Serialize, Deserialize};
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Serialize, Deserialize, Debug)]
struct Session {
user_id: u64,
username: String,
created_at: u64,
last_accessed: u64,
}
struct SessionManager {
redis: MultiplexedConnection,
ttl: u64, // セッションの有効期限(秒)
}
impl SessionManager {
async fn new(redis_url: &str, ttl: u64) -> redis::RedisResult<Self> {
let client = redis::Client::open(redis_url)?;
let redis = client.get_multiplexed_async_connection().await?;
Ok(Self { redis, ttl })
}
async fn create_session(&mut self, user_id: u64, username: String) -> redis::RedisResult<String> {
let session_id = uuid::Uuid::new_v4().to_string();
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let session = Session {
user_id,
username,
created_at: now,
last_accessed: now,
};
let session_json = serde_json::to_string(&session).unwrap();
let key = format!("session:{}", session_id);
// セッションを保存(TTL付き)
self.redis.set_ex(&key, session_json, self.ttl).await?;
Ok(session_id)
}
async fn get_session(&mut self, session_id: &str) -> redis::RedisResult<Option<Session>> {
let key = format!("session:{}", session_id);
let session_json: Option<String> = self.redis.get(&key).await?;
match session_json {
Some(json) => {
let mut session: Session = serde_json::from_str(&json).unwrap();
// 最終アクセス時刻を更新
session.last_accessed = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let updated_json = serde_json::to_string(&session).unwrap();
// TTLを更新
self.redis.set_ex(&key, updated_json, self.ttl).await?;
Ok(Some(session))
}
None => Ok(None),
}
}
async fn delete_session(&mut self, session_id: &str) -> redis::RedisResult<()> {
let key = format!("session:{}", session_id);
self.redis.del(&key).await?;
Ok(())
}
}
// 使用例
#[tokio::main]
async fn main() -> redis::RedisResult<()> {
let mut manager = SessionManager::new("redis://127.0.0.1/", 3600).await?;
// セッション作成
let session_id = manager.create_session(123, "alice".to_string()).await?;
println!("Created session: {}", session_id);
// セッション取得
if let Some(session) = manager.get_session(&session_id).await? {
println!("Retrieved session: {:?}", session);
}
Ok(())
}
Luaスクリプトの実行
use redis::{AsyncCommands, Script};
#[tokio::main]
async fn main() -> redis::RedisResult<()> {
let client = redis::Client::open("redis://127.0.0.1/")?;
let mut con = client.get_multiplexed_async_connection().await?;
// レート制限のLuaスクリプト
let rate_limit_script = Script::new(r"
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('GET', key)
if current == false then
redis.call('SETEX', key, window, 1)
return 1
elseif tonumber(current) < limit then
redis.call('INCR', key)
return 1
else
return 0
end
");
// スクリプトの実行
let allowed: i32 = rate_limit_script
.key("rate_limit:user:123")
.arg(10) // 1分間に10リクエストまで
.arg(60) // 60秒のウィンドウ
.invoke_async(&mut con)
.await?;
if allowed == 1 {
println!("Request allowed");
} else {
println!("Rate limit exceeded");
}
Ok(())
}
パフォーマンスのベストプラクティス
1. 接続の再利用
use redis::aio::MultiplexedConnection;
use std::sync::Arc;
// アプリケーション全体で共有する接続
struct AppState {
redis: Arc<tokio::sync::Mutex<MultiplexedConnection>>,
}
impl AppState {
async fn new(redis_url: &str) -> redis::RedisResult<Self> {
let client = redis::Client::open(redis_url)?;
let con = client.get_multiplexed_async_connection().await?;
Ok(Self {
redis: Arc::new(tokio::sync::Mutex::new(con)),
})
}
}
// マルチプレキシング接続はクローン可能
async fn better_approach(redis_url: &str) -> redis::RedisResult<MultiplexedConnection> {
let client = redis::Client::open(redis_url)?;
let con = client.get_multiplexed_async_connection().await?;
// この接続はクローンして複数のタスクで使用可能
Ok(con)
}
2. パイプラインの活用
use redis::AsyncCommands;
// 非効率的:複数の往復
async fn inefficient_bulk_set(con: &mut MultiplexedConnection, data: Vec<(String, String)>)
-> redis::RedisResult<()> {
for (key, value) in data {
let _: () = con.set(key, value).await?;
}
Ok(())
}
// 効率的:パイプラインで一度に送信
async fn efficient_bulk_set(con: &mut MultiplexedConnection, data: Vec<(String, String)>)
-> redis::RedisResult<()> {
let mut pipe = redis::pipe();
for (key, value) in data {
pipe.set(key, value).ignore();
}
pipe.query_async(con).await?;
Ok(())
}
3. 適切なシリアライゼーション
use serde::{Serialize, Deserialize};
use bincode;
#[derive(Serialize, Deserialize)]
struct LargeData {
id: u64,
values: Vec<f64>,
metadata: HashMap<String, String>,
}
// バイナリシリアライゼーション(効率的)
async fn save_binary(con: &mut MultiplexedConnection, key: &str, data: &LargeData)
-> redis::RedisResult<()> {
let bytes = bincode::serialize(data).unwrap();
con.set(key, bytes).await
}
async fn load_binary(con: &mut MultiplexedConnection, key: &str)
-> redis::RedisResult<Option<LargeData>> {
let bytes: Option<Vec<u8>> = con.get(key).await?;
match bytes {
Some(b) => Ok(bincode::deserialize(&b).ok()),
None => Ok(None),
}
}
他のRedisクライアントとの比較
redis-rs vs fred
| 特徴 | redis-rs | fred |
|---|---|---|
| 成熟度 | ◎ | ○ |
| API設計 | 伝統的 | モダン |
| パフォーマンス | ◎ | ◎ |
| クラスタサポート | ◎ | ◎ |
| ドキュメント | ◎ | ○ |
redis-rs vs bb8-redis
| 特徴 | redis-rs | bb8-redis |
|---|---|---|
| 接続プール | r2d2 | bb8 |
| 非同期プール | 不要* | ◎ |
| 統合の容易さ | ◎ | ○ |
*注:redis-rsのマルチプレキシング接続は通常プール不要
まとめ
redis-rsは、Rustエコシステムにおける最も成熟したRedisクライアントです。同期・非同期の両方のAPIを提供し、Redisの全機能をサポートしています。特に、マルチプレキシング接続による効率的な非同期処理、包括的なクラスタサポート、豊富な機能により、小規模から大規模なアプリケーションまで幅広く対応できます。