Apache Avro for JavaScript

JavaScript向けのApache Avroシリアライゼーションライブラリ。スキーマ進化、パフォーマンス、ブラウザ互換性を提供。

Apache Avro for JavaScript

概要

Apache AvroはJavaScript環境で、純粋なJavaScript実装で高速でコンパクトなシリアライゼーションを提供します。主にavscavro-jsで2つのメジャーな実装があり、どちらもJSONよりも高速でサイズが小さいシリアライゼーションを実現します。スキーマ進化、型推論、リモートプロシージャコールなどの高度な機能をサポートし、モダンなデータ中心アプリケーションに最適です。

主な特徴

  1. 高性能: JSONと比較して約2倍高速でサイズが小さい
  2. スキーマ進化: 後方互換性と前方互換性をサポート
  3. ブラウザ互換性: ブラウザとNode.js両方で動作
  4. ライトウェイト: 依存関係なしでコンパクトなサイズ
  5. 柔軟なシリアライズ: 任意のJavaScriptオブジェクトのシリアライズをサポート
  6. スキーマ検証: コンパイル時のデータバリデーション

インストール

avsc(推奨)

npm install avsc

avro-js

npm install avro-js

基本的な使い方

avscでのシンプルな例

const avro = require('avsc');

// スキーマの定義
const userSchema = avro.Type.forSchema({
  type: 'record',
  name: 'User',
  fields: [
    { name: 'id', type: 'long' },
    { name: 'name', type: 'string' },
    { name: 'email', type: ['null', 'string'], default: null },
    { name: 'age', type: 'int' },
    { name: 'roles', type: { type: 'array', items: 'string' } },
    { name: 'metadata', type: { type: 'map', values: 'string' } },
    { name: 'isActive', type: 'boolean', default: true }
  ]
});

// データの作成
const user = {
  id: 1,
  name: '田中太郎',
  email: '[email protected]',
  age: 30,
  roles: ['admin', 'user'],
  metadata: {
    department: 'engineering',
    level: 'senior'
  },
  isActive: true
};

// シリアライゼーション
const buffer = userSchema.toBuffer(user);
console.log('シリアライズサイズ:', buffer.length, 'バイト');

// デシリアライゼーション
const deserializedUser = userSchema.fromBuffer(buffer);
console.log('デシリアライズ結果:', deserializedUser);

// JSONとのサイズ比較
const jsonString = JSON.stringify(user);
console.log('JSONサイズ:', Buffer.from(jsonString).length, 'バイト');
console.log('サイズ削減率:', 
  Math.round((1 - buffer.length / Buffer.from(jsonString).length) * 100) + '%');

avro-jsでの例

const avro = require('avro-js');

// スキーマのパース
const userType = avro.parse({
  name: 'User',
  type: 'record',
  fields: [
    { name: 'id', type: 'long' },
    { name: 'name', type: 'string' },
    { name: 'email', type: ['null', 'string'], default: null },
    { name: 'kind', type: { 
      name: 'Kind', 
      type: 'enum', 
      symbols: ['ADMIN', 'USER', 'GUEST'] 
    } },
    { name: 'tags', type: { type: 'array', items: 'string' } }
  ]
});

// データのシリアライズ
const user = {
  id: 1,
  name: '田中太郎',
  email: '[email protected]',
  kind: 'ADMIN',
  tags: ['developer', 'javascript']
};

const buffer = userType.toBuffer(user);
console.log('シリアライズサイズ:', buffer.length, 'バイト');

// デシリアライズ
const deserializedUser = userType.fromBuffer(buffer);
console.log('デシリアライズ結果:', deserializedUser);

スキーマ進化

後方互換性の例

const avro = require('avsc');

// 旧スキーマ(バージョン1)
const oldSchema = avro.Type.forSchema({
  type: 'record',
  name: 'User',
  fields: [
    { name: 'id', type: 'long' },
    { name: 'name', type: 'string' }
  ]
});

