MessagePack

JSONのような効率的なバイナリ シリアライゼーション形式のPython実装

serializationbinarymsgpackperformancecompact

MessagePack

MessagePack は、JSONのような効率的なバイナリ シリアライゼーション形式のPython実装です。JSONよりもコンパクトで高速でありながら、多くのプログラミング言語をサポートしています。

主な特徴

  • 高速処理: JSONよりも高速なシリアライゼーション/デシリアライゼーション
  • コンパクト: JSONと比較して20-50%小さいデータサイズ
  • バイナリ対応: バイナリデータをネイティブにサポート
  • 型保持: 元のデータ型を正確に保持
  • クロスプラットフォーム: 50以上のプログラミング言語をサポート
  • C拡張: CPythonでのパフォーマンス最適化

主なユースケース

  • リアルタイム通信: WebSocketでの高速データ交換
  • キャッシュシステム: Redisなどでの効率的なデータ保存
  • マイクロサービス: サービス間の高速通信
  • ログ記録: 効率的なバイナリログ
  • データストレージ: 高速なデータ永続化
  • IoT通信: 帯域幅制限のある環境

インストール

# 標準インストール
pip install msgpack

# 高速版(msgspec)のインストール
pip install msgspec

# 別の高速実装(ormsgpack)
pip install ormsgpack

基本的な使い方

1. 基本的なシリアライゼーション

import msgpack

# 基本的なデータ型
data = {
    "name": "田中太郎",
    "age": 30,
    "city": "東京",
    "is_active": True,
    "scores": [85, 92, 78, 90],
    "metadata": {
        "created_at": "2024-01-01",
        "version": 1.0
    },
    "tags": None,
    "binary_data": b"binary content"
}

# シリアライズ
packed = msgpack.packb(data)
print(f"シリアライズ後のサイズ: {len(packed)} bytes")

# デシリアライズ
unpacked = msgpack.unpackb(packed, raw=False)
print("デシリアライズ結果:", unpacked)

# JSONとの比較
import json
json_str = json.dumps(data, default=str)
print(f"JSON文字列サイズ: {len(json_str.encode('utf-8'))} bytes")
print(f"MessagePack削減率: {(len(json_str.encode('utf-8')) - len(packed)) / len(json_str.encode('utf-8')) * 100:.1f}%")

2. ファイルへの読み書き

import msgpack

# 複雑なデータ構造
user_data = {
    "users": [
        {
            "id": 1,
            "name": "田中太郎",
            "email": "[email protected]",
            "profile": {
                "age": 30,
                "city": "東京",
                "hobbies": ["読書", "映画鑑賞", "プログラミング"],
                "avatar": b"binary_image_data_here"
            },
            "settings": {
                "notifications": True,
                "theme": "dark",
                "language": "ja"
            }
        },
        {
            "id": 2,
            "name": "佐藤花子",
            "email": "[email protected]",
            "profile": {
                "age": 25,
                "city": "大阪",
                "hobbies": ["旅行", "料理", "写真"],
                "avatar": b"another_binary_image_data"
            },
            "settings": {
                "notifications": False,
                "theme": "light",
                "language": "ja"
            }
        }
    ],
    "metadata": {
        "total_users": 2,
        "last_updated": "2024-01-01T12:00:00Z",
        "version": 2
    }
}

# ファイルに書き込み
with open('users.msgpack', 'wb') as f:
    msgpack.dump(user_data, f)

print("データをusers.msgpackに保存しました")

# ファイルから読み込み
with open('users.msgpack', 'rb') as f:
    loaded_data = msgpack.load(f, raw=False)

print(f"読み込まれたユーザー数: {loaded_data['metadata']['total_users']}")
for user in loaded_data['users']:
    print(f"- {user['name']} ({user['profile']['city']})")
    print(f"  趣味: {', '.join(user['profile']['hobbies'])}")

3. バイナリデータの処理

import msgpack
import numpy as np
from datetime import datetime

