AWS Compute Blog

Migrating from RabbitMQ to Amazon MQ

This post is courtesy of Sam Dengler, AWS Solutions Architect.

UPDATE –  Beginning November 4, 2020, Amazon MQ introduced support for RabbitMQ, so you can now migrate your existing RabbitMQ message brokers to AWS without having to rewrite code. You can learn how to migrate your applications through this easier process in this updated blog post.

Message brokers can be used to solve a number of needs in enterprise architectures, including managing workload queues and broadcasting messages to a number of subscribers. Some AWS customers are using RabbitMQ today and would like to migrate to a managed service to reduce the overhead of operating their own message broker.

Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easier to operate and scale message brokers in the cloud. Amazon MQ provides compatibility with your existing workloads that use standard protocols such as OpenWire, AMQP, MQTT, and Stomp (all enabled with SSL). Amazon MQ automatically provisions infrastructure configured as a single-instance broker or as an active/standby broker for high availability.

In this post, I describe how to launch a new Amazon MQ instance. I review example Java code to migrate from a RabbitMQ to Amazon MQ message broker using clients for ActiveMQ, Apache Qpid JMS, and Spring JmsTemplates. I also review best practices for Amazon MQ and changes from RabbitMQ to Amazon MQ to support Publish/Subscribe message patterns.

Getting started with Amazon MQ

To start, open the Amazon MQ console. Enter a broker name and choose Next step.

Launch a new Amazon MQ instance, choosing the mq.t2.micro instance type and Single-instance broker deployment mode, creating a user name and password, and choosing Create broker.

After several minutes, your instance changes status from Creation in progress to Running.  You can visit the Details page of your broker to retrieve connection information, including a link to the ActiveMQ web console where you can monitor the status of your instance queues, etc. In the following code examples, you use the OpenWire and AMQP endpoints.

To be able to access your broker, you must configure one of your security groups to allow inbound traffic. For more information, see the link to Detailed instructions in the blue box in the Connections section.

Now that your Amazon MQ broker is running, let’s look at some code!

Dependencies

The following code examples have dependencies across a range of libraries in order to demonstrate RabbitMQ, ActiveMQ, Qpid, Spring JMS templates, and connection pooling. I’ve listed all the dependencies in a single Maven pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>MyGroup</groupId>
    <artifactId>MyArtifact</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
    
        <!-- RabbitMQ -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.1.2</version>
        </dependency>

        <!-- Apache Connection Pooling -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.0</version>
        </dependency>
        
        <!-- Apache ActiveMQ -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.15.0</version>
        </dependency>
        
        <!-- Apache QPid -->
        <dependency>
            <groupId>org.apache.qpid</groupId>
            <artifactId>qpid-client</artifactId>
            <version>6.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.qpid</groupId>
            <artifactId>qpid-jms-client</artifactId>
            <version>0.29.0</version>
        </dependency>
                
        <!-- Spring JmsTemplate -->
         <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>5.0.3.RELEASE</version>
        </dependency>
        
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

    </dependencies>
</project>

RabbitMQ

Here’s an example using RabbitMQ to send and receive a message via queue. The installation and configuration of RabbitMQ is out of scope for this post. For instructions for downloading and installing RabbitMQ, see Downloading and Installing RabbitMQ.

RabbitMQ uses the AMQP 0-9-1 protocol by default, with support for AMQP 1.0 via a plugin. The RabbitMQ examples in this post use the AMQP 0-9

RabbitMQ queue example

To start, here’s some sample code to send and receive a message in RabbitMQ using a queue.

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

public class RabbitMQExample {

    private static final boolean ACKNOWLEDGE_MODE = true;

    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    private static final String ENDPOINT;
    private static final String USERNAME;
    private static final String PASSWORD;
    private static final String QUEUE = "MyQueue";

    public static void main(String[] args) throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        // Create a connection factory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Establish a connection for the producer.
        Connection producerConnection = connectionFactory.newConnection();

        // Create a channel for the producer.
        Channel producerChannel = producerConnection.createChannel();

        // Create a queue named "MyQueue".
        producerChannel.queueDeclare(QUEUE, false, false, false, null);

        // Create a message.
        String text = "Hello from RabbitMQ!";

        // Send the message.
        producerChannel.basicPublish("", QUEUE, null, text.getBytes());
        System.out.println("Message sent: " + text);

