Apache Avro for JavaScript
JavaScript向けのApache Avroシリアライゼーションライブラリ。スキーマ進化、パフォーマンス、ブラウザ互換性を提供。
Apache Avro for JavaScript
概要
Apache AvroはJavaScript環境で、純粋なJavaScript実装で高速でコンパクトなシリアライゼーションを提供します。主にavscとavro-jsで2つのメジャーな実装があり、どちらもJSONよりも高速でサイズが小さいシリアライゼーションを実現します。スキーマ進化、型推論、リモートプロシージャコールなどの高度な機能をサポートし、モダンなデータ中心アプリケーションに最適です。
主な特徴
- 高性能: JSONと比較して約2倍高速でサイズが小さい
- スキーマ進化: 後方互換性と前方互換性をサポート
- ブラウザ互換性: ブラウザとNode.js両方で動作
- ライトウェイト: 依存関係なしでコンパクトなサイズ
- 柔軟なシリアライズ: 任意のJavaScriptオブジェクトのシリアライズをサポート
- スキーマ検証: コンパイル時のデータバリデーション
インストール
avsc(推奨)
npm install avsc
avro-js
npm install avro-js
基本的な使い方
avscでのシンプルな例
const avro = require('avsc');
// スキーマの定義
const userSchema = avro.Type.forSchema({
type: 'record',
name: 'User',
fields: [
{ name: 'id', type: 'long' },
{ name: 'name', type: 'string' },
{ name: 'email', type: ['null', 'string'], default: null },
{ name: 'age', type: 'int' },
{ name: 'roles', type: { type: 'array', items: 'string' } },
{ name: 'metadata', type: { type: 'map', values: 'string' } },
{ name: 'isActive', type: 'boolean', default: true }
]
});
// データの作成
const user = {
id: 1,
name: '田中太郎',
email: '[email protected]',
age: 30,
roles: ['admin', 'user'],
metadata: {
department: 'engineering',
level: 'senior'
},
isActive: true
};
// シリアライゼーション
const buffer = userSchema.toBuffer(user);
console.log('シリアライズサイズ:', buffer.length, 'バイト');
// デシリアライゼーション
const deserializedUser = userSchema.fromBuffer(buffer);
console.log('デシリアライズ結果:', deserializedUser);
// JSONとのサイズ比較
const jsonString = JSON.stringify(user);
console.log('JSONサイズ:', Buffer.from(jsonString).length, 'バイト');
console.log('サイズ削減率:',
Math.round((1 - buffer.length / Buffer.from(jsonString).length) * 100) + '%');
avro-jsでの例
const avro = require('avro-js');
// スキーマのパース
const userType = avro.parse({
name: 'User',
type: 'record',
fields: [
{ name: 'id', type: 'long' },
{ name: 'name', type: 'string' },
{ name: 'email', type: ['null', 'string'], default: null },
{ name: 'kind', type: {
name: 'Kind',
type: 'enum',
symbols: ['ADMIN', 'USER', 'GUEST']
} },
{ name: 'tags', type: { type: 'array', items: 'string' } }
]
});
// データのシリアライズ
const user = {
id: 1,
name: '田中太郎',
email: '[email protected]',
kind: 'ADMIN',
tags: ['developer', 'javascript']
};
const buffer = userType.toBuffer(user);
console.log('シリアライズサイズ:', buffer.length, 'バイト');
// デシリアライズ
const deserializedUser = userType.fromBuffer(buffer);
console.log('デシリアライズ結果:', deserializedUser);
スキーマ進化
後方互換性の例
const avro = require('avsc');
// 旧スキーマ(バージョン1)
const oldSchema = avro.Type.forSchema({
type: 'record',
name: 'User',
fields: [
{ name: 'id', type: 'long' },
{ name: 'name', type: 'string' }
]
});
// 新スキーマ(バージョン2)
const newSchema = avro.Type.forSchema({
type: 'record',
name: 'User',
fields: [
{ name: 'id', type: 'long' },
{ name: 'name', type: 'string' },
{ name: 'email', type: ['null', 'string'], default: null }, // 新フィールド
{ name: 'age', type: 'int', default: 0 }, // 新フィールド
{ name: 'isActive', type: 'boolean', default: true } // 新フィールド
]
});
// 旧スキーマでデータをシリアライズ
const oldUser = { id: 1, name: '田中太郎' };
const buffer = oldSchema.toBuffer(oldUser);
// 新スキーマでデシリアライズ(スキーマ進化)
const resolver = avro.Type.forSchema(newSchema, { writerSchema: oldSchema });
const newUser = resolver.fromBuffer(buffer);
console.log('スキーマ進化結果:', newUser);
// 出力: { id: 1, name: '田中太郎', email: null, age: 0, isActive: true }
スキーマレジストリとの連携
const avro = require('avsc');
const axios = require('axios');
class SchemaRegistry {
constructor(baseUrl) {
this.baseUrl = baseUrl;
this.cache = new Map();
}
async getSchema(schemaId) {
if (this.cache.has(schemaId)) {
return this.cache.get(schemaId);
}
const response = await axios.get(`${this.baseUrl}/schemas/ids/${schemaId}`);
const schema = JSON.parse(response.data.schema);
const type = avro.Type.forSchema(schema);
this.cache.set(schemaId, type);
return type;
}
async getLatestSchema(subject) {
const response = await axios.get(`${this.baseUrl}/subjects/${subject}/versions/latest`);
return {
id: response.data.id,
schema: avro.Type.forSchema(JSON.parse(response.data.schema))
};
}
async registerSchema(subject, schema) {
const response = await axios.post(`${this.baseUrl}/subjects/${subject}/versions`, {
schema: JSON.stringify(schema)
});
return response.data.id;
}
}
// 使用例
const registry = new SchemaRegistry('http://localhost:8081');
async function serializeWithRegistry(subject, data) {
const { id, schema } = await registry.getLatestSchema(subject);
const buffer = schema.toBuffer(data);
// スキーマIDを付加したフォーマット
const result = Buffer.allocUnsafe(5 + buffer.length);
result.writeUInt8(0, 0); // マジックバイト
result.writeUInt32BE(id, 1); // スキーマID
buffer.copy(result, 5);
return result;
}
async function deserializeWithRegistry(buffer) {
if (buffer.readUInt8(0) !== 0) {
throw new Error('Invalid magic byte');
}
const schemaId = buffer.readUInt32BE(1);
const schema = await registry.getSchema(schemaId);
return schema.fromBuffer(buffer.slice(5));
}
ストリーミング処理
ストリームベースのシリアライズ
const avro = require('avsc');
const { Transform } = require('stream');
class AvroEncoder extends Transform {
constructor(schema) {
super({ objectMode: true });
this.schema = avro.Type.forSchema(schema);
}
_transform(chunk, encoding, callback) {
try {
const buffer = this.schema.toBuffer(chunk);
// メッセージサイズをプリフィックスとして付加
const sizeBuffer = Buffer.allocUnsafe(4);
sizeBuffer.writeUInt32BE(buffer.length, 0);
this.push(Buffer.concat([sizeBuffer, buffer]));
callback();
} catch (error) {
callback(error);
}
}
}
class AvroDecoder extends Transform {
constructor(schema) {
super({ objectMode: true });
this.schema = avro.Type.forSchema(schema);
this.buffer = Buffer.alloc(0);
}
_transform(chunk, encoding, callback) {
this.buffer = Buffer.concat([this.buffer, chunk]);
while (this.buffer.length >= 4) {
const messageSize = this.buffer.readUInt32BE(0);
if (this.buffer.length >= 4 + messageSize) {
const messageBuffer = this.buffer.slice(4, 4 + messageSize);
this.buffer = this.buffer.slice(4 + messageSize);
try {
const object = this.schema.fromBuffer(messageBuffer);
this.push(object);
} catch (error) {
return callback(error);
}
} else {
break;
}
}
callback();
}
}
// 使用例
const schema = {
type: 'record',
name: 'LogEntry',
fields: [
{ name: 'timestamp', type: 'long' },
{ name: 'level', type: 'string' },
{ name: 'message', type: 'string' },
{ name: 'metadata', type: { type: 'map', values: 'string' } }
]
};
const encoder = new AvroEncoder(schema);
const decoder = new AvroDecoder(schema);
// ストリームパイプライン
const fs = require('fs');
// エンコードストリーム
const writeStream = fs.createWriteStream('logs.avro');
encoder.pipe(writeStream);
// データの送信
encoder.write({
timestamp: Date.now(),
level: 'INFO',
message: 'アプリケーションが起動しました',
metadata: { component: 'server', version: '1.0.0' }
});
encoder.end();
ファイルコンテナの処理
const avro = require('avsc');
const fs = require('fs');
// Avroファイルコンテナの作成
function writeAvroFile(filename, schema, records) {
const type = avro.Type.forSchema(schema);
const writerSchema = type.schema();
const encoder = avro.createFileEncoder(filename, writerSchema);
records.forEach(record => {
encoder.write(record);
});
encoder.end();
}
// Avroファイルコンテナの読み込み
function readAvroFile(filename) {
return new Promise((resolve, reject) => {
const records = [];
avro.createFileDecoder(filename)
.on('data', record => records.push(record))
.on('end', () => resolve(records))
.on('error', reject);
});
}
// 使用例
const schema = {
type: 'record',
name: 'Event',
fields: [
{ name: 'id', type: 'string' },
{ name: 'timestamp', type: 'long' },
{ name: 'type', type: 'string' },
{ name: 'data', type: { type: 'map', values: 'string' } }
]
};
const events = [
{
id: '1',
timestamp: Date.now(),
type: 'user_login',
data: { userId: '123', ip: '192.168.1.1' }
},
{
id: '2',
timestamp: Date.now(),
type: 'page_view',
data: { path: '/dashboard', referrer: '/login' }
}
];
// ファイルへの書き込み
writeAvroFile('events.avro', schema, events);
// ファイルからの読み込み
readAvroFile('events.avro').then(records => {
console.log('読み込みレコード数:', records.length);
records.forEach(record => {
console.log('イベント:', record.type, 'タイムスタンプ:', new Date(record.timestamp));
});
});
ブラウザ統合
ブラウザでの使用
<!DOCTYPE html>
<html>
<head>
<title>Avro in Browser</title>
<script src="https://unpkg.com/[email protected]/dist/avsc.js"></script>
</head>
<body>
<script>
// スキーマの定義
const schema = {
type: 'record',
name: 'UserEvent',
fields: [
{ name: 'userId', type: 'string' },
{ name: 'event', type: 'string' },
{ name: 'timestamp', type: 'long' },
{ name: 'properties', type: { type: 'map', values: 'string' } }
]
};
const type = avsc.Type.forSchema(schema);
// ユーザーイベントのトラッキング
function trackEvent(userId, event, properties = {}) {
const eventData = {
userId,
event,
timestamp: Date.now(),
properties
};
// シリアライズしてサーバーに送信
const buffer = type.toBuffer(eventData);
fetch('/analytics/track', {
method: 'POST',
headers: {
'Content-Type': 'application/x-avro-binary'
},
body: buffer
}).then(response => {
if (response.ok) {
console.log('イベント送信成功:', event);
}
});
}
// イベントのトラッキング
trackEvent('user123', 'page_view', {
page: '/dashboard',
referrer: '/login'
});
// ボタンクリックイベント
document.addEventListener('click', (e) => {
if (e.target.tagName === 'BUTTON') {
trackEvent('user123', 'button_click', {
button_id: e.target.id,
button_text: e.target.textContent
});
}
});
</script>
</body>
</html>
Express.jsでのサーバー側処理
const express = require('express');
const avro = require('avsc');
const app = express();
// スキーマの定義
const eventSchema = avro.Type.forSchema({
type: 'record',
name: 'UserEvent',
fields: [
{ name: 'userId', type: 'string' },
{ name: 'event', type: 'string' },
{ name: 'timestamp', type: 'long' },
{ name: 'properties', type: { type: 'map', values: 'string' } }
]
});
// Avroバイナリミドルウェア
app.use('/analytics', (req, res, next) => {
if (req.headers['content-type'] === 'application/x-avro-binary') {
const chunks = [];
req.on('data', chunk => chunks.push(chunk));
req.on('end', () => {
try {
const buffer = Buffer.concat(chunks);
req.body = eventSchema.fromBuffer(buffer);
next();
} catch (error) {
res.status(400).json({ error: 'Invalid Avro data' });
}
});
} else {
next();
}
});
// イベントトラッキングエンドポイント
app.post('/analytics/track', (req, res) => {
const event = req.body;
console.log('イベント受信:', {
user: event.userId,
event: event.event,
time: new Date(event.timestamp)
});
// データベースやメッセージキューに保存
saveEvent(event);
res.json({ success: true });
});
// イベントの保存処理
function saveEvent(event) {
// データベースやKafkaなどに保存する処理
console.log('Saving event:', event);
}
app.listen(3000, () => {
console.log('Server running on port 3000');
});
パフォーマンス最適化
ベンチマークテスト
const avro = require('avsc');
const { performance } = require('perf_hooks');
// ベンチマーク用のデータとスキーマ
const schema = {
type: 'record',
name: 'TestRecord',
fields: [
{ name: 'id', type: 'long' },
{ name: 'name', type: 'string' },
{ name: 'email', type: 'string' },
{ name: 'tags', type: { type: 'array', items: 'string' } },
{ name: 'metadata', type: { type: 'map', values: 'string' } }
]
};
const type = avro.Type.forSchema(schema);
// テストデータの作成
function generateTestData(count) {
const data = [];
for (let i = 0; i < count; i++) {
data.push({
id: i,
name: `ユーザー${i}`,
email: `user${i}@example.com`,
tags: ['tag1', 'tag2', 'tag3'],
metadata: {
department: 'engineering',
level: 'senior',
location: 'tokyo'
}
});
}
return data;
}
// ベンチマーク関数
function benchmark(name, fn) {
const start = performance.now();
const result = fn();
const end = performance.now();
console.log(`${name}: ${(end - start).toFixed(2)}ms`);
return result;
}
// パフォーマンス比較
const testData = generateTestData(10000);
console.log('10,000レコードのシリアライズパフォーマンス比較:');
// JSONシリアライズ
const jsonResult = benchmark('JSONシリアライズ', () => {
const buffers = testData.map(data => Buffer.from(JSON.stringify(data)));
return Buffer.concat(buffers);
});
// Avroシリアライズ
const avroResult = benchmark('Avroシリアライズ', () => {
const buffers = testData.map(data => type.toBuffer(data));
return Buffer.concat(buffers);
});
console.log('\nサイズ比較:');
console.log('JSONサイズ:', jsonResult.length, 'バイト');
console.log('Avroサイズ:', avroResult.length, 'バイト');
console.log('サイズ削減率:',
Math.round((1 - avroResult.length / jsonResult.length) * 100) + '%');
メモリ最適化
// スキーマキャッシュでパフォーマンス向上
class AvroManager {
constructor() {
this.schemas = new Map();
this.bufferPool = [];
}
getSchema(schemaString) {
if (!this.schemas.has(schemaString)) {
this.schemas.set(schemaString, avro.Type.forSchema(JSON.parse(schemaString)));
}
return this.schemas.get(schemaString);
}
// バッファプールでメモリアロケーションを削減
getBuffer(size) {
if (this.bufferPool.length > 0) {
const buffer = this.bufferPool.pop();
if (buffer.length >= size) {
return buffer.slice(0, size);
}
}
return Buffer.allocUnsafe(size);
}
returnBuffer(buffer) {
if (this.bufferPool.length < 10) { // プールサイズを制限
this.bufferPool.push(buffer);
}
}
serialize(schemaString, data) {
const schema = this.getSchema(schemaString);
return schema.toBuffer(data);
}
deserialize(schemaString, buffer) {
const schema = this.getSchema(schemaString);
return schema.fromBuffer(buffer);
}
}
// グローバルインスタンス
const avroManager = new AvroManager();
// 使用例
const schemaString = JSON.stringify({
type: 'record',
name: 'OptimizedRecord',
fields: [
{ name: 'id', type: 'long' },
{ name: 'data', type: 'string' }
]
});
// シリアライズ
const buffer = avroManager.serialize(schemaString, { id: 1, data: 'test' });
// デシリアライズ
const data = avroManager.deserialize(schemaString, buffer);
ベストプラクティス
1. ライブラリの選択
- avsc: 高機能が必要な場合、スキーマ進化を重視する場合
- avro-js: シンプルな使用例、ブラウザ互換性を重視する場合
2. スキーマ設計
// 良い例: デフォルト値を持つオプショナルフィールド
const goodSchema = {
type: 'record',
name: 'User',
fields: [
{ name: 'id', type: 'long' },
{ name: 'name', type: 'string' },
{ name: 'email', type: ['null', 'string'], default: null },
{ name: 'isActive', type: 'boolean', default: true }
]
};
// 避けるべき: デフォルト値のない新フィールド
const badSchema = {
type: 'record',
name: 'User',
fields: [
{ name: 'id', type: 'long' },
{ name: 'name', type: 'string' },
{ name: 'newField', type: 'string' } // デフォルト値がない
]
};
3. エラーハンドリング
function safeSerialize(schema, data) {
try {
return schema.toBuffer(data);
} catch (error) {
console.error('シリアライズエラー:', error.message);
throw error;
}
}
function safeDeserialize(schema, buffer) {
try {
return schema.fromBuffer(buffer);
} catch (error) {
console.error('デシリアライズエラー:', error.message);
throw error;
}
}
まとめ
Apache Avro for JavaScriptは、モダンなWebアプリケーションやNode.jsアプリケーションにおいて、高性能なデータシリアライゼーションを実現する優れたソリューションです。スキーマ進化のサポート、コンパクトなサイズ、ブラウザ互換性により、リアルタイムデータストリーミング、マイクロサービス間通信、イベントドリブンアーキテクチャなど幅広い用途に適用できます。avscとavro-jsのどちらも優れたライブラリですが、プロジェクトの要件に応じて適切な選択をすることが重要です。