# バイナリデータを含む複雑なデータ
complex_data = {
    "text": "Hello, 世界!",
    "binary": b"\x00\x01\x02\x03\x04",
    "numbers": [1, 2, 3, 4, 5],
    "nested": {
        "array": np.array([1.1, 2.2, 3.3]).tobytes(),
        "timestamp": datetime.now().isoformat()
    }
}

# MessagePackは自動的にバイナリデータを処理
packed = msgpack.packb(complex_data)
unpacked = msgpack.unpackb(packed, raw=False)

print("元のデータ:")
print(f"  text: {complex_data['text']}")
print(f"  binary: {complex_data['binary']}")
print(f"  numbers: {complex_data['numbers']}")

print("\nデシリアライズされたデータ:")
print(f"  text: {unpacked['text']}")
print(f"  binary: {unpacked['binary']}")
print(f"  numbers: {unpacked['numbers']}")

# NumPy配列の復元
restored_array = np.frombuffer(unpacked['nested']['array'], dtype=np.float64)
print(f"  restored array: {restored_array}")

高度な使用例

1. カスタムエンコーダー・デコーダー

import msgpack
from datetime import datetime, date
from decimal import Decimal
import uuid

class CustomEncoder:
    """カスタムエンコーダー"""
    
    def encode(self, obj):
        if isinstance(obj, datetime):
            return {'__datetime__': obj.isoformat()}
        elif isinstance(obj, date):
            return {'__date__': obj.isoformat()}
        elif isinstance(obj, Decimal):
            return {'__decimal__': str(obj)}
        elif isinstance(obj, uuid.UUID):
            return {'__uuid__': str(obj)}
        elif isinstance(obj, set):
            return {'__set__': list(obj)}
        elif isinstance(obj, complex):
            return {'__complex__': [obj.real, obj.imag]}
        return obj

class CustomDecoder:
    """カスタムデコーダー"""
    
    def decode(self, obj):
        if isinstance(obj, dict):
            if '__datetime__' in obj:
                return datetime.fromisoformat(obj['__datetime__'])
            elif '__date__' in obj:
                return datetime.fromisoformat(obj['__date__']).date()
            elif '__decimal__' in obj:
                return Decimal(obj['__decimal__'])
            elif '__uuid__' in obj:
                return uuid.UUID(obj['__uuid__'])
            elif '__set__' in obj:
                return set(obj['__set__'])
            elif '__complex__' in obj:
                return complex(obj['__complex__'][0], obj['__complex__'][1])
        return obj

# 使用例
encoder = CustomEncoder()
decoder = CustomDecoder()

complex_data = {
    "id": uuid.uuid4(),
    "created_at": datetime.now(),
    "updated_at": date.today(),
    "price": Decimal('1234.56'),
    "tags": {"python", "msgpack", "serialization"},
    "coordinates": complex(3, 4)
}

print("元のデータ:")
for key, value in complex_data.items():
    print(f"  {key}: {value} ({type(value).__name__})")

# カスタムエンコーダーでシリアライズ
packed = msgpack.packb(complex_data, default=encoder.encode)
print(f"\nシリアライズ後のサイズ: {len(packed)} bytes")

# カスタムデコーダーでデシリアライズ
unpacked = msgpack.unpackb(packed, object_hook=decoder.decode, raw=False)
print(f"\nデシリアライズされたデータ:")
for key, value in unpacked.items():
    print(f"  {key}: {value} ({type(value).__name__})")

2. ストリーミング処理

import msgpack
import io
from typing import Generator, Any

class MessagePackStreamer:
    """MessagePackストリーミング処理"""
    
    def __init__(self):
        self.unpacker = msgpack.Unpacker(raw=False)
    
    def pack_stream(self, objects: list) -> bytes:
        """複数のオブジェクトを連続してパック"""
        stream = io.BytesIO()
        
        for obj in objects:
            packed = msgpack.packb(obj)
            stream.write(packed)
        
        return stream.getvalue()
    
    def unpack_stream(self, data: bytes) -> Generator[Any, None, None]:
        """ストリームから複数のオブジェクトをアンパック"""
        self.unpacker.feed(data)
        
        for unpacked in self.unpacker:
            yield unpacked
    
    def process_file_stream(self, filename: str, chunk_size: int = 1024) -> Generator[Any, None, None]:
        """ファイルをストリーミング処理"""
        with open(filename, 'rb') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                
                self.unpacker.feed(chunk)
                for unpacked in self.unpacker:
                    yield unpacked

