AWS 기술 블로그

Amazon Data Firehose를 활용한 실시간 OLTP to ICEBERG 구현하기

배경

기존의 빅데이터 프레임워크, 특히 HIVE 기반의 데이터레이크를 클라우드 환경에서 구성 할 경우 큰 규모의 데이터를 저장하고 분석하는 데 있어 효율적입니다. 그러나 트랜잭션을 지원하지 않는 관계로 데이터의 복잡성이 증가 할 경우 성능 저하 및 운영 복잡성을 초래할 수 있습니다. 이러한 기존의 데이터레이크의 한계를 극복하기 위해 오픈 테이블 포맷인 Apache Iceberg가 등장했습니다. Apache Iceberg는 스냅샷 기반의 트랜잭션 관리, 스키마 진화(Schema-Evolution), 파티셔닝 진화(Partition-Evolution) 등의 기능을 제공하여 데이터 레이크 환경에서도 대규모 데이터를 효율적으로 관리 할 수 있게 합니다.

Apache Iceberg는 Spark, Presto, Hive, Flink와 같은 빅데이터 처리 프레임워크와 호환되며 AWS 환경에서는 AWS Glue, Amazon EMR, Amazon Athena와 같은 관리형 데이터 분석 서비스를 이용하여 Iceberg 테이블을 관리 할 수 있습니다. 그러나 스트리밍 환경에서 Iceberg를 Spark Streaming, Flink 등과 함께 사용할 경우, 이들 도구에 대한 학습 곡선이 높아 추가적인 리소스와 시간이 필요할 수 있습니다.

Amazon Data Firehose의 Apache Iceberg 지원

이제 Amazon Data Firehose를 사용할 경우 Spark Streaming 또는 Flink를 사용하지 않고도 실시간 데이터를 Amazon S3의 Apache Iceberg 테이블로 직접 전달할 수 있습니다. 이 기능을 사용하면 단일 스트림의 레코드를 다른 Apache Iceberg 테이블로 라우팅하고, INSERT, UPDATE, DELETE 작업을 해당되는 Apache Iceberg 테이블에 자동으로 적용할 수 있습니다. Amazon Data Firehose는 Glue Data Catalog에 저장된 Iceberg 테이블을 대상으로 하며 정확히 한 번만 전송되도록 보장합니다.

Amazon Data Firehose를 통해 Apache Iceberg를 타겟 테이블로 지정 할 경우 아래의 제한사항을 확인해야 합니다.

  1. 소스와 타겟의 컬럼명 및 데이터 타입 일치
    Amazon Data Firehose를 통해 Iceberg를 타겟으로 지정 할 경우 소스 데이터의 열 이름과 데이터 유형이 대상 테이블의 열 이름과 일치해야 Amazon Data Firehose가 성공적으로 전송할 수 있습니다. 열 이름이나 데이터 유형이 일치하지 않으면 Amazon Data Firehose는 오류를 발생시키고 이를 S3 오류 버킷으로 전달합니다. 모든 열 이름과 데이터 유형이 타겟 테이블과 일치하지만 소스 레코드에 추가 필드가 있는 경우, Amazon Data Firehose는 새 필드를 건너뜁니다.
  2. JSON 구조
    Amazon Data Firehose는 다중 레벨 중첩(Nested) JSON에서 첫 번째 레벨의 노드만 가져옵니다. 예를 들어 아래와 같은 JSON 구문이 있을 경우 Amazon Data Firehose는 position 필드를 포함한 첫번째 노드의 값 만 가져옵니다.

    {
       "deviceId":"<solution_unique_device_id>",
       "value":"<actual_value>",
       "position":{
          "x":143.595901,
          "y":476.399628,
       }
    }
  3. 레코드당 하나의 JSON 객체
    Firehose에 전달하는 레코드에는 하나의 JSON 객체만 보낼 수 있습니다. 레코드 내에서 여러 JSON 객체를 집계하여 전송하는 경우, Firehose는 오류를 발생시키고 이를 S3 오류 버킷으로 전달합니다. Kinesis Producer Library(KPL)로 레코드를 집계하고 Amazon Kinesis Data Stream을 소스로 사용하는 경우 Amazon Data Firehose는 자동으로 집계를 해제하고 레코드당 하나의 JSON 객체를 사용합니다.
  4. Compaction 및 스토리지 최적화
    Iceberg 테이블은 트랜잭션이 발생할 때 마다 Snapshot 및 데이터 파일 발생하며, 이는 스토리지 비용 향상과 읽기 쿼리의 성능 저하를 가져옵니다. 이러한 문제를 해결하기 위해 AWS Glue Data Catalog는 Iceberg 테이블을 관리하기 위한 다양한 기능을 제공합니다. Glue Data Catalog의 Snapshot 삭제 및 참조하지 않는 파일 삭제 기능을 통해 스토리지 비용을 최적화 할 수 있으며 Compaction 기능을 활용해 작은 파일을 큰 파일로 합쳐 읽기 성능을 개선 할 수 있습니다.

