Apache Avro .NET

シリアライゼーションC#.NETAvroスキーマ進化バイナリ形式ビッグデータ

ライブラリ

Apache Avro .NET

概要

Apache Avro .NETは、Avroデータシリアライゼーションシステムの.NET実装です。スキーマ進化をサポートする自己記述型のバイナリ形式で、ビッグデータシステムやKafkaとの統合で広く使用されています。リッチなデータ構造、高速なバイナリフォーマット、RPCサポートを提供し、Hadoopエコシステムとの親和性が高いことが特徴です。

詳細

Apache Avro .NET 1.12.0は、Avroプロトコルの公式.NET実装です。2025年現在も活発にメンテナンスされており、GenericRecord、SpecificRecord、Reflect APIの3つのシリアライゼーション方式をサポートしています。スキーマはJSON形式で定義され、データとともに保存・転送されるため、スキーマ進化に対応できます。Azure Schema RegistryやConfluent Kafka Platformとの統合も可能で、エンタープライズレベルのストリーミングシステムで使用されています。

主な特徴

  • スキーマ進化: 前方互換性・後方互換性のあるスキーマ変更をサポート
  • 3つのシリアライゼーション方式: Generic、Specific、Reflectモード
  • コンパクトなバイナリ形式: JSONと比較して高効率なデータ圧縮
  • コード生成ツール: Apache.Avro.Toolsによるスキーマからのクラス生成
  • RPCサポート: リモートプロシージャコールの実装
  • Kafkaとの統合: ストリーミングデータ処理での標準的な使用

メリット・デメリット

メリット

  • スキーマ進化により、システムの長期的な保守性が向上
  • 自己記述型フォーマットでメタデータとデータが一体化
  • Hadoopエコシステム、Kafka、Sparkとの優れた互換性
  • バイナリ形式による高効率なデータ圧縮とパフォーマンス
  • 動的型付けと静的型付けの両方をサポート
  • Apache公式サポートによる信頼性

デメリット

  • .NET向けドキュメントが不足しており、学習コストが高い
  • 論理型(日付、固定小数点等)の.NETサポートが未完成
  • .NET Standard対応が部分的で、一部の環境で制限あり
  • JSONと比較して人間が読みにくいバイナリ形式
  • スキーマ管理の複雑性により運用負荷が増加
  • Javaバージョンと比較して機能パリティが不完全

参考ページ

書き方の例

基本的なセットアップ

<PackageReference Include="Apache.Avro" Version="1.12.0" />
<PackageReference Include="Apache.Avro.Tools" Version="1.12.0" />

スキーマ定義

{
  "type": "record",
  "name": "User",
  "namespace": "com.example.avro",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null},
    {"name": "age", "type": "int"},
    {"name": "created", "type": "long"}
  ]
}

Generic Record使用例

using Avro;
using Avro.Generic;
using Avro.IO;
using Avro.File;

// スキーマの読み込み
var schemaJson = File.ReadAllText("user.avsc");
var schema = Schema.Parse(schemaJson) as RecordSchema;

// GenericRecordの作成
var genericRecord = new GenericRecord(schema);
genericRecord.Add("id", 1001L);
genericRecord.Add("name", "山田太郎");
genericRecord.Add("email", "[email protected]");
genericRecord.Add("age", 30);
genericRecord.Add("created", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

// ファイルへの書き込み
using (var writer = DataFileWriter<GenericRecord>.OpenWriter(
    new GenericDatumWriter<GenericRecord>(schema), "users.avro"))
{
    writer.Append(genericRecord);
}

// ファイルからの読み込み
using (var reader = DataFileReader<GenericRecord>.OpenReader("users.avro"))
{
    foreach (var record in reader.NextEntries)
    {
        Console.WriteLine($"ID: {record["id"]}, Name: {record["name"]}");
    }
}

Specific Record(コード生成)使用例

# スキーマからC#クラスを生成
avrogen -s user.avsc . 
// 生成されたクラスの使用
var user = new com.example.avro.User
{
    id = 1002,
    name = "田中花子",
    email = "[email protected]",
    age = 25,
    created = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};

// シリアライズ
using (var stream = new MemoryStream())
{
    var writer = new SpecificDatumWriter<User>(user.Schema);
    var encoder = new BinaryEncoder(stream);
    writer.Write(user, encoder);
    
    // バイト配列として取得
    byte[] serializedData = stream.ToArray();
}

Reflect API使用例

using Avro.Reflect;

// POCOクラス定義
public class Person
{
    public long Id { get; set; }
    public string Name { get; set; }
    public string Email { get; set; }
    public int Age { get; set; }
    public DateTime Created { get; set; }
}

// Reflect APIでのシリアライズ
var person = new Person
{
    Id = 1003,
    Name = "佐藤次郎",
    Email = "[email protected]",
    Age = 35,
    Created = DateTime.UtcNow
};

var writer = new ReflectWriter<Person>(person.GetType());
using (var stream = new MemoryStream())
{
    var encoder = new BinaryEncoder(stream);
    writer.Write(person, encoder);
    byte[] data = stream.ToArray();
}

スキーマ進化の例

// 新しいフィールドを追加したスキーマ
var newSchemaJson = @"{
  ""type"": ""record"",
  ""name"": ""User"",
  ""fields"": [
    {""name"": ""id"", ""type"": ""long""},
    {""name"": ""name"", ""type"": ""string""},
    {""name"": ""email"", ""type"": [""null"", ""string""], ""default"": null},
    {""name"": ""age"", ""type"": ""int""},
    {""name"": ""created"", ""type"": ""long""},
    {""name"": ""department"", ""type"": ""string"", ""default"": ""Unknown""}
  ]
}";

// 古いデータを新しいスキーマで読み込み
var writerSchema = Schema.Parse(originalSchemaJson);
var readerSchema = Schema.Parse(newSchemaJson);
var resolver = new DefaultResolver(writerSchema, readerSchema);

Kafkaとの統合例

using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// Schema Registryクライアントの設定
var schemaRegistryConfig = new SchemaRegistryConfig
{
    Url = "http://localhost:8081"
};

using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

// Avroシリアライザーの設定
var avroSerializerConfig = new AvroSerializerConfig
{
    BufferBytes = 100
};

// プロデューサーの作成
var producerConfig = new ProducerConfig
{
    BootstrapServers = "localhost:9092"
};

using var producer = new ProducerBuilder<string, GenericRecord>(producerConfig)
    .SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry, avroSerializerConfig))
    .Build();