Amazon Web Services 한국 블로그

Amazon EMR, Apache Hudi 추가를 통해 개별 레코드 수준 데이터 관리 기능 출시

Amazon S3에 데이터를 저장하면 규모 조정, 신뢰성 및 비용 효율 면에서 많은 이점이 있습니다. 그 뿐 아니라, Amazon EMR을 활용할 수 있으므로 Apache Spark, HivePresto와 같은 오픈 소스 도구를 사용하여 데이터를 처리 및 분석할 수 있습니다. 이러한 도구는 강력한 성능을 제공하지만 점진적 데이터 프로세싱과 레코드 수준의 삽입, 업데이트 및 삭제가 필요한 사용 사례를 처리하기에는 여전히 어려울 수 있습니다.

저희 고객과의 대화를 통해, AWS에서는 다음과 같이 개별 레코드에 대한 점진적 변경을 처리해야 하는 사용 사례가 있다는 것을 확인했습니다.

  • 사용자가 잊혀질 권리를 행사하거나 데이터 사용에 대한 동의 의사를 변경할 수 있는 데이터 프라이버시 규정 준수
  • 특정 데이터 삽입 또는 업데이트 이벤트를 처리해야 하는 데이터 스트리밍 작업
  • 기업 데이터 웨어하우스 또는 운영 데이터 스토어에서 오는 데이터베이스 변경 로그의 추적 및 수집을 위해 CDC(변경 데이터 캡처) 아키텍처 사용
  • 지연 수신된 데이터의 재게시 또는 특정 시점의 데이터 분석

Amazon EMR 5.28.0에서는 Apache Hudi(인큐베이션 단계)를 포함하여, 더 이상 레코드 수준의 삽입, 업데이트 및 삭제 작업을 실행하기 위해 사용자 지정 솔루션을 구축할 필요가 없습니다. Hudi 개발은 수집 및 ETL 파이프라인의 비효율성 문제를 해결하기 위해 2016년 Uber에서 시작되었습니다. 최근 수 개월 동안 EMR 팁은 Apache Hudi 커뮤니티와 긴밀히 협력하면서 Hudi의 Spark 2.4.4 업데이트(HUDI-12), Spark Avro 지원(HUDI-91), AWS Glue Data Catalog 지원(HUDI-306) 및 다양한 버그 수정을 포함하는 패치에 기여했습니다.

Hudi를 사용하면 개방형 공급업체 중립적 형식으로 S3에서 레코드 수준의 삽입, 업데이트 및 삭제를 수행할 수 있으므로, 데이터 프라이버시 법을 준수하고, 실시간 스트림 및 변경 로그 캡처를 소비하고, 지연 수신된 데이터를 재게시하고, 기록 및 롤백을 추적할 수 있습니다. 사용자가 데이터 세트와 테이블을 생성하면 Hudi에서 관련 데이터 형식을 관리합니다. Hudi는 Apache ParquetApache Avro(데이터 스토리지용)를 사용하며 Spark, Hive 및 Presto 통합이 내장되어 있으므로, 현재 사용 중인 것과 같은 도구를 사용하여 거의 실시간으로 신규 데이터에 액세스하면서 Hudi 데이터 세트를 쿼리할 수 있습니다.

EMR 클러스터를 시작하면, 언제든지 Hive, Spark 또는 Presto와 같은 구성 요소 중 하나 이상을 선택할 때 자동으로 Hudi를 위한 라이브러리와 도구가 설치 및 구성됩니다. Spark를 사용하여 새로운 Hudi 데이터 세트를 생성하고 데이터를 삽입, 업데이트 및 삭제할 수 있습니다. 각 Hudi 데이터 세트는 클러스터에 구성되어 있는 메타 스토어(AWS Glue Data Catalog 포함)에 등록되어 있으며, Spark, Hive 및 Presto를 사용하여 쿼리할 수 있는 테이블 형태로 표시됩니다.

Hudi는 S3에서 데이터를 쓰고, 인덱싱하고, 읽는 방법을 정의하는 두 가지 스토리지 유형을 지원합니다.

  • 쓰기 시 복사 – 데이터가 열 형식(Parquet)으로 저장되고 업데이트는 쓰기 도중 파일의 새 버전을 생성합니다. 이 스토리지 유형은 효율적인 열 형식의 파일로 된 최신 버전의 데이터 세트를 항상 사용할 수 있으므로 읽기 집약적인 워크로드에 이상적입니다.
  • 읽기 시 병합 – 데이터가 열(Parquet) 및 행 기반(Avro) 형식의 조합으로 저장됩니다. 업데이트는 열 기반의 “델타 파일”에 로깅되고 나중에 압축되어 새 버전의 열 형식 파일을 생성합니다. 이 스토리지 유형은 새 커밋을 델타 파일 형태로 빠르게 작성하지만 데이터 세트를 읽을 때에는 압축된 열 형식 파일을 델터 파일과 병합해야 하므로 쓰기 집약적인 워크로드에 이상적입니다.

