Apache Avro
Overview
Apache Avro is a high-performance data serialization system developed by the Apache Software Foundation. Its defining feature is robust schema evolution, which allows data structures to change safely over time. With a compact binary format and support for many languages (Java, Python, C++, C#, JavaScript, and more), it is a standard choice in large-scale data processing platforms such as Apache Kafka, Hadoop, and Apache Spark.
Details
Apache Avro is a mature library that meets the need for robust schema evolution in big data and streaming architectures. Because its container format is self-describing, storing the writer's schema alongside the data, it supports long-term data storage and seamless data exchange between systems. It provides the features required for continuous data pipeline operation in enterprise environments, including schemas and data that map directly to JSON, usage without code generation, and backward/forward compatibility guarantees.
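To make the compact, schema-driven encoding concrete, here is a minimal sketch (using the same avro Python package as the examples further down, with an illustrative schema and record) that serializes a single record to raw Avro binary and compares its size with the equivalent JSON text; field names live in the schema, so only the values are encoded.
# Minimal sketch: single-record binary encoding vs. JSON (illustrative schema and record)
import io
import json
import avro.schema
from avro.io import DatumWriter, DatumReader, BinaryEncoder, BinaryDecoder

schema = avro.schema.parse(json.dumps({
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "email", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}))

record = {"name": "John Doe", "email": "[email protected]", "age": 42}

# Avro writes only the values; the field names and types live in the schema
buf = io.BytesIO()
DatumWriter(schema).write(record, BinaryEncoder(buf))
avro_bytes = buf.getvalue()

json_bytes = json.dumps(record).encode("utf-8")
print(f"Avro: {len(avro_bytes)} bytes, JSON: {len(json_bytes)} bytes")

# Round-trip: decode the raw bytes back with the same schema
decoded = DatumReader(schema).read(BinaryDecoder(io.BytesIO(avro_bytes)))
print(decoded)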
Key Features
- Schema Evolution: Safe schema changes with guaranteed backward and forward compatibility (see the sketch after this list)
- Compact Format: Data size reduction through efficient binary encoding
- Multi-language Support: Unified APIs in Java, Python, C++, C#, JavaScript, Ruby, etc.
- Fully Self-describing: Container files embed the writer's schema alongside the data
- Streaming Support: Close integration with real-time data platforms such as Apache Kafka
- Enterprise Track Record: Widely adopted across the Hadoop, Spark, and Kafka ecosystems
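The schema evolution guarantee can be sketched at the level of a single raw binary payload, which is the situation for individual stream messages where no container file carries the schema: a datum written with a v1 writer schema is decoded with a v2 reader schema whose new field has a default. The schemas below are illustrative; the file-based version of the same idea appears under Schema Evolution Practice further down.
# Minimal sketch: resolving a writer schema against a newer reader schema (illustrative schemas)
import io
import json
import avro.schema
from avro.io import DatumWriter, DatumReader, BinaryEncoder, BinaryDecoder

writer_schema = avro.schema.parse(json.dumps({
    "type": "record", "name": "User",
    "fields": [{"name": "name", "type": "string"}]
}))
reader_schema = avro.schema.parse(json.dumps({
    "type": "record", "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int", "default": 0}
    ]
}))

# Encode with the old (writer) schema
buf = io.BytesIO()
DatumWriter(writer_schema).write({"name": "Old User"}, BinaryEncoder(buf))

# Decode with the new (reader) schema; the missing field is filled from its default
datum = DatumReader(writer_schema, reader_schema).read(BinaryDecoder(io.BytesIO(buf.getvalue())))
print(datum)  # {'name': 'Old User', 'age': 0}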
Pros and Cons
Pros
- Industry-leading schema evolution functionality for long-term data management
- Efficient storage and network transfer through compact binary format
- Standard integration with major data platforms like Apache Kafka, Hadoop, and Spark
- Compatibility with existing systems through direct JSON mapping support
- Language-independent flexibility, with dynamic (generic) usage requiring no code generation
- More than a decade of active development and broad enterprise adoption
Cons
- Limited direct human readability due to binary format
- Schema-first approach puts weight on upfront design and adds a learning curve
- Ad-hoc schema changes are constrained; evolution must be planned within the compatibility rules
- Potential over-complexity for small-scale data or simple use cases
- Requires specialized tools to verify schema-data consistency during debugging
- Difficulty in ad-hoc analysis compared to text formats such as JSON (see the inspection sketch after this list)
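For the last two points, an Avro container file can be dumped to JSON for quick inspection, either with the official avro-tools jar (for example `java -jar avro-tools-1.11.3.jar tojson users.avro`, matching the version in the Maven snippet below) or with a few lines of Python; the sketch below assumes a users.avro file like the one produced in the code examples that follow.
# Minimal sketch: dump an Avro container file to JSON lines for ad-hoc inspection
# (assumes users.avro was written as in the examples below)
import json
from avro.datafile import DataFileReader
from avro.io import DatumReader

# The embedded writer schema makes the file readable without any external schema
reader = DataFileReader(open("users.avro", "rb"), DatumReader())
for record in reader:
    print(json.dumps(record, default=str))  # default=str covers non-JSON types such as timestamps
reader.close()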
Code Examples
Basic Setup
# Python
pip install avro
# Java (Maven)
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.11.3</version>
</dependency>
# JavaScript/Node.js
npm install avsc
# C#
dotnet add package Apache.Avro
# Go
go get github.com/hamba/avro/v2
# C++
# Ubuntu/Debian
sudo apt-get install libavro-dev
Schema Definition and Data Processing
// user.avsc - Avro schema file
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": ["int", "null"]},
    {"name": "favorite_color", "type": ["string", "null"]},
    {"name": "email", "type": "string"},
    {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
# Python basic usage example
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
import time

# Load schema
schema = avro.schema.parse(open("user.avsc", "rb").read())

# Write data
writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema)
writer.append({
    "name": "John Doe",
    "favorite_number": 42,
    "favorite_color": "blue",
    "email": "[email protected]",
    "created_at": int(time.time() * 1000)  # timestamp in milliseconds
})
writer.append({
    "name": "Jane Smith",
    "favorite_number": None,  # nullable field
    "favorite_color": None,
    "email": "[email protected]",
    "created_at": int(time.time() * 1000)
})
writer.close()

# Read data
reader = DataFileReader(open("users.avro", "rb"), DatumReader())
for user in reader:
    print(f"Name: {user['name']}, Email: {user['email']}")
    if user['favorite_number'] is not None:  # explicit None check so a value of 0 is not skipped
        print(f"  Favorite number: {user['favorite_number']}")
    if user['favorite_color'] is not None:
        print(f"  Favorite color: {user['favorite_color']}")
reader.close()
Java Usage Example
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import java.io.File;
import java.io.IOException;

public class AvroExample {
    public static void main(String[] args) throws IOException {
        // Parse schema
        Schema schema = new Schema.Parser().parse(new File("user.avsc"));

        // Create GenericRecords (no code generation required)
        GenericRecord user1 = new GenericData.Record(schema);
        user1.put("name", "John Doe");
        user1.put("favorite_number", 42);
        user1.put("favorite_color", "blue");
        user1.put("email", "[email protected]");
        user1.put("created_at", System.currentTimeMillis());

        GenericRecord user2 = new GenericData.Record(schema);
        user2.put("name", "Jane Smith");
        user2.put("favorite_number", null);
        user2.put("favorite_color", null);
        user2.put("email", "[email protected]");
        user2.put("created_at", System.currentTimeMillis());

        // Serialize to file
        File file = new File("users.avro");
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
        dataFileWriter.create(schema, file);
        dataFileWriter.append(user1);
        dataFileWriter.append(user2);
        dataFileWriter.close();

        // Deserialize from file
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader);
        for (GenericRecord user : dataFileReader) {
            System.out.println("Name: " + user.get("name"));
            System.out.println("Email: " + user.get("email"));
            if (user.get("favorite_number") != null) {
                System.out.println("  Favorite number: " + user.get("favorite_number"));
            }
            if (user.get("favorite_color") != null) {
                System.out.println("  Favorite color: " + user.get("favorite_color"));
            }
        }
        dataFileReader.close();
    }
}
Schema Evolution Practice
// user_v1.avsc - Initial schema
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}

// user_v2.avsc - Evolved schema (maintains backward compatibility)
// Annotations are kept as "doc" attributes so the file remains valid JSON
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "age", "type": "int", "default": 0, "doc": "New field with default value"},
    {"name": "profile", "type": ["null", "string"], "default": null, "doc": "Optional field"},
    {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}, "default": 0}
  ]
}
# Schema evolution processing example
import time
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

# Create data with the old schema
old_schema = avro.schema.parse(open("user_v1.avsc", "rb").read())
writer = DataFileWriter(open("old_users.avro", "wb"), DatumWriter(), old_schema)
writer.append({"name": "Existing User", "email": "[email protected]"})
writer.close()

# Read old data with the new schema (backward compatibility)
new_schema = avro.schema.parse(open("user_v2.avsc", "rb").read())
reader = DataFileReader(open("old_users.avro", "rb"), DatumReader(old_schema, new_schema))
for user in reader:
    print(f"Name: {user['name']}")
    print(f"Email: {user['email']}")
    print(f"Age: {user['age']}")          # filled with the default value 0
    print(f"Profile: {user['profile']}")  # filled with the default value null
reader.close()

# Create data with the new schema
writer = DataFileWriter(open("new_users.avro", "wb"), DatumWriter(), new_schema)
writer.append({
    "name": "New User",
    "email": "[email protected]",
    "age": 25,
    "profile": "Engineer",
    "created_at": int(time.time() * 1000)
})
writer.close()
High-Performance Streaming Processing
import io
import time
import avro.schema
from avro.io import DatumReader, DatumWriter, BinaryEncoder, BinaryDecoder

class AvroStreamProcessor:
    def __init__(self, schema_file):
        self.schema = avro.schema.parse(open(schema_file, "rb").read())

    def serialize_batch(self, records):
        """Efficient batch serialization of large data"""
        bytes_writer = io.BytesIO()
        encoder = BinaryEncoder(bytes_writer)
        datum_writer = DatumWriter(self.schema)
        # Batch encoding: raw datums written back to back (no container framing)
        for record in records:
            datum_writer.write(record, encoder)
        return bytes_writer.getvalue()

    def deserialize_batch(self, data_bytes):
        """Batch deserialization"""
        bytes_reader = io.BytesIO(data_bytes)
        decoder = BinaryDecoder(bytes_reader)
        datum_reader = DatumReader(self.schema)
        records = []
        # Read datums until the buffer is exhausted
        while bytes_reader.tell() < len(data_bytes):
            records.append(datum_reader.read(decoder))
        return records

    def stream_processing_simulation(self, record_count=100000):
        """Large-volume streaming processing simulation"""
        # Generate a large volume of test data
        records = []
        for i in range(record_count):
            records.append({
                "name": f"User{i}",
                "email": f"user{i}@example.com",
                "age": 20 + (i % 50),
                "profile": f"Profile{i}" if i % 10 == 0 else None,
                "created_at": int((time.time() + i) * 1000)
            })

        # Batch serialization (timed separately from data generation)
        start_time = time.time()
        serialized = self.serialize_batch(records)
        encode_time = time.time() - start_time

        # Batch deserialization
        start_time = time.time()
        deserialized = self.deserialize_batch(serialized)
        decode_time = time.time() - start_time

        print(f"Record count: {record_count}")
        print(f"Serialize time: {encode_time:.3f}s")
        print(f"Deserialize time: {decode_time:.3f}s")
        print(f"Data size: {len(serialized) / 1024 / 1024:.2f} MB")
        print(f"Average size per record: {len(serialized) / record_count:.1f} bytes")
        return deserialized

# Usage example
processor = AvroStreamProcessor("user_v2.avsc")
result = processor.stream_processing_simulation(50000)
print(f"Processing complete: {len(result)} records")
Apache Kafka Integration
# Kafka + Avro integration example (uses the legacy confluent-kafka-python Avro helpers)
import time
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

# Example key/value schemas registered via Schema Registry
key_schema = avro.loads("""
{"type": "record", "name": "UserKey", "fields": [{"name": "user_id", "type": "string"}]}
""")
value_schema = avro.loads("""
{"type": "record", "name": "UserEvent", "fields": [
  {"name": "user_id", "type": "string"},
  {"name": "name", "type": "string"},
  {"name": "email", "type": "string"},
  {"name": "age", "type": "int"},
  {"name": "profile", "type": ["null", "string"], "default": null},
  {"name": "created_at", "type": "long"}
]}
""")

# Kafka producer configuration
avro_producer = AvroProducer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}, default_key_schema=key_schema, default_value_schema=value_schema)

# Send message
def send_user_data(user_data):
    try:
        avro_producer.produce(
            topic='user-events',
            key={'user_id': user_data['user_id']},
            value=user_data
        )
        avro_producer.flush()
        print(f"Message sent successfully: {user_data['name']}")
    except SerializerError as e:
        print(f"Serialization error: {e}")

# Kafka consumer configuration
avro_consumer = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'user-group',
    'schema.registry.url': 'http://localhost:8081',
    'auto.offset.reset': 'earliest'
})
avro_consumer.subscribe(['user-events'])

# Receive messages
def consume_user_data():
    while True:
        try:
            message = avro_consumer.poll(1.0)
            if message is None:
                continue
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue
            user_data = message.value()
            print(f"Received data: {user_data['name']} - {user_data['email']}")
        except SerializerError as e:
            print(f"Deserialization error: {e}")

# Usage example
user_sample = {
    "user_id": "user123",
    "name": "Kafka User",
    "email": "[email protected]",
    "age": 30,
    "profile": "Streaming Engineer",
    "created_at": int(time.time() * 1000)
}
send_user_data(user_sample)
consume_user_data()
Performance Optimization and Best Practices
import avro.schema
import time
import json
import os
from avro.datafile import DataFileWriter, DataFileReader
from avro.io import DatumWriter, DatumReader

class AvroPerformanceOptimizer:
    @staticmethod
    def compare_formats(data_samples, iterations=1000):
        """Avro vs JSON performance comparison"""
        schema = avro.schema.parse("""
        {
          "type": "record",
          "name": "TestRecord",
          "fields": [
            {"name": "id", "type": "long"},
            {"name": "name", "type": "string"},
            {"name": "data", "type": {"type": "array", "items": "double"}},
            {"name": "metadata", "type": {"type": "map", "values": "string"}}
          ]
        }
        """)

        # Avro serialization
        avro_start = time.time()
        for _ in range(iterations):
            writer = DataFileWriter(open("temp.avro", "wb"), DatumWriter(), schema)
            for sample in data_samples:
                writer.append(sample)
            writer.close()
        avro_serialize_time = time.time() - avro_start

        # Avro deserialization
        avro_start = time.time()
        for _ in range(iterations):
            reader = DataFileReader(open("temp.avro", "rb"), DatumReader())
            for record in reader:
                pass  # read each record
            reader.close()
        avro_deserialize_time = time.time() - avro_start

        # JSON serialization
        json_start = time.time()
        for _ in range(iterations):
            with open("temp.json", "w") as f:
                json.dump(data_samples, f)
        json_serialize_time = time.time() - json_start

        # JSON deserialization
        json_start = time.time()
        for _ in range(iterations):
            with open("temp.json", "r") as f:
                json.load(f)
        json_deserialize_time = time.time() - json_start

        # File size comparison
        avro_size = os.path.getsize("temp.avro")
        json_size = os.path.getsize("temp.json")

        print("=== Performance Comparison Results ===")
        print(f"Data samples: {len(data_samples)}")
        print(f"Iterations: {iterations}")
        print()
        print(f"Avro serialize: {avro_serialize_time:.3f}s")
        print(f"Avro deserialize: {avro_deserialize_time:.3f}s")
        print(f"JSON serialize: {json_serialize_time:.3f}s")
        print(f"JSON deserialize: {json_deserialize_time:.3f}s")
        print()
        print(f"Avro file size: {avro_size} bytes")
        print(f"JSON file size: {json_size} bytes")
        print(f"Size reduction: {((json_size - avro_size) / json_size * 100):.1f}%")
        print()
        print(f"Serialize speedup: {json_serialize_time / avro_serialize_time:.1f}x")
        print(f"Deserialize speedup: {json_deserialize_time / avro_deserialize_time:.1f}x")

        # Remove temporary files
        os.remove("temp.avro")
        os.remove("temp.json")

# Execute performance test
test_data = []
for i in range(1000):
    test_data.append({
        "id": i,
        "name": f"TestRecord{i}",
        "data": [i * 1.1, i * 2.2, i * 3.3],
        "metadata": {
            "category": f"cat{i % 10}",
            "priority": str(i % 5),
            "status": "active" if i % 2 == 0 else "inactive"
        }
    })

optimizer = AvroPerformanceOptimizer()
optimizer.compare_formats(test_data, iterations=10)
JavaScript Usage with Advanced Features
// Node.js with the avsc library
const avro = require('avsc');

// Define schema
const schema = avro.Type.forSchema({
  type: 'record',
  name: 'User',
  fields: [
    {name: 'name', type: 'string'},
    {name: 'email', type: 'string'},
    {name: 'age', type: ['null', 'int'], default: null},
    {name: 'created_at', type: {type: 'long', logicalType: 'timestamp-millis'}}
  ]
});

// Serialize data
const userData = {
  name: 'JavaScript User',
  email: '[email protected]',
  age: 28,
  created_at: Date.now()
};

const buffer = schema.toBuffer(userData);
console.log('Serialized size:', buffer.length, 'bytes');

// Deserialize data
const deserializedData = schema.fromBuffer(buffer);
console.log('Deserialized:', deserializedData);

// Schema evolution with a resolver
const oldSchema = avro.Type.forSchema({
  type: 'record',
  name: 'User',
  fields: [
    {name: 'name', type: 'string'},
    {name: 'email', type: 'string'}
  ]
});

const newSchema = avro.Type.forSchema({
  type: 'record',
  name: 'User',
  fields: [
    {name: 'name', type: 'string'},
    {name: 'email', type: 'string'},
    {name: 'age', type: 'int', default: 0},
    {name: 'department', type: ['null', 'string'], default: null}
  ]
});

// Create a resolver so the new schema can read data written with the old one
const resolver = newSchema.createResolver(oldSchema);

// Old data
const oldData = {name: 'Old User', email: '[email protected]'};
const oldBuffer = oldSchema.toBuffer(oldData);

// Read old data with the new schema
const evolvedData = newSchema.fromBuffer(oldBuffer, resolver);
console.log('Evolved data:', evolvedData);
// Output: {name: 'Old User', email: '[email protected]', age: 0, department: null}