# 使用例
streamer = MessagePackStreamer()

# テストデータ
test_objects = [
    {"id": 1, "name": "田中太郎", "age": 30},
    {"id": 2, "name": "佐藤花子", "age": 25},
    {"id": 3, "name": "山田次郎", "age": 28},
    {"type": "metadata", "count": 3, "timestamp": "2024-01-01"}
]

# ストリーミングでパック
stream_data = streamer.pack_stream(test_objects)
print(f"ストリームデータサイズ: {len(stream_data)} bytes")

# ストリーミングでアンパック
print("\nストリーミング処理結果:")
for obj in streamer.unpack_stream(stream_data):
    print(f"  処理されたオブジェクト: {obj}")

# ファイルストリーミング
with open('stream_test.msgpack', 'wb') as f:
    f.write(stream_data)

print(f"\nファイルストリーミング処理:")
for obj in streamer.process_file_stream('stream_test.msgpack'):
    print(f"  ファイルから読み込み: {obj}")

3. 高速キャッシュシステム

import msgpack
import time
from typing import Any, Optional, Dict
import threading

class MessagePackCache:
    """MessagePackベースの高速キャッシュ"""
    
    def __init__(self, max_size: int = 1000):
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.max_size = max_size
        self.lock = threading.RLock()
        self.hit_count = 0
        self.miss_count = 0
    
    def set(self, key: str, value: Any, ttl: int = 3600):
        """キャッシュに値を設定"""
        with self.lock:
            # キャッシュサイズ制限
            if len(self.cache) >= self.max_size:
                # 最も古いエントリを削除
                oldest_key = min(self.cache.keys(), key=lambda k: self.cache[k]['timestamp'])
                del self.cache[oldest_key]
            
            # MessagePackでシリアライズ
            try:
                packed_value = msgpack.packb(value)
                self.cache[key] = {
                    'data': packed_value,
                    'timestamp': time.time(),
                    'ttl': ttl
                }
                return True
            except Exception as e:
                print(f"キャッシュ設定エラー: {e}")
                return False
    
    def get(self, key: str) -> Optional[Any]:
        """キャッシュから値を取得"""
        with self.lock:
            if key not in self.cache:
                self.miss_count += 1
                return None
            
            entry = self.cache[key]
            
            # TTLチェック
            if time.time() - entry['timestamp'] > entry['ttl']:
                del self.cache[key]
                self.miss_count += 1
                return None
            
            # MessagePackでデシリアライズ
            try:
                value = msgpack.unpackb(entry['data'], raw=False)
                self.hit_count += 1
                return value
            except Exception as e:
                print(f"キャッシュ取得エラー: {e}")
                del self.cache[key]
                self.miss_count += 1
                return None
    
    def delete(self, key: str) -> bool:
        """キャッシュから削除"""
        with self.lock:
            if key in self.cache:
                del self.cache[key]
                return True
            return False
    
    def clear(self):
        """キャッシュをクリア"""
        with self.lock:
            self.cache.clear()
            self.hit_count = 0
            self.miss_count = 0
    
    def stats(self) -> Dict[str, Any]:
        """キャッシュ統計を取得"""
        with self.lock:
            total_requests = self.hit_count + self.miss_count
            hit_rate = (self.hit_count / total_requests * 100) if total_requests > 0 else 0
            
            # メモリ使用量を計算
            memory_usage = sum(len(entry['data']) for entry in self.cache.values())
            
            return {
                'size': len(self.cache),
                'hit_count': self.hit_count,
                'miss_count': self.miss_count,
                'hit_rate': hit_rate,
                'memory_usage': memory_usage,
                'avg_entry_size': memory_usage / len(self.cache) if self.cache else 0
            }

