亚马逊AWS官方博客

使用 Alluxio 加速数据湖 ODS 写后读及 DWD 宽表性能

目前业界大数据分析场景,数据湖架构已经深入人心,一方面它底层基于 Amazon Simple Storage Service (S3) 这样的对象存储,存储成本低;另外它与一系列数据处理工具打通,能够快速进行数据探索和数据挖掘,可以实现高效的”沙中淘金”。之后,以 Hudi、Iceberg 等为代表的数据湖架构,更是推动数据湖走向实时化。

原理&概述

现有数据湖虽然通过 Hudi/iceberg 及 Flink 等技术组件实现了实时更新入湖,但在湖数据查询性能上还存在诸多挑战,如多库多表数据入湖时,hudi 小文件过多导致 ODS 层元数据膨胀,查询响应时间高,ETL 后的 hudi 宽表聚合查询资源开销居高不下,响应时延很难达到秒级等。

为了进一步提升数据湖上 ODS 及 DWD 宽表查询性能,提升 Spark/Presto 等计算引擎的 IO 效率,本文引入 Alluxio 集群缓存,对 S3,hdfs 等数据进行 warm up 或直写缓存透传,加速 ODS 数据入湖的写后读,以及 DWD 宽表的查询性能。

Alluxio 是一个开源的分布式文件系统,主要功能是在计算框架(如 Spark、Presto 等)和底层存储系统(如 HDFS、S3 等)之间提供高速的内存级缓存,Alluxio 是大数据生态系统一个非常重要的组件,作为大数据计算框架的缓存层,通过内存加速读写速度,使得大数据应用可以跨存储框架获取数据,提供统一而高效的数据访问。

Alluxio 在大数据/数据湖技术架构上所处位置如下图所示:

Alluxio 社区与 AWS EMR 服务有深入的交互和集成,官方提供了 on EMR 的集成方案,详见 Alluxio 社区文档,AWS 也提供了快速安装部署的 bootstrap 脚本及配置,详见 AWS 官方 blog

通过搭建独立的 Alluxio 集群,客户可以充分利用其多主特性,实现高可用性和容错性,确保数据的持续可访问性。同时,Alluxio 的多级目录挂载功能使得数据可以按层次结构进行组织,对接后端多种存储系统,比如 root 根/目录对接 S3 云存储,/user 目录对接后端本地 HDFS,从而有效提高数据访问的效率与可维护性,如下图所示。

以下我们详细介绍使用 Alluxio 集群加速 ODS 写后读和 DWD 宽表查询的实施落地方法及步骤。

ODS 写后读集成 Alluxio

我们按照如下架构,在 Amazon EMR 上集成 Alluxio 集群,针对 Flink 实时写入 Hudi 的 ODS 数据入湖的场景,配置 Flink 直写 Alluxio 集群缓存,并在写入后,通过 Presto 查询 ODS 缓存层数据,进行性能对比。

  • 在将 Flink 与 Alluxio 集成后,将数据写入到 ODS 层时,可以利用 Alluxio 的缓存机制,实现直接更新 Alluxio 集群的缓存。这使得数据写入到 ods 层后,能够迅速反映在 Alluxio 的缓存中,提高后续查询的性能和效率。
  • 使用 Presto 查询 ODS 数据湖时,由于 Alluxio 已经配置了与后端 S3 存储的对接,因此最新的缓存数据也会透传到 S3 hudi,保障 ODS 入湖数据一致性。同时,Presto 通过 Hadoop-hive2 plugin 与 Alluxio 集群交互,识别到 alluxio:// url 的表路径后,直接查询 Alluxio 集群获取缓存层数据。

Flink 配置

Flink 与 Alluxio 的集成配置,如下方法所示:

  • 修改 hdfs 的 core-site.xml 配置文件,将如下 alluxio filesystem 的实现类属性加到配置中
    <property>
    <name>fs.alluxio.impl</name>
    <value>alluxio.hadoop.FileSystem</value>
    </property>
    
  • 将 Alluxio 的相关配置加入 Flink 配置中,做为外部属性
    alluxio 客户端相关属性在 alluxio 安装目录下 {alluxio home}/conf/alluxio-site.properties 中,我们可以在 Flink 目录{FLINK_HOME}/conf/flink-conf.yaml 配置文件中将这些属性转化为 env.java.opts,从而方便 Flink 识别和使用 Alluxio 的配置参数

