Apache ActiveMQ
Java Message Service(JMS)対応のメッセージブローカー。多様なプロトコル対応、トランザクション、セキュリティ機能を提供。エンタープライズ環境での実績。
サーバー
Apache ActiveMQ
概要
Apache ActiveMQは、Javaベースのオープンソースメッセージブローカーとして最も人気が高く、業界標準プロトコルを幅広くサポートするマルチプロトコル対応のエンタープライズメッセージングソリューションです。JMS 1.1完全準拠により堅牢なメッセージング機能を提供し、OpenWire、STOMP、MQTT、AMQP、REST、WebSocketsなど多様なプロトコルに対応。ポイントツーポイントとパブリッシュ・サブスクライブの両方のメッセージパターンをサポートし、数千の同時接続クライアントを効率的に処理できる高いスケーラビリティを誇ります。
詳細
Apache ActiveMQ 2025年版は、長年の実績により培われた安定性と信頼性を基盤として、現代のエンタープライズ環境の要求に応える包括的なメッセージング機能を提供しています。ActiveMQ Classicとして知られる従来版は6.1.7が最新で、Jakarta Messaging 3.1とJMS 2.0の部分対応、JMS 1.1の完全対応を実現。Apache ActiveMQ Artemisは次世代アーキテクチャとして、より高いパフォーマンスとスケーラビリティを追求した設計となっています。Network of Brokersによるクラスタリング、共有ストレージやレプリケーションによる高可用性、KahaDB・LevelDB・JDBCによる柔軟な永続化オプションを提供し、エンタープライズグレードのメッセージング要件を満たします。
主な特徴
- JMS完全準拠: JMS 1.1標準の完全実装とJakarta Messaging対応
- マルチプロトコル対応: OpenWire、STOMP、MQTT、AMQP、RESTなど幅広いプロトコル
- 高いスケーラビリティ: 数千の同時接続クライアントを効率的に処理
- エンタープライズ機能: Network of Brokers、高可用性、永続化、JMX監視
- 柔軟な展開: スタンドアロン、組み込み、クラスタ構成に対応
- 多言語クライアント: Java、.NET、C++、Python、Ruby等の豊富なクライアント
メリット・デメリット
メリット
- Javaエンタープライズ環境での豊富な実績と成熟したエコシステム
- JMS標準に完全準拠し業界ベストプラクティスを実装
- Network of Brokersによる柔軟なスケールアウト能力
- Webコンソールによる優れた管理性と監視機能
- KahaDB、JDBC等の多様な永続化オプション
- Apache Camelとの統合によるエンタープライズ統合パターンの実現
デメリット
- 高スループット要件にはArtemisやより軽量なブローカーが優位
- メモリ使用量がRedisやNATSより多い傾向
- クラスタ設定と高可用性構成の複雑さ
- 初期設定と最適化に専門知識が必要
- レガシーなアーキテクチャによる新しいプロトコル対応の遅れ
- 大規模環境でのGCチューニングの必要性
参考ページ
書き方の例
インストールと基本セットアップ
# ActiveMQ Classic ダウンロードとインストール
wget https://archive.apache.org/dist/activemq/6.1.7/apache-activemq-6.1.7-bin.tar.gz
tar -xzf apache-activemq-6.1.7-bin.tar.gz
sudo mv apache-activemq-6.1.7 /opt/activemq
sudo useradd -r -s /bin/false activemq
sudo chown -R activemq:activemq /opt/activemq
# システムサービス設定
sudo tee /etc/systemd/system/activemq.service << 'EOF'
[Unit]
Description=Apache ActiveMQ
After=network.target
[Service]
Type=forking
User=activemq
Group=activemq
ExecStart=/opt/activemq/bin/activemq start
ExecStop=/opt/activemq/bin/activemq stop
ExecReload=/opt/activemq/bin/activemq restart
WorkingDirectory=/opt/activemq
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
[Install]
WantedBy=multi-user.target
EOF
# サービス有効化・起動
sudo systemctl enable activemq
sudo systemctl start activemq
# 状態確認
sudo systemctl status activemq
netstat -tlnp | grep :61616 # OpenWire プロトコル
netstat -tlnp | grep :8161 # Web管理コンソール
# Web管理コンソールアクセス
# http://localhost:8161/admin (admin/admin)
基本的なJMS プロデューサー・コンシューマー
// Maven依存関係
// <dependency>
// <groupId>org.apache.activemq</groupId>
// <artifactId>activemq-client</artifactId>
// <version>6.1.7</version>
// </dependency>
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.concurrent.CountDownLatch;
// JMS プロデューサー例
public class MessageProducer {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "test.queue";
public static void main(String[] args) throws Exception {
// 接続ファクトリー作成
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
// 接続とセッション作成
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
// プロデューサー作成
javax.jms.MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// メッセージ送信
for (int i = 0; i < 100; i++) {
TextMessage message = session.createTextMessage("Hello ActiveMQ! Message " + i);
message.setStringProperty("id", String.valueOf(i));
message.setStringProperty("type", "test");
producer.send(message);
System.out.println("Sent: " + message.getText());
}
// リソースクリーンアップ
producer.close();
session.close();
connection.close();
}
}
// JMS コンシューマー例
public class MessageConsumer {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "test.queue";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
// コンシューマー作成
javax.jms.MessageConsumer consumer = session.createConsumer(destination);
// メッセージリスナー設定
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String id = message.getStringProperty("id");
String type = message.getStringProperty("type");
System.out.println("Received: " + textMessage.getText());
System.out.println("ID: " + id + ", Type: " + type);
// メッセージ処理ロジック
processMessage(textMessage);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// メッセージ待機
System.out.println("Waiting for messages... (Press Enter to exit)");
System.in.read();
consumer.close();
session.close();
connection.close();
}
private static void processMessage(TextMessage message) throws JMSException {
// ビジネスロジック処理
System.out.println("Processing: " + message.getText());
// 処理時間のシミュレーション
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// パブリッシュ・サブスクライブ例
public class TopicPublisher {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("news.updates");
javax.jms.MessageProducer producer = session.createProducer(topic);
// トピックにメッセージ発行
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("News Update " + i);
message.setStringProperty("category", "technology");
message.setLongProperty("timestamp", System.currentTimeMillis());
producer.send(message);
System.out.println("Published: " + message.getText());
Thread.sleep(1000);
}
producer.close();
session.close();
connection.close();
}
}
高度な設定とパフォーマンスチューニング
<!-- activemq.xml 設定例 -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost"
dataDirectory="${activemq.data}"
schedulerSupport="true"
useJmx="true">
<!-- 永続化設定 -->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"
journalMaxFileLength="32mb"
enableIndexWriteAsync="true"
enableJournalDiskSyncs="false"
cleanupInterval="30000"/>
</persistenceAdapter>
<!-- メモリ設定 -->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="100gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!-- デッドレターキュー設定 -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true"
memoryLimit="10mb" useCache="false">
<deadLetterStrategy>
<individualDeadLetterStrategy
queuePrefix="DLQ."
useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!-- トランスポートコネクタ -->
<transportConnectors>
<transportConnector name="openwire"
uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp"
uri="stomp://0.0.0.0:61613?maximumConnections=1000"/>
<transportConnector name="mqtt"
uri="mqtt://0.0.0.0:1883?maximumConnections=1000"/>
<transportConnector name="ws"
uri="ws://0.0.0.0:61614?maximumConnections=1000"/>
</transportConnectors>
<!-- ネットワーク設定 (クラスタリング) -->
<networkConnectors>
<networkConnector uri="static:(tcp://broker2:61616,tcp://broker3:61616)"
duplex="true"
networkTTL="3"
dynamicOnly="true"
conduitSubscriptions="true"/>
</networkConnectors>
<!-- プラグイン -->
<plugins>
<statisticsBrokerPlugin/>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="admin" password="admin" groups="admins,publishers,consumers"/>
<authenticationUser username="producer" password="producer" groups="publishers"/>
<authenticationUser username="consumer" password="consumer" groups="consumers"/>
</users>
</simpleAuthenticationPlugin>
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue=">" write="admins,publishers" read="admins,consumers" admin="admins"/>
<authorizationEntry topic=">" write="admins,publishers" read="admins,consumers" admin="admins"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
</broker>
<!-- JMX設定 -->
<import resource="jetty.xml"/>
</beans>
Spring Boot統合とコネクション設定
// application.yml
/*
spring:
activemq:
broker-url: tcp://localhost:61616
user: admin
password: admin
in-memory: false
pool:
enabled: true
max-connections: 50
idle-timeout: 30s
expiry-timeout: 10s
*/
// Spring Boot設定クラス
@Configuration
@EnableJms
public class ActiveMQConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String user;
@Value("${spring.activemq.password}")
private String password;
@Bean
public ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL(brokerUrl);
factory.setUserName(user);
factory.setPassword(password);
// パフォーマンスチューニング
factory.setUseAsyncSend(true);
factory.setOptimizeAcknowledge(true);
factory.setMaxThreadPoolSize(50);
factory.setOptimizedMessageDispatch(true);
return factory;
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setDefaultDestinationName("default.queue");
template.setDeliveryPersistent(true);
template.setExplicitQosEnabled(true);
template.setTimeToLive(300000); // 5分
return template;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency("3-10");
factory.setReceiveTimeout(5000L);
factory.setSessionTransacted(true);
return factory;
}
}
// Spring Boot メッセージングサービス
@Service
public class MessageService {
@Autowired
private JmsTemplate jmsTemplate;
// メッセージ送信
public void sendMessage(String destination, Object message) {
jmsTemplate.convertAndSend(destination, message, msg -> {
msg.setStringProperty("sender", "spring-app");
msg.setLongProperty("timestamp", System.currentTimeMillis());
return msg;
});
}
// キューメッセージ受信
@JmsListener(destination = "order.queue")
public void receiveOrderMessage(String message) {
System.out.println("Order received: " + message);
try {
// 注文処理ロジック
processOrder(message);
} catch (Exception e) {
System.err.println("Order processing failed: " + e.getMessage());
throw new RuntimeException("Failed to process order", e);
}
}
// トピックメッセージ受信
@JmsListener(destination = "notification.topic",
containerFactory = "jmsListenerContainerFactory")
public void receiveNotification(
@Payload String message,
@Header Map<String, Object> headers) {
System.out.println("Notification: " + message);
System.out.println("Headers: " + headers);
// 通知処理
handleNotification(message, headers);
}
private void processOrder(String orderData) {
// 注文処理実装
System.out.println("Processing order: " + orderData);
}
private void handleNotification(String message, Map<String, Object> headers) {
// 通知処理実装
System.out.println("Handling notification: " + message);
}
}
// RESTコントローラー例
@RestController
@RequestMapping("/api/messages")
public class MessageController {
@Autowired
private MessageService messageService;
@PostMapping("/send")
public ResponseEntity<String> sendMessage(@RequestBody MessageRequest request) {
try {
messageService.sendMessage(request.getDestination(), request.getMessage());
return ResponseEntity.ok("Message sent successfully");
} catch (Exception e) {
return ResponseEntity.status(500).body("Failed to send message: " + e.getMessage());
}
}
}
クラスタリングと高可用性設定
# Network of Brokers設定例
# ブローカー1 (activemq-broker1.xml)
cat > /opt/activemq/conf/activemq-broker1.xml << 'EOF'
<beans>
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="broker1"
dataDirectory="/opt/activemq/data/broker1">
<networkConnectors>
<networkConnector uri="static:(tcp://broker2:61616,tcp://broker3:61616)"
duplex="false"
name="broker1-connector"/>
</networkConnectors>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
</broker>
</beans>
EOF
# 高可用性マスター・スレーブ設定
# マスターブローカー設定
cat > /opt/activemq/conf/activemq-master.xml << 'EOF'
<beans>
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="master"
dataDirectory="/shared/activemq/data">
<persistenceAdapter>
<kahaDB directory="/shared/activemq/data/kahadb"
lockKeepAlivePeriod="5000"/>
</persistenceAdapter>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
</broker>
</beans>
EOF
# スレーブブローカー設定
cat > /opt/activemq/conf/activemq-slave.xml << 'EOF'
<beans>
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="slave"
dataDirectory="/shared/activemq/data">
<persistenceAdapter>
<kahaDB directory="/shared/activemq/data/kahadb"/>
</persistenceAdapter>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617"/>
</transportConnectors>
</broker>
</beans>
EOF
# クライアント側フェイルオーバー設定
java_connection_string="failover:(tcp://master:61616,tcp://slave:61617)?randomize=false&timeout=3000"
監視とパフォーマンス管理
// JMX監視例
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
public class ActiveMQMonitor {
public static void main(String[] args) throws Exception {
// JMX接続
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
JMXConnector connector = JMXConnectorFactory.connect(url);
MBeanServerConnection connection = connector.getMBeanServerConnection();
// ブローカー情報取得
ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
String brokerId = (String) connection.getAttribute(brokerName, "BrokerId");
String brokerVersion = (String) connection.getAttribute(brokerName, "BrokerVersion");
System.out.println("Broker ID: " + brokerId);
System.out.println("Broker Version: " + brokerVersion);
// キュー統計取得
ObjectName queueName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=test.queue");
Long queueSize = (Long) connection.getAttribute(queueName, "QueueSize");
Long enqueueCount = (Long) connection.getAttribute(queueName, "EnqueueCount");
Long dequeueCount = (Long) connection.getAttribute(queueName, "DequeueCount");
System.out.println("Queue Size: " + queueSize);
System.out.println("Enqueue Count: " + enqueueCount);
System.out.println("Dequeue Count: " + dequeueCount);
connector.close();
}
}
# ActiveMQ統計情報取得スクリプト
cat > /opt/activemq/bin/monitor.sh << 'EOF'
#!/bin/bash
echo "=== ActiveMQ Monitoring Report $(date) ==="
# プロセス状態確認
echo "=== Process Status ==="
ps aux | grep activemq | grep -v grep
# ポート状態確認
echo "=== Port Status ==="
netstat -tlnp | grep :61616
netstat -tlnp | grep :8161
# メモリ使用量
echo "=== Memory Usage ==="
activemq query --objname type=Broker,brokerName=*,service=Health
# JVM統計
echo "=== JVM Statistics ==="
jstat -gc $(pgrep -f activemq) 5s 1
# ログエラー確認
echo "=== Recent Errors ==="
tail -n 50 /opt/activemq/data/activemq.log | grep -i error
echo "=== Monitoring Complete ==="
EOF
chmod +x /opt/activemq/bin/monitor.sh