Apache Pulsar

Cloud-native distributed messaging and streaming system with built-in multi-tenancy, geo-replication, and schema management. Originally developed at Yahoo and often adopted as an alternative to Kafka.

Message Broker, Streaming, Multi-tenant, Geo-replication, Cloud Native, High Throughput

Server

Overview

Apache Pulsar is a cloud-native distributed messaging and streaming platform originally developed at Yahoo to address the limitations of traditional messaging systems. It is designed around native multi-tenancy, geo-replication, and a layered architecture that separates the serving layer (brokers) from the storage layer (Apache BookKeeper), so each layer can be scaled and operated independently. Pulsar targets high throughput (deployments can reach millions of messages per second) with millisecond-level latency, which makes it well suited to IoT data processing, real-time analytics, and event-driven architectures in cloud-first environments.

Details

As of 2025, Apache Pulsar is well established as a messaging platform for cloud-first enterprise environments. Its layered architecture fully separates the distributed storage layer, built on Apache BookKeeper, from the stateless Pulsar broker serving layer. Storage and serving capacity can therefore be scaled independently, which is harder to achieve in coupled architectures like Kafka's, where brokers also store the partition data. Multi-tenancy is implemented through a tenant and namespace hierarchy with per-namespace policies, resource isolation, and quotas, so multiple organizations and applications can safely share a single cluster. Geo-replication is enabled through namespace (or topic) configuration and copies messages asynchronously between clusters, supporting disaster recovery in globally distributed deployments. The ecosystem also includes Pulsar Functions (lightweight stream processing), Pulsar IO (connectors), transactions, a built-in schema registry, and other enterprise-grade features.
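The tenant and namespace hierarchy is visible in every topic name, which has the form `{persistent|non-persistent}://tenant/namespace/topic`; namespace policies (retention, replication, rate limits) apply to all topics below them. The sketch below is a simplified parser, assuming V2-style topic names and the `public/default` fallback for bare names; `TopicName` and `parse_topic` are hypothetical helpers, not part of any Pulsar client.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TopicName:
    """Hypothetical helper type: the levels of a Pulsar topic name."""
    domain: str      # "persistent" or "non-persistent"
    tenant: str
    namespace: str
    local_name: str


def parse_topic(name: str, default_ns: str = "public/default") -> TopicName:
    """Split a topic name into domain, tenant, namespace, and local name."""
    if "://" in name:
        domain, rest = name.split("://", 1)
        if domain not in ("persistent", "non-persistent"):
            raise ValueError(f"unknown domain: {domain!r}")
    else:
        # Short forms default to persistent topics; a bare topic name
        # is placed in the default tenant/namespace
        domain = "persistent"
        rest = name if name.count("/") == 2 else f"{default_ns}/{name}"
    parts = rest.split("/")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected tenant/namespace/topic, got {rest!r}")
    tenant, namespace, local_name = parts
    return TopicName(domain, tenant, namespace, local_name)
```

For example, `parse_topic("my-topic")` resolves to tenant `public`, namespace `default`, which is where topics land when no tenant is given.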

Key Features

  • Layered Architecture: Independent scaling through separation of the serving (broker) and storage (BookKeeper) layers
  • Native Multi-Tenancy: Secure shared usage through tenant, namespace, and resource isolation
  • Real-time Geo-Replication: Automatic data replication in global distributed environments
  • High Performance: Millions of messages per second with millisecond-level low latency
  • Long-Term Storage: Durable persistence via Apache BookKeeper, plus tiered storage that offloads older data to object storage for effectively unlimited retention
  • Rich Ecosystem: Integrated Functions, IO, SQL, and Schema Registry
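The storage layer can be pictured with BookKeeper's striping model: a ledger is written to an ensemble of E bookies, each entry is copied to Qw of them (the write quorum), and the write is confirmed once Qa of those bookies acknowledge it (the ack quorum). Pulsar brokers expose these as `managedLedgerDefaultEnsembleSize`, `managedLedgerDefaultWriteQuorum`, and `managedLedgerDefaultAckQuorum`. The sketch below models round-robin placement only and ignores ensemble changes after bookie failures; the function names are hypothetical.

```python
from collections import Counter
from typing import List


def bookies_for_entry(entry_id: int, ensemble: List[str], write_quorum: int) -> List[str]:
    """Bookies that store one entry under round-robin striping (simplified)."""
    size = len(ensemble)
    if not 1 <= write_quorum <= size:
        raise ValueError("write quorum must be between 1 and the ensemble size")
    # Entry e goes to Qw consecutive bookies starting at position e mod E
    return [ensemble[(entry_id + i) % size] for i in range(write_quorum)]


def write_acknowledged(acks_received: int, ack_quorum: int) -> bool:
    """A write is confirmed to the broker once Qa bookies have acknowledged it."""
    return acks_received >= ack_quorum


ensemble = ["bookie-1", "bookie-2", "bookie-3"]
for entry_id in range(4):
    print(entry_id, bookies_for_entry(entry_id, ensemble, write_quorum=2))

# With E=3 and Qw=2, each bookie stores two thirds of the entries
load = Counter(b for e in range(300) for b in bookies_for_entry(e, ensemble, 2))
print(dict(load))  # {'bookie-1': 200, 'bookie-2': 200, 'bookie-3': 200}
```

Because every entry is spread over several bookies, adding bookies adds storage capacity without moving topic ownership between brokers.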

Pros and Cons

Pros

  • Native support for multi-tenancy and geo-replication, which are difficult with Kafka
  • Independent scaling of storage and compute through the separation of the broker and BookKeeper layers
  • Excellent operability in Kubernetes environments with cloud-native design
  • Cost-effective long-term data storage through tiered storage
  • Unified platform providing transactions, Schema Registry, and function processing
  • Optimal for high-volume streaming processing like IoT and real-time analytics

Cons

  • Relatively smaller ecosystem and fewer adoption cases compared to Kafka
  • High learning curve due to complex architecture
  • Kafka may outperform in simple latency benchmarks in some cases
  • Less mature operational and monitoring tools compared to Kafka
  • Requires specialized knowledge for BookKeeper storage layer management
  • Complexity may become overhead in small-scale environments

Code Examples

Installation and Basic Setup

# Java environment setup (Pulsar 3.x servers require Java 17)
sudo apt update
sudo apt install openjdk-17-jdk
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64

# Pulsar binary download and installation
wget https://archive.apache.org/dist/pulsar/pulsar-3.1.1/apache-pulsar-3.1.1-bin.tar.gz
tar -xzf apache-pulsar-3.1.1-bin.tar.gz
cd apache-pulsar-3.1.1

# Start in standalone mode
bin/pulsar standalone

# Verify operation in another terminal
bin/pulsar-admin clusters list
bin/pulsar-admin tenants list
bin/pulsar-admin namespaces list public

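# Cluster startup with Docker Compose
# (one ZooKeeper, one bookie, one broker; the cluster is named "cluster-a",
#  so use cluster-a instead of "standalone" in the admin commands below)
mkdir -p data/zookeeper data/bookkeeper
# The image runs as UID 10000, which must be able to write the bind mounts
sudo chown -R 10000 data

cat > docker-compose.yml << 'EOF'
services:
  zookeeper:
    image: apachepulsar/pulsar:3.1.1
    hostname: zookeeper
    container_name: zookeeper
    command: >
      bash -c "bin/apply-config-from-env.py conf/zookeeper.conf &&
               exec bin/pulsar zookeeper"
    environment:
      metadataStoreUrl: zk:zookeeper:2181
      PULSAR_MEM: "-Xms512m -Xmx512m"
      PULSAR_GC: "-XX:+UseG1GC"
    volumes:
      - ./data/zookeeper:/pulsar/data/zookeeper
    healthcheck:
      test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"]
      interval: 10s
      timeout: 5s
      retries: 30

  # One-shot job that writes the cluster metadata before the bookie and broker start
  pulsar-init:
    image: apachepulsar/pulsar:3.1.1
    container_name: pulsar-init
    command: >
      bin/pulsar initialize-cluster-metadata
      --cluster cluster-a
      --zookeeper zookeeper:2181
      --configuration-store zookeeper:2181
      --web-service-url http://broker:8080
      --broker-service-url pulsar://broker:6650
    depends_on:
      zookeeper:
        condition: service_healthy

  bookkeeper:
    image: apachepulsar/pulsar:3.1.1
    hostname: bookkeeper
    container_name: bookkeeper
    command: >
      bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf &&
               exec bin/pulsar bookie"
    environment:
      clusterName: cluster-a
      zkServers: zookeeper:2181
      metadataServiceUri: metadata-store:zk:zookeeper:2181
      advertisedAddress: bookkeeper
      PULSAR_MEM: "-Xms1g -Xmx1g"
      PULSAR_GC: "-XX:+UseG1GC"
      BOOKIE_MEM: "-Xms1g -Xmx1g"
    depends_on:
      zookeeper:
        condition: service_healthy
      pulsar-init:
        condition: service_completed_successfully
    volumes:
      - ./data/bookkeeper:/pulsar/data/bookkeeper

  broker:
    image: apachepulsar/pulsar:3.1.1
    hostname: broker
    container_name: broker
    command: >
      bash -c "bin/apply-config-from-env.py conf/broker.conf &&
               exec bin/pulsar broker"
    environment:
      metadataStoreUrl: zk:zookeeper:2181
      zookeeperServers: zookeeper:2181
      clusterName: cluster-a
      # Only one bookie, so keep a single copy of each entry
      managedLedgerDefaultEnsembleSize: "1"
      managedLedgerDefaultWriteQuorum: "1"
      managedLedgerDefaultAckQuorum: "1"
      advertisedAddress: broker
      # Lets clients on the host connect through the published port
      advertisedListeners: external:pulsar://127.0.0.1:6650
      PULSAR_MEM: "-Xms1g -Xmx1g"
      PULSAR_GC: "-XX:+UseG1GC"
    ports:
      - "6650:6650"    # Pulsar binary protocol
      - "8080:8080"    # HTTP Admin API
    depends_on:
      zookeeper:
        condition: service_healthy
      bookkeeper:
        condition: service_started
EOF

docker compose up -d

# Check cluster status (the broker may need 30 seconds or more to become ready)
sleep 30
curl http://localhost:8080/admin/v2/clusters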
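Instead of a fixed `sleep`, readiness can be polled until the admin API answers. A minimal sketch using only the Python standard library, assuming the admin API is published on `localhost:8080` as above; `admin_api_ready` and `wait_until_ready` are hypothetical helpers.

```python
import http.client
import time
import urllib.request
from typing import Callable

ADMIN_URL = "http://localhost:8080/admin/v2/clusters"


def admin_api_ready(url: str = ADMIN_URL) -> bool:
    """True if the admin endpoint answers with HTTP 200, False on any error."""
    try:
        with urllib.request.urlopen(url, timeout=2) as resp:
            return resp.status == 200
    except (OSError, http.client.HTTPException):
        # OSError covers URLError/HTTPError, refused connections, and timeouts
        return False


def wait_until_ready(check: Callable[[], bool], timeout_s: float = 120.0,
                     interval_s: float = 2.0) -> bool:
    """Call `check` until it returns True or `timeout_s` seconds have passed."""
    deadline = time.monotonic() + timeout_s
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

Calling `wait_until_ready(admin_api_ready)` blocks until the broker is serving admin requests (or gives up after two minutes); the check is passed in as a function so the same loop can poll other endpoints.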

Basic Topic Management and Producer/Consumer Operations

# Create tenant
bin/pulsar-admin tenants create my-tenant \
    --admin-roles my-admin-role \
    --allowed-clusters standalone

# Create namespace
bin/pulsar-admin namespaces create my-tenant/my-namespace

# Create partitioned topic
bin/pulsar-admin topics create-partitioned-topic \
    persistent://my-tenant/my-namespace/my-topic \
    --partitions 4

# List topics
bin/pulsar-admin topics list my-tenant/my-namespace

# Topic statistics (aggregated over all partitions; --per-partition adds a breakdown)
bin/pulsar-admin topics partitioned-stats \
    persistent://my-tenant/my-namespace/my-topic \
    --per-partition

# Console producer
bin/pulsar-client produce \
    persistent://my-tenant/my-namespace/my-topic \
    --messages "Hello Pulsar Message 1,Hello Pulsar Message 2,Hello Pulsar Message 3"

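# Console consumer (start from the earliest message so the messages produced above
# are received; --num-messages 0 keeps consuming until interrupted)
bin/pulsar-client consume \
    persistent://my-tenant/my-namespace/my-topic \
    --subscription-name my-subscription \
    --subscription-position Earliest \
    --num-messages 0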

# List the topic's subscriptions
bin/pulsar-admin topics subscriptions \
    persistent://my-tenant/my-namespace/my-topic

# Internal storage stats (ledgers and cursor positions for each partition)
bin/pulsar-admin topics partitioned-stats-internal \
    persistent://my-tenant/my-namespace/my-topic
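A partitioned topic is backed by one internal topic per partition, named with a `-partition-N` suffix (so the 4-partition topic above consists of `my-topic-partition-0` through `my-topic-partition-3`). Messages that carry a key are routed to a partition by hashing the key. The sketch below approximates the Java client's default `JavaStringHash` scheme (the key's `String.hashCode()` with the sign bit cleared, modulo the partition count); other clients may default to different hashing schemes, and the helper names are hypothetical.

```python
from typing import List


def partition_names(topic: str, num_partitions: int) -> List[str]:
    """Internal topic names that back a partitioned topic."""
    return [f"{topic}-partition-{i}" for i in range(num_partitions)]


def java_string_hash(s: str) -> int:
    """Java's String.hashCode() as a signed 32-bit int (exact for BMP characters)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h & 0x80000000 else h


def partition_for_key(key: str, num_partitions: int) -> int:
    """Approximate key routing: clear the sign bit, then take the modulo."""
    return (java_string_hash(key) & 0x7FFFFFFF) % num_partitions
```

Since the mapping depends only on the key and the partition count, all messages with the same key land in the same partition and keep their relative order; changing the partition count changes the mapping.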

Java Producer and Consumer Implementation

// Maven dependencies
// <dependency>
//   <groupId>org.apache.pulsar</groupId>
//   <artifactId>pulsar-client</artifactId>
//   <version>3.1.1</version>
// </dependency>

// Each public class below belongs in its own .java file with these imports
import org.apache.pulsar.client.api.*;

import java.util.concurrent.TimeUnit;

// Java Producer example
public class PulsarProducerExample {
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String TOPIC_NAME = "persistent://my-tenant/my-namespace/orders";
    
    public static void main(String[] args) throws Exception {
        // Create Pulsar client
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .connectionTimeout(10, TimeUnit.SECONDS)
                .operationTimeout(15, TimeUnit.SECONDS)
                .build();
        
        // Create producer
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(TOPIC_NAME)
                .producerName("order-producer")
                .batchingMaxMessages(100)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .compressionType(CompressionType.SNAPPY)
                .create();
        
        try {
            // Send messages
            for (int i = 0; i < 1000; i++) {
                String orderData = String.format(
                    java.util.Locale.ROOT,  // locale-independent decimal point keeps the JSON valid
                    "{\"orderId\":%d,\"customerId\":%d,\"amount\":%.2f,\"timestamp\":%d}",
                    i, i % 100, Math.random() * 1000, System.currentTimeMillis()
                );
                
                // Asynchronous send
                producer.newMessage()
                        .key("order-" + i)
                        .value(orderData)
                        .property("source", "java-app")
                        .sendAsync()
                        .thenAccept(messageId -> {
                            System.out.printf("Message sent successfully: %s%n", messageId);
                        })
                        .exceptionally(throwable -> {
                            System.err.println("Failed to send message: " + throwable.getMessage());
                            return null;
                        });
                
                Thread.sleep(10);
            }
            
            // Wait for send completion
            producer.flush();
            
        } finally {
            producer.close();
            client.close();
        }
    }
}

// Java Consumer example
public class PulsarConsumerExample {
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String TOPIC_NAME = "persistent://my-tenant/my-namespace/orders";
    private static final String SUBSCRIPTION_NAME = "order-processor";
    
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();
        
        // Create consumer
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(TOPIC_NAME)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscriptionType(SubscriptionType.Shared)  // Shared, Exclusive, Failover, Key_Shared
                .receiverQueueSize(1000)
                .acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS)
                .subscribe();
        
        try {
            while (true) {
                Message<String> message = null;
                try {
                    // Receive message with timeout (returns null if nothing arrives)
                    message = consumer.receive(5, TimeUnit.SECONDS);
                    
                    if (message != null) {
                        // Process message
                        String orderData = message.getValue();
                        String orderKey = message.getKey();
                        
                        System.out.printf("Received order: key=%s, data=%s%n", orderKey, orderData);
                        
                        // Business logic processing
                        processOrder(orderData);
                        
                        // Acknowledge message
                        consumer.acknowledge(message);
                        
                    } else {
                        System.out.println("No message received within timeout");
                    }
                    
                } catch (Exception e) {
                    System.err.println("Error processing message: " + e.getMessage());
                    // Negative acknowledgment: the broker redelivers the message later
                    if (message != null) {
                        consumer.negativeAcknowledge(message);
                    }
                }
            }
        } finally {
            consumer.close();
            client.close();
        }
    }
    
    private static void processOrder(String orderData) {
        // Order processing logic
        System.out.println("Processing order: " + orderData);
        
        // Simulate processing time
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Consumer example using a message listener (messages are pushed to a callback)
public class PulsarMessageListenerExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://my-tenant/my-namespace/orders")
                .subscriptionName("order-listener")
                .subscriptionType(SubscriptionType.Shared)
                .messageListener((consumer1, msg) -> {
                    try {
                        String orderData = msg.getValue();
                        String messageKey = msg.getKey();
                        
                        System.out.printf("Listener received: key=%s, data=%s%n", 
                            messageKey, orderData);
                        
                        // Asynchronous acknowledgment
                        consumer1.acknowledgeAsync(msg)
                                .thenRun(() -> System.out.println("Message acknowledged"))
                                .exceptionally(throwable -> {
                                    System.err.println("Ack failed: " + throwable.getMessage());
                                    return null;
                                });
                        
                    } catch (Exception e) {
                        System.err.println("Message processing failed: " + e.getMessage());
                        consumer1.negativeAcknowledge(msg);
                    }
                })
                .subscribe();
        
        // The listener runs on a client thread, so keep the main thread alive
        System.out.println("Message listener started. Press Enter to exit...");
        System.in.read();
        
        consumer.close();
        client.close();
    }
}
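The four subscription types named in the consumer comment (Exclusive, Failover, Shared, Key_Shared) differ in how one subscription's messages are spread over its consumers. The model below is a deliberately simplified sketch: Shared is shown as plain round-robin, the first consumer is treated as the active one for Failover, and Key_Shared uses `zlib.crc32` as a stand-in for Pulsar's own key-hash ranges. `dispatch` is a hypothetical helper, not a client API.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple

Message = Tuple[str, int]  # (key, value)


def dispatch(messages: Sequence[Message], consumers: Sequence[str],
             sub_type: str) -> Dict[str, List[Message]]:
    """Assign messages to consumers under a simplified subscription-type model."""
    if not consumers:
        raise ValueError("at least one consumer is required")
    if sub_type == "Exclusive" and len(consumers) > 1:
        # A second consumer on an exclusive subscription is rejected
        raise ValueError("Exclusive subscription allows a single consumer")
    out: Dict[str, List[Message]] = defaultdict(list)
    if sub_type in ("Exclusive", "Failover"):
        # One active consumer receives everything; in Failover the others
        # wait on standby and take over if the active one disconnects
        out[consumers[0]].extend(messages)
    elif sub_type == "Shared":
        # Round-robin across consumers; per-key ordering is not preserved
        for i, msg in enumerate(messages):
            out[consumers[i % len(consumers)]].append(msg)
    elif sub_type == "Key_Shared":
        # Same key -> same consumer, so per-key ordering is preserved
        for msg in messages:
            out[consumers[zlib.crc32(msg[0].encode()) % len(consumers)]].append(msg)
    else:
        raise ValueError(f"unknown subscription type: {sub_type!r}")
    return dict(out)
```

Shared scales consumption across workers at the cost of ordering, while Key_Shared keeps the parallelism and restores ordering per key, which suits workloads like the keyed order messages above.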

Multi-Tenancy and Geo-Replication Configuration

# Multi-tenant environment setup
# Tenant 1 (development environment)
bin/pulsar-admin tenants create dev-tenant \
    --admin-roles dev-admin \
    --allowed-clusters standalone

bin/pulsar-admin namespaces create dev-tenant/app1
bin/pulsar-admin namespaces create dev-tenant/app2

# Tenant 2 (production environment)
bin/pulsar-admin tenants create prod-tenant \
    --admin-roles prod-admin \
    --allowed-clusters standalone

bin/pulsar-admin namespaces create prod-tenant/frontend
bin/pulsar-admin namespaces create prod-tenant/backend

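# Per-namespace limits for tenant isolation on dev-tenant/app1
# Publish rate limit (messages/s and bytes/s)
bin/pulsar-admin namespaces set-publish-rate dev-tenant/app1 \
    --msg-publish-rate 1000 \
    --byte-publish-rate 1048576

# Dispatch rate limit (messages and bytes per 1-second period)
bin/pulsar-admin namespaces set-dispatch-rate dev-tenant/app1 \
    --msg-dispatch-rate 2000 \
    --byte-dispatch-rate 2097152 \
    --dispatch-rate-period 1

# Backlog quota: hold producers once unconsumed data reaches 1 GB
bin/pulsar-admin namespaces set-backlog-quota dev-tenant/app1 \
    --limit 1G \
    --policy producer_request_hold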

# Geo-replication configuration
# Cluster 1 setup (Tokyo)
bin/pulsar-admin clusters create tokyo-cluster \
    --url http://tokyo-pulsar:8080 \
    --broker-url pulsar://tokyo-pulsar:6650

# Cluster 2 setup (Osaka)
bin/pulsar-admin clusters create osaka-cluster \
    --url http://osaka-pulsar:8080 \
    --broker-url pulsar://osaka-pulsar:6650

# Create global tenant
bin/pulsar-admin tenants create global-tenant \
    --admin-roles global-admin \
    --allowed-clusters tokyo-cluster,osaka-cluster

# Create a namespace replicated to both clusters
# (all topics in this namespace are replicated)
bin/pulsar-admin namespaces create global-tenant/global-namespace \
    --clusters tokyo-cluster,osaka-cluster

# Optionally override the replication clusters of a single topic
# (topic-level policies must be enabled on the brokers)
bin/pulsar-admin topics set-replication-clusters \
    persistent://global-tenant/global-namespace/replicated-topic \
    --clusters tokyo-cluster,osaka-cluster

# Check replication status
bin/pulsar-admin topics stats \
    persistent://global-tenant/global-namespace/replicated-topic
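Geo-replication is asynchronous: each cluster accepts writes locally, and a replicator per remote cluster forwards them. Forwarded messages carry the name of the cluster they came from (the `replicated_from` field in the message metadata) and are not forwarded again, which prevents loops in a full mesh. The sketch below is a simplified in-memory model of that rule only (no cursors, acknowledgments, or failures); `Msg`, `Cluster`, and `replicate` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass(frozen=True)
class Msg:
    payload: str
    replicated_from: Optional[str] = None  # None means produced locally


@dataclass
class Cluster:
    name: str
    log: List[Msg] = field(default_factory=list)


def replicate(src: Cluster, dst: Cluster) -> int:
    """Forward src's locally produced messages to dst; return how many were sent."""
    forwarded = 0
    for msg in src.log:
        # Messages that arrived by replication are skipped, so a full mesh
        # of clusters never sends a message back to where it came from
        if msg.replicated_from is None:
            dst.log.append(Msg(msg.payload, replicated_from=src.name))
            forwarded += 1
    return forwarded
```

After one replication pass in each direction, both clusters hold both orders exactly once, and each copy records its origin cluster.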

Advanced Configuration and Features

// Schema Registry usage example
// Schema.JSON takes a class, not a JSON Schema string; the schema definition
// is derived from the class and registered on the topic by the producer
import org.apache.pulsar.client.api.*;

public class SchemaExample {
    // Simple POJO describing an order record
    public static class Order {
        public long orderId;
        public long customerId;
        public double amount;
        public long timestamp;

        public Order() {}  // no-arg constructor needed for deserialization

        public Order(long orderId, long customerId, double amount, long timestamp) {
            this.orderId = orderId;
            this.customerId = customerId;
            this.amount = amount;
            this.timestamp = timestamp;
        }
    }

    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        
        // Producer with schema
        Producer<Order> producer = client.newProducer(Schema.JSON(Order.class))
                .topic("persistent://my-tenant/my-namespace/schema-topic")
                .create();
        
        // Send a typed record; the schema handles JSON serialization
        producer.send(new Order(12345, 67890, 199.99, System.currentTimeMillis()));
        
        producer.close();
        client.close();
    }
}
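When a producer connects with a new schema version, the broker checks it against the topic's existing schema using the namespace's compatibility strategy (for example BACKWARD, FORWARD, or FULL). The following is a simplified sketch of a BACKWARD-style rule over flat field maps, where "backward" means a reader using the new schema can decode data written with the old one. It is not Pulsar's Avro-based checker, and `Field` and `backward_compatible` are hypothetical names.

```python
from typing import Dict, NamedTuple


class Field(NamedTuple):
    """Hypothetical field descriptor: a type name and whether it has a default."""
    type: str
    has_default: bool = False


Fields = Dict[str, Field]


def backward_compatible(old: Fields, new: Fields) -> bool:
    """Simplified BACKWARD check: can a reader using `new` decode data written with `old`?"""
    for name, spec in new.items():
        if name not in old:
            # Old data lacks this field, so the new reader needs a default for it
            if not spec.has_default:
                return False
        elif old[name].type != spec.type:
            # Type changes are rejected here (Avro's type promotions are ignored)
            return False
    # Fields removed in `new` are simply skipped when reading old data
    return True
```

Under this rule, adding an optional `timestamp` field with a default to the order schema is accepted, while adding a required `currency` field is rejected.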

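// Transaction example
// Requires transactionCoordinatorEnabled=true in the broker configuration
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.transaction.Transaction;

import java.util.concurrent.TimeUnit;

public class TransactionExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .enableTransaction(true)
                .build();
        
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("persistent://my-tenant/my-namespace/transaction-topic")
                .sendTimeout(0, TimeUnit.SECONDS)  // Send timeout must be disabled for transactions
                .create();
        
        // Subscribe before producing so the subscription receives the new messages
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://my-tenant/my-namespace/transaction-topic")
                .subscriptionName("transaction-subscription")
                .subscribe();
        
        try {
            // Start transaction
            Transaction transaction = client.newTransaction()
                    .withTransactionTimeout(5, TimeUnit.MINUTES)
                    .build()
                    .get();
            
            try {
                // Send messages within the transaction (not visible to consumers yet)
                producer.newMessage(transaction)
                        .value("Transaction Message 1")
                        .send();
                
                producer.newMessage(transaction)
                        .value("Transaction Message 2")
                        .send();
                
                // Commit: both messages become visible together
                transaction.commit().get();
                System.out.println("Transaction committed successfully");
            } catch (Exception e) {
                // Abort so that none of the transaction's messages are delivered
                transaction.abort().get();
                throw e;
            }
            
            // Read the committed messages
            for (int i = 0; i < 2; i++) {
                Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                System.out.println("Consumed: " + msg.getValue());
                consumer.acknowledge(msg);
            }
            
        } catch (Exception e) {
            System.err.println("Transaction failed: " + e.getMessage());
        } finally {
            producer.close();
            consumer.close();
            client.close();
        }
    }
}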
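Transactional messages are written to the topic immediately but are held back from consumers until the transaction commits; if it aborts, they are never delivered. The toy model below captures only that read-committed visibility rule, assuming a single topic and ignoring storage order, timeouts, and transactional acknowledgments; `ReadCommittedTopic` is a hypothetical name.

```python
from typing import Dict, List, Optional


class ReadCommittedTopic:
    """Hypothetical toy model of read-committed visibility on one topic."""

    def __init__(self) -> None:
        self.visible: List[str] = []             # what consumers can receive
        self.pending: Dict[str, List[str]] = {}  # messages of open transactions

    def send(self, value: str, txn_id: Optional[str] = None) -> None:
        if txn_id is None:
            self.visible.append(value)  # non-transactional: visible at once
        else:
            self.pending.setdefault(txn_id, []).append(value)

    def commit(self, txn_id: str) -> None:
        # All messages of the transaction become visible together
        self.visible.extend(self.pending.pop(txn_id, []))

    def abort(self, txn_id: str) -> None:
        # Aborted messages are discarded and never delivered
        self.pending.pop(txn_id, None)
```

This is why the consumer in the Java example only receives the two messages after `commit()` returns.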

Python Client Implementation

# pip install pulsar-client

import pulsar
import json
import time
from datetime import datetime

# Python Producer example
class PulsarProducerExample:
    def __init__(self, service_url='pulsar://localhost:6650'):
        self.client = pulsar.Client(service_url)
        self.producer = self.client.create_producer(
            'persistent://my-tenant/my-namespace/python-topic',
            producer_name='python-producer',
            batching_enabled=True,
            batching_max_messages=100,
            batching_max_publish_delay_ms=10,
            compression_type=pulsar.CompressionType.SNAPPY
        )
    
    def send_order(self, order_id, customer_id, amount):
        order = {
            'order_id': order_id,
            'customer_id': customer_id,
            'amount': amount,
            'timestamp': datetime.now().isoformat(),
            'status': 'pending'
        }
        
        try:
            # Send message
            msg_id = self.producer.send(
                content=json.dumps(order).encode('utf-8'),
                properties={
                    'source': 'python-app',
                    'version': '1.0'
                },
                partition_key=f'customer-{customer_id}'
            )
            
            print(f"Order sent successfully: {order_id}, message_id: {msg_id}")
            
        except Exception as e:
            print(f"Failed to send order {order_id}: {e}")
    
    def close(self):
        self.producer.close()
        self.client.close()

# Python Consumer example
class PulsarConsumerExample:
    def __init__(self, service_url='pulsar://localhost:6650'):
        self.client = pulsar.Client(service_url)
        self.consumer = self.client.subscribe(
            'persistent://my-tenant/my-namespace/python-topic',
            subscription_name='python-subscription',
            consumer_type=pulsar.ConsumerType.Shared,
            receiver_queue_size=1000,
            consumer_name='python-consumer'
        )
    
    def process_messages(self):
        try:
            while True:
                msg = None
                try:
                    # Receive message with timeout (raises pulsar.Timeout if none arrives)
                    msg = self.consumer.receive(timeout_millis=5000)
                    
                    # Process message
                    order_data = json.loads(msg.data().decode('utf-8'))
                    print(f"Received order: {order_data}")
                    
                    # Business logic processing
                    self.process_order(order_data)
                    
                    # Acknowledge
                    self.consumer.acknowledge(msg)
                    
                except pulsar.Timeout:
                    print("No message received within timeout")
                    continue
                except Exception as e:
                    print(f"Error processing message: {e}")
                    # Only nack a message that was actually received in this iteration
                    if msg is not None:
                        self.consumer.negative_acknowledge(msg)
                    
        except KeyboardInterrupt:
            print("Stopping consumer...")
        finally:
            self.consumer.close()
            self.client.close()
    
    def process_order(self, order_data):
        # Order processing logic
        order_id = order_data.get('order_id')
        amount = order_data.get('amount')
        
        print(f"Processing order {order_id} for amount ${amount}")
        
        # Simulate processing time
        time.sleep(0.05)

# Usage example
if __name__ == "__main__":
    # Producer example
    producer = PulsarProducerExample()
    
    for i in range(100):
        producer.send_order(
            order_id=i + 1,
            customer_id=(i % 10) + 1,
            amount=round(50 + (i * 10.5), 2)
        )
    
    producer.close()
    
    # Consumer example
    # consumer = PulsarConsumerExample()
    # consumer.process_messages()
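Negatively acknowledged messages are redelivered by the broker, so a message that can never be processed would be retried forever. A dead letter policy caps the redeliveries and moves the message to a separate topic; in the Java client this is configured with `ConsumerBuilder.deadLetterPolicy(...)`. The toy model below (hypothetical `DeadLetterRouter`, no broker involved) only illustrates the counting: the first delivery is followed by up to `max_redeliver_count` redeliveries before the message is dead-lettered.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class DeadLetterRouter:
    """Hypothetical toy model: retry a message, then park it in a dead letter list."""
    max_redeliver_count: int
    processed: List[str] = field(default_factory=list)
    dead_letters: List[str] = field(default_factory=list)

    def deliver(self, msg: str, handler: Callable[[str], None]) -> None:
        redeliveries = 0
        while True:
            try:
                handler(msg)
            except Exception:
                # Handler failed: the message is negatively acknowledged
                if redeliveries >= self.max_redeliver_count:
                    # Redelivery budget used up: route to the dead letter topic
                    self.dead_letters.append(msg)
                    return
                redeliveries += 1  # the broker redelivers the message
            else:
                self.processed.append(msg)  # success: acknowledged
                return
```

With `max_redeliver_count=3`, a message whose payload always fails to parse is handled four times in total and then ends up in `dead_letters`, instead of blocking or looping in `process_messages`.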

Monitoring and Performance Management

# Pulsar cluster monitoring
# Check broker status
bin/pulsar-admin brokers list standalone
bin/pulsar-admin brokers healthcheck

# Topic statistics (stats works on a single partition; partitioned-stats aggregates all partitions)
bin/pulsar-admin topics stats persistent://my-tenant/my-namespace/my-topic-partition-0
bin/pulsar-admin topics partitioned-stats persistent://my-tenant/my-namespace/my-topic

# Subscription statistics
bin/pulsar-admin topics subscriptions persistent://my-tenant/my-namespace/my-topic
# Stats for one subscription, extracted from the topic stats JSON (requires jq)
bin/pulsar-admin topics partitioned-stats persistent://my-tenant/my-namespace/my-topic | jq '.subscriptions["my-subscription"]'

# BookKeeper status
bin/pulsar-admin bookies list-bookies
bin/bookkeeper shell listbookies -rw

# Performance testing
# Producer test
bin/pulsar-perf produce \
    persistent://my-tenant/my-namespace/perf-topic \
    --rate 10000 \
    --num-producers 1 \
    --size 1024 \
    --num-messages 1000000

# Consumer test
bin/pulsar-perf consume \
    persistent://my-tenant/my-namespace/perf-topic \
    --subscriptions perf-subscription \
    --num-consumers 1

# Monitoring metrics script
cat > monitor-pulsar.sh << 'EOF'
#!/bin/bash
# Run from the Pulsar installation directory (the commands use relative bin/ paths)

echo "=== Pulsar Cluster Monitoring $(date) ==="

# Broker status
echo "=== Broker Status ==="
bin/pulsar-admin brokers healthcheck > /dev/null 2>&1 && echo "Broker: OK" || echo "Broker: ERROR"

# Cluster information
echo "=== Cluster Info ==="
bin/pulsar-admin clusters list

# Tenant and namespace counts
echo "=== Tenants and Namespaces ==="
echo "Total tenants: $(bin/pulsar-admin tenants list | wc -l)"
echo "Namespaces in tenant 'public': $(bin/pulsar-admin namespaces list public | wc -l)"

# Topic count (public/default namespace only)
echo "=== Topics ==="
echo "Topics in public/default: $(bin/pulsar-admin topics list public/default | wc -l)"

# BookKeeper status
echo "=== BookKeeper Status ==="
echo "Registered bookies:"
bin/pulsar-admin bookies list-bookies

# Local processes whose command line mentions "pulsar"
echo "=== Process Status ==="
pgrep -f "pulsar" > /dev/null && echo "Pulsar processes: $(pgrep -f pulsar | wc -l)" || echo "No Pulsar processes"

echo "=== Monitoring Complete ==="
EOF

chmod +x monitor-pulsar.sh
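The script above only counts objects. For alerting on consumer lag, the JSON printed by `pulsar-admin topics stats` (or `partitioned-stats`) can be parsed: it contains a `subscriptions` map whose entries include a `msgBacklog` count. A minimal sketch, assuming that field layout; the helper names and the threshold are hypothetical.

```python
import json
from typing import Dict


def subscription_backlogs(stats_json: str) -> Dict[str, int]:
    """Map each subscription name to its msgBacklog in a topic-stats JSON document."""
    stats = json.loads(stats_json)
    return {name: int(sub.get("msgBacklog", 0))
            for name, sub in stats.get("subscriptions", {}).items()}


def lagging_subscriptions(stats_json: str, max_backlog: int) -> Dict[str, int]:
    """Subscriptions whose backlog exceeds `max_backlog` messages."""
    return {name: backlog
            for name, backlog in subscription_backlogs(stats_json).items()
            if backlog > max_backlog}
```

Usage: save the output with `bin/pulsar-admin topics partitioned-stats persistent://my-tenant/my-namespace/my-topic > stats.json`, then call `lagging_subscriptions(open("stats.json").read(), 1000)` and alert on any non-empty result.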