Amazon Data Firehose 활용한 OLTP to ICEBERG 아키텍처

Amazon Aurora에서 변경되는 데이터를 추적하기 위해 AWS DMS변경 데이터 캡처(CDC) 기능을 사용합니다. 변경 데이터 캡처(CDC) 기능은 데이터베이스 엔진의 기본 API를 사용하여 데이터베이스 로그에 대한 변경 사항을 수집하며 소스 데이터베이스 엔진이 MySQL의 경우, AWS DMS는 ROW 기반의 바이너리 로그(binlogs)의 변경 사항을 읽고, 이 변경 사항을 대상으로 마이그레이션합니다. Aurora Mysql의 Binlog를 활성화하기 위해 Parameter Group의 binlog_format"ROW"로 설정합니다. 그 외 고려 사항은 AWS Document에서 확인 할 수 있습니다.

아키텍처 흐름

  1. AWS DMS에서 Aurora Mysql의 변경사항을 추적합니다. 변경 데이터 캡처(CDC) 기능을 활용하기 위해 Task를 생성합니다.
  2. DMS Task의 Target은 Amazon Kinesis Data Streams로 지정합니다. 트랜잭션의 순서를 보장하기 위해 Shard의 수는 1개로 고정합니다. 샤드의 수를 늘릴 경우 향후 트랜잭션 순서 보장을 위한 후처리 과정이 필요합니다. 만약 DMS Replication Instance가 Private Subnet에 위치 할 경우 Kinesis Data Streams 용 VPC Endpoint를 생성하여 AWS Private Network 기반으로 Binlog를 전송합니다.
  3. 들어오는 레코드를 대상 테이블로 라우팅하는 방법을 결정하는 복잡한 규칙이 있는 시나리오가 있을 수 있습니다. 이러한 경우, Firehose에서 AWS Lambda 함수를 사용하여 소스 스트림을 변환하고 Lambda 변환 함수의 출력의 일부로 라우팅 정보를 제공할 수 있습니다.
  4. Glue Data Catalog에 등록된 Iceberg 테이블을 대상으로 Insert/Update/Delete 작업을 진행합니다. Amazon Data Fireshose에서 라우팅 된 정보를 기반으로 해당 Database 및 Table 자동으로 적용됩니다.
  5. Iceberg의 특성으로 트랜잭션이 발생할 때 마다 Snapshot 및 데이터 파일 발생하며, 이는 스토리지 비용 향상과 읽기 쿼리의 성능 저하를 가져옵니다. Glue Data Catalog의 Snapshot 삭제 및 참조하지 않는 파일 삭제 기능을 통해 스토리지 비용을 최적화 할 수 있습니다. 또한 Glue의 Compaction 기능을 활용해 작은 파일을 큰 파일로 합쳐 읽기 성능을 개선 할 수 있습니다.

Amazon Data Firehose 설정

1. 소스와 대상 선택

  • 소스는 Kinesis Data Streams로 지정하고 대상은 Apache Iceberg로 지정합니다.
  • 소스가 될 Kinesis Data Streams 리소스를 선택하며 Amazon Data Fireshose의 리소스 이름을 지정합니다.

2. 레코드 변형 활성화

  • 들어오는 레코드를 변환하기 위해 레코드 변환을 활성화 합니다. Lambda에서 사용할 함수는 아래의 Lambda 코드를 참조합니다.
  • Lambda에서 데이터를 처리하기 위한 버퍼 크기 및 버퍼 간격을 지정 할 수 있습니다. 이를 통해 Lambda가 호출되는 간격을 조절 할 수 있습니다.
  • Lambda가 호출 되는 버퍼 사이즈 및 버퍼 인터벌 시간에 따라 Lambda의 Timeout 설정을 지정해야합니다. 1분 이상의 Timeout 설정을 권장합니다.

