Pony ORM

Pony ORMは「Pythonic構文でSQLクエリを記述できるユニークなORM」として設計された、革新的なPython向けORMライブラリです。最大の特徴は、Pythonのジェネレータ式やラムダ関数を使って直感的にデータベースクエリを記述できることです。従来のORMとは一線を画するアプローチで、SQLを知らなくてもPythonの知識だけで複雑なクエリを書くことができ、教育目的や小〜中規模プロジェクトで高い評価を得ています。

ORMPythonPythonicジェネレータ式直感的クエリ

GitHub概要

ponyorm/pony

Pony Object Relational Mapper

スター3,762
ウォッチ87
フォーク244
作成日:2013年2月8日
言語:Python
ライセンス:Apache License 2.0

トピックス

cockroachcockroachdbmysqloracleormpostgresqlpythonpython3sqlite

スター履歴

ponyorm/pony Star History
データ取得日時: 2025/8/13 01:43

ライブラリ

Pony ORM

概要

Pony ORMは「Pythonic構文でSQLクエリを記述できるユニークなORM」として設計された、革新的なPython向けORMライブラリです。最大の特徴は、Pythonのジェネレータ式やラムダ関数を使って直感的にデータベースクエリを記述できることです。従来のORMとは一線を画するアプローチで、SQLを知らなくてもPythonの知識だけで複雑なクエリを書くことができ、教育目的や小〜中規模プロジェクトで高い評価を得ています。

詳細

Pony ORM 2025年版は、特殊な構文により一部の開発者から熱狂的な支持を受けています。Pythonのジェネレータ式を活用することで、SQLクエリを自然言語に近い形で記述可能。Data Mapperパターンを採用し、Pythonオブジェクトとデータベーステーブルの関係を透明化します。デバッグモードでは生成されたSQLを確認でき、学習効果も高い設計です。教育目的での使用が増加しており、Pythonの理解を深めながらデータベース操作を学べる点が評価されています。

主な特徴

  • Pythonic クエリ: ジェネレータ式による自然なクエリ記述
  • 直感的構文: SQL知識不要でPythonだけでクエリ作成
  • 自動最適化: クエリの自動最適化とSQL生成
  • デバッグ支援: 生成SQLの確認とクエリ分析機能
  • 型安全: Pythonの型システムとの自然な統合
  • 軽量設計: 最小限の設定で高機能を実現

メリット・デメリット

メリット

  • Python初心者でもSQL知識なしでデータベース操作が可能
  • ジェネレータ式により極めて読みやすいクエリコードを記述
  • 教育目的に最適で、Pythonとデータベースの理解が深まる
  • デバッグ機能が充実しており、生成SQLを確認できる
  • 小〜中規模プロジェクトで高い開発効率を実現
  • ユニークなアプローチで開発者の学習意欲を刺激

デメリット

  • 特殊な構文のため、チーム開発で学習コストが発生
  • 大規模・複雑なアプリケーションには向かない場合がある
  • 他のORMとの併用や移行が困難
  • エンタープライズレベルでの採用事例が限定的
  • パフォーマンスチューニングのオプションが制約的
  • コミュニティとプラグインエコシステムが小規模

参考ページ

書き方の例

インストールとセットアップ

# Pony ORMのインストール
pip install pony

# データベース固有のアダプタ
pip install pony[mysql]      # MySQL用
pip install pony[postgresql] # PostgreSQL用

# SQLiteは標準で含まれる

基本的なモデル定義とデータベース設定

from pony.orm import *
from datetime import datetime
import os

# データベース接続
db = Database()

# エンティティ(モデル)定義
class Customer(db.Entity):
    id = PrimaryKey(int, auto=True)
    name = Required(str)
    email = Required(str, unique=True)
    country = Optional(str)
    city = Optional(str)
    orders = Set('Order')

class Product(db.Entity):
    id = PrimaryKey(int, auto=True)
    name = Required(str)
    price = Required(float)
    description = Optional(str)
    category = Required('Category')
    order_items = Set('OrderItem')