EMR 클러스터에서 Hidi 데이터 세트를 설정하고 사용하는 방법에 대한 개요를 간략히 살펴 보겠습니다.

Amazon EMR에서 Apache Hudi 사용
먼저 EMR 콘솔에서 클러스터를 생성합니다. 고급 옵션에서 EMR 릴리스 5.28.0(최초로 Hudi를 포함한 버전)을 선택하고 Spark, Hive 및 Tez 애플리케이션을 선택합니다. Spark와 Hive를 모두 실행하기 충분한 용량을 확보하기 위해 하드웨어 옵션에서 3개의 작업 노드를 추가합니다.

클러스터가 준비되면 보안 옵션에서 선택한 키 페어를 사용하여 마스터 노드에 SSH하고 Spark Shel에 액세스합니다. 다음 명령을 사용하여 Hudi와 함께 사용할 Spark Shel을 시작합니다.

$ spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
              --conf "spark.sql.hive.convertMetastoreParquet=false"
              --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

여기에서 다음 Scala 코드를 사용하여 쓰기 시 복사 스토리지 유형으로 Hudi 데이터 세트의 일부 샘플 ELB 로그를 가져옵니다.

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

//다양한 입력 값을 변수로 설정
val inputDataPath = "s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/"
val hudiTableName = "elb_logs_hudi_cow"
val hudiTablePath = "s3://MY-BUCKET/PATH/" + hudiTableName

// Hudi 데이터 소스 옵션 설정
val hudiOptions = Map[String,String](
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "request_ip",
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "request_verb", 
    HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
    DataSourceWriteOptions.OPERATION_OPT_KEY ->
        DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "request_timestamp", 
    DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", 
    DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, 
    DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "request_verb", 
    DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false", 
    DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
        classOf[MultiPartKeysValueExtractor].getName)

// S3에서 데이터를 읽고 파티션 및 레코드 키가 있는 DataFrame 생성
val inputDF = spark.read.format("parquet").load(inputDataPath)

// Hudi 데이터 세트에 데이터 쓰기
inputDF.write
       .format("org.apache.hudi")
       .options(hudiOptions)
       .mode(SaveMode.Overwrite)
       .save(hudiTablePath)

Spark Shell에서 이제 Hudi 데이터 세트에 있는 레코드의 수를 계산할 수 있습니다.

scala> inputDF2.count()
res1: Long = 10491958

테이블이 기본 데이터베이스에 생성되도록 옵션에서 클러스터에 대해 구성된 Hive 메타 스토어와의 통합을 사용합니다. 이 방식에서는 Hive를 사용하여 Hudi 데이터 세트의 데이터를 쿼리할 수 있습니다.

hive> use default;
hive> select count(*) from elb_logs_hudi_cow;
...
OK
10491958
...

이제 데이터 세트의 단일 레코드를 업데이트 또는 삭제할 수 있습니다. Spark Shell에서 업데이트하려는 레코드를 찾기 위해 일부 변수를 준비하고 변경하려는 열의 값을 선택하기 위해 SQL 문을 준비합니다.

val requestIpToUpdate = "243.80.62.181"
val sqlStatement = s"SELECT elb_name FROM elb_logs_hudi_cow WHERE request_ip = '$requestIpToUpdate'"

열의 현재 값을 보기 위해 SQL 문을 실행합니다.

scala> spark.sql(sqlStatement).show()
+------------+                                                                  
|    elb_name|
+------------+
|elb_demo_003|
+------------+

그런 다음 레코드를 선택하고 업데이트합니다.

// 단일 레코드를 가진 DataFrame 생성 및 열 값 업데이트
val updateDF = inputDF.filter(col("request_ip") === requestIpToUpdate)
                      .withColumn("elb_name", lit("elb_demo_001"))

이제 Hudi 데이터 세트를 생성할 때 사용했던 것과 유사한 구문을 사용하여 해당 데이터 세트를 업데이트합니다. 그러나, 이번에는 작성되는 DataFrame이 단 하나의 레코드만 포함합니다.