3. 목적지 셋팅

  • Iceberg 테이블이 저장되어 있는 Glue Data Catalog의 계정을 선택합니다. 같은 계정안에 위치 할 경우 현재 계정으로 설정합니다.
  • 라우팅 정보에 대한 인라인 구문 분석을 활성화 시키면 JQ 구문을 통해서 Database, Table, Operation에 맞춰 해당 되는 Iceberg 테이블에 쿼리를 실행 할 수 있습니다. 또는 Lambda 를 통해 데이터를 전처리하여 레코드를 라우팅 할 수 있습니다. 이번 글에서는 Lambda를 통해 데이터를 전처리하므로 해당 기능은 비활성화합니다.
  • Update와 Delete문을 적용하기 위해서는 테이블 고유 키를 설정해야 합니다. 목적지 Database, Table, Unique Key(컬럼 명)을 입력합니다. 또는 Iceberg의 식별자-필드-ID를 지정할 수도 있습니다. 테이블별 고유 키를 구성하지 않으면 Amazon Data Firehose는 필수 테이블의 식별자-필드-ID를 확인하고 이를 고유 키로 사용합니다. 둘 다 구성되지 않은 경우 업데이트 및 삭제 작업으로 데이터 전송이 실패합니다.

4. 그 외 기타 설정

  • Amazon Data Fireshose에서 Iceberg 테이블에 적용하기 위한 버퍼 사이즈버퍼 인터벌 시간을 지정 할 수 있습니다.
  • 목적지로 전송이 실패한 데이터를 백업하기 위한 별도의 S3 경로를 지정합니다.
  • Amazon Data Firehose가 Iceberg 테이블에 데이터를 기록하기 위한 IAM Role을 설정합니다. IAM Role 생성을 위해 AWS Document의 내용을 확인합니다.

전처리를 위한 AWS Lambda 코드

Kinesis Data Streams에서 전달 받은 Binlog를 Iceberg에 처리하기 위해 전처리 과정을 진행합니다. 전처리 중 'otfMetadata' 을 지정할 경우 Iceberg에 기록시 database, table, operation에 맞춰 Iceberg 테이블에 기록 할 수 있습니다. 이 샘플 코드를 사용하면 라우팅 정보에 대한 메타데이터 섹션을 추가하여 들어오는 레코드를 변환 할 수 있습니다.

import base64
import json

def lambda_handler(event, context):
    
    firehose_records_output = {}
    firehose_records_output['records'] = []

    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        parsed_data = json.loads(payload)
        
        data = json.dumps(parsed_data['data'])
        metadata = parsed_data['metadata']
        
        database = metadata['schema-name']
        table = metadata['table-name']
        operation = metadata['operation']
        
        if operation not in ('insert', 'update', 'delete'):
            continue
        
        firehose_record_output = {}
        
        firehose_record_output['data'] = base64.b64encode(data.encode('utf-8'))
        firehose_record_output['recordId'] =record['recordId']
        firehose_record_output['result'] =  'Ok'
        firehose_record_output['metadata'] = {
            'otfMetadata': {
                'destinationDatabaseName': database,
                'destinationTableName': table,
                'operation': operation
                }
            }
            
        firehose_records_output['records'].append(firehose_record_output)
        
    return firehose_records_output

테스트하기

Apache Iceberg 데이터베이스 및 테이블 생성하기

Amazon Data Firehose에서 레코드 라우팅이 정상적으로 동작하는지 확인하기 위해 두 개의 데이터베이스 및 테이블을 생성합니다. 아래 쿼리는 Amazon Athena에서 수행합니다. Iceberg 테이블에 대한 메타데이터 및 데이터 파일을 저장하기 위한 Bucket 경로들을 각 지정합니다.

CREATE DATABASE IF NOT EXISTS database1; 
CREATE DATABASE IF NOT EXISTS database2; 

CREATE TABLE database1.table1(
id int, 
name string ) 
LOCATION 's3://{S3 Bucket URL of TABLE2}/' 
TBLPROPERTIES ( 
'table_type'='ICEBERG',
'format'='parquet',
'write_compression'='snappy',
'optimize_rewrite_delete_file_threshold'='10'); 

CREATE TABLE database2.table2(
id int,
name string ) 
LOCATION 's3://{S3 Bucket URL of TABLE2}/' 
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet',
'write_compression'='snappy',
'optimize_rewrite_delete_file_threshold'='10');

