RabbitMQ

高い信頼性を持つメッセージブローカー。AMQP、MQTT、STOMPプロトコル対応。複雑なルーティング、メッセージ確認、クラスタリング機能を提供。

メッセージングキューAMQP高可用性信頼性分散マイクロサービスエンタープライズ

RabbitMQ

RabbitMQは、Advanced Message Queuing Protocol (AMQP) 0-9-1をベースとした、オープンソースのメッセージブローカーです。高い信頼性、柔軟なルーティング、エンタープライズ機能を提供し、分散システムとマイクロサービス間の通信を支援します。

主な特徴

高い信頼性

  • メッセージの永続化とトランザクション
  • Publisher Confirms による配信保証
  • Consumer Acknowledgments による処理確認
  • メッセージ重複配信防止

柔軟なルーティング

  • 複数のExchange(交換器)タイプ
  • Direct、Topic、Headers、Fanout ルーティング
  • 複雑なメッセージルーティングパターン
  • カスタムルーティング機能

高可用性・クラスタリング

  • Multi-node クラスタ構成
  • Queue Mirroring(キューミラーリング)
  • Quorum Queues による分散合意
  • 自動フェイルオーバー

エンタープライズ機能

  • 管理UI/API
  • 詳細なモニタリング・メトリクス
  • プラグインアーキテクチャ
  • LDAP・外部認証連携

インストール

Ubuntu/Debian

# Erlang のインストール
sudo apt update
sudo apt install curl gnupg apt-transport-https

# RabbitMQ 公式リポジトリ追加
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
curl -1sLf "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0xf77f1eda57ebb1cc" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/net.launchpad.ppa.rabbitmq.erlang.gpg > /dev/null

echo "deb [signed-by=/usr/share/keyrings/net.launchpad.ppa.rabbitmq.erlang.gpg] http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
echo "deb [signed-by=/usr/share/keyrings/com.rabbitmq.team.gpg] https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ $(lsb_release -cs) main" | sudo tee -a /etc/apt/sources.list.d/rabbitmq.list

sudo apt update
sudo apt install erlang-base erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
                 erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
                 erlang-runtime-tools erlang-snmp erlang-ssl erlang-syntax-tools \
                 erlang-tftp erlang-tools erlang-xmerl

# RabbitMQ インストール
sudo apt install rabbitmq-server

CentOS/RHEL

# Erlang リポジトリ追加
sudo dnf install epel-release
sudo dnf update

# RabbitMQ リポジトリ追加
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash

# インストール
sudo dnf install erlang rabbitmq-server

Docker

# RabbitMQ with Management Plugin
docker run -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=password \
  rabbitmq:3-management

# データ永続化
docker run -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -v rabbitmq_data:/var/lib/rabbitmq \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=password \
  rabbitmq:3-management

Docker Compose

version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_DEFAULT_VHOST: /
    ports:
      - "5672:5672"      # AMQP port
      - "15672:15672"    # Management UI
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
    networks:
      - rabbitmq_net

volumes:
  rabbitmq_data:

networks:
  rabbitmq_net:

基本設定

rabbitmq.conf

# ネットワーク設定
listeners.tcp.default = 5672
management.tcp.port = 15672

# 認証設定
default_user = admin
default_pass = password
default_vhost = /

# メモリ設定
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.5

# ディスク設定
disk_free_limit.relative = 2.0

# ログ設定
log.console = true
log.console.level = info
log.file = /var/log/rabbitmq/rabbit.log
log.file.level = info

# クラスタ設定
cluster_name = my-rabbit-cluster
cluster_partition_handling = autoheal

# SSL/TLS設定(オプション)
# listeners.ssl.default = 5671
# ssl_options.cacertfile = /path/to/ca_certificate.pem
# ssl_options.certfile   = /path/to/server_certificate.pem
# ssl_options.keyfile    = /path/to/server_key.pem
# ssl_options.verify     = verify_peer
# ssl_options.fail_if_no_peer_cert = false

enabled_plugins

[rabbitmq_management,
 rabbitmq_management_agent,
 rabbitmq_prometheus,
 rabbitmq_shovel,
 rabbitmq_shovel_management,
 rabbitmq_federation,
 rabbitmq_federation_management].

基本概念と操作

キュー管理

# サービス開始・停止
sudo systemctl start rabbitmq-server
sudo systemctl stop rabbitmq-server
sudo systemctl status rabbitmq-server

