Apache Pulsar

クラウドネイティブ分散メッセージングシステム。マルチテナント、地理分散レプリケーション、スキーマ管理を統合。Yahoo開発のKafka代替。

メッセージブローカーストリーミングマルチテナントジオレプリケーションクラウドネイティブ高スループット

サーバー

Apache Pulsar

概要

Apache Pulsarは、Yahoo(現Verizon Media)によって開発されたクラウドネイティブな分散メッセージング・ストリーミングプラットフォームです。従来のメッセージングシステムの制約を解決するために設計され、ネイティブなマルチテナンシー、リアルタイムジオレプリケーション、階層分離アーキテクチャを特徴とします。サービング層とストレージ層の分離により、動的スケーリングと優れた運用性を実現し、数百万メッセージ/秒の高スループット処理とミリ秒レベルの低レイテンシを両立。IoTデータ処理、リアルタイム分析、イベント駆動アーキテクチャに最適化されています。

詳細

Apache Pulsar 2025年版は、クラウドファーストの企業環境において次世代メッセージングプラットフォームとしての地位を確立しています。独自の階層分離アーキテクチャにより、Apache BookKeeperベースの分散ストレージ層とPulsarブローカーのサービング層を完全に分離し、従来のKafkaのような結合型アーキテクチャでは困難だった柔軟なスケーリングを実現。マルチテナンシーはネームスペース、リソース分離、クォータ管理により実装され、単一クラスタで複数の組織・アプリケーションを安全に運用可能。ジオレプリケーションは設定ベースで簡単に有効化でき、グローバル分散環境でのデータ一貫性と災害復旧を保証します。Pulsar Functions、Pulsar IO、トランザクション、Schema Registryなどの包括的なエコシステムも充実しています。

主な特徴

  • 階層分離アーキテクチャ: サービング層とストレージ層の分離による柔軟なスケーリング
  • ネイティブマルチテナンシー: テナント、ネームスペース、リソース分離による安全な共有利用
  • リアルタイムジオレプリケーション: グローバル分散環境での自動データ複製
  • 高パフォーマンス: 数百万メッセージ/秒とミリ秒レベルの低レイテンシ
  • 無限保存: Apache BookKeeperによる永続化とティアードストレージ
  • 豊富なエコシステム: Functions、IO、SQL、Schema Registryの統合

メリット・デメリット

メリット

  • Kafkaでは困難なマルチテナンシーとジオレプリケーションをネイティブサポート
  • 階層分離によりストレージとコンピュートを独立してスケール可能
  • クラウドネイティブ設計によりKubernetes環境での運用性が優秀
  • ティアードストレージにより低コストでの長期データ保存
  • トランザクション、Schema Registry、関数処理を統合プラットフォームで提供
  • IoT、リアルタイム分析などの大容量ストリーミング処理に最適

デメリット

  • Kafkaに比べて相対的にエコシステムが小さく導入事例が少ない
  • 複雑なアーキテクチャによる学習コストの高さ
  • 単純なレイテンシベンチマークではKafkaが上回る場合がある
  • 運用・監視ツールの成熟度がKafkaより劣る
  • BookKeeperストレージ層の管理に専門知識が必要
  • 小規模環境では複雑性がオーバーヘッドとなる可能性

参考ページ

書き方の例

インストールと基本セットアップ

# Java環境設定(Java 11以上が必要)
sudo apt update
sudo apt install openjdk-17-jdk
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64

# Pulsar バイナリダウンロード・インストール
wget https://archive.apache.org/dist/pulsar/pulsar-3.1.1/apache-pulsar-3.1.1-bin.tar.gz
tar -xzf apache-pulsar-3.1.1-bin.tar.gz
cd apache-pulsar-3.1.1

# スタンドアロンモードでの起動
bin/pulsar standalone

# 別のターミナルで動作確認
bin/pulsar-admin clusters list
bin/pulsar-admin tenants list
bin/pulsar-admin namespaces list public