// 기존 Hudi 데이터 세트에 대한 업데이트로서 DataFrame 쓰기
updateDF.write
        .format("org.apache.hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(hudiTablePath)

Spark Shell에서 업데이트 결과를 확인합니다.

scala> spark.sql(sqlStatement).show()
+------------+                                                                  
|    elb_name|
+------------+
|elb_demo_001|
+------------+

이제 동일한 레코드를 삭제하고자 합니다. 삭제하려면 write 옵션에 EmptyHoodieRecordPayload 페이로드를 전달합니다.

// 레코드 삭제를 위해 EmptyHoodieRecordPayload로 DataFrame 쓰기
updateDF.write
        .format("org.apache.hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY,
                "org.apache.hudi.EmptyHoodieRecordPayload")
        .mode(SaveMode.Append)
        .save(hudiTablePath)

Spark Shell에서 레코드가 더 이상 사용할 수 없음을 볼 수 있습니다.

scala> spark.sql(sqlStatement).show()
+--------+                                                                      
|elb_name|
+--------+
+--------+

Hidi에서는 이 모든 업데이트와 삭제를 어떻게 관리할까요? Hudi CLI(명령줄 인터페이스)를 사용하여 데이터 세트에 연결해 보고 이러한 변경 사항이 어떻게 커밋으로 해석되는지 알아보겠습니다.

이 데이터 세트는 쓰기 시 복사 데이터 세트입니다. 즉, 레코드에 대한 업데이트가 있을 때마다 해당 레코드를 포함하는 파일이 업데이트된 값을 포함하기 위해 재작성됩니다. 각 커밋에 대해 얼마나 많은 레코드가 쓰여졌는지 볼 수 있습니다. 테이블에 맨 아래 줄은 최초 데이터 생성을 설명하고 그 위 줄에는 단일 레코드 업데이트가 있으며 맨 위에는 단일 레코드 삭제가 있습니다.

Hudi를 사용하면 각 커밋을 롤백할 수 있습니다. 예를 들어, 다음과 같은 명령으로 삭제 작업을 롤백할 수 있습니다.

hudi:elb_logs_hudi_cow->commit rollback --commit 20191104121031

Spark Shell에서 레코드는 이제 업데이트 직후의 원래 상태로 되돌려졌습니다.

scala> spark.sql(sqlStatement).show()
+------------+                                                                  
|    elb_name|
+------------+
|elb_demo_001|
+------------+

쓰기 시 복사는 기본 스토리지 유형입니다. 위 단계를 반복하면 hudiOptions에 다음을 추가하여 읽기 시 병합 데이터 세트 유형을 생성 및 업데이트할 수 있습니다.

DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ"

읽기 시 병합 데이터 세트를 업데이트하고 Hudi CLI로 커밋을 확인하면, 읽기 시 병합이 쓰기 시 복사와 비교할 때 얼마나 다른지 볼 수 있습니다. 읽기 시 병합에서는 쓰기 시 복사와 같이 전체 파일이 아닌 업데이트된 행만 작성합니다. 그렇기 때문에 읽기 시 병합이 더 많은 쓰기를 필요로 하거나 읽기 수가 적은 대신 업데이트/삭제가 많은 워크로드와 같은 사용 사례에 유용한 것입니다. 델타 커밋은 디스크에 Avro 레코드(행 기반 스토리지)로 작성되고 압축된 데이터는 Parquet 파일(열 형식 스토리지)로 작성됩니다. 너무 많은 델타 파일이 생성되는 것을 피하기 위해, Hudi는 충분한 읽기 성능을 보장할 수 있도록 데이터 세트를 자동으로 압축합니다.

읽기 시 병합 데이터 세트가 생성될 때에는 두 개의 Hive 테이블이 생성됩니다.

  • 첫 번째 테이블은 데이터 세트의 이름을 일치시킵니다.
  • 두 번째 테이블은 이름에 _rt라는 문자가 첨부됩니다. _rt 접미사는 real-time, 즉 실시간을 의미합니다.

쿼리를 수행하면 첫 번째 테이블에서 압축된 데이터를 반환하며 가장 최근의 델타 커밋은 표시하지 않습니다. 이 테이블을 사용하면 최상의 성능을 제공하지만 가장 최신의 데이터가 누락됩니다. 실시간 테이블을 쿼리하면 읽기 시 압축된 데이터와 델타 커밋이 병합되므로 이 데이터 세트를 “읽기 시 병합”이라고 부릅니다. 이렇게 하면 가장 최신의 데이터를 사용할 수 있지만 성능 부담이 발생하므로 압축된 데이터를 쿼리할 때만큼 뛰어난 성능을 얻을 수 없습니다. 이 방식은 데이터 엔지니어와 애널리스트에게 성능 및 데이터 최신성 사이에서 선택할 수 있는 유연성을 제공합니다.

지금 이용 가능
이 새로운 기능은 현재 EMR 5.28.0을 지원하는 모든 리전에서 사용할 수 있습니다. EMR과 함께 Hudi를 사용할 때에는 추가 비용이 발생하지 않습니다. Hudi에 대한 자세한 내용은 EMR 설명서를 참조하십시오. 이 새로운 도구는 S3의 데이터를 처리, 업데이트 및 삭제하는 방식을 간소화해 줍니다. 이 도구를 어떤 사용 사례에 사용하려는지 알려 주시기 바랍니다.

Danilo