Apache Avro

SerializationSchema EvolutionBig DataStreamingJSONBinaryJavaPythonApache

Library

Apache Avro

Overview

Apache Avro is a high-performance data serialization system developed by the Apache Software Foundation. Its key feature is powerful schema evolution capabilities that allow safe data structure changes over time. With its compact binary format and multi-language support (Java, Python, C++, C#, JavaScript, etc.), it is standardly used in large-scale data processing platforms like Apache Kafka, Hadoop, and Apache Spark.

Details

Apache Avro 2025 edition is a mature library that meets the needs for robust schema evolution in big data and streaming architectures. Through its fully self-describing format where schema and data are integrated, it enables long-term data storage and seamless data exchange between systems. It comprehensively provides features necessary for continuous data pipeline operations in enterprise environments, including direct JSON mapping support, no code generation required, and backward/forward compatibility guarantees.

Key Features

  • Schema Evolution: Safe schema changes with guaranteed backward and forward compatibility
  • Compact Format: Data size reduction through efficient binary encoding
  • Multi-language Support: Unified APIs in Java, Python, C++, C#, JavaScript, Ruby, etc.
  • Fully Self-describing: Self-describing format integrating schema and data
  • Streaming Support: Affinity with real-time data processing platforms like Apache Kafka
  • Enterprise Track Record: Rich adoption cases in Hadoop, Spark, and Kafka ecosystems

Pros and Cons

Pros

  • Industry-leading schema evolution functionality for long-term data management
  • Efficient storage and network transfer through compact binary format
  • Standard integration with major data platforms like Apache Kafka, Hadoop, and Spark
  • Compatibility with existing systems through direct JSON mapping support
  • Flexibility independent of programming languages with no code generation required
  • Over 10 years of development track record and rich enterprise adoption cases

Cons

  • Limited direct human readability due to binary format
  • Importance of initial design and learning cost due to schema-first approach
  • Constraints on dynamic schema changes requiring planned schema design in advance
  • Potential over-complexity for small-scale data or simple use cases
  • Requires specialized tools to verify schema-data consistency during debugging
  • Difficulty in ad-hoc analysis compared to text formats like JSON

Reference Pages

Code Examples

Basic Setup

# Python
pip install avro-python3

# Java (Maven)
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.3</version>
</dependency>

# JavaScript/Node.js
npm install avsc

# C#
dotnet add package Apache.Avro

# Go
go get github.com/hamba/avro/v2

# C++
# Ubuntu/Debian
sudo apt-get install libavro-dev

Schema Definition and Data Processing

// user.avsc - Avro schema file
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": ["int", "null"]},
    {"name": "favorite_color", "type": ["string", "null"]},
    {"name": "email", "type": "string"},
    {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
# Python basic usage example
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
import time

# Load schema
schema = avro.schema.parse(open("user.avsc", "rb").read())

# Write data
writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema)
writer.append({
    "name": "John Doe",
    "favorite_number": 42,
    "favorite_color": "blue",
    "email": "[email protected]",
    "created_at": int(time.time() * 1000)  # timestamp in milliseconds
})
writer.append({
    "name": "Jane Smith",
    "favorite_number": None,  # nullable field
    "favorite_color": None,
    "email": "[email protected]", 
    "created_at": int(time.time() * 1000)
})
writer.close()

# Read data
reader = DataFileReader(open("users.avro", "rb"), DatumReader())
for user in reader:
    print(f"Name: {user['name']}, Email: {user['email']}")
    if user['favorite_number']:
        print(f"  Favorite number: {user['favorite_number']}")
    if user['favorite_color']:
        print(f"  Favorite color: {user['favorite_color']}")
reader.close()

Java Usage Example

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;

import java.io.File;
import java.io.IOException;

public class AvroExample {
    public static void main(String[] args) throws IOException {
        // Parse schema
        Schema schema = new Schema.Parser().parse(new File("user.avsc"));
        
        // Create GenericRecord
        GenericRecord user1 = new GenericData.Record(schema);
        user1.put("name", "John Doe");
        user1.put("favorite_number", 42);
        user1.put("favorite_color", "blue");
        user1.put("email", "[email protected]");
        user1.put("created_at", System.currentTimeMillis());
        
        GenericRecord user2 = new GenericData.Record(schema);
        user2.put("name", "Jane Smith");
        user2.put("favorite_number", null);
        user2.put("favorite_color", null);
        user2.put("email", "[email protected]");
        user2.put("created_at", System.currentTimeMillis());
        
        // Serialize to file
        File file = new File("users.avro");
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
        dataFileWriter.create(schema, file);
        dataFileWriter.append(user1);
        dataFileWriter.append(user2);
        dataFileWriter.close();
        
        // Deserialize from file
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader);
        
        for (GenericRecord user : dataFileReader) {
            System.out.println("Name: " + user.get("name"));
            System.out.println("Email: " + user.get("email"));
            if (user.get("favorite_number") != null) {
                System.out.println("  Favorite number: " + user.get("favorite_number"));
            }
            if (user.get("favorite_color") != null) {
                System.out.println("  Favorite color: " + user.get("favorite_color"));
            }
        }
        dataFileReader.close();
    }
}

