AWS 기술 블로그

Amazon MSK를 활용한 데이터베이스 간 CDC 구현하기

최근 많은 고객들이 디지털 전환을 위해 온프레미스의 모놀리식 아키텍처에서 MSA를 통한 클라우드로의 전환을 하기 위해 많은 노력을 하고 있습니다. 리스크가 있는 빅뱅 방식보다 점진적 전환 전략을 선택하다 보니 과도기적으로 온프레미스와 클라우드에 동시에 데이터베이스를 운영하게 되고 클라우드DB에서 온프레미스DB로 데이터를 동기화 해야 할 필요가 생기게 됩니다. 물론 마이그레이션이 완료되면 제거될 부분이지만 그 전까지는 서비스 중단을 예방하기 위해 안전하고 확장 가능한 동기화 아키텍처를 선택해야만 합니다.

Apache Kafka는 스트리밍 서비스 뿐만 아니라 실시간 데이터 동기화를 구현할 수 있는 오픈소스 플랫폼으로 AWS의 완전 관리형 서비스인 Amazon MSK Cluster 와 통합으로 Kafka를 활용하는 것이 더욱 쉽고 편리 해졌습니다. 또한 이기종 데이터베이스 간 데이터 변경 분을 추출하고 반영하기 위해 MSK ConnectApache Debezium, Confluent Connectors 등을 함께 사용하게 되면 안정적이고 확장 가능한 커넥터를 구축할 수 있습니다.

이번 블로그에서는 Amazon MSK를 사용하여 이기종 데이터베이스 간 데이터를 동기화하는 과정에 대해 알아보고 안정적인 운영을 위한 모니터링과 MSK Connect 재생성 시 오프셋 재개 방안에 대해서도 알아보도록 하겠습니다.

솔루션 개요

아래는 블로그에서 안내할 솔루션의 아키텍처입니다.  MySQL에서 ORACLE로의 데이터 동기화를 위해 Capture는 Debezium For MySQL을, Sink는 Confluent JDBC를 MSK Connect와 함께 사용하고 데이터 스트리밍은 Amazon MSK를 사용하여 솔루션을 구축합니다.

MSK Connect 이해

MSK Connect 는 Apache Kafka 클러스터와 데이터를 쉽게 주고 받을 수 있게 도와주는 Amazon MSK의 기능입니다. MSK Connect는 부하 변화에 따라 자동으로 크기가 조정되며 사용한 리소스에 대해서만 비용이 발생됩니다. 아래 Amazon MSK Connect 아키텍처에서 Worker는 Connector 로직을 실행하는 Java 가상 머신 프로세스입니다. 각 Worker는 병렬 스레드로 실행되는 Task를 생성하고 데이터 동기화 작업을 수행합니다. 작업 간 상태를 저장하지 않으므로 장애 또는 중단 발생 시 자동 재시작이 가능합니다.

단계 1 : 인프라 환경 생성

원활한 실습을 위해 사전에 준비된 CloudFormation 을 통해 아래 그림과 같은 기본 인프라 환경을 구축합니다. 블로그의 모든 실습 내용은 서울리전(ap-northeast-2)에서 진행되므로 현재 사용하고 있는 리전이 서울리전(ap-northeast-2)인지 확인하세요.

  1. 아래 버튼을 통해 AWS CloudFormation 스택(cfn.yaml)을 다운로드 받습니다.
  2. CloudFormation 콘솔로 이동하여 Create stack 선택 후 cfn.yaml 업로드 합니다.
  3. Stack name 에 workshop-msk을 입력하고 나머지는 default 값을 유지하며 생성을 완료합니다. 약 5분 정도 소요가 되며 아래 그림과 같이 2개의 VPC 네트워크 영역에 MySQL, ORACLE 데이터베이스가 설치된 2개의 EC2가 생성됩니다.
  4. 생성이 완료되면 CloudFormation 의 Outputs 탭에서 아래 그림과 같은 정보를 확인할 수 있습니다. 빨간 박스로 표시된 4개의 IP는 다음 단계에서 사용되므로 편의를 위해 메모를 합니다.

단계 2 : 소스/타겟 데이터베이스 시작

