Amazon Web Services 한국 블로그
RabbitMQ에서 관리형 메시지 브로커 서비스인 Amazon MQ로 빠르게 이전하기
최신 IT 아키텍처에서 메시지 브로커(Message Broker)는 워크로드 대기열을 관리하고 메시지를 여러 구독자(Subscribers)에게 전달하는 등 수많은 요구 사항을 해결하는 데 사용될 수 있습니다. 일부 AWS 고객은 RabbitMQ를 사용하고 있으며, AmazonMQ라는 새로운 관리형 서비스로 마이그레이션하여 자체 메시지 브로커를 운영하는 오버헤드를 줄이려는 요구가 많습니다.
즉, Apache ActiveMQ를 위한 관리형 메시지 브로커 서비스인 Amazon MQ를 사용하면 클라우드에 메시지 브로커를 쉽게 설치 및 운영하고 확장할 수 있습니다. Amazon MQ는 OpenWire, AMQP, MQTT 및 Stomp와 같이 표준 프로토콜을 사용하는 기존 워크로드와 호환됩니다(모드 SSL 사용). Amazon MQ는 단일 인스턴스 브로커 또는 고가용성을 위한 액티브/스탠바이 브로커로 구성된 인프라를 자동으로 프로비저닝합니다.
이 글에서는 Amazon MQ에 대한 간략한 소개와 ActiveMQ 클라이언트, Apache Qpid JMS 클라이언트 및 Spring JmsTemplate을 사용하여 RabbitMQ에서 Amazon MQ 메시지 브로커로 마이그레이션하는 예제 Java 코드를 소개합니다. 또한, Amazon MQ 모범 사례와 게시/구독 메시지 패턴을 지원하기 위해 RabbitMQ에서 Amazon MQ로 변경 사항을 알려드리도록 하겠습니다.
Amazon MQ 시작하기
Amazon MQ 관리 콘솔에서 새로운 브로커를 생성하기 위해서는 먼저 브로커 이름을 입력하고 Next Step을 선택합니다. mq.t2.micro 인스턴스 유형과 Single-instance broker 배포 모드를 선택하고, 사용자 이름과 암호를 생성하고, Create broker를 선택하여 새로운 Amazon MQ 인스턴스를 시작할 수 있습니다.
몇 분 후, 인스턴스의 상태가 Creation in progress에서 Running으로 변경됩니다. 브로커의 Details 페이지로 이동하면 인스턴스 대기열의 상태 등을 모니터링할 수 있는 ActiveMQ 웹 콘솔 링크를 비롯한 연결 정보를 검색할 수 있습니다. 다음 코드 예제에서는 OpenWire 및 AMQP 엔드포인트를 사용합니다.
브로커에 액세스할 수 있으려면 보안 그룹 중 하나를 인바운드 트래픽을 허용하도록 구성해야 합니다. 자세한 내용은 Connections 섹션에서 파란색 상자에 있는 Detailed instructions 링크를 참조하십시오.
Amazon MQ 브로커가 실행되고 있으므로, 이제 코드 일부를 살펴보겠습니다!
애플리케이션 의존성
다음 코드 예제에는 RabbitMQ, ActiveMQ, Qpid, Spring JMS Template 및 연결 풀링을 보여주기 위해 다양한 라이브러리에 의존하고 있습니다. 하나의 Maven pom.xml에 모든 종속성을 나열했습니다.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>MyGroup</groupId>
<artifactId>MyArtifact</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<!-- RabbitMQ -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.1.2</version>
</dependency>
<!-- Apache Connection Pooling -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.2</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.0</version>
</dependency>
<!-- Apache ActiveMQ -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.15.0</version>
</dependency>
<!-- Apache QPid -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-client</artifactId>
<version>6.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.29.0</version>
</dependency>
<!-- Spring JmsTemplate -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.0.3.RELEASE</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
</project>
RabbitMQ
이제 RabbitMQ를 사용하여 대기열을 통해 메시지를 보내고 받는 예제입니다. RabbitMQ 설치와 구성은 이 게시물에서 다루지 않습니다. RabbitMQ 다운로드 및 설치에 대한 지침은 Downloading and Installing RabbitMQ 페이지를 참조하십시오.
RabbitMQ는 기본적으로 AMQP 0-9-1 프로토콜을 사용하여 플러그인을 통해 AMQP 1.0을 지원합니다. 이 게시물의 RabbitMQ 예제에서는 AMQP 0-9를 사용합니다.
RabbitMQ 대기열 예제
먼저, 다음은 대기열을 사용하여 RabbitMQ에서 메시지를 보내고 받는 샘플 코드입니다.
import java.io.IOException; import java.net.URISyntaxException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.GetResponse; public class RabbitMQExample { private static final boolean ACKNOWLEDGE_MODE = true; // The Endpoint, Username, Password, and Queue should be externalized and // configured through environment variables or dependency injection. private static final String ENDPOINT; private static final String USERNAME; private static final String PASSWORD; private static final String QUEUE = "MyQueue"; public static void main(String[] args) throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException { // 연결 팩토리 생성. ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUri(ENDPOINT); // 사용자 이름과 암호 지정. connectionFactory.setUsername(USERNAME); connectionFactory.setPassword(PASSWORD); // 생산자를 위한 연결 구성. Connection producerConnection = connectionFactory.newConnection(); // 생산자를 위한 채널 생성. Channel producerChannel = producerConnection.createChannel(); // "MyQueue"라는 대기열 생성. producerChannel.queueDeclare(QUEUE, false, false, false, null); // 메시지 생성. String text = "Hello from RabbitMQ!"; // 메시지 전송. producerChannel.basicPublish("", QUEUE, null, text.getBytes()); System.out.println("Message sent: " + text); // 생산자 정리. producerChannel.close(); producerConnection.close(); // 소비자를 위한 연결 생성. Connection consumerConnection = connectionFactory.newConnection(); // 소비자를 위한 채널 생성. Channel consumerChannel = consumerConnection.createChannel(); // "MyQueue"라는 대기열 생성. consumerChannel.queueDeclare(QUEUE, false, false, false, null); // 메시지 수신. GetResponse response = consumerChannel.basicGet(QUEUE, ACKNOWLEDGE_MODE); String message = new String(response.getBody(), "UTF-8"); System.out.println("Message received: " + message); // 소비자 정리. consumerChannel.close(); consumerConnection.close(); } }
이 예제에서는 환경 변수 또는 DI(디펜던시 인젝션)를 사용하여 RabbitMQ 메시지 브로커의 ENDPOINT, USERNAME 및 PASSWORD를 지정해야 합니다.
이 예제에서는 RabbitMQ 클라이언트 라이브러리를 사용하여 메시지 브로커에 대한 연결과 통신용 채널을 구성합니다. RabbitMQ에서는 메시지가 채널을 통해 명명된 대기열로 전송되며, 이 대기열에서 메시지를 버퍼에 저장하고 소비자는 이 대기열에서 메시지를 수신 및 처리할 수 있습니다. 이 예제에서는 Channel.basicPublish
메서드에서 빈 문자열("")로 식별되는 기본 교환 형식을 사용하여 메시지를 게시합니다.
대기열에 있는 메시지를 수신 및 처리하려면 두 번째 연결, 채널 및 대기열을 생성합니다. 대기열 선언은 멱등적(idempotent) 작업이므로 두 번 선언해도 아무런 피해가 없습니다. 이 예제에서는 Channel.basicGet
메서드를 사용하여 메시지를 수신하며 메시지를 수신했음을 브로커에 자동으로 알려줍니다.
이 예제는 단일 유형의 메시지를 보내고 받는 데 필요한 기본적인 것들을 보여줍니다. 하지만 다양한 소비자가 관련 메시지 유형만 구독할 수 있도록 서로 다른 유형의 메시지를 게시하려면(pub/sub) 어떻게 해야 할까요? 다음은 주제 교환을 사용하여 메시지를 서로 다른 대기열로 라우팅하는 RabbitMQ 예제입니다.
RabbitMQ 주제(Topic) 예제
이 예제는 앞에 설명한 예제와 비슷합니다. 주제 게시를 지원하려면 EXCHANGE와 ROUTING_KEY라는 두 개의 추가 속성을 지정합니다. RabbitMQ는 메시징 라우팅을 위해 교환 및 라우팅 키 속성을 사용합니다. 이러한 속성이 어떻게 메시지를 게시하도록 코드를 변경하는지 살펴보십시오.
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
public class RabbitMQExample {
private static final boolean ACKNOWLEDGE_MODE = true;
// The Endpoint, Username, Password, Queue, Exhange, and Routing Key should
// be externalized and configured through environment variables or
// dependency injection.
private static final String ENDPOINT; // "amqp://localhost:5672"
private static final String USERNAME;
private static final String PASSWORD;
private static final String QUEUE = "MyQueue";
private static final String EXCHANGE = "MyExchange";
private static final String ROUTING_KEY = "MyRoutingKey";
public static void main(String[] args) throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
// 연결 팩토리 생성.
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri(ENDPOINT);
// 사용자 이름과 암호 지정.
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
// 생산자를 위한 연결 구성.
Connection producerConnection = connectionFactory.newConnection();
// 생산자를 위한 채널 생성.
Channel producerChannel = producerConnection.createChannel();
// "MyQueue"라는 대기열 생성.
producerChannel.queueDeclare(QUEUE, false, false, false, null);
// "MyExchange"라는 교환 생성.
producerChannel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
// 라우팅 키 "MyRoutingKey"를 사용하여 "MyQueue"을 "MyExchange"에 바인딩.
producerChannel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
// 메시지 생성.
String text = "Hello from RabbitMQ!";
// 메시지 전송.
producerChannel.basicPublish(EXCHANGE, ROUTING_KEY, null, text.getBytes());
System.out.println("Message sent: " + text);
// 생산자 정리.
producerChannel.close();
producerConnection.close();
...
이전과 마찬가지로 RabbitMQ 메시지 브로커에 대한 연결, 통신용 채널, 소비를 위해 메시지를 버퍼링하는 대기열을 구성합니다. 이러한 구성 요소 외에도 BuiltinExchangeType.TOPIC 유형의 명시적 교환을 선언하고 대기열로 보낼 메시지를 필터링하는 ROUTING_KEY를 사용하여 대기열을 교환에 바인딩합니다.
다시 한번 Channel.basicPublish 메서드를 사용하여 메시지를 게시합니다. 이번에는 메시지를 대기열에 게시하지 않고 메시지에 대한 EXCHANGE 및 ROUTING_KEY 값을 지정합니다. RabbitMQ는 이러한 속성을 사용하여 메시지를 적절한 대기열로 라우팅합니다. 소비자는 해당 대기열에서 첫 번째 예제와 동일한 코드를 사용하여 메시지를 수신하게 됩니다.
JMS API
이제 RabbitMQ에서 대기열 및 주제를 게시하는 예제를 확인했으니, ActiveMQ 클라이언트를 시작으로 Amazon MQ를 지원하기 위한 코드 변경 사항을 살펴보겠습니다. 하지만 먼저 Java Messaging Service(JMS) API를 간략하게 살펴보겠습니다.
이 게시물의 나머지 예제에서는 기본 프로토콜과 클라이언트 구현의 메시징 메서드를 추상화하는 JMS API를 사용합니다. JMS API 프로그래밍 모델은 연결 팩토리, 연결, 세션, 대상, 메시지 생산자 및 메시지 소비자를 조합하여 메시지를 보내고 받습니다. 다음 이미지(Java EE 6 자습서에서 발췌)는 이러한 구성 요소 간의 관계를 보여줍니다.
ActiveMQ OpenWire로 Amazon MQ에 연결
다음은 대기열의 메시지를 보내고 받는데 어떻게 JMS가 ActiveMQ와 함께 사용되는지 보여줍니다.
ActiveMQ 클라이언트는 Amazon MQ에서 지원하는 OpenWire 프로토콜을 사용합니다. OpenWire 프로토콜은 Amazon MQ 브로커의 엔드포인트 목록에 나와 있습니다(스크린샷). Amazon MQ용 보안 그룹이 ActiveMQ OpenWire 프로토콜 엔드포인트 포트 61617을 허용해야 합니다.
ActiveMQ 대기열 예제
다음은 ActiveMQ 클라이언트를 사용하여 Amazon MQ에서 메시지를 보내고 받는 예제입니다. 이 예제는 대기열을 통해 메시지를 보내고 받는 것과 동일한 흐름을 따르므로 익숙할 것입니다. 전체 예제를 추가한 후 RabbitMQ에서 마이그레이션할 때 고려해야 할 차이점을 강조 표시했습니다.
import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.jms.pool.PooledConnectionFactory; public class ActiveMQClientExample { private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT; private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE; // The Endpoint, Username, Password, and Queue should be externalized and // configured through environment variables or dependency injection. private static final String ENDPOINT; // "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617" private static final String USERNAME; private static final String PASSWORD; private static final String QUEUE = "MyQueue"; public static void main(String[] args) throws JMSException { // 연결 팩토리 생성. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT); // 사용자 이름과 암호 지정. connectionFactory.setUserName(USERNAME); connectionFactory.setPassword(PASSWORD); // 풀링된 연결 팩토리 생성. PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); pooledConnectionFactory.setConnectionFactory(connectionFactory); pooledConnectionFactory.setMaxConnections(10); // 생산자를 위한 연결 구성. Connection producerConnection = pooledConnectionFactory.createConnection(); producerConnection.start(); // 세션 생성. Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE); // "MyQueue"라는 대기열 생성. Destination producerDestination = producerSession.createQueue(QUEUE); // 세션에서 대기열에 대한 생산자 생성. MessageProducer producer = producerSession.createProducer(producerDestination); producer.setDeliveryMode(DELIVERY_MODE); // 메시지 생성. String text = "Hello from Amazon MQ!"; TextMessage producerMessage = producerSession.createTextMessage(text); // 메시지 전송. producer.send(producerMessage); System.out.println("Message sent."); // 생산자 정리. producer.close(); producerSession.close(); producerConnection.close(); // 소비자를 위한 연결 구성. // 참고: 소비자는 PooledConnectionFactory를 사용해서는 안 됩니다. Connection consumerConnection = connectionFactory.createConnection(); consumerConnection.start(); // 세션 생성. Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE); // "MyQueue"라는 대기열 생성. Destination consumerDestination = consumerSession.createQueue(QUEUE); // 세션에서 대기열에 대한 메시지 소비자 생성. MessageConsumer consumer = consumerSession.createConsumer(consumerDestination); // 메시지를 기다리기 시작. Message consumerMessage = consumer.receive(1000); // 메시지가 도착하면 이를 수신. TextMessage consumerTextMessage = (TextMessage) consumerMessage; System.out.println("Message received: " + consumerTextMessage.getText()); // 소비자 정리. consumer.close(); consumerSession.close(); consumerConnection.close(); pooledConnectionFactory.stop(); } }
이 예제에서는 ActiveMQ 클라이언트를 사용하여 AmazonMQ에 대한 연결을 설정할 때 OpenWire 프로토콜에서 ActiveMQConnectionFactory 클래스를 사용하여 엔드포인트와 사용자 자격 증명을 지정합니다. 이 예제에서는 앞에서 Amazon MQ 브로커를 생성할 때 선택한 마스터 사용자 이름과 암호를 사용합니다. 하지만 샌드박스가 아닌 환경에서는 브로커를 위해 추가적인 Amazon MQ 사용자를 생성하는 것이 모범 사례입니다.
ActiveMQConnectionFactory를 사용하여 Amazon MQ 브로커에 대한 연결을 설정할 수 있습니다. 하지만 ActiveMQ PooledConnectionFactory를 사용해 ActiveMQConnectionFactory를 래핑하여 여러 생산자 요청을 그룹화하는 것이 Amazon MQ의 모범 사례입니다.
PooledConnectionFactory를 사용하면 Amazon MQ에 대한 연결을 생성하고 세션을 구성하여 메시지를 전송할 수 있습니다. RabbitMQ 대기열 예제와 마찬가지로 Session.createQueue 메서드를 사용하여 메시지 대기열 대상과 메시지를 대기열에 보낼 메시지 생산자를 생성합니다.
소비자의 경우 PooledConnectionFactory가 아니라 ActiveMQConnectionFactory를 사용하여 연결, 세션, 대기열 대상, 메시지를 수신할 메시지 소비자를 생성합니다. 소비자 풀링은 모범 사례로 간주되지 않기 때문입니다. 자세한 내용은ActiveMQ Spring Support 페이지를 참조하십시오.
Amazon MQ의 ActiveMQ 가상 대상
RabbitMQ와 Amazon MQ에서 주제 게시의 차이점은 다음과 같습니다.
RabbitMQ 주제 예제에서는 키 속성을 사용하여 메시지를 전송할 때 라우팅 키를 사용하여 대기열을 교환에 바인딩하는 방식으로 대기열 대상을 제어했습니다.
앞의 ActiveMQ 대기열 예제에서 메시지 소비자를 사용하여 주제 구독을 구현하려고 시도하면 문제가 발생합니다. 다음은 Virtual Destinations 페이지에서 발췌한 것으로 이 주제에 대한 세부 정보를 제공합니다.
JMS 지속형 구독자 MessageConsumer는 고유한 JMS clientID와 지속형 구독자 이름을 사용해 생성됩니다. JMS를 준수하려면, 어느 시점에서든 하나의 JMS clientID에 대해 하나의 JMS 연결만 활성화될 수 있으며 clientID와 구독자 이름에 대해 하나의 소비자만 활성화될 수 있습니다. 즉, 주어진 논리적 주제 구독자는 하나의 스레드만 소비할 수 있습니다.
이를 해결하기 위해 ActiveMQ는 JMS 규정 준수를 위반하지 않고 소비할 수 있도록 물리적 대기열에 대한 논리적 주제 구독 액세스를 제공하는 가상 대상 개념을 지원합니다. 이를 위해 ActiveMQ는 주제 및 대기열 이름을 지정하는 간단한 규칙을 사용하여 메시지 라우팅을 구성합니다.
- 주제 이름에서는 "VirtualTopic." 접두사를 사용해야 하고 그다음에 주제 이름이 나와야 합니다. 예: VirtualTopic.MyTopic.
- 소비자 이름에는 "Consumer." 접두사를 사용해야 하고 그다음에 소비자 이름과 주제 이름이 이어서 나와야 합니다. 예: Consumer.MyConsumer.VirtualTopic.MyTopic
ActiveMQ 주제 예제
다음은 주제에 메시징을 게시하는 방법을 보여주는 ActiveMQ 클라이언트 예제입니다. 이 예제는 ActiveMQ 대기열 예제와 비슷합니다. 이 예제에서는 대기열 대상 대신에 주제 대상을 생성합니다.
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
public class ActiveMQClientExample {
private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;
// Endpoint, Username, Password, Producer Topic 및 Consumer Topic은
// 환경 변수 또는 DI(디펜던시 인젝션)를 통해
// 구체화되고 구성되어야 합니다.
private static final String ENDPOINT; // "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
private static final String USERNAME;
private static final String PASSWORD;
private static final String PRODUCER_TOPIC = "VirtualTopic.MyTopic";
private static final String CONSUMER1_TOPIC = "Consumer.Consumer1." + PRODUCER_TOPIC;
public static void main(String[] args) throws JMSException {
// 연결 팩토리 생성.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);
// 사용자 이름과 암호 지정.
connectionFactory.setUserName(USERNAME);
connectionFactory.setPassword(PASSWORD);
// 풀링된 연결 팩토리 생성.
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setConnectionFactory(connectionFactory);
pooledConnectionFactory.setMaxConnections(10);
// 생산자를 위한 연결 구성.
Connection producerConnection = pooledConnectionFactory.createConnection();
producerConnection.start();
// 세션 생성.
Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);
// "VirtualTopic.MyTopic"이라는 주제 생성.
Destination producerDestination = producerSession.createTopic(PRODUCER_TOPIC);
// 세션에서 주제에 대한 생산자를 생성.
MessageProducer producer = producerSession.createProducer(producerDestination);
producer.setDeliveryMode(DELIVERY_MODE);
// 메시지 생성.
String text = "Hello from Amazon MQ!";
TextMessage producerMessage = producerSession.createTextMessage(text);
// 메시지 전송.
producer.send(producerMessage);
System.out.println("Message sent.");
// 생산자 정리.
producer.close();
producerSession.close();
producerConnection.close();
// 소비자를 위한 연결 구성.
// 참고: 소비자는 PooledConnectionFactory를 사용해서는 안 됩니다.
Connection consumerConnection = connectionFactory.createConnection();
consumerConnection.start();
// 세션 생성.
Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);
// "Consumer.Consumer1.VirtualTopic.MyTopic"이라는 대기열 생성.
Destination consumerDestination = consumerSession.createQueue(CONSUMER1_TOPIC);
// 세션에서 대기열에 대한 메시지 소비자 생성.
MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
// 메시지를 기다리기 시작.
Message consumerMessage = consumer.receive(1000);
// 메시지가 도착하면 이를 수신.
TextMessage consumerTextMessage = (TextMessage) consumerMessage;
System.out.println("Message received: " + consumerTextMessage.getText());
// 소비자 정리.
consumer.close();
consumerSession.close();
consumerConnection.close();
pooledConnectionFactory.stop();
}
}
이 예제에서는 메시지 생산자가 Session.createTopic 메서드를 사용하고 게시 대상으로 주제 이름 VirtualTopic.MyTopic을 사용합니다. 메시지 소비자 코드는 변경되지 않지만, 대기열 대상은 가상 대상 규칙 Consumer.Consumer1.VirtualTopic.MyTopic을 사용합니다. ActiveMQ는 주제와 대기열에 이러한 이름을 사용하여 이에 따라 메시지를 라우팅합니다.
AMQP로 Amazon MQ에 연결
ActiveMQ 클라이언트를 사용하는 몇 가지 예제를 살펴보았으니, 이제 Qpid JMS 클라이언트를 사용하여 AMQP 1.0 프로토콜을 통해 Amazon MQ 브로커에 연결하는 예제를 살펴보고 차이점을 확인해 보겠습니다.
Qpid 클라이언트는 Amazon MQ에서 지원하는 AMQP(Advanced Message Queuing Protocol) 1.0 프로토콜을 사용합니다. AMQP 1.0 프로토콜은 Amazon MQ 브로커의 엔드포인트 목록에 나와 있습니다(스크린샷). 이 프로토콜은 포트 5671을 사용하며 Amazon MQ 브로커에 연결된 보안 그룹에서 이 포트를 허용해야 합니다.
AMQP 엔드포인트는 amqp+ssl 전송을 지정합니다. 암호화된 연결의 경우 Qpid에서는 프로토콜 이름이 amqp+ssl이 아니라 amqps가 되어야 합니다. 연결 주소 나머지는 동일합니다.
Qpid JMS 대기열 예제
다음은 Qpid 클라이언트를 사용하여 Amazon MQ에서 메시지를 보내고 받는 예제입니다. Qpid JMS 클라이언트는 AMQP 메시징 도구 키트인 Apache Qpid Proton은 사용하여 구축되어 있습니다.
import java.util.Hashtable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
public class QpidClientExample {
private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;
// Endpoint, Username, Password 및 Queue는 환경 변수
// 또는 DI(디펜던시 인젝션)를 통해 구체화되고 구성되어야 합니다.
private static final String ENDPOINT; // "amqps://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:5671"
private static final String USERNAME;
private static final String PASSWORD;
private static final String QUEUE = "MyQueue";
public static void main(String[] args) throws JMSException, NamingException {
// Use JNDI to specify the AMQP endpoint
Hashtable<Object, Object> env = new Hashtable<Object, Object>();
env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
env.put("connectionfactory.factoryLookup", ENDPOINT);
javax.naming.Context context = new javax.naming.InitialContext(env);
// 연결 팩토리 생성.
ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("factoryLookup");
// 풀링된 연결 팩토리 생성.
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setConnectionFactory(connectionFactory);
pooledConnectionFactory.setMaxConnections(10);
// 생산자를 위한 연결 구성.
Connection producerConnection = pooledConnectionFactory.createConnection(USERNAME, PASSWORD);
producerConnection.start();
// 세션 생성.
Session producerSession = producerConnection.createSession(false, ACKNOWLEDGE_MODE);
// "MyQueue"라는 대기열 생성.
Destination producerDestination = producerSession.createQueue(QUEUE);
// 세션에서 대기열에 대한 생산자 생성.
MessageProducer producer = producerSession.createProducer(producerDestination);
producer.setDeliveryMode(DELIVERY_MODE);
// 메시지 생성.
String text = "Hello from Qpid Amazon MQ!";
TextMessage producerMessage = producerSession.createTextMessage(text);
// 메시지 전송.
producer.send(producerMessage);
System.out.println("Message sent.");
// 생산자 정리.
producer.close();
producerSession.close();
producerConnection.close();
// 소비자를 위한 연결 구성.
// 참고: 소비자는 PooledConnectionFactory를 사용해서는 안 됩니다.
Connection consumerConnection = connectionFactory.createConnection(USERNAME, PASSWORD);
consumerConnection.start();
// 세션 생성.
Session consumerSession = consumerConnection.createSession(false, ACKNOWLEDGE_MODE);
// "MyQueue"라는 대기열 생성.
Destination consumerDestination = consumerSession.createQueue(QUEUE);
// 세션에서 대기열에 대한 메시지 소비자 생성.
MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
// 메시지를 기다리기 시작.
Message consumerMessage = consumer.receive(1000);
// 메시지가 도착하면 이를 수신.
TextMessage consumerTextMessage = (TextMessage) consumerMessage;
System.out.println("Message received: " + consumerTextMessage.getText());
// 소비자 정리.
consumer.close();
consumerSession.close();
consumerConnection.close();
pooledConnectionFactory.stop();
}
}
Qpid 대기열 예제는 ActiveMQ 대기열 예제와 비슷합니다. 둘 다 JMS API 모델을 사용하여 메시지를 보내고 받지만, ConnectionFactory 및 AMQP 엔드포인트가 지정되는 방법이 다릅니다. Qpid 클라이언트 구성 설명서에 따르면 ConnectionFactory는 JMS 객체를 조회할 수 있도록 JNDI InitialContext를 사용하여 지정됩니다. JNDI 구성은 Java Classpath의 jndi.properties라는 파일에 일반적으로 지정됩니다. 이 예제에서는 간편성을 위해 HashTable 사용하여 프로그래밍 방식으로 구성합니다.
참고: AMQP 1.0 프로토콜을 통해 Amazon MQ에 대한 연결을 설정하기 위해 Qpid 클라이언트 및 Qpid JMS 클라이언트가 사용되지만, 생산자는 여전히 ActiveMQ PooledConnectionFactory를 사용하여 Qpid ConnectionFactory를 래핑해야 합니다. AMQP 1.0에 사용하면 안 되는 PooledConnectionFactory
를 Qpid 클라이언트가 제공하므로 이는 혼동을 줄 수 있습니다.
Qpid 주제 예제는 이전 ActiveMQ 주제 예제와 동일합니다. 같은 대체 항목을 사용해 JNDI를 통해 AMQP 1.0 엔드포인트에 대한 ConnectionFactory를 설정합니다.
Spring JMS 템플릿 대기열 예제
마지막으로 다음은 Spring JmsTemplate을 사용하여 메시지를 보내고 받는 예제입니다.
이 예제에서는 ActiveMQ 대기열 예제에서 사용한 것과 동일한 프로토콜 및 클라이언트 라이브러리를 사용하여 Amazon MQ에 대한 연결을 설정했습니다. 그 예제의 경우 Amazon MQ에 대한 보안 그룹에서 ActiveMQ OpenWire 프로토콜 엔드포인트 포트 61617을 허용해야 합니다.
Spring JmsTemplate은 JMS 위에 더 높은 수준의 추상화를 제공합니다. JmsTemplate 클래스를 사용하는 코드는 핸들러가 메시지를 처리하도록 구현하기만 하면 되고, 연결, 세션, 메시지 생산자 및 메시지 소비자의 관리는 Spring에 위임됩니다. 다음 코드를 살펴보십시오.
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class ActiveMQSpringExample {
private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;
// Endpoint, Username, Password 및 Queue는 환경 변수
// 또는 DI(디펜던시 인젝션)를 통해 구체화되고 구성되어야 합니다.
private static final String ENDPOINT; // ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
private static final String USERNAME;
private static final String PASSWORD;
private static final String QUEUE = "MyQueue";
public static void main(String[] args) throws JMSException {
// 연결 팩토리 생성.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);
// 사용자 이름과 암호 지정.
connectionFactory.setUserName(USERNAME);
connectionFactory.setPassword(PASSWORD);
// 풀링된 연결 팩토리 생성.
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setConnectionFactory(connectionFactory);
pooledConnectionFactory.setMaxConnections(10);
// 생산자를 위한 JmsTemplate 생성.
JmsTemplate producerJmsTemplate = new JmsTemplate();
producerJmsTemplate.setConnectionFactory(pooledConnectionFactory);
producerJmsTemplate.setDefaultDestination(new ActiveMQQueue(QUEUE));
producerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
producerJmsTemplate.setDeliveryMode(DELIVERY_MODE);
// 메시지 생성자 생성.
MessageCreator messageCreator = new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("Hello from Spring Amazon MQ!");
}
};
// 메시지 전송.
producerJmsTemplate.send(messageCreator);
System.out.println("Message sent.");
// 생산자 정리.
// 생산자 JmsTemplate이 해당 세션 및 연결을 종료.
// 소비자를 위한 JmsTemplate 생성.
// 참고: 소비자는 PooledConnectionFactory를 사용해서는 안 됩니다.
JmsTemplate consumerJmsTemplate = new JmsTemplate();
consumerJmsTemplate.setConnectionFactory(connectionFactory);
consumerJmsTemplate.setDefaultDestination(new ActiveMQQueue(QUEUE));
consumerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
consumerJmsTemplate.setReceiveTimeout(1000);
// 메시지를 기다리기 시작.
Message consumerMessage = consumerJmsTemplate.receive();
// 메시지가 도착하면 이를 수신.
TextMessage consumerTextMessage = (TextMessage) consumerMessage;
System.out.println("Message received: " + consumerTextMessage.getText());
// JmsTemplate이 해당 세션과 연결을 종료합니다.
pooledConnectionFactory.stop();
}
}
Spring에서 연결, 세션 및 메시지 생산자를 관리하긴 하지만, 생산자 연결을 그룹화하는 것이 여전히 모범 사례입니다. 이 예제에서는 ActiveMQ PooledConnectionFactory 클래스가 사용됩니다. 하지만 Spring CachingConnectionFactory 객체를 사용할 수도 있습니다.
PooledConnectionFactory 생성 후, 생산자를 위한 JmsTemplate이 생성되고 메시지 대상으로 ActiveMQQueue가 생성됩니다. JmsTemplate을 사용하여 메시지를 전송하기 위해, JmsTemplate을 통해 텍스트 메시지를 생성하는 MessageCreator 콜백이 정의됩니다.
소비자를 위해 ActiveMQQueue와 함께 두 번째 JmsTemplate이 생성됩니다. 이 예제에서는 하나의 메시지가 동기식으로 수신되지만, 메시지 중심의 POJO를 사용할 때는 비동기식 메시지 수신이 널리 사용됩니다.
ActiveMQ 예제와는 달리 Spring JMS 템플릿 예제에서는 연결, 세션, 메시지 생산자 또는 메시지 소비자 리소스를 명시적으로 정리할 필요가 없습니다. Spring에서 이를 관리하기 때문입니다. 잊지말고 PooledConnectionFactory.stop 메서드를 호출하여 메인 메서드를 완전히 종료하시기 바랍니다.
마지막으로 다음은 Spring JmsTemplate을 사용하여 주제를 게시하는 예제입니다.
Spring JmsTemplate 주제 예제
이 예제에서는 Spring JmsTemplate 대기열 예제를 ActiveMQ 주제 예제의 가상 대상 접근 방식과 결합합니다. 다음 코드를 살펴보십시오.
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class ActiveMQSpringExample {
private static final int DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;
// Endpoint, Username, Password 및 Queue는 환경 변수
// 또는 DI(디펜던시 인젝션)를 통해 구체화되고
// 구성되어야 합니다.
private static final String ENDPOINT; // "ssl://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.mq.us-east-1.amazonaws.com:61617"
private static final String USERNAME;
private static final String PASSWORD;
private static final String PRODUCER_TOPIC = "VirtualTopic.MyTopic";
private static final String CONSUMER1_TOPIC = "Consumer.Consumer1." + PRODUCER_TOPIC;
public static void main(String[] args) throws JMSException {
// 연결 팩토리 생성.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ENDPOINT);
// 사용자 이름과 암호 지정.
connectionFactory.setUserName(USERNAME);
connectionFactory.setPassword(PASSWORD);
// 풀링된 연결 팩토리 생성.
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setConnectionFactory(connectionFactory);
pooledConnectionFactory.setMaxConnections(10);
// 생산자를 위한 JmsTemplate 생성.
JmsTemplate producerJmsTemplate = new JmsTemplate();
producerJmsTemplate.setConnectionFactory(pooledConnectionFactory);
producerJmsTemplate.setDefaultDestination(new ActiveMQTopic(PRODUCER_TOPIC));
producerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
producerJmsTemplate.setDeliveryMode(DELIVERY_MODE);
// 메시지 생성자 생성.
MessageCreator messageCreator = new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("Hello from Spring Amazon MQ!");
}
};
// 메시지 전송.
producerJmsTemplate.send(messageCreator);
System.out.println("Message sent.");
// 생산자 정리.
// 생산자 JmsTemplate이 해당 세션 및 연결을 종료.
// 소비자를 위한 JmsTemplate 생성.
// 참고: 소비자는 PooledConnectionFactory를 사용해서는 안 됩니다.
JmsTemplate consumerJmsTemplate = new JmsTemplate();
consumerJmsTemplate.setConnectionFactory(connectionFactory);
consumerJmsTemplate.setDefaultDestination(new ActiveMQQueue(CONSUMER1_TOPIC));
consumerJmsTemplate.setSessionAcknowledgeMode(ACKNOWLEDGE_MODE);
consumerJmsTemplate.setReceiveTimeout(1000);
// 메시지를 기다리기 시작.
Message consumerMessage = consumerJmsTemplate.receive();
// 메시지가 도착하면 이를 수신.
TextMessage consumerTextMessage = (TextMessage) consumerMessage;
System.out.println("Message received: " + consumerTextMessage.getText());
// 소비자 정리.
// 소비자 JmsTemplate이 해당 세션과 연결을 종료합니다.
pooledConnectionFactory.stop();
}
}
이 예제에서는 주제와 대기열에 대해 ActiveMQ 가상 대상 이름 지정 규칙을 따릅니다.
- 생산자 JMS 템플릿을 생성할 때 VirtualTopic.MyTopic 이름을 사용하여 ActiveMQTopic을 대상으로 지정합니다.
- 소비자 JMS 템플릿을 생성할 때 Consumer.Consumer1.VirtualTopic.MyTopic 이름을 사용하여 ActiveMQQueue를 대상으로 지정합니다.
ActiveMQ는 주제에서 대기열로 메시지 라우팅을 자동으로 처리합니다.
마무리
이 글에서는 Amazon MQ 브로커를 시작하는 방법을 설명하고, RabbitMQ와 Apache ActiveMQ 클라이언트 통합 간 차이점을 보여주는 몇 가지 코드 예제를 살펴보았습니다. Amazon MQ로 마이그레이션을 고려하고 있다면, 이러한 예제가 필요할 수 있는 변경 사항을 이해하는 데 도움이 될 것입니다.
기존 앱을 새로운 서버리스 앱과 통합하려는 경우, 관련 게시물인 Invoking AWS Lambda from Amazon MQ를 참조하십시오.
자세한 내용은 Amazon MQ 웹 사이트와 개발자 안내서를 참조하십시오. AWS 프리 티어를 통해 무료로 Amazon MQ를 사용해 볼 수 있습니다. 새로운 AWS 계정에 대해 1년 동안 매월 단일 인스턴스 mq.t2.micro 브로커를 최대 750시간 그리고 스토리지를 1GB까지 제공합니다.
이 글은 AWS Compute 블로그의 Migrating from RabbitMQ to Amazon MQ의 한국어 번역입니다.