Amazon Aurora MySQL 데이터베이스 및 테이블 생성하기

Aurora MySQL 인스턴스에 접속하여 타겟과 동일한 이름의 Iceberg 데이터베이스 및 테이블을 생성합니다.

CREATE DATABASE IF NOT EXISTS database1;
CREATE DATABASE IF NOT EXISTS database2;

CREATE TABLE database1.table1(
id int not null auto_increment primary key,
name varchar(50) ); 

CREATE TABLE database2.table2(
id int not null auto_increment primary key,
name varchar(50) );

테스트 결과 확인

Amazon Aurora MySQL 데이터베이스에서 입력/수정/삭제한 데이터들이 해당되는 Iceberg 테이블에 정확히 반영되는지 확인합니다.

INSERT 문 테스트

database1.table1 과 database2.table2에 각 100개의 Row를 입력합니다.

DROP PROCEDURE IF EXISTS insert_100_rows;

DELIMITER //

CREATE PROCEDURE insert_100_rows(
    IN db_name VARCHAR(10),
    IN table_name VARCHAR(10),
    IN name_value VARCHAR(10) 
)
BEGIN
    DECLARE i INT DEFAULT 1;
    DECLARE sql_query VARCHAR(500);  

    WHILE i <= 100 DO
        SET @sql_query = CONCAT('INSERT INTO ', db_name, '.', table_name, ' (name) VALUES (''', name_value, ''')');
        PREPARE stmt FROM @sql_query;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;

        SET i = i + 1;
    END WHILE;

END //
DELIMITER ;

-- database1.table1에 100개 입력
CALL insert_100_rows('database1', 'table1','jinsung1');
-- database2.table2에 100개 입력
CALL insert_100_rows('database2', 'table2','jinsung2');

Athena에서 아래의 쿼리를 실행하여 각 Iceberg table을 조회합니다.

SELECT count(*) as numberOfCount FROM "database1"."table1";
SELECT count(*) as numberOfCount FROM "database2"."table2";

아래와 같이 각 해당되는 Iceberg 테이블에 레코드가 100개씩 잘 저장되어 있는것을 확인 할 수 있습니다.

UPDATE 문 테스트

Aurora 데이터베이스에서 레코드를 변경하고 Iceberg 테이블에도 데이터가 함께 변경되는지 확인합니다. id10인 레코드의 name의 값을 ‘updatedJinsungh’로 변경합니다.

UPDATE database1.table1 SET name = 'updatedJinsungh' WHERE id = 10; 

Athena를 통해 Iceberg 테이블을 레코드를 확인하면 데이터가 함께 변경 된 것을 확인 할 수 있습니다.

DELETE 문 테스트

Aurora 데이터베이스에서 database2.table2 레코드 중 name가 jinsung2 인 레코드를 모두 제거합니다. 즉 모든 레코드가 삭제 될 예정입니다.

DELETE FROM database2.table2 WHERE name = 'jinsung2';

Athena를 통해 Iceberg 테이블을 레코드를 확인하면 database2.table2에 해당되는 Iceberg 테이블의 레코드도 모두 삭제 된 것을 확인 할 수 있습니다.

결론

지금까지 Amazon Data Firehose를 사용하여 실시간 데이터를 쉽고 효율적으로 Iceberg 테이블에 트랜잭션을 작업하는 방법에 대해서 알아봤습니다. Amazon Data Firehose를 통해 Apache Iceberg를 활용한 다양한 실시간 데이터 파이프라인을 구성 할 수 있으며 Glue Data Catalog의 기능을 통해 운영 리소스를 최소화 할 수 있습니다.

도움이 되는 실습 워크샵 및 기술 블로그

아래의 링크에서 AWS 환경에서 Apache Iceberg와 관련된 실습 워크샵 및 참조 블로그를 확인 할 수 있습니다.

Huh Jinsung

Huh Jinsung

허진성 솔루션즈 아키텍트는 금융, 핀테크, 제조 고객들의 성공적인 AWS Cloud 여정을 위한 최적의 아키텍트를 구성하고 지원하는 역할을 하고 있습니다.

Sungmin Kim

Sungmin Kim

김성민님은 AWS의 솔루션즈 아키텍트 입니다. Startup 고객들과 협력하여 비즈니스 성과를 실현하는데 도움을 드리고 있습니다.