EC2 콘솔에 접속을 하게 되면 MSK-SOURCE-DB 와 MSK-TARGET-DB 이름의 EC2 인스턴스를 확인할 수 있습니다. 각 인스턴스에 접속하여 MSK-SOURCE-DB 에는 MySQL 를 시작, MSK-TARGET-DB 에는 ORACLE을 시작합니다.

  1. MSK-SOURCE-DB 인스턴스 마우스 우측 클릭 > Connect > Session Manager 탭 > Connect > SSH 접속 (만약 접속 오류 발생 시 1,2분 기다린 후 다시 시도합니다.)
  2. 아래 명령을 통해 MySQL 을 시작합니다. (약 3분 소요됩니다.)
    sudo -i
    sudo hostnamectl set-hostname source-mysql
    systemctl start mysqld
    su - mysql
    ss # db 접속
    select * from product;
    exit
    exit
    echo "complete"
    
  1. 아래 명령을 통해 데이터 동기화를 보다 쉽게 확인하기 위한 웹서비스를 실행합니다.
    sudo -i su - app
    cp .mysql_env .env
    rm -rf workshop-ecommerce
    git clone https://github.com/color275/workshop-msk-ecommerce.git workshop-ecommerce
    cd /home/app/workshop-ecommerce/
    python3.8 -m venv venv
    source venv/bin/activate
    pip install --upgrade pip
    pip install -r requirement.txt
    cd /home/app/workshop-ecommerce/ecommerce
    nohup python manage.py runserver 0.0.0.0:8000 &
    echo "complete"
    exit
  1. MSK-Target-DB 인스턴스 마우스 우측 클릭 > Connect > Session Manager 탭 > Connect > SSH 접속 (만약 접속 오류 발생 시 1,2분 기다린 후 다시 시도합니다.)
  2. 아래 명령을 통해 ORACLE 을 기동합니다. ( 약 5분 소요됩니다)
    sudo -i
    sudo hostnamectl set-hostname target-oracle
    su - oracle -c './scripts/startup.sh'
    su - oracle -c 'lsnrctl start'
    su - oracle
    ss
    set linesize 200
    set pagesize 100
    col name for a10
    col img_path for a20
    col category for a20
    col price for 99999
    select name,img_path,category,price from product;
    exit
    echo "complete"
    
  1. 아래 명령을 통해 데이터 동기화를 보다 쉽게 확인하기 위한 웹서비스를 실행합니다.
    sudo -i su - app
    cp .oracle_env .env
    rm -rf workshop-ecommerce
    git clone https://github.com/color275/workshop-msk-ecommerce.git workshop-ecommerce
    cd /home/app/workshop-ecommerce/
    python3.8 -m venv venv
    source venv/bin/activate
    pip install --upgrade pip
    pip install -r requirement.txt
    cd /home/app/workshop-ecommerce/ecommerce
    nohup python manage.py runserver 0.0.0.0:8000 &
    exit
    
  1. MySQL 과 ORACLE 기동과 웹서비스 실행이 완료되었으면, 앞에서 메모한 SourceDBPublicIP 와 TargetDBPublicIP 를 통해 Chrome 웹브라우저 통해 접속합니다. 각 웹서비스는 MySQL 과 ORACLE 를 데이터 소스로 하고 있기 때문에 MSK 를 통한 동기화 구성 시 Source 웹서비스에서 주문 발생 시 Target 웹서비스에서 주문정보가 동기화가 되는지 확인 함으로서 MSK 동기화 정상 작동 여부를 체크 할 수 있습니다. 아래 그림 처럼 우측 상단의 source-mysql / target-oracle 표기로 사용하고 있는 데이터베이스를 구분할 수 있으며 지금은 데이터 동기화 구성이 되어 있지 않으므로 MySQL 웹서비스(왼쪽)에서 주문을 발생해도 ORACLE 웹서비스(오른쪽)의 주문정보에 동기화 되지 않습니다..
    – 왼쪽 화면: http://SourceDBPublicIP:8000 (aws/1234)
    – 오른쪽 화면 : http://TargetDBPublicIP:8000 (aws/1234)

단계 3 : Amazon MSK Cluster 생성

