Mini Redis

RustRedisTokio非同期学習教材インメモリDB

GitHub概要

tokio-rs/mini-redis

Incomplete Redis client and server implementation using Tokio - for learning purposes only

スター4,450
ウォッチ45
フォーク531
作成日:2019年12月4日
言語:Rust
ライセンス:MIT License

トピックス

なし

スター履歴

tokio-rs/mini-redis Star History
データ取得日時: 2025/10/22 08:07

キャッシュライブラリ

Mini Redis

概要

Mini Redisは、Tokioフレームワークを使用して構築されたRedisサーバーとクライアントの学習用実装です。

詳細

Mini Redisは、非同期Rustアプリケーション開発の学習を目的として設計されたRedisの最小限実装です。Tokioエコシステムの使用方法を包括的に学習できる教育リソースとして、Redis公式のTokio-rsチームによって開発・メンテナンスされています。本番環境での使用は意図されておらず、Tokioの非同期プログラミングパターン、フレーミング、グレースフルシャットダウン、Pub/Sub、接続管理など、実用的なRustアプリケーション開発で必要となる概念を体系的に学習できるよう設計されています。GET/SET、PUBLISH/SUBSCRIBE、PING等の基本的なRedisコマンドを実装し、TCP接続管理、Redisワイヤプロトコルの解析・エンコード、バックグラウンドタスクによるキー有効期限管理、セマフォを使用した接続制限など、実際のサーバーアプリケーションで必要となる機能を含んでいます。公式Tokioチュートリアルとして位置づけられ、段階的にRedisクライアントとサーバーの構築プロセスを学習できるドキュメントと豊富なコード例が提供されています。

メリット・デメリット

メリット

  • 学習特化: Tokioの非同期プログラミング学習に最適化
  • 実践的コード例: 実用的なサーバーアプリケーションパターンを包含
  • 包括的チュートリアル: 段階的な学習プロセスの提供
  • 公式サポート: Tokioチームによる継続的メンテナンス
  • 豊富なパターン: フレーミング、Pub/Sub、グレースフルシャットダウン等
  • 詳細ドキュメント: コード解説とベストプラクティスの提供
  • プロダクションレディパターン: 実際の開発で応用可能な設計パターン

デメリット

  • 本番利用不可: 学習目的のため本番環境使用は非推奨
  • 機能制限: 完全なRedis実装ではなく基本機能のみ
  • パフォーマンス最適化なし: 学習重視でパフォーマンス最適化は限定的
  • スケーラビリティ制限: 大規模システム向けの最適化なし
  • プロトコル部分実装: Redis全機能は未実装

主要リンク

書き方の例

インストールと基本セットアップ

# Mini-Redisサーバーのインストール
cargo install mini-redis

# プロジェクトへの依存関係追加 (Cargo.toml)
[dependencies]
mini-redis = "0.4"
tokio = { version = "1.0", features = ["full"] }

サーバーの起動と基本操作

# サーバーの起動
RUST_LOG=debug cargo run --bin mini-redis-server

# 別ターミナルでCLIクライアント操作
cargo run --bin mini-redis-cli set foo bar
cargo run --bin mini-redis-cli get foo
cargo run --bin mini-redis-cli ping

基本的なクライアント接続

use mini_redis::clients::Client;

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    // サーバーへの接続
    let mut client = Client::connect("127.0.0.1:6379").await?;
    
    // 接続確認
    let pong = client.ping(None).await?;
    println!("Server response: {:?}", pong);
    
    Ok(())
}

GET/SET操作の実装

use mini_redis::clients::Client;
use bytes::Bytes;

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    let mut client = Client::connect("127.0.0.1:6379").await?;
    
    // SET操作
    client.set("hello", "world".into()).await?;
    client.set("user:1", "Alice".into()).await?;
    
    // GET操作
    let value = client.get("hello").await?;
    match value {
        Some(val) => {
            println!("Got value: {}", std::str::from_utf8(&val)?);
        }
        None => {
            println!("Key not found");
        }
    }
    
    // 複数のキー操作
    let keys = vec!["hello", "user:1", "nonexistent"];
    for key in keys {
        match client.get(key).await? {
            Some(value) => {
                println!("{} = {}", key, std::str::from_utf8(&value)?);
            }
            None => {
                println!("{} = (not found)", key);
            }
        }
    }
    
    Ok(())
}

Pub/Sub機能の実装

use mini_redis::clients::{Client, BlockingClient};
use tokio::sync::oneshot;

async fn publisher() -> mini_redis::Result<()> {
    let mut client = Client::connect("127.0.0.1:6379").await?;
    
    // チャンネルにメッセージを配信
    client.publish("news", "Breaking news!".into()).await?;
    client.publish("sports", "Game results".into()).await?;
    client.publish("weather", "Sunny day".into()).await?;
    
    Ok(())
}

async fn subscriber() -> mini_redis::Result<()> {
    let client = Client::connect("127.0.0.1:6379").await?;
    let mut subscriber = client.subscribe(vec!["news".to_string(), "sports".to_string()]).await?;
    
    // メッセージの受信
    while let Some(msg) = subscriber.next_message().await? {
        println!("Received: {} on channel {}", 
                std::str::from_utf8(&msg.content)?, 
                msg.channel);
    }
    
    Ok(())
}

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    // Subscriber を先に起動
    let (tx, rx) = oneshot::channel();
    
    let subscriber_handle = tokio::spawn(async move {
        let _ = tx.send(());
        subscriber().await
    });
    
    // Subscriber の起動を待機
    let _ = rx.await;
    
    // Publisher を起動
    let publisher_handle = tokio::spawn(publisher());
    
    // 両方の完了を待機
    let (sub_result, pub_result) = tokio::join!(subscriber_handle, publisher_handle);
    
    sub_result??;
    pub_result??;
    
    Ok(())
}

