tokio-postgres

tokio-postgresは「A native, asynchronous PostgreSQL client for Rust」として開発された、Rustのための非同期PostgreSQLクライアントライブラリです。Tokioランタイムと完全に統合されており、ノンブロッキングでパフォーマンスの高いデータベース操作を実現します。SQLクエリへの直接的で低レベルなアクセスを提供し、高度なORMの抽象化レイヤーを持たないため、パフォーマンスクリティカルなアプリケーションや、SQLの完全な制御が必要なシステムに最適です。

RustPostgreSQLAsyncDatabase DriverNon-blockingRaw SQL

ライブラリ

tokio-postgres

概要

tokio-postgresは「A native, asynchronous PostgreSQL client for Rust」として開発された、Rustのための非同期PostgreSQLクライアントライブラリです。Tokioランタイムと完全に統合されており、ノンブロッキングでパフォーマンスの高いデータベース操作を実現します。SQLクエリへの直接的で低レベルなアクセスを提供し、高度なORMの抽象化レイヤーを持たないため、パフォーマンスクリティカルなアプリケーションや、SQLの完全な制御が必要なシステムに最適です。

詳細

tokio-postgres 2025年版は、Rust 1.75以降の最新async/await機能とTokioランタイムを活用し、高い並行性と効率性を提供します。TLS接続のサポート(openssl、native-tlsアダプタ経由)、プリペアドステートメント、カーソル、トランザクション管理、COPYオペレーション、通知システムなど、PostgreSQLの高度な機能を包括的にサポートします。外部の接続プール(bb8、deadpool等)との統合により、エンタープライズレベルのアプリケーション開発に必要な堅牢性とスケーラビリティを実現します。

主な特徴

  • 完全非同期: Tokioランタイムとの深い統合による高い並行性
  • 低レベルSQL制御: ORMオーバーヘッドなしの直接的なSQL実行
  • 高性能: ゼロコピー最適化とノンブロッキングI/O
  • PostgreSQL完全対応: プリペアドステートメント、通知、COPY等の全機能
  • TLS対応: openssl、native-tlsによる暗号化通信
  • 接続プール統合: bb8、deadpoolなどの外部プール対応

メリット・デメリット

メリット

  • SQLの完全な制御と最大限のパフォーマンス発揮
  • 非同期処理による高い並行性とスケーラビリティ
  • PostgreSQL固有機能(通知、配列、JSON等)の完全活用
  • ORMオーバーヘッドなしの直接的なデータベースアクセス
  • メモリ効率の高いゼロコピー最適化
  • Rustの型安全性を活かしたセキュアなSQL実行

デメリット

  • 生SQLでの記述が必要でORMのような抽象化なし
  • PostgreSQL専用でマルチデータベース対応なし
  • 複雑なマッピングやリレーションシップ管理は手動実装
  • ボイラープレートコードが多く初期開発コストが高い
  • 非同期プログラミングの理解が必要
  • エラーハンドリングと接続管理の複雑性

参考ページ

書き方の例

セットアップ

# Cargo.toml
[dependencies]
tokio-postgres = "0.7"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1.0", features = ["v4", "serde"] }

# TLS対応(任意)
postgres-openssl = "0.5"  # OpenSSL使用時
postgres-native-tls = "0.5"  # native-tls使用時

# 接続プール(推奨)
bb8 = "0.8"
bb8-postgres = "0.8"
# PostgreSQL準備
# Docker使用の場合
docker run --name postgres-tokio \
  -e POSTGRES_DB=testdb \
  -e POSTGRES_USER=testuser \
  -e POSTGRES_PASSWORD=testpass \
  -p 5432:5432 \
  -d postgres:15

基本的な使い方

use tokio_postgres::{NoTls, Error, Row};
use std::collections::HashMap;

#[derive(Debug)]
struct User {
    id: i32,
    username: String,
    email: String,
    created_at: chrono::NaiveDateTime,
}