class Category(db.Entity):
    id = PrimaryKey(int, auto=True)
    name = Required(str, unique=True)
    products = Set(Product)

class Order(db.Entity):
    id = PrimaryKey(int, auto=True)
    date = Required(datetime, default=datetime.now)
    customer = Required(Customer)
    total_price = Required(float)
    items = Set('OrderItem')

class OrderItem(db.Entity):
    order = Required(Order)
    product = Required(Product)
    quantity = Required(int)
    price = Required(float)
    PrimaryKey(order, product)

# データベースバインドと設定
# SQLite(開発・テスト用)
db.bind('sqlite', 'example.db')

# PostgreSQL(本番用)
# db.bind('postgres', user='username', password='password', 
#         host='localhost', database='mydb')

# MySQL(本番用)
# db.bind('mysql', host='localhost', user='username', password='password', 
#         db='mydb')

# デバッグモード有効化(SQL出力)
set_sql_debug(True)

# スキーマ生成
db.generate_mapping(create_tables=True)

# サンプルデータの投入
@db_session
def populate_sample_data():
    if Category.select().count() == 0:
        # カテゴリ作成
        electronics = Category(name="Electronics")
        books = Category(name="Books") 
        clothes = Category(name="Clothes")
        
        # 商品作成
        Product(name="Laptop", price=999.99, category=electronics)
        Product(name="Mouse", price=25.50, category=electronics)
        Product(name="Python Book", price=45.00, category=books)
        Product(name="T-Shirt", price=19.99, category=clothes)
        
        # 顧客作成
        Customer(name="John Smith", email="[email protected]", country="USA", city="New York")
        Customer(name="Alice Johnson", email="[email protected]", country="Canada", city="Toronto")
        
        commit()

populate_sample_data()

Pythonic クエリの基本

@db_session
def basic_queries():
    # 基本的なselect(ジェネレータ式)
    customers = select(c for c in Customer)
    print("All customers:", list(customers))
    
    # 条件付きselect
    usa_customers = select(c for c in Customer if c.country == "USA")
    print("USA customers:", list(usa_customers))
    
    # 複合条件
    expensive_electronics = select(p for p in Product 
                                  if p.category.name == "Electronics" and p.price > 100)
    print("Expensive electronics:", list(expensive_electronics))
    
    # ラムダ式を使用(エンティティメソッド)
    customers_lambda = Customer.select(lambda c: c.country == "USA")
    print("USA customers (lambda):", list(customers_lambda))
    
    # 単一レコード取得
    customer = Customer.get(name="John Smith")
    print(f"Customer: {customer.name} from {customer.city}")
    
    # 存在チェック
    exists = Customer.exists(name="Alice Johnson")
    print(f"Alice exists: {exists}")
    
    # カウント
    total_customers = Customer.select().count()
    print(f"Total customers: {total_customers}")

高度なクエリとジョイン

@db_session
def advanced_queries():
    # JOIN操作(自動)
    products_with_category = select(p for p in Product)
    for p in products_with_category:
        print(f"{p.name} - Category: {p.category.name}")
    
    # 複雑な条件とJOIN
    customers_with_orders = select(c for c in Customer 
                                  if count(c.orders) > 0)
    print("Customers with orders:", list(customers_with_orders))
    
    # ネストしたクエリ
    popular_products = select(p for p in Product 
                             if count(p.order_items) > 0)
    print("Popular products:", list(popular_products))
    
    # 集計関数
    total_sales = sum(o.total_price for o in Order)
    print(f"Total sales: ${total_sales}")
    
    # グループ集計
    sales_by_country = select((c.country, sum(o.total_price)) 
                             for c in Customer 
                             for o in c.orders 
                             if c.country is not None)
    print("Sales by country:", list(sales_by_country))
    
    # 最大・最小値
    max_price = max(p.price for p in Product)
    min_price = min(p.price for p in Product)
    print(f"Price range: ${min_price} - ${max_price}")
    
    # 平均値
    avg_order_value = avg(o.total_price for o in Order)
    print(f"Average order value: ${avg_order_value}")

