SQLx

SQLxは「RustのSQL Toolkit」として開発された、DSLを使わずにコンパイル時検証済みクエリを特徴とする非同期で純粋なRust SQLクレートです。PostgreSQL、MySQL、SQLiteをサポートし、「Rustらしい安全性とパフォーマンス」をコンセプトに、型安全性とSQL本来の表現力を両立。コンパイル時クエリ検証、接続プーリング、マイグレーション、SQLインジェクション対策等、本格的なWebアプリケーション開発に必要な機能を包括的に提供する現代的なデータベースライブラリです。

非同期SQLRustPostgreSQLMySQLSQLiteコンパイル時検証Tokio

GitHub概要

launchbadge/sqlx

🧰 The Rust SQL Toolkit. An async, pure Rust SQL crate featuring compile-time checked queries without a DSL. Supports PostgreSQL, MySQL, and SQLite.

スター15,476
ウォッチ63
フォーク1,444
作成日:2019年12月28日
言語:Rust
ライセンス:Apache License 2.0

トピックス

asyncawaitmariadbmysqlpostgrespostgresqlrustsqlsqlite

スター履歴

launchbadge/sqlx Star History
データ取得日時: 2025/8/13 01:43

ライブラリ

SQLx

概要

SQLxは「RustのSQL Toolkit」として開発された、DSLを使わずにコンパイル時検証済みクエリを特徴とする非同期で純粋なRust SQLクレートです。PostgreSQL、MySQL、SQLiteをサポートし、「Rustらしい安全性とパフォーマンス」をコンセプトに、型安全性とSQL本来の表現力を両立。コンパイル時クエリ検証、接続プーリング、マイグレーション、SQLインジェクション対策等、本格的なWebアプリケーション開発に必要な機能を包括的に提供する現代的なデータベースライブラリです。

詳細

SQLx 2025年版はRustエコシステムにおけるデータベース操作の定番ライブラリとして確固たる地位を築いています。独自のマクロシステム(query!, query_as!)により、コンパイル時にSQLクエリを検証し、型安全性を保証しながらSQLの完全な表現力を活用可能。Tokio/async-stdの非同期ランタイム対応、TLS暗号化通信(rustls、native-tls)、連続接続プーリング等、本格的なプロダクション環境に求められる機能を標準装備。オフラインビルドモード、プリペアドステートメント、トランザクション管理等も充実し、安全で高速なデータベースアクセスを実現します。

主な特徴

  • コンパイル時クエリ検証: SQLの構文・型・スキーマをビルド時に検証
  • 非同期ファースト: Tokio/async-std完全対応の真の非同期処理
  • 多DB対応: PostgreSQL、MySQL、SQLite統一API
  • 型安全: Rustの型システムとSQLスキーマの完全統合
  • プロダクション対応: 接続プーリング、TLS、マイグレーション、監査機能
  • DSLフリー: 生SQLを保ちながら安全性を確保

メリット・デメリット

メリット

  • コンパイル時クエリ検証により実行時エラーを大幅に削減
  • Rustの型システムとSQLの完全統合で開発効率向上
  • 非同期処理によるスケーラブルな高性能データベースアクセス
  • 生SQLを維持できるため複雑クエリも自在に記述可能
  • 接続プーリングとTLS対応でエンタープライズ級の機能
  • 活発な開発コミュニティと豊富なドキュメント

デメリット

  • ビルド時にデータベース接続が必要(オフラインモードで回避可能)
  • SQLの深い知識が必要でORM初心者には学習コストが高い
  • コンパイル時間がクエリ検証により若干増加
  • スキーマ変更時に広範囲なコード修正が必要な場合がある
  • 複雑なORM機能(リレーション管理等)は自実装が必要
  • マクロベースのため、IDEサポートが限定的な場合がある

参考ページ

書き方の例

インストールと基本設定

# Cargo.toml
[dependencies]
sqlx = { version = "0.8", features = [
    "runtime-tokio-rustls",  # Tokio runtime + Rustls TLS
    "postgres",              # PostgreSQL support
    "mysql",                 # MySQL support  
    "sqlite",                # SQLite support
    "macros",                # Compile-time macros
    "migrate"                # Migration support
] }
tokio = { version = "1.0", features = ["full"] }

# 環境変数(コンパイル時検証用)
# .env
DATABASE_URL=postgres://user:password@localhost/my_database

# 代替設定
# DATABASE_URL=mysql://user:password@localhost/my_database
# DATABASE_URL=sqlite:database.db
// 基本インポート
use sqlx::{PgPool, Row};
use sqlx::postgres::PgPoolOptions;
// MySQL の場合: use sqlx::mysql::{MySqlPool, MySqlPoolOptions};
// SQLite の場合: use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};