impl From<Row> for User {
    fn from(row: Row) -> Self {
        Self {
            id: row.get("id"),
            username: row.get("username"),
            email: row.get("email"),
            created_at: row.get("created_at"),
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // データベース接続
    let (client, connection) = tokio_postgres::connect(
        "postgresql://testuser:testpass@localhost:5432/testdb", 
        NoTls
    ).await?;
    
    // 接続オブジェクトを別タスクで管理
    let connection_handle = tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("Connection error: {}", e);
        }
    });
    
    // テーブル作成
    client.execute(
        "CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            username VARCHAR(50) UNIQUE NOT NULL,
            email VARCHAR(100) UNIQUE NOT NULL,
            created_at TIMESTAMP DEFAULT NOW()
        )",
        &[]
    ).await?;
    
    // ユーザー挿入
    let insert_stmt = client.prepare(
        "INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id, created_at"
    ).await?;
    
    let row = client.query_one(
        &insert_stmt, 
        &[&"john_doe", &"[email protected]"]
    ).await?;
    
    let user_id: i32 = row.get(0);
    let created_at: chrono::NaiveDateTime = row.get(1);
    
    println!("Created user ID: {} at {}", user_id, created_at);
    
    // ユーザー検索
    let users = client.query(
        "SELECT id, username, email, created_at FROM users WHERE id = $1", 
        &[&user_id]
    ).await?;
    
    for row in users {
        let user = User::from(row);
        println!("Found user: {:?}", user);
    }
    
    // 接続終了待機
    connection_handle.await?;
    
    Ok(())
}

プリペアドステートメントとトランザクション

use tokio_postgres::{NoTls, Transaction};
use std::error::Error;

#[derive(Debug)]
struct Post {
    id: i32,
    title: String,
    content: String,
    author_id: i32,
    published: bool,
}

struct DatabaseService {
    client: tokio_postgres::Client,
}

impl DatabaseService {
    async fn new() -> Result<Self, Box<dyn Error>> {
        let (client, connection) = tokio_postgres::connect(
            "postgresql://testuser:testpass@localhost:5432/testdb",
            NoTls
        ).await?;
        
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("Connection error: {}", e);
            }
        });
        
        Ok(Self { client })
    }
    
    // トランザクション使用例
    async fn create_user_with_post(
        &self,
        username: &str,
        email: &str,
        title: &str,
        content: &str
    ) -> Result<(i32, i32), Box<dyn Error>> {
        let transaction = self.client.transaction().await?;
        
        // ユーザー作成
        let user_row = transaction.query_one(
            "INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id",
            &[&username, &email]
        ).await?;
        
        let user_id: i32 = user_row.get(0);
        
        // 投稿作成
        let post_row = transaction.query_one(
            "INSERT INTO posts (title, content, author_id, published) 
             VALUES ($1, $2, $3, $4) RETURNING id",
            &[&title, &content, &user_id, &true]
        ).await?;
        
        let post_id: i32 = post_row.get(0);
        
        // トランザクションコミット
        transaction.commit().await?;
        
        Ok((user_id, post_id))
    }
    
    // プリペアドステートメント活用
    async fn find_posts_by_author(&self, author_id: i32) -> Result<Vec<Post>, Box<dyn Error>> {
        let stmt = self.client.prepare(
            "SELECT id, title, content, author_id, published 
             FROM posts 
             WHERE author_id = $1 
             ORDER BY created_at DESC"
        ).await?;
        
        let rows = self.client.query(&stmt, &[&author_id]).await?;
        
        let posts = rows.into_iter().map(|row| {
            Post {
                id: row.get(0),
                title: row.get(1),
                content: row.get(2),
                author_id: row.get(3),
                published: row.get(4),
            }
        }).collect();
        
        Ok(posts)
    }
    
    // バッチ操作
    async fn bulk_update_posts_status(
        &self, 
        post_ids: &[i32], 
        published: bool
    ) -> Result<u64, Box<dyn Error>> {
        let transaction = self.client.transaction().await?;
        
        let stmt = transaction.prepare(
            "UPDATE posts SET published = $1 WHERE id = $2"
        ).await?;
        
        let mut total_affected = 0u64;
        
        for &post_id in post_ids {
            let affected = transaction.execute(&stmt, &[&published, &post_id]).await?;
            total_affected += affected;
        }
        
        transaction.commit().await?;
        
        Ok(total_affected)
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // テーブル準備
    let (client, connection) = tokio_postgres::connect(
        "postgresql://testuser:testpass@localhost:5432/testdb",
        NoTls
    ).await?;
    
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("Connection error: {}", e);
        }
    });
    
    client.execute(
        "CREATE TABLE IF NOT EXISTS posts (
            id SERIAL PRIMARY KEY,
            title VARCHAR(200) NOT NULL,
            content TEXT,
            author_id INTEGER REFERENCES users(id),
            published BOOLEAN DEFAULT FALSE,
            created_at TIMESTAMP DEFAULT NOW()
        )",
        &[]
    ).await?;
    
    drop(client);
    
    // サービス使用
    let service = DatabaseService::new().await?;
    
    let (user_id, post_id) = service.create_user_with_post(
        "alice",
        "[email protected]",
        "My First Post",
        "This is the content of my first post."
    ).await?;
    
    println!("Created user {} with post {}", user_id, post_id);
    
    let posts = service.find_posts_by_author(user_id).await?;
    println!("Found {} posts", posts.len());
    
    Ok(())
}

