Apache Pulsar
クラウドネイティブ分散メッセージングシステム。マルチテナント、地理分散レプリケーション、スキーマ管理を統合。Yahoo開発のKafka代替。
サーバー
Apache Pulsar
概要
Apache Pulsarは、Yahoo(現Verizon Media)によって開発されたクラウドネイティブな分散メッセージング・ストリーミングプラットフォームです。従来のメッセージングシステムの制約を解決するために設計され、ネイティブなマルチテナンシー、リアルタイムジオレプリケーション、階層分離アーキテクチャを特徴とします。サービング層とストレージ層の分離により、動的スケーリングと優れた運用性を実現し、数百万メッセージ/秒の高スループット処理とミリ秒レベルの低レイテンシを両立。IoTデータ処理、リアルタイム分析、イベント駆動アーキテクチャに最適化されています。
詳細
Apache Pulsar 2025年版は、クラウドファーストの企業環境において次世代メッセージングプラットフォームとしての地位を確立しています。独自の階層分離アーキテクチャにより、Apache BookKeeperベースの分散ストレージ層とPulsarブローカーのサービング層を完全に分離し、従来のKafkaのような結合型アーキテクチャでは困難だった柔軟なスケーリングを実現。マルチテナンシーはネームスペース、リソース分離、クォータ管理により実装され、単一クラスタで複数の組織・アプリケーションを安全に運用可能。ジオレプリケーションは設定ベースで簡単に有効化でき、グローバル分散環境でのデータ一貫性と災害復旧を保証します。Pulsar Functions、Pulsar IO、トランザクション、Schema Registryなどの包括的なエコシステムも充実しています。
主な特徴
- 階層分離アーキテクチャ: サービング層とストレージ層の分離による柔軟なスケーリング
- ネイティブマルチテナンシー: テナント、ネームスペース、リソース分離による安全な共有利用
- リアルタイムジオレプリケーション: グローバル分散環境での自動データ複製
- 高パフォーマンス: 数百万メッセージ/秒とミリ秒レベルの低レイテンシ
- 無限保存: Apache BookKeeperによる永続化とティアードストレージ
- 豊富なエコシステム: Functions、IO、SQL、Schema Registryの統合
メリット・デメリット
メリット
- Kafkaでは困難なマルチテナンシーとジオレプリケーションをネイティブサポート
- 階層分離によりストレージとコンピュートを独立してスケール可能
- クラウドネイティブ設計によりKubernetes環境での運用性が優秀
- ティアードストレージにより低コストでの長期データ保存
- トランザクション、Schema Registry、関数処理を統合プラットフォームで提供
- IoT、リアルタイム分析などの大容量ストリーミング処理に最適
デメリット
- Kafkaに比べて相対的にエコシステムが小さく導入事例が少ない
- 複雑なアーキテクチャによる学習コストの高さ
- 単純なレイテンシベンチマークではKafkaが上回る場合がある
- 運用・監視ツールの成熟度がKafkaより劣る
- BookKeeperストレージ層の管理に専門知識が必要
- 小規模環境では複雑性がオーバーヘッドとなる可能性
参考ページ
書き方の例
インストールと基本セットアップ
# Java環境設定(Java 11以上が必要)
sudo apt update
sudo apt install openjdk-17-jdk
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
# Pulsar バイナリダウンロード・インストール
wget https://archive.apache.org/dist/pulsar/pulsar-3.1.1/apache-pulsar-3.1.1-bin.tar.gz
tar -xzf apache-pulsar-3.1.1-bin.tar.gz
cd apache-pulsar-3.1.1
# スタンドアロンモードでの起動
bin/pulsar standalone
# 別のターミナルで動作確認
bin/pulsar-admin clusters list
bin/pulsar-admin tenants list
bin/pulsar-admin namespaces list public
# Docker Composeでのクラスタ起動
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
zookeeper:
image: apachepulsar/pulsar:3.1.1
hostname: zookeeper
container_name: zookeeper
command: >
bash -c "bin/apply-config-from-env.py conf/zookeeper.conf &&
exec bin/pulsar zookeeper"
environment:
PULSAR_MEM: "-Xms512m -Xmx512m"
PULSAR_GC: "-XX:+UseG1GC"
volumes:
- ./data/zookeeper:/pulsar/data/zookeeper
bookkeeper:
image: apachepulsar/pulsar:3.1.1
hostname: bookkeeper
container_name: bookkeeper
command: >
bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf &&
exec bin/pulsar bookie"
environment:
PULSAR_MEM: "-Xms1g -Xmx1g"
PULSAR_GC: "-XX:+UseG1GC"
BOOKIE_MEM: "-Xms1g -Xmx1g"
depends_on:
- zookeeper
volumes:
- ./data/bookkeeper:/pulsar/data/bookkeeper
broker:
image: apachepulsar/pulsar:3.1.1
hostname: broker
container_name: broker
command: >
bash -c "bin/apply-config-from-env.py conf/broker.conf &&
exec bin/pulsar broker"
environment:
PULSAR_MEM: "-Xms1g -Xmx1g"
PULSAR_GC: "-XX:+UseG1GC"
ports:
- "6650:6650" # Pulsar プロトコル
- "8080:8080" # HTTP Admin API
depends_on:
- zookeeper
- bookkeeper
volumes:
zookeeper-data:
bookkeeper-data:
EOF
docker-compose up -d
# クラスタ状態確認
sleep 30
curl http://localhost:8080/admin/v2/clusters
基本的なトピック管理とプロデューサー・コンシューマー
# テナント作成
bin/pulsar-admin tenants create my-tenant \
--admin-roles my-admin-role \
--allowed-clusters standalone
# ネームスペース作成
bin/pulsar-admin namespaces create my-tenant/my-namespace
# トピック作成(パーティション付き)
bin/pulsar-admin topics create-partitioned-topic \
persistent://my-tenant/my-namespace/my-topic \
--partitions 4
# トピック一覧確認
bin/pulsar-admin topics list my-tenant/my-namespace
# トピック詳細情報
bin/pulsar-admin topics stats \
persistent://my-tenant/my-namespace/my-topic
# コンソールプロデューサー
bin/pulsar-client produce \
persistent://my-tenant/my-namespace/my-topic \
--messages "Hello Pulsar Message 1,Hello Pulsar Message 2,Hello Pulsar Message 3"
# コンソールコンシューマー
bin/pulsar-client consume \
persistent://my-tenant/my-namespace/my-topic \
--subscription-name my-subscription \
--num-messages 0
# サブスクリプション管理
bin/pulsar-admin topics subscriptions \
persistent://my-tenant/my-namespace/my-topic
# サブスクリプション詳細
bin/pulsar-admin topics stats-internal \
persistent://my-tenant/my-namespace/my-topic
Java プロデューサー・コンシューマー実装
// Maven dependencies
// <dependency>
// <groupId>org.apache.pulsar</groupId>
// <artifactId>pulsar-client</artifactId>
// <version>3.1.1</version>
// </dependency>
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import java.util.concurrent.TimeUnit;
// Java プロデューサー例
public class PulsarProducerExample {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "persistent://my-tenant/my-namespace/orders";
public static void main(String[] args) throws Exception {
// Pulsarクライアント作成
PulsarClient client = PulsarClient.builder()
.serviceUrl(SERVICE_URL)
.connectionTimeout(10, TimeUnit.SECONDS)
.operationTimeout(15, TimeUnit.SECONDS)
.build();
// プロデューサー作成
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(TOPIC_NAME)
.producerName("order-producer")
.batchingMaxMessages(100)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.compressionType(CompressionType.SNAPPY)
.create();
try {
// メッセージ送信
for (int i = 0; i < 1000; i++) {
String orderData = String.format(
"{\"orderId\":%d,\"customerId\":%d,\"amount\":%.2f,\"timestamp\":%d}",
i, i % 100, Math.random() * 1000, System.currentTimeMillis()
);
// 非同期送信
producer.newMessage()
.key("order-" + i)
.value(orderData)
.property("source", "java-app")
.sendAsync()
.thenAccept(messageId -> {
System.out.printf("Message sent successfully: %s%n", messageId);
})
.exceptionally(throwable -> {
System.err.println("Failed to send message: " + throwable.getMessage());
return null;
});
Thread.sleep(10);
}
// 送信完了を待つ
producer.flush();
} finally {
producer.close();
client.close();
}
}
}
// Java コンシューマー例
public class PulsarConsumerExample {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "persistent://my-tenant/my-namespace/orders";
private static final String SUBSCRIPTION_NAME = "order-processor";
public static void main(String[] args) throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl(SERVICE_URL)
.build();
// コンシューマー作成
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(TOPIC_NAME)
.subscriptionName(SUBSCRIPTION_NAME)
.subscriptionType(SubscriptionType.Shared) // Shared, Exclusive, Failover, Key_Shared
.receiverQueueSize(1000)
.acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS)
.subscribe();
try {
while (true) {
try {
// メッセージ受信(タイムアウト付き)
Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
if (message != null) {
// メッセージ処理
String orderData = message.getValue();
String orderKey = message.getKey();
System.out.printf("Received order: key=%s, data=%s%n", orderKey, orderData);
// ビジネスロジック処理
processOrder(orderData);
// メッセージ確認応答
consumer.acknowledge(message);
} else {
System.out.println("No message received within timeout");
}
} catch (Exception e) {
System.err.println("Error processing message: " + e.getMessage());
// エラー処理(negative acknowledge、dead letter queue等)
}
}
} finally {
consumer.close();
client.close();
}
}
private static void processOrder(String orderData) {
// 注文処理ロジック
System.out.println("Processing order: " + orderData);
// 処理時間のシミュレーション
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// メッセージリスナー型コンシューマー例
public class PulsarMessageListenerExample {
public static void main(String[] args) throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("persistent://my-tenant/my-namespace/orders")
.subscriptionName("order-listener")
.subscriptionType(SubscriptionType.Shared)
.messageListener((consumer1, msg) -> {
try {
String orderData = msg.getValue();
String messageKey = msg.getKey();
System.out.printf("Listener received: key=%s, data=%s%n",
messageKey, orderData);
// 非同期確認応答
consumer1.acknowledgeAsync(msg)
.thenRun(() -> System.out.println("Message acknowledged"))
.exceptionally(throwable -> {
System.err.println("Ack failed: " + throwable.getMessage());
return null;
});
} catch (Exception e) {
System.err.println("Message processing failed: " + e.getMessage());
consumer1.negativeAcknowledge(msg);
}
})
.subscribe();
// メッセージリスナーは別スレッドで動作するため、メインスレッドを維持
System.out.println("Message listener started. Press Enter to exit...");
System.in.read();
consumer.close();
client.close();
}
}
マルチテナンシーとジオレプリケーション設定
# マルチテナント環境設定
# テナント1(開発環境)
bin/pulsar-admin tenants create dev-tenant \
--admin-roles dev-admin \
--allowed-clusters standalone
bin/pulsar-admin namespaces create dev-tenant/app1
bin/pulsar-admin namespaces create dev-tenant/app2
# テナント2(本番環境)
bin/pulsar-admin tenants create prod-tenant \
--admin-roles prod-admin \
--allowed-clusters standalone
bin/pulsar-admin namespaces create prod-tenant/frontend
bin/pulsar-admin namespaces create prod-tenant/backend
# リソースクォータ設定
bin/pulsar-admin resource-quotas set \
--bundle-specs 0x00000000_0xffffffff \
--msg-rate-in 1000 \
--msg-rate-out 2000 \
--bandwidth-in 1048576 \
--bandwidth-out 2097152 \
--memory 1073741824 \
--dynamic true \
dev-tenant/app1
# ジオレプリケーション設定
# クラスタ1設定(東京)
bin/pulsar-admin clusters create tokyo-cluster \
--url http://tokyo-pulsar:8080 \
--broker-url pulsar://tokyo-pulsar:6650
# クラスタ2設定(大阪)
bin/pulsar-admin clusters create osaka-cluster \
--url http://osaka-pulsar:8080 \
--broker-url pulsar://osaka-pulsar:6650
# グローバルテナント作成
bin/pulsar-admin tenants create global-tenant \
--admin-roles global-admin \
--allowed-clusters tokyo-cluster,osaka-cluster
# グローバルネームスペース作成(自動レプリケーション)
bin/pulsar-admin namespaces create global-tenant/global-namespace \
--clusters tokyo-cluster,osaka-cluster
# 特定トピックのレプリケーション有効化
bin/pulsar-admin topics set-replication-clusters \
persistent://global-tenant/global-namespace/replicated-topic \
--clusters tokyo-cluster,osaka-cluster
# レプリケーション状況確認
bin/pulsar-admin topics stats \
persistent://global-tenant/global-namespace/replicated-topic
高度な設定と機能
// Schema Registry使用例
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.schema.SchemaType;
public class SchemaExample {
public static void main(String[] args) throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// JSON Schema定義
String jsonSchema = "{\n" +
" \"type\": \"object\",\n" +
" \"properties\": {\n" +
" \"orderId\": {\"type\": \"integer\"},\n" +
" \"customerId\": {\"type\": \"integer\"},\n" +
" \"amount\": {\"type\": \"number\"},\n" +
" \"timestamp\": {\"type\": \"integer\"}\n" +
" },\n" +
" \"required\": [\"orderId\", \"customerId\", \"amount\"]\n" +
"}";
Schema<GenericRecord> schema = Schema.JSON(jsonSchema);
// スキーマ付きプロデューサー
Producer<GenericRecord> producer = client.newProducer(schema)
.topic("persistent://my-tenant/my-namespace/schema-topic")
.create();
// レコード作成・送信
GenericRecord record = schema.newRecordBuilder()
.set("orderId", 12345)
.set("customerId", 67890)
.set("amount", 199.99)
.set("timestamp", System.currentTimeMillis())
.build();
producer.send(record);
producer.close();
client.close();
}
}
// トランザクション例
public class TransactionExample {
public static void main(String[] args) throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.enableTransaction(true)
.build();
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("persistent://my-tenant/my-namespace/transaction-topic")
.sendTimeout(0, TimeUnit.SECONDS) // トランザクション使用時は必須
.create();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("persistent://my-tenant/my-namespace/transaction-topic")
.subscriptionName("transaction-subscription")
.subscribe();
try {
// トランザクション開始
Transaction transaction = client.newTransaction()
.withTransactionTimeout(5, TimeUnit.MINUTES)
.build()
.get();
// トランザクション内でメッセージ送信
producer.newMessage(transaction)
.value("Transaction Message 1")
.send();
producer.newMessage(transaction)
.value("Transaction Message 2")
.send();
// トランザクションコミット
transaction.commit().get();
System.out.println("Transaction committed successfully");
} catch (Exception e) {
System.err.println("Transaction failed: " + e.getMessage());
} finally {
producer.close();
consumer.close();
client.close();
}
}
}
Python クライアント実装
# pip install pulsar-client
import pulsar
import json
import time
from datetime import datetime
# Python プロデューサー例
class PulsarProducerExample:
def __init__(self, service_url='pulsar://localhost:6650'):
self.client = pulsar.Client(service_url)
self.producer = self.client.create_producer(
'persistent://my-tenant/my-namespace/python-topic',
producer_name='python-producer',
batching_enabled=True,
batching_max_messages=100,
batching_max_publish_delay_ms=10,
compression_type=pulsar.CompressionType.SNAPPY
)
def send_order(self, order_id, customer_id, amount):
order = {
'order_id': order_id,
'customer_id': customer_id,
'amount': amount,
'timestamp': datetime.now().isoformat(),
'status': 'pending'
}
try:
# メッセージ送信
msg_id = self.producer.send(
content=json.dumps(order).encode('utf-8'),
properties={
'source': 'python-app',
'version': '1.0'
},
partition_key=f'customer-{customer_id}'
)
print(f"Order sent successfully: {order_id}, message_id: {msg_id}")
except Exception as e:
print(f"Failed to send order {order_id}: {e}")
def close(self):
self.producer.close()
self.client.close()
# Python コンシューマー例
class PulsarConsumerExample:
def __init__(self, service_url='pulsar://localhost:6650'):
self.client = pulsar.Client(service_url)
self.consumer = self.client.subscribe(
'persistent://my-tenant/my-namespace/python-topic',
subscription_name='python-subscription',
consumer_type=pulsar.ConsumerType.Shared,
receiver_queue_size=1000,
consumer_name='python-consumer'
)
def process_messages(self):
try:
while True:
try:
# メッセージ受信(タイムアウト付き)
msg = self.consumer.receive(timeout_millis=5000)
# メッセージ処理
order_data = json.loads(msg.data().decode('utf-8'))
print(f"Received order: {order_data}")
# ビジネスロジック処理
self.process_order(order_data)
# 確認応答
self.consumer.acknowledge(msg)
except pulsar.Timeout:
print("No message received within timeout")
continue
except Exception as e:
print(f"Error processing message: {e}")
self.consumer.negative_acknowledge(msg)
except KeyboardInterrupt:
print("Stopping consumer...")
finally:
self.consumer.close()
self.client.close()
def process_order(self, order_data):
# 注文処理ロジック
order_id = order_data.get('order_id')
amount = order_data.get('amount')
print(f"Processing order {order_id} for amount ${amount}")
# 処理時間のシミュレーション
time.sleep(0.05)
# 使用例
if __name__ == "__main__":
# プロデューサー例
producer = PulsarProducerExample()
for i in range(100):
producer.send_order(
order_id=i + 1,
customer_id=(i % 10) + 1,
amount=round(50 + (i * 10.5), 2)
)
producer.close()
# コンシューマー例
# consumer = PulsarConsumerExample()
# consumer.process_messages()
監視とパフォーマンス管理
# Pulsarクラスタ監視
# ブローカー状態確認
bin/pulsar-admin brokers list standalone
bin/pulsar-admin brokers healthcheck
# トピック統計情報
bin/pulsar-admin topics stats persistent://my-tenant/my-namespace/my-topic
bin/pulsar-admin topics partitioned-stats persistent://my-tenant/my-namespace/my-topic
# サブスクリプション統計
bin/pulsar-admin topics subscriptions persistent://my-tenant/my-namespace/my-topic
bin/pulsar-admin topics subscription-stats persistent://my-tenant/my-namespace/my-topic --subscription my-subscription
# BookKeeper統計
bin/pulsar-admin bookies list
bin/pulsar-admin bookies list-bookies
# パフォーマンステスト
# プロデューサーテスト
bin/pulsar-perf produce \
persistent://my-tenant/my-namespace/perf-topic \
--rate 10000 \
--num-producers 1 \
--size 1024 \
--num-messages 1000000
# コンシューマーテスト
bin/pulsar-perf consume \
persistent://my-tenant/my-namespace/perf-topic \
--subscription-name perf-subscription \
--num-consumers 1
# 監視用メトリクス取得スクリプト
cat > monitor-pulsar.sh << 'EOF'
#!/bin/bash
echo "=== Pulsar Cluster Monitoring $(date) ==="
# ブローカー状態
echo "=== Broker Status ==="
bin/pulsar-admin brokers healthcheck 2>/dev/null && echo "Broker: OK" || echo "Broker: ERROR"
# クラスタ情報
echo "=== Cluster Info ==="
bin/pulsar-admin clusters list
# テナント・ネームスペース数
echo "=== Tenants and Namespaces ==="
echo "Total tenants: $(bin/pulsar-admin tenants list | wc -l)"
echo "Total namespaces: $(bin/pulsar-admin namespaces list public | wc -l)"
# トピック数(publicテナントのみ)
echo "=== Topics ==="
echo "Total topics: $(bin/pulsar-admin topics list public/default | wc -l)"
# BookKeeper状態
echo "=== BookKeeper Status ==="
bin/pulsar-admin bookies list | wc -l | xargs echo "Available bookies:"
# JVM監視(プロセスが動いている場合)
echo "=== JVM Status ==="
pgrep -f "pulsar" > /dev/null && echo "Pulsar processes: $(pgrep -f pulsar | wc -l)" || echo "No Pulsar processes"
echo "=== Monitoring Complete ==="
EOF
chmod +x monitor-pulsar.sh