修改 flink-conf.yaml 的 env.java.opts 配置示例如下:

env.java.opts: -Djava.io.tmpdir=/mnt/tmp -Dalluxio.master.web.port=29999 -Dalluxio.user.file.writetype.default=CACHE_THROUGH
 -Dalluxio.master.hostname=ip-172-31-26-168.ap-southeast-1.compute.internal
 -Dalluxio.master.mount.table.root.ufs=s3://salunchbucket/data/alluxio/

如上所示我们把 Alluxio 的 master 节点(alluxio.master.hostname),以及对接的后端 S3 路径(alluxio.master.mount.table.root.ufs)加入 flink yaml 配置中,这样 flink 写入 ODS 时,即可识别到 alluxio 缓存 uri 从而直接写入 alluxio 缓存集群中。

Flink 写入 Alluxio hudi 表

我们在 Flink 中 create Alluxio 路径的 hudi 表,通过 path 参数设置 Alluxio 集群的相应 uri 路径,如下所示:

CREATE TABLE logevent_sink_test_alluxio(`timestamp` STRING,
`system` string,
actor string,
action string
) PARTITIONED BY (actor) 
WITH (
'connector' = 'hudi',
'path' = 'alluxio:///1000/flink-hudi/logevent_sink_test_alluxio/',
'table.type' = 'MERGE_ON_READ', 
'compaction.tasks' = '1', 
'compaction.async.enabled' = 'true', 
'compaction.trigger.strategy' = 'num_or_time',
'compaction.delta_commits' ='12', 
'compaction.delta_seconds' = '60', 
'write.precombine.field' = 'timestamp',
'write.operation' = 'upsert', 
'hoodie.datasource.write.recordkey.field' = 'timestamp', 
'hive_sync.enable' = 'true',
'hive_sync.table' = 'logevent_sink_test_alluxio', 
'hive_sync.mode' = 'hms',
'hive_sync.use_jdbc' = 'false',
'hive_sync.username' = 'hadoop', 
'hive_sync.partition_fields' = 'actor',
'hive_sync.database'='tpcds_text_1000',
'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
);

如上即可轻松创建 ODS 的 Alluxio 缓存表,该表对接上文章节中提及的后端 S3 路径,从而实现在 Flink 入湖的时候直接写入 Alluxio 缓存及透传到 S3 数据湖。

对比测试

我们写入一部分测试数据后,即可对比查询 S3 上的原始表和 Alluxio 缓存表的性能。

  • Alluxio 路径表查询:952s
    presto:tpcds_text_1000> select count(1) from logevent_sink_test_alluxio_rt group by actor; _col0
    813
    812
    813
    3123
    812
    (5 rows)
    2023-07-26T23:39:24.792Z INFO dispatcher-query-18  com.facebook.presto.event.QueryMonitor TIMELINE: Query 20230726_233923_00009_58nbw :: Transaction:[c39bed45-3ac8-4e09-b2e1-ba71d6429606] :: elapsed 952ms :: planning 436ms :: scheduling 319ms :: running 173ms :: finishing 24ms :: begin 2023-07-26T23:39:23.821Z :: end 2023-07-26T23:39:24.773Z
    
  • S3 路径表查询:3.7s
    presto:tpcds_text_1000> select count(1) from logevent_sink_test_s3_rt group by actor; _col0
    813
    812
    813
    3123
    812
    (5 rows)
    2023-07-26T23:39:17.186Z INFO dispatcher-query-20  com.facebook.presto.event.QueryMonitor TIMELINE: Query 20230726_  233913_00008_58nbw :: Transaction:[a4fd59fd-b4ac-4e3a-a6d2-634412e7a2df] :: elapsed 3792ms :: planning 593ms :: scheduling 1147ms :: running 2030ms :: finishing 22ms :: begin 2023-07-26T23:39:13.375Z :: end 2023-07-26T23:39:17.167Z
    

可以看到 Alluxio 的 ODS 表写后读查询延迟相对于 S3 的原始 ODS 表有显著降低。

DWD 宽表集成 Alluxio