# プラグイン管理
sudo rabbitmq-plugins enable rabbitmq_management
sudo rabbitmq-plugins list
sudo rabbitmq-plugins disable rabbitmq_federation

# ユーザー管理
sudo rabbitmqctl add_user myuser mypassword
sudo rabbitmqctl set_user_tags myuser administrator
sudo rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"
sudo rabbitmqctl list_users

# Virtual Host 管理
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl list_vhosts
sudo rabbitmqctl delete_vhost myvhost

# キュー管理
sudo rabbitmqctl list_queues
sudo rabbitmqctl list_queues name messages consumers
sudo rabbitmqctl purge_queue myqueue

rabbitmqadmin コマンド

# rabbitmqadmin のダウンロードとインストール
curl -O http://localhost:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin
sudo mv rabbitmqadmin /usr/local/bin/

# キュー作成
rabbitmqadmin declare queue name=test-queue durable=true

# メッセージ送信
rabbitmqadmin publish exchange=amq.default routing_key=test-queue payload="Hello, RabbitMQ!"

# メッセージ受信
rabbitmqadmin get queue=test-queue requeue=false

# Exchange作成
rabbitmqadmin declare exchange name=test-exchange type=direct

# バインディング作成
rabbitmqadmin declare binding source=test-exchange destination=test-queue routing_key=test

プログラミング例

Python (pika)

import pika
import json

# 接続設定
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='localhost',
        port=5672,
        virtual_host='/',
        credentials=pika.PlainCredentials('admin', 'password')
    )
)
channel = connection.channel()

# Producer例
def send_message():
    # キュー宣言
    channel.queue_declare(queue='hello', durable=True)
    
    # メッセージ送信
    message = {"user_id": 123, "action": "login", "timestamp": "2024-01-15T10:00:00Z"}
    channel.basic_publish(
        exchange='',
        routing_key='hello',
        body=json.dumps(message),
        properties=pika.BasicProperties(
            delivery_mode=2,  # メッセージ永続化
            content_type='application/json'
        )
    )
    print("Message sent")

# Consumer例
def callback(ch, method, properties, body):
    try:
        message = json.loads(body)
        print(f"Received: {message}")
        
        # メッセージ処理
        process_message(message)
        
        # 処理完了確認
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error processing message: {e}")
        # 処理失敗の場合はreject
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

def consume_messages():
    # キュー宣言
    channel.queue_declare(queue='hello', durable=True)
    
    # QoS設定(一度に処理するメッセージ数制限)
    channel.basic_qos(prefetch_count=1)
    
    # コンシューマー設定
    channel.basic_consume(queue='hello', on_message_callback=callback)
    
    print("Waiting for messages. To exit press CTRL+C")
    channel.start_consuming()

def process_message(message):
    # メッセージ処理ロジック
    print(f"Processing user {message['user_id']} action: {message['action']}")

if __name__ == "__main__":
    try:
        # send_message()  # Producer
        consume_messages()  # Consumer
    finally:
        connection.close()

Java (Spring AMQP)

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

@SpringBootApplication
public class RabbitMQApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQApplication.class, args);
    }

    @Bean
    public Queue testQueue() {
        return QueueBuilder.durable("test-queue").build();
    }

    @Bean
    public DirectExchange testExchange() {
        return new DirectExchange("test-exchange");
    }

    @Bean
    public Binding testBinding(Queue testQueue, DirectExchange testExchange) {
        return BindingBuilder.bind(testQueue).to(testExchange).with("test-key");
    }
}

@Service
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend("test-exchange", "test-key", message);
        System.out.println("Message sent: " + message);
    }

    public void sendMessageWithProperties(Object message) {
        rabbitTemplate.convertAndSend(
            "test-exchange", 
            "test-key", 
            message,
            messageProperties -> {
                messageProperties.setContentType("application/json");
                messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                messageProperties.setPriority(1);
                return messageProperties;
            }
        );
    }
}

@Service
public class MessageConsumer {
    @RabbitListener(queues = "test-queue")
    public void receiveMessage(String message) {
        try {
            System.out.println("Received: " + message);
            // メッセージ処理ロジック
            processMessage(message);
        } catch (Exception e) {
            System.err.println("Error processing message: " + e.getMessage());
            throw e; // エラー時はrequeue
        }
    }