# 使用例
cache = MessagePackCache(max_size=100)

# テストデータ
test_data = {
    "user_profile": {
        "id": 12345,
        "name": "田中太郎",
        "email": "[email protected]",
        "preferences": {
            "theme": "dark",
            "notifications": True
        },
        "history": [f"action_{i}" for i in range(100)]
    },
    "session_data": {
        "token": "abc123def456",
        "expires_at": "2024-12-31T23:59:59Z",
        "permissions": ["read", "write", "admin"]
    }
}

# キャッシュに保存
print("キャッシュにデータを保存中...")
for key, value in test_data.items():
    cache.set(key, value, ttl=300)  # 5分間有効

# キャッシュから取得
print("\nキャッシュからデータを取得中...")
for key in test_data.keys():
    cached_value = cache.get(key)
    if cached_value:
        print(f"  {key}: キャッシュヒット")
    else:
        print(f"  {key}: キャッシュミス")

# 存在しないキーの取得
cache.get("non_existent_key")

# 統計情報
stats = cache.stats()
print(f"\nキャッシュ統計:")
print(f"  サイズ: {stats['size']}")
print(f"  ヒット率: {stats['hit_rate']:.1f}%")
print(f"  メモリ使用量: {stats['memory_usage']:,} bytes")
print(f"  平均エントリサイズ: {stats['avg_entry_size']:.1f} bytes")

パフォーマンスベンチマーク

1. 基本的なパフォーマンステスト

import msgpack
import json
import time
import pickle
from typing import Dict, Any

def benchmark_serialization():
    """シリアライゼーションのベンチマーク"""
    
    # テストデータの準備
    test_data = {
        "users": [
            {
                "id": i,
                "name": f"ユーザー{i}",
                "email": f"user{i}@example.com",
                "age": 20 + (i % 60),
                "active": i % 2 == 0,
                "scores": [80 + (i % 20), 75 + (i % 25), 90 + (i % 10)],
                "metadata": {
                    "created_at": f"2024-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}",
                    "last_login": f"2024-01-{(i % 31) + 1:02d}",
                    "preferences": {
                        "theme": "dark" if i % 2 == 0 else "light",
                        "notifications": i % 3 == 0
                    }
                }
            }
            for i in range(1000)
        ],
        "metadata": {
            "total_users": 1000,
            "generated_at": "2024-01-01T00:00:00Z"
        }
    }
    
    print("シリアライゼーション ベンチマーク")
    print(f"テストデータサイズ: {len(test_data['users'])} ユーザー")
    
    # MessagePack
    start_time = time.time()
    msgpack_packed = msgpack.packb(test_data)
    msgpack_serialize_time = time.time() - start_time
    
    start_time = time.time()
    msgpack_unpacked = msgpack.unpackb(msgpack_packed, raw=False)
    msgpack_deserialize_time = time.time() - start_time
    
    # JSON
    start_time = time.time()
    json_str = json.dumps(test_data)
    json_serialize_time = time.time() - start_time
    
    start_time = time.time()
    json_unpacked = json.loads(json_str)
    json_deserialize_time = time.time() - start_time
    
    # Pickle
    start_time = time.time()
    pickle_data = pickle.dumps(test_data)
    pickle_serialize_time = time.time() - start_time
    
    start_time = time.time()
    pickle_unpacked = pickle.loads(pickle_data)
    pickle_deserialize_time = time.time() - start_time
    
    # 結果の表示
    print(f"\n結果:")
    print(f"MessagePack:")
    print(f"  シリアライズ: {msgpack_serialize_time:.4f}秒")
    print(f"  デシリアライズ: {msgpack_deserialize_time:.4f}秒")
    print(f"  データサイズ: {len(msgpack_packed):,} bytes")
    
    print(f"\nJSON:")
    print(f"  シリアライズ: {json_serialize_time:.4f}秒")
    print(f"  デシリアライズ: {json_deserialize_time:.4f}秒")
    print(f"  データサイズ: {len(json_str.encode('utf-8')):,} bytes")
    
    print(f"\nPickle:")
    print(f"  シリアライズ: {pickle_serialize_time:.4f}秒")
    print(f"  デシリアライズ: {pickle_deserialize_time:.4f}秒")
    print(f"  データサイズ: {len(pickle_data):,} bytes")
    
    # 比較
    json_size = len(json_str.encode('utf-8'))
    msgpack_size = len(msgpack_packed)
    pickle_size = len(pickle_data)
    
    print(f"\n比較:")
    print(f"  MessagePack vs JSON:")
    print(f"    シリアライズ速度: {json_serialize_time / msgpack_serialize_time:.1f}倍")
    print(f"    デシリアライズ速度: {json_deserialize_time / msgpack_deserialize_time:.1f}倍")
    print(f"    サイズ削減: {(json_size - msgpack_size) / json_size * 100:.1f}%")
    
    print(f"  MessagePack vs Pickle:")
    print(f"    シリアライズ速度: {pickle_serialize_time / msgpack_serialize_time:.1f}倍")
    print(f"    デシリアライズ速度: {pickle_deserialize_time / msgpack_deserialize_time:.1f}倍")
    print(f"    サイズ比較: {(pickle_size - msgpack_size) / msgpack_size * 100:+.1f}%")