接続プールと非同期パフォーマンス最適化

use bb8::{Pool, PooledConnection};
use bb8_postgres::PostgresConnectionManager;
use tokio_postgres::{NoTls, Config};
use std::str::FromStr;
use std::time::Duration;

type ConnectionPool = Pool<PostgresConnectionManager<NoTls>>;
type Connection<'a> = PooledConnection<'a, PostgresConnectionManager<NoTls>>;

#[derive(Clone)]
struct PooledDatabaseService {
    pool: ConnectionPool,
}

impl PooledDatabaseService {
    async fn new() -> Result<Self, Box<dyn std::error::Error>> {
        let config = Config::from_str("postgresql://testuser:testpass@localhost:5432/testdb")?;
        let manager = PostgresConnectionManager::new(config, NoTls);
        
        let pool = Pool::builder()
            .max_size(15)
            .min_idle(Some(5))
            .connection_timeout(Duration::from_secs(30))
            .idle_timeout(Some(Duration::from_secs(600)))
            .max_lifetime(Some(Duration::from_secs(1800)))
            .build(manager)
            .await?;
        
        Ok(Self { pool })
    }
    
    async fn get_connection(&self) -> Result<Connection<'_>, bb8::RunError<tokio_postgres::Error>> {
        self.pool.get().await
    }
    
    // 並行処理によるバッチ読み込み
    async fn get_users_with_post_counts(&self, user_ids: &[i32]) 
        -> Result<Vec<(i32, String, i64)>, Box<dyn std::error::Error>> {
        
        let futures = user_ids.iter().map(|&user_id| {
            let pool = self.pool.clone();
            async move {
                let conn = pool.get().await?;
                let row = conn.query_one(
                    "SELECT u.id, u.username, COUNT(p.id) as post_count
                     FROM users u
                     LEFT JOIN posts p ON u.id = p.author_id
                     WHERE u.id = $1
                     GROUP BY u.id, u.username",
                    &[&user_id]
                ).await?;
                
                Ok::<_, Box<dyn std::error::Error + Send>>((
                    row.get::<_, i32>(0),
                    row.get::<_, String>(1),
                    row.get::<_, i64>(2),
                ))
            }
        });
        
        let results = futures_util::future::try_join_all(futures).await?;
        Ok(results)
    }
    
    // ストリーミング処理
    async fn stream_large_dataset<F>(&self, mut processor: F) 
        -> Result<(), Box<dyn std::error::Error>>
    where
        F: FnMut(i32, String) -> Result<(), Box<dyn std::error::Error>>,
    {
        let conn = self.get_connection().await?;
        
        let transaction = conn.transaction().await?;
        
        // カーソルを使用したストリーミング
        transaction.execute("BEGIN", &[]).await?;
        transaction.execute(
            "DECLARE user_cursor CURSOR FOR SELECT id, username FROM users ORDER BY id",
            &[]
        ).await?;
        
        loop {
            let rows = transaction.query("FETCH 100 FROM user_cursor", &[]).await?;
            
            if rows.is_empty() {
                break;
            }
            
            for row in rows {
                let id: i32 = row.get(0);
                let username: String = row.get(1);
                processor(id, username)?;
            }
        }
        
        transaction.execute("CLOSE user_cursor", &[]).await?;
        transaction.commit().await?;
        
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let service = PooledDatabaseService::new().await?;
    
    // 並行処理テスト
    let user_ids = vec![1, 2, 3, 4, 5];
    let results = service.get_users_with_post_counts(&user_ids).await?;
    
    for (id, username, count) in results {
        println!("User {} ({}) has {} posts", id, username, count);
    }
    
    // ストリーミング処理テスト
    println!("Streaming users:");
    service.stream_large_dataset(|id, username| {
        println!("Processing user: {} - {}", id, username);
        Ok(())
    }).await?;
    
    Ok(())
}