Amazon MSK는 안전하고 가용성이 뛰어난 완전 관리형 Apache Kafka 서비스로서, 스트리밍 데이터를 실시간으로 손쉽게 수집하고 처리할 수 있습니다. 이번 단계에서는 Amazon MSK Cluster를 생성합니다.

  1. MSK Cluster 콘솔에서 Create cluster 버튼을 클릭합니다.
  2. Cluster settings 단계에서 아래와 같이 선택/입력 합니다.
    • Cluster creation method : Custom create
    • Cluster name : workshop-msk-cluster
    • Cluster type : Provisioned
  3. Broker type 단계에서 Number of zones 와 Brokers per zone 모두 2개를 입력합니다. 2개의 가용영역 마다 2개의 Broker 를 생성하는 설정으로 총 4개의 Broker가 생성되게 됩니다.
  4. Configuration 단계에서 Custom configuration 를 선택하고 Create configuration 버튼을 클릭하여 새로운 configuration 을 생성합니다.
  5. Configuration name 에 workshop-msk-config 를 입력하고 Configuration properties for revision 1 에 아래 내용을 입력 후 Create 버튼을 클릭합니다.
    auto.create.topics.enable=true
    delete.topic.enable=true
    default.replication.factor=3
    min.insync.replicas=2
    num.io.threads=8
    num.network.threads=5
    num.partitions=1
    num.replica.fetchers=2
    replica.lag.time.max.ms=30000
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    socket.send.buffer.bytes=102400
    unclean.leader.election.enable=true
    zookeeper.session.timeout.ms=18000
    
    • create.topics.enable=true : 토픽 자동 생성
    • topic.enable=true : 토픽 삭제 허용 여부
  1. 생성한 workshop-msk-config 를 선택하고 Next 버튼을 클릭합니다.
  2. Networking 단계에서 아래 내용을 선택/입력합니다.
    • VPC : MSK-SOURCE-VPC 선택
    • First zone : ap-northeast-2a 선택, subnet 은 PUBLIC SUBNET 선택
    • Second zone : ap-northeast-2c 선택, subnet 은 PUBLIC SUBNET 선택
    • Security groups in Amazon EC2 : Browse 선택 후 MSK-SG 선택
  3. Security 단계에서는 IAM role-based authentication 를 선택하고 Next 버튼을 클릭합니다. Amazon MSK에서는 아래 4가지의 인증 방식을 제공하는데 이번 실습에서는 IAM 인증을 사용합니다.
인증 방식 설명
Unauthenticated access 별도의 인증이 필요하지 않고 모든 접근을 허용
SASL/SCRAM authentication user/password 인증 또는  AWS Secrets Manager를 사용하여 저장 및 보호되는 로그인 자격 증명 사용
IAM role-based authentication AWS IAM를 사용하여 Amazon MSK 클러스터에 대한 인증과 권한를 제어
TLS 인증 애플리케이션에서 Amazon MSK 브로커로의 연결에 대해 TLS를 통한 클라이언트 인증을 활성화
  1. Monitoring and tags 단계에서는 Enable partition-level monitoring 을 선택하고 Broker log delivery 단계에서 Deliver to Amazon CloudWatch Logs 선택 후 Browse 버튼을 통해 WORKSHOP-MSK 로그 그룹을 선택합니다. MSK 모니터링 레벨은 위에서 아래 항목으로 갈 수록 상세 로깅이 가능하며 비용이 추가됩니다. 자세한 내용은Amazon MSK metrics for monitoring with CloudWatch  를 참고하세요.
  2. Review and create 화면에서 하단 Create cluster 버튼을 클릭합니다. MSK Cluster 가 생성 완료 될 때 까지 기다립니다. (약 20분~30분 정도 소요됩니다. )
  3. MSK Cluster 생성이 완료되면 MSK Cluster 를 사용하기 위한 endpoint 를 조회할 수 있습니다. 생성된 workshop-msk-cluster 를 클릭하고 우측 상단의 View client information 을 클릭하면 아래 캡쳐와 같이 IAM 인증을 위한 endpoint 를 확인할 수 있습니다. 이후 단계에서 사용 되므로 Bootstrap Address 이름으로 따로 메모를 합니다.

단계 4 : Config Provider를 사용하여 민감 정보 보호

DB간 동기화를 위해서는 각DB 접속을 위한 User, Password, IP 등의 정보를 입력을 해야 합니다. 일반 텍스트 사용시 정보가 노출되어 보안사고가 발생될 수 있는데 AWS Secrets Manager 를 함께 사용하게 되면 중요 정보를 안전하게 저장하여 노출을 방지할 수 있습니다.

AWS Secrets Manager 콘솔에서 Store a new secret 버튼을 클릭하고 아래와 같이 DB 접속을 위한 User, Password, 각 데이터베이스 접속 IP, MSK Bootstrap Address를 저장합니다.

  • Secret type : other type of secret 선택
  • Key/value pairs
    • username : testuser
    • password : testuser
    • source_db_ip : SourceDBPrivateIP (이전 단계에서 메모한 값)
    • target_db_ip : TargetDBPrivateIP (이전 단계에서 메모한 값)
    • msk_bootstrap_address : Bootstrap Address (이전 단계에서 메모한 값)
  • Secret name : workshop-msk-db-secret

단계 5 : Worker configuration 생성

MSK Worker configuration 는 데이터베이스에 접속할 때 사용하는 User, Password, IP 등 중요정보를 외부화 하는 설정을 할 수 있습니다. 이전 단계에서 생성한 Secrets Manager 에 저장한 접속 정보를 사용하도록 Worker configuration 를 생성합니다.

Worker configuration 콘솔에 접속하여 Worker configuration name 에는 workshop-worker-config 를 입력하고 Create worker configuration 내용에는 아래 값을 입력합니다.

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
config.providers=secretsmanager
config.providers.secretsmanager.class=com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
config.providers.secretsmanager.param.region=ap-northeast-2
    • *.converter : json 포맷으로 메시지를 전달
    • *.providers : provider를 AWS Secrets Manager로 정의