# ベンチマーク実行
benchmark_serialization()

2. 高速実装の比較

import time
from typing import Any, Dict

def compare_msgpack_implementations():
    """MessagePackの異なる実装を比較"""
    
    # テストデータ
    test_data = {
        "numbers": list(range(1000)),
        "strings": [f"文字列{i}" for i in range(100)],
        "nested": {
            "level1": {
                "level2": {
                    "data": [{"id": i, "value": f"値{i}"} for i in range(50)]
                }
            }
        }
    }
    
    print("MessagePack実装の比較")
    
    # 標準msgpack
    try:
        import msgpack
        
        start = time.time()
        for _ in range(100):
            packed = msgpack.packb(test_data)
            unpacked = msgpack.unpackb(packed, raw=False)
        msgpack_time = time.time() - start
        msgpack_size = len(packed)
        
        print(f"msgpack: {msgpack_time:.4f}秒, サイズ: {msgpack_size:,} bytes")
    except ImportError:
        print("msgpack: インストールされていません")
    
    # msgspec
    try:
        import msgspec
        
        start = time.time()
        for _ in range(100):
            packed = msgspec.msgpack.encode(test_data)
            unpacked = msgspec.msgpack.decode(packed)
        msgspec_time = time.time() - start
        msgspec_size = len(packed)
        
        print(f"msgspec: {msgspec_time:.4f}秒, サイズ: {msgspec_size:,} bytes")
        
        if 'msgpack_time' in locals():
            print(f"  msgspecはmsgpackより{msgpack_time / msgspec_time:.1f}倍高速")
    except ImportError:
        print("msgspec: インストールされていません")
    
    # ormsgpack
    try:
        import ormsgpack
        
        start = time.time()
        for _ in range(100):
            packed = ormsgpack.packb(test_data)
            unpacked = ormsgpack.unpackb(packed)
        ormsgpack_time = time.time() - start
        ormsgpack_size = len(packed)
        
        print(f"ormsgpack: {ormsgpack_time:.4f}秒, サイズ: {ormsgpack_size:,} bytes")
        
        if 'msgpack_time' in locals():
            print(f"  ormsgpackはmsgpackより{msgpack_time / ormsgpack_time:.1f}倍高速")
    except ImportError:
        print("ormsgpack: インストールされていません")

# 実装比較を実行
compare_msgpack_implementations()

実用的な例

1. WebSocketでのリアルタイム通信

import msgpack
import asyncio
import json
from typing import Dict, Any

