Apache Pulsar
Cloud-native distributed messaging system that integrates multi-tenancy, geo-replication, and schema management. An alternative to Kafka, originally developed at Yahoo.
Overview
Apache Pulsar is a cloud-native distributed messaging and streaming platform originally developed at Yahoo to address the limitations of traditional messaging systems. It is designed around native multi-tenancy, real-time geo-replication, and a layered architecture that separates the serving (broker) layer from the storage layer, so each can be scaled independently. Pulsar is built to handle millions of messages per second at millisecond-level latency. This makes it a good fit for IoT data processing, real-time analytics, and event-driven architectures in cloud-first environments.
Details
As of 2025, Apache Pulsar is widely positioned as a next-generation messaging platform for cloud-first enterprise environments. Its layered architecture fully separates the distributed storage layer, built on Apache BookKeeper, from the stateless Pulsar broker serving layer. This enables flexible, independent scaling that is harder to achieve with architectures such as Kafka's, where the brokers also store the data. Multi-tenancy is implemented through a tenant and namespace hierarchy with resource isolation and quota management, so multiple organizations and applications can safely share a single cluster. Geo-replication is enabled through configuration and copies data asynchronously between clusters, which supports disaster recovery in globally distributed deployments. The ecosystem also includes Pulsar Functions, Pulsar IO connectors, transactions, a built-in schema registry, and other enterprise-grade features.
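The tenant and namespace hierarchy is visible in every topic name, which has the form persistent://<tenant>/<namespace>/<topic> (or non-persistent://...). The sketch below splits a name into those parts. It is an illustration rather than Pulsar's own parser and handles only the current three-part naming. Like Pulsar, it expands a bare name such as orders to persistent://public/default/orders and gives tenant/namespace/topic the persistent domain.

```python
from dataclasses import dataclass

DOMAINS = ("persistent", "non-persistent")


@dataclass(frozen=True)
class TopicName:
    domain: str      # "persistent" or "non-persistent"
    tenant: str      # top-level isolation unit (admin roles, allowed clusters)
    namespace: str   # policy unit inside a tenant (quotas, replication, retention)
    local_name: str  # the topic's own name inside the namespace

    def __str__(self) -> str:
        return f"{self.domain}://{self.tenant}/{self.namespace}/{self.local_name}"


def parse_topic(name: str) -> TopicName:
    """Split a topic name into domain, tenant, namespace and local name."""
    if "://" in name:
        domain, rest = name.split("://", 1)
        if domain not in DOMAINS:
            raise ValueError(f"unknown topic domain: {domain!r}")
    else:
        # Short forms default to the persistent domain
        domain, rest = "persistent", name
        if "/" not in rest:
            rest = f"public/default/{rest}"
    parts = rest.split("/")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected <tenant>/<namespace>/<topic>: {name!r}")
    tenant, namespace, local_name = parts
    return TopicName(domain, tenant, namespace, local_name)


if __name__ == "__main__":
    for n in ("orders", "my-tenant/my-namespace/orders",
              "non-persistent://dev-tenant/app1/metrics"):
        t = parse_topic(n)
        print(f"{n} -> tenant={t.tenant} namespace={t.namespace} topic={t.local_name}")
```

Policies such as quotas and replication clusters attach to the tenant/namespace prefix. That is why every topic under one namespace shares them.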
Key Features
- Layered Architecture: Flexible scaling through separation of the serving and storage layers
- Native Multi-Tenancy: Secure sharing through tenant, namespace, and resource isolation
- Real-time Geo-Replication: Automatic replication of data across clusters in different regions
- High Performance: Millions of messages per second with millisecond-level latency
- Effectively Unbounded Storage: Durable persistence in Apache BookKeeper, plus tiered storage that offloads older data to cheaper storage such as object stores
- Rich Ecosystem: Integrated Functions, IO connectors, SQL, and a built-in schema registry
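Durable storage rests on BookKeeper's quorum writes. A ledger is spread over an ensemble of E bookies. Each entry is written to a write quorum of Qw of them and is confirmed to the broker once Qa of those acknowledge it. The toy model below shows how entries are striped round-robin across the ensemble. The bookie names are hypothetical. E, Qw, and Qa correspond to the broker settings managedLedgerDefaultEnsembleSize, managedLedgerDefaultWriteQuorum, and managedLedgerDefaultAckQuorum.

```python
from __future__ import annotations

from collections import Counter


def write_set(entry_id: int, ensemble: list[str], write_quorum: int) -> list[str]:
    """Bookies that store one entry: Qw consecutive bookies starting at entry_id mod E."""
    e = len(ensemble)
    if not 1 <= write_quorum <= e:
        raise ValueError("write quorum must be between 1 and the ensemble size")
    return [ensemble[(entry_id + k) % e] for k in range(write_quorum)]


def is_confirmed(acks_received: int, ack_quorum: int) -> bool:
    """The write is confirmed to the broker once ack_quorum bookies have acknowledged it."""
    return acks_received >= ack_quorum


if __name__ == "__main__":
    bookies = ["bookie-1", "bookie-2", "bookie-3"]  # hypothetical ensemble, E = 3
    for entry in range(4):
        print(entry, write_set(entry, bookies, write_quorum=2))
    # Over many entries each bookie holds Qw/E of the data (2/3 here)
    load = Counter(b for entry in range(300) for b in write_set(entry, bookies, 2))
    print(dict(load))
```

Because the storage load spreads over the whole ensemble, adding bookies adds capacity without moving data off the brokers. This is the property the layered design relies on.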
Pros and Cons
Pros
- Native support for multi-tenancy and geo-replication, which are harder to achieve with Kafka
- Independent scaling of storage and compute thanks to the layered architecture
- Cloud-native design that operates well on Kubernetes
- Cost-effective long-term data retention through tiered storage
- Unified platform providing transactions, a schema registry, and function processing
- Well suited to high-volume stream processing such as IoT and real-time analytics
Cons
- Smaller ecosystem and fewer adoption cases than Kafka
- Steep learning curve due to the more complex architecture
- Kafka may outperform Pulsar in some simple latency benchmarks
- Less mature operational and monitoring tooling than Kafka
- Operating the BookKeeper storage layer requires specialized knowledge
- The added complexity can be unnecessary overhead in small-scale environments
Code Examples
Installation and Basic Setup
# Java environment setup (the Pulsar 3.x server requires Java 17)
sudo apt update
sudo apt install openjdk-17-jdk
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
# Pulsar binary download and installation
wget https://archive.apache.org/dist/pulsar/pulsar-3.1.1/apache-pulsar-3.1.1-bin.tar.gz
tar -xzf apache-pulsar-3.1.1-bin.tar.gz
cd apache-pulsar-3.1.1
# Start in standalone mode
bin/pulsar standalone
# Verify operation in another terminal
bin/pulsar-admin clusters list
bin/pulsar-admin tenants list
bin/pulsar-admin namespaces list public
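# Cluster startup with Docker Compose: one ZooKeeper, one bookie, one broker
cat > docker-compose.yml << 'EOF'
services:
  zookeeper:
    image: apachepulsar/pulsar:3.1.1
    hostname: zookeeper
    container_name: zookeeper
    restart: on-failure
    command: >
      bash -c "bin/apply-config-from-env.py conf/zookeeper.conf &&
      exec bin/pulsar zookeeper"
    environment:
      PULSAR_MEM: "-Xms512m -Xmx512m"
      PULSAR_GC: "-XX:+UseG1GC"
    volumes:
      - ./data/zookeeper:/pulsar/data/zookeeper
  # One-shot job that writes the cluster metadata the bookie and broker need
  pulsar-init:
    image: apachepulsar/pulsar:3.1.1
    container_name: pulsar-init
    restart: on-failure
    command: >
      bin/pulsar initialize-cluster-metadata
      --cluster cluster-a
      --zookeeper zookeeper:2181
      --configuration-store zookeeper:2181
      --web-service-url http://broker:8080
      --broker-service-url pulsar://broker:6650
    depends_on:
      - zookeeper
  bookkeeper:
    image: apachepulsar/pulsar:3.1.1
    hostname: bookkeeper
    container_name: bookkeeper
    restart: on-failure
    command: >
      bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf &&
      exec bin/pulsar bookie"
    environment:
      zkServers: "zookeeper:2181"
      metadataServiceUri: "metadata-store:zk:zookeeper:2181"
      advertisedAddress: "bookkeeper"  # stable bookie identity across restarts
      PULSAR_MEM: "-Xms1g -Xmx1g"
      PULSAR_GC: "-XX:+UseG1GC"
      BOOKIE_MEM: "-Xms1g -Xmx1g"
    depends_on:
      - zookeeper
      - pulsar-init
    volumes:
      - ./data/bookkeeper:/pulsar/data/bookkeeper
  broker:
    image: apachepulsar/pulsar:3.1.1
    hostname: broker
    container_name: broker
    restart: on-failure
    command: >
      bash -c "bin/apply-config-from-env.py conf/broker.conf &&
      exec bin/pulsar broker"
    environment:
      metadataStoreUrl: "zk:zookeeper:2181"
      zookeeperServers: "zookeeper:2181"
      clusterName: "cluster-a"
      # A single bookie can only satisfy ensemble and quorum sizes of 1
      managedLedgerDefaultEnsembleSize: "1"
      managedLedgerDefaultWriteQuorum: "1"
      managedLedgerDefaultAckQuorum: "1"
      advertisedAddress: "broker"
      # Lets clients on the host reach the broker through the published port
      advertisedListeners: "external:pulsar://127.0.0.1:6650"
      PULSAR_MEM: "-Xms1g -Xmx1g"
      PULSAR_GC: "-XX:+UseG1GC"
    ports:
      - "6650:6650" # Pulsar protocol
      - "8080:8080" # HTTP Admin API
    depends_on:
      - zookeeper
      - bookkeeper
EOF
docker compose up -d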
# Check cluster status
sleep 30
curl http://localhost:8080/admin/v2/clusters
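A fixed sleep 30 may be too short on a slow machine and wasteful on a fast one. A small polling loop against the same admin endpoint is a sturdier readiness check. The sketch below assumes the broker's HTTP port is published on localhost:8080, as in the compose file, and treats a non-empty cluster list as "ready". The fetch, sleep, and clock functions are injectable, so the loop can be exercised without a running cluster.

```python
import json
import time
import urllib.request
from typing import Callable, List, Optional


def fetch_clusters(admin_url: str = "http://localhost:8080") -> List[str]:
    """GET /admin/v2/clusters and return the JSON list of cluster names."""
    with urllib.request.urlopen(f"{admin_url}/admin/v2/clusters", timeout=5) as resp:
        return json.loads(resp.read().decode("utf-8"))


def wait_until_ready(fetch: Callable[[], List[str]] = fetch_clusters,
                     timeout_s: float = 120.0,
                     interval_s: float = 2.0,
                     sleep: Callable[[float], None] = time.sleep,
                     clock: Callable[[], float] = time.monotonic) -> List[str]:
    """Poll until fetch() returns a non-empty cluster list, or raise TimeoutError."""
    deadline = clock() + timeout_s
    last_error: Optional[Exception] = None
    while clock() < deadline:
        try:
            clusters = fetch()
            if clusters:
                return clusters
        except Exception as exc:  # connection refused, HTTP error, invalid JSON, ...
            last_error = exc
        sleep(interval_s)
    raise TimeoutError(f"broker not ready after {timeout_s}s (last error: {last_error})")
```

Against the compose cluster, print(wait_until_ready()) would be expected to return the cluster list (cluster-a) once the broker's admin API answers.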
Basic Topic Management and Producer/Consumer Operations
# Create tenant
bin/pulsar-admin tenants create my-tenant \
--admin-roles my-admin-role \
--allowed-clusters standalone
# Create namespace
bin/pulsar-admin namespaces create my-tenant/my-namespace
# Create partitioned topic
bin/pulsar-admin topics create-partitioned-topic \
persistent://my-tenant/my-namespace/my-topic \
--partitions 4
# List topics
bin/pulsar-admin topics list my-tenant/my-namespace
# Topic statistics (partitioned topics use partitioned-stats)
bin/pulsar-admin topics partitioned-stats \
persistent://my-tenant/my-namespace/my-topic
# Console producer
bin/pulsar-client produce \
persistent://my-tenant/my-namespace/my-topic \
--messages "Hello Pulsar Message 1,Hello Pulsar Message 2,Hello Pulsar Message 3"
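# Console consumer: -n 0 consumes until interrupted, and the Earliest position
# lets the new subscription read the messages produced above
bin/pulsar-client consume \
persistent://my-tenant/my-namespace/my-topic \
--subscription-name my-subscription \
--subscription-position Earliest \
--num-messages 0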
# List the subscriptions on the topic
bin/pulsar-admin topics subscriptions \
persistent://my-tenant/my-namespace/my-topic
# Internal storage stats (ledgers and cursors) for one partition
bin/pulsar-admin topics stats-internal \
persistent://my-tenant/my-namespace/my-topic-partition-0
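A partitioned topic such as the 4-partition my-topic is stored as ordinary internal topics named <topic>-partition-0 through <topic>-partition-3. Keyed messages are routed by hashing the key, so one key always lands on the same partition. The sketch below illustrates both ideas. Its hash re-implements Java's String.hashCode(), in the spirit of the Java client's default JavaStringHash scheme. Other clients default to different hash functions, so do not use this sketch to predict where a real client will place a key.

```python
from typing import List


def partition_topics(topic: str, partitions: int) -> List[str]:
    """Names of the internal per-partition topics that back a partitioned topic."""
    return [f"{topic}-partition-{i}" for i in range(partitions)]


def java_string_hash(s: str) -> int:
    """Java's String.hashCode(): h = 31*h + c over UTF-16 code units, as a signed 32-bit int."""
    data = s.encode("utf-16-be")
    h = 0
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + unit) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


def choose_partition(key: str, partitions: int) -> int:
    """Illustrative key routing: clear the sign bit, then take the hash modulo the partition count."""
    return (java_string_hash(key) & 0x7FFFFFFF) % partitions


if __name__ == "__main__":
    names = partition_topics("persistent://my-tenant/my-namespace/my-topic", 4)
    for key in ("order-1", "order-2", "order-1"):
        print(key, "->", names[choose_partition(key, 4)])
```

Stable key-to-partition mapping preserves per-key ordering. It also means that changing the partition count remaps keys.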
Java Producer and Consumer Implementation
// Maven dependencies
// <dependency>
// <groupId>org.apache.pulsar</groupId>
// <artifactId>pulsar-client</artifactId>
// <version>3.1.1</version>
// </dependency>
// Each public class below belongs in its own .java file with these imports.
import org.apache.pulsar.client.api.*;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
// Java Producer example
public class PulsarProducerExample {
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String TOPIC_NAME = "persistent://my-tenant/my-namespace/orders";
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the producer, then the client, even if sending fails
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .connectionTimeout(10, TimeUnit.SECONDS)
                .operationTimeout(15, TimeUnit.SECONDS)
                .build();
             Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(TOPIC_NAME)
                .producerName("order-producer")
                .batchingMaxMessages(100)                           // send a batch at 100 messages...
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) // ...or after 10 ms
                .compressionType(CompressionType.SNAPPY)
                .create()) {
            for (int i = 0; i < 1000; i++) {
                // Locale.ROOT keeps '.' as the decimal separator so the JSON stays valid
                String orderData = String.format(Locale.ROOT,
                        "{\"orderId\":%d,\"customerId\":%d,\"amount\":%.2f,\"timestamp\":%d}",
                        i, i % 100, Math.random() * 1000, System.currentTimeMillis());
                // Asynchronous send; the callbacks run when the broker responds
                producer.newMessage()
                        .key("order-" + i)
                        .value(orderData)
                        .property("source", "java-app")
                        .sendAsync()
                        .thenAccept(messageId -> System.out.printf("Message sent successfully: %s%n", messageId))
                        .exceptionally(throwable -> {
                            System.err.println("Failed to send message: " + throwable.getMessage());
                            return null;
                        });
            }
            // Send any partially filled batch and wait until the broker has persisted everything
            producer.flush();
        }
    }
}
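The two batching settings on the producer work together: a batch is sent when it reaches batchingMaxMessages (100) or when the batchingMaxPublishDelay (10 ms) expires, whichever comes first. The Python toy model below captures just that rule with explicit timestamps. It is not the client's implementation. The real client also limits batches by size (batchingMaxBytes) and drives the delay with its own timer, and the model leaves both out.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class BatchAccumulator:
    """Toy model of producer batching: send when full or when the delay expires."""
    max_messages: int = 100    # like batchingMaxMessages
    max_delay_ms: int = 10     # like batchingMaxPublishDelay
    pending: List[str] = field(default_factory=list)
    first_added_ms: Optional[int] = None

    def add(self, msg: str, now_ms: int) -> Optional[List[str]]:
        """Queue a message; return the batch if this message filled it."""
        if not self.pending:
            self.first_added_ms = now_ms
        self.pending.append(msg)
        if len(self.pending) >= self.max_messages:
            return self._flush()
        return None

    def tick(self, now_ms: int) -> Optional[List[str]]:
        """Timer check; return the batch if its oldest message has waited max_delay_ms."""
        if self.pending and now_ms - self.first_added_ms >= self.max_delay_ms:
            return self._flush()
        return None

    def _flush(self) -> List[str]:
        batch, self.pending, self.first_added_ms = self.pending, [], None
        return batch
```

A larger batch size improves throughput under load. The delay bounds the extra latency a lone message pays while it waits for companions.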
// Java Consumer example
public class PulsarConsumerExample {
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String TOPIC_NAME = "persistent://my-tenant/my-namespace/orders";
    private static final String SUBSCRIPTION_NAME = "order-processor";
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();
        // Create consumer
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(TOPIC_NAME)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscriptionType(SubscriptionType.Shared) // Shared, Exclusive, Failover, Key_Shared
                .receiverQueueSize(1000)
                .acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS) // group acks for up to 100 ms
                .subscribe();
        try {
            while (true) {
                // Returns null if no message arrives within 5 seconds
                Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
                if (message == null) {
                    System.out.println("No message received within timeout");
                    continue;
                }
                try {
                    String orderData = message.getValue();
                    String orderKey = message.getKey();
                    System.out.printf("Received order: key=%s, data=%s%n", orderKey, orderData);
                    // Business logic processing
                    processOrder(orderData);
                    // Acknowledge only after successful processing
                    consumer.acknowledge(message);
                } catch (Exception e) {
                    System.err.println("Error processing message: " + e.getMessage());
                    // Ask the broker to redeliver this message later
                    consumer.negativeAcknowledge(message);
                }
            }
        } finally {
            consumer.close();
            client.close();
        }
    }
    private static void processOrder(String orderData) {
        // Order processing logic
        System.out.println("Processing order: " + orderData);
        // Simulate processing time
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
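The subscription type decides how messages are spread across consumers that share a subscription:

- Exclusive allows a single consumer.
- Failover keeps one active consumer, with the others on standby.
- Shared spreads messages across all consumers, without per-key ordering.
- Key_Shared sends every message with a given key to the same consumer.

The Python toy model below contrasts Shared and Key_Shared. It is a simplification. The broker dispatches according to each consumer's available permits rather than strict round-robin. Key_Shared assigns hash ranges of the key space to consumers rather than the crc32-modulo used here.

```python
import zlib
from itertools import cycle
from typing import Dict, List, Tuple

Msg = Tuple[str, str]  # (key, payload)


def dispatch_shared(messages: List[Msg], consumers: List[str]) -> Dict[str, List[Msg]]:
    """Shared: hand messages to consumers in turn, ignoring keys."""
    out: Dict[str, List[Msg]] = {c: [] for c in consumers}
    for consumer, msg in zip(cycle(consumers), messages):
        out[consumer].append(msg)
    return out


def dispatch_key_shared(messages: List[Msg], consumers: List[str]) -> Dict[str, List[Msg]]:
    """Key_Shared: a stable hash of the key picks the consumer, so each key has one owner."""
    out: Dict[str, List[Msg]] = {c: [] for c in consumers}
    for key, payload in messages:
        owner = consumers[zlib.crc32(key.encode("utf-8")) % len(consumers)]
        out[owner].append((key, payload))
    return out
```

With Shared, two orders for the same customer can be processed in parallel by different consumers. Key_Shared keeps them in order on one consumer while still scaling out across keys.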
// Message listener type consumer example
public class PulsarMessageListenerExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://my-tenant/my-namespace/orders")
                .subscriptionName("order-listener")
                .subscriptionType(SubscriptionType.Shared)
                .messageListener((consumer1, msg) -> {
                    try {
                        String orderData = msg.getValue();
                        String messageKey = msg.getKey();
                        System.out.printf("Listener received: key=%s, data=%s%n",
                                messageKey, orderData);
                        // Asynchronous acknowledgment
                        consumer1.acknowledgeAsync(msg)
                                .thenRun(() -> System.out.println("Message acknowledged"))
                                .exceptionally(throwable -> {
                                    System.err.println("Ack failed: " + throwable.getMessage());
                                    return null;
                                });
                    } catch (Exception e) {
                        System.err.println("Message processing failed: " + e.getMessage());
                        consumer1.negativeAcknowledge(msg);
                    }
                })
                .subscribe();
        // The listener runs on a separate thread, so keep the main thread alive
        System.out.println("Message listener started. Press Enter to exit...");
        System.in.read();
        consumer.close();
        client.close();
    }
}
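A negative acknowledgment makes the broker redeliver the message later, so a message that always fails would otherwise loop forever. The Java client's dead letter policy breaks that loop by moving such messages to a dead-letter topic. It is configured with deadLetterPolicy(...) on the consumer builder and a maxRedeliverCount. The Python toy model below shows the idea with a hypothetical max_attempts cutoff. It is not the client's implementation, and the exact redelivery count at which Pulsar gives up is defined by the client, not by this sketch.

```python
from collections import deque
from typing import Callable, Dict, List, Tuple


def consume_with_retries(messages: List[str], handler: Callable[[str], None],
                         max_attempts: int = 3) -> Tuple[List[str], List[str], Dict[str, int]]:
    """Toy nack-and-redeliver loop with a dead-letter cutoff.

    Returns (acknowledged, dead_lettered, attempts_per_message); message ids must be unique.
    """
    pending = deque(messages)
    acked: List[str] = []
    dead: List[str] = []
    attempts: Dict[str, int] = {}
    while pending:
        msg = pending.popleft()
        attempts[msg] = attempts.get(msg, 0) + 1
        try:
            handler(msg)
            acked.append(msg)          # success: acknowledge
        except Exception:
            if attempts[msg] >= max_attempts:
                dead.append(msg)       # give up: route to the dead-letter topic
            else:
                pending.append(msg)    # negative ack: redeliver later
    return acked, dead, attempts
```

Transient failures succeed on a later attempt, while a "poison" message ends up parked for inspection instead of blocking the subscription.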
Multi-Tenancy and Geo-Replication Configuration
# Multi-tenant environment setup
# Tenant 1 (development environment)
bin/pulsar-admin tenants create dev-tenant \
--admin-roles dev-admin \
--allowed-clusters standalone
bin/pulsar-admin namespaces create dev-tenant/app1
bin/pulsar-admin namespaces create dev-tenant/app2
# Tenant 2 (production environment)
bin/pulsar-admin tenants create prod-tenant \
--admin-roles prod-admin \
--allowed-clusters standalone
bin/pulsar-admin namespaces create prod-tenant/frontend
bin/pulsar-admin namespaces create prod-tenant/backend
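# Resource quota for a namespace bundle (here the bundle covering the full hash range);
# memory is in megabytes, and --dynamic allows the quota to be recalculated dynamically
bin/pulsar-admin resource-quotas set \
--namespace dev-tenant/app1 \
--bundle 0x00000000_0xffffffff \
--msgRateIn 1000 \
--msgRateOut 2000 \
--bandwidthIn 1048576 \
--bandwidthOut 2097152 \
--memory 1024 \
--dynamic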
# Geo-replication configuration
# Cluster 1 setup (Tokyo)
bin/pulsar-admin clusters create tokyo-cluster \
--url http://tokyo-pulsar:8080 \
--broker-url pulsar://tokyo-pulsar:6650
# Cluster 2 setup (Osaka)
bin/pulsar-admin clusters create osaka-cluster \
--url http://osaka-pulsar:8080 \
--broker-url pulsar://osaka-pulsar:6650
# Create global tenant
bin/pulsar-admin tenants create global-tenant \
--admin-roles global-admin \
--allowed-clusters tokyo-cluster,osaka-cluster
# Create global namespace (automatic replication)
bin/pulsar-admin namespaces create global-tenant/global-namespace \
--clusters tokyo-cluster,osaka-cluster
# Optionally override the replication clusters for a single topic (topic-level policy)
bin/pulsar-admin topics set-replication-clusters \
persistent://global-tenant/global-namespace/replicated-topic \
--clusters tokyo-cluster,osaka-cluster
# Check replication status
bin/pulsar-admin topics stats \
persistent://global-tenant/global-namespace/replicated-topic
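The stats command returns JSON. For a geo-replicated topic, the stats include a replication section keyed by remote cluster name, with fields such as connected and replicationBacklog. The sketch below turns that section into a per-cluster status. The sample input is a hand-written, abbreviated example rather than real output, and the backlog threshold is an arbitrary assumption. Check the field names against your Pulsar version.

```python
import json
from typing import Dict


def replication_report(stats_json: str, max_backlog: int = 1000) -> Dict[str, str]:
    """Map each remote cluster in a topic's stats to OK, LAGGING, or DISCONNECTED."""
    stats = json.loads(stats_json)
    report: Dict[str, str] = {}
    for cluster, repl in stats.get("replication", {}).items():
        backlog = repl.get("replicationBacklog", 0)
        if not repl.get("connected", False):
            report[cluster] = "DISCONNECTED"
        elif backlog > max_backlog:
            report[cluster] = f"LAGGING ({backlog} messages behind)"
        else:
            report[cluster] = "OK"
    return report


SAMPLE_STATS = json.dumps({  # hand-written, abbreviated sample, not real output
    "msgRateIn": 120.0,
    "replication": {
        "osaka-cluster": {"connected": True, "replicationBacklog": 12},
    },
})
```

For example, save the output of the stats command on the Tokyo cluster to stats.json and call replication_report(open("stats.json").read()). Because replication is asynchronous, a small non-zero backlog is normal, and a steadily growing backlog is the signal to watch.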
Advanced Configuration and Features
// Schema registry usage example: the client derives a JSON schema from the Order class,
// and the broker registers it for the topic on first use
import org.apache.pulsar.client.api.*;
public class SchemaExample {
    // POJO describing the message; public fields and a no-arg constructor allow JSON (de)serialization
    public static class Order {
        public long orderId;
        public long customerId;
        public double amount;
        public long timestamp;
        public Order() {}
        public Order(long orderId, long customerId, double amount, long timestamp) {
            this.orderId = orderId;
            this.customerId = customerId;
            this.amount = amount;
            this.timestamp = timestamp;
        }
    }
    public static void main(String[] args) throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
             // Producer with a JSON schema generated from the Order class
             Producer<Order> producer = client.newProducer(Schema.JSON(Order.class))
                .topic("persistent://my-tenant/my-namespace/schema-topic")
                .create()) {
            // Create and send a record
            producer.send(new Order(12345, 67890, 199.99, System.currentTimeMillis()));
        }
    }
}
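// Transaction example (the broker must run with transactionCoordinatorEnabled=true)
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.transaction.Transaction;
import java.util.concurrent.TimeUnit;
public class TransactionExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .enableTransaction(true)
                .build();
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("persistent://my-tenant/my-namespace/transaction-topic")
                .sendTimeout(0, TimeUnit.SECONDS) // Required when using transactions
                .create();
        // Subscribed before the transaction starts, so it receives the messages once they are committed
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://my-tenant/my-namespace/transaction-topic")
                .subscriptionName("transaction-subscription")
                .subscribe();
        try {
            // Start transaction
            Transaction transaction = client.newTransaction()
                    .withTransactionTimeout(5, TimeUnit.MINUTES)
                    .build()
                    .get();
            try {
                // Messages sent within the transaction stay invisible to consumers until commit
                producer.newMessage(transaction).value("Transaction Message 1").send();
                producer.newMessage(transaction).value("Transaction Message 2").send();
                transaction.commit().get();
                System.out.println("Transaction committed successfully");
            } catch (Exception e) {
                transaction.abort().get(); // discard the uncommitted messages
                throw e;
            }
            // Both committed messages are now visible
            for (int i = 0; i < 2; i++) {
                Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                System.out.println("Consumed committed message: " + msg.getValue());
                consumer.acknowledge(msg);
            }
        } catch (Exception e) {
            System.err.println("Transaction failed: " + e.getMessage());
        } finally {
            producer.close();
            consumer.close();
            client.close();
        }
    }
}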
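Transactional messages are held back until the transaction commits. Consumers see both messages from the transaction or, if it aborts or times out, neither of them. The Python toy model below shows only that visibility rule. In Pulsar, the transaction coordinator and per-topic transaction buffers implement it durably. The model leaves out acknowledgments, timeouts, and failures.

```python
from itertools import count
from typing import Dict, List, Optional


class ToyTopic:
    """Toy read-committed topic: transactional sends stay hidden until commit."""

    def __init__(self) -> None:
        self.visible: List[str] = []            # what consumers can read
        self._open: Dict[int, List[str]] = {}   # buffered messages per open transaction
        self._ids = count(1)

    def begin(self) -> int:
        txn_id = next(self._ids)
        self._open[txn_id] = []
        return txn_id

    def send(self, value: str, txn: Optional[int] = None) -> None:
        if txn is None:
            self.visible.append(value)          # non-transactional: visible at once
        else:
            self._open[txn].append(value)       # KeyError if the transaction is not open

    def commit(self, txn: int) -> None:
        self.visible.extend(self._open.pop(txn))  # all messages appear together

    def abort(self, txn: int) -> None:
        self._open.pop(txn)                       # buffered messages are discarded
```

This all-or-nothing visibility is what makes consume-process-produce pipelines exactly-once from the reader's point of view.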
Python Client Implementation
# pip install pulsar-client
import pulsar
import json
import time
from datetime import datetime
# Python Producer example
class PulsarProducerExample:
    def __init__(self, service_url='pulsar://localhost:6650'):
        self.client = pulsar.Client(service_url)
        self.producer = self.client.create_producer(
            'persistent://my-tenant/my-namespace/python-topic',
            producer_name='python-producer',
            batching_enabled=True,
            batching_max_messages=100,
            batching_max_publish_delay_ms=10,
            compression_type=pulsar.CompressionType.SNAPPY
        )
    def send_order(self, order_id, customer_id, amount):
        order = {
            'order_id': order_id,
            'customer_id': customer_id,
            'amount': amount,
            'timestamp': datetime.now().isoformat(),
            'status': 'pending'
        }
        try:
            # Synchronous send; on a partitioned topic the partition key routes
            # all of a customer's orders to the same partition
            msg_id = self.producer.send(
                content=json.dumps(order).encode('utf-8'),
                properties={
                    'source': 'python-app',
                    'version': '1.0'
                },
                partition_key=f'customer-{customer_id}'
            )
            print(f"Order sent successfully: {order_id}, message_id: {msg_id}")
        except Exception as e:
            print(f"Failed to send order {order_id}: {e}")
    def close(self):
        self.producer.close()
        self.client.close()
# Python Consumer example
class PulsarConsumerExample:
    def __init__(self, service_url='pulsar://localhost:6650'):
        self.client = pulsar.Client(service_url)
        self.consumer = self.client.subscribe(
            'persistent://my-tenant/my-namespace/python-topic',
            subscription_name='python-subscription',
            consumer_type=pulsar.ConsumerType.Shared,
            receiver_queue_size=1000,
            consumer_name='python-consumer'
        )
    def process_messages(self):
        try:
            while True:
                try:
                    # Receive message with timeout
                    msg = self.consumer.receive(timeout_millis=5000)
                except pulsar.Timeout:
                    print("No message received within timeout")
                    continue
                try:
                    order_data = json.loads(msg.data().decode('utf-8'))
                    print(f"Received order: {order_data}")
                    # Business logic processing
                    self.process_order(order_data)
                    # Acknowledge only after successful processing
                    self.consumer.acknowledge(msg)
                except Exception as e:
                    print(f"Error processing message: {e}")
                    # Redeliver only the message that actually failed
                    self.consumer.negative_acknowledge(msg)
        except KeyboardInterrupt:
            print("Stopping consumer...")
        finally:
            self.consumer.close()
            self.client.close()
    def process_order(self, order_data):
        # Order processing logic
        order_id = order_data.get('order_id')
        amount = order_data.get('amount')
        print(f"Processing order {order_id} for amount ${amount}")
        # Simulate processing time
        time.sleep(0.05)
# Usage example
if __name__ == "__main__":
    # Producer example
    producer = PulsarProducerExample()
    for i in range(100):
        producer.send_order(
            order_id=i + 1,
            customer_id=(i % 10) + 1,
            amount=round(50 + (i * 10.5), 2)
        )
    producer.close()
    # Consumer example
    # consumer = PulsarConsumerExample()
    # consumer.process_messages()
Monitoring and Performance Management
# Pulsar cluster monitoring
# Check broker status
bin/pulsar-admin brokers list standalone
bin/pulsar-admin brokers healthcheck
# Topic statistics: aggregated over all partitions, with a per-partition breakdown
bin/pulsar-admin topics partitioned-stats persistent://my-tenant/my-namespace/my-topic --per-partition
# Statistics for a single partition
bin/pulsar-admin topics stats persistent://my-tenant/my-namespace/my-topic-partition-0
# Subscriptions on the topic (per-subscription backlog, rates, and consumers appear under "subscriptions" in the stats output)
bin/pulsar-admin topics subscriptions persistent://my-tenant/my-namespace/my-topic
# BookKeeper: bookies registered with the cluster
bin/pulsar-admin bookies list-bookies
# Performance testing
# Producer test
bin/pulsar-perf produce \
persistent://my-tenant/my-namespace/perf-topic \
--rate 10000 \
--num-producers 1 \
--size 1024 \
--num-messages 1000000
# Consumer test
bin/pulsar-perf consume \
persistent://my-tenant/my-namespace/perf-topic \
--subscriptions perf-subscription \
--num-consumers 1
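It helps to know what the producer test flags imply before running it. At --rate 10000 with --size 1024, the producer offers about 9.8 MiB/s of payload, and --num-messages 1000000 takes 100 seconds at that rate. The helper below does the arithmetic. It counts payload bytes only, so protocol and batching overhead come on top. So do the extra copies written to BookKeeper, one per bookie in the write quorum, which the optional write_quorum parameter approximates.

```python
def perf_plan(rate_msgs_per_s: int, size_bytes: int, num_messages: int,
              write_quorum: int = 1) -> dict:
    """Payload arithmetic for a pulsar-perf produce run."""
    payload_bps = rate_msgs_per_s * size_bytes
    return {
        "duration_s": num_messages / rate_msgs_per_s,
        "payload_mib_per_s": payload_bps / 2**20,
        "total_payload_gib": num_messages * size_bytes / 2**30,
        # payload bytes written to bookies per second if each entry is stored write_quorum times
        "bookie_write_mib_per_s": payload_bps * write_quorum / 2**20,
    }


if __name__ == "__main__":
    print(perf_plan(10_000, 1024, 1_000_000, write_quorum=2))
```

If the measured publish rate falls well short of the planned rate, the bottleneck is usually disk or network on the bookies rather than the broker. Comparing against bookie_write_mib_per_s makes that easier to spot.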
# Monitoring metrics script
cat > monitor-pulsar.sh << 'EOF'
#!/bin/bash
# Run from the Pulsar installation directory, or set PULSAR_HOME
cd "${PULSAR_HOME:-.}" || exit 1
echo "=== Pulsar Cluster Monitoring $(date) ==="
# Broker status
echo "=== Broker Status ==="
bin/pulsar-admin brokers healthcheck 2>/dev/null && echo "Broker: OK" || echo "Broker: ERROR"
# Cluster information
echo "=== Cluster Info ==="
bin/pulsar-admin clusters list
# Tenant and namespace count
echo "=== Tenants and Namespaces ==="
echo "Total tenants: $(bin/pulsar-admin tenants list | wc -l)"
echo "Namespaces in tenant 'public': $(bin/pulsar-admin namespaces list public | wc -l)"
# Topic count (public/default namespace only)
echo "=== Topics ==="
echo "Topics in public/default: $(bin/pulsar-admin topics list public/default | wc -l)"
# BookKeeper status
echo "=== BookKeeper Status ==="
bin/pulsar-admin bookies list-bookies
# Process status (match Java processes only, so this script itself is not counted)
echo "=== Process Status ==="
pgrep -f "java.*pulsar" > /dev/null && echo "Pulsar JVM processes: $(pgrep -f 'java.*pulsar' | wc -l)" || echo "No Pulsar processes"
echo "=== Monitoring Complete ==="
EOF
chmod +x monitor-pulsar.sh
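The script above only counts resources. A useful next check is subscription backlog: in the topics stats output, each entry under subscriptions carries msgBacklog and a consumers list. The sketch below flags subscriptions whose backlog exceeds a threshold, and subscriptions that have a backlog but no connected consumers. The threshold and the sample input are assumptions, not Pulsar defaults.

```python
import json
from typing import List


def backlog_alerts(stats_json: str, threshold: int = 10_000) -> List[str]:
    """Return alert lines for subscriptions that are falling behind."""
    stats = json.loads(stats_json)
    alerts: List[str] = []
    for name, sub in sorted(stats.get("subscriptions", {}).items()):
        backlog = sub.get("msgBacklog", 0)
        consumers = len(sub.get("consumers", []))
        if backlog > threshold:
            alerts.append(f"{name}: backlog {backlog} exceeds {threshold}")
        if backlog > 0 and consumers == 0:
            alerts.append(f"{name}: {backlog} messages waiting but no consumers connected")
    return alerts
```

A typical use is to pipe bin/pulsar-admin topics stats <topic> into a small wrapper that calls backlog_alerts on the captured JSON, running it from cron next to monitor-pulsar.sh.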