Apache ActiveMQ

Java Message Service(JMS)対応のメッセージブローカー。多様なプロトコル対応、トランザクション、セキュリティ機能を提供。エンタープライズ環境での実績。

メッセージブローカーJMSエンタープライズJavaキューイングパブリッシュ・サブスクライブ

サーバー

Apache ActiveMQ

概要

Apache ActiveMQは、Javaベースのオープンソースメッセージブローカーとして最も人気が高く、業界標準プロトコルを幅広くサポートするマルチプロトコル対応のエンタープライズメッセージングソリューションです。JMS 1.1完全準拠により堅牢なメッセージング機能を提供し、OpenWire、STOMP、MQTT、AMQP、REST、WebSocketsなど多様なプロトコルに対応。ポイントツーポイントとパブリッシュ・サブスクライブの両方のメッセージパターンをサポートし、数千の同時接続クライアントを効率的に処理できる高いスケーラビリティを誇ります。

詳細

Apache ActiveMQ 2025年版は、長年の実績により培われた安定性と信頼性を基盤として、現代のエンタープライズ環境の要求に応える包括的なメッセージング機能を提供しています。ActiveMQ Classicとして知られる従来版は6.1.7が最新で、Jakarta Messaging 3.1とJMS 2.0の部分対応、JMS 1.1の完全対応を実現。Apache ActiveMQ Artemisは次世代アーキテクチャとして、より高いパフォーマンスとスケーラビリティを追求した設計となっています。Network of Brokersによるクラスタリング、共有ストレージやレプリケーションによる高可用性、KahaDB・LevelDB・JDBCによる柔軟な永続化オプションを提供し、エンタープライズグレードのメッセージング要件を満たします。

主な特徴

  • JMS完全準拠: JMS 1.1標準の完全実装とJakarta Messaging対応
  • マルチプロトコル対応: OpenWire、STOMP、MQTT、AMQP、RESTなど幅広いプロトコル
  • 高いスケーラビリティ: 数千の同時接続クライアントを効率的に処理
  • エンタープライズ機能: Network of Brokers、高可用性、永続化、JMX監視
  • 柔軟な展開: スタンドアロン、組み込み、クラスタ構成に対応
  • 多言語クライアント: Java、.NET、C++、Python、Ruby等の豊富なクライアント

メリット・デメリット

メリット

  • Javaエンタープライズ環境での豊富な実績と成熟したエコシステム
  • JMS標準に完全準拠し業界ベストプラクティスを実装
  • Network of Brokersによる柔軟なスケールアウト能力
  • Webコンソールによる優れた管理性と監視機能
  • KahaDB、JDBC等の多様な永続化オプション
  • Apache Camelとの統合によるエンタープライズ統合パターンの実現

デメリット

  • 高スループット要件にはArtemisやより軽量なブローカーが優位
  • メモリ使用量がRedisやNATSより多い傾向
  • クラスタ設定と高可用性構成の複雑さ
  • 初期設定と最適化に専門知識が必要
  • レガシーなアーキテクチャによる新しいプロトコル対応の遅れ
  • 大規模環境でのGCチューニングの必要性

参考ページ

書き方の例

インストールと基本セットアップ

# ActiveMQ Classic ダウンロードとインストール
wget https://archive.apache.org/dist/activemq/6.1.7/apache-activemq-6.1.7-bin.tar.gz
tar -xzf apache-activemq-6.1.7-bin.tar.gz
sudo mv apache-activemq-6.1.7 /opt/activemq
sudo useradd -r -s /bin/false activemq
sudo chown -R activemq:activemq /opt/activemq

# システムサービス設定
sudo tee /etc/systemd/system/activemq.service << 'EOF'
[Unit]
Description=Apache ActiveMQ
After=network.target

[Service]
Type=forking
User=activemq
Group=activemq
ExecStart=/opt/activemq/bin/activemq start
ExecStop=/opt/activemq/bin/activemq stop
ExecReload=/opt/activemq/bin/activemq restart
WorkingDirectory=/opt/activemq
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64

[Install]
WantedBy=multi-user.target
EOF

# サービス有効化・起動
sudo systemctl enable activemq
sudo systemctl start activemq

# 状態確認
sudo systemctl status activemq
netstat -tlnp | grep :61616  # OpenWire プロトコル
netstat -tlnp | grep :8161   # Web管理コンソール

# Web管理コンソールアクセス
# http://localhost:8161/admin (admin/admin)