// 新スキーマ(バージョン2)
const newSchema = avro.Type.forSchema({
  type: 'record',
  name: 'User',
  fields: [
    { name: 'id', type: 'long' },
    { name: 'name', type: 'string' },
    { name: 'email', type: ['null', 'string'], default: null }, // 新フィールド
    { name: 'age', type: 'int', default: 0 }, // 新フィールド
    { name: 'isActive', type: 'boolean', default: true } // 新フィールド
  ]
});

// 旧スキーマでデータをシリアライズ
const oldUser = { id: 1, name: '田中太郎' };
const buffer = oldSchema.toBuffer(oldUser);

// 新スキーマでデシリアライズ(スキーマ進化)
const resolver = avro.Type.forSchema(newSchema, { writerSchema: oldSchema });
const newUser = resolver.fromBuffer(buffer);

console.log('スキーマ進化結果:', newUser);
// 出力: { id: 1, name: '田中太郎', email: null, age: 0, isActive: true }

スキーマレジストリとの連携

const avro = require('avsc');
const axios = require('axios');

class SchemaRegistry {
  constructor(baseUrl) {
    this.baseUrl = baseUrl;
    this.cache = new Map();
  }

  async getSchema(schemaId) {
    if (this.cache.has(schemaId)) {
      return this.cache.get(schemaId);
    }

    const response = await axios.get(`${this.baseUrl}/schemas/ids/${schemaId}`);
    const schema = JSON.parse(response.data.schema);
    const type = avro.Type.forSchema(schema);
    
    this.cache.set(schemaId, type);
    return type;
  }

  async getLatestSchema(subject) {
    const response = await axios.get(`${this.baseUrl}/subjects/${subject}/versions/latest`);
    return {
      id: response.data.id,
      schema: avro.Type.forSchema(JSON.parse(response.data.schema))
    };
  }

  async registerSchema(subject, schema) {
    const response = await axios.post(`${this.baseUrl}/subjects/${subject}/versions`, {
      schema: JSON.stringify(schema)
    });
    return response.data.id;
  }
}

// 使用例
const registry = new SchemaRegistry('http://localhost:8081');

async function serializeWithRegistry(subject, data) {
  const { id, schema } = await registry.getLatestSchema(subject);
  const buffer = schema.toBuffer(data);
  
  // スキーマIDを付加したフォーマット
  const result = Buffer.allocUnsafe(5 + buffer.length);
  result.writeUInt8(0, 0); // マジックバイト
  result.writeUInt32BE(id, 1); // スキーマID
  buffer.copy(result, 5);
  
  return result;
}

async function deserializeWithRegistry(buffer) {
  if (buffer.readUInt8(0) !== 0) {
    throw new Error('Invalid magic byte');
  }
  
  const schemaId = buffer.readUInt32BE(1);
  const schema = await registry.getSchema(schemaId);
  
  return schema.fromBuffer(buffer.slice(5));
}

ストリーミング処理

ストリームベースのシリアライズ

const avro = require('avsc');
const { Transform } = require('stream');

class AvroEncoder extends Transform {
  constructor(schema) {
    super({ objectMode: true });
    this.schema = avro.Type.forSchema(schema);
  }

  _transform(chunk, encoding, callback) {
    try {
      const buffer = this.schema.toBuffer(chunk);
      // メッセージサイズをプリフィックスとして付加
      const sizeBuffer = Buffer.allocUnsafe(4);
      sizeBuffer.writeUInt32BE(buffer.length, 0);
      
      this.push(Buffer.concat([sizeBuffer, buffer]));
      callback();
    } catch (error) {
      callback(error);
    }
  }
}

class AvroDecoder extends Transform {
  constructor(schema) {
    super({ objectMode: true });
    this.schema = avro.Type.forSchema(schema);
    this.buffer = Buffer.alloc(0);
  }