        // Clean up the producer.
        producerChannel.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        Connection consumerConnection = connectionFactory.newConnection();

        // Create a channel for the consumer.
        Channel consumerChannel = consumerConnection.createChannel();

        // Create a queue named "MyQueue".
        consumerChannel.queueDeclare(QUEUE, false, false, false, null);

        // Receive the message.
        GetResponse response = consumerChannel.basicGet(QUEUE, ACKNOWLEDGE_MODE);
        String message = new String(response.getBody(), "UTF-8");
        System.out.println("Message received: " + message);

        // Clean up the consumer.
        consumerChannel.close();
        consumerConnection.close();
    }
}

In this example, you need to specify the ENDPOINT, USERNAME, and PASSWORD for your RabbitMQ message broker using environment variables or dependency injection.

This example uses the RabbitMQ client library to establish connectivity to the message broker and a channel for communication. In RabbitMQ, messages are sent over the channel to a named queue, which stores messages in a buffer, and from which consumers can receive and process messages. In this example, you publish a message using the Channel.basicPublish method, using the default exchange, identified by an empty string (“”).

To receive and process the messages in the queue, create a second connection, channel, and queue. Queue declaration is an idempotent operation, so there is no harm in declaring it twice. In this example, you receive the message using the Channel.basicGet method, automatically acknowledging message receipt to the broker.

This example demonstrates the basics of sending and receiving a message of one type. However, what if you wanted to publish messages of different types such that various consumers could subscribe only to pertinent message types (that is, pub/sub)?  Here’s a RabbitMQ example using topic exchanges to route messages to different queues.

RabbitMQ topic example

This example is similar to the one earlier. To enable topic publishing, specify two additional properties: EXCHANGE and ROUTING_KEY. RabbitMQ uses the exchange and routing key properties for routing messaging. Look at how these properties change the code to publish a message.

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

public class RabbitMQExample {

    private static final boolean ACKNOWLEDGE_MODE = true;

    // The Endpoint, Username, Password, Queue, Exhange, and Routing Key should
    // be externalized and configured through environment variables or
    // dependency injection.
    private static final String ENDPOINT; // "amqp://localhost:5672"
    private static final String USERNAME;
    private static final String PASSWORD;
    private static final String QUEUE = "MyQueue";
    private static final String EXCHANGE = "MyExchange";
    private static final String ROUTING_KEY = "MyRoutingKey";
    
    public static void main(String[] args) throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        // Create a connection factory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Establish a connection for the producer.
        Connection producerConnection = connectionFactory.newConnection();

        // Create a channel for the producer.
        Channel producerChannel = producerConnection.createChannel();

        // Create a queue named "MyQueue".
        producerChannel.queueDeclare(QUEUE, false, false, false, null);

        // Create an exchange named "MyExchange".
        producerChannel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);

        // Bind "MyQueue" to "MyExchange", using routing key "MyRoutingKey".
        producerChannel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

        // Create a message.
        String text = "Hello from RabbitMQ!";

        // Send the message.
        producerChannel.basicPublish(EXCHANGE, ROUTING_KEY, null, text.getBytes());
        System.out.println("Message sent: " + text);

        // Clean up the producer.
        producerChannel.close();
        producerConnection.close();
        
        ...


As before, you establish a connection to the RabbitMQ message broker, a channel for communication, and a queue to buffer messages for consumption. In addition to these components, you declare an explicit exchange of type BuiltinExchangeType.TOPIC and bind the queue to the exchange using the ROUTING_KEY that filters messages to send to the queue.

Again, publish a message using the Channel.basicPublish method. This time, instead of publishing the message to a queue, specify the EXCHANGE and ROUTING_KEY values for the message. RabbitMQ uses these properties to route the message to the appropriate queue, from which a consumer receives the message using the same code from the first example.

JMS API

Now that you’ve seen examples for queue and topic publishing in RabbitMQ, look at code changes to support Amazon MQ, starting with the ActiveMQ client. But first, a quick review of the Java Messaging Service (JMS) API.

The remainder of the examples in this post use the JMS API, which abstracts messaging methods from underlying protocol and client implementations. The JMS API programming model uses a combination of connection factories, connections, sessions, destinations, message producers, and message consumers to send and receive messages. The following image (from The Java EE 6 Tutorial) shows the relationship between these components:

ActiveMQ OpenWire connectivity to Amazon MQ

Here’s how JMS is used with ActiveMQ to send and receive messages on a queue.

The ActiveMQ client uses the OpenWire protocol, supported by Amazon MQ. The OpenWire protocol can be found in your Amazon MQ broker’s endpoint list (screenshot). It requires that the security group for the Amazon MQ be open for the ActiveMQ OpenWire protocol endpoint port, 61617.

ActiveMQ queue example

Next, here’s an example to send and receive messages to Amazon MQ using the ActiveMQ client. This example should look familiar, as it follows the same flow to send and receive messages via a queue. I’ve included the example in full and then highlighted the differences to consider when migrating from RabbitMQ.

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;

public class ActiveMQClientExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    private static final String ENDPOINT; // "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String USERNAME;
    private static final String PASSWORD;
    private static final String QUEUE = "MyQueue";
    
    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Establish a connection for the producer.
        Connection producerConnection = pooledConnectionFactory.createConnection();
        producerConnection.start();

        // Create a session.
        Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination producerDestination = producerSession.createQueue(QUEUE);

        // Create a producer from the session to the queue.
        MessageProducer producer = producerSession.createProducer(producerDestination);
        producer.setDeliveryMode(DELIVERY_MODE);

        // Create a message.
        String text = "Hello from Amazon MQ!";
        TextMessage producerMessage = producerSession.createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();

        // Create a session.
        Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination consumerDestination = consumerSession.createQueue(QUEUE);

        // Create a message consumer from the session to the queue.
        MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);

        // Begin to wait for messages.
        Message consumerMessage = consumer.receive(1000);

        // Receive the message when it arrives.
        TextMessage consumerTextMessage = (TextMessage) consumerMessage;
        System.out.println("Message received: " + consumerTextMessage.getText());

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
        pooledConnectionFactory.stop();
    }
}

In this example, you use the ActiveMQ client to establish connectivity to AmazonMQ using the OpenWire protocol with the ActiveMQConnectionFactory class to specify the endpoint and user credentials. For this example, use the master user name and password chosen when creating the Amazon MQ broker earlier. However, it’s a best practice to create additional Amazon MQ users for brokers in non-sandbox environments.

You could use the ActiveMQConnectionFactory to establish connectivity to the Amazon MQ broker. However, it is a best practice in Amazon MQ to group multiple producer requests using the ActiveMQ PooledConnectionFactory to wrap the ActiveMQConnectionFactory.

Using the PooledConnectionFactory, you can create a connection to Amazon MQ and establish a session to send a message. Like the RabbitMQ queue example, create a message queue destination using the Session.createQueue method, and a message producer to send the message to the queue.

For the consumer, use the ActiveMQConnectionFactory, NOT the PooledConnectionFactory, to create a connection, session, queue destination, and message consumer to receive the message because pooling of consumers is not considered a best practice. For more information, see the ActiveMQ Spring Support page.

ActiveMQ virtual destinations on Amazon MQ

Here’s how topic publishing differs from RabbitMQ to Amazon MQ.

If you remember from the RabbitMQ topic example, you bound a queue to an exchange using a routing key to control queue destinations when sending messages using a key attribute.

You run into a problem if you try to implement topic subscription using the message consumer in the preceding ActiveMQ queue example. The following is an excerpt from Virtual Destinations, which provides more detail on this subject:

A JMS durable subscriber MessageConsumer is created with a unique JMS clientID and durable subscriber name. To be JMS-compliant, only one JMS connection can be active at any point in time for one JMS clientID, and only one consumer can be active for a clientID and subscriber name. That is, only one thread can be actively consuming from a given logical topic subscriber.

To solve this, ActiveMQ supports the concept of a virtual destination, which provides a logical topic subscription access to a physical queue for consumption without breaking JMS compliance. To do so, ActiveMQ uses a simple convention for specifying the topic and queue names to configure message routing.

  • Topic names must use the “VirtualTopic.” prefix, followed by the topic name. For example, VirtualTopic.MyTopic.
  • Consumer names must use the “Consumer.” prefix, followed by the consumer name, followed by the topic name. For example, Consumer.MyConsumer.VirtualTopic.MyTopic.

ActiveMQ topic example

