MessagePack

JSONのような効率的なバイナリ シリアライゼーション形式のJavaScript実装

serializationbinarymsgpackperformancecompact

MessagePack

MessagePack は、JSONのような効率的なバイナリ シリアライゼーション形式です。JSONよりもコンパクトで高速でありながら、多くのプログラミング言語をサポートしています。

主な特徴

  • 高速処理: JSONよりも高速なシリアライゼーション/デシリアライゼーション
  • コンパクト: JSONと比較して20-50%小さいデータサイズ
  • 型サポート: バイナリデータ、整数、浮動小数点数の正確な表現
  • クロスプラットフォーム: 50以上のプログラミング言語をサポート
  • ストリーミング: 大きなデータセットの効率的な処理
  • 拡張可能: カスタム型の定義が可能

主なユースケース

  • リアルタイム通信: WebSocketやgRPCでの高速データ交換
  • API通信: RESTful APIでの効率的なデータ転送
  • データストレージ: 効率的なバイナリ保存
  • マイクロサービス: サービス間の高速通信
  • ゲーム開発: リアルタイムゲームデータの転送
  • IoT通信: 帯域幅が限定された環境での通信

インストール

npm install @msgpack/msgpack

CDNから直接利用:

<script crossorigin src="https://unpkg.com/@msgpack/msgpack"></script>

基本的な使い方

1. 基本的なエンコード・デコード

import { encode, decode } from "@msgpack/msgpack";

// 複雑なオブジェクトの作成
const object = {
  nil: null,
  integer: 1,
  float: Math.PI,
  string: "こんにちは、世界!",
  binary: Uint8Array.from([1, 2, 3]),
  array: [10, 20, 30],
  map: { foo: "bar" },
  timestampExt: new Date(),
};

// エンコード
const encoded = encode(object);
console.log("エンコード後のサイズ:", encoded.length);

// デコード
const decoded = decode(encoded);
console.log("デコード結果:", decoded);

2. TypeScriptでの使用

import { deepStrictEqual } from "assert";
import { encode, decode } from "@msgpack/msgpack";

interface User {
  id: number;
  name: string;
  email: string;
  preferences: {
    theme: string;
    notifications: boolean;
  };
}

const user: User = {
  id: 1,
  name: "田中太郎",
  email: "[email protected]",
  preferences: {
    theme: "dark",
    notifications: true
  }
};

// 型安全なエンコード・デコード
const encoded: Uint8Array = encode(user);
const decoded: User = decode(encoded) as User;

deepStrictEqual(decoded, user);

3. パフォーマンス向上のためのインスタンス再利用

import { Encoder, Decoder } from "@msgpack/msgpack";

// エンコーダー・デコーダーインスタンスの作成
const encoder = new Encoder();
const decoder = new Decoder();

// 複数のオブジェクトを効率的に処理
const users = [
  { id: 1, name: "田中太郎" },
  { id: 2, name: "佐藤花子" },
  { id: 3, name: "山田次郎" }
];

const encodedUsers = users.map(user => encoder.encode(user));
const decodedUsers = encodedUsers.map(encoded => decoder.decode(encoded));

console.log("再利用によるパフォーマンス向上");

高度な使用例

1. カスタム拡張型の定義

import { encode, decode, ExtensionCodec } from "@msgpack/msgpack";

// カスタムコーデックの作成
const extensionCodec = new ExtensionCodec();

// Set型の処理
const SET_EXT_TYPE = 0;
extensionCodec.register({
  type: SET_EXT_TYPE,
  encode: (object) => {
    if (object instanceof Set) {
      return encode([...object], { extensionCodec });
    }
    return null;
  },
  decode: (data) => {
    const array = decode(data, { extensionCodec });
    return new Set(array);
  }
});

// Map型の処理
const MAP_EXT_TYPE = 1;
extensionCodec.register({
  type: MAP_EXT_TYPE,
  encode: (object) => {
    if (object instanceof Map) {
      return encode([...object], { extensionCodec });
    }
    return null;
  },
  decode: (data) => {
    const array = decode(data, { extensionCodec });
    return new Map(array);
  }
});

// 使用例
const data = {
  users: new Set(["田中", "佐藤", "山田"]),
  scores: new Map([
    ["田中", 100],
    ["佐藤", 95],
    ["山田", 88]
  ])
};

const encoded = encode(data, { extensionCodec });
const decoded = decode(encoded, { extensionCodec });

