RabbitMQ
Highly reliable message broker. Supports the AMQP, MQTT, and STOMP protocols. Provides complex routing, message acknowledgments, and clustering capabilities.
Overview
RabbitMQ is an open-source message broker based on the Advanced Message Queuing Protocol (AMQP) 0-9-1, providing high reliability, flexible routing, and enterprise features to support communication between distributed systems and microservices. Originally developed by Rabbit Technologies and now maintained by Broadcom (which acquired the project's previous steward, VMware), RabbitMQ has established itself as one of the most popular messaging solutions in enterprise environments. With robust features including message persistence, publisher confirms, consumer acknowledgments, and sophisticated routing capabilities through multiple exchange types, RabbitMQ enables reliable asynchronous communication patterns essential for modern distributed architectures.
Details
RabbitMQ continues to serve as a cornerstone messaging platform for enterprise applications, offering proven stability and extensive enterprise features. Built on Erlang/OTP for exceptional fault tolerance and concurrent processing, RabbitMQ provides multiple exchange types (direct, topic, headers, fanout) for flexible message routing patterns. The platform supports high availability through multi-node clustering and quorum queues based on the Raft consensus algorithm; classic mirrored queues are deprecated and have been removed in RabbitMQ 4.x in favor of quorum queues. Enterprise features include a comprehensive management UI and HTTP API, detailed monitoring and metrics, a plugin architecture for extensibility, and integration with external authentication systems such as LDAP. With support for multiple protocols including AMQP, STOMP, MQTT, and HTTP-based APIs, RabbitMQ adapts to diverse messaging requirements while maintaining strong delivery guarantees.
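As a concrete illustration of two of these guarantees, the minimal Python sketch below (using the pika client; the localhost host and admin/password credentials are placeholder assumptions) declares a quorum queue via the x-queue-type argument and enables publisher confirms so the broker acknowledges each publish.
# Minimal sketch; pip install pika
import pika

params = pika.ConnectionParameters(
    host='localhost',
    credentials=pika.PlainCredentials('admin', 'password')  # assumed credentials
)
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Quorum queues are selected at declaration time via the x-queue-type argument
channel.queue_declare(
    queue='orders.quorum',
    durable=True,
    arguments={'x-queue-type': 'quorum'}
)

# Publisher confirms: once enabled, basic_publish raises an exception
# (e.g. pika.exceptions.UnroutableError) if the broker cannot take the message
channel.confirm_delivery()
try:
    channel.basic_publish(
        exchange='',
        routing_key='orders.quorum',
        body=b'{"order_id": "ORDER-0001"}',
        properties=pika.BasicProperties(delivery_mode=2),
        mandatory=True
    )
    print('publish confirmed by broker')
except pika.exceptions.UnroutableError:
    print('message was returned as unroutable')

connection.close()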
Key Features
- High Reliability: Message persistence, transactions, and delivery confirmations
- Flexible Routing: Multiple exchange types for complex message routing patterns
- High Availability: Multi-node clustering with quorum queues (classic queue mirroring is deprecated)
- Enterprise Features: Management UI/API, monitoring, plugins, and external authentication
- Protocol Support: AMQP, STOMP, MQTT, HTTP for diverse client requirements
- Operational Excellence: Comprehensive tools for monitoring, management, and maintenance
Pros and Cons
Pros
- Proven reliability and stability in mission-critical enterprise environments
- Rich feature set with comprehensive routing capabilities and delivery guarantees
- Excellent operational tools including management UI and extensive monitoring
- Strong ecosystem with plugins and multi-language client library support
- AMQP compliance ensuring interoperability across languages and platforms
- Active community and commercial support options from Broadcom (formerly VMware)
Cons
- Lower throughput compared to modern streaming platforms like Kafka or Pulsar
- Complex clustering setup and maintenance requiring Erlang expertise
- Memory-intensive operations especially with large message queues
- Limited horizontal scaling capabilities compared to newer architectures
- Performance degradation with very large queue depths
- Learning curve for understanding AMQP concepts and routing patterns
Code Examples
Installation and Basic Setup
# Ubuntu/Debian installation
sudo apt update
sudo apt install curl gnupg apt-transport-https
# Add RabbitMQ official repository
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
curl -1sLf "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0xf77f1eda57ebb1cc" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/net.launchpad.ppa.rabbitmq.erlang.gpg > /dev/null
echo "deb [signed-by=/usr/share/keyrings/net.launchpad.ppa.rabbitmq.erlang.gpg] http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
echo "deb [signed-by=/usr/share/keyrings/com.rabbitmq.team.gpg] https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ $(lsb_release -cs) main" | sudo tee -a /etc/apt/sources.list.d/rabbitmq.list
sudo apt update
sudo apt install erlang-base erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
erlang-runtime-tools erlang-snmp erlang-ssl erlang-syntax-tools \
erlang-tftp erlang-tools erlang-xmerl
# Install RabbitMQ
sudo apt install rabbitmq-server
# Enable management plugin
sudo rabbitmq-plugins enable rabbitmq_management
# Create admin user
sudo rabbitmqctl add_user admin password
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
# Start service
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
# Access management UI at http://localhost:15672 (admin/password)
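As a quick smoke test after installation, the following sketch (Python with the pika client, reusing the admin/password user created above; adjust to your own credentials) connects over AMQP, declares a throwaway queue, and round-trips one message.
# pip install pika
import pika

params = pika.ConnectionParameters(
    host='localhost',
    port=5672,
    credentials=pika.PlainCredentials('admin', 'password')  # user created above
)
connection = pika.BlockingConnection(params)
channel = connection.channel()

# auto_delete so the test queue disappears once the connection closes
channel.queue_declare(queue='install-smoke-test', auto_delete=True)
channel.basic_publish(exchange='', routing_key='install-smoke-test', body=b'ping')

method, properties, body = channel.basic_get(queue='install-smoke-test', auto_ack=True)
print('broker round-trip OK' if body == b'ping' else 'unexpected result')
connection.close()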
Docker Deployment
# RabbitMQ with Management Plugin
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
rabbitmq:3-management
# With persistent storage
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq_data:/var/lib/rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
rabbitmq:3-management
# Docker Compose configuration
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_DEFAULT_VHOST: /
    ports:
      - "5672:5672"    # AMQP port
      - "15672:15672"  # Management UI
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
    networks:
      - rabbitmq_net

volumes:
  rabbitmq_data:

networks:
  rabbitmq_net:
EOF
docker-compose up -d
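To verify the containerized broker and its management plugin from the host, a small check against the HTTP API can be run; this sketch assumes the third-party requests library and the admin/password credentials set in the compose file.
# pip install requests
import requests

# Management API exposed on the mapped 15672 port (credentials from the compose file)
resp = requests.get('http://localhost:15672/api/overview', auth=('admin', 'password'), timeout=5)
resp.raise_for_status()
overview = resp.json()

print('RabbitMQ version:', overview.get('rabbitmq_version'))
print('Erlang version:', overview.get('erlang_version'))
print('Node:', overview.get('node'))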
Basic Queue and User Management
# Service management
sudo systemctl start rabbitmq-server
sudo systemctl stop rabbitmq-server
sudo systemctl status rabbitmq-server
# Plugin management
sudo rabbitmq-plugins enable rabbitmq_management
sudo rabbitmq-plugins list
sudo rabbitmq-plugins disable rabbitmq_federation
# User management
sudo rabbitmqctl add_user myuser mypassword
sudo rabbitmqctl set_user_tags myuser administrator
sudo rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"
sudo rabbitmqctl list_users
# Virtual host management
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl list_vhosts
sudo rabbitmqctl delete_vhost myvhost
# Queue management
sudo rabbitmqctl list_queues
sudo rabbitmqctl list_queues name messages consumers
sudo rabbitmqctl purge_queue myqueue
# Using rabbitmqadmin command line tool
curl -O http://localhost:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin
sudo mv rabbitmqadmin /usr/local/bin/
# Declare queue
rabbitmqadmin declare queue name=test-queue durable=true
# Publish message
rabbitmqadmin publish exchange=amq.default routing_key=test-queue payload="Hello, RabbitMQ!"
# Get message
rabbitmqadmin get queue=test-queue requeue=false
# Declare exchange
rabbitmqadmin declare exchange name=test-exchange type=direct
# Create binding
rabbitmqadmin declare binding source=test-exchange destination=test-queue routing_key=test
Python Producer and Consumer (using pika)
# pip install pika
import pika
import json
import time
from datetime import datetime
class RabbitMQClient:
def __init__(self, host='localhost', port=5672, virtual_host='/', username='admin', password='password'):
self.connection_params = pika.ConnectionParameters(
host=host,
port=port,
virtual_host=virtual_host,
credentials=pika.PlainCredentials(username, password),
heartbeat=600,
blocked_connection_timeout=300
)
self.connection = None
self.channel = None
def connect(self):
"""Establish connection to RabbitMQ"""
self.connection = pika.BlockingConnection(self.connection_params)
self.channel = self.connection.channel()
def close(self):
"""Close connection"""
if self.connection and not self.connection.is_closed:
self.connection.close()
# Producer example
class MessageProducer(RabbitMQClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.connect()
def setup_queue(self, queue_name, durable=True):
"""Declare a durable queue"""
self.channel.queue_declare(queue=queue_name, durable=durable)
def send_message(self, queue_name, message, routing_key=None):
"""Send message to queue"""
if routing_key is None:
routing_key = queue_name
# Ensure queue exists
self.setup_queue(queue_name)
# Prepare message
message_body = json.dumps(message) if isinstance(message, dict) else str(message)
# Send message with persistence
self.channel.basic_publish(
exchange='',
routing_key=routing_key,
body=message_body,
properties=pika.BasicProperties(
delivery_mode=2, # Make message persistent
content_type='application/json',
timestamp=int(time.time())
)
)
print(f"Message sent to {queue_name}: {message}")
def send_with_exchange(self, exchange_name, routing_key, message, exchange_type='direct'):
"""Send message through exchange"""
# Declare exchange
self.channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type, durable=True)
message_body = json.dumps(message) if isinstance(message, dict) else str(message)
self.channel.basic_publish(
exchange=exchange_name,
routing_key=routing_key,
body=message_body,
properties=pika.BasicProperties(delivery_mode=2)
)
print(f"Message sent to exchange {exchange_name} with routing key {routing_key}")
# Consumer example
class MessageConsumer(RabbitMQClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.connect()
def setup_queue(self, queue_name, durable=True):
"""Declare a durable queue"""
self.channel.queue_declare(queue=queue_name, durable=durable)
def process_message(self, ch, method, properties, body):
"""Default message processing callback"""
try:
# Parse message
if properties.content_type == 'application/json':
message = json.loads(body.decode())
else:
message = body.decode()
print(f"Received message: {message}")
# Process message (implement your business logic here)
self.handle_message(message)
# Acknowledge message processing
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Error processing message: {e}")
# Reject message and requeue for retry
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def handle_message(self, message):
"""Override this method to implement business logic"""
print(f"Processing: {message}")
# Simulate processing time
time.sleep(0.1)
def start_consuming(self, queue_name, prefetch_count=1):
"""Start consuming messages from queue"""
# Ensure queue exists
self.setup_queue(queue_name)
# Set QoS to limit number of unacknowledged messages
self.channel.basic_qos(prefetch_count=prefetch_count)
# Set up consumer
self.channel.basic_consume(
queue=queue_name,
on_message_callback=self.process_message
)
print(f"Waiting for messages from {queue_name}. To exit press CTRL+C")
try:
self.channel.start_consuming()
except KeyboardInterrupt:
print("Stopping consumer...")
self.channel.stop_consuming()
self.close()
# Usage examples
if __name__ == "__main__":
# Producer usage
producer = MessageProducer()
# Send simple messages
for i in range(10):
order_data = {
'order_id': f'ORDER-{i:04d}',
'customer_id': f'CUST-{i % 3}',
'amount': round(100 + (i * 15.5), 2),
'timestamp': datetime.now().isoformat(),
'status': 'pending'
}
producer.send_message('orders', order_data)
# Send through exchange
producer.send_with_exchange('user-events', 'user.created', {
'user_id': 'USER-123',
'email': '[email protected]',
'created_at': datetime.now().isoformat()
}, 'topic')
producer.close()
# Consumer usage
# Uncomment to run consumer
# consumer = MessageConsumer()
# consumer.start_consuming('orders')
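The consumer above requeues failed messages indefinitely; a common refinement is to dead-letter rejected messages instead. The following pika sketch (queue and exchange names are illustrative, and a separate queue name is used so it does not clash with the plain 'orders' queue declared earlier) wires up a dead-letter exchange; rejecting with requeue=False then moves the message to orders.dlq.
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost', credentials=pika.PlainCredentials('admin', 'password'))
)
channel = connection.channel()

# Dead-letter exchange and queue (names are illustrative)
channel.exchange_declare(exchange='dlx', exchange_type='direct', durable=True)
channel.queue_declare(queue='orders.dlq', durable=True)
channel.queue_bind(queue='orders.dlq', exchange='dlx', routing_key='orders.dlq')

# Work queue that dead-letters rejected or expired messages to the DLX
channel.queue_declare(
    queue='orders.with-dlq',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'orders.dlq',
        'x-message-ttl': 300000  # optional: expire messages after 5 minutes
    }
)

def on_message(ch, method, properties, body):
    try:
        print('processing', body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # requeue=False sends the message to the dead-letter exchange
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

channel.basic_consume(queue='orders.with-dlq', on_message_callback=on_message)
# channel.start_consuming()  # blocks until interrupted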
Java Spring AMQP Implementation
// Maven dependencies
// <dependency>
// <groupId>org.springframework.boot</groupId>
// <artifactId>spring-boot-starter-amqp</artifactId>
// </dependency>
import com.rabbitmq.client.Channel;
import java.util.Date;
import java.util.Map;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
@SpringBootApplication
public class RabbitMQApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMQApplication.class, args);
}
// Queue configuration
@Bean
public Queue ordersQueue() {
return QueueBuilder.durable("orders").build();
}
@Bean
public Queue notificationsQueue() {
return QueueBuilder.durable("notifications").build();
}
// Exchange configuration
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order-exchange");
}
@Bean
public TopicExchange userExchange() {
return new TopicExchange("user-events");
}
// Binding configuration
@Bean
public Binding orderBinding(Queue ordersQueue, DirectExchange orderExchange) {
return BindingBuilder.bind(ordersQueue).to(orderExchange).with("order.created");
}
@Bean
public Binding notificationBinding(Queue notificationsQueue, TopicExchange userExchange) {
return BindingBuilder.bind(notificationsQueue).to(userExchange).with("user.*");
}
}
// Message producer service
@Service
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendOrderMessage(Object orderData) {
rabbitTemplate.convertAndSend("order-exchange", "order.created", orderData);
System.out.println("Order message sent: " + orderData);
}
public void sendUserEvent(String eventType, Object userData) {
String routingKey = "user." + eventType;
rabbitTemplate.convertAndSend("user-events", routingKey, userData);
System.out.println("User event sent: " + routingKey);
}
public void sendMessageWithProperties(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(
exchange,
routingKey,
message,
messageProperties -> {
messageProperties.setContentType("application/json");
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setPriority(1);
messageProperties.setHeader("source", "spring-app");
messageProperties.setTimestamp(new Date());
return messageProperties;
}
);
}
}
// Message consumer service
@Service
public class MessageConsumer {
@RabbitListener(queues = "orders")
public void handleOrderMessage(String orderMessage) {
try {
System.out.println("Processing order: " + orderMessage);
// Implement order processing logic
processOrder(orderMessage);
} catch (Exception e) {
System.err.println("Error processing order: " + e.getMessage());
throw e; // Re-throw to trigger requeue
}
}
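// Note: the manual channel.basicAck/basicNack calls below require the listener container
// acknowledge mode to be MANUAL (e.g. spring.rabbitmq.listener.simple.acknowledge-mode=manual);
// otherwise the container also acknowledges and a double-ack error occurs.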
@RabbitListener(queues = "notifications")
public void handleNotificationMessage(
@Payload String message,
@Header Map<String, Object> headers,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
System.out.println("Notification: " + message);
System.out.println("Headers: " + headers);
// Process notification
processNotification(message);
// Manual acknowledgment
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
try {
// Manual negative acknowledgment with requeue
channel.basicNack(deliveryTag, false, true);
} catch (Exception nackEx) {
System.err.println("Error sending NACK: " + nackEx.getMessage());
}
}
}
// Dead letter queue handler
@RabbitListener(queues = "orders.dlq")
public void handleDeadLetterMessage(String message) {
System.err.println("Dead letter message received: " + message);
// Implement dead letter handling logic
// Log, alert, manual intervention, etc.
}
private void processOrder(String orderData) {
// Business logic implementation
System.out.println("Order processing completed: " + orderData);
// Simulate processing time
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void processNotification(String message) {
// Notification processing logic
System.out.println("Notification sent: " + message);
}
}
// Configuration for advanced features
@Configuration
public class RabbitMQConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true);
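// The confirm callback only fires when publisher confirms are enabled on the
// connection factory, e.g. spring.rabbitmq.publisher-confirm-type=correlated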
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("Message delivered successfully");
} else {
System.err.println("Message delivery failed: " + cause);
}
});
return template;
}
// Dead letter queue configuration
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("orders.dlq").build();
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dlx");
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("orders.dlq");
}
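// Note: this redefines the "orders" queue with dead-letter arguments; it must replace the
// plain ordersQueue() bean above, since redeclaring an existing queue with different
// arguments fails with PRECONDITION_FAILED.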
@Bean
public Queue ordersQueueWithDLQ() {
return QueueBuilder.durable("orders")
.withArgument("x-dead-letter-exchange", "dlx")
.withArgument("x-dead-letter-routing-key", "orders.dlq")
.withArgument("x-message-ttl", 300000) // 5 minutes TTL
.build();
}
}
Node.js Implementation (using amqplib)
// npm install amqplib
const amqp = require('amqplib');
class RabbitMQService {
constructor(url = 'amqp://admin:password@localhost:5672/') {
this.url = url;
this.connection = null;
this.channel = null;
}
async connect() {
try {
this.connection = await amqp.connect(this.url);
this.channel = await this.connection.createChannel();
// Connection error handling
this.connection.on('error', (err) => {
console.error('Connection error:', err);
});
this.connection.on('close', () => {
console.log('Connection closed');
});
console.log('Connected to RabbitMQ');
return this.channel;
} catch (error) {
console.error('Failed to connect to RabbitMQ:', error);
throw error;
}
}
async setupQueue(queueName, options = {}) {
const defaultOptions = {
durable: true,
exclusive: false,
autoDelete: false,
arguments: {}
};
return await this.channel.assertQueue(queueName, { ...defaultOptions, ...options });
}
async setupExchange(exchangeName, type = 'direct', options = {}) {
const defaultOptions = {
durable: true,
autoDelete: false
};
return await this.channel.assertExchange(exchangeName, type, { ...defaultOptions, ...options });
}
async bindQueue(queueName, exchangeName, routingKey) {
return await this.channel.bindQueue(queueName, exchangeName, routingKey);
}
async publishToQueue(queueName, message, options = {}) {
await this.setupQueue(queueName);
const defaultOptions = {
persistent: true,
contentType: 'application/json'
};
const messageBuffer = Buffer.from(JSON.stringify(message));
return this.channel.sendToQueue(queueName, messageBuffer, { ...defaultOptions, ...options });
}
async publishToExchange(exchangeName, routingKey, message, options = {}) {
const defaultOptions = {
persistent: true,
contentType: 'application/json'
};
const messageBuffer = Buffer.from(JSON.stringify(message));
return this.channel.publish(exchangeName, routingKey, messageBuffer, { ...defaultOptions, ...options });
}
async consume(queueName, callback, options = {}) {
await this.setupQueue(queueName);
const consumeOptions = { noAck: false, ...options };
// Set prefetch count for QoS
await this.channel.prefetch(1);
return await this.channel.consume(queueName, async (msg) => {
if (msg) {
try {
const content = JSON.parse(msg.content.toString());
await callback(content, msg);
if (!consumeOptions.noAck) {
this.channel.ack(msg);
}
} catch (error) {
console.error('Error processing message:', error);
this.channel.nack(msg, false, true); // requeue
}
}
}, consumeOptions);
}
async createWorker(queueName, workerFunction, concurrency = 1) {
await this.setupQueue(queueName);
await this.channel.prefetch(concurrency);
for (let i = 0; i < concurrency; i++) {
this.channel.consume(queueName, async (msg) => {
if (msg) {
try {
const content = JSON.parse(msg.content.toString());
await workerFunction(content);
this.channel.ack(msg);
} catch (error) {
console.error(`Worker ${i} error:`, error);
this.channel.nack(msg, false, true);
}
}
});
}
console.log(`Started ${concurrency} workers for queue: ${queueName}`);
}
async close() {
if (this.channel) {
await this.channel.close();
}
if (this.connection) {
await this.connection.close();
}
}
}
// Usage examples
async function main() {
const rabbitmq = new RabbitMQService();
try {
await rabbitmq.connect();
// Setup exchanges and queues
await rabbitmq.setupExchange('user-events', 'topic');
await rabbitmq.setupQueue('user-registrations');
await rabbitmq.setupQueue('user-notifications');
// Bind queues to exchange
await rabbitmq.bindQueue('user-registrations', 'user-events', 'user.registered');
await rabbitmq.bindQueue('user-notifications', 'user-events', 'user.*');
// Producer example
const userRegisteredEvent = {
userId: 'USER-123',
email: '[email protected]',
registeredAt: new Date().toISOString(),
source: 'web-app'
};
await rabbitmq.publishToExchange('user-events', 'user.registered', userRegisteredEvent);
console.log('User registration event published');
// Consumer example
await rabbitmq.consume('user-registrations', async (message, msg) => {
console.log('Processing user registration:', message);
// Simulate processing
await new Promise(resolve => setTimeout(resolve, 100));
console.log(`User ${message.userId} registration processed`);
});
// Worker pool example
await rabbitmq.createWorker('user-notifications', async (message) => {
console.log('Sending notification to:', message.email);
// Simulate email sending
await new Promise(resolve => setTimeout(resolve, 200));
console.log('Notification sent successfully');
}, 3); // 3 concurrent workers
console.log('Services started. Press Ctrl+C to exit...');
} catch (error) {
console.error('Error:', error);
await rabbitmq.close();
}
}
// Graceful shutdown
process.on('SIGINT', async () => {
console.log('Shutting down gracefully...');
process.exit(0);
});
// Run the example
main().catch(console.error);
Exchange Types and Routing Patterns
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Direct Exchange - Exact routing key match
def setup_direct_exchange():
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# Producer
severities = ['info', 'warning', 'error']
for severity in severities:
channel.basic_publish(
exchange='direct_logs',
routing_key=severity,
body=f'{severity} message'
)
# Consumer for specific severities
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
for severity in ['error', 'warning']:
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
# Topic Exchange - Pattern-based routing
def setup_topic_exchange():
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# Producer
routing_keys = [
'user.created', 'user.updated', 'user.deleted',
'order.created', 'order.shipped', 'order.cancelled',
'payment.processed', 'payment.failed'
]
for key in routing_keys:
channel.basic_publish(
exchange='topic_logs',
routing_key=key,
body=f'Event: {key}'
)
# Consumer for all user events
result = channel.queue_declare(queue='user_events', exclusive=False)
channel.queue_bind(exchange='topic_logs', queue='user_events', routing_key='user.*')
# Consumer for critical events
result = channel.queue_declare(queue='critical_events', exclusive=False)
channel.queue_bind(exchange='topic_logs', queue='critical_events', routing_key='*.failed')
channel.queue_bind(exchange='topic_logs', queue='critical_events', routing_key='*.cancelled')
# Fanout Exchange - Broadcast to all queues
def setup_fanout_exchange():
channel.exchange_declare(exchange='notifications', exchange_type='fanout')
# Producer
channel.basic_publish(
exchange='notifications',
routing_key='', # Ignored in fanout
body='System maintenance scheduled'
)
# Multiple consumers receive the same message
queues = ['email_notifications', 'sms_notifications', 'push_notifications']
for queue_name in queues:
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange='notifications', queue=queue_name)
# Headers Exchange - Route based on message headers
def setup_headers_exchange():
channel.exchange_declare(exchange='headers_exchange', exchange_type='headers')
# Producer with headers
properties = pika.BasicProperties(
headers={
'format': 'pdf',
'type': 'report',
'priority': 'high'
}
)
channel.basic_publish(
exchange='headers_exchange',
routing_key='',
body='Report data',
properties=properties
)
# Consumer for PDF reports
result = channel.queue_declare(queue='pdf_processor', exclusive=False)
channel.queue_bind(
exchange='headers_exchange',
queue='pdf_processor',
arguments={'format': 'pdf', 'x-match': 'any'}
)
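The helpers above only declare exchanges, queues, and bindings; a consumer still has to attach a callback and start the I/O loop. A minimal sketch for the user_events queue (assuming setup_topic_exchange() has been called on the same channel) might look like this; the callback logic is illustrative.
def on_user_event(ch, method, properties, body):
    # The routing key identifies which user.* event arrived
    print(f"[{method.routing_key}] {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=10)
channel.basic_consume(queue='user_events', on_message_callback=on_user_event)
channel.start_consuming()  # blocks until interrupted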
Clustering and High Availability Setup
# Three-node cluster setup
# Node 1 (rabbit1)
sudo hostnamectl set-hostname rabbit1
echo "127.0.0.1 rabbit1" | sudo tee -a /etc/hosts
echo "192.168.1.10 rabbit2" | sudo tee -a /etc/hosts
echo "192.168.1.11 rabbit3" | sudo tee -a /etc/hosts
# Set Erlang cookie (same value on all nodes)
sudo systemctl stop rabbitmq-server
echo "ALWEFIHANFKLAPWEFFAWE" | sudo tee /var/lib/rabbitmq/.erlang.cookie
sudo chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
sudo chmod 400 /var/lib/rabbitmq/.erlang.cookie
# Start RabbitMQ
sudo systemctl start rabbitmq-server
# After setting up nodes 2 and 3, join cluster
# On node 2:
sudo rabbitmqctl stop_app
sudo rabbitmqctl reset
sudo rabbitmqctl join_cluster rabbit@rabbit1
sudo rabbitmqctl start_app
# On node 3:
sudo rabbitmqctl stop_app
sudo rabbitmqctl reset
sudo rabbitmqctl join_cluster rabbit@rabbit1
sudo rabbitmqctl start_app
# Check cluster status
sudo rabbitmqctl cluster_status
# High Availability policies
# Classic mirrored queues (deprecated and removed in RabbitMQ 4.x; prefer quorum queues)
# Mirror all queues across all nodes
sudo rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
# Mirror specific queues to exactly 2 nodes
sudo rabbitmqctl set_policy ha-two "^ha\\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
# Quorum queues (recommended, RabbitMQ 3.8+) are selected at declaration time via the
# x-queue-type argument (or a vhost default queue type); queue type cannot be set by a policy
rabbitmqadmin declare queue name=qq.orders durable=true arguments='{"x-queue-type":"quorum"}'
# Check policies
sudo rabbitmqctl list_policies
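Clients should be able to fail over to a surviving node. With pika, BlockingConnection accepts a sequence of ConnectionParameters and tries them in order, which gives a simple failover sketch for the three-node cluster above (hostnames and credentials are assumptions from this section).
import pika

credentials = pika.PlainCredentials('admin', 'password')  # assumed user
cluster_nodes = [
    pika.ConnectionParameters(host='rabbit1', credentials=credentials),
    pika.ConnectionParameters(host='rabbit2', credentials=credentials),
    pika.ConnectionParameters(host='rabbit3', credentials=credentials),
]

# pika tries the parameters in order until one node accepts the connection
connection = pika.BlockingConnection(cluster_nodes)
channel = connection.channel()

# Quorum queues replicate across cluster members and survive single-node failures
channel.queue_declare(queue='qq.orders', durable=True,
                      arguments={'x-queue-type': 'quorum'})
print('connected to the cluster')
connection.close()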
Monitoring and Performance Management
# Basic monitoring commands
# Cluster status
sudo rabbitmqctl cluster_status
# Node health checks (rabbitmqctl node_health_check is deprecated in favor of rabbitmq-diagnostics)
sudo rabbitmq-diagnostics check_running
sudo rabbitmq-diagnostics check_local_alarms
# Queue information
sudo rabbitmqctl list_queues name messages consumers memory
# Exchange information
sudo rabbitmqctl list_exchanges name type
# Connection information
sudo rabbitmqctl list_connections
# Channel information
sudo rabbitmqctl list_channels
# Memory usage
sudo rabbitmqctl status
# Enable Prometheus plugin for metrics
sudo rabbitmq-plugins enable rabbitmq_prometheus
# Check metrics endpoint
curl http://localhost:15692/metrics
# Management API examples
# Get all queues
curl -u admin:password http://localhost:15672/api/queues
# Get specific queue details
curl -u admin:password http://localhost:15672/api/queues/%2F/test-queue
# Publish message via API
curl -u admin:password -H "Content-Type: application/json" \
-X POST http://localhost:15672/api/exchanges/%2F/amq.default/publish \
-d '{"properties":{},"routing_key":"test-queue","payload":"hello","payload_encoding":"string"}'
# Get message via API
curl -u admin:password -H "Content-Type: application/json" \
-X POST http://localhost:15672/api/queues/%2F/test-queue/get \
-d '{"count":1,"requeue":true,"encoding":"auto"}'
# Monitoring script
cat > monitor-rabbitmq.sh << 'EOF'
#!/bin/bash
echo "=== RabbitMQ Monitoring Report $(date) ==="
# Node status
echo "=== Node Status ==="
sudo rabbitmq-diagnostics -q check_running > /dev/null 2>&1 && echo "Node: OK" || echo "Node: ERROR"
# Cluster status
echo "=== Cluster Status ==="
sudo rabbitmqctl cluster_status
# Queue summary
echo "=== Queue Summary ==="
echo "Total queues: $(sudo rabbitmqctl list_queues | wc -l)"
echo "Total messages: $(sudo rabbitmqctl list_queues messages | awk '{sum+=$2} END {print sum}')"
# Connection summary
echo "=== Connection Summary ==="
echo "Active connections: $(sudo rabbitmqctl list_connections | wc -l)"
# Memory usage
echo "=== Memory Usage ==="
sudo rabbitmqctl status | grep -A5 "Memory usage"
echo "=== Monitoring Complete ==="
EOF
chmod +x monitor-rabbitmq.sh
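The same data is available programmatically through the management HTTP API, which makes it easy to alert on queue depth from a scheduled job. A small sketch with the requests library follows; the threshold, endpoint, and credentials are assumptions to adapt to your environment.
# pip install requests
import requests

API = 'http://localhost:15672/api'
AUTH = ('admin', 'password')   # assumed credentials
DEPTH_THRESHOLD = 1000         # alert when a queue backs up past this many messages

resp = requests.get(f'{API}/queues', auth=AUTH, timeout=10)
resp.raise_for_status()

for queue in resp.json():
    name = queue.get('name')
    vhost = queue.get('vhost')
    messages = queue.get('messages', 0)
    consumers = queue.get('consumers', 0)
    if messages > DEPTH_THRESHOLD or consumers == 0:
        # Replace with real alerting (email, Slack, pager, ...)
        print(f'ALERT vhost={vhost} queue={name} messages={messages} consumers={consumers}')
    else:
        print(f'OK    vhost={vhost} queue={name} messages={messages} consumers={consumers}')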