// 基本的な接続設定
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // 接続プール作成
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect("postgres://user:password@localhost/database")
        .await?;

    // 基本クエリの実行
    let row: (i64,) = sqlx::query_as("SELECT $1")
        .bind(150_i64)
        .fetch_one(&pool)
        .await?;

    assert_eq!(row.0, 150);
    Ok(())
}

基本的なCRUD操作

use sqlx::{PgPool, FromRow};
use serde::{Deserialize, Serialize};

// ユーザー構造体
#[derive(Debug, FromRow, Serialize, Deserialize)]
struct User {
    id: i32,
    name: String,
    email: String,
    age: Option<i32>,
    created_at: chrono::DateTime<chrono::Utc>,
}

// ユーザー作成用構造体
#[derive(Debug, Deserialize)]
struct CreateUser {
    name: String,
    email: String,
    age: Option<i32>,
}

// データベース操作構造体
struct UserRepository {
    pool: PgPool,
}

impl UserRepository {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }

    // ユーザー作成
    pub async fn create_user(&self, user: CreateUser) -> Result<User, sqlx::Error> {
        let user = sqlx::query_as!(
            User,
            r#"
            INSERT INTO users (name, email, age)
            VALUES ($1, $2, $3)
            RETURNING id, name, email, age, created_at
            "#,
            user.name,
            user.email,
            user.age
        )
        .fetch_one(&self.pool)
        .await?;

        Ok(user)
    }

    // ユーザー取得(ID指定)
    pub async fn get_user_by_id(&self, id: i32) -> Result<Option<User>, sqlx::Error> {
        let user = sqlx::query_as!(
            User,
            "SELECT id, name, email, age, created_at FROM users WHERE id = $1",
            id
        )
        .fetch_optional(&self.pool)
        .await?;

        Ok(user)
    }

    // 全ユーザー取得
    pub async fn get_all_users(&self) -> Result<Vec<User>, sqlx::Error> {
        let users = sqlx::query_as!(
            User,
            "SELECT id, name, email, age, created_at FROM users ORDER BY created_at DESC"
        )
        .fetch_all(&self.pool)
        .await?;

        Ok(users)
    }

    // ユーザー更新
    pub async fn update_user(
        &self,
        id: i32,
        name: Option<String>,
        email: Option<String>,
        age: Option<i32>,
    ) -> Result<Option<User>, sqlx::Error> {
        let user = sqlx::query_as!(
            User,
            r#"
            UPDATE users 
            SET name = COALESCE($2, name),
                email = COALESCE($3, email),
                age = COALESCE($4, age)
            WHERE id = $1
            RETURNING id, name, email, age, created_at
            "#,
            id,
            name,
            email,
            age
        )
        .fetch_optional(&self.pool)
        .await?;

        Ok(user)
    }

    // ユーザー削除
    pub async fn delete_user(&self, id: i32) -> Result<bool, sqlx::Error> {
        let result = sqlx::query!("DELETE FROM users WHERE id = $1", id)
            .execute(&self.pool)
            .await?;

        Ok(result.rows_affected() > 0)
    }
}

// 使用例
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool = PgPoolOptions::new()
        .max_connections(10)
        .connect(&std::env::var("DATABASE_URL")?)
        .await?;

    let repo = UserRepository::new(pool);

    // ユーザー作成
    let new_user = CreateUser {
        name: "田中太郎".to_string(),
        email: "[email protected]".to_string(),
        age: Some(30),
    };

    let user = repo.create_user(new_user).await?;
    println!("Created user: {:?}", user);

    // ユーザー取得
    if let Some(found_user) = repo.get_user_by_id(user.id).await? {
        println!("Found user: {:?}", found_user);
    }

    // 全ユーザー取得
    let all_users = repo.get_all_users().await?;
    println!("All users: {:?}", all_users);

    Ok(())
}

高度なクエリとトランザクション

use sqlx::{PgPool, Postgres, Transaction};

struct AdvancedQueries {
    pool: PgPool,
}

