tokio-postgres

Overview

tokio-postgres describes itself as "A native, asynchronous PostgreSQL client for Rust". Built on the Tokio runtime, it performs database operations without blocking the calling thread. It gives direct, low-level access to SQL without an ORM layer, which suits performance-critical applications and systems that need full control over their SQL.

Details

tokio-postgres builds on Rust's async/await syntax and the Tokio runtime to serve many concurrent queries efficiently. It supports TLS connections (through the postgres-openssl and postgres-native-tls adapter crates), prepared statements, transactions, COPY operations, and LISTEN/NOTIFY notifications; cursors are available through SQL DECLARE/FETCH statements. The crate does not include a connection pool, so applications that need one pair it with an external pool such as bb8 or deadpool.

Key Features

  • Fully Asynchronous: High concurrency through deep integration with Tokio runtime
  • Low-level SQL Control: Direct SQL execution without ORM overhead
  • High Performance: Non-blocking I/O with automatic pipelining of queries that are polled concurrently
  • Broad PostgreSQL Support: Prepared statements, transactions, LISTEN/NOTIFY notifications, COPY
  • TLS Support: Encrypted communication via the postgres-openssl and postgres-native-tls crates
  • Connection Pool Integration: Support for external pools like bb8 and deadpool

Pros and Cons

Pros

  • Complete SQL control and maximum performance
  • High concurrency and scalability through asynchronous processing
  • Full utilization of PostgreSQL-specific features (notifications, arrays, JSON, etc.)
  • Direct database access without ORM overhead
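  • Memory-efficient reads: column values can be borrowed from the row buffer (for example as &str) instead of copied
  • Parameterized queries with typed parameters keep values out of the SQL text, guarding against SQL injection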

Cons

  • Raw SQL writing required without ORM-like abstraction
  • PostgreSQL-only with no multi-database support
  • Complex mapping and relationship management requires manual implementation
  • High initial development cost due to boilerplate code
  • Requires understanding of asynchronous programming
  • Complexity in error handling and connection management

Code Examples

Setup

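# Cargo.toml
[dependencies]
# "with-chrono-0_4" enables reading TIMESTAMP columns as chrono types
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1.0", features = ["v4", "serde"] }
futures-util = "0.3"  # try_join_all in the connection pool example

# Logging for the monitoring example
tracing = "0.1"
tracing-subscriber = "0.3"

# TLS support (optional)
postgres-openssl = "0.5"  # When using OpenSSL
postgres-native-tls = "0.5"  # When using native-tls

# Connection pool (recommended)
bb8 = "0.8"
bb8-postgres = "0.8"

# PostgreSQL setup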
# Using Docker
docker run --name postgres-tokio \
  -e POSTGRES_DB=testdb \
  -e POSTGRES_USER=testuser \
  -e POSTGRES_PASSWORD=testpass \
  -p 5432:5432 \
  -d postgres:15
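
The Docker settings above map directly onto the connection string used in the examples that follow. Besides the URL form, tokio-postgres also accepts a space-separated key=value form. As a minimal sketch, the helper below assembles that form from individual settings; `kv_conn_string` and `quote_value` are hypothetical names introduced for illustration and are not part of the crate. Values that are empty or contain whitespace, quotes, or backslashes are wrapped in single quotes, with `'` and `\` backslash-escaped.

```rust
/// Quote one value for a key=value connection string.
/// Values that are empty or contain whitespace, `'`, or `\` are wrapped
/// in single quotes; `'` and `\` inside them are backslash-escaped.
fn quote_value(value: &str) -> String {
    let needs_quotes = value.is_empty()
        || value
            .chars()
            .any(|c| c.is_whitespace() || c == '\'' || c == '\\');
    if !needs_quotes {
        return value.to_string();
    }
    // Escape backslashes first so the escapes added for quotes stay intact.
    let escaped = value.replace('\\', "\\\\").replace('\'', "\\'");
    format!("'{}'", escaped)
}

/// Build a key=value connection string (hypothetical helper).
fn kv_conn_string(host: &str, port: u16, user: &str, password: &str, dbname: &str) -> String {
    format!(
        "host={} port={} user={} password={} dbname={}",
        quote_value(host),
        port,
        quote_value(user),
        quote_value(password),
        quote_value(dbname)
    )
}
```

Calling `kv_conn_string("localhost", 5432, "testuser", "testpass", "testdb")` yields a string that can be passed to `tokio_postgres::connect` in place of the URL. In a real deployment the password would normally come from the environment or a secret store rather than source code.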

Basic Usage

use tokio_postgres::{NoTls, Row};

#[derive(Debug)]
struct User {
    id: i32,
    username: String,
    email: String,
    created_at: chrono::NaiveDateTime,
}

impl From<Row> for User {
    fn from(row: Row) -> Self {
        Self {
            id: row.get("id"),
            username: row.get("username"),
            email: row.get("email"),
            created_at: row.get("created_at"),
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Database connection
    let (client, connection) = tokio_postgres::connect(
        "postgresql://testuser:testpass@localhost:5432/testdb", 
        NoTls
    ).await?;
    
    // Manage connection object in separate task
    let connection_handle = tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("Connection error: {}", e);
        }
    });
    
    // Create table
    client.execute(
        "CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            username VARCHAR(50) UNIQUE NOT NULL,
            email VARCHAR(100) UNIQUE NOT NULL,
            created_at TIMESTAMP DEFAULT NOW()
        )",
        &[]
    ).await?;
    
    // Insert user
    let insert_stmt = client.prepare(
        "INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id, created_at"
    ).await?;
    
    let row = client.query_one(
        &insert_stmt, 
        &[&"john_doe", &"john_doe@example.com"]
    ).await?;
    
    let user_id: i32 = row.get(0);
    let created_at: chrono::NaiveDateTime = row.get(1);
    
    println!("Created user ID: {} at {}", user_id, created_at);
    
    // Search user
    let users = client.query(
        "SELECT id, username, email, created_at FROM users WHERE id = $1", 
        &[&user_id]
    ).await?;
    
    for row in users {
        let user = User::from(row);
        println!("Found user: {:?}", user);
    }
    
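    // The connection task only finishes once the client is dropped,
    // so drop it before waiting; otherwise this await never returns
    drop(client);
    connection_handle.await?;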
    
    Ok(())
}

Prepared Statements and Transactions

use tokio_postgres::NoTls;
use std::error::Error;

#[derive(Debug)]
struct Post {
    id: i32,
    title: String,
    content: String,
    author_id: i32,
    published: bool,
}

struct DatabaseService {
    client: tokio_postgres::Client,
}

impl DatabaseService {
    async fn new() -> Result<Self, Box<dyn Error>> {
        let (client, connection) = tokio_postgres::connect(
            "postgresql://testuser:testpass@localhost:5432/testdb",
            NoTls
        ).await?;
        
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("Connection error: {}", e);
            }
        });
        
        Ok(Self { client })
    }
    
    // Transaction usage example.
    // `Client::transaction` borrows the client mutably, so this method takes `&mut self`.
    // If the transaction is dropped without `commit`, it is rolled back.
    async fn create_user_with_post(
        &mut self,
        username: &str,
        email: &str,
        title: &str,
        content: &str
    ) -> Result<(i32, i32), Box<dyn Error>> {
        let transaction = self.client.transaction().await?;

        // Create user
        let user_row = transaction.query_one(
            "INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id",
            &[&username, &email]
        ).await?;

        let user_id: i32 = user_row.get(0);

        // Create post
        let post_row = transaction.query_one(
            "INSERT INTO posts (title, content, author_id, published) 
             VALUES ($1, $2, $3, $4) RETURNING id",
            &[&title, &content, &user_id, &true]
        ).await?;

        let post_id: i32 = post_row.get(0);

        // Commit transaction
        transaction.commit().await?;

        Ok((user_id, post_id))
    }

    // Leveraging prepared statements
    async fn find_posts_by_author(&self, author_id: i32) -> Result<Vec<Post>, Box<dyn Error>> {
        let stmt = self.client.prepare(
            "SELECT id, title, content, author_id, published 
             FROM posts 
             WHERE author_id = $1 
             ORDER BY created_at DESC"
        ).await?;

        let rows = self.client.query(&stmt, &[&author_id]).await?;

        let posts = rows.into_iter().map(|row| {
            Post {
                id: row.get(0),
                title: row.get(1),
                content: row.get(2),
                author_id: row.get(3),
                published: row.get(4),
            }
        }).collect();

        Ok(posts)
    }

    // Batch operations: one prepared statement reused for every row,
    // all inside a single transaction
    async fn bulk_update_posts_status(
        &mut self,
        post_ids: &[i32],
        published: bool
    ) -> Result<u64, Box<dyn Error>> {
        let transaction = self.client.transaction().await?;

        let stmt = transaction.prepare(
            "UPDATE posts SET published = $1 WHERE id = $2"
        ).await?;

        let mut total_affected = 0u64;

        for &post_id in post_ids {
            let affected = transaction.execute(&stmt, &[&published, &post_id]).await?;
            total_affected += affected;
        }

        transaction.commit().await?;

        Ok(total_affected)
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Table preparation
    let (client, connection) = tokio_postgres::connect(
        "postgresql://testuser:testpass@localhost:5432/testdb",
        NoTls
    ).await?;

    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("Connection error: {}", e);
        }
    });

    client.execute(
        "CREATE TABLE IF NOT EXISTS posts (
            id SERIAL PRIMARY KEY,
            title VARCHAR(200) NOT NULL,
            content TEXT,
            author_id INTEGER REFERENCES users(id),
            published BOOLEAN DEFAULT FALSE,
            created_at TIMESTAMP DEFAULT NOW()
        )",
        &[]
    ).await?;

    drop(client);

    // Service usage (mutable because the transaction methods take `&mut self`)
    let mut service = DatabaseService::new().await?;
    
    let (user_id, post_id) = service.create_user_with_post(
        "alice",
        "alice@example.com",
        "My First Post",
        "This is the content of my first post."
    ).await?;
    
    println!("Created user {} with post {}", user_id, post_id);
    
    let posts = service.find_posts_by_author(user_id).await?;
    println!("Found {} posts", posts.len());
    
    Ok(())
}
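
The batch update above sends one statement per row. When a batch can be expressed as a single statement, such as inserting several rows at once, fewer round trips are needed. The sketch below builds the SQL text for a multi-row INSERT with numbered placeholders. `multi_row_insert_sql` is a hypothetical helper, not part of tokio-postgres, and it interpolates the table and column names verbatim, so those must be trusted constants and never user input. The row values themselves still travel as bound parameters.

```rust
/// Build "INSERT INTO t (a, b) VALUES ($1, $2), ($3, $4), ..." for `row_count` rows.
/// Returns None when there is nothing to insert, since an empty VALUES list
/// is not valid SQL.
fn multi_row_insert_sql(table: &str, columns: &[&str], row_count: usize) -> Option<String> {
    if columns.is_empty() || row_count == 0 {
        return None;
    }
    let width = columns.len();
    // One parenthesised placeholder group per row, numbered consecutively.
    let groups: Vec<String> = (0..row_count)
        .map(|row| {
            let placeholders: Vec<String> = (1..=width)
                .map(|col| format!("${}", row * width + col))
                .collect();
            format!("({})", placeholders.join(", "))
        })
        .collect();
    Some(format!(
        "INSERT INTO {} ({}) VALUES {}",
        table,
        columns.join(", "),
        groups.join(", ")
    ))
}
```

The resulting string can be passed to `execute` together with a flat parameter slice holding the values row by row. PostgreSQL limits a single statement to 65,535 bind parameters, so very large batches need to be split into several statements.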

Connection Pool and Asynchronous Performance Optimization

use bb8::{Pool, PooledConnection};
use bb8_postgres::PostgresConnectionManager;
use tokio_postgres::{NoTls, Config};
use std::str::FromStr;
use std::time::Duration;

type ConnectionPool = Pool<PostgresConnectionManager<NoTls>>;
type Connection<'a> = PooledConnection<'a, PostgresConnectionManager<NoTls>>;

#[derive(Clone)]
struct PooledDatabaseService {
    pool: ConnectionPool,
}

impl PooledDatabaseService {
    async fn new() -> Result<Self, Box<dyn std::error::Error>> {
        let config = Config::from_str("postgresql://testuser:testpass@localhost:5432/testdb")?;
        let manager = PostgresConnectionManager::new(config, NoTls);
        
        let pool = Pool::builder()
            .max_size(15)
            .min_idle(Some(5))
            .connection_timeout(Duration::from_secs(30))
            .idle_timeout(Some(Duration::from_secs(600)))
            .max_lifetime(Some(Duration::from_secs(1800)))
            .build(manager)
            .await?;
        
        Ok(Self { pool })
    }
    
    async fn get_connection(&self) -> Result<Connection<'_>, bb8::RunError<tokio_postgres::Error>> {
        self.pool.get().await
    }
    
    // Batch loading with concurrent processing
    async fn get_users_with_post_counts(&self, user_ids: &[i32]) 
        -> Result<Vec<(i32, String, i64)>, Box<dyn std::error::Error>> {
        
        let futures = user_ids.iter().map(|&user_id| {
            let pool = self.pool.clone();
            async move {
                let conn = pool.get().await?;
                let row = conn.query_one(
                    "SELECT u.id, u.username, COUNT(p.id) as post_count
                     FROM users u
                     LEFT JOIN posts p ON u.id = p.author_id
                     WHERE u.id = $1
                     GROUP BY u.id, u.username",
                    &[&user_id]
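                // Wrap query errors so both failure modes share the pool's error type
                ).await.map_err(bb8::RunError::User)?;
                
                Ok::<_, bb8::RunError<tokio_postgres::Error>>((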
                    row.get::<_, i32>(0),
                    row.get::<_, String>(1),
                    row.get::<_, i64>(2),
                ))
            }
        });
        
        let results = futures_util::future::try_join_all(futures).await?;
        Ok(results)
    }
    
    // Streaming processing
    async fn stream_large_dataset<F>(&self, mut processor: F) 
        -> Result<(), Box<dyn std::error::Error>>
    where
        F: FnMut(i32, String) -> Result<(), Box<dyn std::error::Error>>,
    {
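        // `transaction()` needs mutable access to the pooled client
        let mut conn = self.get_connection().await?;
        
        // `transaction()` issues BEGIN itself, so no explicit BEGIN is needed
        let transaction = conn.transaction().await?;
        
        // Streaming using a server-side cursor, which must live inside a transaction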
        transaction.execute(
            "DECLARE user_cursor CURSOR FOR SELECT id, username FROM users ORDER BY id",
            &[]
        ).await?;
        
        loop {
            let rows = transaction.query("FETCH 100 FROM user_cursor", &[]).await?;
            
            if rows.is_empty() {
                break;
            }
            
            for row in rows {
                let id: i32 = row.get(0);
                let username: String = row.get(1);
                processor(id, username)?;
            }
        }
        
        transaction.execute("CLOSE user_cursor", &[]).await?;
        transaction.commit().await?;
        
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let service = PooledDatabaseService::new().await?;
    
    // Concurrent processing test
    let user_ids = vec![1, 2, 3, 4, 5];
    let results = service.get_users_with_post_counts(&user_ids).await?;
    
    for (id, username, count) in results {
        println!("User {} ({}) has {} posts", id, username, count);
    }
    
    // Streaming processing test
    println!("Streaming users:");
    service.stream_large_dataset(|id, username| {
        println!("Processing user: {} - {}", id, username);
        Ok(())
    }).await?;
    
    Ok(())
}
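
The pool above is capped at 15 connections, a value the example does not derive from anything. One common way to pick it is to divide the server's connection budget among the application instances, because the pools of all instances together must stay below PostgreSQL's `max_connections` (100 by default, of which `superuser_reserved_connections`, 3 by default, are held back). The following is a rough sizing sketch under those assumptions; `pool_size_per_instance` is a hypothetical helper, not a bb8 API.

```rust
/// Split the server's usable connections evenly across application instances.
/// Returns None when no instance can be given at least one connection.
fn pool_size_per_instance(max_connections: u32, reserved: u32, instances: u32) -> Option<u32> {
    if instances == 0 {
        return None;
    }
    // Connections left after the server's reserved slots are set aside.
    let usable = max_connections.checked_sub(reserved)?;
    let per_instance = usable / instances;
    if per_instance == 0 {
        None
    } else {
        Some(per_instance)
    }
}
```

With the default server settings and six application instances this gives 16 connections per instance, which could then be passed to `max_size`. Other clients of the same server, such as migrations or admin tools, need headroom as well, so treating the result as an upper bound is the safer reading.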

Error Handling and Monitoring

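use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use std::str::FromStr;
use std::time::{Duration, Instant};
use tokio_postgres::error::SqlState;
use tokio_postgres::{Config, Error as PgError, NoTls};
use tracing::{error, info, instrument, warn};

type ConnectionPool = Pool<PostgresConnectionManager<NoTls>>;

#[derive(Debug)]
enum DatabaseError {
    ConnectionError(String),
    QueryError(String),
    TransactionError(String),
    UniqueViolation(String),
    NotFound,
}

impl From<PgError> for DatabaseError {
    fn from(err: PgError) -> Self {
        // tokio-postgres has no ErrorKind enum; server errors carry a
        // SQLSTATE code (`code()`), and `is_closed()` reports a dead connection
        if err.code() == Some(&SqlState::UNIQUE_VIOLATION) {
            DatabaseError::UniqueViolation(err.to_string())
        } else if err.is_closed() {
            DatabaseError::ConnectionError(err.to_string())
        } else {
            DatabaseError::QueryError(err.to_string())
        }
    }
}

// Clone is cheap: the bb8 pool is a shared handle
#[derive(Clone)]
struct MonitoredDatabaseService {
    pool: ConnectionPool,
}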

impl MonitoredDatabaseService {
    #[instrument(skip(self))]
    async fn create_user_with_monitoring(
        &self,
        username: &str,
        email: &str
    ) -> Result<i32, DatabaseError> {
        let start = Instant::now();
        
        let conn = self.pool.get().await.map_err(|e| {
            error!("Failed to get connection: {}", e);
            DatabaseError::ConnectionError(e.to_string())
        })?;
        
        let result = conn.query_one(
            "INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id",
            &[&username, &email]
        ).await;
        
        let duration = start.elapsed();
        
        match result {
            Ok(row) => {
                let user_id: i32 = row.get(0);
                info!(
                    duration_ms = duration.as_millis() as u64,
                    user_id = user_id,
                    "User created successfully"
                );
                Ok(user_id)
            }
            Err(e) => {
                error!(
                    duration_ms = duration.as_millis() as u64,
                    error = %e,
                    "Failed to create user"
                );
                Err(DatabaseError::from(e))
            }
        }
    }
    
    // Query execution with retry functionality
    async fn query_with_retry<T, F>(
        &self,
        operation: F,
        max_retries: u32
    ) -> Result<T, DatabaseError>
    where
        F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, PgError>> + Send>>,
    {
        let mut last_error = None;
        
        for attempt in 0..=max_retries {
            match operation().await {
                Ok(result) => return Ok(result),
                Err(e) => {
                    warn!(
                        attempt = attempt,
                        max_retries = max_retries,
                        error = %e,
                        "Query attempt failed"
                    );
                    
                    if attempt < max_retries {
                        let delay = Duration::from_millis(100 * (1 << attempt));
                        tokio::time::sleep(delay).await;
                    }
                    
                    last_error = Some(e);
                }
            }
        }
        
        Err(DatabaseError::from(last_error.unwrap()))
    }
    
    // Health check
    async fn health_check(&self) -> Result<Duration, DatabaseError> {
        let start = Instant::now();
        
        let conn = self.pool.get().await.map_err(|e| {
            DatabaseError::ConnectionError(e.to_string())
        })?;
        
        conn.query_one("SELECT 1", &[]).await?;
        
        Ok(start.elapsed())
    }
}

// Periodic monitoring task
async fn monitoring_task(service: MonitoredDatabaseService) {
    let mut interval = tokio::time::interval(Duration::from_secs(30));
    
    loop {
        interval.tick().await;
        
        match service.health_check().await {
            Ok(duration) => {
                info!(
                    health_check_ms = duration.as_millis() as u64,
                    "Database health check passed"
                );
            }
            Err(e) => {
                error!(error = ?e, "Database health check failed");
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();
    
    let config = Config::from_str("postgresql://testuser:testpass@localhost:5432/testdb")?;
    let manager = PostgresConnectionManager::new(config, NoTls);
    let pool = Pool::builder().build(manager).await?;
    
    let service = MonitoredDatabaseService { pool };
    
    // Start monitoring task
    let monitor_service = service.clone();
    let monitor_handle = tokio::spawn(monitoring_task(monitor_service));
    
    // Main processing
    match service.create_user_with_monitoring("monitor_user", "monitor_user@example.com").await {
        Ok(user_id) => info!(user_id = user_id, "User created successfully"),
        Err(e) => error!(error = ?e, "Failed to create user"),
    }
    
    // Wait a bit to check monitoring logs
    tokio::time::sleep(Duration::from_secs(65)).await;
    monitor_handle.abort();
    
    Ok(())
}
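
The retry helper above retries every failure, including errors such as unique violations that will fail again on each attempt. A retry policy usually separates transient failures from permanent ones. The sketch below makes that decision from the five-character SQLSTATE code, which tokio-postgres exposes as a string through `SqlState::code()`, and caps the exponential backoff delay. `is_retryable` and `backoff_delay` are hypothetical names, and the set of codes treated as transient is an assumption to adapt per application.

```rust
use std::time::Duration;

/// Decide from a SQLSTATE code whether re-running the statement may succeed.
fn is_retryable(sqlstate: &str) -> bool {
    // 40001 = serialization_failure, 40P01 = deadlock_detected:
    // the transaction was aborted and can be re-run.
    // Class 08 = connection exceptions.
    matches!(sqlstate, "40001" | "40P01") || sqlstate.starts_with("08")
}

/// Exponential backoff: base * 2^attempt, never exceeding `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // Saturate instead of overflowing for large attempt numbers.
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}
```

With a 100 ms base and a 5 s cap, attempts 0 through 3 wait 100, 200, 400, and 800 ms, and later attempts settle at the cap. Adding random jitter to the delay is a common refinement when many clients retry at once.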