基本的なJMS プロデューサー・コンシューマー

// Maven依存関係
// <dependency>
//   <groupId>org.apache.activemq</groupId>
//   <artifactId>activemq-client</artifactId>
//   <version>6.1.7</version>
// </dependency>

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.concurrent.CountDownLatch;

// JMS プロデューサー例
public class MessageProducer {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "test.queue";
    
    public static void main(String[] args) throws Exception {
        // 接続ファクトリー作成
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
        
        // 接続とセッション作成
        Connection connection = factory.createConnection();
        connection.start();
        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(QUEUE_NAME);
        
        // プロデューサー作成
        javax.jms.MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        
        // メッセージ送信
        for (int i = 0; i < 100; i++) {
            TextMessage message = session.createTextMessage("Hello ActiveMQ! Message " + i);
            message.setStringProperty("id", String.valueOf(i));
            message.setStringProperty("type", "test");
            producer.send(message);
            System.out.println("Sent: " + message.getText());
        }
        
        // リソースクリーンアップ
        producer.close();
        session.close();
        connection.close();
    }
}

// JMS コンシューマー例
public class MessageConsumer {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "test.queue";
    
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = factory.createConnection();
        connection.start();
        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(QUEUE_NAME);
        
        // コンシューマー作成
        javax.jms.MessageConsumer consumer = session.createConsumer(destination);
        
        // メッセージリスナー設定
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        String id = message.getStringProperty("id");
                        String type = message.getStringProperty("type");
                        
                        System.out.println("Received: " + textMessage.getText());
                        System.out.println("ID: " + id + ", Type: " + type);
                        
                        // メッセージ処理ロジック
                        processMessage(textMessage);
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        
        // メッセージ待機
        System.out.println("Waiting for messages... (Press Enter to exit)");
        System.in.read();
        
        consumer.close();
        session.close();
        connection.close();
    }
    
    private static void processMessage(TextMessage message) throws JMSException {
        // ビジネスロジック処理
        System.out.println("Processing: " + message.getText());
        
        // 処理時間のシミュレーション
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// パブリッシュ・サブスクライブ例
public class TopicPublisher {
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("news.updates");
        
        javax.jms.MessageProducer producer = session.createProducer(topic);
        
        // トピックにメッセージ発行
        for (int i = 0; i < 10; i++) {
            TextMessage message = session.createTextMessage("News Update " + i);
            message.setStringProperty("category", "technology");
            message.setLongProperty("timestamp", System.currentTimeMillis());
            
            producer.send(message);
            System.out.println("Published: " + message.getText());
            Thread.sleep(1000);
        }
        
        producer.close();
        session.close();
        connection.close();
    }
}

高度な設定とパフォーマンスチューニング