impl AdvancedQueries {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }

    // 複雑な条件検索
    pub async fn search_users(
        &self,
        name_pattern: Option<&str>,
        min_age: Option<i32>,
        max_age: Option<i32>,
        limit: Option<i64>,
        offset: Option<i64>,
    ) -> Result<Vec<User>, sqlx::Error> {
        let mut query = String::from(
            "SELECT id, name, email, age, created_at FROM users WHERE 1=1"
        );
        let mut bind_count = 0;

        if name_pattern.is_some() {
            bind_count += 1;
            query.push_str(&format!(" AND name ILIKE ${}", bind_count));
        }

        if min_age.is_some() {
            bind_count += 1;
            query.push_str(&format!(" AND age >= ${}", bind_count));
        }

        if max_age.is_some() {
            bind_count += 1;
            query.push_str(&format!(" AND age <= ${}", bind_count));
        }

        query.push_str(" ORDER BY created_at DESC");

        if limit.is_some() {
            bind_count += 1;
            query.push_str(&format!(" LIMIT ${}", bind_count));
        }

        if offset.is_some() {
            bind_count += 1;
            query.push_str(&format!(" OFFSET ${}", bind_count));
        }

        let mut query_builder = sqlx::query_as::<_, User>(&query);

        if let Some(pattern) = name_pattern {
            query_builder = query_builder.bind(format!("%{}%", pattern));
        }
        if let Some(min) = min_age {
            query_builder = query_builder.bind(min);
        }
        if let Some(max) = max_age {
            query_builder = query_builder.bind(max);
        }
        if let Some(lim) = limit {
            query_builder = query_builder.bind(lim);
        }
        if let Some(off) = offset {
            query_builder = query_builder.bind(off);
        }

        let users = query_builder.fetch_all(&self.pool).await?;
        Ok(users)
    }

    // PostgreSQL配列を使用したバルク操作
    pub async fn bulk_insert_users(
        &self,
        names: Vec<String>,
        emails: Vec<String>,
    ) -> Result<Vec<User>, sqlx::Error> {
        let users = sqlx::query_as!(
            User,
            r#"
            INSERT INTO users (name, email)
            SELECT * FROM UNNEST($1::text[], $2::text[])
            RETURNING id, name, email, age, created_at
            "#,
            &names[..],
            &emails[..]
        )
        .fetch_all(&self.pool)
        .await?;

        Ok(users)
    }

    // 集約クエリ
    pub async fn get_user_statistics(&self) -> Result<UserStats, sqlx::Error> {
        let stats = sqlx::query_as!(
            UserStats,
            r#"
            SELECT 
                COUNT(*) as total_users,
                AVG(age) as average_age,
                MIN(age) as min_age,
                MAX(age) as max_age,
                COUNT(*) FILTER (WHERE age >= 18) as adult_users
            FROM users
            "#
        )
        .fetch_one(&self.pool)
        .await?;

        Ok(stats)
    }

    // トランザクション処理
    pub async fn transfer_user_data(
        &self,
        from_user_id: i32,
        to_user_id: i32,
        amount: i32,
    ) -> Result<(), sqlx::Error> {
        let mut tx = self.pool.begin().await?;

        // ソースユーザーから削除
        sqlx::query!(
            "UPDATE user_balances SET balance = balance - $1 WHERE user_id = $2",
            amount,
            from_user_id
        )
        .execute(&mut *tx)
        .await?;

        // デスティネーションユーザーに追加
        sqlx::query!(
            "UPDATE user_balances SET balance = balance + $1 WHERE user_id = $2",
            amount,
            to_user_id
        )
        .execute(&mut *tx)
        .await?;

        // 残高チェック
        let balance: (i32,) = sqlx::query_as(
            "SELECT balance FROM user_balances WHERE user_id = $1"
        )
        .bind(from_user_id)
        .fetch_one(&mut *tx)
        .await?;

        if balance.0 < 0 {
            return Err(sqlx::Error::RowNotFound);
        }

        tx.commit().await?;
        Ok(())
    }
}

#[derive(Debug, sqlx::FromRow)]
struct UserStats {
    total_users: Option<i64>,
    average_age: Option<f64>,
    min_age: Option<i32>,
    max_age: Option<i32>,
    adult_users: Option<i64>,
}

マイグレーションとスキーマ管理

// migration.rs
use sqlx::migrate::MigrateDatabase;
use sqlx::PgPool;

pub struct MigrationManager {
    database_url: String,
}

impl MigrationManager {
    pub fn new(database_url: String) -> Self {
        Self { database_url }
    }

    // データベース作成
    pub async fn create_database(&self) -> Result<(), sqlx::Error> {
        if !sqlx::Postgres::database_exists(&self.database_url).await? {
            sqlx::Postgres::create_database(&self.database_url).await?;
            println!("Database created successfully");
        } else {
            println!("Database already exists");
        }
        Ok(())
    }

    // データベース削除
    pub async fn drop_database(&self) -> Result<(), sqlx::Error> {
        if sqlx::Postgres::database_exists(&self.database_url).await? {
            sqlx::Postgres::drop_database(&self.database_url).await?;
            println!("Database dropped successfully");
        }
        Ok(())
    }