단계 6 : Connector Plugin 저장을 위한 S3 버킷 생성

MSK Connect에서 Connector 로 사용할 Debezium For MySQL 과 Confluent JDBC Sink Plugin 을 다운받아 S3에 업로드 하는 과정을 진행합니다. S3 콘솔에 접속한 후 Create bucket 버튼을 통해 workshop-msk-connector-plugin-[id] 이름의 버킷을 생성합니다. S3 버킷명은 유일해야 하기 때문에 [id] 부분에 여러분의 메일 ID 등 유니크한 값을 대체합니다.

단계 7 : Debezium for MySQL Plugin 생성

Aurora MySQL 에서 변경 사항을 수집(Changed Data Capture)하기 위해 Debezium Connector for MySQL 를 사용할 수 있습니다. Debezium 은 MySQL 의 binlog 를 통해 데이터를 캡처하므로 Source MySQL Server에 log_bin 파라메터가 On, binlog_format 파라메터가 row 로 셋팅 되어야 합니다. 자세한 내용은 Setting up MySQL를 참고하세요.

  1. Debezium Plugin 과 Secrets Manager 를 위한 구성 공급자 파일을 다운로드 받아 debezium-connector-mysql.zip 로 압축합니다. (Debezium Plugin 페이지 를 통해 다운로드 받을 수도 있습니다.)
    # Plugin download ( debezium + msk provider )
    mkdir source_db_plugin; cd source_db_plugin
    wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.3.3.Final/debezium-connector-mysql-2.3.3.Final-plugin.tar.gz
    tar zxvf debezium-connector-mysql-2.3.3.Final-plugin.tar.gz 
    wget https://github.com/aws-samples/msk-config-providers/releases/download/r0.1.0/msk-config-providers-0.1.0-with-dependencies.zip
    unzip msk-config-providers-0.1.0-with-dependencies.zip
    zip -r ../debezium-connector-mysql.zip * 
    cd ..
    
  2. 생성된 debezium-connector-mysql.zip 파일을 workshop-msk-connector-plugin-[id] 버킷에 업로드합니다.
  3. Custom plugins 콘솔에서 Create custom plugin 버튼을 클릭, 아래 내용을 입력 후 우측 하단의Create custom plugin 을 클릭하여 Plugin 생성을 완료합니다. ( [id] 수정 필요 )
    1. S3 URI : s3://workshop-msk-connector-plugin-[id]/debezium-connector-mysql.zip
    2. Custom plugin name : debezium-connector-mysql

단계 8 : Confluent JDBC Sink Plugin 생성

변경 사항을 ORACLE에 적용하기 위해 Confluent JDBC Sink Connector 를 사용할 수 있습니다. ORACLE 뿐만 아니라 MySQL, PostgreSQL, SQL Server, DB2 등의 다양한 데이터베이스 Plugin 도 지원 합니다.

  1. Confluent JDBC Sink Plugin 과 Secrets Manger 를 위한 구성 공급자 파일을 다운받아 confluentinc-kafka-connect-jdbc.zip 으로 압축합니다. (Confluent JDBC Sink 다운로드 페이지를 통해 다운로드 받을 수도 있습니다.)
    # Plugin download ( confluent jdbc sink + msk provider )
    mkdir target_db_plugin; cd target_db_plugin
    wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.7.4/confluentinc-kafka-connect-jdbc-10.7.4.zip
    unzip confluentinc-kafka-connect-jdbc-10.7.4.zip
    wget https://github.com/aws-samples/msk-config-providers/releases/download/r0.1.0/msk-config-providers-0.1.0-with-dependencies.zip
    unzip msk-config-providers-0.1.0-with-dependencies.zip
    zip -r ../confluentinc-kafka-connect-jdbc.zip * 
    cd ..
  1. confluentinc-kafka-connect-jdbc.zip 파일을 workshop-msk-connector-plugin-[id] 버킷에 업로드합니다.
  2. Custom plugins 콘솔에서 Create custom plugin 버튼을 클릭, 아래 내용을 입력 후 우측 하단의 Create custom plugin 을 클릭하여 Plugin 생성을 완료합니다. ( [id] 수정 필요 )
  • S3 URI : s3://workshop-msk-connector-plugin-[id]/confluentinc-kafka-connect-jdbc.zip
  • Custom plugin name : confluentinc-kafka-connect-jdbc

단계 9 : Source MSK Connect 생성