# Docker Composeでのクラスタ起動
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
  zookeeper:
    image: apachepulsar/pulsar:3.1.1
    hostname: zookeeper
    container_name: zookeeper
    command: >
      bash -c "bin/apply-config-from-env.py conf/zookeeper.conf &&
               exec bin/pulsar zookeeper"
    environment:
      PULSAR_MEM: "-Xms512m -Xmx512m"
      PULSAR_GC: "-XX:+UseG1GC"
    volumes:
      - ./data/zookeeper:/pulsar/data/zookeeper

  bookkeeper:
    image: apachepulsar/pulsar:3.1.1
    hostname: bookkeeper
    container_name: bookkeeper
    command: >
      bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf &&
               exec bin/pulsar bookie"
    environment:
      PULSAR_MEM: "-Xms1g -Xmx1g"
      PULSAR_GC: "-XX:+UseG1GC"
      BOOKIE_MEM: "-Xms1g -Xmx1g"
    depends_on:
      - zookeeper
    volumes:
      - ./data/bookkeeper:/pulsar/data/bookkeeper

  broker:
    image: apachepulsar/pulsar:3.1.1
    hostname: broker
    container_name: broker
    command: >
      bash -c "bin/apply-config-from-env.py conf/broker.conf &&
               exec bin/pulsar broker"
    environment:
      PULSAR_MEM: "-Xms1g -Xmx1g"
      PULSAR_GC: "-XX:+UseG1GC"
    ports:
      - "6650:6650"    # Pulsar プロトコル
      - "8080:8080"    # HTTP Admin API
    depends_on:
      - zookeeper
      - bookkeeper

volumes:
  zookeeper-data:
  bookkeeper-data:
EOF

docker-compose up -d

# クラスタ状態確認
sleep 30
curl http://localhost:8080/admin/v2/clusters

基本的なトピック管理とプロデューサー・コンシューマー

# テナント作成
bin/pulsar-admin tenants create my-tenant \
    --admin-roles my-admin-role \
    --allowed-clusters standalone

# ネームスペース作成
bin/pulsar-admin namespaces create my-tenant/my-namespace

# トピック作成(パーティション付き)
bin/pulsar-admin topics create-partitioned-topic \
    persistent://my-tenant/my-namespace/my-topic \
    --partitions 4

# トピック一覧確認
bin/pulsar-admin topics list my-tenant/my-namespace

# トピック詳細情報
bin/pulsar-admin topics stats \
    persistent://my-tenant/my-namespace/my-topic

# コンソールプロデューサー
bin/pulsar-client produce \
    persistent://my-tenant/my-namespace/my-topic \
    --messages "Hello Pulsar Message 1,Hello Pulsar Message 2,Hello Pulsar Message 3"

# コンソールコンシューマー
bin/pulsar-client consume \
    persistent://my-tenant/my-namespace/my-topic \
    --subscription-name my-subscription \
    --num-messages 0

# サブスクリプション管理
bin/pulsar-admin topics subscriptions \
    persistent://my-tenant/my-namespace/my-topic

# サブスクリプション詳細
bin/pulsar-admin topics stats-internal \
    persistent://my-tenant/my-namespace/my-topic

Java プロデューサー・コンシューマー実装

// Maven dependencies
// <dependency>
//   <groupId>org.apache.pulsar</groupId>
//   <artifactId>pulsar-client</artifactId>
//   <version>3.1.1</version>
// </dependency>

import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;

import java.util.concurrent.TimeUnit;

// Java プロデューサー例
public class PulsarProducerExample {
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String TOPIC_NAME = "persistent://my-tenant/my-namespace/orders";
    
    public static void main(String[] args) throws Exception {
        // Pulsarクライアント作成
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .connectionTimeout(10, TimeUnit.SECONDS)
                .operationTimeout(15, TimeUnit.SECONDS)
                .build();
        
        // プロデューサー作成
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(TOPIC_NAME)
                .producerName("order-producer")
                .batchingMaxMessages(100)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .compressionType(CompressionType.SNAPPY)
                .create();
        
        try {
            // メッセージ送信
            for (int i = 0; i < 1000; i++) {
                String orderData = String.format(
                    "{\"orderId\":%d,\"customerId\":%d,\"amount\":%.2f,\"timestamp\":%d}",
                    i, i % 100, Math.random() * 1000, System.currentTimeMillis()
                );
                
                // 非同期送信
                producer.newMessage()
                        .key("order-" + i)
                        .value(orderData)
                        .property("source", "java-app")
                        .sendAsync()
                        .thenAccept(messageId -> {
                            System.out.printf("Message sent successfully: %s%n", messageId);
                        })
                        .exceptionally(throwable -> {
                            System.err.println("Failed to send message: " + throwable.getMessage());
                            return null;
                        });
                
                Thread.sleep(10);
            }
            
            // 送信完了を待つ
            producer.flush();
            
        } finally {
            producer.close();
            client.close();
        }
    }
}