複雑なクエリとサブクエリ

@db_session
def complex_queries():
    # サブクエリ
    expensive_product_customers = select(c for c in Customer 
                                        for o in c.orders 
                                        for item in o.items 
                                        if item.product.price > 500)
    print("Customers who bought expensive products:", 
          list(expensive_product_customers))
    
    # EXISTS相当の処理
    customers_without_orders = select(c for c in Customer 
                                     if not c.orders)
    print("Customers without orders:", list(customers_without_orders))
    
    # IN演算子相当
    electronic_categories = select(c for c in Category 
                                  if c.name in ["Electronics", "Computers"])
    print("Electronic categories:", list(electronic_categories))
    
    # 文字列操作
    customers_with_gmail = select(c for c in Customer 
                                 if "gmail" in c.email)
    print("Gmail customers:", list(customers_with_gmail))
    
    # 日付範囲
    from datetime import datetime, timedelta
    recent_orders = select(o for o in Order 
                          if o.date > datetime.now() - timedelta(days=30))
    print("Recent orders:", list(recent_orders))
    
    # 複雑な集計
    customer_order_stats = select((c.name, 
                                  count(c.orders), 
                                  sum(c.orders.total_price),
                                  avg(c.orders.total_price))
                                 for c in Customer 
                                 if count(c.orders) > 0)
    print("Customer order statistics:")
    for name, order_count, total, average in customer_order_stats:
        print(f"  {name}: {order_count} orders, ${total:.2f} total, ${average:.2f} avg")

データの挿入・更新・削除

@db_session
def crud_operations():
    # 挿入
    new_customer = Customer(
        name="Bob Wilson",
        email="[email protected]",
        country="UK",
        city="London"
    )
    
    # 関連オブジェクトも同時作成
    electronics = Category.get(name="Electronics")
    new_product = Product(
        name="Smartphone",
        price=699.99,
        description="Latest smartphone",
        category=electronics
    )
    
    # コミット(@db_sessionで自動的にコミット)
    print(f"Created customer: {new_customer.name}")
    print(f"Created product: {new_product.name}")
    
    # 更新
    customer_to_update = Customer.get(name="Bob Wilson")
    customer_to_update.city = "Manchester"
    print(f"Updated customer city: {customer_to_update.city}")
    
    # 一括更新
    update(c.country for c in Customer if c.country == "USA").set("United States")
    
    # 削除
    customer_to_delete = Customer.get(name="Bob Wilson")
    customer_to_delete.delete()
    print("Customer deleted")
    
    # 一括削除
    delete(p for p in Product if p.price < 20)
    print("Deleted low-price products")

トランザクション管理

# デコレータ使用
@db_session
def simple_transaction():
    # この関数全体が1つのトランザクション
    customer = Customer(name="Transaction Test", email="[email protected]")
    product = Product(name="Test Product", price=100, category=Category.get(name="Electronics"))
    # 自動コミット

# 明示的トランザクション制御
def manual_transaction():
    with db_session:
        try:
            customer = Customer(name="Manual Transaction", email="[email protected]")
            # 何らかのビジネスロジック
            if some_condition_fails():
                rollback()  # 明示的ロールバック
            else:
                commit()   # 明示的コミット
        except Exception as e:
            rollback()
            print(f"Transaction failed: {e}")

# ネストしたトランザクション
@db_session
def nested_transactions():
    customer = Customer(name="Outer Transaction", email="[email protected]")
    
    try:
        with db_session:
            # 内側のトランザクション
            product = Product(name="Inner Product", price=50, 
                            category=Category.get(name="Electronics"))
            if some_validation_fails():
                raise ValueError("Validation failed")
    except ValueError:
        print("Inner transaction failed, but outer continues")
    
    # 外側のトランザクションは続行
    commit()

生SQLの実行とraw_sql関数