エラーハンドリングと監視

use tokio_postgres::{Error as PgError, ErrorKind};
use std::time::{Duration, Instant};
use tracing::{info, warn, error, instrument};

#[derive(Debug)]
enum DatabaseError {
    ConnectionError(String),
    QueryError(String),
    TransactionError(String),
    UniqueViolation(String),
    NotFound,
}

impl From<PgError> for DatabaseError {
    fn from(err: PgError) -> Self {
        match err.kind() {
            ErrorKind::UniqueViolation => {
                DatabaseError::UniqueViolation(err.to_string())
            }
            ErrorKind::ConnectionDead | ErrorKind::ConnectionFailed => {
                DatabaseError::ConnectionError(err.to_string())
            }
            _ => DatabaseError::QueryError(err.to_string()),
        }
    }
}

struct MonitoredDatabaseService {
    pool: ConnectionPool,
}

impl MonitoredDatabaseService {
    #[instrument(skip(self))]
    async fn create_user_with_monitoring(
        &self,
        username: &str,
        email: &str
    ) -> Result<i32, DatabaseError> {
        let start = Instant::now();
        
        let conn = self.pool.get().await.map_err(|e| {
            error!("Failed to get connection: {}", e);
            DatabaseError::ConnectionError(e.to_string())
        })?;
        
        let result = conn.query_one(
            "INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id",
            &[&username, &email]
        ).await;
        
        let duration = start.elapsed();
        
        match result {
            Ok(row) => {
                let user_id: i32 = row.get(0);
                info!(
                    duration_ms = duration.as_millis() as u64,
                    user_id = user_id,
                    "User created successfully"
                );
                Ok(user_id)
            }
            Err(e) => {
                error!(
                    duration_ms = duration.as_millis() as u64,
                    error = %e,
                    "Failed to create user"
                );
                Err(DatabaseError::from(e))
            }
        }
    }
    
    // リトライ機能付きクエリ実行
    async fn query_with_retry<T, F>(
        &self,
        operation: F,
        max_retries: u32
    ) -> Result<T, DatabaseError>
    where
        F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, PgError>> + Send>>,
    {
        let mut last_error = None;
        
        for attempt in 0..=max_retries {
            match operation().await {
                Ok(result) => return Ok(result),
                Err(e) => {
                    warn!(
                        attempt = attempt,
                        max_retries = max_retries,
                        error = %e,
                        "Query attempt failed"
                    );
                    
                    if attempt < max_retries {
                        let delay = Duration::from_millis(100 * (1 << attempt));
                        tokio::time::sleep(delay).await;
                    }
                    
                    last_error = Some(e);
                }
            }
        }
        
        Err(DatabaseError::from(last_error.unwrap()))
    }
    
    // ヘルスチェック
    async fn health_check(&self) -> Result<Duration, DatabaseError> {
        let start = Instant::now();
        
        let conn = self.pool.get().await.map_err(|e| {
            DatabaseError::ConnectionError(e.to_string())
        })?;
        
        conn.query_one("SELECT 1", &[]).await?;
        
        Ok(start.elapsed())
    }
}

// 定期的な監視タスク
async fn monitoring_task(service: MonitoredDatabaseService) {
    let mut interval = tokio::time::interval(Duration::from_secs(30));
    
    loop {
        interval.tick().await;
        
        match service.health_check().await {
            Ok(duration) => {
                info!(
                    health_check_ms = duration.as_millis() as u64,
                    "Database health check passed"
                );
            }
            Err(e) => {
                error!(error = ?e, "Database health check failed");
            }
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();
    
    let config = Config::from_str("postgresql://testuser:testpass@localhost:5432/testdb")?;
    let manager = PostgresConnectionManager::new(config, NoTls);
    let pool = Pool::builder().build(manager).await?;
    
    let service = MonitoredDatabaseService { pool };
    
    // 監視タスク開始
    let monitor_service = service.clone();
    let monitor_handle = tokio::spawn(monitoring_task(monitor_service));
    
    // メイン処理
    match service.create_user_with_monitoring("monitor_user", "[email protected]").await {
        Ok(user_id) => info!(user_id = user_id, "User created successfully"),
        Err(e) => error!(error = ?e, "Failed to create user"),
    }
    
    // 少し待って監視ログを確認
    tokio::time::sleep(Duration::from_secs(65)).await;
    monitor_handle.abort();
    
    Ok(())
}