  _transform(chunk, encoding, callback) {
    this.buffer = Buffer.concat([this.buffer, chunk]);
    
    while (this.buffer.length >= 4) {
      const messageSize = this.buffer.readUInt32BE(0);
      
      if (this.buffer.length >= 4 + messageSize) {
        const messageBuffer = this.buffer.slice(4, 4 + messageSize);
        this.buffer = this.buffer.slice(4 + messageSize);
        
        try {
          const object = this.schema.fromBuffer(messageBuffer);
          this.push(object);
        } catch (error) {
          return callback(error);
        }
      } else {
        break;
      }
    }
    
    callback();
  }
}

// 使用例
const schema = {
  type: 'record',
  name: 'LogEntry',
  fields: [
    { name: 'timestamp', type: 'long' },
    { name: 'level', type: 'string' },
    { name: 'message', type: 'string' },
    { name: 'metadata', type: { type: 'map', values: 'string' } }
  ]
};

const encoder = new AvroEncoder(schema);
const decoder = new AvroDecoder(schema);

// ストリームパイプライン
const fs = require('fs');

// エンコードストリーム
const writeStream = fs.createWriteStream('logs.avro');
encoder.pipe(writeStream);

// データの送信
encoder.write({
  timestamp: Date.now(),
  level: 'INFO',
  message: 'アプリケーションが起動しました',
  metadata: { component: 'server', version: '1.0.0' }
});

encoder.end();

ファイルコンテナの処理

const avro = require('avsc');
const fs = require('fs');

// Avroファイルコンテナの作成
function writeAvroFile(filename, schema, records) {
  const type = avro.Type.forSchema(schema);
  const writerSchema = type.schema();
  
  const encoder = avro.createFileEncoder(filename, writerSchema);
  
  records.forEach(record => {
    encoder.write(record);
  });
  
  encoder.end();
}

// Avroファイルコンテナの読み込み
function readAvroFile(filename) {
  return new Promise((resolve, reject) => {
    const records = [];
    
    avro.createFileDecoder(filename)
      .on('data', record => records.push(record))
      .on('end', () => resolve(records))
      .on('error', reject);
  });
}

// 使用例
const schema = {
  type: 'record',
  name: 'Event',
  fields: [
    { name: 'id', type: 'string' },
    { name: 'timestamp', type: 'long' },
    { name: 'type', type: 'string' },
    { name: 'data', type: { type: 'map', values: 'string' } }
  ]
};

const events = [
  {
    id: '1',
    timestamp: Date.now(),
    type: 'user_login',
    data: { userId: '123', ip: '192.168.1.1' }
  },
  {
    id: '2',
    timestamp: Date.now(),
    type: 'page_view',
    data: { path: '/dashboard', referrer: '/login' }
  }
];

// ファイルへの書き込み
writeAvroFile('events.avro', schema, events);

// ファイルからの読み込み
readAvroFile('events.avro').then(records => {
  console.log('読み込みレコード数:', records.length);
  records.forEach(record => {
    console.log('イベント:', record.type, 'タイムスタンプ:', new Date(record.timestamp));
  });
});

ブラウザ統合

ブラウザでの使用

<!DOCTYPE html>
<html>
<head>
    <title>Avro in Browser</title>
    <script src="https://unpkg.com/[email protected]/dist/avsc.js"></script>