    @RabbitListener(queues = "test-queue")
    public void receiveMessageWithDetails(
            @Payload String message,
            @Header Map<String, Object> headers,
            Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            System.out.println("Message: " + message);
            System.out.println("Headers: " + headers);
            
            processMessage(message);
            
            // 手動ACK
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            try {
                // 手動NACK
                channel.basicNack(deliveryTag, false, true);
            } catch (Exception nackEx) {
                System.err.println("Error sending NACK: " + nackEx.getMessage());
            }
        }
    }

    private void processMessage(String message) {
        // メッセージ処理ロジック
        System.out.println("Processing: " + message);
    }
}

Node.js (amqplib)

const amqp = require('amqplib');

class RabbitMQService {
    constructor() {
        this.connection = null;
        this.channel = null;
    }

    async connect() {
        try {
            this.connection = await amqp.connect('amqp://admin:password@localhost:5672/');
            this.channel = await this.connection.createChannel();
            
            // 接続エラーハンドリング
            this.connection.on('error', (err) => {
                console.error('Connection error:', err);
            });
            
            console.log('Connected to RabbitMQ');
        } catch (error) {
            console.error('Failed to connect to RabbitMQ:', error);
            throw error;
        }
    }

    async setupQueue(queueName, options = {}) {
        const defaultOptions = {
            durable: true,
            exclusive: false,
            autoDelete: false
        };
        
        return await this.channel.assertQueue(queueName, { ...defaultOptions, ...options });
    }

    async setupExchange(exchangeName, type = 'direct', options = {}) {
        const defaultOptions = {
            durable: true
        };
        
        return await this.channel.assertExchange(exchangeName, type, { ...defaultOptions, ...options });
    }

    async bindQueue(queueName, exchangeName, routingKey) {
        return await this.channel.bindQueue(queueName, exchangeName, routingKey);
    }

    async publish(exchangeName, routingKey, message, options = {}) {
        const defaultOptions = {
            persistent: true,
            contentType: 'application/json'
        };
        
        const messageBuffer = Buffer.from(JSON.stringify(message));
        return this.channel.publish(exchangeName, routingKey, messageBuffer, { ...defaultOptions, ...options });
    }

    async consume(queueName, callback, options = {}) {
        const defaultOptions = {
            noAck: false
        };
        
        return await this.channel.consume(queueName, async (msg) => {
            if (msg) {
                try {
                    const content = JSON.parse(msg.content.toString());
                    await callback(content, msg);
                    
                    if (!defaultOptions.noAck) {
                        this.channel.ack(msg);
                    }
                } catch (error) {
                    console.error('Error processing message:', error);
                    this.channel.nack(msg, false, true); // requeue
                }
            }
        }, { ...defaultOptions, ...options });
    }

    async close() {
        if (this.channel) {
            await this.channel.close();
        }
        if (this.connection) {
            await this.connection.close();
        }
    }
}

// 使用例
async function main() {
    const rabbitmq = new RabbitMQService();
    
    try {
        await rabbitmq.connect();
        
        // Setup
        await rabbitmq.setupExchange('user-events', 'topic');
        await rabbitmq.setupQueue('user-registration');
        await rabbitmq.bindQueue('user-registration', 'user-events', 'user.registered');
        
        // Producer
        await rabbitmq.publish('user-events', 'user.registered', {
            userId: 123,
            email: '[email protected]',
            timestamp: new Date().toISOString()
        });
        
        // Consumer
        await rabbitmq.consume('user-registration', async (message, msg) => {
            console.log('Received:', message);
            // メッセージ処理
            console.log(`Processing user registration for ${message.email}`);
        });
        
    } catch (error) {
        console.error('Error:', error);
    }
}

main();

Exchange タイプと ルーティング

Direct Exchange

# Producer
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.basic_publish(
    exchange='direct_logs',
    routing_key='error',
    body='Error message'
)

# Consumer
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Bind multiple routing keys
severities = ['error', 'warning']
for severity in severities:
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

Topic Exchange

# Producer
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_keys = ['user.created', 'user.updated', 'order.shipped', 'payment.failed']

for key in routing_keys:
    channel.basic_publish(
        exchange='topic_logs',
        routing_key=key,
        body=f'Message for {key}'
    )

# Consumer - Subscribe to user events
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='user.*')

Fanout Exchange

# Producer - Broadcast to all queues
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(
    exchange='logs',
    routing_key='',  # Ignored in fanout
    body='Broadcast message'
)

# Consumer
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

クラスタ構成

3ノードクラスター設定