class MessagePackWebSocket:
    """MessagePackを使用したWebSocket通信"""
    
    def __init__(self):
        self.clients = set()
        self.message_count = 0
    
    async def handle_client(self, websocket, path):
        """クライアント接続処理"""
        self.clients.add(websocket)
        try:
            async for message in websocket:
                # MessagePackでデシリアライズ
                try:
                    data = msgpack.unpackb(message, raw=False)
                    await self.process_message(data, websocket)
                except Exception as e:
                    print(f"メッセージ処理エラー: {e}")
        finally:
            self.clients.remove(websocket)
    
    async def process_message(self, data: Dict[str, Any], sender):
        """メッセージ処理"""
        message_type = data.get('type')
        
        if message_type == 'chat':
            # チャットメッセージをブロードキャスト
            response = {
                'type': 'chat',
                'user': data.get('user'),
                'message': data.get('message'),
                'timestamp': data.get('timestamp'),
                'id': self.message_count
            }
            self.message_count += 1
            await self.broadcast(response)
        
        elif message_type == 'user_joined':
            # ユーザー参加通知
            response = {
                'type': 'user_joined',
                'user': data.get('user'),
                'timestamp': data.get('timestamp')
            }
            await self.broadcast(response)
        
        elif message_type == 'ping':
            # Ping/Pongテスト
            response = {
                'type': 'pong',
                'timestamp': data.get('timestamp')
            }
            await self.send_to_client(sender, response)
    
    async def broadcast(self, data: Dict[str, Any]):
        """全クライアントにメッセージを送信"""
        if self.clients:
            # MessagePackでシリアライズ
            packed_data = msgpack.packb(data)
            
            # 全クライアントに送信
            disconnected_clients = set()
            for client in self.clients:
                try:
                    await client.send(packed_data)
                except Exception:
                    disconnected_clients.add(client)
            
            # 切断されたクライアントを削除
            self.clients -= disconnected_clients
    
    async def send_to_client(self, client, data: Dict[str, Any]):
        """特定のクライアントにメッセージを送信"""
        try:
            packed_data = msgpack.packb(data)
            await client.send(packed_data)
        except Exception as e:
            print(f"クライアント送信エラー: {e}")

# 使用例(疑似コード)
async def websocket_client_example():
    """WebSocketクライアントの例"""
    import websockets
    
    async def client():
        uri = "ws://localhost:8765"
        async with websockets.connect(uri) as websocket:
            # 参加メッセージ
            join_message = {
                'type': 'user_joined',
                'user': '田中太郎',
                'timestamp': '2024-01-01T12:00:00Z'
            }
            await websocket.send(msgpack.packb(join_message))
            
            # チャットメッセージ
            chat_message = {
                'type': 'chat',
                'user': '田中太郎',
                'message': 'こんにちは、皆さん!',
                'timestamp': '2024-01-01T12:00:01Z'
            }
            await websocket.send(msgpack.packb(chat_message))
            
            # メッセージ受信
            while True:
                response = await websocket.recv()
                data = msgpack.unpackb(response, raw=False)
                print(f"受信: {data}")
    
    # 注意: 実際の使用では適切なWebSocketサーバーが必要
    print("WebSocketクライアントの例(実際の接続は行いません)")

2. 分散キューシステム

import msgpack
import queue
import threading
import time
from typing import Any, Dict, Optional
from dataclasses import dataclass

@dataclass
class Task:
    """タスクデータクラス"""
    id: str
    type: str
    data: Dict[str, Any]
    priority: int = 0
    created_at: float = None
    retry_count: int = 0
    max_retries: int = 3
    
    def __post_init__(self):
        if self.created_at is None:
            self.created_at = time.time()