이번 단계에서는 MySQL의 변경 데이터를 Capture 하기 위한 Source MSK Connect를 생성합니다. Debezium Plugin이 실행될 서버로 EC2 등의 VM을 사용할 수 있지만 안정적으로 운영하기 위해서 가용성과 확장성을 검토해야 합니다. Amazon MSK Connect 는 리전 내에서 가용성을 보장하고 확장 가능한 완전 관리형 서비스로 Plugin 을 결합하여 쉽고 안정적으로 Connector를 구축하고 운영 할 수 있습니다.

  1. MSK Connectors 콘솔에서 Create connector 버튼을 클릭합니다.
  2. Custom plugins 단계에서 이전에 생성한 debezium-connector-mysql 을 선택합니다.
  3. Connector Properties 단계에서 Connector name 에 source-connector를 입력합니다.
  4. Apache Kafka cluster 단계에서 workshop-msk-cluster 왼쪽 라디오 버튼 체크 후 Authentication 에는 IAM을 선택 합니다.
  5. Connector configuration 단계에서 Configuration settings 에 아래 내용을 입력합니다.
    connector.class=io.debezium.connector.mysql.MySqlConnector
    tasks.max=1
    transforms=unwrap
    transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
    include.schema.changes=true
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    topic.prefix=workshop
    database.hostname=${secretsmanager:workshop-msk-db-secret:source_db_ip}
    database.port=3306
    database.server.id=123400
    database.user=${secretsmanager:workshop-msk-db-secret:username}
    database.password=${secretsmanager:workshop-msk-db-secret:username}
    database.connectionTimeZone=Asia/Seoul
    database.include.list=ecommerce
    table.include.list=ecommerce.product,ecommerce.orders
    time.precision.mode=connect
    schema.history.internal.kafka.topic=workshop
    schema.history.internal.kafka.bootstrap.servers=${secretsmanager:workshop-msk-db-secret:msk_bootstrap_address}
    schema.history.internal.producer.security.protocol=SASL_SSL
    schema.history.internal.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    schema.history.internal.producer.sasl.mechanism=AWS_MSK_IAM
    schema.history.internal.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    schema.history.internal.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    schema.history.internal.consumer.sasl.mechanism=AWS_MSK_IAM
    schema.history.internal.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    schema.history.internal.consumer.security.protocol=SASL_SSL
    
    • connector.class=io.debezium.connector.mysql.MySqlConnector
      connector 의 JAVA 클래스 입니다.
    • task.max=1
      MySQL connector는 항상 단일 테스크를 사용하므로 기본값인 1로 설정합니다.
    • transforms.*
      SMT(Single Message Transformation) 로 필요한 메시지만 보낼 수 있도록 사이즈를 줄입니다.(참고)
    • snapshot.mode=schema_only
      Connector 생성 시 변경 데이터만 수집합니다. (현재 설정에 포함되어 있지 않지만 운영환경에서 필요 할 수 있습니다.)
    • topic.prefix=workshop
      생성될 topic의 접두어를 지정합니다.
    • schema.history.internal.kafka.topic=workshop
      스키마의 변경이력을 저장할 topic 명을 지정합니다.
    • table.include.list=ecommerce.product,ecommerce.orders
      동기화할 소스 테이블을 지정합니다.
    • 민감 정보 입력을 위해서는 다음과 같은 포맷으로 Configuration 에 입력합니다.
      ${secretsmanager:[생성한 AWS Secrets Manager 명]:[민감정보]}
  6. Connector capacity 단계에서 아래와 같이 선택/입력합니다.
    1. Capacity type : Provisioned
    2. Number of workers : 1 (Debezium MySQL Connector의 tasks.max는 1이므로 worker를 1로 설정)
  7. Worker configuration 단계에서 Use a custom configuration 을 선택하고 이전 단계에서 생성한 workshop-worker-config 를 선택합니다.
  8. Access permissions 단계에서 iam-msk-connect-role 을 선택합니다. iam-msk-connect-role 은 원활한 실습을 위해 사전에 CloudFormation 을 통해 생성된 Role 이며 MSK Cluster와 Secrets Manager 그리고 S3 등의 접근 권한을 필요로 합니다.
  9. Security 단계에서 Next 버튼을 클릭합니다.
  10. Logs 단계에서는 Deliver to Amazon CloudWatch Logs 를 체크하고 Browse 버튼을 통해 WORKSHOP-MSK를 선택합니다.
  11. 마지막 페이지에서 Create connector 버튼을 클릭하여 Connect 생성을 완료합니다. ( 약 10분 소요 됩니다.)

단계 10 : Target MSK Connect 생성

