Mini Redis

RustRedisTokioAsyncLearning MaterialIn-Memory DB

GitHub Overview

tokio-rs/mini-redis

Incomplete Redis client and server implementation using Tokio - for learning purposes only

Stars4,450
Watchers45
Forks531
Created:December 4, 2019
Language:Rust
License:MIT License

Topics

None

Star History

tokio-rs/mini-redis Star History
Data as of: 10/22/2025, 08:07 AM

Cache Library

Mini Redis

Overview

Mini Redis is a learning-oriented Redis server and client implementation built with the Tokio framework.

Details

Mini Redis is a minimal Redis implementation designed specifically for learning asynchronous Rust application development. Developed and maintained by the official Tokio-rs team, it serves as a comprehensive educational resource for learning how to use the Tokio ecosystem. It is not intended for production use but is designed to systematically teach the concepts needed for practical Rust application development, including Tokio's asynchronous programming patterns, framing, graceful shutdown, Pub/Sub, and connection management. The implementation includes basic Redis commands such as GET/SET, PUBLISH/SUBSCRIBE, and PING, along with essential server application features like TCP connection management, Redis wire protocol parsing and encoding, background task-based key expiration management, and semaphore-based connection limiting. Positioned as the official Tokio tutorial, it provides documentation and rich code examples that allow learners to gradually understand the process of building Redis clients and servers. The project demonstrates useful patterns including server connection handling, client modeling, database accessibility across connections, wire protocol implementation, and graceful shutdown procedures.

Pros and Cons

Pros

  • Learning-Focused: Optimized specifically for learning Tokio async programming
  • Practical Code Examples: Includes comprehensive real-world server application patterns
  • Comprehensive Tutorial: Provides step-by-step learning process
  • Official Support: Continuously maintained by the Tokio team
  • Rich Patterns: Covers framing, Pub/Sub, graceful shutdown, and more
  • Detailed Documentation: Provides code explanations and best practices
  • Production-Ready Patterns: Design patterns applicable to real development

Cons

  • Not Production-Ready: Discouraged for production use as it's for learning
  • Limited Features: Basic functionality only, not a complete Redis implementation
  • No Performance Optimization: Learning-focused with limited performance optimization
  • Scalability Limitations: Not optimized for large-scale systems
  • Partial Protocol Implementation: Not all Redis features are implemented

Key Links

Code Examples

Installation and Basic Setup

# Install Mini-Redis server
cargo install mini-redis

# Add dependency to project (Cargo.toml)
[dependencies]
mini-redis = "0.4"
tokio = { version = "1.0", features = ["full"] }

Server Startup and Basic Operations

# Start server
RUST_LOG=debug cargo run --bin mini-redis-server

# CLI client operations in separate terminal
cargo run --bin mini-redis-cli set foo bar
cargo run --bin mini-redis-cli get foo
cargo run --bin mini-redis-cli ping

Basic Client Connection

use mini_redis::clients::Client;

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    // Connect to server
    let mut client = Client::connect("127.0.0.1:6379").await?;
    
    // Test connection
    let pong = client.ping(None).await?;
    println!("Server response: {:?}", pong);
    
    Ok(())
}

GET/SET Operations Implementation

use mini_redis::clients::Client;
use bytes::Bytes;

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    let mut client = Client::connect("127.0.0.1:6379").await?;
    
    // SET operations
    client.set("hello", "world".into()).await?;
    client.set("user:1", "Alice".into()).await?;
    
    // GET operations
    let value = client.get("hello").await?;
    match value {
        Some(val) => {
            println!("Got value: {}", std::str::from_utf8(&val)?);
        }
        None => {
            println!("Key not found");
        }
    }
    
    // Multiple key operations
    let keys = vec!["hello", "user:1", "nonexistent"];
    for key in keys {
        match client.get(key).await? {
            Some(value) => {
                println!("{} = {}", key, std::str::from_utf8(&value)?);
            }
            None => {
                println!("{} = (not found)", key);
            }
        }
    }
    
    Ok(())
}

Pub/Sub Functionality Implementation

use mini_redis::clients::{Client, BlockingClient};
use tokio::sync::oneshot;

