MessagePack
JSONのような効率的なバイナリ シリアライゼーション形式のPython実装
MessagePack
MessagePack は、JSONのような効率的なバイナリ シリアライゼーション形式のPython実装です。JSONよりもコンパクトで高速でありながら、多くのプログラミング言語をサポートしています。
主な特徴
- 高速処理: JSONよりも高速なシリアライゼーション/デシリアライゼーション
- コンパクト: JSONと比較して20-50%小さいデータサイズ
- バイナリ対応: バイナリデータをネイティブにサポート
- 型保持: 元のデータ型を正確に保持
- クロスプラットフォーム: 50以上のプログラミング言語をサポート
- C拡張: CPythonでのパフォーマンス最適化
主なユースケース
- リアルタイム通信: WebSocketでの高速データ交換
- キャッシュシステム: Redisなどでの効率的なデータ保存
- マイクロサービス: サービス間の高速通信
- ログ記録: 効率的なバイナリログ
- データストレージ: 高速なデータ永続化
- IoT通信: 帯域幅制限のある環境
インストール
# 標準インストール
pip install msgpack
# 高速版(msgspec)のインストール
pip install msgspec
# 別の高速実装(ormsgpack)
pip install ormsgpack
基本的な使い方
1. 基本的なシリアライゼーション
import msgpack
# 基本的なデータ型
data = {
"name": "田中太郎",
"age": 30,
"city": "東京",
"is_active": True,
"scores": [85, 92, 78, 90],
"metadata": {
"created_at": "2024-01-01",
"version": 1.0
},
"tags": None,
"binary_data": b"binary content"
}
# シリアライズ
packed = msgpack.packb(data)
print(f"シリアライズ後のサイズ: {len(packed)} bytes")
# デシリアライズ
unpacked = msgpack.unpackb(packed, raw=False)
print("デシリアライズ結果:", unpacked)
# JSONとの比較
import json
json_str = json.dumps(data, default=str)
print(f"JSON文字列サイズ: {len(json_str.encode('utf-8'))} bytes")
print(f"MessagePack削減率: {(len(json_str.encode('utf-8')) - len(packed)) / len(json_str.encode('utf-8')) * 100:.1f}%")
2. ファイルへの読み書き
import msgpack
# 複雑なデータ構造
user_data = {
"users": [
{
"id": 1,
"name": "田中太郎",
"email": "[email protected]",
"profile": {
"age": 30,
"city": "東京",
"hobbies": ["読書", "映画鑑賞", "プログラミング"],
"avatar": b"binary_image_data_here"
},
"settings": {
"notifications": True,
"theme": "dark",
"language": "ja"
}
},
{
"id": 2,
"name": "佐藤花子",
"email": "[email protected]",
"profile": {
"age": 25,
"city": "大阪",
"hobbies": ["旅行", "料理", "写真"],
"avatar": b"another_binary_image_data"
},
"settings": {
"notifications": False,
"theme": "light",
"language": "ja"
}
}
],
"metadata": {
"total_users": 2,
"last_updated": "2024-01-01T12:00:00Z",
"version": 2
}
}
# ファイルに書き込み
with open('users.msgpack', 'wb') as f:
msgpack.dump(user_data, f)
print("データをusers.msgpackに保存しました")
# ファイルから読み込み
with open('users.msgpack', 'rb') as f:
loaded_data = msgpack.load(f, raw=False)
print(f"読み込まれたユーザー数: {loaded_data['metadata']['total_users']}")
for user in loaded_data['users']:
print(f"- {user['name']} ({user['profile']['city']})")
print(f" 趣味: {', '.join(user['profile']['hobbies'])}")
3. バイナリデータの処理
import msgpack
import numpy as np
from datetime import datetime
# バイナリデータを含む複雑なデータ
complex_data = {
"text": "Hello, 世界!",
"binary": b"\x00\x01\x02\x03\x04",
"numbers": [1, 2, 3, 4, 5],
"nested": {
"array": np.array([1.1, 2.2, 3.3]).tobytes(),
"timestamp": datetime.now().isoformat()
}
}
# MessagePackは自動的にバイナリデータを処理
packed = msgpack.packb(complex_data)
unpacked = msgpack.unpackb(packed, raw=False)
print("元のデータ:")
print(f" text: {complex_data['text']}")
print(f" binary: {complex_data['binary']}")
print(f" numbers: {complex_data['numbers']}")
print("\nデシリアライズされたデータ:")
print(f" text: {unpacked['text']}")
print(f" binary: {unpacked['binary']}")
print(f" numbers: {unpacked['numbers']}")
# NumPy配列の復元
restored_array = np.frombuffer(unpacked['nested']['array'], dtype=np.float64)
print(f" restored array: {restored_array}")
高度な使用例
1. カスタムエンコーダー・デコーダー
import msgpack
from datetime import datetime, date
from decimal import Decimal
import uuid
class CustomEncoder:
"""カスタムエンコーダー"""
def encode(self, obj):
if isinstance(obj, datetime):
return {'__datetime__': obj.isoformat()}
elif isinstance(obj, date):
return {'__date__': obj.isoformat()}
elif isinstance(obj, Decimal):
return {'__decimal__': str(obj)}
elif isinstance(obj, uuid.UUID):
return {'__uuid__': str(obj)}
elif isinstance(obj, set):
return {'__set__': list(obj)}
elif isinstance(obj, complex):
return {'__complex__': [obj.real, obj.imag]}
return obj
class CustomDecoder:
"""カスタムデコーダー"""
def decode(self, obj):
if isinstance(obj, dict):
if '__datetime__' in obj:
return datetime.fromisoformat(obj['__datetime__'])
elif '__date__' in obj:
return datetime.fromisoformat(obj['__date__']).date()
elif '__decimal__' in obj:
return Decimal(obj['__decimal__'])
elif '__uuid__' in obj:
return uuid.UUID(obj['__uuid__'])
elif '__set__' in obj:
return set(obj['__set__'])
elif '__complex__' in obj:
return complex(obj['__complex__'][0], obj['__complex__'][1])
return obj
# 使用例
encoder = CustomEncoder()
decoder = CustomDecoder()
complex_data = {
"id": uuid.uuid4(),
"created_at": datetime.now(),
"updated_at": date.today(),
"price": Decimal('1234.56'),
"tags": {"python", "msgpack", "serialization"},
"coordinates": complex(3, 4)
}
print("元のデータ:")
for key, value in complex_data.items():
print(f" {key}: {value} ({type(value).__name__})")
# カスタムエンコーダーでシリアライズ
packed = msgpack.packb(complex_data, default=encoder.encode)
print(f"\nシリアライズ後のサイズ: {len(packed)} bytes")
# カスタムデコーダーでデシリアライズ
unpacked = msgpack.unpackb(packed, object_hook=decoder.decode, raw=False)
print(f"\nデシリアライズされたデータ:")
for key, value in unpacked.items():
print(f" {key}: {value} ({type(value).__name__})")
2. ストリーミング処理
import msgpack
import io
from typing import Generator, Any
class MessagePackStreamer:
"""MessagePackストリーミング処理"""
def __init__(self):
self.unpacker = msgpack.Unpacker(raw=False)
def pack_stream(self, objects: list) -> bytes:
"""複数のオブジェクトを連続してパック"""
stream = io.BytesIO()
for obj in objects:
packed = msgpack.packb(obj)
stream.write(packed)
return stream.getvalue()
def unpack_stream(self, data: bytes) -> Generator[Any, None, None]:
"""ストリームから複数のオブジェクトをアンパック"""
self.unpacker.feed(data)
for unpacked in self.unpacker:
yield unpacked
def process_file_stream(self, filename: str, chunk_size: int = 1024) -> Generator[Any, None, None]:
"""ファイルをストリーミング処理"""
with open(filename, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
self.unpacker.feed(chunk)
for unpacked in self.unpacker:
yield unpacked
# 使用例
streamer = MessagePackStreamer()
# テストデータ
test_objects = [
{"id": 1, "name": "田中太郎", "age": 30},
{"id": 2, "name": "佐藤花子", "age": 25},
{"id": 3, "name": "山田次郎", "age": 28},
{"type": "metadata", "count": 3, "timestamp": "2024-01-01"}
]
# ストリーミングでパック
stream_data = streamer.pack_stream(test_objects)
print(f"ストリームデータサイズ: {len(stream_data)} bytes")
# ストリーミングでアンパック
print("\nストリーミング処理結果:")
for obj in streamer.unpack_stream(stream_data):
print(f" 処理されたオブジェクト: {obj}")
# ファイルストリーミング
with open('stream_test.msgpack', 'wb') as f:
f.write(stream_data)
print(f"\nファイルストリーミング処理:")
for obj in streamer.process_file_stream('stream_test.msgpack'):
print(f" ファイルから読み込み: {obj}")
3. 高速キャッシュシステム
import msgpack
import time
from typing import Any, Optional, Dict
import threading
class MessagePackCache:
"""MessagePackベースの高速キャッシュ"""
def __init__(self, max_size: int = 1000):
self.cache: Dict[str, Dict[str, Any]] = {}
self.max_size = max_size
self.lock = threading.RLock()
self.hit_count = 0
self.miss_count = 0
def set(self, key: str, value: Any, ttl: int = 3600):
"""キャッシュに値を設定"""
with self.lock:
# キャッシュサイズ制限
if len(self.cache) >= self.max_size:
# 最も古いエントリを削除
oldest_key = min(self.cache.keys(), key=lambda k: self.cache[k]['timestamp'])
del self.cache[oldest_key]
# MessagePackでシリアライズ
try:
packed_value = msgpack.packb(value)
self.cache[key] = {
'data': packed_value,
'timestamp': time.time(),
'ttl': ttl
}
return True
except Exception as e:
print(f"キャッシュ設定エラー: {e}")
return False
def get(self, key: str) -> Optional[Any]:
"""キャッシュから値を取得"""
with self.lock:
if key not in self.cache:
self.miss_count += 1
return None
entry = self.cache[key]
# TTLチェック
if time.time() - entry['timestamp'] > entry['ttl']:
del self.cache[key]
self.miss_count += 1
return None
# MessagePackでデシリアライズ
try:
value = msgpack.unpackb(entry['data'], raw=False)
self.hit_count += 1
return value
except Exception as e:
print(f"キャッシュ取得エラー: {e}")
del self.cache[key]
self.miss_count += 1
return None
def delete(self, key: str) -> bool:
"""キャッシュから削除"""
with self.lock:
if key in self.cache:
del self.cache[key]
return True
return False
def clear(self):
"""キャッシュをクリア"""
with self.lock:
self.cache.clear()
self.hit_count = 0
self.miss_count = 0
def stats(self) -> Dict[str, Any]:
"""キャッシュ統計を取得"""
with self.lock:
total_requests = self.hit_count + self.miss_count
hit_rate = (self.hit_count / total_requests * 100) if total_requests > 0 else 0
# メモリ使用量を計算
memory_usage = sum(len(entry['data']) for entry in self.cache.values())
return {
'size': len(self.cache),
'hit_count': self.hit_count,
'miss_count': self.miss_count,
'hit_rate': hit_rate,
'memory_usage': memory_usage,
'avg_entry_size': memory_usage / len(self.cache) if self.cache else 0
}
# 使用例
cache = MessagePackCache(max_size=100)
# テストデータ
test_data = {
"user_profile": {
"id": 12345,
"name": "田中太郎",
"email": "[email protected]",
"preferences": {
"theme": "dark",
"notifications": True
},
"history": [f"action_{i}" for i in range(100)]
},
"session_data": {
"token": "abc123def456",
"expires_at": "2024-12-31T23:59:59Z",
"permissions": ["read", "write", "admin"]
}
}
# キャッシュに保存
print("キャッシュにデータを保存中...")
for key, value in test_data.items():
cache.set(key, value, ttl=300) # 5分間有効
# キャッシュから取得
print("\nキャッシュからデータを取得中...")
for key in test_data.keys():
cached_value = cache.get(key)
if cached_value:
print(f" {key}: キャッシュヒット")
else:
print(f" {key}: キャッシュミス")
# 存在しないキーの取得
cache.get("non_existent_key")
# 統計情報
stats = cache.stats()
print(f"\nキャッシュ統計:")
print(f" サイズ: {stats['size']}")
print(f" ヒット率: {stats['hit_rate']:.1f}%")
print(f" メモリ使用量: {stats['memory_usage']:,} bytes")
print(f" 平均エントリサイズ: {stats['avg_entry_size']:.1f} bytes")
パフォーマンスベンチマーク
1. 基本的なパフォーマンステスト
import msgpack
import json
import time
import pickle
from typing import Dict, Any
def benchmark_serialization():
"""シリアライゼーションのベンチマーク"""
# テストデータの準備
test_data = {
"users": [
{
"id": i,
"name": f"ユーザー{i}",
"email": f"user{i}@example.com",
"age": 20 + (i % 60),
"active": i % 2 == 0,
"scores": [80 + (i % 20), 75 + (i % 25), 90 + (i % 10)],
"metadata": {
"created_at": f"2024-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}",
"last_login": f"2024-01-{(i % 31) + 1:02d}",
"preferences": {
"theme": "dark" if i % 2 == 0 else "light",
"notifications": i % 3 == 0
}
}
}
for i in range(1000)
],
"metadata": {
"total_users": 1000,
"generated_at": "2024-01-01T00:00:00Z"
}
}
print("シリアライゼーション ベンチマーク")
print(f"テストデータサイズ: {len(test_data['users'])} ユーザー")
# MessagePack
start_time = time.time()
msgpack_packed = msgpack.packb(test_data)
msgpack_serialize_time = time.time() - start_time
start_time = time.time()
msgpack_unpacked = msgpack.unpackb(msgpack_packed, raw=False)
msgpack_deserialize_time = time.time() - start_time
# JSON
start_time = time.time()
json_str = json.dumps(test_data)
json_serialize_time = time.time() - start_time
start_time = time.time()
json_unpacked = json.loads(json_str)
json_deserialize_time = time.time() - start_time
# Pickle
start_time = time.time()
pickle_data = pickle.dumps(test_data)
pickle_serialize_time = time.time() - start_time
start_time = time.time()
pickle_unpacked = pickle.loads(pickle_data)
pickle_deserialize_time = time.time() - start_time
# 結果の表示
print(f"\n結果:")
print(f"MessagePack:")
print(f" シリアライズ: {msgpack_serialize_time:.4f}秒")
print(f" デシリアライズ: {msgpack_deserialize_time:.4f}秒")
print(f" データサイズ: {len(msgpack_packed):,} bytes")
print(f"\nJSON:")
print(f" シリアライズ: {json_serialize_time:.4f}秒")
print(f" デシリアライズ: {json_deserialize_time:.4f}秒")
print(f" データサイズ: {len(json_str.encode('utf-8')):,} bytes")
print(f"\nPickle:")
print(f" シリアライズ: {pickle_serialize_time:.4f}秒")
print(f" デシリアライズ: {pickle_deserialize_time:.4f}秒")
print(f" データサイズ: {len(pickle_data):,} bytes")
# 比較
json_size = len(json_str.encode('utf-8'))
msgpack_size = len(msgpack_packed)
pickle_size = len(pickle_data)
print(f"\n比較:")
print(f" MessagePack vs JSON:")
print(f" シリアライズ速度: {json_serialize_time / msgpack_serialize_time:.1f}倍")
print(f" デシリアライズ速度: {json_deserialize_time / msgpack_deserialize_time:.1f}倍")
print(f" サイズ削減: {(json_size - msgpack_size) / json_size * 100:.1f}%")
print(f" MessagePack vs Pickle:")
print(f" シリアライズ速度: {pickle_serialize_time / msgpack_serialize_time:.1f}倍")
print(f" デシリアライズ速度: {pickle_deserialize_time / msgpack_deserialize_time:.1f}倍")
print(f" サイズ比較: {(pickle_size - msgpack_size) / msgpack_size * 100:+.1f}%")
# ベンチマーク実行
benchmark_serialization()
2. 高速実装の比較
import time
from typing import Any, Dict
def compare_msgpack_implementations():
"""MessagePackの異なる実装を比較"""
# テストデータ
test_data = {
"numbers": list(range(1000)),
"strings": [f"文字列{i}" for i in range(100)],
"nested": {
"level1": {
"level2": {
"data": [{"id": i, "value": f"値{i}"} for i in range(50)]
}
}
}
}
print("MessagePack実装の比較")
# 標準msgpack
try:
import msgpack
start = time.time()
for _ in range(100):
packed = msgpack.packb(test_data)
unpacked = msgpack.unpackb(packed, raw=False)
msgpack_time = time.time() - start
msgpack_size = len(packed)
print(f"msgpack: {msgpack_time:.4f}秒, サイズ: {msgpack_size:,} bytes")
except ImportError:
print("msgpack: インストールされていません")
# msgspec
try:
import msgspec
start = time.time()
for _ in range(100):
packed = msgspec.msgpack.encode(test_data)
unpacked = msgspec.msgpack.decode(packed)
msgspec_time = time.time() - start
msgspec_size = len(packed)
print(f"msgspec: {msgspec_time:.4f}秒, サイズ: {msgspec_size:,} bytes")
if 'msgpack_time' in locals():
print(f" msgspecはmsgpackより{msgpack_time / msgspec_time:.1f}倍高速")
except ImportError:
print("msgspec: インストールされていません")
# ormsgpack
try:
import ormsgpack
start = time.time()
for _ in range(100):
packed = ormsgpack.packb(test_data)
unpacked = ormsgpack.unpackb(packed)
ormsgpack_time = time.time() - start
ormsgpack_size = len(packed)
print(f"ormsgpack: {ormsgpack_time:.4f}秒, サイズ: {ormsgpack_size:,} bytes")
if 'msgpack_time' in locals():
print(f" ormsgpackはmsgpackより{msgpack_time / ormsgpack_time:.1f}倍高速")
except ImportError:
print("ormsgpack: インストールされていません")
# 実装比較を実行
compare_msgpack_implementations()
実用的な例
1. WebSocketでのリアルタイム通信
import msgpack
import asyncio
import json
from typing import Dict, Any
class MessagePackWebSocket:
"""MessagePackを使用したWebSocket通信"""
def __init__(self):
self.clients = set()
self.message_count = 0
async def handle_client(self, websocket, path):
"""クライアント接続処理"""
self.clients.add(websocket)
try:
async for message in websocket:
# MessagePackでデシリアライズ
try:
data = msgpack.unpackb(message, raw=False)
await self.process_message(data, websocket)
except Exception as e:
print(f"メッセージ処理エラー: {e}")
finally:
self.clients.remove(websocket)
async def process_message(self, data: Dict[str, Any], sender):
"""メッセージ処理"""
message_type = data.get('type')
if message_type == 'chat':
# チャットメッセージをブロードキャスト
response = {
'type': 'chat',
'user': data.get('user'),
'message': data.get('message'),
'timestamp': data.get('timestamp'),
'id': self.message_count
}
self.message_count += 1
await self.broadcast(response)
elif message_type == 'user_joined':
# ユーザー参加通知
response = {
'type': 'user_joined',
'user': data.get('user'),
'timestamp': data.get('timestamp')
}
await self.broadcast(response)
elif message_type == 'ping':
# Ping/Pongテスト
response = {
'type': 'pong',
'timestamp': data.get('timestamp')
}
await self.send_to_client(sender, response)
async def broadcast(self, data: Dict[str, Any]):
"""全クライアントにメッセージを送信"""
if self.clients:
# MessagePackでシリアライズ
packed_data = msgpack.packb(data)
# 全クライアントに送信
disconnected_clients = set()
for client in self.clients:
try:
await client.send(packed_data)
except Exception:
disconnected_clients.add(client)
# 切断されたクライアントを削除
self.clients -= disconnected_clients
async def send_to_client(self, client, data: Dict[str, Any]):
"""特定のクライアントにメッセージを送信"""
try:
packed_data = msgpack.packb(data)
await client.send(packed_data)
except Exception as e:
print(f"クライアント送信エラー: {e}")
# 使用例(疑似コード)
async def websocket_client_example():
"""WebSocketクライアントの例"""
import websockets
async def client():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
# 参加メッセージ
join_message = {
'type': 'user_joined',
'user': '田中太郎',
'timestamp': '2024-01-01T12:00:00Z'
}
await websocket.send(msgpack.packb(join_message))
# チャットメッセージ
chat_message = {
'type': 'chat',
'user': '田中太郎',
'message': 'こんにちは、皆さん!',
'timestamp': '2024-01-01T12:00:01Z'
}
await websocket.send(msgpack.packb(chat_message))
# メッセージ受信
while True:
response = await websocket.recv()
data = msgpack.unpackb(response, raw=False)
print(f"受信: {data}")
# 注意: 実際の使用では適切なWebSocketサーバーが必要
print("WebSocketクライアントの例(実際の接続は行いません)")
2. 分散キューシステム
import msgpack
import queue
import threading
import time
from typing import Any, Dict, Optional
from dataclasses import dataclass
@dataclass
class Task:
"""タスクデータクラス"""
id: str
type: str
data: Dict[str, Any]
priority: int = 0
created_at: float = None
retry_count: int = 0
max_retries: int = 3
def __post_init__(self):
if self.created_at is None:
self.created_at = time.time()
class MessagePackTaskQueue:
"""MessagePackベースのタスクキュー"""
def __init__(self, max_size: int = 1000):
self.queue = queue.PriorityQueue(maxsize=max_size)
self.results = {}
self.failed_tasks = {}
self.lock = threading.Lock()
self.task_count = 0
def enqueue(self, task: Task) -> bool:
"""タスクをキューに追加"""
try:
# タスクをMessagePackでシリアライズ
serialized_task = msgpack.packb({
'id': task.id,
'type': task.type,
'data': task.data,
'priority': task.priority,
'created_at': task.created_at,
'retry_count': task.retry_count,
'max_retries': task.max_retries
})
# 優先度でソート(数値が小さいほど高優先度)
self.queue.put((-task.priority, time.time(), serialized_task))
with self.lock:
self.task_count += 1
return True
except queue.Full:
print("キューが満杯です")
return False
except Exception as e:
print(f"タスク追加エラー: {e}")
return False
def dequeue(self, timeout: Optional[float] = None) -> Optional[Task]:
"""タスクをキューから取得"""
try:
_, _, serialized_task = self.queue.get(timeout=timeout)
# MessagePackでデシリアライズ
task_data = msgpack.unpackb(serialized_task, raw=False)
task = Task(
id=task_data['id'],
type=task_data['type'],
data=task_data['data'],
priority=task_data['priority'],
created_at=task_data['created_at'],
retry_count=task_data['retry_count'],
max_retries=task_data['max_retries']
)
return task
except queue.Empty:
return None
except Exception as e:
print(f"タスク取得エラー: {e}")
return None
def mark_completed(self, task_id: str, result: Any):
"""タスクを完了としてマーク"""
with self.lock:
self.results[task_id] = {
'result': result,
'completed_at': time.time()
}
def mark_failed(self, task: Task, error: str):
"""タスクを失敗としてマーク"""
with self.lock:
task.retry_count += 1
if task.retry_count <= task.max_retries:
# リトライ
self.enqueue(task)
else:
# 最大リトライ回数に達した
self.failed_tasks[task.id] = {
'task': task,
'error': error,
'failed_at': time.time()
}
def get_result(self, task_id: str) -> Optional[Any]:
"""タスクの結果を取得"""
with self.lock:
return self.results.get(task_id)
def stats(self) -> Dict[str, Any]:
"""キューの統計情報"""
with self.lock:
return {
'queue_size': self.queue.qsize(),
'total_tasks': self.task_count,
'completed_tasks': len(self.results),
'failed_tasks': len(self.failed_tasks),
'success_rate': len(self.results) / self.task_count * 100 if self.task_count > 0 else 0
}
# ワーカーの実装
class TaskWorker:
"""タスクワーカー"""
def __init__(self, task_queue: MessagePackTaskQueue, worker_id: str):
self.task_queue = task_queue
self.worker_id = worker_id
self.running = False
self.thread = None
def start(self):
"""ワーカーを開始"""
self.running = True
self.thread = threading.Thread(target=self._worker_loop)
self.thread.start()
def stop(self):
"""ワーカーを停止"""
self.running = False
if self.thread:
self.thread.join()
def _worker_loop(self):
"""ワーカーのメインループ"""
while self.running:
task = self.task_queue.dequeue(timeout=1.0)
if task:
try:
result = self._process_task(task)
self.task_queue.mark_completed(task.id, result)
print(f"Worker {self.worker_id}: タスク {task.id} 完了")
except Exception as e:
self.task_queue.mark_failed(task, str(e))
print(f"Worker {self.worker_id}: タスク {task.id} 失敗: {e}")
def _process_task(self, task: Task) -> Any:
"""タスクを処理"""
if task.type == 'calculate':
# 計算タスク
a = task.data.get('a', 0)
b = task.data.get('b', 0)
operation = task.data.get('operation', 'add')
if operation == 'add':
return a + b
elif operation == 'multiply':
return a * b
else:
raise ValueError(f"不明な操作: {operation}")
elif task.type == 'process_data':
# データ処理タスク
data = task.data.get('data', [])
return {'processed_count': len(data), 'sum': sum(data)}
else:
raise ValueError(f"不明なタスクタイプ: {task.type}")
# 使用例
def task_queue_example():
"""タスクキューの使用例"""
# タスクキューの作成
task_queue = MessagePackTaskQueue(max_size=100)
# ワーカーの作成と開始
workers = []
for i in range(3):
worker = TaskWorker(task_queue, f"worker-{i}")
worker.start()
workers.append(worker)
# タスクの追加
tasks = [
Task(id="task-1", type="calculate", data={"a": 10, "b": 20, "operation": "add"}, priority=1),
Task(id="task-2", type="calculate", data={"a": 5, "b": 3, "operation": "multiply"}, priority=2),
Task(id="task-3", type="process_data", data={"data": [1, 2, 3, 4, 5]}, priority=1),
Task(id="task-4", type="calculate", data={"a": 100, "b": 50, "operation": "add"}, priority=3),
]
print("タスクをキューに追加中...")
for task in tasks:
task_queue.enqueue(task)
# 結果の待機
time.sleep(2)
# 結果の確認
print("\nタスク結果:")
for task in tasks:
result = task_queue.get_result(task.id)
if result:
print(f" {task.id}: {result['result']}")
else:
print(f" {task.id}: 未完了")
# 統計情報
stats = task_queue.stats()
print(f"\nキュー統計:")
print(f" キューサイズ: {stats['queue_size']}")
print(f" 総タスク数: {stats['total_tasks']}")
print(f" 完了タスク数: {stats['completed_tasks']}")
print(f" 失敗タスク数: {stats['failed_tasks']}")
print(f" 成功率: {stats['success_rate']:.1f}%")
# ワーカーの停止
for worker in workers:
worker.stop()
# 使用例の実行
task_queue_example()
他のライブラリとの比較
| 特徴 | MessagePack | JSON | Protocol Buffers | Pickle |
|---|---|---|---|---|
| パフォーマンス | 高 | 中 | 高 | 高 |
| サイズ | 小 | 大 | 小 | 中 |
| 可読性 | 低 | 高 | 低 | 低 |
| 多言語対応 | 優秀 | 優秀 | 優秀 | Python専用 |
| バイナリサポート | 完全 | 制限的 | 完全 | 完全 |
| スキーマ | 不要 | 不要 | 必要 | 不要 |
トラブルシューティング
よくある問題と解決策
-
文字列エンコーディングの問題
# 問題: raw=Trueでバイナリ文字列が返される # 解決策: raw=Falseを使用 data = msgpack.unpackb(packed_data, raw=False) -
カスタムオブジェクトのシリアライズ
# 問題: カスタムオブジェクトがシリアライズできない # 解決策: defaultパラメータを使用 def custom_serializer(obj): if hasattr(obj, '__dict__'): return obj.__dict__ return str(obj) packed = msgpack.packb(data, default=custom_serializer) -
大きなデータのメモリ不足
# 問題: 大きなデータでメモリ不足 # 解決策: ストリーミング処理 unpacker = msgpack.Unpacker() with open('large_file.msgpack', 'rb') as f: while True: chunk = f.read(1024) if not chunk: break unpacker.feed(chunk) for obj in unpacker: process_object(obj)
まとめ
MessagePack は、高速でコンパクトなバイナリ シリアライゼーション形式です。特に以下の場合に効果的です:
利点
- 高速処理: JSONよりも高速なシリアライゼーション
- コンパクト: 20-50%小さいデータサイズ
- バイナリサポート: ネイティブなバイナリデータ処理
- 多言語対応: 様々なプログラミング言語で利用可能
推奨用途
- リアルタイム通信で帯域幅を節約したい場合
- キャッシュシステムで効率的なデータ保存
- マイクロサービス間の高速データ交換
- IoT通信でデータサイズを最小化したい場合
MessagePack は、JSONの柔軟性を保ちながら、バイナリ形式の効率性を提供する理想的なソリューションです。特に、パフォーマンスが重要で、かつバイナリデータを扱う必要がある場合に最適です。