Apache ActiveMQ

Message broker supporting Java Message Service (JMS). Provides diverse protocol support, transactions, and security features. Proven track record in enterprise environments.

Message Broker, JMS, Enterprise, Java, Queuing, Publish-Subscribe

Server

Overview

Apache ActiveMQ is a widely used open-source, multi-protocol, Java-based message broker. It supports industry-standard protocols, so clients can be written in a broad range of languages and run on many platforms. It is JMS 1.1 compliant and accepts connections over OpenWire, STOMP, MQTT, AMQP, REST, and WebSockets. ActiveMQ supports both point-to-point (queue) and publish-subscribe (topic) messaging and can handle thousands of concurrent client connections.
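
The difference between the two messaging patterns can be illustrated with a small in-memory model. This is a simplified sketch that uses only the JDK, not the ActiveMQ API: the class and method names are hypothetical, and round-robin dispatch is an assumption made for illustration. A real broker's dispatch order depends on prefetch settings and consumer speed.

```java
import java.util.ArrayList;
import java.util.List;

final class DeliveryModelSketch {

    /** Point-to-point: each message goes to exactly one consumer (round-robin in this model). */
    static List<List<String>> deliverToQueue(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = emptyInboxes(consumerCount);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumerCount).add(messages.get(i));
        }
        return inboxes;
    }

    /** Publish-subscribe: every active subscriber receives its own copy of each message. */
    static List<List<String>> deliverToTopic(List<String> messages, int subscriberCount) {
        List<List<String>> inboxes = emptyInboxes(subscriberCount);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m0", "m1", "m2", "m3");
        System.out.println("queue: " + deliverToQueue(messages, 2)); // queue: [[m0, m2], [m1, m3]]
        System.out.println("topic: " + deliverToTopic(messages, 2)); // topic: [[m0, m1, m2, m3], [m0, m1, m2, m3]]
    }
}
```

With a queue, adding consumers spreads the load; with a topic, adding subscribers multiplies the deliveries.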

Details

As of 2025, Apache ActiveMQ remains a comprehensive enterprise messaging solution with a long record of stability and reliability. ActiveMQ Classic, with 6.1.7 as the latest release at the time of writing, is fully JMS 1.1 compliant and provides partial JMS 2.0 and Jakarta Messaging 3.1 support. Apache ActiveMQ Artemis is the next-generation broker, designed for higher performance and scalability. The platform offers Network of Brokers clustering, high availability through shared storage or replication, persistence through KahaDB or JDBC (the LevelDB store found in older releases was removed in the 5.17 line), and enterprise features including JMX monitoring, dead letter queues, and virtual topics.
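
Virtual topics and dead letter queues both rely on destination naming conventions. The sketch below is a hedged illustration using only the JDK, with hypothetical class and method names. It assumes the broker's default virtual topic convention (topics named `VirtualTopic.*`, read through queues named `Consumer.<name>.VirtualTopic.*`) and an individual dead letter strategy that prepends a prefix such as `DLQ.` to the original queue name. Both conventions can be changed in the broker configuration.

```java
final class DestinationNamingSketch {

    /** Queue that a named consumer group reads from to receive a virtual topic's messages. */
    static String virtualTopicConsumerQueue(String consumerName, String virtualTopic) {
        return "Consumer." + consumerName + "." + virtualTopic;
    }

    /** Per-queue dead letter queue name when an individual dead letter strategy uses the given prefix. */
    static String individualDeadLetterQueue(String queuePrefix, String queueName) {
        return queuePrefix + queueName;
    }

    public static void main(String[] args) {
        // Producers publish to the topic; each consumer group reads from its own queue.
        System.out.println(virtualTopicConsumerQueue("billing", "VirtualTopic.Orders")); // Consumer.billing.VirtualTopic.Orders
        // Messages that exhaust their redeliveries on test.queue land here.
        System.out.println(individualDeadLetterQueue("DLQ.", "test.queue")); // DLQ.test.queue
    }
}
```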

Key Features

  • JMS Compliance: Complete JMS 1.1 implementation with partial JMS 2.0 and Jakarta Messaging 3.1 support
  • Multi-Protocol Support: OpenWire, STOMP, MQTT, AMQP, REST, and WebSockets
  • High Scalability: Efficiently handles thousands of concurrent client connections
  • Enterprise Features: Network of Brokers, high availability, persistence, JMX monitoring
  • Flexible Deployment: Standalone, embedded, and clustered configurations
  • Multi-Language Clients: Rich client support for Java, .NET, C++, Python, Ruby, and more

Pros and Cons

Pros

  • Extensive track record and mature ecosystem in Java enterprise environments
  • Full JMS 1.1 compliance, so standard JMS client code and tooling work without modification
  • Flexible scale-out capabilities through Network of Brokers architecture
  • Excellent management and monitoring capabilities via web console
  • Multiple persistence options including KahaDB and JDBC
  • Seamless integration with Apache Camel for enterprise integration patterns

Cons

  • Performance limitations compared to Artemis or lightweight brokers for high-throughput scenarios
  • Higher memory footprint compared to Redis or NATS
  • Complex cluster configuration and high availability setup
  • Requires specialized knowledge for initial configuration and optimization
  • Slower adoption of newer protocols due to legacy architecture
  • GC tuning requirements in large-scale environments

Code Examples

Installation and Basic Setup

# Download and Install ActiveMQ Classic
wget https://archive.apache.org/dist/activemq/6.1.7/apache-activemq-6.1.7-bin.tar.gz
tar -xzf apache-activemq-6.1.7-bin.tar.gz
sudo mv apache-activemq-6.1.7 /opt/activemq
sudo useradd -r -s /bin/false activemq
sudo chown -R activemq:activemq /opt/activemq

# System service configuration
sudo tee /etc/systemd/system/activemq.service << 'EOF'
[Unit]
Description=Apache ActiveMQ
After=network.target

[Service]
Type=forking
User=activemq
Group=activemq
ExecStart=/opt/activemq/bin/activemq start
ExecStop=/opt/activemq/bin/activemq stop
ExecReload=/opt/activemq/bin/activemq restart
WorkingDirectory=/opt/activemq
# ActiveMQ Classic 6.x requires Java 17 or later; adjust the path to your JDK
Environment=JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64

[Install]
WantedBy=multi-user.target
EOF

# Enable and start service
sudo systemctl enable activemq
sudo systemctl start activemq

# Check status
sudo systemctl status activemq
netstat -tlnp | grep :61616  # OpenWire protocol
netstat -tlnp | grep :8161   # Web management console

# Web management console access
# http://localhost:8161/admin (admin/admin)

Basic JMS Producer and Consumer

// Maven dependency (ActiveMQ 6.x clients use the jakarta.jms namespace, not javax.jms)
// <dependency>
//   <groupId>org.apache.activemq</groupId>
//   <artifactId>activemq-client</artifactId>
//   <version>6.1.7</version>
// </dependency>

// Each public class below belongs in its own source file with these imports.
import org.apache.activemq.ActiveMQConnectionFactory;
import jakarta.jms.*;

// JMS Producer example
public class MessageProducer {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "test.queue";
    
    public static void main(String[] args) throws Exception {
        // Create connection factory
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
        
        // Create connection and session
        Connection connection = factory.createConnection();
        connection.start();
        
        // Non-transacted session with automatic acknowledgement
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(QUEUE_NAME);
        
        // Create producer (fully qualified because this class is also named MessageProducer)
        jakarta.jms.MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        
        // Send messages
        for (int i = 0; i < 100; i++) {
            TextMessage message = session.createTextMessage("Hello ActiveMQ! Message " + i);
            message.setStringProperty("id", String.valueOf(i));
            message.setStringProperty("type", "test");
            producer.send(message);
            System.out.println("Sent: " + message.getText());
        }
        
        // Cleanup resources
        producer.close();
        session.close();
        connection.close();
    }
}

// JMS Consumer example
public class MessageConsumer {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "test.queue";
    
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = factory.createConnection();
        connection.start();
        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(QUEUE_NAME);
        
        // Create consumer (fully qualified because this class is also named MessageConsumer)
        jakarta.jms.MessageConsumer consumer = session.createConsumer(destination);
        
        // Set message listener; onMessage runs on a client thread for each delivered message
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        String id = message.getStringProperty("id");
                        String type = message.getStringProperty("type");
                        
                        System.out.println("Received: " + textMessage.getText());
                        System.out.println("ID: " + id + ", Type: " + type);
                        
                        // Message processing logic
                        processMessage(textMessage);
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        
        // Wait for messages
        System.out.println("Waiting for messages... (Press Enter to exit)");
        System.in.read();
        
        consumer.close();
        session.close();
        connection.close();
    }
    
    private static void processMessage(TextMessage message) throws JMSException {
        // Business logic processing
        System.out.println("Processing: " + message.getText());
        
        // Simulate processing time
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Publish-Subscribe example
public class TopicPublisher {
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("news.updates");
        
        jakarta.jms.MessageProducer producer = session.createProducer(topic);
        
        // Publish messages to topic; only subscribers active at publish time receive them
        for (int i = 0; i < 10; i++) {
            TextMessage message = session.createTextMessage("News Update " + i);
            message.setStringProperty("category", "technology");
            message.setLongProperty("timestamp", System.currentTimeMillis());
            
            producer.send(message);
            System.out.println("Published: " + message.getText());
            Thread.sleep(1000);
        }
        
        producer.close();
        session.close();
        connection.close();
    }
}

Advanced Configuration and Performance Tuning

<!-- activemq.xml configuration example -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

  <broker xmlns="http://activemq.apache.org/schema/core" 
          brokerName="localhost" 
          dataDirectory="${activemq.data}"
          schedulerSupport="true"
          useJmx="true">

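    <!-- Persistence configuration. enableJournalDiskSyncs="false" trades durability
         for throughput: recently written messages can be lost if the host crashes -->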
    <persistenceAdapter>
      <kahaDB directory="${activemq.data}/kahadb"
              journalMaxFileLength="32mb"
              enableIndexWriteAsync="true"
              enableJournalDiskSyncs="false"
              cleanupInterval="30000"/>
    </persistenceAdapter>

    <!-- Memory configuration -->
    <systemUsage>
      <systemUsage>
        <memoryUsage>
          <memoryUsage percentOfJvmHeap="70"/>
        </memoryUsage>
        <storeUsage>
          <storeUsage limit="100gb"/>
        </storeUsage>
        <tempUsage>
          <tempUsage limit="50gb"/>
        </tempUsage>
      </systemUsage>
    </systemUsage>

    <!-- Destination policies: flow control, pending message limits, and per-queue dead letter queues -->
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic=">" producerFlowControl="true">
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="1000"/>
            </pendingMessageLimitStrategy>
          </policyEntry>
          
          <policyEntry queue=">" producerFlowControl="true" 
                       memoryLimit="10mb" useCache="false">
            <deadLetterStrategy>
              <individualDeadLetterStrategy 
                queuePrefix="DLQ." 
                useQueueForQueueMessages="true"/>
            </deadLetterStrategy>
          </policyEntry>
        </policyEntries>
      </policyMap>
    </destinationPolicy>

    <!-- Transport connectors -->
    <transportConnectors>
      <transportConnector name="openwire" 
                         uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
      <transportConnector name="stomp" 
                         uri="stomp://0.0.0.0:61613?maximumConnections=1000"/>
      <transportConnector name="mqtt" 
                         uri="mqtt://0.0.0.0:1883?maximumConnections=1000"/>
      <transportConnector name="ws" 
                         uri="ws://0.0.0.0:61614?maximumConnections=1000"/>
    </transportConnectors>

    <!-- Network configuration (clustering) -->
    <networkConnectors>
      <networkConnector uri="static:(tcp://broker2:61616,tcp://broker3:61616)"
                       duplex="true"
                       networkTTL="3"
                       dynamicOnly="true"
                       conduitSubscriptions="true"/>
    </networkConnectors>

    <!-- Plugins. The user names and passwords below are placeholders; replace them before deployment -->
    <plugins>
      <statisticsBrokerPlugin/>
      <simpleAuthenticationPlugin>
        <users>
          <authenticationUser username="admin" password="admin" groups="admins,publishers,consumers"/>
          <authenticationUser username="producer" password="producer" groups="publishers"/>
          <authenticationUser username="consumer" password="consumer" groups="consumers"/>
        </users>
      </simpleAuthenticationPlugin>
      
      <authorizationPlugin>
        <map>
          <authorizationMap>
            <authorizationEntries>
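              <authorizationEntry queue=">" write="admins,publishers" read="admins,consumers" admin="admins"/>
              <authorizationEntry topic=">" write="admins,publishers" read="admins,consumers" admin="admins"/>
              <!-- Clients use advisory topics internally, so every group needs access to them -->
              <authorizationEntry topic="ActiveMQ.Advisory.>" write="admins,publishers,consumers" read="admins,publishers,consumers" admin="admins,publishers,consumers"/>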
            </authorizationEntries>
          </authorizationMap>
        </map>
      </authorizationPlugin>
    </plugins>

  </broker>

  <!-- Web console (Jetty) configuration -->
  <import resource="jetty.xml"/>
  
</beans>
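
The policy and authorization entries above use destination wildcards: `.` separates name segments, `*` matches a single segment, and `>` matches everything that remains. The following is a simplified model of those rules written with the JDK only. The class and method names are hypothetical and this is not the broker's own matcher, so edge cases (for example, whether `orders.>` also matches a bare `orders`) may be handled differently by ActiveMQ.

```java
final class DestinationWildcardSketch {

    /** Returns true if the destination name matches the wildcard pattern under the simplified rules. */
    static boolean matches(String pattern, String destination) {
        String[] patternParts = pattern.split("\\.");
        String[] nameParts = destination.split("\\.");
        for (int i = 0; i < patternParts.length; i++) {
            if (patternParts[i].equals(">")) {
                // ">" is only valid as the last segment and, in this model,
                // needs at least one remaining segment to match.
                return i == patternParts.length - 1 && nameParts.length > i;
            }
            if (i >= nameParts.length) {
                return false; // pattern is longer than the destination name
            }
            if (!patternParts[i].equals("*") && !patternParts[i].equals(nameParts[i])) {
                return false; // literal segment mismatch
            }
        }
        // Without a trailing ">", both must have the same number of segments.
        return patternParts.length == nameParts.length;
    }

    public static void main(String[] args) {
        System.out.println(matches(">", "test.queue"));              // true
        System.out.println(matches("orders.*", "orders.eu"));        // true
        System.out.println(matches("orders.*", "orders.eu.retail")); // false
        System.out.println(matches("orders.>", "orders.eu.retail")); // true
    }
}
```

Under this model, the `queue=">"` entries in the configuration apply to every queue on the broker, which is why a narrower entry is needed whenever one destination requires different limits or permissions.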

Spring Boot Integration and Connection Configuration

// application.yml
/*
spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
    in-memory: false
    pool:
      enabled: true          # pooling requires org.messaginghub:pooled-jms on the classpath
      max-connections: 50
      idle-timeout: 30s
*/

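// Spring Boot configuration class
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@Configuration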
@EnableJms
public class ActiveMQConfig {
    
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;
    
    @Value("${spring.activemq.user}")
    private String user;
    
    @Value("${spring.activemq.password}")
    private String password;
    
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL(brokerUrl);
        factory.setUserName(user);
        factory.setPassword(password);
        
        // Performance tuning
        factory.setUseAsyncSend(true);
        factory.setOptimizeAcknowledge(true);
        factory.setMaxThreadPoolSize(50);
        factory.setOptimizedMessageDispatch(true);
        
        return factory;
    }
    
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory());
        template.setDefaultDestinationName("default.queue");
        template.setDeliveryPersistent(true);
        template.setExplicitQosEnabled(true);
        template.setTimeToLive(300000);  // 5 minutes
        return template;
    }
    
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("3-10");
        factory.setReceiveTimeout(5000L);
        factory.setSessionTransacted(true);
        return factory;
    }
}

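// Spring Boot messaging service
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class MessageService {
    
    @Autowired
    private JmsTemplate jmsTemplate;
    
    // Send message
    public void sendMessage(String destination, Object message) {
        jmsTemplate.convertAndSend(destination, message, msg -> {
            msg.setStringProperty("sender", "spring-app");
            msg.setLongProperty("timestamp", System.currentTimeMillis());
            return msg;
        });
    }
    
    // Receive queue messages
    @JmsListener(destination = "order.queue")
    public void receiveOrderMessage(String message) {
        System.out.println("Order received: " + message);
        
        try {
            // Order processing logic
            processOrder(message);
            
        } catch (Exception e) {
            System.err.println("Order processing failed: " + e.getMessage());
            // Rethrowing rolls back the transacted session so the broker redelivers the message
            throw new RuntimeException("Failed to process order", e);
        }
    }
    
    // Receive notifications. With the container factory configured above, the destination
    // is resolved as a queue; consuming from a topic requires a container factory with
    // setPubSubDomain(true).
    @JmsListener(destination = "notification.topic", 
                 containerFactory = "jmsListenerContainerFactory")
    public void receiveNotification(
            @Payload String message,
            @Headers Map<String, Object> headers) {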
        
        System.out.println("Notification: " + message);
        System.out.println("Headers: " + headers);
        
        // Notification processing
        handleNotification(message, headers);
    }
    
    private void processOrder(String orderData) {
        // Order processing implementation
        System.out.println("Processing order: " + orderData);
    }
    
    private void handleNotification(String message, Map<String, Object> headers) {
        // Notification processing implementation
        System.out.println("Handling notification: " + message);
    }
}

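// REST controller example
// MessageRequest is assumed to be a simple request class with getDestination() and getMessage().
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController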
@RequestMapping("/api/messages")
public class MessageController {
    
    @Autowired
    private MessageService messageService;
    
    @PostMapping("/send")
    public ResponseEntity<String> sendMessage(@RequestBody MessageRequest request) {
        try {
            messageService.sendMessage(request.getDestination(), request.getMessage());
            return ResponseEntity.ok("Message sent successfully");
        } catch (Exception e) {
            return ResponseEntity.status(500).body("Failed to send message: " + e.getMessage());
        }
    }
}

Clustering and High Availability Configuration

# Network of Brokers configuration example

# Broker 1 (activemq-broker1.xml)
cat > /opt/activemq/conf/activemq-broker1.xml << 'EOF'
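<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">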
  <broker xmlns="http://activemq.apache.org/schema/core" 
          brokerName="broker1" 
          dataDirectory="/opt/activemq/data/broker1">
    
    <networkConnectors>
      <networkConnector uri="static:(tcp://broker2:61616,tcp://broker3:61616)"
                       duplex="false"
                       name="broker1-connector"/>
    </networkConnectors>
    
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
    </transportConnectors>
    
  </broker>
</beans>
EOF

# High availability master-slave configuration
# Master broker configuration
cat > /opt/activemq/conf/activemq-master.xml << 'EOF'
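<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">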
  <broker xmlns="http://activemq.apache.org/schema/core" 
          brokerName="master" 
          dataDirectory="/shared/activemq/data">
    
    <persistenceAdapter>
      <kahaDB directory="/shared/activemq/data/kahadb"
              lockKeepAlivePeriod="5000"/>
    </persistenceAdapter>
    
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
    </transportConnectors>
    
  </broker>
</beans>
EOF

# Slave broker configuration
cat > /opt/activemq/conf/activemq-slave.xml << 'EOF'
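<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">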
  <broker xmlns="http://activemq.apache.org/schema/core" 
          brokerName="slave" 
          dataDirectory="/shared/activemq/data">
    
    <persistenceAdapter>
      <kahaDB directory="/shared/activemq/data/kahadb"/>
    </persistenceAdapter>
    
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61617"/>
    </transportConnectors>
    
  </broker>
</beans>
EOF

# Client-side failover configuration
java_connection_string="failover:(tcp://master:61616,tcp://slave:61617)?randomize=false&timeout=3000"
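
On the Java side, the same failover URI can be assembled from a list of broker addresses. This is a minimal sketch using only the JDK; the class and method names are hypothetical, and it covers only the `randomize` and `timeout` options shown in the connection string above.

```java
import java.util.List;

final class FailoverUriSketch {

    /** Builds a failover transport URI from the given broker URIs, tried in list order when randomize is false. */
    static String failoverUri(List<String> brokerUris, boolean randomize, long timeoutMillis) {
        if (brokerUris.isEmpty()) {
            throw new IllegalArgumentException("at least one broker URI is required");
        }
        return "failover:(" + String.join(",", brokerUris) + ")"
                + "?randomize=" + randomize
                + "&timeout=" + timeoutMillis;
    }

    public static void main(String[] args) {
        String uri = failoverUri(List.of("tcp://master:61616", "tcp://slave:61617"), false, 3000);
        System.out.println(uri); // failover:(tcp://master:61616,tcp://slave:61617)?randomize=false&timeout=3000
    }
}
```

The resulting string can be passed to the ActiveMQConnectionFactory constructor in place of the single-broker URL used in the earlier producer and consumer examples.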

Monitoring and Performance Management

// JMX monitoring example
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ActiveMQMonitor {
    
    public static void main(String[] args) throws Exception {
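        // JMX connection. Assumes remote JMX is enabled on the broker and listening on
        // port 1099, for example through managementContext createConnector="true" or the
        // JVM's com.sun.management.jmxremote options.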
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        MBeanServerConnection connection = connector.getMBeanServerConnection();
        
        // Get broker information
        ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
        String brokerId = (String) connection.getAttribute(brokerName, "BrokerId");
        String brokerVersion = (String) connection.getAttribute(brokerName, "BrokerVersion");
        
        System.out.println("Broker ID: " + brokerId);
        System.out.println("Broker Version: " + brokerVersion);
        
        // Get queue statistics
        ObjectName queueName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=test.queue");
        Long queueSize = (Long) connection.getAttribute(queueName, "QueueSize");
        Long enqueueCount = (Long) connection.getAttribute(queueName, "EnqueueCount");
        Long dequeueCount = (Long) connection.getAttribute(queueName, "DequeueCount");
        
        System.out.println("Queue Size: " + queueSize);
        System.out.println("Enqueue Count: " + enqueueCount);
        System.out.println("Dequeue Count: " + dequeueCount);
        
        connector.close();
    }
}
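
The queue MBean name in the monitor above follows a fixed key pattern, so it can be built for any broker and queue. The helper below is a hedged sketch that relies only on the JDK's javax.management package. The class and method names are hypothetical, and it assumes broker and queue names contain no characters that need quoting in an ObjectName (such as commas, equals signs, colons, or wildcards).

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class QueueObjectNameSketch {

    /** Builds the MBean name for a queue, using the same key layout as the monitor example. */
    static ObjectName queueObjectName(String brokerName, String queueName) {
        try {
            return new ObjectName("org.apache.activemq:type=Broker,brokerName=" + brokerName
                    + ",destinationType=Queue,destinationName=" + queueName);
        } catch (MalformedObjectNameException e) {
            // Names with reserved characters would need ObjectName.quote(); not handled in this sketch
            throw new IllegalArgumentException("Invalid broker or queue name", e);
        }
    }

    public static void main(String[] args) {
        ObjectName name = queueObjectName("localhost", "test.queue");
        System.out.println("Domain: " + name.getDomain());
        System.out.println("Broker: " + name.getKeyProperty("brokerName"));
        System.out.println("Queue: " + name.getKeyProperty("destinationName"));
    }
}
```

Looping over several queue names with this helper and reading `QueueSize` for each gives a quick backlog overview without hard-coding every ObjectName string.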

# ActiveMQ statistics monitoring script
cat > /opt/activemq/bin/monitor.sh << 'EOF'
#!/bin/bash

echo "=== ActiveMQ Monitoring Report $(date) ==="

# Process status check
echo "=== Process Status ==="
# Match the broker JVM only; a bare "activemq" pattern would also match this script's own path
ps aux | grep '[a]ctivemq\.jar'

# Port status check
echo "=== Port Status ==="
netstat -tlnp | grep :61616
netstat -tlnp | grep :8161

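# Broker health (queried over JMX by the activemq CLI)
echo "=== Broker Health ==="
/opt/activemq/bin/activemq query --objname 'type=Broker,brokerName=*,service=Health'

# JVM statistics
echo "=== JVM Statistics ==="
# Select the broker JVM by its launcher jar so that only one PID is passed to jstat
jstat -gc "$(pgrep -f 'activemq\.jar' | head -n 1)" 5s 1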

# Log error check
echo "=== Recent Errors ==="
tail -n 50 /opt/activemq/data/activemq.log | grep -i error

echo "=== Monitoring Complete ==="
EOF

chmod +x /opt/activemq/bin/monitor.sh