async fn publisher() -> mini_redis::Result<()> {
    let mut client = Client::connect("127.0.0.1:6379").await?;
    
    // Publish messages to channels
    client.publish("news", "Breaking news!".into()).await?;
    client.publish("sports", "Game results".into()).await?;
    client.publish("weather", "Sunny day".into()).await?;
    
    Ok(())
}

async fn subscriber() -> mini_redis::Result<()> {
    let client = Client::connect("127.0.0.1:6379").await?;
    let mut subscriber = client.subscribe(vec!["news".to_string(), "sports".to_string()]).await?;
    
    // Receive messages
    while let Some(msg) = subscriber.next_message().await? {
        println!("Received: {} on channel {}", 
                std::str::from_utf8(&msg.content)?, 
                msg.channel);
    }
    
    Ok(())
}

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    // Start subscriber first
    let (tx, rx) = oneshot::channel();
    
    let subscriber_handle = tokio::spawn(async move {
        let _ = tx.send(());
        subscriber().await
    });
    
    // Wait for subscriber to start
    let _ = rx.await;
    
    // Start publisher
    let publisher_handle = tokio::spawn(publisher());
    
    // Wait for both to complete
    let (sub_result, pub_result) = tokio::join!(subscriber_handle, publisher_handle);
    
    sub_result??;
    pub_result??;
    
    Ok(())
}

Server Implementation (Basic Version)

use mini_redis::{server, DEFAULT_PORT};
use tokio::net::{TcpListener, TcpStream};
use tokio::signal;

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    // Configure logging
    tracing_subscriber::fmt()
        .with_target(false)
        .init();
        
    let listener = TcpListener::bind(&format!("127.0.0.1:{}", DEFAULT_PORT)).await?;
    
    println!("Mini-Redis server listening on port {}", DEFAULT_PORT);
    
    // Run server with graceful shutdown on Ctrl+C
    server::run(listener, signal::ctrl_c()).await;
    
    Ok(())
}

Server with Connection Limiting

use mini_redis::server;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6379").await?;
    
    // Allow maximum 250 concurrent connections
    let max_connections = Arc::new(Semaphore::new(250));
    
    loop {
        // Acquire connection permit
        let permit = max_connections.clone().acquire_owned().await.unwrap();
        
        let (socket, addr) = listener.accept().await?;
        println!("New connection from: {}", addr);
        
        // Spawn task for each connection
        tokio::spawn(async move {
            // Hold connection slot until permit is dropped
            let _permit = permit;
            
            if let Err(err) = server::handle_connection(socket).await {
                eprintln!("Connection error: {}", err);
            }
        });
    }
}

Custom Command Implementation

use mini_redis::{Connection, Frame};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::net::{TcpListener, TcpStream};

type Db = Arc<Mutex<HashMap<String, bytes::Bytes>>>;

async fn handle_connection(socket: TcpStream, db: Db) {
    let mut connection = Connection::new(socket);
    
    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match frame {
            Frame::Array(frames) => {
                if frames.is_empty() {
                    Frame::Error("empty command".to_string())
                } else {
                    match &frames[0] {
                        Frame::Bulk(cmd) if cmd == b"GET" => {
                            if frames.len() != 2 {
                                Frame::Error("GET requires exactly 1 argument".to_string())
                            } else if let Frame::Bulk(key) = &frames[1] {
                                let key = std::str::from_utf8(key).unwrap();
                                let db = db.lock().unwrap();
                                
                                match db.get(key) {
                                    Some(value) => Frame::Bulk(value.clone()),
                                    None => Frame::Null,
                                }
                            } else {
                                Frame::Error("invalid key".to_string())
                            }
                        }
                        Frame::Bulk(cmd) if cmd == b"SET" => {
                            if frames.len() != 3 {
                                Frame::Error("SET requires exactly 2 arguments".to_string())
                            } else if let (Frame::Bulk(key), Frame::Bulk(value)) = (&frames[1], &frames[2]) {
                                let key = std::str::from_utf8(key).unwrap().to_string();
                                let mut db = db.lock().unwrap();
                                db.insert(key, value.clone());
                                Frame::Simple("OK".to_string())
                            } else {
                                Frame::Error("invalid arguments".to_string())
                            }
                        }
                        _ => Frame::Error("unknown command".to_string()),
                    }
                }
            }
            _ => Frame::Error("invalid frame type".to_string()),
        };
        
        connection.write_frame(&response).await.unwrap();
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:6379").await?;
    let db: Db = Arc::new(Mutex::new(HashMap::new()));
    
    println!("Custom Mini-Redis server listening on port 6379");
    
    loop {
        let (socket, _) = listener.accept().await?;
        let db = db.clone();
        
        tokio::spawn(async move {
            handle_connection(socket, db).await;
        });
    }
}