刚才我们实现了在 ODS 入湖的时候,通过 Flink 直写 Alluxio 集群缓存,加速 ODS 写后读性能,但有客户 ODS 没有使用 Flink(比如通过离线文件,ETL 入 ODS),此时如果我们也希望通过 Alluxio 集群缓存,提升后续数仓层得数据分析的性能,比如 Presto 对 DWD/DWS 层的查询延迟,我们接下来将基于一张 DWD 层的宽表,分别使用 Alluxio 路径、S3 路径、HDFS 路径配置创建三张不同路径的表,然后进行性能对比测试。

DWD 宽表构建

我们采用 spark sql 进行 create DWD 宽表 table 的操作,示例如下:

spark-sql --jars /usr/lib/hudi/hudi-spark-bundle.jar --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf  "spark.driver.memory=32G"  --conf  "spark.executor.memory=40G" --conf "spark.dynamicAllocation.enabled=true"

CREATE EXTERNAL TABLE tpcds_text_1000.dwd_charge_transaction_record_v_partition_alluxio (
  chain_id STRING,
  create_time STRING,
  id STRING,
  type INT,
  type_text STRING,
  is_deleted INT,
  retail_type INT,
  charge_sheet_type INT,
  v2_patient_order_id STRING,
  v2_charge_sheet_id STRING,
  v2_transaction_id STRING,
  v2_charge_form_item_id STRING,
  v2_charge_coupon_promotion_info_id BIGINT,
  v2_charge_gift_promotion_info_id BIGINT,
  v2_registration_form_item_id STRING,
  v2_patient_member_bill_id STRING,
  v2_outpatient_sheet_id STRING,
  clinic_id STRING,
  department_id STRING,
  chain_text STRING,
  clinic_text STRING,
  department_text STRING,
  doctor_id STRING,
  copywriter_id STRING,
  seller_id STRING,
  cashier_id STRING,
  patient_id STRING,
  created_by STRING,
  last_modified_by STRING,
  doctor_text STRING,
  copywriter_text STRING,
  seller_text STRING,
  cashier_text STRING,
  patient_text STRING,
  created_by_text STRING,
  last_modified_text STRING,
  patientorder_no BIGINT,
  patientorder_source INT,
  patientorder_source_text STRING,
  product_type INT,
  product_sub_type INT,
  product_custom_type INT,
  product_id STRING,
  product_unit STRING,
  product_unit_count DECIMAL(20,8),
  product_unit_price DECIMAL(20,8),
  product_dose_count DECIMAL(20,8),
  product_compose_type INT,
  product_is_disamounting INT,
  specification STRING,
  product_type_text STRING,
  product_sub_type_text STRING,
  product_custom_type_text STRING,
  product_name STRING,
  product_cadn STRING,
  product_spec STRING,
  pay_type INT,
  pay_sub_type INT,
  pay_third_party_extra_info STRING,
  pay_type_text STRING,
  pay_sub_type_text STRING,
  original_price DECIMAL(20,8),
  cost_price DECIMAL(20,8),
  discount_price DECIMAL(20,8),
  adjustment_price DECIMAL(20,8),
  promotion_price DECIMAL(20,8),
  promotion_info STRING,
  should_receive_price DECIMAL(20,8),
  received_price DECIMAL(20,8),
  is_air_pharmacy INT,
  source_form_item_id STRING,
  associate_form_item_id STRING,
  source_form_type INT,
  v2_charge_form_id STRING,
  usage STRING,
  record_type INT,
  calc_unit STRING,
  calc_count DECIMAL(20,8),
  classify_level_1_id STRING,
  classify_level_2_id INT,
  prescription_type INT ,
  visit_source_remark STRING,
  comment STRING ,
  member_present DECIMAL(20,8),
  member_principal DECIMAL(20,8),
  process_type INT,
  process_sub_type INT,
  discount_info STRING ,
  deduct_promotion_price DECIMAL(15,4),
  discount_promotion_price DECIMAL(15,4) ,
  gift_promotion_price DECIMAL(15,4) ,
  coupon_promotion_price DECIMAL(15,4) ,
  member_info STRING ,
  member_id STRING ,
  visit_source_level_1_id STRING ,
  visit_source_level_2_id STRING ,
  visit_source_from STRING ,
  charge_sheet_total_price DECIMAL(15,4) ,
  charge_sheet_deduct_price DECIMAL(15,4) ,
  charge_sheet_receivable_price DECIMAL(15,4) ,
  charge_sheet_discount_price DECIMAL(15,4),
  charge_sheet_adjustment_price DECIMAL(15,4) ,
  record_discount_price DECIMAL(15,4) ,
  record_adjustment_price DECIMAL(15,4) ,
  charged_by STRING ,
  health_card_cash_receivable_fee DECIMAL(15,4) ,
  health_card_cash_received_fee DECIMAL(15,4) ,
  health_card_med_type STRING ,
  is_chain_guest INT,
  is_clinic_guest INT ,
  scene_type INT ,
  seller_department_id STRING ,
  nurse_id STRING ,
  item_doctor_id STRING ,
  item_doctor_department_id STRING ,
  item_nurse_id STRING )