# ノード1 (rabbit1)
sudo hostnamectl set-hostname rabbit1
echo "127.0.0.1 rabbit1" | sudo tee -a /etc/hosts
echo "192.168.1.10 rabbit2" | sudo tee -a /etc/hosts
echo "192.168.1.11 rabbit3" | sudo tee -a /etc/hosts

# Erlang Cookie設定(全ノードで同じ値)
sudo systemctl stop rabbitmq-server
echo "ALWEFIHANFKLAPWEFFAWE" | sudo tee /var/lib/rabbitmq/.erlang.cookie
sudo chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
sudo chmod 400 /var/lib/rabbitmq/.erlang.cookie

# RabbitMQ起動
sudo systemctl start rabbitmq-server

# ノード2, 3の設定後、クラスタに参加
# ノード2で実行
sudo rabbitmqctl stop_app
sudo rabbitmqctl reset
sudo rabbitmqctl join_cluster rabbit@rabbit1
sudo rabbitmqctl start_app

# ノード3で実行
sudo rabbitmqctl stop_app
sudo rabbitmqctl reset
sudo rabbitmqctl join_cluster rabbit@rabbit1
sudo rabbitmqctl start_app

# クラスタ状態確認
sudo rabbitmqctl cluster_status

High Availability (HA) ポリシー設定

# 全てのキューをクラスタ全体にミラーリング
sudo rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

# 特定の数のノードにミラーリング
sudo rabbitmqctl set_policy ha-two "^ha\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

# Quorum Queue ポリシー設定
sudo rabbitmqctl set_policy qq-policy "^qq\." '{"queue-type":"quorum"}'

監視とメンテナンス

基本監視コマンド

# クラスタ状態
sudo rabbitmqctl cluster_status

# ノード状態
sudo rabbitmqctl node_health_check

# キュー情報
sudo rabbitmqctl list_queues name messages consumers memory

# Exchange情報
sudo rabbitmqctl list_exchanges name type

# 接続情報
sudo rabbitmqctl list_connections

# チャンネル情報
sudo rabbitmqctl list_channels

# メモリ使用量
sudo rabbitmqctl status

Management API使用例

# API経由でキュー一覧取得
curl -u admin:password http://localhost:15672/api/queues

# 特定キューの詳細
curl -u admin:password http://localhost:15672/api/queues/%2F/test-queue

# メッセージ送信
curl -u admin:password -H "Content-Type: application/json" \
     -X POST http://localhost:15672/api/exchanges/%2F/amq.default/publish \
     -d '{"properties":{},"routing_key":"test-queue","payload":"hello","payload_encoding":"string"}'

# メッセージ受信
curl -u admin:password -H "Content-Type: application/json" \
     -X POST http://localhost:15672/api/queues/%2F/test-queue/get \
     -d '{"count":1,"requeue":true,"encoding":"auto"}'

Prometheus メトリクス

# Prometheus プラグイン有効化
sudo rabbitmq-plugins enable rabbitmq_prometheus

# メトリクス確認
curl http://localhost:15692/metrics

パフォーマンス最適化

メッセージ永続化設定

# 永続化キュー作成
channel.queue_declare(queue='persistent_queue', durable=True)

# 永続化メッセージ送信
channel.basic_publish(
    exchange='',
    routing_key='persistent_queue',
    body='persistent message',
    properties=pika.BasicProperties(delivery_mode=2)  # 永続化
)

バッチ処理・トランザクション

# バッチPublish
channel.confirm_delivery()  # Publisher Confirms有効化

for i in range(1000):
    channel.basic_publish(
        exchange='',
        routing_key='batch_queue',
        body=f'message {i}'
    )

# Publisher Confirms確認
if channel.get_waiting_message_count() == 0:
    print("All messages confirmed")

コネクションプール設定

import pika.pool

# コネクションプール設定
pool_params = pika.URLParameters('amqp://admin:password@localhost:5672/')
pool = pika.pool.QueuedPool(
    create=lambda: pika.BlockingConnection(pool_params),
    max_size=10,
    max_overflow=0,
    timeout=10,
    recycle=-1,
    stale=45
)

# プールからコネクション取得
with pool.acquire() as connection:
    channel = connection.channel()
    channel.basic_publish(exchange='', routing_key='test', body='message')

RabbitMQは、その高い信頼性、柔軟なルーティング機能、豊富なエンタープライズ機能により、ミッションクリティカルなアプリケーション、マイクロサービス間通信、非同期処理システムで広く採用されています。AMQP準拠により、言語やプラットフォーム間での相互運用性も確保されています。