tokio-postgres
tokio-postgresは「A native, asynchronous PostgreSQL client for Rust」として開発された、Rustのための非同期PostgreSQLクライアントライブラリです。Tokioランタイムと完全に統合されており、ノンブロッキングでパフォーマンスの高いデータベース操作を実現します。SQLクエリへの直接的で低レベルなアクセスを提供し、高度なORMの抽象化レイヤーを持たないため、パフォーマンスクリティカルなアプリケーションや、SQLの完全な制御が必要なシステムに最適です。
ライブラリ
tokio-postgres
概要
tokio-postgresは「A native, asynchronous PostgreSQL client for Rust」として開発された、Rustのための非同期PostgreSQLクライアントライブラリです。Tokioランタイムと完全に統合されており、ノンブロッキングでパフォーマンスの高いデータベース操作を実現します。SQLクエリへの直接的で低レベルなアクセスを提供し、高度なORMの抽象化レイヤーを持たないため、パフォーマンスクリティカルなアプリケーションや、SQLの完全な制御が必要なシステムに最適です。
詳細
tokio-postgres 2025年版は、Rust 1.75以降の最新async/await機能とTokioランタイムを活用し、高い並行性と効率性を提供します。TLS接続のサポート(openssl、native-tlsアダプタ経由)、プリペアドステートメント、カーソル、トランザクション管理、COPYオペレーション、通知システムなど、PostgreSQLの高度な機能を包括的にサポートします。外部の接続プール(bb8、deadpool等)との統合により、エンタープライズレベルのアプリケーション開発に必要な堅牢性とスケーラビリティを実現します。
主な特徴
- 完全非同期: Tokioランタイムとの深い統合による高い並行性
- 低レベルSQL制御: ORMオーバーヘッドなしの直接的なSQL実行
- 高性能: ゼロコピー最適化とノンブロッキングI/O
- PostgreSQL完全対応: プリペアドステートメント、通知、COPY等の全機能
- TLS対応: openssl、native-tlsによる暗号化通信
- 接続プール統合: bb8、deadpoolなどの外部プール対応
メリット・デメリット
メリット
- SQLの完全な制御と最大限のパフォーマンス発揮
- 非同期処理による高い並行性とスケーラビリティ
- PostgreSQL固有機能(通知、配列、JSON等)の完全活用
- ORMオーバーヘッドなしの直接的なデータベースアクセス
- メモリ効率の高いゼロコピー最適化
- Rustの型安全性を活かしたセキュアなSQL実行
デメリット
- 生SQLでの記述が必要でORMのような抽象化なし
- PostgreSQL専用でマルチデータベース対応なし
- 複雑なマッピングやリレーションシップ管理は手動実装
- ボイラープレートコードが多く初期開発コストが高い
- 非同期プログラミングの理解が必要
- エラーハンドリングと接続管理の複雑性
参考ページ
書き方の例
セットアップ
# Cargo.toml
[dependencies]
tokio-postgres = "0.7"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1.0", features = ["v4", "serde"] }
# TLS対応(任意)
postgres-openssl = "0.5" # OpenSSL使用時
postgres-native-tls = "0.5" # native-tls使用時
# 接続プール(推奨)
bb8 = "0.8"
bb8-postgres = "0.8"
# PostgreSQL準備
# Docker使用の場合
docker run --name postgres-tokio \
-e POSTGRES_DB=testdb \
-e POSTGRES_USER=testuser \
-e POSTGRES_PASSWORD=testpass \
-p 5432:5432 \
-d postgres:15
基本的な使い方
use tokio_postgres::{NoTls, Error, Row};
use std::collections::HashMap;
#[derive(Debug)]
struct User {
id: i32,
username: String,
email: String,
created_at: chrono::NaiveDateTime,
}
impl From<Row> for User {
fn from(row: Row) -> Self {
Self {
id: row.get("id"),
username: row.get("username"),
email: row.get("email"),
created_at: row.get("created_at"),
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// データベース接続
let (client, connection) = tokio_postgres::connect(
"postgresql://testuser:testpass@localhost:5432/testdb",
NoTls
).await?;
// 接続オブジェクトを別タスクで管理
let connection_handle = tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Connection error: {}", e);
}
});
// テーブル作成
client.execute(
"CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
)",
&[]
).await?;
// ユーザー挿入
let insert_stmt = client.prepare(
"INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id, created_at"
).await?;
let row = client.query_one(
&insert_stmt,
&[&"john_doe", &"[email protected]"]
).await?;
let user_id: i32 = row.get(0);
let created_at: chrono::NaiveDateTime = row.get(1);
println!("Created user ID: {} at {}", user_id, created_at);
// ユーザー検索
let users = client.query(
"SELECT id, username, email, created_at FROM users WHERE id = $1",
&[&user_id]
).await?;
for row in users {
let user = User::from(row);
println!("Found user: {:?}", user);
}
// 接続終了待機
connection_handle.await?;
Ok(())
}
プリペアドステートメントとトランザクション
use tokio_postgres::{NoTls, Transaction};
use std::error::Error;
#[derive(Debug)]
struct Post {
id: i32,
title: String,
content: String,
author_id: i32,
published: bool,
}
struct DatabaseService {
client: tokio_postgres::Client,
}
impl DatabaseService {
async fn new() -> Result<Self, Box<dyn Error>> {
let (client, connection) = tokio_postgres::connect(
"postgresql://testuser:testpass@localhost:5432/testdb",
NoTls
).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Connection error: {}", e);
}
});
Ok(Self { client })
}
// トランザクション使用例
async fn create_user_with_post(
&self,
username: &str,
email: &str,
title: &str,
content: &str
) -> Result<(i32, i32), Box<dyn Error>> {
let transaction = self.client.transaction().await?;
// ユーザー作成
let user_row = transaction.query_one(
"INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id",
&[&username, &email]
).await?;
let user_id: i32 = user_row.get(0);
// 投稿作成
let post_row = transaction.query_one(
"INSERT INTO posts (title, content, author_id, published)
VALUES ($1, $2, $3, $4) RETURNING id",
&[&title, &content, &user_id, &true]
).await?;
let post_id: i32 = post_row.get(0);
// トランザクションコミット
transaction.commit().await?;
Ok((user_id, post_id))
}
// プリペアドステートメント活用
async fn find_posts_by_author(&self, author_id: i32) -> Result<Vec<Post>, Box<dyn Error>> {
let stmt = self.client.prepare(
"SELECT id, title, content, author_id, published
FROM posts
WHERE author_id = $1
ORDER BY created_at DESC"
).await?;
let rows = self.client.query(&stmt, &[&author_id]).await?;
let posts = rows.into_iter().map(|row| {
Post {
id: row.get(0),
title: row.get(1),
content: row.get(2),
author_id: row.get(3),
published: row.get(4),
}
}).collect();
Ok(posts)
}
// バッチ操作
async fn bulk_update_posts_status(
&self,
post_ids: &[i32],
published: bool
) -> Result<u64, Box<dyn Error>> {
let transaction = self.client.transaction().await?;
let stmt = transaction.prepare(
"UPDATE posts SET published = $1 WHERE id = $2"
).await?;
let mut total_affected = 0u64;
for &post_id in post_ids {
let affected = transaction.execute(&stmt, &[&published, &post_id]).await?;
total_affected += affected;
}
transaction.commit().await?;
Ok(total_affected)
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// テーブル準備
let (client, connection) = tokio_postgres::connect(
"postgresql://testuser:testpass@localhost:5432/testdb",
NoTls
).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Connection error: {}", e);
}
});
client.execute(
"CREATE TABLE IF NOT EXISTS posts (
id SERIAL PRIMARY KEY,
title VARCHAR(200) NOT NULL,
content TEXT,
author_id INTEGER REFERENCES users(id),
published BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT NOW()
)",
&[]
).await?;
drop(client);
// サービス使用
let service = DatabaseService::new().await?;
let (user_id, post_id) = service.create_user_with_post(
"alice",
"[email protected]",
"My First Post",
"This is the content of my first post."
).await?;
println!("Created user {} with post {}", user_id, post_id);
let posts = service.find_posts_by_author(user_id).await?;
println!("Found {} posts", posts.len());
Ok(())
}
接続プールと非同期パフォーマンス最適化
use bb8::{Pool, PooledConnection};
use bb8_postgres::PostgresConnectionManager;
use tokio_postgres::{NoTls, Config};
use std::str::FromStr;
use std::time::Duration;
type ConnectionPool = Pool<PostgresConnectionManager<NoTls>>;
type Connection<'a> = PooledConnection<'a, PostgresConnectionManager<NoTls>>;
#[derive(Clone)]
struct PooledDatabaseService {
pool: ConnectionPool,
}
impl PooledDatabaseService {
async fn new() -> Result<Self, Box<dyn std::error::Error>> {
let config = Config::from_str("postgresql://testuser:testpass@localhost:5432/testdb")?;
let manager = PostgresConnectionManager::new(config, NoTls);
let pool = Pool::builder()
.max_size(15)
.min_idle(Some(5))
.connection_timeout(Duration::from_secs(30))
.idle_timeout(Some(Duration::from_secs(600)))
.max_lifetime(Some(Duration::from_secs(1800)))
.build(manager)
.await?;
Ok(Self { pool })
}
async fn get_connection(&self) -> Result<Connection<'_>, bb8::RunError<tokio_postgres::Error>> {
self.pool.get().await
}
// 並行処理によるバッチ読み込み
async fn get_users_with_post_counts(&self, user_ids: &[i32])
-> Result<Vec<(i32, String, i64)>, Box<dyn std::error::Error>> {
let futures = user_ids.iter().map(|&user_id| {
let pool = self.pool.clone();
async move {
let conn = pool.get().await?;
let row = conn.query_one(
"SELECT u.id, u.username, COUNT(p.id) as post_count
FROM users u
LEFT JOIN posts p ON u.id = p.author_id
WHERE u.id = $1
GROUP BY u.id, u.username",
&[&user_id]
).await?;
Ok::<_, Box<dyn std::error::Error + Send>>((
row.get::<_, i32>(0),
row.get::<_, String>(1),
row.get::<_, i64>(2),
))
}
});
let results = futures_util::future::try_join_all(futures).await?;
Ok(results)
}
// ストリーミング処理
async fn stream_large_dataset<F>(&self, mut processor: F)
-> Result<(), Box<dyn std::error::Error>>
where
F: FnMut(i32, String) -> Result<(), Box<dyn std::error::Error>>,
{
let conn = self.get_connection().await?;
let transaction = conn.transaction().await?;
// カーソルを使用したストリーミング
transaction.execute("BEGIN", &[]).await?;
transaction.execute(
"DECLARE user_cursor CURSOR FOR SELECT id, username FROM users ORDER BY id",
&[]
).await?;
loop {
let rows = transaction.query("FETCH 100 FROM user_cursor", &[]).await?;
if rows.is_empty() {
break;
}
for row in rows {
let id: i32 = row.get(0);
let username: String = row.get(1);
processor(id, username)?;
}
}
transaction.execute("CLOSE user_cursor", &[]).await?;
transaction.commit().await?;
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let service = PooledDatabaseService::new().await?;
// 並行処理テスト
let user_ids = vec![1, 2, 3, 4, 5];
let results = service.get_users_with_post_counts(&user_ids).await?;
for (id, username, count) in results {
println!("User {} ({}) has {} posts", id, username, count);
}
// ストリーミング処理テスト
println!("Streaming users:");
service.stream_large_dataset(|id, username| {
println!("Processing user: {} - {}", id, username);
Ok(())
}).await?;
Ok(())
}
エラーハンドリングと監視
use tokio_postgres::{Error as PgError, ErrorKind};
use std::time::{Duration, Instant};
use tracing::{info, warn, error, instrument};
#[derive(Debug)]
enum DatabaseError {
ConnectionError(String),
QueryError(String),
TransactionError(String),
UniqueViolation(String),
NotFound,
}
impl From<PgError> for DatabaseError {
fn from(err: PgError) -> Self {
match err.kind() {
ErrorKind::UniqueViolation => {
DatabaseError::UniqueViolation(err.to_string())
}
ErrorKind::ConnectionDead | ErrorKind::ConnectionFailed => {
DatabaseError::ConnectionError(err.to_string())
}
_ => DatabaseError::QueryError(err.to_string()),
}
}
}
struct MonitoredDatabaseService {
pool: ConnectionPool,
}
impl MonitoredDatabaseService {
#[instrument(skip(self))]
async fn create_user_with_monitoring(
&self,
username: &str,
email: &str
) -> Result<i32, DatabaseError> {
let start = Instant::now();
let conn = self.pool.get().await.map_err(|e| {
error!("Failed to get connection: {}", e);
DatabaseError::ConnectionError(e.to_string())
})?;
let result = conn.query_one(
"INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id",
&[&username, &email]
).await;
let duration = start.elapsed();
match result {
Ok(row) => {
let user_id: i32 = row.get(0);
info!(
duration_ms = duration.as_millis() as u64,
user_id = user_id,
"User created successfully"
);
Ok(user_id)
}
Err(e) => {
error!(
duration_ms = duration.as_millis() as u64,
error = %e,
"Failed to create user"
);
Err(DatabaseError::from(e))
}
}
}
// リトライ機能付きクエリ実行
async fn query_with_retry<T, F>(
&self,
operation: F,
max_retries: u32
) -> Result<T, DatabaseError>
where
F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, PgError>> + Send>>,
{
let mut last_error = None;
for attempt in 0..=max_retries {
match operation().await {
Ok(result) => return Ok(result),
Err(e) => {
warn!(
attempt = attempt,
max_retries = max_retries,
error = %e,
"Query attempt failed"
);
if attempt < max_retries {
let delay = Duration::from_millis(100 * (1 << attempt));
tokio::time::sleep(delay).await;
}
last_error = Some(e);
}
}
}
Err(DatabaseError::from(last_error.unwrap()))
}
// ヘルスチェック
async fn health_check(&self) -> Result<Duration, DatabaseError> {
let start = Instant::now();
let conn = self.pool.get().await.map_err(|e| {
DatabaseError::ConnectionError(e.to_string())
})?;
conn.query_one("SELECT 1", &[]).await?;
Ok(start.elapsed())
}
}
// 定期的な監視タスク
async fn monitoring_task(service: MonitoredDatabaseService) {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
match service.health_check().await {
Ok(duration) => {
info!(
health_check_ms = duration.as_millis() as u64,
"Database health check passed"
);
}
Err(e) => {
error!(error = ?e, "Database health check failed");
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let config = Config::from_str("postgresql://testuser:testpass@localhost:5432/testdb")?;
let manager = PostgresConnectionManager::new(config, NoTls);
let pool = Pool::builder().build(manager).await?;
let service = MonitoredDatabaseService { pool };
// 監視タスク開始
let monitor_service = service.clone();
let monitor_handle = tokio::spawn(monitoring_task(monitor_service));
// メイン処理
match service.create_user_with_monitoring("monitor_user", "[email protected]").await {
Ok(user_id) => info!(user_id = user_id, "User created successfully"),
Err(e) => error!(error = ?e, "Failed to create user"),
}
// 少し待って監視ログを確認
tokio::time::sleep(Duration::from_secs(65)).await;
monitor_handle.abort();
Ok(())
}