Next, here’s an example for the ActiveMQ client that demonstrates publishing messaging to topics. This example is similar to the ActiveMQ Queue Example. In this one, create a Topic destination instead of a queue destination.

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;

public class ActiveMQClientExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, Producer Topic, and Consumer Topic
    // should be externalized and configured through environment variables or
    // dependency injection.
    private static final String ENDPOINT; // "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String USERNAME;
    private static final String PASSWORD;
    private static final String PRODUCER_TOPIC = "VirtualTopic.MyTopic";
    private static final String CONSUMER1_TOPIC = "Consumer.Consumer1." + PRODUCER_TOPIC;
    
    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Establish a connection for the producer.
        Connection producerConnection = pooledConnectionFactory.createConnection();
        producerConnection.start();

        // Create a session.
        Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a topic named "VirtualTopic.MyTopic".
        Destination producerDestination = producerSession.createTopic(PRODUCER_TOPIC);

        // Create a producer from the session to the topic.
        MessageProducer producer = producerSession.createProducer(producerDestination);
        producer.setDeliveryMode(DELIVERY_MODE);

        // Create a message.
        String text = "Hello from Amazon MQ!";
        TextMessage producerMessage = producerSession.createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        Connection consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();

        // Create a session.
        Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue called "Consumer.Consumer1.VirtualTopic.MyTopic".
        Destination consumerDestination = consumerSession.createQueue(CONSUMER1_TOPIC);

        // Create a message consumer from the session to the queue.
        MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);

        // Begin to wait for messages.
        Message consumerMessage = consumer.receive(1000);

        // Receive the message when it arrives.
        TextMessage consumerTextMessage = (TextMessage) consumerMessage;
        System.out.println("Message received: " + consumerTextMessage.getText());

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
        pooledConnectionFactory.stop();
    }
}

In this example, the message producer uses the Session.createTopic method with the topic name, VirtualTopic.MyTopic, as the publishing destination. The message consumer code does not change, but the queue destination uses the virtual destination convention, Consumer.Consumer1.VirtualTopic.MyTopic. ActiveMQ uses these names for the topic and queue to route messages accordingly.

AMQP connectivity to Amazon MQ

Now that you’ve explored some examples using an ActiveMQ client, look at examples using the Qpid JMS client to connect to the Amazon MQ broker over the AMQP 1.0 protocol and see how they differ.

The Qpid client uses the Advanced Message Queuing Protocol (AMQP) 1.0 protocol, supported by Amazon MQ. The AMQP 1.0 protocol can be found in your Amazon MQ broker’s endpoint list (screenshot). It uses port 5671, which must be opened in the Security Group associated with the Amazon MQ broker.

The AMQP endpoint specifies a transport, amqp+ssl. For encrypted connections, Qpid expects the protocol name to be amqps, instead of amqp+ssl, however the rest of the connection address remains the same.

Qpid JMS queue example

Next, here’s an example to send and receive messages to Amazon MQ using the Qpid client. The Qpid JMS client is built using Apache Qpid Proton, an AMQP messaging toolkit.

import java.util.Hashtable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;

import org.apache.activemq.jms.pool.PooledConnectionFactory;

public class QpidClientExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    private static final String ENDPOINT; // "amqps://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:5671"
    private static final String USERNAME;
    private static final String PASSWORD;
    private static final String QUEUE = "MyQueue";

    public static void main(String[] args) throws JMSException, NamingException {
        // Use JNDI to specify the AMQP endpoint
        Hashtable<Object, Object> env = new Hashtable<Object, Object>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        env.put("connectionfactory.factoryLookup", ENDPOINT);
        javax.naming.Context context = new javax.naming.InitialContext(env);

        // Create a connection factory.
        ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("factoryLookup");

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Establish a connection for the producer.
        Connection producerConnection = pooledConnectionFactory.createConnection(USERNAME, PASSWORD);
        producerConnection.start();

        // Create a session.
        Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination producerDestination = producerSession.createQueue(QUEUE);

        // Create a producer from the session to the queue.
        MessageProducer producer = producerSession.createProducer(producerDestination);
        producer.setDeliveryMode(DELIVERY_MODE);

        // Create a message.
        String text = "Hello from Qpid Amazon MQ!";
        TextMessage producerMessage = producerSession.createTextMessage(text);

        // Send the message.
        producer.send(producerMessage);
        System.out.println("Message sent.");

        // Clean up the producer.
        producer.close();
        producerSession.close();
        producerConnection.close();

        // Establish a connection for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        Connection consumerConnection = connectionFactory.createConnection(USERNAME, PASSWORD);
        consumerConnection.start();

        // Create a session.
        Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);

        // Create a queue named "MyQueue".
        Destination consumerDestination = consumerSession.createQueue(QUEUE);

        // Create a message consumer from the session to the queue.
        MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);

        // Begin to wait for messages.
        Message consumerMessage = consumer.receive(1000);

        // Receive the message when it arrives.
        TextMessage consumerTextMessage = (TextMessage) consumerMessage;
        System.out.println("Message received: " + consumerTextMessage.getText());

        // Clean up the consumer.
        consumer.close();
        consumerSession.close();
        consumerConnection.close();
        pooledConnectionFactory.stop();
    }
}