console.log("Set:", decoded.users);
console.log("Map:", decoded.scores);

2. BigInt型の処理

import { encode, decode, ExtensionCodec } from "@msgpack/msgpack";

const BIGINT_EXT_TYPE = 0;
const extensionCodec = new ExtensionCodec();

extensionCodec.register({
  type: BIGINT_EXT_TYPE,
  encode(input) {
    if (typeof input === "bigint") {
      // 安全整数の範囲内なら数値として、それ以外は文字列として保存
      if (input <= Number.MAX_SAFE_INTEGER && input >= Number.MIN_SAFE_INTEGER) {
        return encode(Number(input));
      } else {
        return encode(String(input));
      }
    }
    return null;
  },
  decode(data) {
    const val = decode(data);
    if (!(typeof val === "string" || typeof val === "number")) {
      throw new Error(`想定外のBigIntソース: ${val} (${typeof val})`);
    }
    return BigInt(val);
  }
});

// 使用例
const largeNumber = BigInt(Number.MAX_SAFE_INTEGER) + BigInt(1);
const encoded = encode(largeNumber, { extensionCodec });
const decoded = decode(encoded, { extensionCodec });

console.log("元の値:", largeNumber);
console.log("復元された値:", decoded);
console.log("同じ値か:", largeNumber === decoded);

3. 日付と時刻の処理

import { encode, decode, ExtensionCodec, EXT_TIMESTAMP } from "@msgpack/msgpack";

// デフォルトのタイムスタンプ拡張をオーバーライド
const extensionCodec = new ExtensionCodec();
extensionCodec.register({
  type: EXT_TIMESTAMP,
  encode(input) {
    if (input instanceof Date) {
      // ミリ秒精度でエンコード
      const time = input.getTime();
      const seconds = Math.floor(time / 1000);
      const nanoseconds = (time % 1000) * 1000000;
      return encode({ seconds, nanoseconds });
    }
    return null;
  },
  decode(data) {
    const { seconds, nanoseconds } = decode(data);
    return new Date(seconds * 1000 + nanoseconds / 1000000);
  }
});

// 使用例
const now = new Date();
const encoded = encode(now, { extensionCodec });
const decoded = decode(encoded, { extensionCodec });

console.log("元の時刻:", now);
console.log("復元された時刻:", decoded);
console.log("同じ時刻か:", now.getTime() === decoded.getTime());

ストリーミング処理

1. 非同期ストリーミングデコード

import { decodeAsync } from "@msgpack/msgpack";

// Fetch APIからのストリーミングデコード
async function fetchAndDecodeMessagePack(url) {
  try {
    const response = await fetch(url);
    const contentType = response.headers.get("Content-Type");
    
    if (contentType && contentType.startsWith("application/x-msgpack") && response.body) {
      const object = await decodeAsync(response.body);
      return object;
    } else {
      throw new Error("MessagePack形式ではありません");
    }
  } catch (error) {
    console.error("ストリーミングデコードエラー:", error);
    throw error;
  }
}

// 使用例
fetchAndDecodeMessagePack("/api/data.msgpack")
  .then(data => console.log("取得したデータ:", data))
  .catch(error => console.error("エラー:", error));

2. 配列ストリームの処理

import { decodeArrayStream } from "@msgpack/msgpack";

// ストリームから配列の各要素を処理
async function processArrayStream(stream) {
  try {
    let count = 0;
    
    for await (const item of decodeArrayStream(stream)) {
      console.log(`アイテム ${count++}:`, item);
      
      // 各アイテムを処理
      if (item.type === 'user') {
        await processUser(item);
      } else if (item.type === 'order') {
        await processOrder(item);
      }
    }
    
    console.log(`合計 ${count} アイテムを処理しました`);
  } catch (error) {
    console.error("配列ストリーム処理エラー:", error);
  }
}

async function processUser(user) {
  // ユーザー処理ロジック
  console.log("ユーザー処理:", user.name);
}

async function processOrder(order) {
  // 注文処理ロジック
  console.log("注文処理:", order.id);
}

3. 複数オブジェクトストリームの処理

import { decodeMultiStream } from "@msgpack/msgpack";

// 複数の独立したオブジェクトを含むストリームを処理
async function processMultiStream(stream) {
  const results = [];
  
  try {
    for await (const item of decodeMultiStream(stream)) {
      results.push(item);
      
      // リアルタイム処理
      if (item.timestamp) {
        const delay = Date.now() - item.timestamp;
        console.log(`遅延: ${delay}ms`, item);
      }
    }
    
    return results;
  } catch (error) {
    console.error("マルチストリーム処理エラー:", error);
    return results;
  }
}