</head>
<body>
    <script>
        // スキーマの定義
        const schema = {
            type: 'record',
            name: 'UserEvent',
            fields: [
                { name: 'userId', type: 'string' },
                { name: 'event', type: 'string' },
                { name: 'timestamp', type: 'long' },
                { name: 'properties', type: { type: 'map', values: 'string' } }
            ]
        };

        const type = avsc.Type.forSchema(schema);

        // ユーザーイベントのトラッキング
        function trackEvent(userId, event, properties = {}) {
            const eventData = {
                userId,
                event,
                timestamp: Date.now(),
                properties
            };

            // シリアライズしてサーバーに送信
            const buffer = type.toBuffer(eventData);
            
            fetch('/analytics/track', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/x-avro-binary'
                },
                body: buffer
            }).then(response => {
                if (response.ok) {
                    console.log('イベント送信成功:', event);
                }
            });
        }

        // イベントのトラッキング
        trackEvent('user123', 'page_view', {
            page: '/dashboard',
            referrer: '/login'
        });

        // ボタンクリックイベント
        document.addEventListener('click', (e) => {
            if (e.target.tagName === 'BUTTON') {
                trackEvent('user123', 'button_click', {
                    button_id: e.target.id,
                    button_text: e.target.textContent
                });
            }
        });
    </script>
</body>
</html>

Express.jsでのサーバー側処理

const express = require('express');
const avro = require('avsc');
const app = express();

// スキーマの定義
const eventSchema = avro.Type.forSchema({
  type: 'record',
  name: 'UserEvent',
  fields: [
    { name: 'userId', type: 'string' },
    { name: 'event', type: 'string' },
    { name: 'timestamp', type: 'long' },
    { name: 'properties', type: { type: 'map', values: 'string' } }
  ]
});

// Avroバイナリミドルウェア
app.use('/analytics', (req, res, next) => {
  if (req.headers['content-type'] === 'application/x-avro-binary') {
    const chunks = [];
    req.on('data', chunk => chunks.push(chunk));
    req.on('end', () => {
      try {
        const buffer = Buffer.concat(chunks);
        req.body = eventSchema.fromBuffer(buffer);
        next();
      } catch (error) {
        res.status(400).json({ error: 'Invalid Avro data' });
      }
    });
  } else {
    next();
  }
});

// イベントトラッキングエンドポイント
app.post('/analytics/track', (req, res) => {
  const event = req.body;
  
  console.log('イベント受信:', {
    user: event.userId,
    event: event.event,
    time: new Date(event.timestamp)
  });
  
  // データベースやメッセージキューに保存
  saveEvent(event);
  
  res.json({ success: true });
});

// イベントの保存処理
function saveEvent(event) {
  // データベースやKafkaなどに保存する処理
  console.log('Saving event:', event);
}

app.listen(3000, () => {
  console.log('Server running on port 3000');
});

パフォーマンス最適化

ベンチマークテスト

const avro = require('avsc');
const { performance } = require('perf_hooks');

// ベンチマーク用のデータとスキーマ
const schema = {
  type: 'record',
  name: 'TestRecord',
  fields: [
    { name: 'id', type: 'long' },
    { name: 'name', type: 'string' },
    { name: 'email', type: 'string' },
    { name: 'tags', type: { type: 'array', items: 'string' } },
    { name: 'metadata', type: { type: 'map', values: 'string' } }
  ]
};

const type = avro.Type.forSchema(schema);

// テストデータの作成
function generateTestData(count) {
  const data = [];
  for (let i = 0; i < count; i++) {
    data.push({
      id: i,
      name: `ユーザー${i}`,
      email: `user${i}@example.com`,
      tags: ['tag1', 'tag2', 'tag3'],
      metadata: {
        department: 'engineering',
        level: 'senior',
        location: 'tokyo'
      }
    });
  }
  return data;
}

// ベンチマーク関数
function benchmark(name, fn) {
  const start = performance.now();
  const result = fn();
  const end = performance.now();
  console.log(`${name}: ${(end - start).toFixed(2)}ms`);
  return result;
}

// パフォーマンス比較
const testData = generateTestData(10000);

console.log('10,000レコードのシリアライズパフォーマンス比較:');

// JSONシリアライズ
const jsonResult = benchmark('JSONシリアライズ', () => {
  const buffers = testData.map(data => Buffer.from(JSON.stringify(data)));
  return Buffer.concat(buffers);
});