using parquet  
PARTITIONED BY (ts STRING)  
LOCATION 'alluxio:///1000/dwd_charge_transaction_record_v_partition_alluxio'


CREATE EXTERNAL TABLE tpcds_text_1000.dwd_charge_transaction_record_v_partition_hdfs (
  chain_id STRING,
  create_time STRING,
  id STRING,
  type INT,
  type_text STRING,
  is_deleted INT,
  retail_type INT,
  charge_sheet_type INT,
  v2_patient_order_id STRING,
  v2_charge_sheet_id STRING,
  v2_transaction_id STRING,
  v2_charge_form_item_id STRING,
  v2_charge_coupon_promotion_info_id BIGINT,
  v2_charge_gift_promotion_info_id BIGINT,
  v2_registration_form_item_id STRING,
  v2_patient_member_bill_id STRING,
  v2_outpatient_sheet_id STRING,
  clinic_id STRING,
  department_id STRING,
  chain_text STRING,
  clinic_text STRING,
  department_text STRING,
  doctor_id STRING,
  copywriter_id STRING,
  seller_id STRING,
  cashier_id STRING,
  patient_id STRING,
  created_by STRING,
  last_modified_by STRING,
  doctor_text STRING,
  copywriter_text STRING,
  seller_text STRING,
  cashier_text STRING,
  patient_text STRING,
  created_by_text STRING,
  last_modified_text STRING,
  patientorder_no BIGINT,
  patientorder_source INT,
  patientorder_source_text STRING,
  product_type INT,
  product_sub_type INT,
  product_custom_type INT,
  product_id STRING,
  product_unit STRING,
  product_unit_count DECIMAL(20,8),
  product_unit_price DECIMAL(20,8),
  product_dose_count DECIMAL(20,8),
  product_compose_type INT,
  product_is_disamounting INT,
  specification STRING,
  product_type_text STRING,
  product_sub_type_text STRING,
  product_custom_type_text STRING,
  product_name STRING,
  product_cadn STRING,
  product_spec STRING,
  pay_type INT,
  pay_sub_type INT,
  pay_third_party_extra_info STRING,
  pay_type_text STRING,
  pay_sub_type_text STRING,
  original_price DECIMAL(20,8),
  cost_price DECIMAL(20,8),
  discount_price DECIMAL(20,8),
  adjustment_price DECIMAL(20,8),
  promotion_price DECIMAL(20,8),
  promotion_info STRING,
  should_receive_price DECIMAL(20,8),
  received_price DECIMAL(20,8),
  is_air_pharmacy INT,
  source_form_item_id STRING,
  associate_form_item_id STRING,
  source_form_type INT,
  v2_charge_form_id STRING,
  usage STRING,
  record_type INT,
  calc_unit STRING,
  calc_count DECIMAL(20,8),
  classify_level_1_id STRING,
  classify_level_2_id INT,
  prescription_type INT ,
  visit_source_remark STRING,
  comment STRING ,
  member_present DECIMAL(20,8),
  member_principal DECIMAL(20,8),
  process_type INT,
  process_sub_type INT,
  discount_info STRING ,
  deduct_promotion_price DECIMAL(15,4) ,
  discount_promotion_price DECIMAL(15,4) ,
  gift_promotion_price DECIMAL(15,4) ,
  coupon_promotion_price DECIMAL(15,4) ,
  member_info STRING ,
  member_id STRING ,
  visit_source_level_1_id STRING ,
  visit_source_level_2_id STRING ,
  visit_source_from STRING ,
  charge_sheet_total_price DECIMAL(15,4) ,
  charge_sheet_deduct_price DECIMAL(15,4) ,
  charge_sheet_receivable_price DECIMAL(15,4) ,
  charge_sheet_discount_price DECIMAL(15,4) ,
  charge_sheet_adjustment_price DECIMAL(15,4) ,
  record_discount_price DECIMAL(15,4) ,
  record_adjustment_price DECIMAL(15,4) ,
  charged_by STRING ,
  health_card_cash_receivable_fee DECIMAL(15,4) ,
  health_card_cash_received_fee DECIMAL(15,4) ,
  health_card_med_type STRING ,
  is_chain_guest INT ,
  is_clinic_guest INT ,
  scene_type INT ,
  seller_department_id STRING ,
  nurse_id STRING ,
  item_doctor_id STRING ,
  item_doctor_department_id STRING ,
  item_nurse_id STRING )
