RabbitMQ
A highly reliable message broker that supports the AMQP, MQTT, and STOMP protocols and provides complex routing, message acknowledgements, and clustering.
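RabbitMQ is an open-source message broker built on the Advanced Message Queuing Protocol (AMQP) 0-9-1. It offers high reliability, flexible routing, and enterprise features, and it supports communication between distributed systems and microservices.
Key Features
High Reliability
- Message persistence and transactions
- Publisher Confirms for delivery guarantees
- Consumer Acknowledgments to confirm processing
- At-least-once delivery when confirms and acknowledgments are used (redelivery can produce duplicates, so consumers should be idempotent)
Flexible Routing
- Multiple exchange types
- Direct, Topic, Headers, and Fanout routing
- Complex message routing patterns
- Custom routing capabilities
High Availability and Clustering
- Multi-node cluster configurations
- Queue Mirroring (classic mirrored queues; deprecated and removed in RabbitMQ 4.0)
- Quorum Queues based on distributed consensus
- Automatic failover
Enterprise Features
- Management UI and HTTP API
- Detailed monitoring and metrics
- Plugin architecture
- LDAP and external authentication integration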
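On the reliability features above: acknowledgments mean an unacknowledged message can be delivered again, so a consumer may see the same message twice. The following is a minimal sketch of one way to tolerate that, assuming the producer sets a unique `message_id` on each message. The `SeenMessages` class is a hypothetical helper written for this illustration and is not part of RabbitMQ or any client library.

```python
from collections import OrderedDict


class SeenMessages:
    """Bounded memory of recently processed message IDs (hypothetical helper)."""

    def __init__(self, capacity=10000):
        self.capacity = capacity
        self._ids = OrderedDict()

    def first_time(self, message_id):
        """Return True the first time an ID is seen, False for a repeat."""
        if message_id in self._ids:
            self._ids.move_to_end(message_id)
            return False
        self._ids[message_id] = None
        if len(self._ids) > self.capacity:
            self._ids.popitem(last=False)  # evict the oldest ID
        return True


seen = SeenMessages(capacity=2)
results = [seen.first_time(m) for m in ["a", "b", "a", "c", "b"]]
print(results)  # [True, True, False, True, True]
```

The last `"b"` counts as new again because the capacity of 2 has already evicted it. An in-memory window like this only covers recent duplicates within one process; a shared store would be needed for stronger guarantees.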
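Installation
Ubuntu/Debian
# Install prerequisites (Erlang itself comes from the repository added below)
sudo apt update
sudo apt install curl gnupg apt-transport-https
# Add the official RabbitMQ signing keys and repositories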
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
curl -1sLf "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0xf77f1eda57ebb1cc" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/net.launchpad.ppa.rabbitmq.erlang.gpg > /dev/null
echo "deb [signed-by=/usr/share/keyrings/net.launchpad.ppa.rabbitmq.erlang.gpg] http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
echo "deb [signed-by=/usr/share/keyrings/com.rabbitmq.team.gpg] https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ $(lsb_release -cs) main" | sudo tee -a /etc/apt/sources.list.d/rabbitmq.list
sudo apt update
# Install the Erlang packages
sudo apt install erlang-base erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
    erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
    erlang-runtime-tools erlang-snmp erlang-ssl erlang-syntax-tools \
    erlang-tftp erlang-tools erlang-xmerl
# Install RabbitMQ
sudo apt install rabbitmq-server
CentOS/RHEL
# Enable EPEL and update packages
sudo dnf install epel-release
sudo dnf update
# Add the Erlang and RabbitMQ repositories
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
# Install
sudo dnf install erlang rabbitmq-server
Docker
# RabbitMQ with Management Plugin
docker run -d \
    --name rabbitmq \
    -p 5672:5672 \
    -p 15672:15672 \
    -e RABBITMQ_DEFAULT_USER=admin \
    -e RABBITMQ_DEFAULT_PASS=password \
    rabbitmq:3-management
# Alternative: the same container with a named volume for data persistence
docker run -d \
    --name rabbitmq \
    -p 5672:5672 \
    -p 15672:15672 \
    -v rabbitmq_data:/var/lib/rabbitmq \
    -e RABBITMQ_DEFAULT_USER=admin \
    -e RABBITMQ_DEFAULT_PASS=password \
    rabbitmq:3-management
Docker Compose
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_DEFAULT_VHOST: /
    ports:
      - "5672:5672"   # AMQP port
      - "15672:15672" # Management UI
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
    networks:
      - rabbitmq_net
volumes:
  rabbitmq_data:
networks:
  rabbitmq_net:
Basic Configuration
rabbitmq.conf
# Network settings
listeners.tcp.default = 5672
management.tcp.port = 15672
# Authentication settings
default_user = admin
default_pass = password
default_vhost = /
# Memory settings
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.5
# Disk settings
disk_free_limit.relative = 2.0
# Logging settings
log.console = true
log.console.level = info
log.file = /var/log/rabbitmq/rabbit.log
log.file.level = info
# Cluster settings
cluster_name = my-rabbit-cluster
cluster_partition_handling = autoheal
# SSL/TLS settings (optional)
# listeners.ssl.default = 5671
# ssl_options.cacertfile = /path/to/ca_certificate.pem
# ssl_options.certfile = /path/to/server_certificate.pem
# ssl_options.keyfile = /path/to/server_key.pem
# ssl_options.verify = verify_peer
# ssl_options.fail_if_no_peer_cert = false
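The memory and disk settings above are relative values, so their effect depends on the machine's RAM. The sketch below converts them into absolute figures for a hypothetical 16 GiB host. It assumes the usual reading of these keys: the watermark is a fraction of total RAM, the paging ratio is a fraction of the watermark, and the relative disk limit is a multiple of total RAM. The function name `thresholds` is made up for this example.

```python
GIB = 1024 ** 3


def thresholds(total_ram_bytes, watermark=0.6, paging_ratio=0.5, disk_relative=2.0):
    """Translate the relative settings into absolute byte values."""
    memory_alarm = total_ram_bytes * watermark
    return {
        "memory_alarm_bytes": int(memory_alarm),
        "paging_starts_bytes": int(memory_alarm * paging_ratio),
        "disk_free_limit_bytes": int(total_ram_bytes * disk_relative),
    }


limits = thresholds(16 * GIB)
for name, value in limits.items():
    print(f"{name}: {value / GIB:.1f} GiB")
```

With these values the memory alarm would trigger at about 9.6 GiB, and the disk alarm would require 32 GiB of free space, which may be more than intended on hosts with small disks.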
enabled_plugins
[rabbitmq_management,
rabbitmq_management_agent,
rabbitmq_prometheus,
rabbitmq_shovel,
rabbitmq_shovel_management,
rabbitmq_federation,
rabbitmq_federation_management].
Core Concepts and Operations
Server and Queue Management
# Start, stop, and check the service
sudo systemctl start rabbitmq-server
sudo systemctl stop rabbitmq-server
sudo systemctl status rabbitmq-server
# Plugin management
sudo rabbitmq-plugins enable rabbitmq_management
sudo rabbitmq-plugins list
sudo rabbitmq-plugins disable rabbitmq_federation
# User management
sudo rabbitmqctl add_user myuser mypassword
sudo rabbitmqctl set_user_tags myuser administrator
sudo rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"
sudo rabbitmqctl list_users
# Virtual host management
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl list_vhosts
sudo rabbitmqctl delete_vhost myvhost
# Queue management
sudo rabbitmqctl list_queues
sudo rabbitmqctl list_queues name messages consumers
sudo rabbitmqctl purge_queue myqueue
rabbitmqadmin Command
# Download and install rabbitmqadmin (served by the management plugin)
curl -O http://localhost:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin
sudo mv rabbitmqadmin /usr/local/bin/
# Declare a queue
rabbitmqadmin declare queue name=test-queue durable=true
# Publish a message
rabbitmqadmin publish exchange=amq.default routing_key=test-queue payload="Hello, RabbitMQ!"
# Get a message (RabbitMQ 3.7 and later use ackmode instead of requeue)
rabbitmqadmin get queue=test-queue ackmode=ack_requeue_false
# Declare an exchange
rabbitmqadmin declare exchange name=test-exchange type=direct
# Declare a binding
rabbitmqadmin declare binding source=test-exchange destination=test-queue routing_key=test
Programming Examples
Python (pika)
import json

import pika

# Connection settings
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='localhost',
        port=5672,
        virtual_host='/',
        credentials=pika.PlainCredentials('admin', 'password')
    )
)
channel = connection.channel()


# Producer example
def send_message():
    # Declare the queue (durable, so it survives a broker restart)
    channel.queue_declare(queue='hello', durable=True)
    # Publish the message
    message = {"user_id": 123, "action": "login", "timestamp": "2024-01-15T10:00:00Z"}
    channel.basic_publish(
        exchange='',
        routing_key='hello',
        body=json.dumps(message),
        properties=pika.BasicProperties(
            delivery_mode=2,  # persistent message
            content_type='application/json'
        )
    )
    print("Message sent")


# Consumer example
def callback(ch, method, properties, body):
    try:
        message = json.loads(body)
        print(f"Received: {message}")
        # Process the message
        process_message(message)
        # Acknowledge once processing has finished
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error processing message: {e}")
        # On failure, negatively acknowledge and requeue
        # (a message that always fails will be redelivered repeatedly)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)


def consume_messages():
    # Declare the queue
    channel.queue_declare(queue='hello', durable=True)
    # QoS: limit the number of unacknowledged messages delivered at once
    channel.basic_qos(prefetch_count=1)
    # Register the consumer
    channel.basic_consume(queue='hello', on_message_callback=callback)
    print("Waiting for messages. To exit press CTRL+C")
    channel.start_consuming()


def process_message(message):
    # Message processing logic
    print(f"Processing user {message['user_id']} action: {message['action']}")


if __name__ == "__main__":
    try:
        # send_message()  # Producer
        consume_messages()  # Consumer
    finally:
        connection.close()
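The consumer above always requeues on failure, so a message that can never be processed would be redelivered indefinitely. One possible variation, sketched here as an assumption rather than a recommended policy, uses the `redelivered` flag that the broker sets on repeat deliveries: requeue on the first failure, reject on the second. `handle_delivery` and `RecordingChannel` are hypothetical names; `RecordingChannel` is only a stand-in that records calls so the sketch runs without a broker.

```python
from types import SimpleNamespace


def handle_delivery(ch, method, body, process):
    """Ack on success; on failure requeue once, then reject without requeue."""
    try:
        process(body)
    except Exception:
        # method.redelivered is True when this message was delivered before
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=not method.redelivered)
        return False
    ch.basic_ack(delivery_tag=method.delivery_tag)
    return True


class RecordingChannel:
    """Stand-in for a pika channel that only records calls (illustration only)."""

    def __init__(self):
        self.calls = []

    def basic_ack(self, delivery_tag):
        self.calls.append(("ack", delivery_tag))

    def basic_nack(self, delivery_tag, requeue):
        self.calls.append(("nack", delivery_tag, requeue))


def always_fails(body):
    raise ValueError("cannot process")


ch = RecordingChannel()
handle_delivery(ch, SimpleNamespace(delivery_tag=1, redelivered=False), b"x", always_fails)
handle_delivery(ch, SimpleNamespace(delivery_tag=1, redelivered=True), b"x", always_fails)
handle_delivery(ch, SimpleNamespace(delivery_tag=2, redelivered=False), b"y", len)
print(ch.calls)  # [('nack', 1, True), ('nack', 1, False), ('ack', 2)]
```

A message rejected without requeue is dropped unless the queue has a dead-letter exchange configured, so this pattern is usually paired with one.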
Java (Spring AMQP)
import com.rabbitmq.client.Channel;
import java.util.Map;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

// Each public class below belongs in its own source file.
@SpringBootApplication
public class RabbitMQApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQApplication.class, args);
    }

    @Bean
    public Queue testQueue() {
        return QueueBuilder.durable("test-queue").build();
    }

    @Bean
    public DirectExchange testExchange() {
        return new DirectExchange("test-exchange");
    }

    @Bean
    public Binding testBinding(Queue testQueue, DirectExchange testExchange) {
        return BindingBuilder.bind(testQueue).to(testExchange).with("test-key");
    }
}

@Service
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend("test-exchange", "test-key", message);
        System.out.println("Message sent: " + message);
    }

    public void sendMessageWithProperties(Object message) {
        rabbitTemplate.convertAndSend(
            "test-exchange",
            "test-key",
            message,
            // The post-processor receives the converted Message, not its properties
            msg -> {
                MessageProperties props = msg.getMessageProperties();
                props.setContentType("application/json");
                props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                props.setPriority(1);
                return msg;
            }
        );
    }
}

@Service
public class MessageConsumer {
    // Variant 1: container-managed acknowledgement
    @RabbitListener(queues = "test-queue")
    public void receiveMessage(String message) {
        try {
            System.out.println("Received: " + message);
            // Message processing logic
            processMessage(message);
        } catch (Exception e) {
            System.err.println("Error processing message: " + e.getMessage());
            throw e; // rethrow so the container rejects the message (requeued by default)
        }
    }

    // Variant 2: manual acknowledgement (in practice, use one variant per queue)
    @RabbitListener(queues = "test-queue", ackMode = "MANUAL")
    public void receiveMessageWithDetails(
            @Payload String message,
            @Headers Map<String, Object> headers,
            Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            System.out.println("Message: " + message);
            System.out.println("Headers: " + headers);
            processMessage(message);
            // Manual ACK
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            try {
                // Manual NACK with requeue
                channel.basicNack(deliveryTag, false, true);
            } catch (Exception nackEx) {
                System.err.println("Error sending NACK: " + nackEx.getMessage());
            }
        }
    }

    private void processMessage(String message) {
        // Message processing logic
        System.out.println("Processing: " + message);
    }
}
Node.js (amqplib)
const amqp = require('amqplib');

class RabbitMQService {
    constructor() {
        this.connection = null;
        this.channel = null;
    }

    async connect() {
        try {
            this.connection = await amqp.connect('amqp://admin:password@localhost:5672/');
            this.channel = await this.connection.createChannel();
            // Connection error handling
            this.connection.on('error', (err) => {
                console.error('Connection error:', err);
            });
            console.log('Connected to RabbitMQ');
        } catch (error) {
            console.error('Failed to connect to RabbitMQ:', error);
            throw error;
        }
    }

    async setupQueue(queueName, options = {}) {
        const defaultOptions = {
            durable: true,
            exclusive: false,
            autoDelete: false
        };
        return await this.channel.assertQueue(queueName, { ...defaultOptions, ...options });
    }

    async setupExchange(exchangeName, type = 'direct', options = {}) {
        const defaultOptions = {
            durable: true
        };
        return await this.channel.assertExchange(exchangeName, type, { ...defaultOptions, ...options });
    }

    async bindQueue(queueName, exchangeName, routingKey) {
        return await this.channel.bindQueue(queueName, exchangeName, routingKey);
    }

    async publish(exchangeName, routingKey, message, options = {}) {
        const defaultOptions = {
            persistent: true,
            contentType: 'application/json'
        };
        const messageBuffer = Buffer.from(JSON.stringify(message));
        return this.channel.publish(exchangeName, routingKey, messageBuffer, { ...defaultOptions, ...options });
    }

    async consume(queueName, callback, options = {}) {
        // Merge first so that a caller-supplied noAck is respected below
        const consumeOptions = { noAck: false, ...options };
        return await this.channel.consume(queueName, async (msg) => {
            if (msg) {
                try {
                    const content = JSON.parse(msg.content.toString());
                    await callback(content, msg);
                    if (!consumeOptions.noAck) {
                        this.channel.ack(msg);
                    }
                } catch (error) {
                    console.error('Error processing message:', error);
                    if (!consumeOptions.noAck) {
                        this.channel.nack(msg, false, true); // requeue
                    }
                }
            }
        }, consumeOptions);
    }

    async close() {
        if (this.channel) {
            await this.channel.close();
        }
        if (this.connection) {
            await this.connection.close();
        }
    }
}

// Usage example
async function main() {
    const rabbitmq = new RabbitMQService();
    try {
        await rabbitmq.connect();
        // Setup
        await rabbitmq.setupExchange('user-events', 'topic');
        await rabbitmq.setupQueue('user-registration');
        await rabbitmq.bindQueue('user-registration', 'user-events', 'user.registered');
        // Producer
        await rabbitmq.publish('user-events', 'user.registered', {
            userId: 123,
            email: '[email protected]',
            timestamp: new Date().toISOString()
        });
        // Consumer
        await rabbitmq.consume('user-registration', async (message, msg) => {
            console.log('Received:', message);
            // Process the message
            console.log(`Processing user registration for ${message.email}`);
        });
    } catch (error) {
        console.error('Error:', error);
    }
}

main();
Exchange Types and Routing
Direct Exchange
# Producer
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.basic_publish(
    exchange='direct_logs',
    routing_key='error',
    body='Error message'
)
# Consumer
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Bind multiple routing keys
severities = ['error', 'warning']
for severity in severities:
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
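To make the direct routing rule concrete, here is a toy in-memory model: a message goes to every queue whose binding key equals the routing key exactly. This is only an illustration of the rule, not a RabbitMQ client; the class `DirectExchange` and the queue names `alerts` and `audit` are invented for the example.

```python
from collections import defaultdict


class DirectExchange:
    """Toy in-memory model of direct routing (not a RabbitMQ client)."""

    def __init__(self):
        self._bindings = defaultdict(set)  # routing key -> bound queue names

    def bind(self, queue, routing_key):
        self._bindings[routing_key].add(queue)

    def route(self, routing_key):
        """Return the queues that would receive a message with this key."""
        return sorted(self._bindings.get(routing_key, ()))


exchange = DirectExchange()
for severity in ("error", "warning"):
    exchange.bind("alerts", severity)
exchange.bind("audit", "error")

print(exchange.route("error"))    # ['alerts', 'audit']
print(exchange.route("warning"))  # ['alerts']
print(exchange.route("info"))     # []
```

A routing key with no matching binding reaches no queue, which is why the `info` message is routed nowhere.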
Topic Exchange
# Producer
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_keys = ['user.created', 'user.updated', 'order.shipped', 'payment.failed']
for key in routing_keys:
    channel.basic_publish(
        exchange='topic_logs',
        routing_key=key,
        body=f'Message for {key}'
    )
# Consumer - Subscribe to user events
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='user.*')
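Topic binding patterns treat the routing key as dot-separated words, where `*` matches exactly one word and `#` matches zero or more. The sketch below re-implements that matching rule in plain Python to show which of the routing keys above a pattern would select. `topic_matches` is a hypothetical helper for illustration; the broker performs this matching itself.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key.

    '*' stands for exactly one word and '#' for zero or more words.
    """
    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero words, or one word and then try again
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


routing_keys = ["user.created", "user.updated", "order.shipped", "payment.failed"]
selected = [key for key in routing_keys if topic_matches("user.*", key)]
print(selected)  # ['user.created', 'user.updated']
```

Note that `user.*` would not match `user.profile.updated` (two trailing words), whereas `user.#` would.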
Fanout Exchange
# Producer - Broadcast to all queues
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(
    exchange='logs',
    routing_key='',  # Ignored in fanout
    body='Broadcast message'
)
# Consumer
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
Cluster Configuration
Three-Node Cluster Setup
# Node 1 (rabbit1)
sudo hostnamectl set-hostname rabbit1
echo "127.0.0.1 rabbit1" | sudo tee -a /etc/hosts
echo "192.168.1.10 rabbit2" | sudo tee -a /etc/hosts
echo "192.168.1.11 rabbit3" | sudo tee -a /etc/hosts
# Set the Erlang cookie (the same value on every node)
sudo systemctl stop rabbitmq-server
echo "ALWEFIHANFKLAPWEFFAWE" | sudo tee /var/lib/rabbitmq/.erlang.cookie
sudo chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
sudo chmod 400 /var/lib/rabbitmq/.erlang.cookie
# Start RabbitMQ
sudo systemctl start rabbitmq-server
# After configuring nodes 2 and 3 the same way, join them to the cluster
# Run on node 2
sudo rabbitmqctl stop_app
sudo rabbitmqctl reset
sudo rabbitmqctl join_cluster rabbit@rabbit1
sudo rabbitmqctl start_app
# Run on node 3
sudo rabbitmqctl stop_app
sudo rabbitmqctl reset
sudo rabbitmqctl join_cluster rabbit@rabbit1
sudo rabbitmqctl start_app
# Check cluster status
sudo rabbitmqctl cluster_status
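The Erlang cookie in the commands above is a fixed sample string. Since any node that knows the cookie can join the cluster, a randomly generated value is preferable. The sketch below shows one way to produce one, assuming an alphanumeric string is acceptable as the cookie; `generate_cookie` and the length of 32 are arbitrary choices made for this example.

```python
import secrets
import string


def generate_cookie(length=32):
    """Return a random alphanumeric string to use as a shared cookie value."""
    alphabet = string.ascii_uppercase + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))


cookie = generate_cookie()
print(cookie)
```

The same generated value would then be written to `/var/lib/rabbitmq/.erlang.cookie` on every node.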
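High Availability (HA) Policy Settings
# Mirror all queues across the whole cluster (classic mirrored queues; removed in RabbitMQ 4.0)
sudo rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
# Mirror to a specific number of nodes
sudo rabbitmqctl set_policy ha-two "^ha\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
# Quorum queues: the queue type cannot be set by a policy; declare it with the x-queue-type argument
# (the queue name qq.orders is only an example)
rabbitmqadmin declare queue name=qq.orders durable=true arguments='{"x-queue-type":"quorum"}'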
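Quorum queues stay available only while a majority of their members are online, which is why cluster sizes are usually odd. The arithmetic can be sketched as follows; the function names are invented for this illustration.

```python
def quorum_size(members):
    """Smallest number of members that forms a majority."""
    return members // 2 + 1


def tolerated_failures(members):
    """How many members can be lost while a majority remains."""
    return members - quorum_size(members)


for n in (3, 4, 5):
    print(f"{n} members: quorum={quorum_size(n)}, tolerated failures={tolerated_failures(n)}")
```

A fourth node raises the quorum to 3 without tolerating any more failures than three nodes do, so the three-node cluster above already gives the same fault tolerance.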
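Monitoring and Maintenance
Basic Monitoring Commands
# Cluster status
sudo rabbitmqctl cluster_status
# Node health (node_health_check is deprecated; rabbitmq-diagnostics provides targeted checks)
sudo rabbitmq-diagnostics check_running
# Queue information
sudo rabbitmqctl list_queues name messages consumers memory
# Exchange information
sudo rabbitmqctl list_exchanges name type
# Connection information
sudo rabbitmqctl list_connections
# Channel information
sudo rabbitmqctl list_channels
# Memory usage (included in the status report)
sudo rabbitmqctl status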
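The queue listing above is most useful when something checks it automatically. The sketch below flags queues that hold messages but have no consumers. It assumes the listing was produced as JSON, for example with `rabbitmqctl list_queues name messages consumers --formatter json`, and that the result is an array of objects with those three keys; the sample data is made up, and `idle_backlogs` is a hypothetical helper.

```python
import json


def idle_backlogs(list_queues_json, min_messages=1):
    """Return names of queues that hold messages but have no consumers."""
    queues = json.loads(list_queues_json)
    return [
        q["name"]
        for q in queues
        if q["messages"] >= min_messages and q["consumers"] == 0
    ]


# Invented sample in the assumed output shape
sample = json.dumps([
    {"name": "hello", "messages": 42, "consumers": 0},
    {"name": "test-queue", "messages": 0, "consumers": 0},
    {"name": "user-registration", "messages": 7, "consumers": 2},
])
print(idle_backlogs(sample))  # ['hello']
```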
Management API Examples
# List queues through the API
curl -u admin:password http://localhost:15672/api/queues
# Details of a specific queue
curl -u admin:password http://localhost:15672/api/queues/%2F/test-queue
# Publish a message
curl -u admin:password -H "Content-Type: application/json" \
    -X POST http://localhost:15672/api/exchanges/%2F/amq.default/publish \
    -d '{"properties":{},"routing_key":"test-queue","payload":"hello","payload_encoding":"string"}'
# Get a message (RabbitMQ 3.7 and later use ackmode instead of requeue)
curl -u admin:password -H "Content-Type: application/json" \
    -X POST http://localhost:15672/api/queues/%2F/test-queue/get \
    -d '{"count":1,"ackmode":"ack_requeue_true","encoding":"auto"}'
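In the URLs above, `%2F` is the percent-encoded default virtual host `/`. When building such URLs in code, the vhost and queue name should be encoded the same way. The following sketch prepares an authenticated request with the Python standard library without sending it; `queue_request` is a hypothetical helper, and the credentials are the sample ones used in this document.

```python
import base64
from urllib.parse import quote
from urllib.request import Request


def queue_request(base_url, vhost, queue, user, password):
    """Build a GET request for one queue's details (the request is not sent here)."""
    # safe='' makes quote() encode '/' as %2F
    path = f"/api/queues/{quote(vhost, safe='')}/{quote(queue, safe='')}"
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return Request(base_url + path, headers={"Authorization": f"Basic {token}"})


req = queue_request("http://localhost:15672", "/", "test-queue", "admin", "password")
print(req.full_url)  # http://localhost:15672/api/queues/%2F/test-queue
```

With a broker running, passing `req` to `urllib.request.urlopen` would return the queue details as JSON.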
Prometheus Metrics
# Enable the Prometheus plugin
sudo rabbitmq-plugins enable rabbitmq_prometheus
# Check the metrics
curl http://localhost:15692/metrics
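The metrics endpoint returns the Prometheus text format, one sample per line. For a quick check without a Prometheus server, a single metric can be picked out of that text. This is a rough sketch: it ignores optional timestamps, the sample text is invented, and the metric name `rabbitmq_queue_messages_ready` is assumed to be present in the plugin's output.

```python
def metric_samples(exposition_text, metric_name):
    """Return (labels, value) pairs for one metric from Prometheus text output."""
    samples = []
    for line in exposition_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        name_and_labels, _, value = line.rpartition(" ")
        name, _, labels = name_and_labels.partition("{")
        if name == metric_name:
            samples.append((labels.rstrip("}"), float(value)))
    return samples


# Invented sample text in the exposition format
text = """\
# TYPE rabbitmq_queue_messages_ready gauge
rabbitmq_queue_messages_ready 12
rabbitmq_queue_messages_ready{queue="hello"} 5
rabbitmq_connections 3
"""
print(metric_samples(text, "rabbitmq_queue_messages_ready"))
```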
Performance Optimization
Message Persistence Settings
# Declare a durable queue
channel.queue_declare(queue='persistent_queue', durable=True)
# Publish a persistent message
channel.basic_publish(
    exchange='',
    routing_key='persistent_queue',
    body='persistent message',
    properties=pika.BasicProperties(delivery_mode=2)  # persistent
)
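Batch Publishing with Publisher Confirms
# Batch publish
channel.confirm_delivery()  # enable Publisher Confirms
try:
    for i in range(1000):
        # With confirms enabled on a BlockingChannel, basic_publish waits for the
        # broker's confirmation and raises an exception if the message is nacked
        channel.basic_publish(
            exchange='',
            routing_key='batch_queue',
            body=f'message {i}'
        )
    print("All messages confirmed")
except pika.exceptions.NackError as e:
    print(f"Broker rejected a message: {e}")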
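With asynchronous clients, confirms arrive separately from publishes: the channel numbers published messages from 1, and each broker acknowledgement carries a delivery tag plus a `multiple` flag meaning "everything up to and including this tag". The bookkeeping this requires can be sketched in plain Python. `ConfirmTracker` is a hypothetical class for illustration and is not part of pika.

```python
class ConfirmTracker:
    """Track sequence numbers of published messages awaiting confirmation."""

    def __init__(self):
        self._next_seq = 1  # confirm-mode channels count from 1
        self._pending = set()

    def published(self):
        """Record one publish and return its sequence number."""
        seq = self._next_seq
        self._pending.add(seq)
        self._next_seq += 1
        return seq

    def confirmed(self, delivery_tag, multiple=False):
        """Apply a broker acknowledgement."""
        if multiple:
            # Everything up to and including delivery_tag is confirmed
            self._pending = {s for s in self._pending if s > delivery_tag}
        else:
            self._pending.discard(delivery_tag)

    @property
    def outstanding(self):
        return sorted(self._pending)


tracker = ConfirmTracker()
for _ in range(5):
    tracker.published()
tracker.confirmed(3, multiple=True)  # confirms 1, 2, 3
tracker.confirmed(5)                 # confirms 5 only
print(tracker.outstanding)  # [4]
```

Batching publishes and waiting until nothing is outstanding generally gives higher throughput than waiting for each confirm individually.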
Connection Pool Settings
import pika
import pika_pool  # third-party "pika-pool" package; pika itself has no pool module
# Connection pool settings
pool_params = pika.URLParameters('amqp://admin:password@localhost:5672/')
pool = pika_pool.QueuedPool(
    create=lambda: pika.BlockingConnection(pool_params),
    max_size=10,
    max_overflow=0,
    timeout=10,
    recycle=-1,
    stale=45
)
# Acquire a connection from the pool; the pooled wrapper exposes a channel
with pool.acquire() as cxn:
    cxn.channel.basic_publish(exchange='', routing_key='test', body='message')
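With its high reliability, flexible routing, and rich set of enterprise features, RabbitMQ is widely used in mission-critical applications, communication between microservices, and asynchronous processing systems. Because it implements AMQP, it also interoperates across languages and platforms.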