AWS 기술 블로그
Amazon MSK를 활용한 데이터베이스 간 CDC 구현하기
최근 많은 고객들이 디지털 전환을 위해 온프레미스의 모놀리식 아키텍처에서 MSA를 통한 클라우드로의 전환을 하기 위해 많은 노력을 하고 있습니다. 리스크가 있는 빅뱅 방식보다 점진적 전환 전략을 선택하다 보니 과도기적으로 온프레미스와 클라우드에 동시에 데이터베이스를 운영하게 되고 클라우드DB에서 온프레미스DB로 데이터를 동기화 해야 할 필요가 생기게 됩니다. 물론 마이그레이션이 완료되면 제거될 부분이지만 그 전까지는 서비스 중단을 예방하기 위해 안전하고 확장 가능한 동기화 아키텍처를 선택해야만 합니다.
Apache Kafka는 스트리밍 서비스 뿐만 아니라 실시간 데이터 동기화를 구현할 수 있는 오픈소스 플랫폼으로 AWS의 완전 관리형 서비스인 Amazon MSK Cluster 와 통합으로 Kafka를 활용하는 것이 더욱 쉽고 편리 해졌습니다. 또한 이기종 데이터베이스 간 데이터 변경 분을 추출하고 반영하기 위해 MSK Connect와 Apache Debezium, Confluent Connectors 등을 함께 사용하게 되면 안정적이고 확장 가능한 커넥터를 구축할 수 있습니다.
이번 블로그에서는 Amazon MSK를 사용하여 이기종 데이터베이스 간 데이터를 동기화하는 과정에 대해 알아보고 안정적인 운영을 위한 모니터링과 MSK Connect 재생성 시 오프셋 재개 방안에 대해서도 알아보도록 하겠습니다.
솔루션 개요
아래는 블로그에서 안내할 솔루션의 아키텍처입니다. MySQL에서 ORACLE로의 데이터 동기화를 위해 Capture는 Debezium For MySQL을, Sink는 Confluent JDBC를 MSK Connect와 함께 사용하고 데이터 스트리밍은 Amazon MSK를 사용하여 솔루션을 구축합니다.
MSK Connect 이해
MSK Connect 는 Apache Kafka 클러스터와 데이터를 쉽게 주고 받을 수 있게 도와주는 Amazon MSK의 기능입니다. MSK Connect는 부하 변화에 따라 자동으로 크기가 조정되며 사용한 리소스에 대해서만 비용이 발생됩니다. 아래 Amazon MSK Connect 아키텍처에서 Worker는 Connector 로직을 실행하는 Java 가상 머신 프로세스입니다. 각 Worker는 병렬 스레드로 실행되는 Task를 생성하고 데이터 동기화 작업을 수행합니다. 작업 간 상태를 저장하지 않으므로 장애 또는 중단 발생 시 자동 재시작이 가능합니다.
단계 1 : 인프라 환경 생성
원활한 실습을 위해 사전에 준비된 CloudFormation 을 통해 아래 그림과 같은 기본 인프라 환경을 구축합니다. 블로그의 모든 실습 내용은 서울리전(ap-northeast-2)에서 진행되므로 현재 사용하고 있는 리전이 서울리전(ap-northeast-2)인지 확인하세요.
- 아래 버튼을 통해 AWS CloudFormation 스택(cfn.yaml)을 다운로드 받습니다.
- CloudFormation 콘솔로 이동하여 Create stack 선택 후 cfn.yaml 업로드 합니다.
- Stack name 에 workshop-msk을 입력하고 나머지는 default 값을 유지하며 생성을 완료합니다. 약 5분 정도 소요가 되며 아래 그림과 같이 2개의 VPC 네트워크 영역에 MySQL, ORACLE 데이터베이스가 설치된 2개의 EC2가 생성됩니다.
- 생성이 완료되면 CloudFormation 의 Outputs 탭에서 아래 그림과 같은 정보를 확인할 수 있습니다. 빨간 박스로 표시된 4개의 IP는 다음 단계에서 사용되므로 편의를 위해 메모를 합니다.
단계 2 : 소스/타겟 데이터베이스 시작
EC2 콘솔에 접속을 하게 되면 MSK-SOURCE-DB 와 MSK-TARGET-DB 이름의 EC2 인스턴스를 확인할 수 있습니다. 각 인스턴스에 접속하여 MSK-SOURCE-DB 에는 MySQL 를 시작, MSK-TARGET-DB 에는 ORACLE을 시작합니다.
- MSK-SOURCE-DB 인스턴스 마우스 우측 클릭 > Connect > Session Manager 탭 > Connect > SSH 접속 (만약 접속 오류 발생 시 1,2분 기다린 후 다시 시도합니다.)
- 아래 명령을 통해 MySQL 을 시작합니다. (약 3분 소요됩니다.)
sudo -i sudo hostnamectl set-hostname source-mysql systemctl start mysqld su - mysql ss # db 접속 select * from product; exit exit echo "complete"
- 아래 명령을 통해 데이터 동기화를 보다 쉽게 확인하기 위한 웹서비스를 실행합니다.
sudo -i su - app cp .mysql_env .env rm -rf workshop-ecommerce git clone https://github.com/color275/workshop-msk-ecommerce.git workshop-ecommerce cd /home/app/workshop-ecommerce/ python3.8 -m venv venv source venv/bin/activate pip install --upgrade pip pip install -r requirement.txt cd /home/app/workshop-ecommerce/ecommerce nohup python manage.py runserver 0.0.0.0:8000 & echo "complete" exit
- MSK-Target-DB 인스턴스 마우스 우측 클릭 > Connect > Session Manager 탭 > Connect > SSH 접속 (만약 접속 오류 발생 시 1,2분 기다린 후 다시 시도합니다.)
- 아래 명령을 통해 ORACLE 을 기동합니다. ( 약 5분 소요됩니다)
sudo -i sudo hostnamectl set-hostname target-oracle su - oracle -c './scripts/startup.sh' su - oracle -c 'lsnrctl start' su - oracle ss set linesize 200 set pagesize 100 col name for a10 col img_path for a20 col category for a20 col price for 99999 select name,img_path,category,price from product; exit echo "complete"
- 아래 명령을 통해 데이터 동기화를 보다 쉽게 확인하기 위한 웹서비스를 실행합니다.
sudo -i su - app cp .oracle_env .env rm -rf workshop-ecommerce git clone https://github.com/color275/workshop-msk-ecommerce.git workshop-ecommerce cd /home/app/workshop-ecommerce/ python3.8 -m venv venv source venv/bin/activate pip install --upgrade pip pip install -r requirement.txt cd /home/app/workshop-ecommerce/ecommerce nohup python manage.py runserver 0.0.0.0:8000 & exit
- MySQL 과 ORACLE 기동과 웹서비스 실행이 완료되었으면, 앞에서 메모한 SourceDBPublicIP 와 TargetDBPublicIP 를 통해 Chrome 웹브라우저 통해 접속합니다. 각 웹서비스는 MySQL 과 ORACLE 를 데이터 소스로 하고 있기 때문에 MSK 를 통한 동기화 구성 시 Source 웹서비스에서 주문 발생 시 Target 웹서비스에서 주문정보가 동기화가 되는지 확인 함으로서 MSK 동기화 정상 작동 여부를 체크 할 수 있습니다. 아래 그림 처럼 우측 상단의 source-mysql / target-oracle 표기로 사용하고 있는 데이터베이스를 구분할 수 있으며 지금은 데이터 동기화 구성이 되어 있지 않으므로 MySQL 웹서비스(왼쪽)에서 주문을 발생해도 ORACLE 웹서비스(오른쪽)의 주문정보에 동기화 되지 않습니다..
– 왼쪽 화면: http://SourceDBPublicIP:8000 (aws/1234)
– 오른쪽 화면 : http://TargetDBPublicIP:8000 (aws/1234)
단계 3 : Amazon MSK Cluster 생성
Amazon MSK는 안전하고 가용성이 뛰어난 완전 관리형 Apache Kafka 서비스로서, 스트리밍 데이터를 실시간으로 손쉽게 수집하고 처리할 수 있습니다. 이번 단계에서는 Amazon MSK Cluster를 생성합니다.
- MSK Cluster 콘솔에서 Create cluster 버튼을 클릭합니다.
- Cluster settings 단계에서 아래와 같이 선택/입력 합니다.
- Cluster creation method : Custom create
- Cluster name : workshop-msk-cluster
- Cluster type : Provisioned
- Broker type 단계에서 Number of zones 와 Brokers per zone 모두 2개를 입력합니다. 2개의 가용영역 마다 2개의 Broker 를 생성하는 설정으로 총 4개의 Broker가 생성되게 됩니다.
- Configuration 단계에서 Custom configuration 를 선택하고 Create configuration 버튼을 클릭하여 새로운 configuration 을 생성합니다.
- Configuration name 에 workshop-msk-config 를 입력하고 Configuration properties for revision 1 에 아래 내용을 입력 후 Create 버튼을 클릭합니다.
auto.create.topics.enable=true delete.topic.enable=true default.replication.factor=3 min.insync.replicas=2 num.io.threads=8 num.network.threads=5 num.partitions=1 num.replica.fetchers=2 replica.lag.time.max.ms=30000 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 socket.send.buffer.bytes=102400 unclean.leader.election.enable=true zookeeper.session.timeout.ms=18000
-
- create.topics.enable=true : 토픽 자동 생성
- topic.enable=true : 토픽 삭제 허용 여부
- 생성한 workshop-msk-config 를 선택하고 Next 버튼을 클릭합니다.
- Networking 단계에서 아래 내용을 선택/입력합니다.
- VPC : MSK-SOURCE-VPC 선택
- First zone : ap-northeast-2a 선택, subnet 은 PUBLIC SUBNET 선택
- Second zone : ap-northeast-2c 선택, subnet 은 PUBLIC SUBNET 선택
- Security groups in Amazon EC2 : Browse 선택 후 MSK-SG 선택
- Security 단계에서는 IAM role-based authentication 를 선택하고 Next 버튼을 클릭합니다. Amazon MSK에서는 아래 4가지의 인증 방식을 제공하는데 이번 실습에서는 IAM 인증을 사용합니다.
인증 방식 | 설명 |
Unauthenticated access | 별도의 인증이 필요하지 않고 모든 접근을 허용 |
SASL/SCRAM authentication | user/password 인증 또는 AWS Secrets Manager를 사용하여 저장 및 보호되는 로그인 자격 증명 사용 |
IAM role-based authentication | AWS IAM를 사용하여 Amazon MSK 클러스터에 대한 인증과 권한를 제어 |
TLS 인증 | 애플리케이션에서 Amazon MSK 브로커로의 연결에 대해 TLS를 통한 클라이언트 인증을 활성화 |
- Monitoring and tags 단계에서는 Enable partition-level monitoring 을 선택하고 Broker log delivery 단계에서 Deliver to Amazon CloudWatch Logs 선택 후 Browse 버튼을 통해 WORKSHOP-MSK 로그 그룹을 선택합니다. MSK 모니터링 레벨은 위에서 아래 항목으로 갈 수록 상세 로깅이 가능하며 비용이 추가됩니다. 자세한 내용은Amazon MSK metrics for monitoring with CloudWatch 를 참고하세요.
- Review and create 화면에서 하단 Create cluster 버튼을 클릭합니다. MSK Cluster 가 생성 완료 될 때 까지 기다립니다. (약 20분~30분 정도 소요됩니다. )
- MSK Cluster 생성이 완료되면 MSK Cluster 를 사용하기 위한 endpoint 를 조회할 수 있습니다. 생성된 workshop-msk-cluster 를 클릭하고 우측 상단의 View client information 을 클릭하면 아래 캡쳐와 같이 IAM 인증을 위한 endpoint 를 확인할 수 있습니다. 이후 단계에서 사용 되므로 Bootstrap Address 이름으로 따로 메모를 합니다.
단계 4 : Config Provider를 사용하여 민감 정보 보호
DB간 동기화를 위해서는 각DB 접속을 위한 User, Password, IP 등의 정보를 입력을 해야 합니다. 일반 텍스트 사용시 정보가 노출되어 보안사고가 발생될 수 있는데 AWS Secrets Manager 를 함께 사용하게 되면 중요 정보를 안전하게 저장하여 노출을 방지할 수 있습니다.
AWS Secrets Manager 콘솔에서 Store a new secret 버튼을 클릭하고 아래와 같이 DB 접속을 위한 User, Password, 각 데이터베이스 접속 IP, MSK Bootstrap Address를 저장합니다.
- Secret type : other type of secret 선택
- Key/value pairs
- username : testuser
- password : testuser
- source_db_ip : SourceDBPrivateIP (이전 단계에서 메모한 값)
- target_db_ip : TargetDBPrivateIP (이전 단계에서 메모한 값)
- msk_bootstrap_address : Bootstrap Address (이전 단계에서 메모한 값)
- Secret name : workshop-msk-db-secret
단계 5 : Worker configuration 생성
MSK Worker configuration 는 데이터베이스에 접속할 때 사용하는 User, Password, IP 등 중요정보를 외부화 하는 설정을 할 수 있습니다. 이전 단계에서 생성한 Secrets Manager 에 저장한 접속 정보를 사용하도록 Worker configuration 를 생성합니다.
Worker configuration 콘솔에 접속하여 Worker configuration name 에는 workshop-worker-config 를 입력하고 Create worker configuration 내용에는 아래 값을 입력합니다.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
config.providers=secretsmanager
config.providers.secretsmanager.class=com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
config.providers.secretsmanager.param.region=ap-northeast-2
-
- *.converter : json 포맷으로 메시지를 전달
- *.providers : provider를 AWS Secrets Manager로 정의
단계 6 : Connector Plugin 저장을 위한 S3 버킷 생성
MSK Connect에서 Connector 로 사용할 Debezium For MySQL 과 Confluent JDBC Sink Plugin 을 다운받아 S3에 업로드 하는 과정을 진행합니다. S3 콘솔에 접속한 후 Create bucket 버튼을 통해 workshop-msk-connector-plugin-[id] 이름의 버킷을 생성합니다. S3 버킷명은 유일해야 하기 때문에 [id] 부분에 여러분의 메일 ID 등 유니크한 값을 대체합니다.
단계 7 : Debezium for MySQL Plugin 생성
Aurora MySQL 에서 변경 사항을 수집(Changed Data Capture)하기 위해 Debezium Connector for MySQL 를 사용할 수 있습니다. Debezium 은 MySQL 의 binlog 를 통해 데이터를 캡처하므로 Source MySQL Server에 log_bin 파라메터가 On, binlog_format 파라메터가 row 로 셋팅 되어야 합니다. 자세한 내용은 Setting up MySQL를 참고하세요.
- Debezium Plugin 과 Secrets Manager 를 위한 구성 공급자 파일을 다운로드 받아 debezium-connector-mysql.zip 로 압축합니다. (Debezium Plugin 페이지 를 통해 다운로드 받을 수도 있습니다.)
# Plugin download ( debezium + msk provider ) mkdir source_db_plugin; cd source_db_plugin wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.3.3.Final/debezium-connector-mysql-2.3.3.Final-plugin.tar.gz tar zxvf debezium-connector-mysql-2.3.3.Final-plugin.tar.gz wget https://github.com/aws-samples/msk-config-providers/releases/download/r0.1.0/msk-config-providers-0.1.0-with-dependencies.zip unzip msk-config-providers-0.1.0-with-dependencies.zip zip -r ../debezium-connector-mysql.zip * cd ..
- 생성된 debezium-connector-mysql.zip 파일을 workshop-msk-connector-plugin-[id] 버킷에 업로드합니다.
- Custom plugins 콘솔에서 Create custom plugin 버튼을 클릭, 아래 내용을 입력 후 우측 하단의Create custom plugin 을 클릭하여 Plugin 생성을 완료합니다. ( [id] 수정 필요 )
- S3 URI : s3://workshop-msk-connector-plugin-[id]/debezium-connector-mysql.zip
- Custom plugin name : debezium-connector-mysql
단계 8 : Confluent JDBC Sink Plugin 생성
변경 사항을 ORACLE에 적용하기 위해 Confluent JDBC Sink Connector 를 사용할 수 있습니다. ORACLE 뿐만 아니라 MySQL, PostgreSQL, SQL Server, DB2 등의 다양한 데이터베이스 Plugin 도 지원 합니다.
- Confluent JDBC Sink Plugin 과 Secrets Manger 를 위한 구성 공급자 파일을 다운받아 confluentinc-kafka-connect-jdbc.zip 으로 압축합니다. (Confluent JDBC Sink 다운로드 페이지를 통해 다운로드 받을 수도 있습니다.)
# Plugin download ( confluent jdbc sink + msk provider ) mkdir target_db_plugin; cd target_db_plugin wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.7.4/confluentinc-kafka-connect-jdbc-10.7.4.zip unzip confluentinc-kafka-connect-jdbc-10.7.4.zip wget https://github.com/aws-samples/msk-config-providers/releases/download/r0.1.0/msk-config-providers-0.1.0-with-dependencies.zip unzip msk-config-providers-0.1.0-with-dependencies.zip zip -r ../confluentinc-kafka-connect-jdbc.zip * cd ..
- confluentinc-kafka-connect-jdbc.zip 파일을 workshop-msk-connector-plugin-[id] 버킷에 업로드합니다.
- Custom plugins 콘솔에서 Create custom plugin 버튼을 클릭, 아래 내용을 입력 후 우측 하단의 Create custom plugin 을 클릭하여 Plugin 생성을 완료합니다. ( [id] 수정 필요 )
- S3 URI : s3://workshop-msk-connector-plugin-[id]/confluentinc-kafka-connect-jdbc.zip
- Custom plugin name : confluentinc-kafka-connect-jdbc
단계 9 : Source MSK Connect 생성
이번 단계에서는 MySQL의 변경 데이터를 Capture 하기 위한 Source MSK Connect를 생성합니다. Debezium Plugin이 실행될 서버로 EC2 등의 VM을 사용할 수 있지만 안정적으로 운영하기 위해서 가용성과 확장성을 검토해야 합니다. Amazon MSK Connect 는 리전 내에서 가용성을 보장하고 확장 가능한 완전 관리형 서비스로 Plugin 을 결합하여 쉽고 안정적으로 Connector를 구축하고 운영 할 수 있습니다.
- MSK Connectors 콘솔에서 Create connector 버튼을 클릭합니다.
- Custom plugins 단계에서 이전에 생성한 debezium-connector-mysql 을 선택합니다.
- Connector Properties 단계에서 Connector name 에 source-connector를 입력합니다.
- Apache Kafka cluster 단계에서 workshop-msk-cluster 왼쪽 라디오 버튼 체크 후 Authentication 에는 IAM을 선택 합니다.
- Connector configuration 단계에서 Configuration settings 에 아래 내용을 입력합니다.
connector.class=io.debezium.connector.mysql.MySqlConnector tasks.max=1 transforms=unwrap transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState include.schema.changes=true key.converter.schemas.enable=true value.converter.schemas.enable=true topic.prefix=workshop database.hostname=${secretsmanager:workshop-msk-db-secret:source_db_ip} database.port=3306 database.server.id=123400 database.user=${secretsmanager:workshop-msk-db-secret:username} database.password=${secretsmanager:workshop-msk-db-secret:username} database.connectionTimeZone=Asia/Seoul database.include.list=ecommerce table.include.list=ecommerce.product,ecommerce.orders time.precision.mode=connect schema.history.internal.kafka.topic=workshop schema.history.internal.kafka.bootstrap.servers=${secretsmanager:workshop-msk-db-secret:msk_bootstrap_address} schema.history.internal.producer.security.protocol=SASL_SSL schema.history.internal.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler schema.history.internal.producer.sasl.mechanism=AWS_MSK_IAM schema.history.internal.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; schema.history.internal.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler schema.history.internal.consumer.sasl.mechanism=AWS_MSK_IAM schema.history.internal.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; schema.history.internal.consumer.security.protocol=SASL_SSL
- connector.class=io.debezium.connector.mysql.MySqlConnector
connector 의 JAVA 클래스 입니다. - task.max=1
MySQL connector는 항상 단일 테스크를 사용하므로 기본값인 1로 설정합니다. - transforms.*
SMT(Single Message Transformation) 로 필요한 메시지만 보낼 수 있도록 사이즈를 줄입니다.(참고) - snapshot.mode=schema_only
Connector 생성 시 변경 데이터만 수집합니다. (현재 설정에 포함되어 있지 않지만 운영환경에서 필요 할 수 있습니다.) - topic.prefix=workshop
생성될 topic의 접두어를 지정합니다. - schema.history.internal.kafka.topic=workshop
스키마의 변경이력을 저장할 topic 명을 지정합니다. - table.include.list=ecommerce.product,ecommerce.orders
동기화할 소스 테이블을 지정합니다. - 민감 정보 입력을 위해서는 다음과 같은 포맷으로 Configuration 에 입력합니다.
${secretsmanager:[생성한 AWS Secrets Manager 명]:[민감정보]}
- connector.class=io.debezium.connector.mysql.MySqlConnector
- Connector capacity 단계에서 아래와 같이 선택/입력합니다.
- Capacity type : Provisioned
- Number of workers : 1 (Debezium MySQL Connector의 tasks.max는 1이므로 worker를 1로 설정)
- Worker configuration 단계에서 Use a custom configuration 을 선택하고 이전 단계에서 생성한 workshop-worker-config 를 선택합니다.
- Access permissions 단계에서 iam-msk-connect-role 을 선택합니다. iam-msk-connect-role 은 원활한 실습을 위해 사전에 CloudFormation 을 통해 생성된 Role 이며 MSK Cluster와 Secrets Manager 그리고 S3 등의 접근 권한을 필요로 합니다.
- Security 단계에서 Next 버튼을 클릭합니다.
- Logs 단계에서는 Deliver to Amazon CloudWatch Logs 를 체크하고 Browse 버튼을 통해 WORKSHOP-MSK를 선택합니다.
- 마지막 페이지에서 Create connector 버튼을 클릭하여 Connect 생성을 완료합니다. ( 약 10분 소요 됩니다.)
단계 10 : Target MSK Connect 생성
마찬가지로 Amazon MSK 로 수집된 데이터를 ORACLE 데이터베이스로 동기화하기 위한 Target MSK Connect 를 생성합니다.
- MSK Connectors 콘솔에서 Create connector 버튼을 클릭합니다.
- Custom plugins 단계에서 이전에 생성한 confluentinc-kafka-connect-jdbc 을 선택합니다.
- Connector Properties 단계에서 Connector name 에 target-connector를 입력합니다.
- Apache Kafka cluster 단계에서 workshop-msk-cluster 왼쪽 라디오 버튼 체크 후 Authentication 에는 IAM을 선택 합니다.
- Connector configuration 단계에서 Configuration settings 에 아래 내용을 입력합니다.
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=1 transforms.changeTopicName.replacement=$1 transforms.changeTopicName.regex=workshop.ecommerce.(.*) transforms=changeTopicName transforms.changeTopicName.type=org.apache.kafka.connect.transforms.RegexRouter key.converter.schemas.enable=true value.converter.schemas.enable=true key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter topics.regex=workshop.ecommerce.(.*) connection.url=jdbc:oracle:thin:@${secretsmanager:workshop-msk-db-secret:target_db_ip}:1521/ORCL connection.password=${secretsmanager:workshop-msk-db-secret:username} connection.user=${secretsmanager:workshop-msk-db-secret:password} pk.mode=record_value table.name.format=${topic} insert.mode=upsert quote.sql.identifiers=never pk.fields=id max.retries=200 retry.backoff.ms=3000 auto.evolve=false auto.create=false
- class=io.confluent.connect.jdbc.JdbcSinkConnector
connector 의 JAVA 클래스 입니다. - mode=upsert
데이터의 멱등성을 보장하기 위해 upsert 를 사용합니다. - transforms.*
Debezium Plugin 과 Confluent Plugin 간의 호환성 제공합니다. - oracle.jdbc.fanEnabled=false
ORACLE 접속 시 30초 timeout을 회피합니다. - 민감 정보 입력을 위해서는 다음과 같은 포맷으로 Configuration 에 입력합니다. ${secretsmanager:[생성한 AWS Secrets Manager 명]:[민감정보]}
- Connector capacity 단계에서 아래와 같이 선택/입력합니다.
- Capacity type : Provisioned
- Number of workers : 1
- Worker configuration 단계에서 Use a custom configuration 을 선택하고 이전 단계에서 생성한 workshop-worker-config 를 선택합니다.
- Access permissions 단계에서 iam-msk-connect-role 을 선택합니다.
- Security 단계에서 Next 버튼을 클릭합니다.
- Logs 단계에서는 Deliver to Amazon CloudWatch Logs 를 체크하고 Browse 버튼을 통해 WORKSHOP-MSK를 선택합니다.
- 마지막 페이지에서 Create connector 버튼을 클릭하여 Connect 생성을 완료합니다. ( 약 10분 소요 됩니다)
단계 11 : 데이터 동기화 확인
아래와 같이 생성한 Connector 의 Status 가 Running 상태가 되면 데이터 동기화를 위한 모든 구성이 완료되었습니다. 사전에 준비된 웹서비스와 Kafka client를 통해 데이터 동기화가 정상적으로 동작하는지 확인해 보겠습니다.
- 웹서비스를 통해 데이터 동기화를 확인하기 위해MySQL과 ORACLE 을 각각의 데이터베이스로 하는 웹페이지를 Chrome 브라우저를 통해 실행합니다.
– 왼쪽 화면: http://SourceDBPublicIP:8000 (aws/1234)
– 오른쪽 화면: http://TargetDBPublicIP:8000 (aws/1234) - 왼쪽 MySQL 웹페이지의 “상품정보” 메뉴에서 음식의 주문 버튼을 클릭하면 “주문정보” 메뉴에 최근 주문 정보가 표시됩니다. 이때 오른쪽 ORACLE 웹페이지에서도 주문 데이터가 동기화 되는지 확인해 봅니다.
- 다음으로 Kafka Client를 통해 데이터 동기화를 확인해 보겠습니다. Kafka Client 를 설치할 임시 서버로 기존에 생성되어 있는 MSK-SOURCE-DB 인스턴스를 활용합니다.
- EC2 콘솔에서 MSK-SOURCE-DB 인스턴스 마우스 우측 클릭 > Connect > Session Manager 탭 > Connect > SSH 접속
- 아래 Cmd를 통해 Kafka Client 를 설치하고 MSK Cluster 접속을 위한 인증 작업을 진행합니다. 아래 내용에서 <msk_bootstrap_address> 부분을 이전에 메모한 Bootstrap Address 값으로 변경한 후 SSH 화면에서 실행합니다.
# kafka client 설치 sudo -i sudo amazon-linux-extras install -y java-openjdk11 sudo yum install -y java-11-openjdk-devel wget https://downloads.apache.org/kafka/3.5.1/kafka_2.12-3.5.1.tgz tar zxvf kafka_2.12-3.5.1.tgz ln -s kafka_2.12-3.5.1 kafka sudo yum install -y jq echo 'PATH=$PATH:$HOME/kafka/bin' >> ~/.bash_profile . ~/.bash_profile kafka-topics.sh --version echo "finish" # MSK Cluster 접근을 위한 endpoint를 환경변수에 저장 echo "export MSK_BOOTSTRAP_ADDRESS=<msk_bootstrap_address>" >> ~/.bash_profile . ~/.bash_profile # MSK IAM 인증을 위한 관련 파일 다운로드 cd /root/kafka/libs wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar # IAM 인증 echo -n "security.protocol=SASL_SSL sasl.mechanism=AWS_MSK_IAM sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler" > /tmp/client_iam.properties
- 아래 kafka-topics.sh 을 통해 생성된 topic 리스트를 확인합니다.
kafka-topics.sh \ --bootstrap-server $MSK_BOOTSTRAP_ADDRESS \ --command-config /tmp/client_iam.properties \ --list
-
- __amazon_msk_connect_config : 커넥터 내부 구성정보 저장
- __amazon_msk_connect_offset : binlog 파일 offset 기록
- __amazon_msk_connect_status : 작업 구성상태 변경 기록
- workshop : 스키마 히스토리 기록
- workshop.ecommerce.orders : orders 테이블의 데이터 변경 기록
- workshop.ecommerce.product : product 테이블의 데이터 변경 기록
- 아래 kafka-console-consumer.sh를 실행하고 MySQL 웹서비스에서 주문을 발생시키면 메시지가 ecommerce.orders 토픽에 발생 되는 것을 확인할 수 있습니다.
kafka-console-consumer.sh \ --bootstrap-server $MSK_BOOTSTRAP_ADDRESS \ --consumer.config /tmp/client_iam.properties \ --topic workshop.ecommerce.orders
단계 12 : 모니터링
모니터링은 장애를 사전에 예방하기 위한 중요한 활동입니다. MSM Cluster 는 아래의 모니터링 레벨 수준에 따라 자동으로 메트릭을 수집합니다. Default 레벨 모니터링은 무료이며, 다른 메트릭은 추가 요금이 부과 됩니다. 자세한 내용은 링크를 확인하세요.
- Default 레벨 모니터링
- Broker 레벨 모니터링
- Topic, Broker 레벨 모니터링
- Topic, Partition 레벨 모니터링
동기화에 가장 중요한 부분은 지연에 관련된 모니터링 입니다. 미션 크리티컬한 서비스에 동기화 지연이 발생된다면 장애를 유발 시킬 수 있습니다. 모니터링 레벨 수준에 따라 Consumer 지연에 관련된 메트릭을 CloudWatch에서 확인할 수 있습니다.
메트릭명 | 모니터링 레벨 | 설명 |
EstimatedMaxTimeLag | Default 레벨 | 토픽 기준 소비자 그룹의 메시지 소비 지연 시간 |
MaxOffsetLag | Default 레벨 | 토픽 기준 파티션 최대 오프셋 지연 |
SumOffsetLag | Default 레벨 | 토픽 기준 오프셋 지연 합계 |
EstimatedTimeLag | Topic, Partition 레벨 모니터링 | 파티션 기준 소비 지연 시간 |
OffsetLag | Topic, Partition 레벨 모니터링 | 파티션 기준 소비자의 오프셋 지연 개수 |
CloudWatch 의 Dashboards 에서는 지연 메트릭 뿐만 아니라 Broker CPU 사용률, Storage 사용률 등을 활용하여 대시보드를 생성할 수 있습니다.
- CloudWatch Dashboards 콘솔 에서 Create dashboard 버튼을 클릭합니다.
- Dashboard name 이름에 workshop-msk-monitor 를 입력 후 Create dashboard 버튼을 클릭합니다.
- script로 대시보드를 생성할 것이므로 Add widget 은 Cancel 버튼을 클릭합니다.
- 우측 상단의 Actions > View/edit source 버튼을 클릭합니다.
- 링크를 통해 모니터링 Script(monitor.yaml)를 다운 받습니다.
- Dashboard source 에 다운로드 받은 내용을 붙여넣은 후 Update 버튼을 클릭합니다.
- 아래와 같이 MSK Cluster와 MSK Connect의 CPU, Memory, Disk, OffsetLag(지연) 모니터링을 할 수 있는 멋진 Dashboard가 생성되었습니다.
단계 13 : Source MSK Connect 재생성 시 오프셋 관리
기본적으로 MSK Connect 는 생성될 때 오프셋을 위한 토픽을 생성합니다. Connect 의 arn 일부를 사용하여 이름이 구성되며 생성된 토픽은 아래와 같습니다.
kafka-topics.sh \
--bootstrap-server $MSK_BOOTSTRAP_ADDRESS \
--command-config /tmp/client_iam.properties \
--list
해당 토픽에서 offset 정보를 관리하므로 Source MSK Connect 재생성 시 이전 Connect의 마지막 offset 부터 읽기를 재개하기 위해서는 다음 2가지 작업을 해주어야 합니다.
- Source MSK Connect 명을 이전 이름과 동일하게 지정하여 재생성
- Worker configuration 의 offset.storage.topic 설정에 이전 offset 관리 토픽명 입력
key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter config.providers=secretsmanager config.providers.secretsmanager.class=com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider config.providers.secretsmanager.param.region=ap-northeast-2 # 기존 설정에서 추가된 항목 (주석은 삭제하세요) offset.storage.topic=__amazon_msk_connect_offsets_source-connector_cea196a6-fabc-4c4b-8299-693d559a35bb-4
단계 13 : Target MSK Connect 재생성 시 오프셋 관리
Target(Sink) MSK Connect 의 오프셋 관리는 토픽 대신 Kafka Consumer 그룹을 통해 관리가 됩니다. 그룹명은 connect-{Connector_Name} 으로 생성되므로 Target MSK Connect 재생성 시 마지막 커밋된 오프셋 부터 읽기 위해서는 이전 Connector 와 동일한 이름으로만 재생성을 하면 됩니다.
다시 말하면 Target(Sink) MSK Connect 재생성 시 이전 Connect의 마지막 offset 부터 읽기를 재개하기 위해서는 다음 작업을 해주어야 합니다.
- Target MSK Connect 명을 이전 이름과 동일하게 지정하여 재생성
리소스 정리하기
실습을 완료한 후 불필요한 과금을 방지하기 위해 사용했던 리소스를 삭제합니다.
- MSK Connect콘솔에 접속하여 생성한 모든 connector 를 제거합니다.
- Custom plugins콘솔에 접속하여 생성한 모든 plugin 을 제거합니다.
- MSK Clusters콘솔에 접속하여 생성한 모든 MSK Clusters 를 제거합니다.
- Cluster configurations콘솔에 접속하여 생성한 모든 configurations 를 제거합니다.
- S3콘솔에 접속하여 생성한 모든 bucket 를 제거합니다. (데이터를 먼저 삭제한 후 bucket 삭제를 진행해야 합니다.)
- CloudFormation 콘솔에 접속하여 workshop-msk 선택 후 Delete 버튼을 통해 CloudFormation 을 통해 생성되었던 리소스를 모두 삭제합니다.
마무리
지금까지 Amazon MSK Cluster 와 MSK Connect 그리고 Debezium, Confluent Connector 를 활용하여 데이터베이스 간 데이터 동기화 구축 과정과 모니터링 그리고 오프셋 관리에 대해 알아보았습니다. 물론 설치형으로 사용할 수도 있지만 가용성과 확장성을 고려한다면 완전 관리형인 Amazon MSK Cluster 와 MSK Connect 를 사용함으로서 인프라 관리 부담은 줄이고 비즈니스를 위한 핵심 업무에 집중 할 수 있을 것이라 기대합니다. 또한 인프라 관리는 서비스에 맡기고 성능 최적화를 위해 가장 중요한 메시지 사이즈를 줄이는 활동(압축, 필요한 데이터 선별) 등에 더 많은 시간을 할애하고 집중하여 시너지를 낼 수 있습니다. 본 블로그에서 안내한 구축 과정은 Amazon MSK 워크샵 에서 단계 별 캡쳐 화면을 통해 보다 자세하게 안내하고 있으니 참고 바랍니다.