마찬가지로 Amazon MSK 로 수집된 데이터를 ORACLE 데이터베이스로 동기화하기 위한 Target MSK Connect 를 생성합니다.

  1. MSK Connectors 콘솔에서 Create connector 버튼을 클릭합니다.
  2. Custom plugins 단계에서 이전에 생성한 confluentinc-kafka-connect-jdbc 을 선택합니다.
  3. Connector Properties 단계에서 Connector name 에 target-connector를 입력합니다.
  4. Apache Kafka cluster 단계에서 workshop-msk-cluster 왼쪽 라디오 버튼 체크 후 Authentication 에는 IAM을 선택 합니다.
  5. Connector configuration 단계에서 Configuration settings 에 아래 내용을 입력합니다.
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    tasks.max=1
    transforms.changeTopicName.replacement=$1
    transforms.changeTopicName.regex=workshop.ecommerce.(.*)
    transforms=changeTopicName
    transforms.changeTopicName.type=org.apache.kafka.connect.transforms.RegexRouter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    topics.regex=workshop.ecommerce.(.*)
    connection.url=jdbc:oracle:thin:@${secretsmanager:workshop-msk-db-secret:target_db_ip}:1521/ORCL
    connection.password=${secretsmanager:workshop-msk-db-secret:username}
    connection.user=${secretsmanager:workshop-msk-db-secret:password}
    pk.mode=record_value
    table.name.format=${topic}
    insert.mode=upsert
    quote.sql.identifiers=never
    pk.fields=id
    max.retries=200
    retry.backoff.ms=3000
    auto.evolve=false
    auto.create=false
    
  • class=io.confluent.connect.jdbc.JdbcSinkConnector
    connector 의 JAVA 클래스 입니다.
  • mode=upsert
    데이터의 멱등성을 보장하기 위해 upsert 를 사용합니다.
  • transforms.*
    Debezium Plugin 과 Confluent Plugin 간의 호환성 제공합니다.
  • oracle.jdbc.fanEnabled=false
    ORACLE 접속 시 30초 timeout을 회피합니다.
  • 민감 정보 입력을 위해서는 다음과 같은 포맷으로 Configuration 에 입력합니다. ${secretsmanager:[생성한 AWS Secrets Manager 명]:[민감정보]}
  1. Connector capacity 단계에서 아래와 같이 선택/입력합니다.
    • Capacity type : Provisioned
    • Number of workers : 1
  2. Worker configuration 단계에서 Use a custom configuration 을 선택하고 이전 단계에서 생성한 workshop-worker-config 를 선택합니다.
  3. Access permissions 단계에서 iam-msk-connect-role 을 선택합니다.
  4. Security 단계에서 Next 버튼을 클릭합니다.
  5. Logs 단계에서는 Deliver to Amazon CloudWatch Logs 를 체크하고 Browse 버튼을 통해 WORKSHOP-MSK를 선택합니다.
  6. 마지막 페이지에서 Create connector 버튼을 클릭하여 Connect 생성을 완료합니다. ( 약 10분 소요 됩니다)

단계 11 : 데이터 동기화 확인