The Qpid queue example is similar to the ActiveMQ Queue Example. They both use the JMS API model to send and receive messages, but the difference is in how the ConnectionFactory and AMQP endpoint is specified. According to the Qpid client configuration documentation, the ConnectionFactory is specified using a JNDI InitialContext to look up JMS objects. The JNDI configuration is popularly specified in a file named jndi.properties on the Java Classpath. In this example, do it programmatically using a HashTable for simplicity.

NOTE: Although the Qpid client and Qpid JMS client are used to establish connectivity to Amazon MQ using the AMQP 1.0 protocol, the producer should still use the ActiveMQ PooledConnectionFactory to wrap the Qpid ConnectionFactory. This can be confusing because Qpid client provides a PooledConnectionFactory that should NOT be used for AMQP 1.0.

The Qpid topic example is identical to the earlier ActiveMQ topic example with the same substitution, which establishes the ConnectionFactory to the AMQP 1.0 endpoint via JNDI.

Spring JMS template queue example

Finally, here are examples using the Spring JmsTemplate to send and receive messages.

This example established connectivity to Amazon MQ using the same protocol and client library used in the ActiveMQ queue example. That example requires that the security group for the Amazon MQ be open for the ActiveMQ OpenWire protocol endpoint port, 61617.

The Spring JmsTemplate provides a higher-level abstraction on top of JMS. Code using the JmsTemplate class only needs to implement handlers to process messages, while the management of connections, sessions, message producers, and message consumers is delegated to Spring. Look at the following code:

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ActiveMQSpringExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, and Queue should be externalized and
    // configured through environment variables or dependency injection.
    private static final String ENDPOINT; // ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String USERNAME;
    private static final String PASSWORD;
    private static final String QUEUE = "MyQueue";

    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Create a JmsTemplate for the producer.
        JmsTemplate producerJmsTemplate = new JmsTemplate();
        producerJmsTemplate.setConnectionFactory(pooledConnectionFactory);
        producerJmsTemplate.setDefaultDestination(new ActiveMQQueue(QUEUE));
        producerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        producerJmsTemplate.setDeliveryMode(DELIVERY_MODE);
        
        // Create a message creator.
        MessageCreator messageCreator = new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("Hello from Spring Amazon MQ!");
            }
        };

        // Send the message.
        producerJmsTemplate.send(messageCreator);
        System.out.println("Message sent.");

        // Clean up the producer.
        // producer JmsTemplate will close underlying sessions and connections.

        // Create a JmsTemplate for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        JmsTemplate consumerJmsTemplate = new JmsTemplate();
        consumerJmsTemplate.setConnectionFactory(connectionFactory);
        consumerJmsTemplate.setDefaultDestination(new ActiveMQQueue(QUEUE));
        consumerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        consumerJmsTemplate.setReceiveTimeout(1000);
        
        // Begin to wait for messages.
        Message consumerMessage = consumerJmsTemplate.receive();

        // Receive the message when it arrives.
        TextMessage consumerTextMessage = (TextMessage) consumerMessage;
        System.out.println("Message received: " + consumerTextMessage.getText());

        // Clean up the consumer.
        // consumer JmsTemplate will close underlying sessions and connections.
        pooledConnectionFactory.stop();
    }
}
 
       

Although Spring manages connections, sessions, and message producers, the grouping of producer connections is still a best practice. The ActiveMQ PooledConnectionFactory class is used in this example. However, the Spring CachingConnectionFactory object is another option.