class MessagePackTaskQueue:
    """MessagePackベースのタスクキュー"""
    
    def __init__(self, max_size: int = 1000):
        self.queue = queue.PriorityQueue(maxsize=max_size)
        self.results = {}
        self.failed_tasks = {}
        self.lock = threading.Lock()
        self.task_count = 0
        
    def enqueue(self, task: Task) -> bool:
        """タスクをキューに追加"""
        try:
            # タスクをMessagePackでシリアライズ
            serialized_task = msgpack.packb({
                'id': task.id,
                'type': task.type,
                'data': task.data,
                'priority': task.priority,
                'created_at': task.created_at,
                'retry_count': task.retry_count,
                'max_retries': task.max_retries
            })
            
            # 優先度でソート(数値が小さいほど高優先度)
            self.queue.put((-task.priority, time.time(), serialized_task))
            
            with self.lock:
                self.task_count += 1
            
            return True
        except queue.Full:
            print("キューが満杯です")
            return False
        except Exception as e:
            print(f"タスク追加エラー: {e}")
            return False
    
    def dequeue(self, timeout: Optional[float] = None) -> Optional[Task]:
        """タスクをキューから取得"""
        try:
            _, _, serialized_task = self.queue.get(timeout=timeout)
            
            # MessagePackでデシリアライズ
            task_data = msgpack.unpackb(serialized_task, raw=False)
            
            task = Task(
                id=task_data['id'],
                type=task_data['type'],
                data=task_data['data'],
                priority=task_data['priority'],
                created_at=task_data['created_at'],
                retry_count=task_data['retry_count'],
                max_retries=task_data['max_retries']
            )
            
            return task
        except queue.Empty:
            return None
        except Exception as e:
            print(f"タスク取得エラー: {e}")
            return None
    
    def mark_completed(self, task_id: str, result: Any):
        """タスクを完了としてマーク"""
        with self.lock:
            self.results[task_id] = {
                'result': result,
                'completed_at': time.time()
            }
    
    def mark_failed(self, task: Task, error: str):
        """タスクを失敗としてマーク"""
        with self.lock:
            task.retry_count += 1
            
            if task.retry_count <= task.max_retries:
                # リトライ
                self.enqueue(task)
            else:
                # 最大リトライ回数に達した
                self.failed_tasks[task.id] = {
                    'task': task,
                    'error': error,
                    'failed_at': time.time()
                }
    
    def get_result(self, task_id: str) -> Optional[Any]:
        """タスクの結果を取得"""
        with self.lock:
            return self.results.get(task_id)
    
    def stats(self) -> Dict[str, Any]:
        """キューの統計情報"""
        with self.lock:
            return {
                'queue_size': self.queue.qsize(),
                'total_tasks': self.task_count,
                'completed_tasks': len(self.results),
                'failed_tasks': len(self.failed_tasks),
                'success_rate': len(self.results) / self.task_count * 100 if self.task_count > 0 else 0
            }

# ワーカーの実装
class TaskWorker:
    """タスクワーカー"""
    
    def __init__(self, task_queue: MessagePackTaskQueue, worker_id: str):
        self.task_queue = task_queue
        self.worker_id = worker_id
        self.running = False
        self.thread = None
    
    def start(self):
        """ワーカーを開始"""
        self.running = True
        self.thread = threading.Thread(target=self._worker_loop)
        self.thread.start()
    
    def stop(self):
        """ワーカーを停止"""
        self.running = False
        if self.thread:
            self.thread.join()
    
    def _worker_loop(self):
        """ワーカーのメインループ"""
        while self.running:
            task = self.task_queue.dequeue(timeout=1.0)
            if task:
                try:
                    result = self._process_task(task)
                    self.task_queue.mark_completed(task.id, result)
                    print(f"Worker {self.worker_id}: タスク {task.id} 完了")
                except Exception as e:
                    self.task_queue.mark_failed(task, str(e))
                    print(f"Worker {self.worker_id}: タスク {task.id} 失敗: {e}")
    
    def _process_task(self, task: Task) -> Any:
        """タスクを処理"""
        if task.type == 'calculate':
            # 計算タスク
            a = task.data.get('a', 0)
            b = task.data.get('b', 0)
            operation = task.data.get('operation', 'add')
            
            if operation == 'add':
                return a + b
            elif operation == 'multiply':
                return a * b
            else:
                raise ValueError(f"不明な操作: {operation}")
        
        elif task.type == 'process_data':
            # データ処理タスク
            data = task.data.get('data', [])
            return {'processed_count': len(data), 'sum': sum(data)}
        
        else:
            raise ValueError(f"不明なタスクタイプ: {task.type}")