아래와 같이 생성한 Connector 의 Status 가 Running 상태가 되면 데이터 동기화를 위한 모든 구성이 완료되었습니다. 사전에 준비된 웹서비스와 Kafka client를 통해 데이터 동기화가 정상적으로 동작하는지 확인해 보겠습니다.

  1. 웹서비스를 통해 데이터 동기화를 확인하기 위해MySQL과 ORACLE 을 각각의 데이터베이스로 하는 웹페이지를 Chrome 브라우저를 통해 실행합니다.
    – 왼쪽 화면: http://SourceDBPublicIP:8000 (aws/1234)
    – 오른쪽 화면: http://TargetDBPublicIP:8000 (aws/1234)
  2. 왼쪽 MySQL 웹페이지의 “상품정보” 메뉴에서 음식의 주문 버튼을 클릭하면 “주문정보” 메뉴에 최근 주문 정보가 표시됩니다. 이때 오른쪽 ORACLE 웹페이지에서도 주문 데이터가 동기화 되는지 확인해 봅니다.
  1. 다음으로 Kafka Client를 통해 데이터 동기화를 확인해 보겠습니다. Kafka Client 를 설치할 임시 서버로 기존에 생성되어 있는 MSK-SOURCE-DB 인스턴스를 활용합니다.
  2. EC2 콘솔에서 MSK-SOURCE-DB 인스턴스 마우스 우측 클릭 > Connect > Session Manager 탭 > Connect > SSH 접속
  3. 아래 Cmd를 통해 Kafka Client 를 설치하고 MSK Cluster 접속을 위한 인증 작업을 진행합니다. 아래 내용에서 <msk_bootstrap_address> 부분을 이전에 메모한 Bootstrap Address 값으로 변경한 후 SSH 화면에서 실행합니다.
    # kafka client 설치
    sudo -i
    sudo amazon-linux-extras install -y java-openjdk11
    sudo yum install -y java-11-openjdk-devel 
    wget https://downloads.apache.org/kafka/3.5.1/kafka_2.12-3.5.1.tgz
    tar zxvf kafka_2.12-3.5.1.tgz
    ln -s kafka_2.12-3.5.1 kafka
    sudo yum install -y jq
    echo 'PATH=$PATH:$HOME/kafka/bin' >> ~/.bash_profile
    . ~/.bash_profile
    kafka-topics.sh --version
    echo "finish"
    
    # MSK Cluster 접근을 위한 endpoint를 환경변수에 저장
    echo "export MSK_BOOTSTRAP_ADDRESS=<msk_bootstrap_address>" >> ~/.bash_profile
    . ~/.bash_profile
    
    # MSK IAM 인증을 위한 관련 파일 다운로드
    cd /root/kafka/libs
    wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
    
    # IAM 인증 
    echo -n "security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM 
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; 
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler" > /tmp/client_iam.properties
    
  4. 아래 kafka-topics.sh 을 통해 생성된 topic 리스트를 확인합니다.
    kafka-topics.sh \
    --bootstrap-server $MSK_BOOTSTRAP_ADDRESS \
    --command-config /tmp/client_iam.properties \
    --list

    1. __amazon_msk_connect_config : 커넥터 내부 구성정보 저장
    2. __amazon_msk_connect_offset : binlog 파일 offset 기록
    3. __amazon_msk_connect_status : 작업 구성상태 변경 기록
    4. workshop : 스키마 히스토리 기록
    5. workshop.ecommerce.orders : orders 테이블의 데이터 변경 기록
    6. workshop.ecommerce.product : product 테이블의 데이터 변경 기록
  1. 아래 kafka-console-consumer.sh를 실행하고 MySQL 웹서비스에서 주문을 발생시키면 메시지가 ecommerce.orders 토픽에 발생 되는 것을 확인할 수 있습니다.
    kafka-console-consumer.sh \
    --bootstrap-server $MSK_BOOTSTRAP_ADDRESS \
    --consumer.config /tmp/client_iam.properties \
    --topic workshop.ecommerce.orders
    

단계 12 : 모니터링

모니터링은 장애를 사전에 예방하기 위한 중요한 활동입니다. MSM Cluster 는 아래의 모니터링 레벨 수준에 따라 자동으로 메트릭을 수집합니다. Default 레벨 모니터링은 무료이며, 다른 메트릭은 추가 요금이 부과 됩니다. 자세한 내용은 링크를 확인하세요.

  • Default 레벨 모니터링
  • Broker 레벨 모니터링
  • Topic, Broker 레벨 모니터링
  • Topic, Partition 레벨 모니터링

동기화에 가장 중요한 부분은 지연에 관련된 모니터링 입니다. 미션 크리티컬한 서비스에 동기화 지연이 발생된다면 장애를 유발 시킬 수 있습니다. 모니터링 레벨 수준에 따라 Consumer 지연에 관련된 메트릭을 CloudWatch에서 확인할 수 있습니다.

메트릭명 모니터링 레벨 설명
EstimatedMaxTimeLag Default 레벨 토픽 기준 소비자 그룹의 메시지 소비 지연 시간
MaxOffsetLag Default 레벨 토픽 기준 파티션 최대 오프셋 지연
SumOffsetLag Default 레벨 토픽 기준 오프셋 지연 합계
EstimatedTimeLag Topic, Partition 레벨 모니터링 파티션 기준 소비 지연 시간
OffsetLag Topic, Partition 레벨 모니터링 파티션 기준 소비자의 오프셋 지연 개수

CloudWatch 의 Dashboards 에서는 지연 메트릭 뿐만 아니라 Broker CPU 사용률, Storage 사용률 등을 활용하여 대시보드를 생성할 수 있습니다.

  1. CloudWatch Dashboards 콘솔 에서 Create dashboard 버튼을 클릭합니다.
  2. Dashboard name 이름에 workshop-msk-monitor 를 입력 후 Create dashboard 버튼을 클릭합니다.
  3. script로 대시보드를 생성할 것이므로 Add widget 은 Cancel 버튼을 클릭합니다.
  4. 우측 상단의 Actions > View/edit source 버튼을 클릭합니다.
  5. 링크를 통해 모니터링 Script(monitor.yaml)를 다운 받습니다.
  6. Dashboard source 에 다운로드 받은 내용을 붙여넣은 후 Update 버튼을 클릭합니다.
  7. 아래와 같이 MSK Cluster와 MSK Connect의 CPU, Memory, Disk, OffsetLag(지연) 모니터링을 할 수 있는 멋진 Dashboard가 생성되었습니다.