<!-- activemq.xml 設定例 -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

  <broker xmlns="http://activemq.apache.org/schema/core" 
          brokerName="localhost" 
          dataDirectory="${activemq.data}"
          schedulerSupport="true"
          useJmx="true">

    <!-- 永続化設定 -->
    <persistenceAdapter>
      <kahaDB directory="${activemq.data}/kahadb"
              journalMaxFileLength="32mb"
              enableIndexWriteAsync="true"
              enableJournalDiskSyncs="false"
              cleanupInterval="30000"/>
    </persistenceAdapter>

    <!-- メモリ設定 -->
    <systemUsage>
      <systemUsage>
        <memoryUsage>
          <memoryUsage percentOfJvmHeap="70"/>
        </memoryUsage>
        <storeUsage>
          <storeUsage limit="100gb"/>
        </storeUsage>
        <tempUsage>
          <tempUsage limit="50gb"/>
        </tempUsage>
      </systemUsage>
    </systemUsage>

    <!-- デッドレターキュー設定 -->
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic=">" producerFlowControl="true">
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="1000"/>
            </pendingMessageLimitStrategy>
          </policyEntry>
          
          <policyEntry queue=">" producerFlowControl="true" 
                       memoryLimit="10mb" useCache="false">
            <deadLetterStrategy>
              <individualDeadLetterStrategy 
                queuePrefix="DLQ." 
                useQueueForQueueMessages="true"/>
            </deadLetterStrategy>
          </policyEntry>
        </policyEntries>
      </policyMap>
    </destinationPolicy>

    <!-- トランスポートコネクタ -->
    <transportConnectors>
      <transportConnector name="openwire" 
                         uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
      <transportConnector name="stomp" 
                         uri="stomp://0.0.0.0:61613?maximumConnections=1000"/>
      <transportConnector name="mqtt" 
                         uri="mqtt://0.0.0.0:1883?maximumConnections=1000"/>
      <transportConnector name="ws" 
                         uri="ws://0.0.0.0:61614?maximumConnections=1000"/>
    </transportConnectors>

    <!-- ネットワーク設定 (クラスタリング) -->
    <networkConnectors>
      <networkConnector uri="static:(tcp://broker2:61616,tcp://broker3:61616)"
                       duplex="true"
                       networkTTL="3"
                       dynamicOnly="true"
                       conduitSubscriptions="true"/>
    </networkConnectors>

    <!-- プラグイン -->
    <plugins>
      <statisticsBrokerPlugin/>
      <simpleAuthenticationPlugin>
        <users>
          <authenticationUser username="admin" password="admin" groups="admins,publishers,consumers"/>
          <authenticationUser username="producer" password="producer" groups="publishers"/>
          <authenticationUser username="consumer" password="consumer" groups="consumers"/>
        </users>
      </simpleAuthenticationPlugin>
      
      <authorizationPlugin>
        <map>
          <authorizationMap>
            <authorizationEntries>
              <authorizationEntry queue=">" write="admins,publishers" read="admins,consumers" admin="admins"/>
              <authorizationEntry topic=">" write="admins,publishers" read="admins,consumers" admin="admins"/>
            </authorizationEntries>
          </authorizationMap>
        </map>
      </authorizationPlugin>
    </plugins>

  </broker>

  <!-- JMX設定 -->
  <import resource="jetty.xml"/>
  
</beans>

Spring Boot統合とコネクション設定

// application.yml
/*
spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
    in-memory: false
    pool:
      enabled: true
      max-connections: 50
      idle-timeout: 30s
      expiry-timeout: 10s
*/

// Spring Boot設定クラス
@Configuration
@EnableJms
public class ActiveMQConfig {
    
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;
    
    @Value("${spring.activemq.user}")
    private String user;
    
    @Value("${spring.activemq.password}")
    private String password;
    
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL(brokerUrl);
        factory.setUserName(user);
        factory.setPassword(password);
        
        // パフォーマンスチューニング
        factory.setUseAsyncSend(true);
        factory.setOptimizeAcknowledge(true);
        factory.setMaxThreadPoolSize(50);
        factory.setOptimizedMessageDispatch(true);
        
        return factory;
    }
    
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory());
        template.setDefaultDestinationName("default.queue");
        template.setDeliveryPersistent(true);
        template.setExplicitQosEnabled(true);
        template.setTimeToLive(300000);  // 5分
        return template;
    }
    
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("3-10");
        factory.setReceiveTimeout(5000L);
        factory.setSessionTransacted(true);
        return factory;
    }
}

// Spring Boot メッセージングサービス
@Service
public class MessageService {
    
    @Autowired
    private JmsTemplate jmsTemplate;
    
    // メッセージ送信
    public void sendMessage(String destination, Object message) {
        jmsTemplate.convertAndSend(destination, message, msg -> {
            msg.setStringProperty("sender", "spring-app");
            msg.setLongProperty("timestamp", System.currentTimeMillis());
            return msg;
        });
    }
    
    // キューメッセージ受信
    @JmsListener(destination = "order.queue")
    public void receiveOrderMessage(String message) {
        System.out.println("Order received: " + message);
        
        try {
            // 注文処理ロジック
            processOrder(message);
            
        } catch (Exception e) {
            System.err.println("Order processing failed: " + e.getMessage());
            throw new RuntimeException("Failed to process order", e);
        }
    }
    
    // トピックメッセージ受信
    @JmsListener(destination = "notification.topic", 
                 containerFactory = "jmsListenerContainerFactory")
    public void receiveNotification(
            @Payload String message,
            @Header Map<String, Object> headers) {
        
        System.out.println("Notification: " + message);
        System.out.println("Headers: " + headers);
        
        // 通知処理
        handleNotification(message, headers);
    }
    
    private void processOrder(String orderData) {
        // 注文処理実装
        System.out.println("Processing order: " + orderData);
    }
    
    private void handleNotification(String message, Map<String, Object> headers) {
        // 通知処理実装
        System.out.println("Handling notification: " + message);
    }
}

