AWS 기술 블로그

AWS 분석 서비스에서 Apache Iceberg 활용하기

What is Iceberg?

Apache Iceberg는 페타바이트 기반의 데이터를 위한 오픈소스 데이터 테이블 형식으로, Netflix에서 개발하여 2020년부터 아파치 재단의 오픈소스로서 활용되었습니다. Apache Iceberg의 가장 큰 특징은 데이터 레이크에 저장된 대규모 데이터 세트를 테이블로 관리하며 Upsert, 스키마 진화, Time Travel query 등의 데이터 처리를 지원한다는 것입니다.

이러한 기능은 Apache Iceberg가 ACID를 보장하기 때문에 가능합니다. 기존의 Apache Hive 기반의 빅데이터 처리 방식은 ACID 트랜잭션을 지원하지 않기 때문에 데이터의 일관성과 무결성을 보장하지 않았습니다. 또 데이터의 스키마가 변경될 때에 전체 데이터세트를 re-write 해야하는 불편함과, 데이터를 주기적으로 배치 처리하여 처리시간이 오래 걸리는 특성이 있었습니다.

Apache Iceberg는 ACID를 준수하여 모든 트랜잭션에 데이터 일관성과 무결성을 보장할 뿐만 아니라, 데이터에 변경이 생길 시 레코드 단위로 수정 또는 삭제가 가능하여 대량의 데이터를 전부 re-write할 필요가 없습니다. 또한 실시간으로 데이터의 변경을 처리하기 때문에 스트리밍 데이터와 같은 데이터 수집과 분석이 동시에 이루어져야 하는 작업에 적합합니다.

아래 Apache Iceberg의 장점을 정리하였습니다.

1. SQL

Apache Hive가 HiveQL을 지원하여 편리하게 SQL 형식으로 데이터를 관리할 수 있는 것처럼, Iceberg도 SQL 문으로 여러가지 작업을 수행할 수 있습니다.

2. 데이터 일관성

Apache Iceberg는 데이터의 ACID를 준수하기 때문에 모든 사용자가 동일한 데이터를 읽고 쓸 수 있는 것을 보장합니다.

3. 데이터 구조

데이터의 구조를 변경하는 것을 스키마 진화라고 합니다. Apache Iceberg는 데이터 테이블에 변경이 발생했을 때 전체 데이터를 다시 쓰기할 필요 없이, 필요한 열만 추가하거나 변경, 또는 삭제하는 식으로 더욱 손쉬운 스키마 진화를 지원합니다.

4. 데이터 버전 관리

Apache Iceberg의 Time Travel 기능은 데이터의 시간에 따른 변경 내역을 추적할 수 있도록 합니다. 사용자는 이전 버전의 데이터에 액세스하여 쿼리하거나, 현재 버전과 비교하여 변경되거나 삭제된 부분을 확인할 수 있습니다. 또 문제가 생겼을 때 이전 버전으로 롤백도 가능합니다.

5. 크로스 플랫폼 지원

Apache Iceberg는 다양한 스토리지 시스템과 쿼리엔진을 지원합니다. 하나의 테이블 데이터를 Apache Spark, Hive, Presto, Flink 등 여러 엔진을 이용하여 쿼리할 수 있습니다. AWS Athena, EMR, Glue, Redshift 등에서 Iceberg를 이용하여 데이터를 쿼리할 수 있습니다.

6. 증분처리

Apache Iceberg는 기존의 데이터 테이블에서 변경된 일부 데이터만 다시 업데이트할 수 있는 CDC(Change Data Capture) 기능을 제공합니다. 빅데이터 분석을 위한 대규모 데이터는 보통 Amazon S3와 같은 데이터 레이크에 저장됩니다. 오브젝트 스토리지 아키텍처의 특성 상 저장된 데이터의 일부를 수정하고 싶다면 전체 데이터셋을 다시 쓰기해야 합니다. 하지만 Apache Iceberg를 사용하면 S3 데이터의 일부 레코드만 upsert하거나 merge하는 등 CDC 기능을 적용할 수 있습니다.

Apache Iceberg를 좀 더 자세히 이해하기 위하여 주요 구성요소에 대해 알아보겠습니다.

Apache Iceberg는 Catalog Layer, Metadata Layer, Data Layer로 구성되어 있습니다.