단계 13 : Source MSK Connect 재생성 시 오프셋 관리

기본적으로 MSK Connect 는 생성될 때 오프셋을 위한 토픽을 생성합니다. Connect 의 arn 일부를 사용하여 이름이 구성되며 생성된 토픽은 아래와 같습니다.

kafka-topics.sh \
--bootstrap-server $MSK_BOOTSTRAP_ADDRESS \
--command-config /tmp/client_iam.properties \
--list

해당 토픽에서 offset 정보를 관리하므로 Source MSK Connect 재생성 시 이전 Connect의 마지막 offset 부터 읽기를 재개하기 위해서는 다음 2가지 작업을 해주어야 합니다.

  • Source MSK Connect 명을 이전 이름과 동일하게 지정하여 재생성
  • Worker configuration 의 offset.storage.topic 설정에 이전 offset 관리 토픽명 입력
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    config.providers=secretsmanager
    config.providers.secretsmanager.class=com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
    config.providers.secretsmanager.param.region=ap-northeast-2
    # 기존 설정에서 추가된 항목 (주석은 삭제하세요)
    offset.storage.topic=__amazon_msk_connect_offsets_source-connector_cea196a6-fabc-4c4b-8299-693d559a35bb-4
    

단계 13 : Target MSK Connect 재생성 시 오프셋 관리

Target(Sink) MSK Connect 의 오프셋 관리는 토픽 대신 Kafka Consumer 그룹을 통해 관리가 됩니다. 그룹명은 connect-{Connector_Name} 으로 생성되므로 Target MSK Connect 재생성 시 마지막 커밋된 오프셋 부터 읽기 위해서는 이전 Connector 와 동일한 이름으로만 재생성을 하면 됩니다.

다시 말하면 Target(Sink) MSK Connect 재생성 시 이전 Connect의 마지막 offset 부터 읽기를 재개하기 위해서는 다음 작업을 해주어야 합니다.

  • Target MSK Connect 명을 이전 이름과 동일하게 지정하여 재생성

리소스 정리하기

실습을 완료한 후 불필요한 과금을 방지하기 위해 사용했던 리소스를 삭제합니다.

  1. MSK Connect콘솔에 접속하여 생성한 모든 connector 를 제거합니다.
  2. Custom plugins콘솔에 접속하여 생성한 모든 plugin 을 제거합니다.
  3. MSK Clusters콘솔에 접속하여 생성한 모든 MSK Clusters 를 제거합니다.
  4. Cluster configurations콘솔에 접속하여 생성한 모든 configurations 를 제거합니다.
  5. S3콘솔에 접속하여 생성한 모든 bucket 를 제거합니다. (데이터를 먼저 삭제한 후 bucket 삭제를 진행해야 합니다.)
  6. CloudFormation 콘솔에 접속하여 workshop-msk 선택 후 Delete 버튼을 통해 CloudFormation 을 통해 생성되었던 리소스를 모두 삭제합니다.

마무리

지금까지 Amazon MSK Cluster 와 MSK Connect 그리고 Debezium, Confluent Connector 를 활용하여 데이터베이스 간 데이터 동기화 구축 과정과 모니터링 그리고 오프셋 관리에 대해 알아보았습니다. 물론 설치형으로 사용할 수도 있지만 가용성과 확장성을 고려한다면 완전 관리형인 Amazon MSK Cluster 와 MSK Connect 를 사용함으로서 인프라 관리 부담은 줄이고 비즈니스를 위한 핵심 업무에 집중 할 수 있을 것이라 기대합니다. 또한 인프라 관리는 서비스에 맡기고 성능 최적화를 위해 가장 중요한 메시지 사이즈를 줄이는 활동(압축, 필요한 데이터 선별) 등에 더 많은 시간을 할애하고 집중하여 시너지를 낼 수 있습니다. 본 블로그에서 안내한 구축 과정은 Amazon MSK 워크샵 에서 단계 별 캡쳐 화면을 통해 보다 자세하게 안내하고 있으니 참고 바랍니다.

Chiho Lee

Chiho Lee

이치호 솔루션즈 아키텍트는 데이터베이스 운영 및 데이터 엔지니어링 경험을 바탕으로 고객이 클라우드를 통한 비즈니스의 가치 창출을 달성 할 수 있도록 고객과 함께 효율적인 아키텍처를 구성하는 역할을 수행하고 있습니다.

YooSung Jeon

YooSung Jeon

전유성 솔루션즈 아키텍트는 통신/공공 산업군에서 데이터 분석과 다양한 오픈소스 활용 경험을 바탕으로 DNB(Digital Native Business) 고객을 대상으로 고객의 비즈니스 성과를 달성하도록 최적의 아키텍처를 구성하는 역할을 수행하고 있습니다.