using parquet  
PARTITIONED BY (ts STRING)  
LOCATION 'hdfs:///1000/dwd_charge_transaction_record_v_partition_hdfs'

CREATE EXTERNAL TABLE tpcds_text_1000.dwd_charge_transaction_record_v_partition_s3 (
  chain_id STRING,
  create_time STRING,
  id STRING,
  type INT,
  type_text STRING,
  is_deleted INT,
  retail_type INT,
  charge_sheet_type INT,
  v2_patient_order_id STRING,
  v2_charge_sheet_id STRING,
  v2_transaction_id STRING,
  v2_charge_form_item_id STRING,
  v2_charge_coupon_promotion_info_id BIGINT,
  v2_charge_gift_promotion_info_id BIGINT,
  v2_registration_form_item_id STRING,
  v2_patient_member_bill_id STRING,
  v2_outpatient_sheet_id STRING,
  clinic_id STRING,
  department_id STRING,
  chain_text STRING,
  clinic_text STRING,
  department_text STRING,
  doctor_id STRING,
  copywriter_id STRING,
  seller_id STRING,
  cashier_id STRING,
  patient_id STRING,
  created_by STRING,
  last_modified_by STRING,
  doctor_text STRING,
  copywriter_text STRING,
  seller_text STRING,
  cashier_text STRING,
  patient_text STRING,
  created_by_text STRING,
  last_modified_text STRING,
  patientorder_no BIGINT,
  patientorder_source INT,
  patientorder_source_text STRING,
  product_type INT,
  product_sub_type INT,
  product_custom_type INT,
  product_id STRING,
  product_unit STRING,
  product_unit_count DECIMAL(20,8),
  product_unit_price DECIMAL(20,8),
  product_dose_count DECIMAL(20,8),
  product_compose_type INT,
  product_is_disamounting INT,
  specification STRING,
  product_type_text STRING,
  product_sub_type_text STRING,
  product_custom_type_text STRING,
  product_name STRING,
  product_cadn STRING,
  product_spec STRING,
  pay_type INT,
  pay_sub_type INT,
  pay_third_party_extra_info STRING,
  pay_type_text STRING,
  pay_sub_type_text STRING,
  original_price DECIMAL(20,8),
  cost_price DECIMAL(20,8),
  discount_price DECIMAL(20,8),
  adjustment_price DECIMAL(20,8),
  promotion_price DECIMAL(20,8),
  promotion_info STRING,
  should_receive_price DECIMAL(20,8),
  received_price DECIMAL(20,8),
  is_air_pharmacy INT,
  source_form_item_id STRING,
  associate_form_item_id STRING,
  source_form_type INT,
  v2_charge_form_id STRING,
  usage STRING,
  record_type INT,
  calc_unit STRING,
  calc_count DECIMAL(20,8),
  classify_level_1_id STRING,
  classify_level_2_id INT,
  prescription_type INT ,
  visit_source_remark STRING,
  comment STRING ,
  member_present DECIMAL(20,8),
  member_principal DECIMAL(20,8),
  process_type INT,
  process_sub_type INT,
  discount_info STRING ,
  deduct_promotion_price DECIMAL(15,4) ,
  discount_promotion_price DECIMAL(15,4) ,
  gift_promotion_price DECIMAL(15,4) ,
  coupon_promotion_price DECIMAL(15,4) ,
  member_info STRING ,
  member_id STRING ,
  visit_source_level_1_id STRING ,
  visit_source_level_2_id STRING ,
  visit_source_from STRING ,
  charge_sheet_total_price DECIMAL(15,4) ,
  charge_sheet_deduct_price DECIMAL(15,4) ,
  charge_sheet_receivable_price DECIMAL(15,4) ,
  charge_sheet_discount_price DECIMAL(15,4) ,
  charge_sheet_adjustment_price DECIMAL(15,4) ,
  record_discount_price DECIMAL(15,4) ,
  record_adjustment_price DECIMAL(15,4) ,
  charged_by STRING ,
  health_card_cash_receivable_fee DECIMAL(15,4) ,
  health_card_cash_received_fee DECIMAL(15,4) ,
  health_card_med_type STRING ,
  is_chain_guest INT ,
  is_clinic_guest INT ,
  scene_type INT ,
  seller_department_id STRING ,
  nurse_id STRING ,
  item_doctor_id STRING ,
  item_doctor_department_id STRING ,
  item_nurse_id STRING )