구성요소 설명
Catalog Layer metadata pointer로서 가장 최신 시점의 테이블 상태, 즉 최신 metadata file의 위치 정보를 가지고 있습니다.
Metadata Layer Iceberg는 테이블에 변경이 일어나는 시점마다 metadata file과 snapshot을 만듭니다. metadata file에는 테이블에 대한 변경사항이 기록되는데 예를 들어 테이블 스키마, 파티셔닝, 스냅샷 세부정보 등이 포함되어 있습니다. 스냅샷은 특정 시점의 테이블 상태를 나타냅니다.
스냅샷에 포함된 data file들의 metadata와 변경사항 추적은 manifest file에 기록됩니다. 이 manifest file들을 관리하는 것이 manifest file list입니다. manifest file list는 스냅샷 당 1개씩 존재합니다.
metadata file은 snapshot의 manifest file list를 참고하여 어떤 manifest file로 원하는 data file을 쿼리할 수 있는지 확인합니다.
Data Layer 실제로 데이터가 저장되어 있는 레이어로, 파일 형식으로 저장되어 있습니다.  v2 부터는 데이터에 변경사항이 있을 때 기존의 data file에 변경사항을 반영한 후 새로 write하는 방식(Copy-On-Write)뿐만 아니라, 데이터 변경분만 새로 write하여 기존 data file과 병합하여 읽는 방식(Merge-On-Read) 모두 지원합니다.

이렇게 metadata와 실제 data를 모두 파일 단위로 저장하는 Apache Iceberg의 아키텍처는 metadata를 RDBMS에 파티션 단위로 저장하는 Hive Metastore와 달리 metadata 스토리지의 확장성을 증가시키고 처리 성능에도 기여합니다.

Iceberg on AWS

AWS에서 제공하는 다양한 분석서비스를 통해 지금까지 소개드린 Apache Iceberg를 사용 할 수 있습니다. Iceberg 테이블을 Glue Data Catalog를 통해 중앙 관리하며 AWS의 분석서비스는 Glue Data Catalog에 저장된 Iceberg 테이블을 참조하여 데이터레이크에 대한 Transaction을 처리할 수 있습니다. 사용자분들이 선호하는 빅데이터 프레임워크(Spark, Flink, Presto, Trino, Hive 등)에 맞춰 AWS의 분석서비스를 선택적으로 사용 할 수 있습니다. 뿐만아니라 빅데이터 프레임워크에 익숙하지 않은 고객들의 경우 Amazon Redshift를 통해서 Iceberg 테이블에 대해 SQL기반의 읽기 쿼리를 수행 할 수 있습니다.

실습하기

Iceberg on AWS에 대한 이해를 돕기 위해 Sample Data를 바탕으로 실습을 진행합니다. 실습을 통해 AWS Glue, Amazon Athena, Amazon Redshift 서비스를 활용하여 Apache Iceberg의 기능들을 살펴봅니다.

실습 준비하기

실습을 진행하기 위해 아래의 Cloudformation 버튼을 클릭하여 실습을 위한 리소스를 생성합니다. Cloudformation을 통해 실습을 위한 아래의 AWS 리소스들을 생성하게 됩니다. 실습 진행 후 과금을 방지하기 위해 실습에 필요한 자원들은 반드시 삭제하며, 스택 삭제를 통해 Cloudformation에서 생성한 자원을 한 번에 삭제 할 수 있습니다.

Cloudformation을 통한 리소스 생성이 완료되면, Cloudshell 서비스로 이동하여 아래의 명령어를 입력합니다. 아래 명령어를 통해 실습에 필요한 데이터를 S3 버킷으로 이동합니다.

account_id=$(aws sts get-caller-identity --query 'Account' --output text)
echo "AWS Account ID: $account_id"

aws s3 cp s3://aws-korea-tech-blog-public/20240222-iceberg-on-aws/retail_data.parquet s3://$account_id-iceberg/retail/

실습 시나리오

  1. Amazon Athena : Amazon Athena를 통해 Iceberg Table을 생성합니다. 생성한 Iceberg Table을 통해 Iceberg의 기본적인 CRUD 기능을 살펴봅니다.
  2. AWS Glue : Glue Studio를 통해서 Spark의 Python Library인 pyspark을 활용합니다. pyspark을 통해서 Iceberg의 고급 기능인 Iceberg Metadata 조회 및 Time Travel 기능을 살펴봅니다.
  3. Amazon Redshift Serverless : Redshift의 Spectrum 기능을 활용하여 S3에 저장된 Iceberg Table을 Redshift에 연결하는 방법을 살펴보고, 기본적인 읽기 쿼리를 진행합니다.