Schema Evolution Practice

// user_v1.avsc - Initial schema
{
  "namespace": "example.avro", 
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}

// user_v2.avsc - Evolved schema (maintaining backward compatibility)
{
  "namespace": "example.avro",
  "type": "record", 
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "age", "type": "int", "default": 0},  // New field with default value
    {"name": "profile", "type": ["null", "string"], "default": null},  // Optional field
    {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}, "default": 0}
  ]
}
# Schema evolution processing example
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

# Create data with old schema
old_schema = avro.schema.parse(open("user_v1.avsc", "rb").read())
writer = DataFileWriter(open("old_users.avro", "wb"), DatumWriter(), old_schema)
writer.append({"name": "Existing User", "email": "[email protected]"})
writer.close()

# Read old data with new schema (backward compatibility)
new_schema = avro.schema.parse(open("user_v2.avsc", "rb").read())
reader = DataFileReader(open("old_users.avro", "rb"), DatumReader(old_schema, new_schema))

for user in reader:
    print(f"Name: {user['name']}")
    print(f"Email: {user['email']}")
    print(f"Age: {user['age']}")  # Default value 0 is set
    print(f"Profile: {user['profile']}")  # Default value null is set
reader.close()

# Create data with new schema
writer = DataFileWriter(open("new_users.avro", "wb"), DatumWriter(), new_schema)
writer.append({
    "name": "New User",
    "email": "[email protected]",
    "age": 25,
    "profile": "Engineer",
    "created_at": int(time.time() * 1000)
})
writer.close()

High-Performance Streaming Processing

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter, BinaryEncoder, BinaryDecoder
import io
import json