using parquet  
PARTITIONED BY (ts STRING)  
LOCATION 's3://salunchbucket/data/alluxio/1000/dwd_charge_transaction_record_v_partition_s3'

如上我们分别创建了 S3 路径,Alluxio 缓存路径和 HDFS 本地路径三种类型的 DWD 宽表。

初始化宽表数据

我们同样通过 spark-sql 随机初始化表数据:

spark-sql —jars /usr/lib/hudi/hudi-spark-bundle.jar \
          --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
          --conf "spark.driver.memory=32G" \
          --conf "spark.executor.memory=40G" \
          --conf "spark.sql.shuffle.partitions=100" \
          --conf "spark.dynamicAllocation.enabled=true" \
          -f initial.sql
          
spark-sql —jars /usr/lib/hudi/hudi-spark-bundle.jar \
          --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
          --conf "spark.driver.memory=32G" \
          --conf "spark.executor.memory=40G" \
          --conf "spark.sql.shuffle.partitions=100" \
          --conf "spark.dynamicAllocation.enabled=true" \
          -f initial_hdfs.sql
          
spark-sql --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' 
          --conf  "spark.driver.memory=32G"  
          --conf  "spark.executor.memory=40G"
          --conf  "spark.sql.shuffle.partitions=100"
          --conf  "spark.dynamicAllocation.enabled=true" 
-f initial_alluxio.sql       

insert select 构造 dummy 数据脚本示例如下:

insert into
    tpcds_text_1000.dwd_charge_transaction_record_v_partition (
        chain_id,
        create_time,
        id,
        type,
        type_text,
        is_deleted,
        retail_type,
        charge_sheet_type,
        .....省略
        ) (
        select
            cast(ceil(rand() * 10000) as string),
            create_time,
            cast(ceil(rand() * 100000000000) as string) as id,
            ....,
            ate_format(
                date_add(now(), cast(rand() * 365 as int)),
                'yyyy-MM-dd'
            ) as ts
        from
            tpcds_text_1000.dwd_charge_transaction_record_v_partition

如上我们通过 random 随机生成 id 及时间错字断,确保测试生成的测试数据不出现热点和倾斜,然后我们再使用 spark sql loop 循环,通过 select insert 方式,快速构造千万级别的 dummy 数据。

示例脚本如下所示:

for i in {1..20}; do
    spark-sql --jars /usr/lib/hudi/hudi-spark3-bundle_2.12-0.12.1-amzn-0.jar \
              --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
              --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
              --conf  "spark.driver.memory=32G"  --conf  "spark.executor.memory=40G" \
              --conf "spark.dynamicAllocation.enabled=true" -f insert.sql
done

nohup ./insert_s3.sh &
nohup ./insert_alluxio.sh &
nohup ./insert_hdfs.sh &

DWD 宽表查询对比

构造完成 dummy data 后,我们使用 presto cli 工具,分别查询 HDFS,S3 和 Alluxio 路径的三个表进行对比测试。

presto-cli 命令行格式如下:

presto-cli --catalog=hive --schema=tpcds_text_1000

其中 catalog 为元数据编目源,这里我们都是用的 Amazon EMR 上 hive glue catalog schema 为具体的数据库,也就是我们建 S3,HDFS 和 Alluxio 路径的三张表所在的 hive database 库。

S3 路径表查询延迟:21s

presto:tpcds_text_1000> 
select count(1) from 
dwd_charge_transaction_record_v_partition_s3 
 group by (type,ts);
[hadoop@ip-172-31-31-170 ~]$ presto-cli —catalog=hive —schema=tpcds_text_1000
presto:tpcds_text_1000> select ts,type,count(1) from dwd_charge_transaction_record_v_partition_s3 group by (ts, type);
ts | type | _col2
---------+------+--------
2023-09 | 1 | 266258
2024-04 | 1 | 266481
2023-08 | 1 | 275578
2024-06 | 1 | 267505
2024-05 | 1 | 275540
2023-12 | 1 | 275197
2023-07 | 1 | 35928
2023-10 | 1 | 275624
2024-02 | 1 | 257159
2024-07 | 1 | 231374
2024-01 | 1 | 274622
2024-03 | 1 | 276074
2023-11 | 1 | 266692
(13 rows)
Query 20230728_095032_00062_58nbw, FINISHED, 6 nodes
Splits: 48,859 total, 48,859 done (100.00%)
0:21 [3.24M rows, 1.15GB] [153K rows/s, 55.7MB/s]

Alluxio 集群 cache 查询:6s

presto:tpcds_text_1000> 
select ts,type,count(1) 
from dwd_charge_transaction_record_v_partition_alluxio 
group by (ts, type);
ts | type | _col2
---------+------+--------
2023-09 | 1 | 266258
2024-04 | 1 | 266481
2023-08 | 1 | 275578
2024-06 | 1 | 267505
2024-05 | 1 | 275540
2023-12 | 1 | 275197
2023-07 | 1 | 35928
2023-10 | 1 | 275624
2024-02 | 1 | 257159
2024-07 | 1 | 231374
2024-01 | 1 | 274622
2024-03 | 1 | 276074
2023-11 | 1 | 266692
(13 rows)
Query 20230728_095110_00063_58nbw, FINISHED, 6 nodes
Splits: 42,917 total, 42,917 done (100.00%)
0:06 [3.24M rows, 1.01GB] [575K rows/s, 184MB/s]

HDFS 表路径查询:16s

presto:tpcds_text_1000> 
select count(1) from 
dwd_charge_transaction_record_v_partition_hdfs 
group by (type,ts);
 _col0  
--------
 275559 
 275286 
 266952 
 266542 
 275150 
 266421 
 275864 
 276080 
 230994 
  35632 
 275790 
 265800 
 257962 
(13 rows)
Query 20230803_072151_00017_3q58d, FINISHED, 4 nodes
Splits: 42,853 total, 42,853 done (100.00%)
0:16 [3.24M rows, 1.01GB] [431K rows/s, 138MB/s]

可以看出,Alluxio 路径的 DWD 宽表性能明显优于 HDFS 和 S3,缓存是生效的,且能大幅提升 DWD/DAS 等数仓层的查询性能。

小结

通过测试和实验对比, 在使用 Hudi 表作为数据 ODS 入湖的场景下,使用 Alluxio 集群缓存路径,在通过 flink 写入后,通过 Presto 可以查询出最新的数据,并且查询效率提升了约 3.5 倍。在非 ODS 入湖写后读的 DWD 等分析查询场景下,通过比对测试 Presto 查询 S3 路径、HDFS 路径与 Alluxio 集群路径的查询性能,发现 Alluxio 集群路径相比 S3 路径,也能提升约 3.5 倍的效率,即使相比采用通用 SSD 盘构建的 HDFS,也能提升 25% 的性能。

综上所述,在实时数据湖场景中,通过引入 Alluxio 集群缓存,缩短了数据入湖时间,并大幅提升了数仓查询效率,本文中的示例脚本和代码,可以供感兴趣的小伙伴根据自己业务场景进行 Alluxio 的集成实施和优化。

参考资料

如果您希望了解更多关于AWS EMR与Alluxio的资料,可以参考如下链接:

本篇作者

唐清原

亚马逊云科技高级解决方案架构师,负责 Data Analytic & AIML 产品服务架构设计以及解决方案。10+数据领域研发及架构设计经验,历任 IBM 咨询顾问,Oracle 高级咨询顾问,澳新银行数据部领域架构师职务。在大数据 BI,数据湖,推荐系统,MLOps 等平台项目有丰富实战经验。