원활한 실습 진행을 위해서 1번 부터 실습을 진행 합니다.

Apache Iceberg on Amazon Athena

Amazon Athena는 표준 SQL을 사용하여 Amazon S3(Amazon Simple Storage Service)에 있는 데이터를 직접 간편하게 분석할 수 있는 대화형 쿼리 서비스입니다. Amazon Athena를 활용하여 표준 SQL을 활용해 Iceberg Table을 생성 및 CRUD 및 고급 기능을 활용 할 수 있습니다.

Iceberg 테이블 생성하기

Amazon Athena 서비스의 쿼리 편집기로 이동합니다. Iceberg 테이블을 생성하기 이전에 Iceberg 테이블이 저장 될 Database를 생성합니다. 생성한 Database는 Glue Data Catalog에 등록됩니다.

CREATE DATABASE IF NOT EXISTS retaildb;

Iceberg 테이블에 저장할 데이터를 Hive기반 테이블에 저장합니다. 아래의 코드 중 {Account-ID}는 각 계정의 Account ID로 수정합니다.

CREATE EXTERNAL TABLE `retaildb`.`hive_table`(
  `invoiceno` bigint, 
  `stockcode` string, 
  `description` string, 
  `quantity` int, 
  `invoicedate` string, 
  `unitprice` float, 
  `customerid` string, 
  `country` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://{Account-ID}-iceberg/retail/'

아래의 쿼리를 실행하여 Iceberg 테이블을 앞에서 생성한 retaildb 안에 생성하고 데이터를 HIVE TABLE에서 가져와 입력합니다.

-- ICEBERG TABLE 생성하기
CREATE TABLE retaildb.iceberg_table(
invoiceno bigint,
stockcode string,
description string,
quantity int,
invoicedate string,
unitprice float,
customerid string,
country string)
LOCATION 's3://{Account-ID}-iceberg/iceberg_table/'
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='parquet',
  'write_compression'='snappy',
  'optimize_rewrite_delete_file_threshold'='10'
);

-- HIVE 기반 테이블에서 ICEBERG TABLE로 데이터 저장
insert into retaildb.iceberg_table
select * from retaildb.hive_table; 

아래의 쿼리를 입력하여 Iceberg 테이블에 정상적으로 데이터가 입력되었는지 확인합니다. 데이터가 정상적으로 입력되었다면 Row의 수가 0이 아니어야합니다.

SELECT count(*) AS NUMBEROFROW FROM "retaildb"."iceberg_table";

지금까지 Iceberg 테이블을 정상적으로 생성했다면, ‘s3://{Account-ID}-iceberg/iceberg_table’로 이동했을 때 아래와 같은 데이터를 확인할 수 있습니다. metadata 폴더에는 iceberg table에서 DDM/DML을 수행할 때마다 새로운 metadata file과 snapshot이 생성됩니다.

데이터 조회 및 변경하기

Iceberg 테이블에 대한 데이터 조회 및 변경을 확인하기 위해 아래의 쿼리를 순서대로 진행합니다.

  1. country가 ‘Korea’인 데이터를 조회합니다.
  2. country가 ‘United Kingdom’인 데이터를 조회합니다.
  3. ‘United Kingdom’을 ‘Korea’로 변경해봅니다.
  4. 정상적으로 ‘Korea’로 변경 되었는지 확인합니다.
-- 1) Korea 데이터 조회 -> 데이터가 조회되지 않습니다.
SELECT * FROM retaildb.iceberg_table WHERE country='Korea' limit 10;

-- 2) United Kingdom 데이터 조회 -> 데이터가 조회됩니다.
SELECT * FROM retaildb.iceberg_table WHERE country='United Kingdom' limit 10;

-- 3) United Kingdom을 Korea로 변경
UPDATE retaildb.iceberg_table SET country='Korea' WHERE country='United Kingdom';

-- 4) 정상적으로 Korea로 변경되었는지 확인 -> 데이터가 조회됩니다.
SELECT * FROM retaildb.iceberg_table WHERE country='Korea' limit 10;

Iceberg 테이블과 Hive 테이블을 비교하기위해 Hive Table에 저장된 데이터 변경을 시도해 봅니다.

-- HIVE 테이블에 Korea를 China로 데이터 변경 시도
UPDATE retaildb.hive_table SET country='China' WHERE country='Korea'

HIVE 테이블에는 아래의 경고문구가 발생하며 데이터 변경이 불 가능함을 확인 할 수 있습니다.

Apache Iceberg on AWS Glue

