tokio-postgres
tokio-postgres is developed as "A native, asynchronous PostgreSQL client for Rust" - an asynchronous PostgreSQL client library for Rust. Fully integrated with the Tokio runtime, it provides non-blocking, high-performance database operations. It offers direct, low-level access to SQL queries without heavyweight ORM abstraction layers, making it ideal for performance-critical applications and systems requiring complete SQL control.
Library
tokio-postgres
Overview
tokio-postgres is developed as "A native, asynchronous PostgreSQL client for Rust" - an asynchronous PostgreSQL client library for Rust. Fully integrated with the Tokio runtime, it provides non-blocking, high-performance database operations. It offers direct, low-level access to SQL queries without heavyweight ORM abstraction layers, making it ideal for performance-critical applications and systems requiring complete SQL control.
Details
tokio-postgres 2025 edition leverages the latest async/await features of Rust 1.75+ and the Tokio runtime to provide high concurrency and efficiency. It comprehensively supports PostgreSQL's advanced features including TLS connection support (via openssl and native-tls adapters), prepared statements, cursors, transaction management, COPY operations, and notification systems. Integration with external connection pools (bb8, deadpool, etc.) achieves the robustness and scalability necessary for enterprise-level application development.
Key Features
- Fully Asynchronous: High concurrency through deep integration with Tokio runtime
- Low-level SQL Control: Direct SQL execution without ORM overhead
- High Performance: Zero-copy optimization and non-blocking I/O
- Complete PostgreSQL Support: All features including prepared statements, notifications, COPY
- TLS Support: Encrypted communication via openssl and native-tls
- Connection Pool Integration: Support for external pools like bb8 and deadpool
Pros and Cons
Pros
- Complete SQL control and maximum performance
- High concurrency and scalability through asynchronous processing
- Full utilization of PostgreSQL-specific features (notifications, arrays, JSON, etc.)
- Direct database access without ORM overhead
- Memory-efficient zero-copy optimization
- Secure SQL execution leveraging Rust's type safety
Cons
- Raw SQL writing required without ORM-like abstraction
- PostgreSQL-only with no multi-database support
- Complex mapping and relationship management requires manual implementation
- High initial development cost due to boilerplate code
- Requires understanding of asynchronous programming
- Complexity in error handling and connection management
Reference Pages
Code Examples
Setup
# Cargo.toml
[dependencies]
tokio-postgres = "0.7"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1.0", features = ["v4", "serde"] }
# TLS support (optional)
postgres-openssl = "0.5" # When using OpenSSL
postgres-native-tls = "0.5" # When using native-tls
# Connection pool (recommended)
bb8 = "0.8"
bb8-postgres = "0.8"
# PostgreSQL setup
# Using Docker
docker run --name postgres-tokio \
-e POSTGRES_DB=testdb \
-e POSTGRES_USER=testuser \
-e POSTGRES_PASSWORD=testpass \
-p 5432:5432 \
-d postgres:15
Basic Usage
use tokio_postgres::{NoTls, Error, Row};
use std::collections::HashMap;
#[derive(Debug)]
struct User {
id: i32,
username: String,
email: String,
created_at: chrono::NaiveDateTime,
}
impl From<Row> for User {
fn from(row: Row) -> Self {
Self {
id: row.get("id"),
username: row.get("username"),
email: row.get("email"),
created_at: row.get("created_at"),
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Database connection
let (client, connection) = tokio_postgres::connect(
"postgresql://testuser:testpass@localhost:5432/testdb",
NoTls
).await?;
// Manage connection object in separate task
let connection_handle = tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Connection error: {}", e);
}
});
// Create table
client.execute(
"CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
)",
&[]
).await?;
// Insert user
let insert_stmt = client.prepare(
"INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id, created_at"
).await?;
let row = client.query_one(
&insert_stmt,
&[&"john_doe", &"[email protected]"]
).await?;
let user_id: i32 = row.get(0);
let created_at: chrono::NaiveDateTime = row.get(1);
println!("Created user ID: {} at {}", user_id, created_at);
// Search user
let users = client.query(
"SELECT id, username, email, created_at FROM users WHERE id = $1",
&[&user_id]
).await?;
for row in users {
let user = User::from(row);
println!("Found user: {:?}", user);
}
// Wait for connection completion
connection_handle.await?;
Ok(())
}
Prepared Statements and Transactions
use tokio_postgres::{NoTls, Transaction};
use std::error::Error;
#[derive(Debug)]
struct Post {
id: i32,
title: String,
content: String,
author_id: i32,
published: bool,
}
struct DatabaseService {
client: tokio_postgres::Client,
}
impl DatabaseService {
async fn new() -> Result<Self, Box<dyn Error>> {
let (client, connection) = tokio_postgres::connect(
"postgresql://testuser:testpass@localhost:5432/testdb",
NoTls
).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Connection error: {}", e);
}
});
Ok(Self { client })
}
// Transaction usage example
async fn create_user_with_post(
&self,
username: &str,
email: &str,
title: &str,
content: &str
) -> Result<(i32, i32), Box<dyn Error>> {
let transaction = self.client.transaction().await?;
// Create user
let user_row = transaction.query_one(
"INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id",
&[&username, &email]
).await?;
let user_id: i32 = user_row.get(0);
// Create post
let post_row = transaction.query_one(
"INSERT INTO posts (title, content, author_id, published)
VALUES ($1, $2, $3, $4) RETURNING id",
&[&title, &content, &user_id, &true]
).await?;
let post_id: i32 = post_row.get(0);
// Commit transaction
transaction.commit().await?;
Ok((user_id, post_id))
}
// Leveraging prepared statements
async fn find_posts_by_author(&self, author_id: i32) -> Result<Vec<Post>, Box<dyn Error>> {
let stmt = self.client.prepare(
"SELECT id, title, content, author_id, published
FROM posts
WHERE author_id = $1
ORDER BY created_at DESC"
).await?;
let rows = self.client.query(&stmt, &[&author_id]).await?;
let posts = rows.into_iter().map(|row| {
Post {
id: row.get(0),
title: row.get(1),
content: row.get(2),
author_id: row.get(3),
published: row.get(4),
}
}).collect();
Ok(posts)
}
// Batch operations
async fn bulk_update_posts_status(
&self,
post_ids: &[i32],
published: bool
) -> Result<u64, Box<dyn Error>> {
let transaction = self.client.transaction().await?;
let stmt = transaction.prepare(
"UPDATE posts SET published = $1 WHERE id = $2"
).await?;
let mut total_affected = 0u64;
for &post_id in post_ids {
let affected = transaction.execute(&stmt, &[&published, &post_id]).await?;
total_affected += affected;
}
transaction.commit().await?;
Ok(total_affected)
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Table preparation
let (client, connection) = tokio_postgres::connect(
"postgresql://testuser:testpass@localhost:5432/testdb",
NoTls
).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Connection error: {}", e);
}
});
client.execute(
"CREATE TABLE IF NOT EXISTS posts (
id SERIAL PRIMARY KEY,
title VARCHAR(200) NOT NULL,
content TEXT,
author_id INTEGER REFERENCES users(id),
published BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT NOW()
)",
&[]
).await?;
drop(client);
// Service usage
let service = DatabaseService::new().await?;
let (user_id, post_id) = service.create_user_with_post(
"alice",
"[email protected]",
"My First Post",
"This is the content of my first post."
).await?;
println!("Created user {} with post {}", user_id, post_id);
let posts = service.find_posts_by_author(user_id).await?;
println!("Found {} posts", posts.len());
Ok(())
}
Connection Pool and Asynchronous Performance Optimization
use bb8::{Pool, PooledConnection};
use bb8_postgres::PostgresConnectionManager;
use tokio_postgres::{NoTls, Config};
use std::str::FromStr;
use std::time::Duration;
type ConnectionPool = Pool<PostgresConnectionManager<NoTls>>;
type Connection<'a> = PooledConnection<'a, PostgresConnectionManager<NoTls>>;
#[derive(Clone)]
struct PooledDatabaseService {
pool: ConnectionPool,
}
impl PooledDatabaseService {
async fn new() -> Result<Self, Box<dyn std::error::Error>> {
let config = Config::from_str("postgresql://testuser:testpass@localhost:5432/testdb")?;
let manager = PostgresConnectionManager::new(config, NoTls);
let pool = Pool::builder()
.max_size(15)
.min_idle(Some(5))
.connection_timeout(Duration::from_secs(30))
.idle_timeout(Some(Duration::from_secs(600)))
.max_lifetime(Some(Duration::from_secs(1800)))
.build(manager)
.await?;
Ok(Self { pool })
}
async fn get_connection(&self) -> Result<Connection<'_>, bb8::RunError<tokio_postgres::Error>> {
self.pool.get().await
}
// Batch loading with concurrent processing
async fn get_users_with_post_counts(&self, user_ids: &[i32])
-> Result<Vec<(i32, String, i64)>, Box<dyn std::error::Error>> {
let futures = user_ids.iter().map(|&user_id| {
let pool = self.pool.clone();
async move {
let conn = pool.get().await?;
let row = conn.query_one(
"SELECT u.id, u.username, COUNT(p.id) as post_count
FROM users u
LEFT JOIN posts p ON u.id = p.author_id
WHERE u.id = $1
GROUP BY u.id, u.username",
&[&user_id]
).await?;
Ok::<_, Box<dyn std::error::Error + Send>>((
row.get::<_, i32>(0),
row.get::<_, String>(1),
row.get::<_, i64>(2),
))
}
});
let results = futures_util::future::try_join_all(futures).await?;
Ok(results)
}
// Streaming processing
async fn stream_large_dataset<F>(&self, mut processor: F)
-> Result<(), Box<dyn std::error::Error>>
where
F: FnMut(i32, String) -> Result<(), Box<dyn std::error::Error>>,
{
let conn = self.get_connection().await?;
let transaction = conn.transaction().await?;
// Streaming using cursors
transaction.execute("BEGIN", &[]).await?;
transaction.execute(
"DECLARE user_cursor CURSOR FOR SELECT id, username FROM users ORDER BY id",
&[]
).await?;
loop {
let rows = transaction.query("FETCH 100 FROM user_cursor", &[]).await?;
if rows.is_empty() {
break;
}
for row in rows {
let id: i32 = row.get(0);
let username: String = row.get(1);
processor(id, username)?;
}
}
transaction.execute("CLOSE user_cursor", &[]).await?;
transaction.commit().await?;
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let service = PooledDatabaseService::new().await?;
// Concurrent processing test
let user_ids = vec![1, 2, 3, 4, 5];
let results = service.get_users_with_post_counts(&user_ids).await?;
for (id, username, count) in results {
println!("User {} ({}) has {} posts", id, username, count);
}
// Streaming processing test
println!("Streaming users:");
service.stream_large_dataset(|id, username| {
println!("Processing user: {} - {}", id, username);
Ok(())
}).await?;
Ok(())
}
Error Handling and Monitoring
use tokio_postgres::{Error as PgError, ErrorKind};
use std::time::{Duration, Instant};
use tracing::{info, warn, error, instrument};
#[derive(Debug)]
enum DatabaseError {
ConnectionError(String),
QueryError(String),
TransactionError(String),
UniqueViolation(String),
NotFound,
}
impl From<PgError> for DatabaseError {
fn from(err: PgError) -> Self {
match err.kind() {
ErrorKind::UniqueViolation => {
DatabaseError::UniqueViolation(err.to_string())
}
ErrorKind::ConnectionDead | ErrorKind::ConnectionFailed => {
DatabaseError::ConnectionError(err.to_string())
}
_ => DatabaseError::QueryError(err.to_string()),
}
}
}
struct MonitoredDatabaseService {
pool: ConnectionPool,
}
impl MonitoredDatabaseService {
#[instrument(skip(self))]
async fn create_user_with_monitoring(
&self,
username: &str,
email: &str
) -> Result<i32, DatabaseError> {
let start = Instant::now();
let conn = self.pool.get().await.map_err(|e| {
error!("Failed to get connection: {}", e);
DatabaseError::ConnectionError(e.to_string())
})?;
let result = conn.query_one(
"INSERT INTO users (username, email) VALUES ($1, $2) RETURNING id",
&[&username, &email]
).await;
let duration = start.elapsed();
match result {
Ok(row) => {
let user_id: i32 = row.get(0);
info!(
duration_ms = duration.as_millis() as u64,
user_id = user_id,
"User created successfully"
);
Ok(user_id)
}
Err(e) => {
error!(
duration_ms = duration.as_millis() as u64,
error = %e,
"Failed to create user"
);
Err(DatabaseError::from(e))
}
}
}
// Query execution with retry functionality
async fn query_with_retry<T, F>(
&self,
operation: F,
max_retries: u32
) -> Result<T, DatabaseError>
where
F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, PgError>> + Send>>,
{
let mut last_error = None;
for attempt in 0..=max_retries {
match operation().await {
Ok(result) => return Ok(result),
Err(e) => {
warn!(
attempt = attempt,
max_retries = max_retries,
error = %e,
"Query attempt failed"
);
if attempt < max_retries {
let delay = Duration::from_millis(100 * (1 << attempt));
tokio::time::sleep(delay).await;
}
last_error = Some(e);
}
}
}
Err(DatabaseError::from(last_error.unwrap()))
}
// Health check
async fn health_check(&self) -> Result<Duration, DatabaseError> {
let start = Instant::now();
let conn = self.pool.get().await.map_err(|e| {
DatabaseError::ConnectionError(e.to_string())
})?;
conn.query_one("SELECT 1", &[]).await?;
Ok(start.elapsed())
}
}
// Periodic monitoring task
async fn monitoring_task(service: MonitoredDatabaseService) {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
match service.health_check().await {
Ok(duration) => {
info!(
health_check_ms = duration.as_millis() as u64,
"Database health check passed"
);
}
Err(e) => {
error!(error = ?e, "Database health check failed");
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let config = Config::from_str("postgresql://testuser:testpass@localhost:5432/testdb")?;
let manager = PostgresConnectionManager::new(config, NoTls);
let pool = Pool::builder().build(manager).await?;
let service = MonitoredDatabaseService { pool };
// Start monitoring task
let monitor_service = service.clone();
let monitor_handle = tokio::spawn(monitoring_task(monitor_service));
// Main processing
match service.create_user_with_monitoring("monitor_user", "[email protected]").await {
Ok(user_id) => info!(user_id = user_id, "User created successfully"),
Err(e) => error!(error = ?e, "Failed to create user"),
}
// Wait a bit to check monitoring logs
tokio::time::sleep(Duration::from_secs(65)).await;
monitor_handle.abort();
Ok(())
}