// Java コンシューマー例
public class PulsarConsumerExample {
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String TOPIC_NAME = "persistent://my-tenant/my-namespace/orders";
    private static final String SUBSCRIPTION_NAME = "order-processor";
    
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();
        
        // コンシューマー作成
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(TOPIC_NAME)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscriptionType(SubscriptionType.Shared)  // Shared, Exclusive, Failover, Key_Shared
                .receiverQueueSize(1000)
                .acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS)
                .subscribe();
        
        try {
            while (true) {
                try {
                    // メッセージ受信(タイムアウト付き)
                    Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
                    
                    if (message != null) {
                        // メッセージ処理
                        String orderData = message.getValue();
                        String orderKey = message.getKey();
                        
                        System.out.printf("Received order: key=%s, data=%s%n", orderKey, orderData);
                        
                        // ビジネスロジック処理
                        processOrder(orderData);
                        
                        // メッセージ確認応答
                        consumer.acknowledge(message);
                        
                    } else {
                        System.out.println("No message received within timeout");
                    }
                    
                } catch (Exception e) {
                    System.err.println("Error processing message: " + e.getMessage());
                    // エラー処理(negative acknowledge、dead letter queue等)
                }
            }
        } finally {
            consumer.close();
            client.close();
        }
    }
    
    private static void processOrder(String orderData) {
        // 注文処理ロジック
        System.out.println("Processing order: " + orderData);
        
        // 処理時間のシミュレーション
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// メッセージリスナー型コンシューマー例
public class PulsarMessageListenerExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://my-tenant/my-namespace/orders")
                .subscriptionName("order-listener")
                .subscriptionType(SubscriptionType.Shared)
                .messageListener((consumer1, msg) -> {
                    try {
                        String orderData = msg.getValue();
                        String messageKey = msg.getKey();
                        
                        System.out.printf("Listener received: key=%s, data=%s%n", 
                            messageKey, orderData);
                        
                        // 非同期確認応答
                        consumer1.acknowledgeAsync(msg)
                                .thenRun(() -> System.out.println("Message acknowledged"))
                                .exceptionally(throwable -> {
                                    System.err.println("Ack failed: " + throwable.getMessage());
                                    return null;
                                });
                        
                    } catch (Exception e) {
                        System.err.println("Message processing failed: " + e.getMessage());
                        consumer1.negativeAcknowledge(msg);
                    }
                })
                .subscribe();
        
        // メッセージリスナーは別スレッドで動作するため、メインスレッドを維持
        System.out.println("Message listener started. Press Enter to exit...");
        System.in.read();
        
        consumer.close();
        client.close();
    }
}

マルチテナンシーとジオレプリケーション設定

# マルチテナント環境設定
# テナント1(開発環境)
bin/pulsar-admin tenants create dev-tenant \
    --admin-roles dev-admin \
    --allowed-clusters standalone

bin/pulsar-admin namespaces create dev-tenant/app1
bin/pulsar-admin namespaces create dev-tenant/app2

# テナント2(本番環境)
bin/pulsar-admin tenants create prod-tenant \
    --admin-roles prod-admin \
    --allowed-clusters standalone

bin/pulsar-admin namespaces create prod-tenant/frontend
bin/pulsar-admin namespaces create prod-tenant/backend

# リソースクォータ設定
bin/pulsar-admin resource-quotas set \
    --bundle-specs 0x00000000_0xffffffff \
    --msg-rate-in 1000 \
    --msg-rate-out 2000 \
    --bandwidth-in 1048576 \
    --bandwidth-out 2097152 \
    --memory 1073741824 \
    --dynamic true \
    dev-tenant/app1

# ジオレプリケーション設定
# クラスタ1設定(東京)
bin/pulsar-admin clusters create tokyo-cluster \
    --url http://tokyo-pulsar:8080 \
    --broker-url pulsar://tokyo-pulsar:6650

# クラスタ2設定(大阪)
bin/pulsar-admin clusters create osaka-cluster \
    --url http://osaka-pulsar:8080 \
    --broker-url pulsar://osaka-pulsar:6650

# グローバルテナント作成
bin/pulsar-admin tenants create global-tenant \
    --admin-roles global-admin \
    --allowed-clusters tokyo-cluster,osaka-cluster