// Avroシリアライズ
const avroResult = benchmark('Avroシリアライズ', () => {
  const buffers = testData.map(data => type.toBuffer(data));
  return Buffer.concat(buffers);
});

console.log('\nサイズ比較:');
console.log('JSONサイズ:', jsonResult.length, 'バイト');
console.log('Avroサイズ:', avroResult.length, 'バイト');
console.log('サイズ削減率:', 
  Math.round((1 - avroResult.length / jsonResult.length) * 100) + '%');

メモリ最適化

// スキーマキャッシュでパフォーマンス向上
class AvroManager {
  constructor() {
    this.schemas = new Map();
    this.bufferPool = [];
  }

  getSchema(schemaString) {
    if (!this.schemas.has(schemaString)) {
      this.schemas.set(schemaString, avro.Type.forSchema(JSON.parse(schemaString)));
    }
    return this.schemas.get(schemaString);
  }

  // バッファプールでメモリアロケーションを削減
  getBuffer(size) {
    if (this.bufferPool.length > 0) {
      const buffer = this.bufferPool.pop();
      if (buffer.length >= size) {
        return buffer.slice(0, size);
      }
    }
    return Buffer.allocUnsafe(size);
  }

  returnBuffer(buffer) {
    if (this.bufferPool.length < 10) { // プールサイズを制限
      this.bufferPool.push(buffer);
    }
  }

  serialize(schemaString, data) {
    const schema = this.getSchema(schemaString);
    return schema.toBuffer(data);
  }

  deserialize(schemaString, buffer) {
    const schema = this.getSchema(schemaString);
    return schema.fromBuffer(buffer);
  }
}

// グローバルインスタンス
const avroManager = new AvroManager();

// 使用例
const schemaString = JSON.stringify({
  type: 'record',
  name: 'OptimizedRecord',
  fields: [
    { name: 'id', type: 'long' },
    { name: 'data', type: 'string' }
  ]
});

// シリアライズ
const buffer = avroManager.serialize(schemaString, { id: 1, data: 'test' });

// デシリアライズ
const data = avroManager.deserialize(schemaString, buffer);

ベストプラクティス

1. ライブラリの選択

  • avsc: 高機能が必要な場合、スキーマ進化を重視する場合
  • avro-js: シンプルな使用例、ブラウザ互換性を重視する場合

2. スキーマ設計

// 良い例: デフォルト値を持つオプショナルフィールド
const goodSchema = {
  type: 'record',
  name: 'User',
  fields: [
    { name: 'id', type: 'long' },
    { name: 'name', type: 'string' },
    { name: 'email', type: ['null', 'string'], default: null },
    { name: 'isActive', type: 'boolean', default: true }
  ]
};

// 避けるべき: デフォルト値のない新フィールド
const badSchema = {
  type: 'record',
  name: 'User',
  fields: [
    { name: 'id', type: 'long' },
    { name: 'name', type: 'string' },
    { name: 'newField', type: 'string' } // デフォルト値がない
  ]
};

3. エラーハンドリング

function safeSerialize(schema, data) {
  try {
    return schema.toBuffer(data);
  } catch (error) {
    console.error('シリアライズエラー:', error.message);
    throw error;
  }
}

function safeDeserialize(schema, buffer) {
  try {
    return schema.fromBuffer(buffer);
  } catch (error) {
    console.error('デシリアライズエラー:', error.message);
    throw error;
  }
}

まとめ

Apache Avro for JavaScriptは、モダンなWebアプリケーションやNode.jsアプリケーションにおいて、高性能なデータシリアライゼーションを実現する優れたソリューションです。スキーマ進化のサポート、コンパクトなサイズ、ブラウザ互換性により、リアルタイムデータストリーミング、マイクロサービス間通信、イベントドリブンアーキテクチャなど幅広い用途に適用できます。avscとavro-jsのどちらも優れたライブラリですが、プロジェクトの要件に応じて適切な選択をすることが重要です。