サーバーの実装(基本版)

use mini_redis::{server, DEFAULT_PORT};
use tokio::net::{TcpListener, TcpStream};
use tokio::signal;

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    // ロギングの設定
    tracing_subscriber::fmt()
        .with_target(false)
        .init();
        
    let listener = TcpListener::bind(&format!("127.0.0.1:{}", DEFAULT_PORT)).await?;
    
    println!("Mini-Redis server listening on port {}", DEFAULT_PORT);
    
    // サーバーの実行(Ctrl+Cでのグレースフルシャットダウン対応)
    server::run(listener, signal::ctrl_c()).await;
    
    Ok(())
}

接続制限付きサーバー

use mini_redis::server;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6379").await?;
    
    // 最大250の同時接続を許可
    let max_connections = Arc::new(Semaphore::new(250));
    
    loop {
        // 接続許可の取得
        let permit = max_connections.clone().acquire_owned().await.unwrap();
        
        let (socket, addr) = listener.accept().await?;
        println!("New connection from: {}", addr);
        
        // 接続ごとにタスクを生成
        tokio::spawn(async move {
            // permit がドロップされるまで接続スロットを保持
            let _permit = permit;
            
            if let Err(err) = server::handle_connection(socket).await {
                eprintln!("Connection error: {}", err);
            }
        });
    }
}

カスタムコマンド実装

use mini_redis::{Connection, Frame};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::net::{TcpListener, TcpStream};

type Db = Arc<Mutex<HashMap<String, bytes::Bytes>>>;

async fn handle_connection(socket: TcpStream, db: Db) {
    let mut connection = Connection::new(socket);
    
    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match frame {
            Frame::Array(frames) => {
                if frames.is_empty() {
                    Frame::Error("empty command".to_string())
                } else {
                    match &frames[0] {
                        Frame::Bulk(cmd) if cmd == b"GET" => {
                            if frames.len() != 2 {
                                Frame::Error("GET requires exactly 1 argument".to_string())
                            } else if let Frame::Bulk(key) = &frames[1] {
                                let key = std::str::from_utf8(key).unwrap();
                                let db = db.lock().unwrap();
                                
                                match db.get(key) {
                                    Some(value) => Frame::Bulk(value.clone()),
                                    None => Frame::Null,
                                }
                            } else {
                                Frame::Error("invalid key".to_string())
                            }
                        }
                        Frame::Bulk(cmd) if cmd == b"SET" => {
                            if frames.len() != 3 {
                                Frame::Error("SET requires exactly 2 arguments".to_string())
                            } else if let (Frame::Bulk(key), Frame::Bulk(value)) = (&frames[1], &frames[2]) {
                                let key = std::str::from_utf8(key).unwrap().to_string();
                                let mut db = db.lock().unwrap();
                                db.insert(key, value.clone());
                                Frame::Simple("OK".to_string())
                            } else {
                                Frame::Error("invalid arguments".to_string())
                            }
                        }
                        _ => Frame::Error("unknown command".to_string()),
                    }
                }
            }
            _ => Frame::Error("invalid frame type".to_string()),
        };
        
        connection.write_frame(&response).await.unwrap();
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:6379").await?;
    let db: Db = Arc::new(Mutex::new(HashMap::new()));
    
    println!("Custom Mini-Redis server listening on port 6379");
    
    loop {
        let (socket, _) = listener.accept().await?;
        let db = db.clone();
        
        tokio::spawn(async move {
            handle_connection(socket, db).await;
        });
    }
}

テストの作成例

use mini_redis::{clients::Client, server};
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;

async fn start_server() -> (SocketAddr, JoinHandle<()>) {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();

    let handle = tokio::spawn(async move {
        server::run(listener, tokio::signal::ctrl_c()).await
    });

    (addr, handle)
}

#[tokio::test]
async fn key_value_get_set() {
    let (addr, _handle) = start_server().await;

    let mut client = Client::connect(addr).await.unwrap();
    
    // SET操作のテスト
    client.set("hello", "world".into()).await.unwrap();

    // GET操作のテスト
    let value = client.get("hello").await.unwrap().unwrap();
    assert_eq!(b"world", &value[..]);
    
    // 存在しないキーのテスト
    let missing = client.get("missing").await.unwrap();
    assert!(missing.is_none());
}

#[tokio::test]
async fn pub_sub_test() {
    let (addr, _handle) = start_server().await;

    let client = Client::connect(addr).await.unwrap();
    let mut subscriber = client
        .subscribe(vec!["test-channel".to_string()])
        .await
        .unwrap();

    // 別のクライアントでメッセージ送信
    let mut publisher = Client::connect(addr).await.unwrap();
    publisher
        .publish("test-channel", "test-message".into())
        .await
        .unwrap();

    // メッセージ受信の確認
    let message = subscriber.next_message().await.unwrap().unwrap();
    assert_eq!("test-channel", message.channel);
    assert_eq!(b"test-message", &message.content[..]);
}