# グローバルネームスペース作成(自動レプリケーション)
bin/pulsar-admin namespaces create global-tenant/global-namespace \
    --clusters tokyo-cluster,osaka-cluster

# 特定トピックのレプリケーション有効化
bin/pulsar-admin topics set-replication-clusters \
    persistent://global-tenant/global-namespace/replicated-topic \
    --clusters tokyo-cluster,osaka-cluster

# レプリケーション状況確認
bin/pulsar-admin topics stats \
    persistent://global-tenant/global-namespace/replicated-topic

高度な設定と機能

// Schema Registry使用例
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.schema.SchemaType;

public class SchemaExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        
        // JSON Schema定義
        String jsonSchema = "{\n" +
                "  \"type\": \"object\",\n" +
                "  \"properties\": {\n" +
                "    \"orderId\": {\"type\": \"integer\"},\n" +
                "    \"customerId\": {\"type\": \"integer\"},\n" +
                "    \"amount\": {\"type\": \"number\"},\n" +
                "    \"timestamp\": {\"type\": \"integer\"}\n" +
                "  },\n" +
                "  \"required\": [\"orderId\", \"customerId\", \"amount\"]\n" +
                "}";
        
        Schema<GenericRecord> schema = Schema.JSON(jsonSchema);
        
        // スキーマ付きプロデューサー
        Producer<GenericRecord> producer = client.newProducer(schema)
                .topic("persistent://my-tenant/my-namespace/schema-topic")
                .create();
        
        // レコード作成・送信
        GenericRecord record = schema.newRecordBuilder()
                .set("orderId", 12345)
                .set("customerId", 67890)
                .set("amount", 199.99)
                .set("timestamp", System.currentTimeMillis())
                .build();
        
        producer.send(record);
        
        producer.close();
        client.close();
    }
}

// トランザクション例
public class TransactionExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .enableTransaction(true)
                .build();
        
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("persistent://my-tenant/my-namespace/transaction-topic")
                .sendTimeout(0, TimeUnit.SECONDS)  // トランザクション使用時は必須
                .create();
        
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://my-tenant/my-namespace/transaction-topic")
                .subscriptionName("transaction-subscription")
                .subscribe();
        
        try {
            // トランザクション開始
            Transaction transaction = client.newTransaction()
                    .withTransactionTimeout(5, TimeUnit.MINUTES)
                    .build()
                    .get();
            
            // トランザクション内でメッセージ送信
            producer.newMessage(transaction)
                    .value("Transaction Message 1")
                    .send();
            
            producer.newMessage(transaction)
                    .value("Transaction Message 2")
                    .send();
            
            // トランザクションコミット
            transaction.commit().get();
            
            System.out.println("Transaction committed successfully");
            
        } catch (Exception e) {
            System.err.println("Transaction failed: " + e.getMessage());
        } finally {
            producer.close();
            consumer.close();
            client.close();
        }
    }
}

Python クライアント実装

# pip install pulsar-client

import pulsar
import json
import time
from datetime import datetime

# Python プロデューサー例
class PulsarProducerExample:
    def __init__(self, service_url='pulsar://localhost:6650'):
        self.client = pulsar.Client(service_url)
        self.producer = self.client.create_producer(
            'persistent://my-tenant/my-namespace/python-topic',
            producer_name='python-producer',
            batching_enabled=True,
            batching_max_messages=100,
            batching_max_publish_delay_ms=10,
            compression_type=pulsar.CompressionType.SNAPPY
        )
    
    def send_order(self, order_id, customer_id, amount):
        order = {
            'order_id': order_id,
            'customer_id': customer_id,
            'amount': amount,
            'timestamp': datetime.now().isoformat(),
            'status': 'pending'
        }
        
        try:
            # メッセージ送信
            msg_id = self.producer.send(
                content=json.dumps(order).encode('utf-8'),
                properties={
                    'source': 'python-app',
                    'version': '1.0'
                },
                partition_key=f'customer-{customer_id}'
            )
            
            print(f"Order sent successfully: {order_id}, message_id: {msg_id}")
            
        except Exception as e:
            print(f"Failed to send order {order_id}: {e}")
    
    def close(self):
        self.producer.close()
        self.client.close()