class AvroStreamProcessor:
    def __init__(self, schema_file):
        self.schema = avro.schema.parse(open(schema_file, "rb").read())
        
    def serialize_batch(self, records):
        """Efficient batch serialization of large data"""
        bytes_writer = io.BytesIO()
        encoder = BinaryEncoder(bytes_writer)
        datum_writer = DatumWriter(self.schema)
        
        # Batch encoding
        for record in records:
            datum_writer.write(record, encoder)
        
        return bytes_writer.getvalue()
    
    def deserialize_batch(self, data_bytes):
        """Batch deserialization"""
        bytes_reader = io.BytesIO(data_bytes)
        decoder = BinaryDecoder(bytes_reader)
        datum_reader = DatumReader(self.schema)
        
        records = []
        try:
            while True:
                record = datum_reader.read(decoder)
                records.append(record)
        except:
            pass  # End of data
        
        return records
    
    def stream_processing_simulation(self, record_count=100000):
        """Large volume streaming processing simulation"""
        import time
        
        # Generate large volume data
        start_time = time.time()
        records = []
        for i in range(record_count):
            records.append({
                "name": f"User{i}",
                "email": f"user{i}@example.com",
                "age": 20 + (i % 50),
                "profile": f"Profile{i}" if i % 10 == 0 else None,
                "created_at": int((time.time() + i) * 1000)
            })
        
        # Batch serialization
        serialized = self.serialize_batch(records)
        encode_time = time.time() - start_time
        
        # Batch deserialization
        start_time = time.time()
        deserialized = self.deserialize_batch(serialized)
        decode_time = time.time() - start_time
        
        print(f"Record count: {record_count}")
        print(f"Serialize time: {encode_time:.3f}s")
        print(f"Deserialize time: {decode_time:.3f}s")
        print(f"Data size: {len(serialized) / 1024 / 1024:.2f} MB")
        print(f"Average size per record: {len(serialized) / record_count:.1f} bytes")
        
        return deserialized

# Usage example
processor = AvroStreamProcessor("user_v2.avsc")
result = processor.stream_processing_simulation(50000)
print(f"Processing complete: {len(result)} records")

Apache Kafka Integration

# Kafka + Avro integration example (using confluent-kafka-python)
from confluent_kafka import Producer, Consumer
from confluent_kafka.avro import AvroProducer, AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

# Kafka producer configuration
avro_producer = AvroProducer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}, default_key_schema=key_schema, default_value_schema=value_schema)

# Send message
def send_user_data(user_data):
    try:
        avro_producer.produce(
            topic='user-events',
            key={'user_id': user_data['user_id']},
            value=user_data
        )
        avro_producer.flush()
        print(f"Message sent successfully: {user_data['name']}")
    except SerializerError as e:
        print(f"Serialization error: {e}")

# Kafka consumer configuration
avro_consumer = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'user-group',
    'schema.registry.url': 'http://localhost:8081',
    'auto.offset.reset': 'earliest'
})

avro_consumer.subscribe(['user-events'])

# Receive message
def consume_user_data():
    while True:
        try:
            message = avro_consumer.poll(1.0)
            if message is None:
                continue
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue
                
            user_data = message.value()
            print(f"Received data: {user_data['name']} - {user_data['email']}")
            
        except SerializerError as e:
            print(f"Deserialization error: {e}")
            
# Usage example
user_sample = {
    "user_id": "user123",
    "name": "Kafka User",
    "email": "[email protected]",
    "age": 30,
    "profile": "Streaming Engineer",
    "created_at": int(time.time() * 1000)
}

send_user_data(user_sample)
consume_user_data()

Performance Optimization and Best Practices

import avro.schema
import time
import json
from avro.datafile import DataFileWriter, DataFileReader
from avro.io import DatumWriter, DatumReader
import os