Following the PooledConnectionFactory creation, a JmsTemplate is created for the producer and an ActiveMQQueue is created as the message destination. To use JmsTemplate to send a message, a MessageCreator callback is defined that generates a text message via the JmsTemplate.

A second JmsTemplate with an ActiveMQQueue is created for the consumer. In this example, a single message is received synchronously, however, asynchronous message reception is a popular alternative when using message-driven POJOs.

Unlike the ActiveMQ examples, the Spring JMS template example does not require the explicit cleanup of the connection, session, message producer, or message consumer resources, as that is managed by Spring. Make sure to call the PooledConnectionFactory.stop method to cleanly exit the main method.

Finally, here’s an example using a Spring JmsTemplate for topic publishing.

Spring JmsTemplate topic example

This example combines the Spring JmsTemplate queue example with the virtual destinations approach from the ActiveMQ topic example. Look at the following code.

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ActiveMQSpringExample {

    private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    // The Endpoint, Username, Password, Producer Topic, and Consumer Topic
    // should be externalized and configured through environment variables or
    // dependency injection.
    private static final String ENDPOINT; // "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
    private static final String USERNAME;
    private static final String PASSWORD;
    private static final String PRODUCER_TOPIC = "VirtualTopic.MyTopic";
    private static final String CONSUMER1_TOPIC = "Consumer.Consumer1." + PRODUCER_TOPIC;
    
    public static void main(String[] args) throws JMSException {
        // Create a connection factory.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);

        // Specify the username and password.
        connectionFactory.setUserName(USERNAME);
        connectionFactory.setPassword(PASSWORD);

        // Create a pooled connection factory.
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(10);

        // Create a JmsTemplate for the producer.
        JmsTemplate producerJmsTemplate = new JmsTemplate();
        producerJmsTemplate.setConnectionFactory(pooledConnectionFactory);
        producerJmsTemplate.setDefaultDestination(new ActiveMQTopic(PRODUCER_TOPIC));
        producerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        producerJmsTemplate.setDeliveryMode(DELIVERY_MODE);

        // Create a message creator.
        MessageCreator messageCreator = new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("Hello from Spring Amazon MQ!");
            }
        };

        // Send the message.
        producerJmsTemplate.send(messageCreator);
        System.out.println("Message sent.");

        // Clean up the producer.
        // producer JmsTemplate will close underlying sessions and connections.

        // Create a JmsTemplate for the consumer.
        // Note: Consumers should not use PooledConnectionFactory.
        JmsTemplate consumerJmsTemplate = new JmsTemplate();
        consumerJmsTemplate.setConnectionFactory(connectionFactory);
        consumerJmsTemplate.setDefaultDestination(new ActiveMQQueue(CONSUMER1_TOPIC));
        consumerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
        consumerJmsTemplate.setReceiveTimeout(1000);
        
        // Begin to wait for messages.
        Message consumerMessage = consumerJmsTemplate.receive();

        // Receive the message when it arrives.
        TextMessage consumerTextMessage = (TextMessage) consumerMessage;
        System.out.println("Message received: " + consumerTextMessage.getText());

        // Clean up the consumer.
        // consumer JmsTemplate will close underlying sessions and connections.
        pooledConnectionFactory.stop();
    }
}
In this example, follow the ActiveMQ virtual destination naming convention for topics and queues:
  • When creating the producer JMS template, specify an ActiveMQTopic as the destination using the name VirtualTopic.MyTopic.
  • When creating the consumer JMS template, specify an ActiveMQQueue as the destination using the name Consumer.Consumer1.VirtualTopic.MyTopic.

ActiveMQ automatically handles routing messages from topic to queue.

Conclusion

In this post, I reviewed how to get started with an Amazon MQ broker and walked you through several code examples that explored the differences between RabbitMQ and Apache ActiveMQ client integrations. If you are considering migrating to Amazon MQ, these examples should help you understand the changes that might be required.

If you’re thinking about integrating your existing apps with new serverless apps, see the related post, Invoking AWS Lambda from Amazon MQ.

To learn more, see the Amazon MQ website and Developer Guide. You can try Amazon MQ for free with the AWS Free Tier, which includes up to 750 hours of a single-instance mq.t2.micro broker and up to 1 GB of storage per month for one year for new AWS accounts.