AWS Glue는 분석 사용자가 여러 소스의 데이터를 쉽게 검색, 준비, 이동, 통합할 수 있도록 하는 서버리스 데이터 통합 서비스입니다. Glue Notebook은 빅데이터 처리를 위해 Spark, Ray 등의 프레임워크를 지원하는 대화형 방식의 노트북이며 이번 실습에서는 빅데이터 처리 플랫폼인 Spark의 Python 라이브러리인 Pyspark을 활용합니다.

Glue Notebook 생성하기

Glue Notebook 메뉴로 이동하여 Notebook 버튼을 클릭합니다. 아래와 같이 option은 ‘start fresh’로 지정하고 Notebook의 IAM Role은 Cloudformation을 통해 생성한 ‘Glueadmin’을 적용합니다.

Glue Notebook Setup하기

왼쪽 상단의 ‘+’ 버튼을 클릭하면 새로운 코드 입력 칸을 생성할 수 있습니다. 코드 입력란을 생성하고 아래의 코드를 입력하여 Glue Job에 대한 환경을 설정합니다.

# Glue Setup
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5
%%configure 
{
  "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
  "--datalake-formats": "iceberg"
} 

Glue 환경 설정을 완료했다면, 아래의 코드를 입력하여 Spark job 생성을 위한 Spark Session을 생성합니다. 그리고 Athena를 통해 생성한 Iceberg 테이블을 정보를 호출합니다. 아래의 코드 중 {Account-ID}는 각 계정의 Account ID로 수정합니다.

# Spark Session 생성
from pyspark.sql import SparkSession

warehouse_path = "s3://{Account-ID}-iceberg/iceberg_table/"

spark = SparkSession.builder \
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.glue_catalog.warehouse", f"{warehouse_path}") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()  

# Iceberg 테이블의 스키마 조회 

table_path = 'glue_catalog.retaildb.iceberg_table'
spark.table(f"{table_path}").printSchema()

다음과 같이 Iceberg 테이블에 대한 스키마 정보가 정상적으로 호출된다면 Glue Studio에서 Iceberg 테이블을 정상적으로 호출한것입니다.

Iceberg 테이블에 대한 Metadata 조회

Iceberg 테이블은 데이터가 변경 될 때마다 Metadata에 대한 변경도 함께 발생합니다. Iceberg에서는 아래의 구분을 통해서 Metadata를 SQL 쿼리로 조회 할 수 있습니다. 자세한 사항은 Apache Iceberg 홈페이지에서 확인 가능합니다.

구분 설명
files 테이블의 현재 데이터 파일을 표시합니다.
manifests 테이블의 현재 파일 매니페스트를 표시합니다.
history 테이블의 기록을 표시합니다.
partitions 테이블의 현재 파티션을 표시합니다.
snapshots 테이블의 스냅샷을 보여줍니다.
refs 테이블의 참조를 표시합니다.

아래의 코드를 입력하여 지금까지 Iceberg 테이블에 대한 히스토리를 조회합니다. Spark은 SQL 방식으로도 데이터 조회가 가능합니다.

query = f"""SELECT * FROM {table_path}.history;"""

spark.sql(query).show(truncate=False)

아래와 같이 쿼리 결과가 호출됩니다. 첫번째 Snapshot은 parent_id가 Null 값인 것을 통해 Iceberg의 첫번째 스냅샷인 것을 확인 할 수 있습니다. 앞서 Athena를 통해 데이터 변경 작업을 1회 작업했기 때문에 Snapshot이 총 2개 인것을 확인 할 수 있습니다.

Iceberg Time Travel 기능을 통한 과거 데이터 조회

Iceberg의 Time Travel 기능을 활용하여 과거 데이터를 조회해봅니다. 앞서 1번 Step에서 ‘United Kingdom’을 ‘Korea’로 변경하였습니다. Snapshot 정보를 활용하여 데이터를 변경하기 이전의 원본 테이블로 데이터를 조회합니다.
Time Travel을 Snapshot의 시간 정보와 Snapshot ID를 활용하는 두 가지 방식으로 조회 할 수 있습니다. 해당 정보는 바로 이전의 Metada 조회 결과에서 확인 할 수 있습니다.
아래 쿼리 시 {TIMESTAMP}와 {SNAPSHOT ID}의 정보는 parent_id가 Null인 첫번째 스냅샷의 정보를 사용합니다. 아래는 사진은 예시입니다.