Test Creation Example

use mini_redis::{clients::Client, server};
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;

async fn start_server() -> (SocketAddr, JoinHandle<()>) {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();

    let handle = tokio::spawn(async move {
        server::run(listener, tokio::signal::ctrl_c()).await
    });

    (addr, handle)
}

#[tokio::test]
async fn key_value_get_set() {
    let (addr, _handle) = start_server().await;

    let mut client = Client::connect(addr).await.unwrap();
    
    // Test SET operation
    client.set("hello", "world".into()).await.unwrap();

    // Test GET operation
    let value = client.get("hello").await.unwrap().unwrap();
    assert_eq!(b"world", &value[..]);
    
    // Test non-existent key
    let missing = client.get("missing").await.unwrap();
    assert!(missing.is_none());
}

#[tokio::test]
async fn pub_sub_test() {
    let (addr, _handle) = start_server().await;

    let client = Client::connect(addr).await.unwrap();
    let mut subscriber = client
        .subscribe(vec!["test-channel".to_string()])
        .await
        .unwrap();

    // Send message from different client
    let mut publisher = Client::connect(addr).await.unwrap();
    publisher
        .publish("test-channel", "test-message".into())
        .await
        .unwrap();

    // Verify message reception
    let message = subscriber.next_message().await.unwrap().unwrap();
    assert_eq!("test-channel", message.channel);
    assert_eq!(b"test-message", &message.content[..]);
}

Advanced Example: Custom Protocol Extension

use mini_redis::{Connection, Frame};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;

#[derive(Serialize, Deserialize, Clone)]
struct UserData {
    name: String,
    email: String,
    age: u32,
}

type UserDb = Arc<Mutex<HashMap<String, UserData>>>;

async fn handle_user_commands(socket: TcpStream, user_db: UserDb) {
    let mut connection = Connection::new(socket);
    
    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match frame {
            Frame::Array(frames) if frames.len() >= 2 => {
                match &frames[0] {
                    Frame::Bulk(cmd) if cmd == b"SETUSER" && frames.len() == 5 => {
                        if let (
                            Frame::Bulk(id), 
                            Frame::Bulk(name), 
                            Frame::Bulk(email), 
                            Frame::Bulk(age_bytes)
                        ) = (&frames[1], &frames[2], &frames[3], &frames[4]) {
                            let id = std::str::from_utf8(id).unwrap().to_string();
                            let name = std::str::from_utf8(name).unwrap().to_string();
                            let email = std::str::from_utf8(email).unwrap().to_string();
                            let age = std::str::from_utf8(age_bytes).unwrap().parse::<u32>().unwrap_or(0);
                            
                            let user = UserData { name, email, age };
                            let mut db = user_db.lock().unwrap();
                            db.insert(id, user);
                            
                            Frame::Simple("OK".to_string())
                        } else {
                            Frame::Error("Invalid SETUSER arguments".to_string())
                        }
                    }
                    Frame::Bulk(cmd) if cmd == b"GETUSER" && frames.len() == 2 => {
                        if let Frame::Bulk(id) = &frames[1] {
                            let id = std::str::from_utf8(id).unwrap();
                            let db = user_db.lock().unwrap();
                            
                            match db.get(id) {
                                Some(user) => {
                                    let user_json = serde_json::to_string(user).unwrap();
                                    Frame::Bulk(user_json.into())
                                }
                                None => Frame::Null,
                            }
                        } else {
                            Frame::Error("Invalid GETUSER argument".to_string())
                        }
                    }
                    _ => Frame::Error("Unknown command".to_string()),
                }
            }
            _ => Frame::Error("Invalid command format".to_string()),
        };
        
        connection.write_frame(&response).await.unwrap();
    }
}