Apache Avro .NET
ライブラリ
Apache Avro .NET
概要
Apache Avro .NETは、Avroデータシリアライゼーションシステムの.NET実装です。スキーマ進化をサポートする自己記述型のバイナリ形式で、ビッグデータシステムやKafkaとの統合で広く使用されています。リッチなデータ構造、高速なバイナリフォーマット、RPCサポートを提供し、Hadoopエコシステムとの親和性が高いことが特徴です。
詳細
Apache Avro .NET 1.12.0は、Avroプロトコルの公式.NET実装です。2025年現在も活発にメンテナンスされており、GenericRecord、SpecificRecord、Reflect APIの3つのシリアライゼーション方式をサポートしています。スキーマはJSON形式で定義され、データとともに保存・転送されるため、スキーマ進化に対応できます。Azure Schema RegistryやConfluent Kafka Platformとの統合も可能で、エンタープライズレベルのストリーミングシステムで使用されています。
主な特徴
- スキーマ進化: 前方互換性・後方互換性のあるスキーマ変更をサポート
- 3つのシリアライゼーション方式: Generic、Specific、Reflectモード
- コンパクトなバイナリ形式: JSONと比較して高効率なデータ圧縮
- コード生成ツール: Apache.Avro.Toolsによるスキーマからのクラス生成
- RPCサポート: リモートプロシージャコールの実装
- Kafkaとの統合: ストリーミングデータ処理での標準的な使用
メリット・デメリット
メリット
- スキーマ進化により、システムの長期的な保守性が向上
- 自己記述型フォーマットでメタデータとデータが一体化
- Hadoopエコシステム、Kafka、Sparkとの優れた互換性
- バイナリ形式による高効率なデータ圧縮とパフォーマンス
- 動的型付けと静的型付けの両方をサポート
- Apache公式サポートによる信頼性
デメリット
- .NET向けドキュメントが不足しており、学習コストが高い
- 論理型(日付、固定小数点等)の.NETサポートが未完成
- .NET Standard対応が部分的で、一部の環境で制限あり
- JSONと比較して人間が読みにくいバイナリ形式
- スキーマ管理の複雑性により運用負荷が増加
- Javaバージョンと比較して機能パリティが不完全
参考ページ
書き方の例
基本的なセットアップ
<PackageReference Include="Apache.Avro" Version="1.12.0" />
<PackageReference Include="Apache.Avro.Tools" Version="1.12.0" />
スキーマ定義
{
"type": "record",
"name": "User",
"namespace": "com.example.avro",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null},
{"name": "age", "type": "int"},
{"name": "created", "type": "long"}
]
}
Generic Record使用例
using Avro;
using Avro.Generic;
using Avro.IO;
using Avro.File;
// スキーマの読み込み
var schemaJson = File.ReadAllText("user.avsc");
var schema = Schema.Parse(schemaJson) as RecordSchema;
// GenericRecordの作成
var genericRecord = new GenericRecord(schema);
genericRecord.Add("id", 1001L);
genericRecord.Add("name", "山田太郎");
genericRecord.Add("email", "[email protected]");
genericRecord.Add("age", 30);
genericRecord.Add("created", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
// ファイルへの書き込み
using (var writer = DataFileWriter<GenericRecord>.OpenWriter(
new GenericDatumWriter<GenericRecord>(schema), "users.avro"))
{
writer.Append(genericRecord);
}
// ファイルからの読み込み
using (var reader = DataFileReader<GenericRecord>.OpenReader("users.avro"))
{
foreach (var record in reader.NextEntries)
{
Console.WriteLine($"ID: {record["id"]}, Name: {record["name"]}");
}
}
Specific Record(コード生成)使用例
# スキーマからC#クラスを生成
avrogen -s user.avsc .
// 生成されたクラスの使用
var user = new com.example.avro.User
{
id = 1002,
name = "田中花子",
email = "[email protected]",
age = 25,
created = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
// シリアライズ
using (var stream = new MemoryStream())
{
var writer = new SpecificDatumWriter<User>(user.Schema);
var encoder = new BinaryEncoder(stream);
writer.Write(user, encoder);
// バイト配列として取得
byte[] serializedData = stream.ToArray();
}
Reflect API使用例
using Avro.Reflect;
// POCOクラス定義
public class Person
{
public long Id { get; set; }
public string Name { get; set; }
public string Email { get; set; }
public int Age { get; set; }
public DateTime Created { get; set; }
}
// Reflect APIでのシリアライズ
var person = new Person
{
Id = 1003,
Name = "佐藤次郎",
Email = "[email protected]",
Age = 35,
Created = DateTime.UtcNow
};
var writer = new ReflectWriter<Person>(person.GetType());
using (var stream = new MemoryStream())
{
var encoder = new BinaryEncoder(stream);
writer.Write(person, encoder);
byte[] data = stream.ToArray();
}
スキーマ進化の例
// 新しいフィールドを追加したスキーマ
var newSchemaJson = @"{
""type"": ""record"",
""name"": ""User"",
""fields"": [
{""name"": ""id"", ""type"": ""long""},
{""name"": ""name"", ""type"": ""string""},
{""name"": ""email"", ""type"": [""null"", ""string""], ""default"": null},
{""name"": ""age"", ""type"": ""int""},
{""name"": ""created"", ""type"": ""long""},
{""name"": ""department"", ""type"": ""string"", ""default"": ""Unknown""}
]
}";
// 古いデータを新しいスキーマで読み込み
var writerSchema = Schema.Parse(originalSchemaJson);
var readerSchema = Schema.Parse(newSchemaJson);
var resolver = new DefaultResolver(writerSchema, readerSchema);
Kafkaとの統合例
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
// Schema Registryクライアントの設定
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = "http://localhost:8081"
};
using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
// Avroシリアライザーの設定
var avroSerializerConfig = new AvroSerializerConfig
{
BufferBytes = 100
};
// プロデューサーの作成
var producerConfig = new ProducerConfig
{
BootstrapServers = "localhost:9092"
};
using var producer = new ProducerBuilder<string, GenericRecord>(producerConfig)
.SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry, avroSerializerConfig))
.Build();