# 使用例
def task_queue_example():
    """タスクキューの使用例"""
    
    # タスクキューの作成
    task_queue = MessagePackTaskQueue(max_size=100)
    
    # ワーカーの作成と開始
    workers = []
    for i in range(3):
        worker = TaskWorker(task_queue, f"worker-{i}")
        worker.start()
        workers.append(worker)
    
    # タスクの追加
    tasks = [
        Task(id="task-1", type="calculate", data={"a": 10, "b": 20, "operation": "add"}, priority=1),
        Task(id="task-2", type="calculate", data={"a": 5, "b": 3, "operation": "multiply"}, priority=2),
        Task(id="task-3", type="process_data", data={"data": [1, 2, 3, 4, 5]}, priority=1),
        Task(id="task-4", type="calculate", data={"a": 100, "b": 50, "operation": "add"}, priority=3),
    ]
    
    print("タスクをキューに追加中...")
    for task in tasks:
        task_queue.enqueue(task)
    
    # 結果の待機
    time.sleep(2)
    
    # 結果の確認
    print("\nタスク結果:")
    for task in tasks:
        result = task_queue.get_result(task.id)
        if result:
            print(f"  {task.id}: {result['result']}")
        else:
            print(f"  {task.id}: 未完了")
    
    # 統計情報
    stats = task_queue.stats()
    print(f"\nキュー統計:")
    print(f"  キューサイズ: {stats['queue_size']}")
    print(f"  総タスク数: {stats['total_tasks']}")
    print(f"  完了タスク数: {stats['completed_tasks']}")
    print(f"  失敗タスク数: {stats['failed_tasks']}")
    print(f"  成功率: {stats['success_rate']:.1f}%")
    
    # ワーカーの停止
    for worker in workers:
        worker.stop()

# 使用例の実行
task_queue_example()

他のライブラリとの比較

特徴MessagePackJSONProtocol BuffersPickle
パフォーマンス
サイズ
可読性
多言語対応優秀優秀優秀Python専用
バイナリサポート完全制限的完全完全
スキーマ不要不要必要不要

トラブルシューティング

よくある問題と解決策

  1. 文字列エンコーディングの問題

    # 問題: raw=Trueでバイナリ文字列が返される
    # 解決策: raw=Falseを使用
    data = msgpack.unpackb(packed_data, raw=False)
    
  2. カスタムオブジェクトのシリアライズ

    # 問題: カスタムオブジェクトがシリアライズできない
    # 解決策: defaultパラメータを使用
    def custom_serializer(obj):
        if hasattr(obj, '__dict__'):
            return obj.__dict__
        return str(obj)
    
    packed = msgpack.packb(data, default=custom_serializer)
    
  3. 大きなデータのメモリ不足

    # 問題: 大きなデータでメモリ不足
    # 解決策: ストリーミング処理
    unpacker = msgpack.Unpacker()
    
    with open('large_file.msgpack', 'rb') as f:
        while True:
            chunk = f.read(1024)
            if not chunk:
                break
            unpacker.feed(chunk)
            for obj in unpacker:
                process_object(obj)
    

まとめ

MessagePack は、高速でコンパクトなバイナリ シリアライゼーション形式です。特に以下の場合に効果的です:

利点

  • 高速処理: JSONよりも高速なシリアライゼーション
  • コンパクト: 20-50%小さいデータサイズ
  • バイナリサポート: ネイティブなバイナリデータ処理
  • 多言語対応: 様々なプログラミング言語で利用可能

推奨用途

  • リアルタイム通信で帯域幅を節約したい場合
  • キャッシュシステムで効率的なデータ保存
  • マイクロサービス間の高速データ交換
  • IoT通信でデータサイズを最小化したい場合

MessagePack は、JSONの柔軟性を保ちながら、バイナリ形式の効率性を提供する理想的なソリューションです。特に、パフォーマンスが重要で、かつバイナリデータを扱う必要がある場合に最適です。