// RESTコントローラー例
@RestController
@RequestMapping("/api/messages")
public class MessageController {
    
    @Autowired
    private MessageService messageService;
    
    @PostMapping("/send")
    public ResponseEntity<String> sendMessage(@RequestBody MessageRequest request) {
        try {
            messageService.sendMessage(request.getDestination(), request.getMessage());
            return ResponseEntity.ok("Message sent successfully");
        } catch (Exception e) {
            return ResponseEntity.status(500).body("Failed to send message: " + e.getMessage());
        }
    }
}

クラスタリングと高可用性設定

# Network of Brokers設定例

# ブローカー1 (activemq-broker1.xml)
cat > /opt/activemq/conf/activemq-broker1.xml << 'EOF'
<beans>
  <broker xmlns="http://activemq.apache.org/schema/core" 
          brokerName="broker1" 
          dataDirectory="/opt/activemq/data/broker1">
    
    <networkConnectors>
      <networkConnector uri="static:(tcp://broker2:61616,tcp://broker3:61616)"
                       duplex="false"
                       name="broker1-connector"/>
    </networkConnectors>
    
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
    </transportConnectors>
    
  </broker>
</beans>
EOF

# 高可用性マスター・スレーブ設定
# マスターブローカー設定
cat > /opt/activemq/conf/activemq-master.xml << 'EOF'
<beans>
  <broker xmlns="http://activemq.apache.org/schema/core" 
          brokerName="master" 
          dataDirectory="/shared/activemq/data">
    
    <persistenceAdapter>
      <kahaDB directory="/shared/activemq/data/kahadb"
              lockKeepAlivePeriod="5000"/>
    </persistenceAdapter>
    
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
    </transportConnectors>
    
  </broker>
</beans>
EOF

# スレーブブローカー設定
cat > /opt/activemq/conf/activemq-slave.xml << 'EOF'
<beans>
  <broker xmlns="http://activemq.apache.org/schema/core" 
          brokerName="slave" 
          dataDirectory="/shared/activemq/data">
    
    <persistenceAdapter>
      <kahaDB directory="/shared/activemq/data/kahadb"/>
    </persistenceAdapter>
    
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61617"/>
    </transportConnectors>
    
  </broker>
</beans>
EOF

# クライアント側フェイルオーバー設定
java_connection_string="failover:(tcp://master:61616,tcp://slave:61617)?randomize=false&timeout=3000"

監視とパフォーマンス管理

// JMX監視例
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ActiveMQMonitor {
    
    public static void main(String[] args) throws Exception {
        // JMX接続
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        MBeanServerConnection connection = connector.getMBeanServerConnection();
        
        // ブローカー情報取得
        ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
        String brokerId = (String) connection.getAttribute(brokerName, "BrokerId");
        String brokerVersion = (String) connection.getAttribute(brokerName, "BrokerVersion");
        
        System.out.println("Broker ID: " + brokerId);
        System.out.println("Broker Version: " + brokerVersion);
        
        // キュー統計取得
        ObjectName queueName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=test.queue");
        Long queueSize = (Long) connection.getAttribute(queueName, "QueueSize");
        Long enqueueCount = (Long) connection.getAttribute(queueName, "EnqueueCount");
        Long dequeueCount = (Long) connection.getAttribute(queueName, "DequeueCount");
        
        System.out.println("Queue Size: " + queueSize);
        System.out.println("Enqueue Count: " + enqueueCount);
        System.out.println("Dequeue Count: " + dequeueCount);
        
        connector.close();
    }
}

# ActiveMQ統計情報取得スクリプト
cat > /opt/activemq/bin/monitor.sh << 'EOF'
#!/bin/bash

echo "=== ActiveMQ Monitoring Report $(date) ==="

# プロセス状態確認
echo "=== Process Status ==="
ps aux | grep activemq | grep -v grep

# ポート状態確認
echo "=== Port Status ==="
netstat -tlnp | grep :61616
netstat -tlnp | grep :8161

# メモリ使用量
echo "=== Memory Usage ==="
activemq query --objname type=Broker,brokerName=*,service=Health

# JVM統計
echo "=== JVM Statistics ==="
jstat -gc $(pgrep -f activemq) 5s 1

# ログエラー確認
echo "=== Recent Errors ==="
tail -n 50 /opt/activemq/data/activemq.log | grep -i error

echo "=== Monitoring Complete ==="
EOF

chmod +x /opt/activemq/bin/monitor.sh