実用的な例

1. リアルタイム通信

// WebSocketでのMessagePack使用例
class MessagePackWebSocket {
  constructor(url) {
    this.ws = new WebSocket(url);
    this.ws.binaryType = 'arraybuffer';
    
    this.ws.onmessage = (event) => {
      try {
        const data = new Uint8Array(event.data);
        const message = decode(data);
        this.handleMessage(message);
      } catch (error) {
        console.error("メッセージデコードエラー:", error);
      }
    };
  }
  
  send(message) {
    if (this.ws.readyState === WebSocket.OPEN) {
      const encoded = encode(message);
      this.ws.send(encoded);
    }
  }
  
  handleMessage(message) {
    switch (message.type) {
      case 'chat':
        this.displayChatMessage(message.data);
        break;
      case 'user_joined':
        this.displayUserJoined(message.data);
        break;
      case 'notification':
        this.displayNotification(message.data);
        break;
    }
  }
  
  displayChatMessage(data) {
    console.log(`${data.user}: ${data.message}`);
  }
  
  displayUserJoined(data) {
    console.log(`${data.user} が参加しました`);
  }
  
  displayNotification(data) {
    console.log(`通知: ${data.message}`);
  }
}

// 使用例
const socket = new MessagePackWebSocket('ws://localhost:8080');

// メッセージ送信
socket.send({
  type: 'chat',
  data: {
    user: '田中太郎',
    message: 'こんにちは!',
    timestamp: Date.now()
  }
});

2. データキャッシュシステム

import { encode, decode } from "@msgpack/msgpack";

class MessagePackCache {
  constructor() {
    this.cache = new Map();
    this.compressionRatio = 0;
  }
  
  set(key, value, ttl = 3600000) { // デフォルト1時間
    const encoded = encode(value);
    const originalSize = JSON.stringify(value).length;
    
    this.cache.set(key, {
      data: encoded,
      timestamp: Date.now(),
      ttl: ttl
    });
    
    // 圧縮率の計算
    this.compressionRatio = (originalSize - encoded.length) / originalSize;
    
    console.log(`キャッシュ保存: ${key}`);
    console.log(`圧縮率: ${(this.compressionRatio * 100).toFixed(2)}%`);
  }
  
  get(key) {
    const item = this.cache.get(key);
    
    if (!item) {
      return null;
    }
    
    // TTLチェック
    if (Date.now() - item.timestamp > item.ttl) {
      this.cache.delete(key);
      return null;
    }
    
    try {
      return decode(item.data);
    } catch (error) {
      console.error("キャッシュデコードエラー:", error);
      this.cache.delete(key);
      return null;
    }
  }
  
  clear() {
    this.cache.clear();
    console.log("キャッシュをクリアしました");
  }
  
  getStats() {
    return {
      size: this.cache.size,
      compressionRatio: this.compressionRatio
    };
  }
}

// 使用例
const cache = new MessagePackCache();

// データの保存
cache.set('user:1', {
  id: 1,
  name: '田中太郎',
  email: '[email protected]',
  preferences: {
    theme: 'dark',
    notifications: true
  },
  history: Array.from({ length: 100 }, (_, i) => ({
    id: i,
    action: `action_${i}`,
    timestamp: Date.now() - i * 1000
  }))
});

// データの取得
const user = cache.get('user:1');
console.log("取得したユーザー:", user?.name);

// 統計情報
console.log("キャッシュ統計:", cache.getStats());

3. バッチ処理システム

import { encode, decode } from "@msgpack/msgpack";

class MessagePackBatcher {
  constructor(batchSize = 1000) {
    this.batchSize = batchSize;
    this.buffer = [];
  }
  
  add(item) {
    this.buffer.push(item);
    
    if (this.buffer.length >= this.batchSize) {
      return this.flush();
    }
    
    return null;
  }
  
  flush() {
    if (this.buffer.length === 0) {
      return null;
    }
    
    const batch = {
      items: this.buffer,
      timestamp: Date.now(),
      count: this.buffer.length
    };
    
    this.buffer = [];
    
    return encode(batch);
  }
  
