AWS 기술 블로그
Apache Iceberg Connector for AWS Glue를 이용하여 데이터레이크 CRUD 하기
AWS Glue와 AWS Database Migration Service (DMS)는 온프레미스 데이터 소스를 Amazon Simple Storage Service (Amazon S3) 데이터레이크에 복제하는 도구로서 유용하게 사용되고 있습니다. 많은 고객들이 데이터 소스에서 업데이트가 발생할 때마다 데이터레이크에 반영되기를 원하지만, 관계형 데이터베이스 (RDB) 만큼 쉽게 데이터레이크에 UPDATE
나 DELETE
하는 것은 쉽지 않습니다. Apache Hudi, Delta Lake와 함께 Apache Iceberg는 데이터레이크 내의 데이터를 쉽게 수정하고 삭제할 수 있는 오픈소스 데이터 프로세싱 프레임워크입니다.
이 글에서는 데이터레이크에 AWS DMS를 통해 데이터를 적재하고, Apache Iceberg Connector를 이용하여 Iceberg Table을 생성하고 처리하는 방법을 다룹니다. Amazon Athena는 대화형 쿼리 서비스로서, 데이터레이크를 SQL을 통해 쉽게 분석하기 위해 사용되고 있습니다. 그러나, 이전까지는 Amazon Athena에서 테이블을 조회하거나 데이터를 입력하는 기능만 제공하였고, 데이터를 수정하고 삭제하는 기능은 제공하지 못했습니다. 데이터레이크의 데이터를 삭제하는 요구는 정보보호 컴플라이언스 준수를 위해 필수적인 사항입니다.
이를 위해, AWS는 2022년 4월에 Apache Iceberg 기반의Amazon Athena ACID 트랜잭션 기능을 정식 출시했습니다. 이 기능을 이용하여, Amazon Athena에서 데이터레이크에 대한 CRUD (Create, Read, Update, Delete)를 수행할 수 있고, 데이터레이크의 타임스템프 (Timestamp)와 버전(Version)을 기반으로 데이터 변경 이전 상태를 조회할 수 있는 Time-Travel 쿼리를 실행할 수 있습니다.
Apache Iceberg는 데이터레이크에 저장된 데이터에 대한 오픈 소스 테이블 형식을 제공하여 데이터 엔지니어가 쿼리 성능을 유지하면서 지속적으로 변경되는 데이터 세트를 관리하는 것과 같은 복잡한 문제를 관리하는 데 도움을 줍니다. Apache Iceberg를 사용하면 다음을 수행할 수 있습니다.
- 완전한 읽기 격리 및 다중 동시 쓰기를 통해 파일을 원자적으로 추가, 제거 또는 수정할 수 있는 여러 애플리케이션 간에 테이블 상의 트랜잭션 일관성 유지
- 시간 경과에 따른 테이블 변경 사항을 추적하기 위해 전체 스키마 진화 구현
- Time-Travel 쿼리를 실행하여 이전 데이터를 조회하고 업데이트 간의 변경 사항을 확인
- 하이브 테이블과 달리 물리적 디렉토리에 의존하지 않고 데이터 볼륨이 변경될 때 파티션 구성을 업데이트할 수 있는 파티션 진화를 통해 테이블을 유연한 파티션 레이아웃으로 구성
- 테이블을 이전 버전으로 롤백하여 문제를 신속하게 수정하고 테이블을 정상 상태로 복원
- 대용량 데이터 세트 등에 대한 고성능 쿼리를 위해 고급 계획 및 필터링을 수행
솔루션 개요
이번 포스팅에서는 먼저 Amazon DMS를 통해 관계형 데이터베이스 (Relational Database)로부터 Amazon S3로 변경 데이터를 포함한 전체 데이터를 적재합니다. 다음으로 Apache Iceberg Connector를 이용하여AWS Glue Job에서 Apache Iceberg 테이블을 생성하고, 지속적으로 변경되는 데이터를 UPSERT 또는 DELETE하는 방법을 살펴봅니다. 또한, 생성된 Apache Iceberg 테이블을Amazon Athena와 Amazon EMR을 통해 조회, 데이터 변경 및 삭제, 그리고 변경 이전 시점으로 복원하는 방법을 알아봅니다.
단계 요약
이번 포스팅은 아래와 같은 단계로 구성됩니다.
- 단계 1 : Apache Iceberg Connector for AWS Glue로 Apache Iceberg Connection 생성하기
- 단계 2 : AWS CloudFormation Stack으로 리소스 배포하기
- 단계 3 : AWS DMS로 소스 데이터를 Amazon S3에 초기 적재하기
- 단계 4 : AWS Glue Job으로 Apache Iceberg 테이블에Amazon S3의 초기 데이터 적재하기
- 단계 5 : Amazon Athena로 Apache Iceberg 테이블 조회하기
- 단계 6 : AWS DMS로 CDC (Change Data Capture) 데이터를 Amazon S3에 적재하기
- 단계 7 : AWS Glue Job으로 Apache Iceberg 테이블에 UPSERT와 DELETE 수행하기
- 단계 8 : Amazon Athena로 Apache Iceberg 테이블 조회, 수정, 삭제, 복원하기
- 단계 9 : Amazon EMR Notebook에서 Apache Iceberg 데이터 조회하기
사전 준비사항
솔루션을 배포하기 전에 아래와 같은 사항이 미리 준비되어야 합니다.
이번 블로그 포스팅에서 사용할 리전은 Asia Pacific (Seoul)입니다. 이후 모든 리전은 Asia Pacific (Seoul)로 선택합니다.
단계 1 : Apache Iceberg Connector for AWS Glue로 Apache Iceberg Connection 생성하기
AWS Glue Studio Console의 Marketplace로 이동하여, “Apache Iceberg Connector for AWS Glue”를 검색하고, 검색된 링크를 클릭합니다. (바로가기 링크는 여기입니다.)
“Continue to Subscribe”를 선택합니다.
“Accept Terms”를 선택합니다.
“Continue to Configuration”을 선택합니다.
Fulfillment option에서 “Glue 3.0”을 선택하고, “Continue to Launch”를 선택합니다.
블로그 포스팅을 작성하는 시점까지 Apache Iceberg Connector for AWS Glue의 가장 최신 버전은 0.12.0-2(Feb 14, 2022)입니다. 필요하다면, “Glue 1.0/2.0”을 선택할 수도 있습니다.
“Usage instructions”를 선택합니다.
팝업된 창에서 “Activate the Glue connector…”를 선택합니다.
Connection properties의 Name 필드에 Iceberg connection 이름을 입력합니다.(예, iceberg-connection)
“Create connection and activate connector”를 선택합니다.
이제 AWS Glue Studio console에서 Connection이 성공적으로 추가된 것을 확인할 수 있습니다.
단계 2 : AWS CloudFormation Stack으로 리소스 배포하기
이번 포스트를 위한 CloudFormation Stack을 생성합니다.
- AWS Management Console에 접속하여, Asia Pacific (Seoul) 리전을 선택합니다..
- 아래 “Launch Stack”을 선택합니다.
- 아래 스크린과 같이 “VPC Configuration”과 “Access Configuration” 파라미터에 값을 입력하고, “Next”버튼을 선택합니다.
- VPC 정보는 기본 값을 그대로 사용합니다.
- Aurora DB Password는 특수문자 없이 최소8자 이상, 최소 1개의 대문자/소문자/숫자 조합으로 지정합니다.
- 사전 작업으로 생성한 Key Pair를 선택합니다.
- 여러분의 PC Client IP를 입력합니다.
- “I acknowledge that …”을 체크하고, “Create Stack” 버튼을 선택합니다.
CloudFormation Stack이 완료되기까지 약 20분 정도 소요됩니다.
CloudFormation template은 다음과 같은 리소스를 생성합니다.
- 리소스가 사용할 Amazon VPC, Subnets, Routing Tables, Internet Gateway, NAT Gateway, Security Groups
- SSH Tunneling을 위한 Amazon EC2 Bastion Host
- Amazon Aurora Cluster, DMS를 위한 Subnet Groups
- 데이터 암호화를 위한 AWS Key Management Service (AWS KMS)
- Amazon DMS의 Source Amazon Aurora PostgreSQL Cluster
- Amazon DMS의 Target S3 Bucket과 scripts가 복사되는 Bucket
- Amazon DMS의 Replication Instance, Source/Target Endpoints, Replication Tasks
- Amazon S3와 AWS Glue의 Private Access를 위한 Amazon VPC Endpoints
- AWS Glue Job, Lambda, DMS를 실행할 때 필요한 AWS IAM Policy와 Role
- AWS Glue Job에서 소스 데이터베이스 접근을 위한 Glue JDBC Connection
- Amazon DMS Role 체크와 Glue Scripts 복사를 위한 Lambda Functions
- Apache Iceberg가 활성화된 Amazon EMR Cluster
단계 3 : AWS DMS로 소스 데이터를 Amazon S3에 초기 적재하기
- Source Aurora Cluster에 접속합니다. SQL Workbench/J 또는 DBeaver Tool을 다운로드하여 설치합니다. 이번 포스트에서는 DBeaver Tool을 사용합니다.
- DB Connection을 만들기 위해 DBeaver Tool에서PostgreSQL을 선택합니다.
- Connection Settings에서 PostgreSQL connection을 위한 정보를 입력합니다.
필드 값 Host CloudFormation Outputs의 EndpointOfAuroraCluster 값 Database lakehouse_source_db Username postgres Password CloudFormation template의Aurora DB Password 입력 값 SSH Tunnel을 통해 접속하기 위해 SSH 정보를 입력합니다.
필드 값 Use SSH Tunnel 체크 Host/IP CloudFormation Outputs의 PublicIPOfEC2InstanceForTunnnel 값 User Name ec2-user Authentication Method Public Key Private Key CloudFormation template의Key Pair Name으로 입력한 Key Pair 파일 - New SQL script 창에서 아래 스크립트를 실행합니다.
create schema human_resources; create table human_resources.employee_details (emp_no int PRIMARY KEY,name varchar(30), department varchar(30), city varchar(50), salary int); alter table human_resources.employee_details REPLICA IDENTITY FULL; --This statement is required for Aurora postgres to replicate deleted records through AWS DMS insert into human_resources.employee_details values (1, 'Adam', 'IT', 'SFO', 50000); insert into human_resources.employee_details values (2, 'Susan', 'Sales', 'NY', 60000); insert into human_resources.employee_details values (3, 'Jeff', 'Finance', 'Tokyo', 55000); insert into human_resources.employee_details values (4, 'Bill', 'Manufacturing', 'New Delhi', 70000); insert into human_resources.employee_details values (5, 'Joe', 'IT', 'Chicago', 45000); insert into human_resources.employee_details values (6, 'Steve', 'Finance', 'NY', 60000); insert into human_resources.employee_details values (7, 'Mike', 'IT', 'SFO', 60000); commit;
위 SQL 스크립트에서 “REPLICA IDENTITY FULL”은 PostgreSQL DB에서 변경되는 모든 레코드를 AWS DMS가 복제하기 위해 필요합니다.
- AWS DMS Endpoints의 상태를 확인합니다.
CloudFormation stack을 통해 자동으로 생성된 3개의 Endpoint와 각 Endpoint의 Connection Status가 “successful”인지 확인합니다. - Source Aurora DB에서 S3로 초기 데이터(Full load)를 적재합니다.
DMS -> Database migration tasks 메뉴에서 “lakehouse-db-to-s3-full” task를 선택하고, “Actions”에서 “Restart/Resume”을 선택합니다.Database migration task의 Status가 “Ready” -> “Starting” -> ”Running” -> ”Load complete”로 순차적으로 변경됩니다. 성공적으로 완료된 이후 Amazon S3에 “LOAD000….parquet” 파일이 생성된 것을 확인할 수 있습니다.
단계 4 : AWS Glue Job으로 Apache Iceberg 테이블에Amazon S3의 초기 데이터 적재하기
이제 Amazon S3로 Full Load된 데이터를 AWS Glue Job을 통해 Apache Iceberg 테이블에 저장합니다.
- AWS Glue Job을 통해Apache Iceberg Table를 생성하고, S3 데이터를 Iceberg Table에 입력합니다.
AWS Glue Studio Console로 접속하여 “Jobs” -> “Spark script editor” -> “Create”를 선택합니다.Job details 탭에 설정 값을 입력합니다.
필드 값 Name employee-details-full-etl Description This job loads the data from employee_details dataset and creates the Iceberg Table. IAM Role CloudFormation Output의 {ExecuteGlueJobRole} 선택 Type Spark Glue version Glue 3.0 Language Python 3 Worker type G.1X Requested number of workers 10 Job bookmark Enable Number of retries 0 Job timeout 2880 Script path s3://{ScriptsAndTempS3Bucket}/scripts/ Job metrics Check Continuous logging Check Spark UI Check Spark UI logs path s3://{ScriptsAndTempS3Bucket}/SparkUI/ Maximum concurrency 1 Temporary path s3://{ScriptsAndTempS3Bucket}/temp/ Use Glue data catalog Check Connections iceberg-connection (Apache Iceberg Connection Name 선택) Job parameters 부분에 Glue Job에서 사용할 Key-Value를 입력합니다.
키 값 설명 –raw_s3_path s3://{RawS3Bucket}/full-load DMS로 복제된 Full Load S3 위치 –catalog glue_catalog Iceberg Table의 Catalog 이름 –iceberg_s3_path s3://{CuratedS3Bucket} Iceberg Table의 S3 위치 –database human_resources Iceberg Table의 데이터베이스 –table_name employee_details Iceberg Table의 테이블 이름 –primary_key emp_no Iceberg Table의 Primary Key –partition_key department Iceberg Table의 Partition {RawS3Bucket}, {CuratedS3Bucket}는 CloudFormation Output을 참조합니다.
Script 탭에 아래 Glue Job Script를 붙여 넣습니다. 이 Script는 s3://{ScriptsAndTempS3Bucket}/artifacts/iceberg-on-glue/scripts/위치에 복사되어 있습니다. 파일명은 employee-details-full-etl.py입니다.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.conf import SparkConf from awsglue.dynamicframe import DynamicFrame from pyspark.sql.functions import concat, col, lit, to_timestamp from datetime import datetime ## @params: [JOB_NAME] #0. Define Glue job arguments from Glue job parameters args = getResolvedOptions(sys.argv, ['JOB_NAME','raw_s3_path','catalog','iceberg_s3_path','database','table_name','primary_key','partition_key']) # Examples of Glue Job Parameters # raw_s3_path : s3://icebergglueblog-raws3bucket-winby5ygqhi3/full-load # catalog : glue_catalog # iceberg_s3_path : s3://icebergglueblog-curateds3bucket-140bn8buyn1gn # database : human_resources # table_name : employee_details # primary_key : emp_no # partition_key : department #1. Set variables RAW_S3_PATH = args.get("raw_s3_path") CATALOG = args.get("catalog") ICEBERG_S3_PATH = args.get("iceberg_s3_path") DATABASE = args.get("database") TABLE_NAME = args.get("table_name") PK = args.get("primary_key") PARTITION = args.get("partition_key") DYNAMODB_LOCK_TABLE = f'{TABLE_NAME}_lock' #2. Set the Spark Configuration of Apache Iceberg. You can refer the Apache Iceberg Connector Usage Instructions. def set_iceberg_spark_conf() -> SparkConf: conf = SparkConf() \ .set(f"spark.sql.catalog.{CATALOG}", "org.apache.iceberg.spark.SparkCatalog") \ .set(f"spark.sql.catalog.{CATALOG}.warehouse", ICEBERG_S3_PATH) \ .set(f"spark.sql.catalog.{CATALOG}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ .set(f"spark.sql.catalog.{CATALOG}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ .set(f"spark.sql.catalog.{CATALOG}.lock-impl", "org.apache.iceberg.aws.glue.DynamoLockManager") \ .set(f"spark.sql.catalog.{CATALOG}.lock.table", DYNAMODB_LOCK_TABLE) \ .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") return conf #3. Set the Spark + Glue context conf = set_iceberg_spark_conf() glueContext = GlueContext(SparkContext(conf=conf)) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) #4. Read data from S3 location where is full-loaded by DMS. fullDyf = glueContext.create_dynamic_frame_from_options( connection_type='s3', connection_options={ 'paths': [f'{RAW_S3_PATH}/{DATABASE}/{TABLE_NAME}/'], 'groupFiles': 'none', 'recurse': True }, format='parquet', transformation_ctx='fullDyf') print(f"Count of data after last job bookmark:{fullDyf.count()}") fullDf = fullDyf.toDF() if(fullDyf.count() > 0): #5. Create Apache Iceberg Table from S3 dropColumnList = ['Op','schema_name','table_name'] current_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S') fullDf = fullDf.drop(*dropColumnList).withColumn('update_ts_dms',to_timestamp(col('update_ts_dms'))) fullDf = fullDf.withColumn('last_applied_date',to_timestamp(lit(current_datetime))) fullDf.createOrReplaceTempView(f"{TABLE_NAME}_full") spark.sql(f"CREATE DATABASE IF NOT EXISTS {CATALOG}.{DATABASE}") existing_tables = spark.sql(f"SHOW TABLES IN {CATALOG}.{DATABASE};") df_existing_tables = existing_tables.select('tableName').rdd.flatMap(lambda x:x).collect() if f"{TABLE_NAME}_iceberg" not in df_existing_tables: print(f"Table {TABLE_NAME}_iceberg does not exist in Glue Catalog. Creating it now.") spark.sql(f"""CREATE TABLE IF NOT EXISTS {CATALOG}.{DATABASE}.{TABLE_NAME}_iceberg USING iceberg TBLPROPERTIES ('format-version'='2') PARTITIONED BY ({PARTITION}) as (SELECT * from {TABLE_NAME}_full)""") else: print(f"Table {TABLE_NAME}_iceberg already exists") #6. Read data from Apache Iceberg Table spark.sql(f"SELECT * FROM {CATALOG}.{DATABASE}.{TABLE_NAME}_iceberg limit 5").show() print(f"Total count of {TABLE_NAME}_Iceberg Table Results:\n") countDf = spark.sql(f"SELECT count(*) FROM {CATALOG}.{DATABASE}.{TABLE_NAME}_iceberg") print(f"{countDf.show()}") print(f"Iceberg data load is completed successfully.") else: print(f"No Data changed.") print(f"Glue Job is completed successfully.") job.commit()
Glue Job 설정이 완료되었으면, “Save”를 선택하여 Job을 저장합니다.
Iceberg Table생성 구문 중 TBLPROPERTIES (‘format-version’=’2’)는 Apache Iceberg Table Format V2로 생성하기 위해 지정합니다. Format V2인 경우, Amazon Athena에서 Iceberg Table에 DML을 수행할 수 있습니다.
AWS Glue job에서 Iceberg Table을 사용하기 위해서는 Apache Iceberg Spark Configuration을 설정해야 합니다. Spark Configuration은 AWS Glue job script 안에 포함하거나 Glue job의 파라미터로 지정할 수 있습니다.
이번 포스팅에서는 Spark configuration을 AWS Glue job script에 포함합니다.(위 Glue Job Code 중 def set_iceberg_spark_conf() 부분입니다.) 필요하다면, Spark configuration을 공통 Python 파일로 지정할 수 있습니다. Spark configuration의 자세한 내용은 단계 1에서 생성한 Apache Iceberg Connector 의 “Usage instructions”과 https://iceberg.apache.org/aws/을 참조합니다.
- “Run”을 선택하여 Glue Job을 실행하고, Job 상태를 확인합니다.
“Run status”에서 Glue Job이 성공적으로 완료되었는지 확인하고, CloudWatch logs의 “Output logs”에서 실행된 내역을 확인합니다. 만약 오류가 발생한 경우에는 “Error logs”에서 오류를 확인하고 조치합니다.
- Glue Job Parameter의 –iceberg_s3_path위치에서 Iceberg Table이 저장되어 있는 내용을 확인합니다. Apache Iceberg Format의 data와 metadata를 확인할 수 있습니다.
단계 5 : Amazon Athena로 Apache Iceberg 테이블 조회하기
- Amazon Athena Console에 접속하여Query result location 을 지정합니다.
“Amazon Athena” -> “Query editor” -> “Settings”을 선택합니다.Location of query result에 “{ScriptsAndTempS3Bucket}/athena-result/”를 입력합니다. {ScriptsAndTempS3Bucket}는CloudFormation Output을 참조합니다. (반드시 맨 뒤에 “/”를 포함해야 합니다.)
- Amazon Athena Query Editor에서 Iceberg Table을 조회합니다.
-
select * from "human_resources"."employee_details_iceberg" limit 10;
“last_applied_date” 컬럼에는 Iceberg Table에 데이터가 저장된 시간이 표시됩니다.
단계 6 : AWS DMS로 CDC (Change Data Capture) 데이터를 Amazon S3에 적재하기
- AWS DMS Console에서 CDC task를 시작합니다.
Status는 “Ready” -> “Starting” -> “Replication ongoing”으로 변경됩니다.
- SQL Client Tool에서 Source Database의 “employee_details” table에 DML을 수행합니다.
update human_resources.employee_details set city='New Delhi' where emp_no = 2; update human_resources.employee_details set salary=70000 where emp_no = 5; insert into human_resources.employee_details values (8, 'John', 'Sales', 'SFO', 90000); insert into human_resources.employee_details values (9, 'Eli', 'Purchasing', 'Chicago', 90000); delete from human_resources.employee_details where emp_no = 3; commit;
- CDC task에 의해 복제된 데이터를 확인합니다.
Source Database에 DML 을 수행한 이후, CDC task는 변경된 데이터를 자동으로 복제합니다. 파일명은 {RawS3Bucket}/cdc-load/human_resources/employee_details/yyyymmdd-…parquet로 생성됩니다.파일을 선택하고, “Action” -> “Query with S3 Select”로 파일의 내용을 확인합니다.
여기서 “U”는 Update, “I”는 Insert, “D”는 Delete된 데이터를 의미합니다.
단계 7 : AWS Glue Job으로 Apache Iceberg 테이블에 UPSERT와 DELETE 수행하기
- CDC 데이터를 Iceberg Table에 반영할 AWS Glue Job을 생성합니다.
단계 4와 동일하게 AWS Glue Studio Console로 접속하여 “Jobs” -> “Spark script editor” -> “Create”를 선택합니다. (좀 더 간편하게 Glue Job을 생성하기 위해서는 “employee-details-full-etl”을 선택하고, Actions -> Clone job을 선택합니다. Clone된 이후 아래의 세부 설정 값에 맞게 변경합니다.)Job details 탭에 설정 값을 입력합니다.
필드 값 Name employee-details-cdc-etl Description This job merges the CDC data into employee_details_Iceberg Table. IAM Role CloudFormation Output의 {ExecuteGlueJobRole} 선택 Type Spark Glue version Glue 3.0 Language Python 3 Worker type G.1X Requested number of workers 10 Job bookmark Enable Number of retries 0 Job timeout 2880 Script path s3://{ScriptsAndTempS3Bucket}/scripts/ Job metrics Check Continuous logging Check Spark UI Check Spark UI logs path s3://{ScriptsAndTempS3Bucket}/SparkUI/ Maximum concurrency 1 Temporary path s3://{ScriptsAndTempS3Bucket}/temp/ Use Glue data catalog Check Connections iceberg-connection (Apache Iceberg Connection Name 선택) Job parameters 부분에 Glue Job에서 사용할 Key-Value를 입력합니다.
키 값 설명 –raw_s3_path s3://{RawS3Bucket}/cdc-load DMS로 복제된 CDC Load S3 위치 –catalog glue_catalog Iceberg Table의 Catalog 이름 –iceberg_s3_path s3://{CuratedS3Bucket} Iceberg Table의 S3 위치 –database human_resources Iceberg Table의 데이터베이스 –table_name employee_details Iceberg Table의 테이블 이름 –primary_key emp_no Iceberg Table의 Primary Key –partition_key department Iceberg Table의 Partition {RawS3Bucket}, {CuratedS3Bucket}는 CloudFormation Output을 참조합니다. –raw_s3_path의 값은 employee-details-full-etl과 달리s3://…/cdc-load 입니다.
Script 탭에 아래 Glue Job Script를 붙여 넣습니다. 이 Script는 s3://{ScriptsAndTempS3Bucket}/artifacts/iceberg-on-glue/scripts/위치에 복사되어 있습니다. 파일명은 employee-details-cdc-etl.py입니다.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.conf import SparkConf from awsglue.dynamicframe import DynamicFrame from pyspark.sql.functions import concat, col, lit, to_timestamp from datetime import datetime ## @params: [JOB_NAME] #0. Define Glue job arguments from Glue job parameters args = getResolvedOptions(sys.argv, ['JOB_NAME','raw_s3_path','catalog','iceberg_s3_path','database','table_name','primary_key','partition_key']) # Examples of Glue Job Parameters # raw_s3_path : s3://icebergglueblog-raws3bucket-winby5ygqhi3/cdc-load # catalog : glue_catalog # iceberg_s3_path : s3://icebergglueblog-curateds3bucket-140bn8buyn1gn # database : human_resources # table_name : employee_details # primary_key : emp_no # partition_key : department #1. Set variables RAW_S3_PATH = args.get("raw_s3_path") CATALOG = args.get("catalog") ICEBERG_S3_PATH = args.get("iceberg_s3_path") DATABASE = args.get("database") TABLE_NAME = args.get("table_name") PK = args.get("primary_key") PARTITION = args.get("partition_key") DYNAMODB_LOCK_TABLE = f'{TABLE_NAME}_lock' #2. Set the Spark Configuration of Apache Iceberg. You can refer the Apache Iceberg Connector Usage Instructions. def set_iceberg_spark_conf() -> SparkConf: conf = SparkConf() \ .set(f"spark.sql.catalog.{CATALOG}", "org.apache.iceberg.spark.SparkCatalog") \ .set(f"spark.sql.catalog.{CATALOG}.warehouse", ICEBERG_S3_PATH) \ .set(f"spark.sql.catalog.{CATALOG}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ .set(f"spark.sql.catalog.{CATALOG}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ .set(f"spark.sql.catalog.{CATALOG}.lock-impl", "org.apache.iceberg.aws.glue.DynamoLockManager") \ .set(f"spark.sql.catalog.{CATALOG}.lock.table", DYNAMODB_LOCK_TABLE) \ .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") return conf #3. Set the Spark + Glue context conf = set_iceberg_spark_conf() glueContext = GlueContext(SparkContext(conf=conf)) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) #4. Read data from S3 location where is cdc-loaded by DMS. cdcDyf = glueContext.create_dynamic_frame_from_options( connection_type='s3', connection_options={ 'paths': [f'{RAW_S3_PATH}/{DATABASE}/{TABLE_NAME}/'], 'groupFiles': 'none', 'recurse': True }, format='parquet', transformation_ctx='cdcDyf') print(f"Count of CDC data after last job bookmark:{cdcDyf.count()}") cdcDf = cdcDyf.toDF() if(cdcDyf.count() > 0): # Count by CDC Operations(Insert, Update, Delete) cdcInsertCount = cdcDf.filter("Op = 'I'").count() cdcUpdateCount = cdcDf.filter("Op = 'U'").count() cdcDeleteCount = cdcDf.filter("Op = 'D'").count() print(f"Inserted count: {cdcInsertCount}") print(f"Updated count: {cdcUpdateCount}") print(f"Deleted count: {cdcDeleteCount}") print(f"Total CDC count: {cdcDf.count()}") #5. Merge CDC data into Iceberg Table dropColumnList = ['Op','schema_name','table_name'] current_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S') cdcDf = cdcDf.withColumn('update_ts_dms',to_timestamp(col('update_ts_dms'))) cdcDf = cdcDf.withColumn('last_applied_date',to_timestamp(lit(current_datetime))) # cdcDf.createOrReplaceTempView(f"{TABLE_NAME}_upsert") existing_tables = spark.sql(f"SHOW TABLES IN {CATALOG}.{DATABASE};") df_existing_tables = existing_tables.select('tableName').rdd.flatMap(lambda x:x).collect() if f"{TABLE_NAME}_iceberg" in df_existing_tables: # DataFrame for the inserted or updated data upsertDf = cdcDf.filter("Op != 'D'").drop(*dropColumnList) upsertDf.createOrReplaceTempView(f"{TABLE_NAME}_upsert") # DataFrame for the deleted data deleteDf = cdcDf.filter("Op = 'D'").drop(*dropColumnList) deleteDf.createOrReplaceTempView(f"{TABLE_NAME}_delete") if upsertDf.count() > 0: print(f"Table {TABLE_NAME}_iceberg is upserting...") spark.sql(f"""MERGE INTO {CATALOG}.{DATABASE}.{TABLE_NAME}_iceberg t USING {TABLE_NAME}_upsert s ON s.{PK} = t.{PK} WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * """) else: print("No data to insert or update.") if deleteDf.count() > 0: print(f"Table {TABLE_NAME}_iceberg is deleting...") spark.sql(f"""MERGE INTO {CATALOG}.{DATABASE}.{TABLE_NAME}_iceberg t USING {TABLE_NAME}_delete s ON s.{PK} = t.{PK} WHEN MATCHED THEN DELETE """) else: print("No data to delete.") else: print(f"Table {TABLE_NAME}_iceberg doesn't exist in {CATALOG}.{DATABASE}.") #6. Read data from Apache Iceberg Table spark.sql(f"SELECT * FROM {CATALOG}.{DATABASE}.{TABLE_NAME}_iceberg limit 5").show() print(f"Total count of {TABLE_NAME}_Iceberg Table Results:\n") countDf = spark.sql(f"SELECT count(*) FROM {CATALOG}.{DATABASE}.{TABLE_NAME}_iceberg") print(f"{countDf.show()}") print(f"Iceberg data load is completed successfully.") else: print(f"No Data changed.") print(f"Glue Job is completed successfully.") job.commit()
Glue Job 설정이 완료되었으면, “Save”를 선택하여 Job을 저장합니다.
- “Run”을 선택하여 Glue Job을 실행합니다.
- Glue Job이 완료된 후, Amazon Athena Query Editor에서 Iceberg Table을 조회합니다.
emp_no=(2, 5)는 UPDATE된 레코드이고, emp_no=(8, 9)는 INSERT된 레코드 입니다. emp_no=3은 DELETE되었습니다. “last_applied_date” 컬럼에는 Iceberg Table에 적용된 시간이 기록되어 있습니다.select * from "human_resources"."employee_details_iceberg" limit 10;
아래 그림에서 파란색 박스는 Update된 레코드를 보여주고, 빨간색 박스는 Insert된 레코드를 보여줍니다.
- Glue Job(Name : employee-details-cdc-etl)을 다시 실행하여 Glue Job Bookmark가 제대로 작동하는지 확인해 봅니다. Glue Job의 CloudWatch log(Output logs링크)를 확인하면, 두 번째 Glu Job 실행에서는 변경된 데이터가 없기 때문에 아무런 데이터도 처리되지 않을 것을 확인할 수 있습니다.
Glue Job Bookmark에 대한 문제해결 방법은 여기 링크를 참조할 수 있습니다.
단계 8 : Amazon Athena로 Apache Iceberg 데이터 수정, 삭제, 복원하기
- Athena Query Editor에서 Iceberg Table의 History를 확인합니다.
select * from "employee_details_iceberg$iceberg_history"
지금까지 Iceberg History에는 3개의 Snapshot이 기록되어 있습니다. Glue Job에서 3개의 구문이 실행되었기 때문입니다. (첫 번째는 초기 데이터 생성할 때, 두 번째는 UPSERT 구문, 세 번째는 DELETE 구문이 실행될 때 Snapshot이 기록되었습니다.)
-
insert into human_resources.employee_details_iceberg values (current_timestamp, 10, 'Incheol', 'SA', 'ICN', 320000, current_timestamp); update human_resources.employee_details_iceberg set city='Las Vegas', last_applied_date= current_timestamp where emp_no = 4; delete from human_resources.employee_details_iceberg where emp_no = 6;
DML 수행 후, Iceberg History에는 아래와 같이 Snapshot이 3개 더 추가된 것을 확인할 수 있습니다. (3번째 라인에 있는 made_current_at와 snapshot_id 값을 기록해 둡니다. 이 값은 이후 Iceberg Time-Travel 기능을 확인할 때 사용됩니다.)
Athena Query Editor에서 Iceberg Table의 Snapshot 상세 정보를 확인합니다.
select * from "employee_details_iceberg$iceberg_snapshots"
Iceberg Snapshot 정보에는 operation, manifest_list, summary 정보가 있습니다. summary 컬럼에는 변경된 파일 수, 변경된 레코드 수, 파티션 수 등 Iceberg Table에 대한 유용한 정보들이 포함됩니다.
- Athena Query Editor에서 Iceberg Time-Travel기능으로 데이터 삭제 전 데이터를 조회해 봅니다.
지금 emp_no=6은 이전에 삭제되었기 때문에 조회되지 않습니다.아래와 같이 Iceberg Time-Travel 구문을 이용하여 삭제 이전 시점의 데이터를 조회해 봅니다. Iceberg History(employee_details_iceberg$iceberg_history)에서 MADE_CURRENT_AT 또는 SNAPSHOT_ID를 확인한 후 Time-Travel Query를 수행합니다. (TIMESTAMP와 VERSION 값은 여러분이 조회한 값을 입력합니다.)
select * from human_resources.employee_details_iceberg FOR SYSTEM_TIME AS OF TIMESTAMP '2022-03-12 13:24:51.579 UTC'; select * from human_resources.employee_details_iceberg FOR SYSTEM_VERSION AS OF 1566622113474469893;
이전에 삭제되었던 emp_no=6 데이터가 확인되는 것을 볼 수 있습니다. Iceberg Table의 Time-Travel 기능을 이용하면 사용자의 실수로 인한 데이터 삭제를 쉽게 복구할 수 있습니다.
단계 9 : Amazon EMR로 Apache Iceberg 데이터 조회하기
CloudFormation Stack으로 Amazon EMR Cluster가 생성되었습니다. Iceberg가 활성화된 EMR Cluster를 생성하는 방법은 여기를 참조합니다. EMR Cluster의 “Configurations”에 iceberg.enable=true로 설정되어 있는 것을 확인할 수 있습니다.
-
- Amazon EMR Notebook을 생성합니다.
- Notebook name은 “iceberg-notebook”을 입력합니다.
- Cluster : “Choose”버튼을 눌러 CloudFormation Stack에서 생성한 EMR Cluster를 선택합니다.
- “Choose cluster”버튼을 누릅니다.
- Notebook location : s3://{ScriptsAndTempS3Bucket}/notebooks/
- Jupyter Notebook에서 iceberg-notebook.ipynb 파일을 더블클릭하여 오픈합니다.
Notebook Kernel은 PySpark을 선택합니다. - Iceberg Spark Configuration을 아래와 같이 설정합니다.
아래 코드에서 {CuratedS3Bucket}는 CloudFormation의 Output을 참조합니다.%%configure -f { "conf":{ "spark.sql.catalog.glue_catalog": "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.glue_catalog.warehouse": "s3://{CuratedS3Bucket}", "spark.sql.catalog.glue_catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog", "spark.sql.catalog.glue_catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO", "spark.sql.catalog.glue_catalog.lock-impl": "org.apache.iceberg.aws.glue.DynamoLockManager", "spark.sql.catalog.glue_catalog.lock.table": "employee_details_lock", "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" } }
Control+Enter Key를 눌러 Notebook Cell을 실행합니다. - 다른 셀에서 아래와 같이 Spark SQL을 실행하여 Iceberg Table을 조회합니다.
spark.sql("select * from glue_catalog.human_resources.employee_details_iceberg").show()
- Amazon EMR Notebook을 생성합니다.
지금까지 Apache Iceberg Connector for AWS Glue를 이용하여 별도의 Iceberg JAR 파일을 다운로드할 필요없이 AWS Glue Job에서 Iceberg Table을 생성하고, 데이터레이크에 대한 UPSERT와 DELETE를 하는 방법을 살펴보았습니다.
리소스 정리하기
이 블로그에서 사용했던 리소스는 향후 불필요한 과금을 방지하기 위해 삭제하여야 합니다.
AWS CloudFormation Console에서 “Delete stack”를 선택하여 이 포스팅을 위해 생성된 모든 리소스를 삭제합니다.
CloudFormation으로 생성되지 않은 리소스나 의존성이 있는 리소스들은 아래와 같습니다. 이런 리소스는 수동으로 삭제하여야 합니다.
- AWS Glue Studio : Iceberg Connector, employee-details-full-etl, employee-details-cdc-etl Job
- Amazon S3 : S3://{ScriptsAndTempS3Bucket}
- Amazon EMR Notebook : iceberg-notebook
- Amazon DynamoDB : employee_details_lock
- Security Group : ElasticMapReduce*
결론
이번 포스팅에서는 Apache Iceberg Connector for AWS Glue를 이용하여, 데이터레이크에서 DML을 수행하는 방법을 알아보았습니다. 데이터레이크의 DML을 AWS Glue Job과 Amazon Athena Query를 통해 실행해 보고, 변경 이전 시점으로 복구하는 Iceberg Time-Travel 기능을 확인하였습니다. 데이터레이크에서 ACID 트랜잭션 관리는 Apache Iceberg 뿐 아니라 Apache Hudi와 Apache Delta Lake Open Source와 LakeFormation Governed Table을 통해서도 가능합니다. 데이터레이크에서 범용 SQL을 통해 데이터를 삭제하는 것은 개인정보보호와 같은 컴플라이언스를 준수하는데 중요한 요소일 뿐 아니라 데이터레이크를 효율적으로 관리할 수 있도록 해 줍니다. 데이터레이크 관리를 위한 LakeFormation Governed Table, Apache Hudi와 Apache Delta Lake 블로그를 참조하세요.