# Python コンシューマー例
class PulsarConsumerExample:
    def __init__(self, service_url='pulsar://localhost:6650'):
        self.client = pulsar.Client(service_url)
        self.consumer = self.client.subscribe(
            'persistent://my-tenant/my-namespace/python-topic',
            subscription_name='python-subscription',
            consumer_type=pulsar.ConsumerType.Shared,
            receiver_queue_size=1000,
            consumer_name='python-consumer'
        )
    
    def process_messages(self):
        try:
            while True:
                try:
                    # メッセージ受信(タイムアウト付き)
                    msg = self.consumer.receive(timeout_millis=5000)
                    
                    # メッセージ処理
                    order_data = json.loads(msg.data().decode('utf-8'))
                    print(f"Received order: {order_data}")
                    
                    # ビジネスロジック処理
                    self.process_order(order_data)
                    
                    # 確認応答
                    self.consumer.acknowledge(msg)
                    
                except pulsar.Timeout:
                    print("No message received within timeout")
                    continue
                except Exception as e:
                    print(f"Error processing message: {e}")
                    self.consumer.negative_acknowledge(msg)
                    
        except KeyboardInterrupt:
            print("Stopping consumer...")
        finally:
            self.consumer.close()
            self.client.close()
    
    def process_order(self, order_data):
        # 注文処理ロジック
        order_id = order_data.get('order_id')
        amount = order_data.get('amount')
        
        print(f"Processing order {order_id} for amount ${amount}")
        
        # 処理時間のシミュレーション
        time.sleep(0.05)

# 使用例
if __name__ == "__main__":
    # プロデューサー例
    producer = PulsarProducerExample()
    
    for i in range(100):
        producer.send_order(
            order_id=i + 1,
            customer_id=(i % 10) + 1,
            amount=round(50 + (i * 10.5), 2)
        )
    
    producer.close()
    
    # コンシューマー例
    # consumer = PulsarConsumerExample()
    # consumer.process_messages()

監視とパフォーマンス管理

# Pulsarクラスタ監視
# ブローカー状態確認
bin/pulsar-admin brokers list standalone
bin/pulsar-admin brokers healthcheck

# トピック統計情報
bin/pulsar-admin topics stats persistent://my-tenant/my-namespace/my-topic
bin/pulsar-admin topics partitioned-stats persistent://my-tenant/my-namespace/my-topic

# サブスクリプション統計
bin/pulsar-admin topics subscriptions persistent://my-tenant/my-namespace/my-topic
bin/pulsar-admin topics subscription-stats persistent://my-tenant/my-namespace/my-topic --subscription my-subscription

# BookKeeper統計
bin/pulsar-admin bookies list
bin/pulsar-admin bookies list-bookies

# パフォーマンステスト
# プロデューサーテスト
bin/pulsar-perf produce \
    persistent://my-tenant/my-namespace/perf-topic \
    --rate 10000 \
    --num-producers 1 \
    --size 1024 \
    --num-messages 1000000

# コンシューマーテスト
bin/pulsar-perf consume \
    persistent://my-tenant/my-namespace/perf-topic \
    --subscription-name perf-subscription \
    --num-consumers 1

# 監視用メトリクス取得スクリプト
cat > monitor-pulsar.sh << 'EOF'
#!/bin/bash

echo "=== Pulsar Cluster Monitoring $(date) ==="

# ブローカー状態
echo "=== Broker Status ==="
bin/pulsar-admin brokers healthcheck 2>/dev/null && echo "Broker: OK" || echo "Broker: ERROR"

# クラスタ情報
echo "=== Cluster Info ==="
bin/pulsar-admin clusters list

# テナント・ネームスペース数
echo "=== Tenants and Namespaces ==="
echo "Total tenants: $(bin/pulsar-admin tenants list | wc -l)"
echo "Total namespaces: $(bin/pulsar-admin namespaces list public | wc -l)"

# トピック数(publicテナントのみ)
echo "=== Topics ==="
echo "Total topics: $(bin/pulsar-admin topics list public/default | wc -l)"

# BookKeeper状態
echo "=== BookKeeper Status ==="
bin/pulsar-admin bookies list | wc -l | xargs echo "Available bookies:"

# JVM監視(プロセスが動いている場合)
echo "=== JVM Status ==="
pgrep -f "pulsar" > /dev/null && echo "Pulsar processes: $(pgrep -f pulsar | wc -l)" || echo "No Pulsar processes"

echo "=== Monitoring Complete ==="
EOF

chmod +x monitor-pulsar.sh