    // マイグレーション実行
    pub async fn run_migrations(&self, pool: &PgPool) -> Result<(), sqlx::Error> {
        sqlx::migrate!("./migrations").run(pool).await?;
        println!("Migrations completed successfully");
        Ok(())
    }
}

// マイグレーションファイル例
// migrations/001_create_users_table.sql
/*
-- Up migration
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    age INTEGER,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_age ON users(age);

-- Down migration (in separate .down.sql file)
DROP TABLE IF EXISTS users;
*/

接続プーリングと設定

use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions, PgSslMode};
use std::time::Duration;

pub struct DatabaseConfig {
    pub host: String,
    pub port: u16,
    pub username: String,
    pub password: String,
    pub database: String,
    pub max_connections: u32,
    pub min_connections: u32,
    pub connect_timeout: Duration,
    pub idle_timeout: Duration,
    pub ssl_mode: PgSslMode,
}

impl DatabaseConfig {
    pub async fn create_pool(&self) -> Result<PgPool, sqlx::Error> {
        let connect_options = PgConnectOptions::new()
            .host(&self.host)
            .port(self.port)
            .username(&self.username)
            .password(&self.password)
            .database(&self.database)
            .ssl_mode(self.ssl_mode)
            .log_statements(log::LevelFilter::Debug);

        let pool = PgPoolOptions::new()
            .max_connections(self.max_connections)
            .min_connections(self.min_connections)
            .acquire_timeout(self.connect_timeout)
            .idle_timeout(self.idle_timeout)
            .after_connect(|conn, _| Box::pin(async move {
                // 接続後の設定
                sqlx::query("SET application_name = 'my_app'")
                    .execute(conn)
                    .await?;
                Ok(())
            }))
            .connect_with(connect_options)
            .await?;

        Ok(pool)
    }
}

// 使用例
impl Default for DatabaseConfig {
    fn default() -> Self {
        Self {
            host: "localhost".to_string(),
            port: 5432,
            username: "postgres".to_string(),
            password: "password".to_string(),
            database: "my_app".to_string(),
            max_connections: 10,
            min_connections: 2,
            connect_timeout: Duration::from_secs(30),
            idle_timeout: Duration::from_secs(600),
            ssl_mode: PgSslMode::Prefer,
        }
    }
}

エラーハンドリングとロギング

use sqlx::{Error as SqlxError, PgPool};
use thiserror::Error;

#[derive(Error, Debug)]
pub enum DatabaseError {
    #[error("Database connection error: {0}")]
    Connection(#[from] SqlxError),
    
    #[error("User not found with id: {id}")]
    UserNotFound { id: i32 },
    
    #[error("Email already exists: {email}")]
    EmailAlreadyExists { email: String },
    
    #[error("Validation error: {message}")]
    Validation { message: String },
}

pub struct DatabaseService {
    pool: PgPool,
}

impl DatabaseService {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }

    pub async fn create_user_safe(
        &self,
        name: String,
        email: String,
        age: Option<i32>,
    ) -> Result<User, DatabaseError> {
        // バリデーション
        if name.is_empty() {
            return Err(DatabaseError::Validation {
                message: "Name cannot be empty".to_string(),
            });
        }

        if !email.contains('@') {
            return Err(DatabaseError::Validation {
                message: "Invalid email format".to_string(),
            });
        }

        // 重複チェック
        let existing_user = sqlx::query!(
            "SELECT id FROM users WHERE email = $1",
            email
        )
        .fetch_optional(&self.pool)
        .await?;

        if existing_user.is_some() {
            return Err(DatabaseError::EmailAlreadyExists { email });
        }

        // ユーザー作成
        let user = sqlx::query_as!(
            User,
            r#"
            INSERT INTO users (name, email, age)
            VALUES ($1, $2, $3)
            RETURNING id, name, email, age, created_at
            "#,
            name,
            email,
            age
        )
        .fetch_one(&self.pool)
        .await?;

        tracing::info!("Created user: {} ({})", user.name, user.id);
        Ok(user)
    }

    pub async fn get_user_safe(&self, id: i32) -> Result<User, DatabaseError> {
        let user = sqlx::query_as!(
            User,
            "SELECT id, name, email, age, created_at FROM users WHERE id = $1",
            id
        )
        .fetch_optional(&self.pool)
        .await?
        .ok_or(DatabaseError::UserNotFound { id })?;

        Ok(user)
    }
}

// ロギング設定例
pub fn setup_logging() {
    use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

    tracing_subscriber::registry()
        .with(tracing_subscriber::EnvFilter::new(
            std::env::var("RUST_LOG").unwrap_or_else(|_| "info,sqlx=debug".into()),
        ))
        .with(tracing_subscriber::fmt::layer())
        .init();
}