  processBatch(encodedBatch) {
    try {
      const batch = decode(encodedBatch);
      
      console.log(`バッチ処理開始: ${batch.count} アイテム`);
      
      for (const item of batch.items) {
        this.processItem(item);
      }
      
      console.log(`バッチ処理完了: ${batch.count} アイテム`);
      
      return batch.count;
    } catch (error) {
      console.error("バッチ処理エラー:", error);
      return 0;
    }
  }
  
  processItem(item) {
    // 個別アイテムの処理ロジック
    console.log(`アイテム処理: ${item.id}`);
  }
}

// 使用例
const batcher = new MessagePackBatcher(5);

// データの追加
for (let i = 0; i < 12; i++) {
  const batch = batcher.add({
    id: i,
    name: `アイテム${i}`,
    value: Math.random(),
    timestamp: Date.now()
  });
  
  if (batch) {
    console.log("バッチが作成されました");
    batcher.processBatch(batch);
  }
}

// 残りのデータを処理
const finalBatch = batcher.flush();
if (finalBatch) {
  batcher.processBatch(finalBatch);
}

パフォーマンスベンチマーク

2024年のベンチマーク結果

// パフォーマンステスト
function performanceTest() {
  const testData = {
    users: Array.from({ length: 1000 }, (_, i) => ({
      id: i,
      name: `ユーザー${i}`,
      email: `user${i}@example.com`,
      scores: Array.from({ length: 10 }, () => Math.random() * 100)
    }))
  };
  
  console.log("パフォーマンステスト開始");
  
  // JSON テスト
  let start = performance.now();
  const jsonString = JSON.stringify(testData);
  const jsonParsed = JSON.parse(jsonString);
  const jsonTime = performance.now() - start;
  
  // MessagePack テスト
  start = performance.now();
  const msgpackEncoded = encode(testData);
  const msgpackDecoded = decode(msgpackEncoded);
  const msgpackTime = performance.now() - start;
  
  console.log("結果:");
  console.log(`JSON: ${jsonTime.toFixed(2)}ms, サイズ: ${jsonString.length} bytes`);
  console.log(`MessagePack: ${msgpackTime.toFixed(2)}ms, サイズ: ${msgpackEncoded.length} bytes`);
  console.log(`MessagePack高速化: ${(jsonTime / msgpackTime).toFixed(2)}倍`);
  console.log(`MessagePack圧縮率: ${((jsonString.length - msgpackEncoded.length) / jsonString.length * 100).toFixed(2)}%`);
}

performanceTest();

他のライブラリとの比較

特徴MessagePackJSONProtocol BuffersBSON
パフォーマンス
サイズ
可読性
型サポート豊富制限的豊富豊富
スキーマ不要不要必要不要
ストリーミング対応制限的対応制限的

トラブルシューティング

よくある問題と解決策

  1. 循環参照エラー

    // 問題のあるコード
    const obj = { name: "test" };
    obj.self = obj; // 循環参照
    
    // 解決策
    const extensionCodec = new ExtensionCodec();
    const refs = new WeakMap();
    
    extensionCodec.register({
      type: 99,
      encode(object) {
        if (refs.has(object)) {
          return encode({ $ref: refs.get(object) });
        }
        // 通常の処理
        return null;
      }
    });
    
  2. 大きなオブジェクトの処理

    // チャンクサイズを制限
    function encodeLargeObject(obj, chunkSize = 1000) {
      const keys = Object.keys(obj);
      const chunks = [];
      
      for (let i = 0; i < keys.length; i += chunkSize) {
        const chunk = {};
        keys.slice(i, i + chunkSize).forEach(key => {
          chunk[key] = obj[key];
        });
        chunks.push(encode(chunk));
      }
      
      return chunks;
    }
    

まとめ

MessagePack は、効率的なバイナリ シリアライゼーションを提供する優れたライブラリです。特に以下の場合に効果的です:

利点

  • 高速処理: JSONよりも高速なシリアライゼーション
  • コンパクト: 20-50%小さいデータサイズ
  • 型サポート: バイナリデータや正確な数値型をサポート
  • ストリーミング: 大きなデータセットの効率的な処理

推奨用途

  • リアルタイム通信で帯域幅を節約したい場合
  • API通信でパフォーマンスを重視する場合
  • マイクロサービス間の効率的なデータ交換
  • モバイルアプリでデータ使用量を削減したい場合

MessagePack は、JSONの柔軟性を保ちながら、バイナリ形式の効率性を提供する理想的なソリューションです。