class AvroPerformanceOptimizer:
    
    @staticmethod
    def compare_formats(data_samples, iterations=1000):
        """Avro vs JSON performance comparison"""
        
        schema = avro.schema.parse("""
        {
            "type": "record",
            "name": "TestRecord",
            "fields": [
                {"name": "id", "type": "long"},
                {"name": "name", "type": "string"},
                {"name": "data", "type": {"type": "array", "items": "double"}},
                {"name": "metadata", "type": {"type": "map", "values": "string"}}
            ]
        }
        """)
        
        # Avro serialization
        avro_start = time.time()
        for _ in range(iterations):
            writer = DataFileWriter(open("temp.avro", "wb"), DatumWriter(), schema)
            for sample in data_samples:
                writer.append(sample)
            writer.close()
        avro_serialize_time = time.time() - avro_start
        
        # Avro deserialization
        avro_start = time.time()
        for _ in range(iterations):
            reader = DataFileReader(open("temp.avro", "rb"), DatumReader())
            for record in reader:
                pass  # Data reading
            reader.close()
        avro_deserialize_time = time.time() - avro_start
        
        # JSON serialization
        json_start = time.time()
        for _ in range(iterations):
            with open("temp.json", "w") as f:
                json.dump(data_samples, f)
        json_serialize_time = time.time() - json_start
        
        # JSON deserialization
        json_start = time.time()
        for _ in range(iterations):
            with open("temp.json", "r") as f:
                json.load(f)
        json_deserialize_time = time.time() - json_start
        
        # File size comparison
        avro_size = os.path.getsize("temp.avro")
        json_size = os.path.getsize("temp.json")
        
        print("=== Performance Comparison Results ===")
        print(f"Data samples: {len(data_samples)}")
        print(f"Iterations: {iterations}")
        print(f"")
        print(f"Avro serialize: {avro_serialize_time:.3f}s")
        print(f"Avro deserialize: {avro_deserialize_time:.3f}s")
        print(f"JSON serialize: {json_serialize_time:.3f}s") 
        print(f"JSON deserialize: {json_deserialize_time:.3f}s")
        print(f"")
        print(f"Avro file size: {avro_size} bytes")
        print(f"JSON file size: {json_size} bytes")
        print(f"Size reduction: {((json_size - avro_size) / json_size * 100):.1f}%")
        print(f"")
        print(f"Serialize speedup: {json_serialize_time / avro_serialize_time:.1f}x")
        print(f"Deserialize speedup: {json_deserialize_time / avro_deserialize_time:.1f}x")
        
        # Remove temporary files
        os.remove("temp.avro")
        os.remove("temp.json")

# Execute performance test
test_data = []
for i in range(1000):
    test_data.append({
        "id": i,
        "name": f"TestRecord{i}",
        "data": [i * 1.1, i * 2.2, i * 3.3],
        "metadata": {
            "category": f"cat{i % 10}",
            "priority": str(i % 5),
            "status": "active" if i % 2 == 0 else "inactive"
        }
    })

optimizer = AvroPerformanceOptimizer()
optimizer.compare_formats(test_data, iterations=10)

JavaScript Usage with Advanced Features

// Node.js with avsc library
const avro = require('avsc');

// Define schema
const schema = avro.Type.forSchema({
  type: 'record',
  name: 'User',
  fields: [
    {name: 'name', type: 'string'},
    {name: 'email', type: 'string'},
    {name: 'age', type: ['null', 'int'], default: null},
    {name: 'created_at', type: {type: 'long', logicalType: 'timestamp-millis'}}
  ]
});

// Serialize data
const userData = {
  name: 'JavaScript User',
  email: '[email protected]',
  age: 28,
  created_at: Date.now()
};

const buffer = schema.toBuffer(userData);
console.log('Serialized size:', buffer.length, 'bytes');

// Deserialize data
const deserializedData = schema.fromBuffer(buffer);
console.log('Deserialized:', deserializedData);

// Schema evolution with resolver
const oldSchema = avro.Type.forSchema({
  type: 'record',
  name: 'User',
  fields: [
    {name: 'name', type: 'string'},
    {name: 'email', type: 'string'}
  ]
});

const newSchema = avro.Type.forSchema({
  type: 'record',
  name: 'User',
  fields: [
    {name: 'name', type: 'string'},
    {name: 'email', type: 'string'},
    {name: 'age', type: 'int', default: 0},
    {name: 'department', type: ['null', 'string'], default: null}
  ]
});

// Create resolver for schema evolution
const resolver = newSchema.createResolver(oldSchema);

// Old data
const oldData = {name: 'Old User', email: '[email protected]'};
const oldBuffer = oldSchema.toBuffer(oldData);

// Read old data with new schema
const evolvedData = newSchema.fromBuffer(oldBuffer, resolver);
console.log('Evolved data:', evolvedData);
// Output: {name: 'Old User', email: '[email protected]', age: 0, department: null}