Apache ActiveMQ
Message broker supporting Java Message Service (JMS). Provides diverse protocol support, transactions, and security features. Proven track record in enterprise environments.
Overview
Apache ActiveMQ is a widely used open-source, multi-protocol, Java-based message broker. Because it speaks industry-standard protocols, clients can be written in a broad range of languages and run on many platforms. It is a JMS 1.1 compliant messaging solution and supports the OpenWire, STOMP, MQTT, and AMQP protocols, along with REST and WebSocket access. ActiveMQ supports both point-to-point (queue) and publish-subscribe (topic) messaging, and is designed to handle thousands of concurrent client connections.
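The difference between the two messaging patterns mentioned above can be shown with a small in-memory model. This is only an illustrative sketch, not ActiveMQ code: the class and method names are hypothetical, and the round-robin dispatch is an assumption made for simplicity (a real broker's dispatch depends on settings such as prefetch).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of the two JMS delivery patterns; not ActiveMQ code. */
final class DeliveryPatterns {

    /** Point-to-point: each message goes to exactly one consumer (round-robin here). */
    static List<List<String>> deliverToQueue(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = emptyInboxes(consumerCount);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumerCount).add(messages.get(i));
        }
        return inboxes;
    }

    /** Publish-subscribe: every active subscriber receives its own copy of each message. */
    static List<List<String>> deliverToTopic(List<String> messages, int subscriberCount) {
        List<List<String>> inboxes = emptyInboxes(subscriberCount);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m0", "m1", "m2", "m3");
        System.out.println("queue: " + deliverToQueue(messages, 2));
        System.out.println("topic: " + deliverToTopic(messages, 2));
    }
}
```

With two consumers, the queue splits the four messages between them, while the topic gives each subscriber all four.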
Details
As of 2025, Apache ActiveMQ remains a comprehensive enterprise messaging solution with roughly two decades of production use behind it. ActiveMQ Classic, with 6.1.7 as the latest release at the time of writing, provides full JMS 1.1 compliance plus partial JMS 2.0 and Jakarta Messaging 3.1 support. Apache ActiveMQ Artemis is the next-generation broker, designed for higher performance and scalability. The platform offers Network of Brokers clustering, high availability through shared storage or replication, and persistence options including KahaDB and JDBC (the LevelDB store was deprecated and has been removed from recent Classic releases). Enterprise features include JMX monitoring, dead letter queues, and virtual topics.
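Virtual topics, mentioned above, let several consumer groups each read their own queue fed from one topic. The sketch below shows only the naming side. It assumes the broker's default virtual topic convention (`VirtualTopic.` topics consumed through `Consumer.<id>.VirtualTopic.` queues), which the source does not spell out; the helper class and the example names are hypothetical.

```java
/** Builds destination names under the assumed default virtual topic convention. */
final class VirtualTopicNames {

    static final String TOPIC_PREFIX = "VirtualTopic.";

    /** Name of the topic that producers publish to. */
    static String topicName(String name) {
        return TOPIC_PREFIX + name;
    }

    /** Name of the queue that one logical consumer group reads from. */
    static String consumerQueueName(String consumerId, String topicName) {
        if (!topicName.startsWith(TOPIC_PREFIX)) {
            throw new IllegalArgumentException("not a virtual topic: " + topicName);
        }
        return "Consumer." + consumerId + "." + topicName;
    }

    public static void main(String[] args) {
        String topic = topicName("Orders");
        System.out.println(topic);
        System.out.println(consumerQueueName("billing", topic));
    }
}
```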
Key Features
- JMS Compliance: Full JMS 1.1 implementation, with partial JMS 2.0 and Jakarta Messaging 3.1 support in Classic 6.x
- Multi-Protocol Support: OpenWire, STOMP, MQTT, AMQP, REST, and WebSockets
- High Scalability: Efficiently handles thousands of concurrent client connections
- Enterprise Features: Network of Brokers, high availability, persistence, JMX monitoring
- Flexible Deployment: Standalone, embedded, and clustered configurations
- Multi-Language Clients: Rich client support for Java, .NET, C++, Python, Ruby, and more
Pros and Cons
Pros
- Extensive track record and mature ecosystem in Java enterprise environments
- Full JMS 1.1 compliance, so applications are written against the standard JMS API
- Flexible scale-out capabilities through Network of Brokers architecture
- Excellent management and monitoring capabilities via web console
- Multiple persistence options including KahaDB and JDBC
- Seamless integration with Apache Camel for enterprise integration patterns
Cons
- Performance limitations compared to Artemis or lightweight brokers for high-throughput scenarios
- Higher memory footprint compared to Redis or NATS
- Complex cluster configuration and high availability setup
- Requires specialized knowledge for initial configuration and optimization
- Slower adoption of newer protocols due to legacy architecture
- GC tuning requirements in large-scale environments
Reference Pages
- Apache ActiveMQ Official Website
- Apache ActiveMQ Classic Documentation
- Apache ActiveMQ Artemis Documentation
Code Examples
Installation and Basic Setup
# Download and Install ActiveMQ Classic
wget https://archive.apache.org/dist/activemq/6.1.7/apache-activemq-6.1.7-bin.tar.gz
tar -xzf apache-activemq-6.1.7-bin.tar.gz
sudo mv apache-activemq-6.1.7 /opt/activemq
sudo useradd -r -s /bin/false activemq
sudo chown -R activemq:activemq /opt/activemq
# System service configuration
sudo tee /etc/systemd/system/activemq.service << 'EOF'
[Unit]
Description=Apache ActiveMQ
After=network.target
[Service]
Type=forking
User=activemq
Group=activemq
ExecStart=/opt/activemq/bin/activemq start
ExecStop=/opt/activemq/bin/activemq stop
ExecReload=/opt/activemq/bin/activemq restart
WorkingDirectory=/opt/activemq
Environment=JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
[Install]
WantedBy=multi-user.target
EOF
# Enable and start service
sudo systemctl enable activemq
sudo systemctl start activemq
# Check status
sudo systemctl status activemq
netstat -tlnp | grep :61616 # OpenWire protocol
netstat -tlnp | grep :8161 # Web management console
# Web management console access
# http://localhost:8161/admin (default login admin/admin; change it before exposing the console)
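The port checks above can also be done from Java, for example in a deployment smoke test. This is a minimal sketch using only the JDK; the class name, method name, and timeout are illustrative choices, and the ports are the ones used in the setup above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Checks whether a TCP port accepts connections; a rough stand-in for the netstat checks. */
final class BrokerPortCheck {

    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("OpenWire (61616): " + isPortOpen("localhost", 61616, 1000));
        System.out.println("Web console (8161): " + isPortOpen("localhost", 8161, 1000));
    }
}
```

A successful connection only shows that something is listening on the port. It does not confirm that the listener is ActiveMQ or that the broker is healthy.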
Basic JMS Producer and Consumer
// Maven dependencies
// <dependency>
// <groupId>org.apache.activemq</groupId>
// <artifactId>activemq-client</artifactId>
// <version>6.1.7</version>
// </dependency>
import org.apache.activemq.ActiveMQConnectionFactory;
import jakarta.jms.*; // ActiveMQ Classic 6.x clients use the jakarta.jms namespace
// JMS Producer example
public class MessageProducer {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "test.queue";
public static void main(String[] args) throws Exception {
// Create connection factory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
// Create connection and session
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
// Create producer (fully qualified because this class shadows the interface name)
jakarta.jms.MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// Send messages
for (int i = 0; i < 100; i++) {
TextMessage message = session.createTextMessage("Hello ActiveMQ! Message " + i);
message.setStringProperty("id", String.valueOf(i));
message.setStringProperty("type", "test");
producer.send(message);
System.out.println("Sent: " + message.getText());
}
// Cleanup resources
producer.close();
session.close();
connection.close();
}
}
// JMS Consumer example (separate source file, so it carries its own imports)
import org.apache.activemq.ActiveMQConnectionFactory;
import jakarta.jms.*; // ActiveMQ Classic 6.x clients use the jakarta.jms namespace
public class MessageConsumer {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "test.queue";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
// Create consumer (fully qualified because this class shadows the interface name)
jakarta.jms.MessageConsumer consumer = session.createConsumer(destination);
// Set message listener
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String id = message.getStringProperty("id");
String type = message.getStringProperty("type");
System.out.println("Received: " + textMessage.getText());
System.out.println("ID: " + id + ", Type: " + type);
// Message processing logic
processMessage(textMessage);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// Wait for messages
System.out.println("Waiting for messages... (Press Enter to exit)");
System.in.read();
consumer.close();
session.close();
connection.close();
}
private static void processMessage(TextMessage message) throws JMSException {
// Business logic processing
System.out.println("Processing: " + message.getText());
// Simulate processing time
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// Publish-Subscribe example (separate source file, so it carries its own imports)
import org.apache.activemq.ActiveMQConnectionFactory;
import jakarta.jms.*; // ActiveMQ Classic 6.x clients use the jakarta.jms namespace
public class TopicPublisher {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("news.updates");
jakarta.jms.MessageProducer producer = session.createProducer(topic);
// Publish messages to topic
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("News Update " + i);
message.setStringProperty("category", "technology");
message.setLongProperty("timestamp", System.currentTimeMillis());
producer.send(message);
System.out.println("Published: " + message.getText());
Thread.sleep(1000);
}
producer.close();
session.close();
connection.close();
}
}
Advanced Configuration and Performance Tuning
<!-- activemq.xml configuration example -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost"
dataDirectory="${activemq.data}"
schedulerSupport="true"
useJmx="true">
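<!-- Persistence configuration. Note: enableJournalDiskSyncs="false" trades durability for throughput; messages can be lost on a crash -->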
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"
journalMaxFileLength="32mb"
enableIndexWriteAsync="true"
enableJournalDiskSyncs="false"
cleanupInterval="30000"/>
</persistenceAdapter>
<!-- Memory configuration -->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="100gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!-- Dead letter queue configuration -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true"
memoryLimit="10mb" useCache="false">
<deadLetterStrategy>
<individualDeadLetterStrategy
queuePrefix="DLQ."
useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!-- Transport connectors -->
<transportConnectors>
<transportConnector name="openwire"
uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp"
uri="stomp://0.0.0.0:61613?maximumConnections=1000"/>
<transportConnector name="mqtt"
uri="mqtt://0.0.0.0:1883?maximumConnections=1000"/>
<transportConnector name="ws"
uri="ws://0.0.0.0:61614?maximumConnections=1000"/>
</transportConnectors>
<!-- Network configuration (clustering) -->
<networkConnectors>
<networkConnector uri="static:(tcp://broker2:61616,tcp://broker3:61616)"
duplex="true"
networkTTL="3"
dynamicOnly="true"
conduitSubscriptions="true"/>
</networkConnectors>
<!-- Plugins -->
<plugins>
<statisticsBrokerPlugin/>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="admin" password="admin" groups="admins,publishers,consumers"/>
<authenticationUser username="producer" password="producer" groups="publishers"/>
<authenticationUser username="consumer" password="consumer" groups="consumers"/>
</users>
</simpleAuthenticationPlugin>
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue=">" write="admins,publishers" read="admins,consumers" admin="admins"/>
<authorizationEntry topic=">" write="admins,publishers" read="admins,consumers" admin="admins"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
</broker>
<!-- Web console (Jetty) configuration; JMX itself is enabled by useJmx="true" on the broker -->
<import resource="jetty.xml"/>
</beans>
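The policy and authorization entries above use `>` as a destination wildcard. The following sketch models the commonly documented wildcard rules: `.` separates name segments, `*` matches exactly one segment, and `>` matches everything from that position on. It is a simplified illustration with a hypothetical class name, not the broker's own matcher, which may differ at the edges (for instance, whether `orders.>` also matches `orders`).

```java
/** Simplified model of ActiveMQ destination wildcards; not the broker's implementation. */
final class DestinationWildcards {

    static boolean matches(String pattern, String destination) {
        String[] p = pattern.split("\\.");
        String[] d = destination.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                // ">" accepts all remaining segments (at least one in this sketch)
                return i < d.length;
            }
            if (i >= d.length) {
                return false; // pattern is longer than the destination name
            }
            if (!p[i].equals("*") && !p[i].equals(d[i])) {
                return false; // literal segment mismatch
            }
        }
        // Without a trailing ">", both must have the same number of segments
        return p.length == d.length;
    }

    public static void main(String[] args) {
        System.out.println(matches(">", "test.queue"));
        System.out.println(matches("orders.*", "orders.eu"));
        System.out.println(matches("orders.*", "orders.eu.berlin"));
        System.out.println(matches("orders.>", "orders.eu.berlin"));
    }
}
```

Under this model, a `policyEntry queue=">"` applies to every queue, which is why the configuration above uses it as a catch-all.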
Spring Boot Integration and Connection Configuration
// application.yml
/*
spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
    in-memory: false
    pool:
      enabled: true
      max-connections: 50
      idle-timeout: 30s
      expiry-timeout: 10s
*/
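// Spring Boot configuration class
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
@Configuration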
@EnableJms
public class ActiveMQConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String user;
@Value("${spring.activemq.password}")
private String password;
@Bean
public ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL(brokerUrl);
factory.setUserName(user);
factory.setPassword(password);
// Performance tuning
factory.setUseAsyncSend(true);
factory.setOptimizeAcknowledge(true);
factory.setMaxThreadPoolSize(50);
factory.setOptimizedMessageDispatch(true);
return factory;
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setDefaultDestinationName("default.queue");
template.setDeliveryPersistent(true);
template.setExplicitQosEnabled(true);
template.setTimeToLive(300000); // 5 minutes
return template;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency("3-10");
factory.setReceiveTimeout(5000L);
factory.setSessionTransacted(true);
return factory;
}
}
// Spring Boot messaging service
@Service
public class MessageService {
@Autowired
private JmsTemplate jmsTemplate;
// Send message
public void sendMessage(String destination, Object message) {
jmsTemplate.convertAndSend(destination, message, msg -> {
msg.setStringProperty("sender", "spring-app");
msg.setLongProperty("timestamp", System.currentTimeMillis());
return msg;
});
}
// Receive queue messages
@JmsListener(destination = "order.queue")
public void receiveOrderMessage(String message) {
System.out.println("Order received: " + message);
try {
// Order processing logic
processOrder(message);
} catch (Exception e) {
System.err.println("Order processing failed: " + e.getMessage());
throw new RuntimeException("Failed to process order", e);
}
}
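// Receive notification messages. With the container factory as configured above this
// listens on a queue; a topic subscription needs setPubSubDomain(true) on the factory.
@JmsListener(destination = "notification.topic",
containerFactory = "jmsListenerContainerFactory")
public void receiveNotification(
@Payload String message,
@Headers Map<String, Object> headers) { // @Headers (plural) binds the full header map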
System.out.println("Notification: " + message);
System.out.println("Headers: " + headers);
// Notification processing
handleNotification(message, headers);
}
private void processOrder(String orderData) {
// Order processing implementation
System.out.println("Processing order: " + orderData);
}
private void handleNotification(String message, Map<String, Object> headers) {
// Notification processing implementation
System.out.println("Handling notification: " + message);
}
}
// REST controller example
@RestController
@RequestMapping("/api/messages")
public class MessageController {
@Autowired
private MessageService messageService;
@PostMapping("/send")
public ResponseEntity<String> sendMessage(@RequestBody MessageRequest request) {
try {
messageService.sendMessage(request.getDestination(), request.getMessage());
return ResponseEntity.ok("Message sent successfully");
} catch (Exception e) {
return ResponseEntity.status(500).body("Failed to send message: " + e.getMessage());
}
}
}
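The controller above refers to a `MessageRequest` type that the source does not show. A minimal sketch of what it might look like follows. The field names are inferred from the `getDestination()` and `getMessage()` calls, and everything else (the `String` payload type, the no-argument constructor and setters for JSON binding) is an assumption.

```java
/**
 * Hypothetical request body for POST /api/messages/send; not part of the source.
 * Declare it public in its own file when the controller lives in another package.
 */
class MessageRequest {

    private String destination; // target queue name, e.g. "order.queue"
    private String message;     // payload passed on to the messaging service

    // No-argument constructor so a JSON mapper can instantiate the class
    MessageRequest() {
    }

    public String getDestination() {
        return destination;
    }

    public void setDestination(String destination) {
        this.destination = destination;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
```

With this shape, a request body such as `{"destination": "order.queue", "message": "order-42"}` would map onto the two fields.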
Clustering and High Availability Configuration
# Network of Brokers configuration example
# Broker 1 (activemq-broker1.xml)
cat > /opt/activemq/conf/activemq-broker1.xml << 'EOF'
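<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">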
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="broker1"
dataDirectory="/opt/activemq/data/broker1">
<networkConnectors>
<networkConnector uri="static:(tcp://broker2:61616,tcp://broker3:61616)"
duplex="false"
name="broker1-connector"/>
</networkConnectors>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
</broker>
</beans>
EOF
# High availability master-slave configuration
# Master broker configuration
cat > /opt/activemq/conf/activemq-master.xml << 'EOF'
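<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">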
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="master"
dataDirectory="/shared/activemq/data">
<persistenceAdapter>
<kahaDB directory="/shared/activemq/data/kahadb"
lockKeepAlivePeriod="5000"/>
</persistenceAdapter>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
</broker>
</beans>
EOF
# Slave broker configuration
cat > /opt/activemq/conf/activemq-slave.xml << 'EOF'
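<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">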
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="slave"
dataDirectory="/shared/activemq/data">
<persistenceAdapter>
<kahaDB directory="/shared/activemq/data/kahadb"/>
</persistenceAdapter>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617"/>
</transportConnectors>
</broker>
</beans>
EOF
# Client-side failover configuration
java_connection_string="failover:(tcp://master:61616,tcp://slave:61617)?randomize=false&timeout=3000"
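On the Java side, the failover connection string above is just a broker URL, so it can be assembled from a list of broker addresses. This is a small sketch with a hypothetical helper class. It covers only the two options that appear in the string above (`randomize` and `timeout`).

```java
import java.util.List;

/** Assembles a failover: transport URI from individual broker URIs. */
final class FailoverUri {

    static String build(List<String> brokerUris, boolean randomize, long timeoutMillis) {
        if (brokerUris.isEmpty()) {
            throw new IllegalArgumentException("at least one broker URI is required");
        }
        // Brokers are tried in list order when randomize=false
        String members = String.join(",", brokerUris);
        return "failover:(" + members + ")?randomize=" + randomize + "&timeout=" + timeoutMillis;
    }

    public static void main(String[] args) {
        String uri = build(List.of("tcp://master:61616", "tcp://slave:61617"), false, 3000);
        System.out.println(uri);
    }
}
```

The resulting string can be passed to `new ActiveMQConnectionFactory(uri)` in place of a single `tcp://` URL, as in the producer and consumer examples earlier.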
Monitoring and Performance Management
// JMX monitoring example
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
public class ActiveMQMonitor {
public static void main(String[] args) throws Exception {
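// JMX connection (assumes the broker exposes a remote JMX connector on port 1099,
// for example via <managementContext createConnector="true"/> in activemq.xml)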
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
JMXConnector connector = JMXConnectorFactory.connect(url);
MBeanServerConnection connection = connector.getMBeanServerConnection();
// Get broker information
ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
String brokerId = (String) connection.getAttribute(brokerName, "BrokerId");
String brokerVersion = (String) connection.getAttribute(brokerName, "BrokerVersion");
System.out.println("Broker ID: " + brokerId);
System.out.println("Broker Version: " + brokerVersion);
// Get queue statistics
ObjectName queueName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=test.queue");
Long queueSize = (Long) connection.getAttribute(queueName, "QueueSize");
Long enqueueCount = (Long) connection.getAttribute(queueName, "EnqueueCount");
Long dequeueCount = (Long) connection.getAttribute(queueName, "DequeueCount");
System.out.println("Queue Size: " + queueSize);
System.out.println("Enqueue Count: " + enqueueCount);
System.out.println("Dequeue Count: " + dequeueCount);
connector.close();
}
}
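The monitor above builds its `ObjectName` values from hard-coded strings. When several queues need to be inspected, a small helper can assemble the same names from their parts. The sketch below uses only the JDK's `javax.management` API and mirrors the key layout shown above; the helper class itself is hypothetical, and it assumes broker and queue names without characters that need quoting in an `ObjectName`.

```java
import java.util.Hashtable;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Builds the JMX ObjectName of a queue, mirroring the key layout used in the monitor above. */
final class ActiveMQObjectNames {

    static ObjectName queue(String brokerName, String queueName) throws MalformedObjectNameException {
        Hashtable<String, String> keys = new Hashtable<>();
        keys.put("type", "Broker");
        keys.put("brokerName", brokerName);
        keys.put("destinationType", "Queue");
        keys.put("destinationName", queueName);
        return new ObjectName("org.apache.activemq", keys);
    }

    public static void main(String[] args) throws MalformedObjectNameException {
        ObjectName name = queue("localhost", "test.queue");
        System.out.println(name.getCanonicalName());
    }
}
```

The returned name can be passed to `connection.getAttribute(name, "QueueSize")` exactly as in the monitor.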
# ActiveMQ statistics monitoring script
cat > /opt/activemq/bin/monitor.sh << 'EOF'
#!/bin/bash
echo "=== ActiveMQ Monitoring Report $(date) ==="
# Process status check
echo "=== Process Status ==="
ps aux | grep activemq | grep -v grep
# Port status check
echo "=== Port Status ==="
netstat -tlnp | grep :61616
netstat -tlnp | grep :8161
# Broker health
echo "=== Broker Health ==="
/opt/activemq/bin/activemq query --objname type=Broker,brokerName=*,service=Health
# JVM statistics
echo "=== JVM Statistics ==="
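# Match the broker JVM only; a bare "activemq" pattern also matches this script's own path
jstat -gc "$(pgrep -f activemq.jar | head -n 1)" 5s 1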
# Log error check
echo "=== Recent Errors ==="
tail -n 50 /opt/activemq/data/activemq.log | grep -i error
echo "=== Monitoring Complete ==="
EOF
chmod +x /opt/activemq/bin/monitor.sh
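The "Recent Errors" step of the script can also be written in Java, which may be convenient when the check runs inside an existing JVM-based tool. This is a rough equivalent of `tail -n 50 ... | grep -i error` using only the JDK. The class and method names are hypothetical, and it reads the whole file into memory, so it only suits modestly sized logs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

/** Returns lines containing "error" (any case) among the last N lines of a log file. */
final class RecentLogErrors {

    static List<String> recentErrors(Path logFile, int tailLines) throws IOException {
        List<String> lines = Files.readAllLines(logFile);
        int from = Math.max(0, lines.size() - tailLines); // start index of the tail window
        return lines.subList(from, lines.size()).stream()
                .filter(line -> line.toLowerCase(Locale.ROOT).contains("error"))
                .collect(Collectors.toList());
    }
}
```

For the installation above, it would be called as `RecentLogErrors.recentErrors(Path.of("/opt/activemq/data/activemq.log"), 50)`.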