@db_session
def raw_sql_examples():
    # 生SQLでselect
    results = db.select("SELECT name, email FROM Customer WHERE country = $country", 
                       {"country": "USA"})
    print("Raw SQL results:", results)
    
    # 変数を使用したraw SQL
    country = "Canada"
    canadian_customers = db.select("SELECT * FROM Customer WHERE country = $country", 
                                  {"country": country})
    print("Canadian customers:", canadian_customers)
    
    # raw_sql関数を使用したクエリ
    from pony.orm import raw_sql
    
    # 日付関数を使用
    recent_orders_raw = select(o for o in Order 
                              if raw_sql("DATE(o.date)") == raw_sql("DATE('now')"))
    print("Today's orders:", list(recent_orders_raw))
    
    # データベース固有の関数
    # PostgreSQLの例
    # substring_customers = select(c for c in Customer 
    #                             if raw_sql("SUBSTRING(c.name, 1, 1)") == "J")
    
    # SQLiteの例  
    first_letter_j = select(c for c in Customer 
                           if raw_sql("SUBSTR(c.name, 1, 1)") == "J")
    print("Customers starting with J:", list(first_letter_j))

パフォーマンス最適化

@db_session
def performance_optimization():
    # プリフェッチ(N+1問題の解決)
    # Bad: N+1クエリ
    products = Product.select()
    for p in products:
        print(f"{p.name} - {p.category.name}")  # 各productでcategoryをクエリ
    
    # Good: JOINを使用
    products_with_category = select(p for p in Product)
    for p in products_with_category:
        print(f"{p.name} - {p.category.name}")  # 1回のJOINクエリ
    
    # バッチ処理
    batch_size = 100
    total_products = Product.select().count()
    
    for offset in range(0, total_products, batch_size):
        batch = Product.select().limit(batch_size, offset=offset)
        process_product_batch(batch)
    
    # インデックス活用
    # インデックスが効く検索
    indexed_customer = Customer.get(email="[email protected]")  # emailにインデックス
    
    # インデックスヒント
    customers_by_country = select(c for c in Customer 
                                 if c.country == "USA").order_by(Customer.name)
    
    # 集計の最適化
    # 効率的な集計
    category_product_counts = select((c.name, count(c.products)) 
                                   for c in Category)
    print("Products per category:", list(category_product_counts))

def process_product_batch(products):
    for product in products:
        # バッチ処理ロジック
        pass

# クエリ結果のキャッシュ
@db_session
def caching_example():
    # 結果をリストに変換してキャッシュ
    expensive_products = list(select(p for p in Product if p.price > 100))
    
    # 後で再利用
    for product in expensive_products:
        print(product.name)
    
    # count()をキャッシュ
    total_count = Product.select().count()
    print(f"Total products: {total_count}")

デバッグとロギング

from pony.orm import sql_debug

# SQLデバッグの有効化/無効化
sql_debug(True)   # SQL出力開始
sql_debug(False)  # SQL出力停止

@db_session
def debug_queries():
    # クエリ実行前後でSQLを確認
    print("=== Debug Query Start ===")
    
    customers = select(c for c in Customer if c.country == "USA")
    print("Query object created")
    
    # この時点でSQLが実行される
    customer_list = list(customers)
    print(f"Found {len(customer_list)} customers")
    
    print("=== Debug Query End ===")

# カスタムログ設定
import logging

# PonyORMのログ設定
logging.basicConfig(level=logging.DEBUG)
pony_logger = logging.getLogger('pony.orm')
pony_logger.setLevel(logging.DEBUG)

# ハンドラ追加
handler = logging.FileHandler('pony_queries.log')
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
pony_logger.addHandler(handler)

# クエリ実行時間の測定
@db_session
def measure_query_performance():
    import time
    
    start_time = time.time()
    
    # 複雑なクエリ
    result = select((c.name, count(c.orders), sum(c.orders.total_price))
                   for c in Customer 
                   if count(c.orders) > 0)
    
    query_result = list(result)
    
    end_time = time.time()
    execution_time = end_time - start_time
    
    print(f"Query executed in {execution_time:.4f} seconds")
    print(f"Results: {len(query_result)} rows")