# Snapshot 시간정보 활용. {TIMESTAMP} 정보는 실제 데이터(made_current_at)로 변경합니다.
query = f"""SELECT * FROM {table_path} 
TIMESTAMP AS OF '{TIMESTAMP}'
WHERE country='United Kingdom'
limit 10"""

spark.sql(query).show()

# Snapshot ID 활용
query = f"""SELECT * FROM {table_path} 
VERSION AS OF {SNAPSHOT ID}
WHERE country='United Kingdom'
limit 10"""

spark.sql(query).show()

Country의 데이터를 Korea로 변경했음에도 Time Travel 기능을 활용하여 데이터를 변경하기 이전의 Snapshot을 참고해 과거 데이터를 조회할 수 있습니다.

Apache Iceberg on Amazon Redshift

Amazon Redshift는 클라우드에서 완벽하게 관리되는 페타바이트급 데이터 웨어하우스 서비스입니다. Amazon Redshift Serverless를 사용하면 프로비저닝된 데이터 웨어하우스를 구성하지 않아도 데이터를 액세스하고 분석할 수 있습니다.
현재 블로그를 작성하는 시점에서는 Apache Iceberg에 대한 Redshift의 지원은 프리뷰 상태입니다. Redshift 환경에서 Iceberg 사용 시 고려사항과 제한 사항은 해당 링크를 참고하시기 바랍니다. 현재 Redshift에서는 Iceberg에 대해서 읽기 전용 액세스를 제공하여 이번 실습에서는 ‘SELECT’ 구문에 한해 실습을 진행합니다.

Redshift Spectrum이란?

Amazon Redshift Spectrum은 데이터를 Amazon Redshift 테이블에 로드하지 않고도 Amazon S3의 파일에서 정형 및 비정형 데이터를 효율적으로 쿼리할 수 있는 기능입니다. Redshift Spectrum 쿼리는 대량 병렬 처리를 채택해 큰 데이터 집합에 대해 매우 빠르게 실행됩니다.

Query Editor v2를 통한 Redshift 접근

Redshift Serverless에 접속하여 우측 상단의 ‘쿼리 데이터’ 버튼을 클릭합니다. 왼쪽의 Serverless Redshift에 대한 연결을 설정하며, 연결 방식은 ‘Federated user’로 선택합니다.

Redshift Spectrum을 이용해 외부 Schema 연결

아래의 쿼리를 입력하여 Glue Data Catalog에 저장된 Schema를 연결합니다.

create external schema spectrum_schema from data catalog
database 'retaildb'
iam_role default
create external database if not exists;

아래의 쿼리를 입력하여 연결된 Table 정보를 호출합니다.

SHOW TABLES FROM SCHEMA dev.spectrum_schema;

Redshift에서 Iceberg 테이블의 데이터 읽기

아래의 쿼리를 입력하여 Iceberg 테이블을 정상적으로 호출하는지 확인합니다.

SELECT * FROM spectrum_schema.iceberg_table limit 10;

리소스 정리하기

  1. Cloudshell 서비스로 이동합니다.
  2. 아래의 코드를 입력하여 실습에서 사용한 모든 리소스를 삭제합니다.
account_id=$(aws sts get-caller-identity --query 'Account' --output text)
echo "AWS Account ID: $account_id"

aws s3 rm s3://$account_id-iceberg --recursive

aws glue delete-database --name retaildb
aws cloudformation delete-stack --stack-name iceberg

결론

많은 AWS 사용자 분들이 S3를 데이터레이크로 기반한 데이터 분석 아키텍처를 구축하고 있습니다. 이제 Apache Iceberg를 통해 데이터 변환 및 삭제가 가능한 데이터레이크를 구성할 수 있으며 Time Travel의 기능을 통해 과거 데이터 조회를 지원합니다. AWS 분석 서비스를 활용해 다양한 빅데이터 프레임워크를 간편하게 구성할 수 있으며 AWS Glue Catalog를 통해 Iceberg 테이블을 사용할 수 있습니다.

Uh Hannah

Uh Hannah

어한나 Solutions Architect는 고객의 AWS에서의 클라우드 여정을 기술적으로 돕는 역할을 담당하고 있습니다. 현재 AWS AOD (Area of Depth) member of Analytics TFC (Technical Field Community)로 활동하고 있습니다.

Huh Jinsung

Huh Jinsung

허진성 솔루션즈 아키텍트는 금융, 핀테크, 제조 고객들의 성공적인 AWS Cloud 여정을 위한 최적의 아키텍트를 구성하고 지원하는 역할을 하고 있습니다.