Peewee
Peeweeは「軽量で表現力豊かなORM」として設計されたPython向けのシンプルなORM(Object-Relational Mapping)ライブラリです。小〜中規模プロジェクトに最適化されており、シンプルな構文と低い学習コストが特徴です。SQLite、MySQL、PostgreSQLをサポートし、Djangoライクな直感的なAPIを提供しながらも、SQLAlchemyよりも軽量でプロトタイピングに適している点が評価されています。
GitHub概要
coleifer/peewee
a small, expressive orm -- supports postgresql, mysql, sqlite and cockroachdb
スター11,621
ウォッチ197
フォーク1,378
作成日:2010年10月11日
言語:Python
ライセンス:MIT License
トピックス
dankgametightpeeweepythonsqlite
スター履歴
データ取得日時: 2025/7/19 02:41
ライブラリ
Peewee
概要
Peeweeは「軽量で表現力豊かなORM」として設計されたPython向けのシンプルなORM(Object-Relational Mapping)ライブラリです。小〜中規模プロジェクトに最適化されており、シンプルな構文と低い学習コストが特徴です。SQLite、MySQL、PostgreSQLをサポートし、Djangoライクな直感的なAPIを提供しながらも、SQLAlchemyよりも軽量でプロトタイピングに適している点が評価されています。
詳細
Peewee 2025年版は、小規模プロジェクトやスタートアップでの安定した人気を維持しています。Active Recordパターンを採用し、Pythonオブジェクトとデータベーステーブルの関係を直感的に表現できます。Django ORMからインスピレーションを受けた設計により、初心者でも理解しやすいAPIを提供。データベースマイグレーション、バックアップ、レプリケーション機能も含み、本格的なWebアプリケーション開発にも対応可能です。
主な特徴
- 軽量設計: 最小限の依存関係でシンプルな構成
- 直感的API: Django ORMライクな分かりやすい構文
- 柔軟なクエリ: Expression-based queriesで複雑な検索が可能
- マルチデータベース: SQLite、MySQL、PostgreSQL対応
- 組み込み機能: マイグレーション、プーリング、レプリケーション
- 拡張性: カスタムフィールドタイプとプラグインサポート
メリット・デメリット
メリット
- 学習コストが低く、Python初心者でも短時間で習得可能
- 軽量でオーバーヘッドが少なく、高速な動作を実現
- シンプルな構文により可読性の高いコードが書ける
- 小〜中規模プロジェクトでの開発効率が高い
- SQLite組み込みサポートでプロトタイピングが簡単
- 豊富なドキュメントとコミュニティサポート
デメリット
- 大規模・複雑なアプリケーションには機能不足の場合がある
- SQLAlchemyと比較して高度な機能や柔軟性に制限
- 企業レベルの複雑なリレーション処理に不向き
- 非同期処理のサポートが限定的
- プラグインエコシステムがSQLAlchemyより小規模
- パフォーマンスチューニングのオプションが少ない
参考ページ
書き方の例
インストールとセットアップ
# Peeweeのインストール
pip install peewee
# データベース固有のアダプタ
pip install peewee[mysql] # MySQL用
pip install peewee[postgresql] # PostgreSQL用
# SQLiteは標準で含まれる
基本的なモデル定義
from peewee import *
from datetime import datetime
# データベース接続
db = SqliteDatabase('my_app.db')
# db = MySQLDatabase('my_db', user='root', password='secret', host='localhost', port=3306)
# db = PostgreSQLDatabase('my_db', user='postgres', password='secret', host='localhost', port=5432)
class BaseModel(Model):
class Meta:
database = db
class User(BaseModel):
id = AutoField(primary_key=True)
username = CharField(unique=True, max_length=50)
email = CharField(unique=True, max_length=100)
password_hash = CharField(max_length=255)
is_active = BooleanField(default=True)
created_at = DateTimeField(default=datetime.now)
updated_at = DateTimeField(default=datetime.now)
class Meta:
table_name = 'users'
class Post(BaseModel):
id = AutoField(primary_key=True)
title = CharField(max_length=200)
content = TextField()
author = ForeignKeyField(User, backref='posts')
published = BooleanField(default=False)
created_at = DateTimeField(default=datetime.now)
tags = CharField(null=True)
class Meta:
table_name = 'posts'
class Comment(BaseModel):
id = AutoField(primary_key=True)
post = ForeignKeyField(Post, backref='comments')
author = ForeignKeyField(User, backref='comments')
content = TextField()
created_at = DateTimeField(default=datetime.now)
class Meta:
table_name = 'comments'
# データベースとテーブルの作成
db.connect()
db.create_tables([User, Post, Comment])
基本的なCRUD操作
# Create - データ作成
user = User.create(
username='john_doe',
email='[email protected]',
password_hash='hashed_password_here'
)
# または
user = User(username='jane_doe', email='[email protected]', password_hash='hash')
user.save()
# Read - データ読み込み
# 全ユーザー取得
users = User.select()
for user in users:
print(f'{user.username}: {user.email}')
# 単一ユーザー取得
user = User.get(User.username == 'john_doe')
# または
user = User.get_by_id(1)
# 条件付き検索
active_users = User.select().where(User.is_active == True)
recent_posts = Post.select().where(Post.created_at > datetime(2024, 1, 1))
# Update - データ更新
# 単一レコード更新
user = User.get_by_id(1)
user.email = '[email protected]'
user.updated_at = datetime.now()
user.save()
# 一括更新
query = User.update(is_active=False).where(User.created_at < datetime(2023, 1, 1))
query.execute()
# Delete - データ削除
# 単一レコード削除
user = User.get_by_id(1)
user.delete_instance()
# 一括削除
query = Post.delete().where(Post.published == False)
query.execute()
高度なクエリ操作
# JOIN操作
posts_with_authors = (Post
.select(Post, User)
.join(User)
.where(User.is_active == True))
for post in posts_with_authors:
print(f'{post.title} by {post.author.username}')
# 集計クエリ
from peewee import fn
# ユーザーごとの投稿数
user_post_counts = (User
.select(User.username, fn.COUNT(Post.id).alias('post_count'))
.join(Post, JOIN.LEFT_OUTER)
.group_by(User.username))
for user in user_post_counts:
print(f'{user.username}: {user.post_count} posts')
# サブクエリ
active_authors = User.select().where(User.is_active == True)
posts_by_active_authors = Post.select().where(Post.author.in_(active_authors))
# 複雑な条件
from peewee import fn
recent_popular_posts = (Post
.select()
.join(Comment)
.group_by(Post.id)
.having(fn.COUNT(Comment.id) > 5)
.where(Post.created_at > datetime(2024, 1, 1)))
# ソートとページネーション
paginated_posts = (Post
.select()
.order_by(Post.created_at.desc())
.paginate(page=1, paginate_by=10))
リレーションシップの操作
# Forward reference(1対多)
user = User.get_by_id(1)
user_posts = user.posts # User.postsでアクセス
# Backward reference(多対1)
post = Post.get_by_id(1)
post_author = post.author # Post.authorでアクセス
# リレーション先のデータを含めて取得(Prefetch)
posts_with_authors = (Post
.select()
.join(User)
.where(Post.published == True))
# Many-to-Many関係の実装
class Tag(BaseModel):
name = CharField(unique=True, max_length=50)
class PostTag(BaseModel):
post = ForeignKeyField(Post)
tag = ForeignKeyField(Tag)
class Meta:
primary_key = CompositeKey('post', 'tag')
# Many-to-Manyの操作
def add_tag_to_post(post, tag_name):
tag, created = Tag.get_or_create(name=tag_name)
PostTag.get_or_create(post=post, tag=tag)
def get_post_tags(post):
return (Tag
.select()
.join(PostTag)
.where(PostTag.post == post))
トランザクション管理
# 基本的なトランザクション
def create_post_with_comment(user, title, content, comment_text):
with db.atomic():
post = Post.create(
title=title,
content=content,
author=user,
published=True
)
comment = Comment.create(
post=post,
author=user,
content=comment_text
)
return post, comment
# ネストしたトランザクション
def complex_operation():
with db.atomic() as txn:
try:
user = User.create(username='test_user', email='[email protected]')
# ネストしたトランザクション
with db.atomic() as nested_txn:
post = Post.create(title='Test', content='Content', author=user)
if some_condition:
nested_txn.rollback() # 内側のトランザクションのみロールバック
except Exception as e:
txn.rollback() # 全体をロールバック
raise e
# デコレータを使用したトランザクション
@db.atomic()
def atomic_function():
# この関数全体がトランザクション内で実行される
User.create(username='atomic_user', email='[email protected]')
Post.create(title='Atomic Post', content='Content', author=user)
データベースマイグレーション
# マイグレーション用のスクリプト
from playhouse.migrate import *
# SQLiteの場合
migrator = SqliteMigrator(db)
# MySQLの場合
# migrator = MySQLMigrator(db)
# PostgreSQLの場合
# migrator = PostgresqlMigrator(db)
# カラム追加
migrate(
migrator.add_column('users', 'bio', TextField(null=True)),
migrator.add_column('posts', 'view_count', IntegerField(default=0)),
)
# カラム名変更
migrate(
migrator.rename_column('users', 'password_hash', 'password'),
)
# インデックス追加
migrate(
migrator.add_index('posts', ('author', 'created_at'), False),
migrator.add_index('users', ('email',), True), # ユニークインデックス
)
# カラム削除
migrate(
migrator.drop_column('posts', 'old_column'),
)
# 外部キー追加
migrate(
migrator.add_column('posts', 'category_id',
ForeignKeyField(Category, null=True)),
)
バックアップとレストア
import json
from playhouse.dataset import DataSet
# データベース全体をJSONにエクスポート
def backup_database():
ds = DataSet(db)
all_data = {}
for table_name in db.get_tables():
table = ds[table_name]
all_data[table_name] = list(table.all())
with open('backup.json', 'w') as f:
json.dump(all_data, f, indent=2, default=str)
# JSONからデータベースにインポート
def restore_database():
with open('backup.json', 'r') as f:
all_data = json.load(f)
ds = DataSet(db)
for table_name, records in all_data.items():
table = ds[table_name]
for record in records:
table.insert(**record)
# CSV出力
def export_to_csv():
import csv
users = User.select()
with open('users_export.csv', 'w', newline='') as csvfile:
fieldnames = ['id', 'username', 'email', 'is_active', 'created_at']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for user in users:
writer.writerow({
'id': user.id,
'username': user.username,
'email': user.email,
'is_active': user.is_active,
'created_at': user.created_at
})
性能最適化とデバッグ
# クエリログ有効化
import logging
logger = logging.getLogger('peewee')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
# 接続プール使用(PostgreSQL/MySQL)
from playhouse.pool import PooledPostgresqlDatabase
db = PooledPostgresqlDatabase(
'my_db',
max_connections=20,
stale_timeout=300,
user='postgres',
password='secret'
)
# N+1問題の解決
# Bad: N+1クエリが発生
posts = Post.select()
for post in posts:
print(post.author.username) # 各投稿で個別にauthorをクエリ
# Good: prefetch使用
posts_with_authors = Post.select().join(User)
for post in posts_with_authors:
print(post.author.username) # 1回のJOINで解決
# インデックス最適化
class User(BaseModel):
username = CharField(unique=True, index=True) # インデックス追加
email = CharField(unique=True)
created_at = DateTimeField(index=True) # よく検索される項目にインデックス
class Meta:
database = db
indexes = (
(('username', 'email'), True), # 複合ユニークインデックス
(('created_at', 'is_active'), False), # 複合インデックス
)
# 生SQLの実行
def execute_raw_sql():
cursor = db.execute_sql('SELECT COUNT(*) FROM users WHERE is_active = ?', (True,))
result = cursor.fetchone()
return result[0]
# バッチ処理
def bulk_create_users(users_data):
with db.atomic():
User.insert_many(users_data).execute()
# 大量データの効率的な処理
def process_large_dataset():
for user_batch in User.select().paginate(page=1, paginate_by=1000):
# 1000件ずつ処理
process_users(user_batch)