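Although existing data lakes already support real-time upsert ingestion through components such as Hudi/Iceberg and Flink, query performance on lake data still faces many challenges. For example, when many databases and tables are ingested, the large number of small Hudi files inflates ODS-layer metadata and drives up query response time. Aggregation queries on the Hudi wide tables produced by ETL also remain resource-intensive, and their latency rarely reaches the seconds level.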
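To further improve query performance on ODS tables and DWD wide tables in the data lake, and to raise the I/O efficiency of compute engines such as Spark and Presto, this article introduces an Alluxio cluster cache. Data in S3, HDFS, and similar stores is either warmed up into the cache or written through it, which accelerates read-after-write for ODS ingestion and speeds up queries on DWD wide tables.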
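The difference between warm-up and write-through can be illustrated with a toy model. The following is only a conceptual sketch in plain Python: the `CachedStore` class and its method names are hypothetical and do not correspond to any Alluxio API.

```python
class CachedStore:
    """Toy model of a cache layer in front of a slower backing store."""

    def __init__(self):
        self.cache = {}         # stands in for Alluxio worker storage
        self.backend = {}       # stands in for S3 or HDFS
        self.backend_reads = 0  # counts how often the slow store is read

    def write_through(self, path, data):
        """Write to the cache and synchronously persist to the backend."""
        self.cache[path] = data
        self.backend[path] = data

    def warm_up(self, path):
        """Preload an object that already exists in the backend."""
        self.cache[path] = self._read_backend(path)

    def read(self, path):
        """Serve from the cache when possible; otherwise fetch and cache."""
        if path not in self.cache:
            self.cache[path] = self._read_backend(path)
        return self.cache[path]

    def _read_backend(self, path):
        self.backend_reads += 1
        return self.backend[path]


if __name__ == "__main__":
    store = CachedStore()
    store.write_through("/ods/orders", b"new rows")
    store.read("/ods/orders")          # read-after-write is a cache hit
    print(store.backend_reads)
```

In this model, a read that follows a write-through never touches the backend, which is the effect the ODS read-after-write scenario relies on. Warm-up pays the backend read once, ahead of the first query.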
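Alluxio is an open-source distributed file system whose main role is to provide a high-speed, memory-level cache between compute frameworks (such as Spark and Presto) and underlying storage systems (such as HDFS and S3). It is an important component of the big data ecosystem: as the caching layer for compute frameworks, it uses memory to accelerate reads and writes and lets applications access data across storage systems through a unified, efficient interface.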
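By deploying a standalone Alluxio cluster, customers can take advantage of its multi-master capability for high availability and fault tolerance, keeping data continuously accessible. In addition, Alluxio's nested mount feature organizes data hierarchically and connects different directories to different backend storage systems. For example, the root directory / can be backed by S3 cloud storage while the /user directory is backed by a local HDFS cluster. This improves both data access efficiency and maintainability, as shown in the figure below.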
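To make the nested-mount idea concrete, here is a minimal Python sketch of how a path in the Alluxio namespace could be mapped to a backend URI by longest-prefix match. The bucket name, the NameNode address, and the `resolve` helper are hypothetical and only illustrate the concept; this is not Alluxio's actual implementation.

```python
# Hypothetical mount table: Alluxio path prefix -> backend (UFS) URI.
# The bucket and NameNode address are placeholders, not values from this article.
MOUNT_TABLE = {
    "/": "s3://example-bucket/data",
    "/user": "hdfs://namenode:8020/user",
}


def resolve(alluxio_path: str, mounts: dict = MOUNT_TABLE) -> str:
    """Map an Alluxio path to its backend URI using the longest matching mount point."""
    best = None
    for mount_point in mounts:
        prefix = mount_point.rstrip("/")
        # A mount point matches if the path equals it or lies underneath it.
        if alluxio_path == mount_point or alluxio_path.startswith(prefix + "/"):
            if best is None or len(mount_point) > len(best):
                best = mount_point
    if best is None:
        raise ValueError(f"no mount point covers {alluxio_path}")
    # Keep the part of the path below the mount point and append it to the backend URI.
    relative = alluxio_path[len(best.rstrip("/")):]
    return mounts[best].rstrip("/") + relative


if __name__ == "__main__":
    print(resolve("/1000/some_table"))
    print(resolve("/user/hive/warehouse"))
```

Paths under /user go to HDFS, and everything else falls back to the root mount on S3. A path such as /username is not treated as being under /user, because the match is made on whole path components.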
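Following the architecture below, we integrate an Alluxio cluster with Amazon EMR. For the scenario in which Flink writes ODS data into Hudi in real time, we configure Flink to write directly to the Alluxio cluster cache. After the write, we query the cached ODS data with Presto and compare performance.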
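As shown above, we add the Alluxio master node (alluxio.master.hostname) and the backend S3 path (alluxio.master.mount.table.root.ufs) to the Flink yaml configuration. With these in place, Flink recognizes the alluxio cache URI when writing ODS data and writes directly into the Alluxio cache cluster.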
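As a rough illustration of the two properties just described, the sketch below renders a single Flink configuration line. It assumes the properties are passed to Flink as JVM system properties through the `env.java.opts` key, which is one common way to hand Alluxio client settings to Flink. The host name, the bucket, and the `render_flink_alluxio_opts` helper are hypothetical placeholders, and the exact key should be checked against your Flink version.

```python
def render_flink_alluxio_opts(master_host: str, root_ufs: str) -> str:
    """Render one Flink yaml line that passes Alluxio settings as JVM properties."""
    props = {
        "alluxio.master.hostname": master_host,
        "alluxio.master.mount.table.root.ufs": root_ufs,
    }
    jvm_flags = " ".join(f"-D{key}={value}" for key, value in props.items())
    # Assumption: the settings are injected through Flink's env.java.opts key.
    return f"env.java.opts: {jvm_flags}"


if __name__ == "__main__":
    # Placeholder values; substitute your own master host and S3 path.
    print(render_flink_alluxio_opts(
        "alluxio-master.example.internal",
        "s3://example-bucket/data",
    ))
```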
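The statement above creates the Alluxio-cached ODS table. The table is backed by the S3 path mentioned in the earlier section, so data ingested by Flink is written directly to the Alluxio cache and passed through to the S3 data lake.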
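So far, Flink has written directly to the Alluxio cluster cache during ODS ingestion, which speeds up read-after-write on the ODS layer. Some customers, however, do not use Flink for ODS (for example, they load ODS from offline files through ETL) but still want the Alluxio cluster cache to speed up downstream warehouse analytics, such as the latency of Presto queries on the DWD/DWS layers. To cover this case, we take one DWD wide table, create three tables for it on an Alluxio path, an S3 path, and an HDFS path, and then run a performance comparison.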
spark-sql --jars /usr/lib/hudi/hudi-spark-bundle.jar --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf "spark.driver.memory=32G" --conf "spark.executor.memory=40G" --conf "spark.dynamicAllocation.enabled=true"
CREATE EXTERNAL TABLE tpcds_text_1000.dwd_charge_transaction_record_v_partition_alluxio (
chain_id STRING,
create_time STRING,
id STRING,
type INT,
type_text STRING,
is_deleted INT,
retail_type INT,
charge_sheet_type INT,
v2_patient_order_id STRING,
v2_charge_sheet_id STRING,
v2_transaction_id STRING,
v2_charge_form_item_id STRING,
v2_charge_coupon_promotion_info_id BIGINT,
v2_charge_gift_promotion_info_id BIGINT,
v2_registration_form_item_id STRING,
v2_patient_member_bill_id STRING,
v2_outpatient_sheet_id STRING,
clinic_id STRING,
department_id STRING,
chain_text STRING,
clinic_text STRING,
department_text STRING,
doctor_id STRING,
copywriter_id STRING,
seller_id STRING,
cashier_id STRING,
patient_id STRING,
created_by STRING,
last_modified_by STRING,
doctor_text STRING,
copywriter_text STRING,
seller_text STRING,
cashier_text STRING,
patient_text STRING,
created_by_text STRING,
last_modified_text STRING,
patientorder_no BIGINT,
patientorder_source INT,
patientorder_source_text STRING,
product_type INT,
product_sub_type INT,
product_custom_type INT,
product_id STRING,
product_unit STRING,
product_unit_count DECIMAL(20,8),
product_unit_price DECIMAL(20,8),
product_dose_count DECIMAL(20,8),
product_compose_type INT,
product_is_disamounting INT,
specification STRING,
product_type_text STRING,
product_sub_type_text STRING,
product_custom_type_text STRING,
product_name STRING,
product_cadn STRING,
product_spec STRING,
pay_type INT,
pay_sub_type INT,
pay_third_party_extra_info STRING,
pay_type_text STRING,
pay_sub_type_text STRING,
original_price DECIMAL(20,8),
cost_price DECIMAL(20,8),
discount_price DECIMAL(20,8),
adjustment_price DECIMAL(20,8),
promotion_price DECIMAL(20,8),
promotion_info STRING,
should_receive_price DECIMAL(20,8),
received_price DECIMAL(20,8),
is_air_pharmacy INT,
source_form_item_id STRING,
associate_form_item_id STRING,
source_form_type INT,
v2_charge_form_id STRING,
usage STRING,
record_type INT,
calc_unit STRING,
calc_count DECIMAL(20,8),
classify_level_1_id STRING,
classify_level_2_id INT,
prescription_type INT,
visit_source_remark STRING,
comment STRING,
member_present DECIMAL(20,8),
member_principal DECIMAL(20,8),
process_type INT,
process_sub_type INT,
discount_info STRING,
deduct_promotion_price DECIMAL(15,4),
discount_promotion_price DECIMAL(15,4),
gift_promotion_price DECIMAL(15,4),
coupon_promotion_price DECIMAL(15,4),
member_info STRING,
member_id STRING,
visit_source_level_1_id STRING,
visit_source_level_2_id STRING,
visit_source_from STRING,
charge_sheet_total_price DECIMAL(15,4),
charge_sheet_deduct_price DECIMAL(15,4),
charge_sheet_receivable_price DECIMAL(15,4),
charge_sheet_discount_price DECIMAL(15,4),
charge_sheet_adjustment_price DECIMAL(15,4),
record_discount_price DECIMAL(15,4),
record_adjustment_price DECIMAL(15,4),
charged_by STRING,
health_card_cash_receivable_fee DECIMAL(15,4),
health_card_cash_received_fee DECIMAL(15,4),
health_card_med_type STRING,
is_chain_guest INT,
is_clinic_guest INT,
scene_type INT,
seller_department_id STRING,
nurse_id STRING,
item_doctor_id STRING,
item_doctor_department_id STRING,
item_nurse_id STRING)
USING parquet
PARTITIONED BY (ts STRING)
LOCATION 'alluxio:///1000/dwd_charge_transaction_record_v_partition_alluxio';
CREATE EXTERNAL TABLE tpcds_text_1000.dwd_charge_transaction_record_v_partition_hdfs (
chain_id STRING,
create_time STRING,
id STRING,
type INT,
type_text STRING,
is_deleted INT,
retail_type INT,
charge_sheet_type INT,
v2_patient_order_id STRING,
v2_charge_sheet_id STRING,
v2_transaction_id STRING,
v2_charge_form_item_id STRING,
v2_charge_coupon_promotion_info_id BIGINT,
v2_charge_gift_promotion_info_id BIGINT,
v2_registration_form_item_id STRING,
v2_patient_member_bill_id STRING,
v2_outpatient_sheet_id STRING,
clinic_id STRING,
department_id STRING,
chain_text STRING,
clinic_text STRING,
department_text STRING,
doctor_id STRING,
copywriter_id STRING,
seller_id STRING,
cashier_id STRING,
patient_id STRING,
created_by STRING,
last_modified_by STRING,
doctor_text STRING,
copywriter_text STRING,
seller_text STRING,
cashier_text STRING,
patient_text STRING,
created_by_text STRING,
last_modified_text STRING,
patientorder_no BIGINT,
patientorder_source INT,
patientorder_source_text STRING,
product_type INT,
product_sub_type INT,
product_custom_type INT,
product_id STRING,
product_unit STRING,
product_unit_count DECIMAL(20,8),
product_unit_price DECIMAL(20,8),
product_dose_count DECIMAL(20,8),
product_compose_type INT,
product_is_disamounting INT,
specification STRING,
product_type_text STRING,
product_sub_type_text STRING,
product_custom_type_text STRING,
product_name STRING,
product_cadn STRING,
product_spec STRING,
pay_type INT,
pay_sub_type INT,
pay_third_party_extra_info STRING,
pay_type_text STRING,
pay_sub_type_text STRING,
original_price DECIMAL(20,8),
cost_price DECIMAL(20,8),
discount_price DECIMAL(20,8),
adjustment_price DECIMAL(20,8),
promotion_price DECIMAL(20,8),
promotion_info STRING,
should_receive_price DECIMAL(20,8),
received_price DECIMAL(20,8),
is_air_pharmacy INT,
source_form_item_id STRING,
associate_form_item_id STRING,
source_form_type INT,
v2_charge_form_id STRING,
usage STRING,
record_type INT,
calc_unit STRING,
calc_count DECIMAL(20,8),
classify_level_1_id STRING,
classify_level_2_id INT,
prescription_type INT,
visit_source_remark STRING,
comment STRING,
member_present DECIMAL(20,8),
member_principal DECIMAL(20,8),
process_type INT,
process_sub_type INT,
discount_info STRING,
deduct_promotion_price DECIMAL(15,4),
discount_promotion_price DECIMAL(15,4),
gift_promotion_price DECIMAL(15,4),
coupon_promotion_price DECIMAL(15,4),
member_info STRING,
member_id STRING,
visit_source_level_1_id STRING,
visit_source_level_2_id STRING,
visit_source_from STRING,
charge_sheet_total_price DECIMAL(15,4),
charge_sheet_deduct_price DECIMAL(15,4),
charge_sheet_receivable_price DECIMAL(15,4),
charge_sheet_discount_price DECIMAL(15,4),
charge_sheet_adjustment_price DECIMAL(15,4),
record_discount_price DECIMAL(15,4),
record_adjustment_price DECIMAL(15,4),
charged_by STRING,
health_card_cash_receivable_fee DECIMAL(15,4),
health_card_cash_received_fee DECIMAL(15,4),
health_card_med_type STRING,
is_chain_guest INT,
is_clinic_guest INT,
scene_type INT,
seller_department_id STRING,
nurse_id STRING,
item_doctor_id STRING,
item_doctor_department_id STRING,
item_nurse_id STRING)
USING parquet
PARTITIONED BY (ts STRING)
LOCATION 'hdfs:///1000/dwd_charge_transaction_record_v_partition_hdfs';
CREATE EXTERNAL TABLE tpcds_text_1000.dwd_charge_transaction_record_v_partition_s3 (
chain_id STRING,
create_time STRING,
id STRING,
type INT,
type_text STRING,
is_deleted INT,
retail_type INT,
charge_sheet_type INT,
v2_patient_order_id STRING,
v2_charge_sheet_id STRING,
v2_transaction_id STRING,
v2_charge_form_item_id STRING,
v2_charge_coupon_promotion_info_id BIGINT,
v2_charge_gift_promotion_info_id BIGINT,
v2_registration_form_item_id STRING,
v2_patient_member_bill_id STRING,
v2_outpatient_sheet_id STRING,
clinic_id STRING,
department_id STRING,
chain_text STRING,
clinic_text STRING,
department_text STRING,
doctor_id STRING,
copywriter_id STRING,
seller_id STRING,
cashier_id STRING,
patient_id STRING,
created_by STRING,
last_modified_by STRING,
doctor_text STRING,
copywriter_text STRING,
seller_text STRING,
cashier_text STRING,
patient_text STRING,
created_by_text STRING,
last_modified_text STRING,
patientorder_no BIGINT,
patientorder_source INT,
patientorder_source_text STRING,
product_type INT,
product_sub_type INT,
product_custom_type INT,
product_id STRING,
product_unit STRING,
product_unit_count DECIMAL(20,8),
product_unit_price DECIMAL(20,8),
product_dose_count DECIMAL(20,8),
product_compose_type INT,
product_is_disamounting INT,
specification STRING,
product_type_text STRING,
product_sub_type_text STRING,
product_custom_type_text STRING,
product_name STRING,
product_cadn STRING,
product_spec STRING,
pay_type INT,
pay_sub_type INT,
pay_third_party_extra_info STRING,
pay_type_text STRING,
pay_sub_type_text STRING,
original_price DECIMAL(20,8),
cost_price DECIMAL(20,8),
discount_price DECIMAL(20,8),
adjustment_price DECIMAL(20,8),
promotion_price DECIMAL(20,8),
promotion_info STRING,
should_receive_price DECIMAL(20,8),
received_price DECIMAL(20,8),
is_air_pharmacy INT,
source_form_item_id STRING,
associate_form_item_id STRING,
source_form_type INT,
v2_charge_form_id STRING,
usage STRING,
record_type INT,
calc_unit STRING,
calc_count DECIMAL(20,8),
classify_level_1_id STRING,
classify_level_2_id INT,
prescription_type INT,
visit_source_remark STRING,
comment STRING,
member_present DECIMAL(20,8),
member_principal DECIMAL(20,8),
process_type INT,
process_sub_type INT,
discount_info STRING,
deduct_promotion_price DECIMAL(15,4),
discount_promotion_price DECIMAL(15,4),
gift_promotion_price DECIMAL(15,4),
coupon_promotion_price DECIMAL(15,4),
member_info STRING,
member_id STRING,
visit_source_level_1_id STRING,
visit_source_level_2_id STRING,
visit_source_from STRING,
charge_sheet_total_price DECIMAL(15,4),
charge_sheet_deduct_price DECIMAL(15,4),
charge_sheet_receivable_price DECIMAL(15,4),
charge_sheet_discount_price DECIMAL(15,4),
charge_sheet_adjustment_price DECIMAL(15,4),
record_discount_price DECIMAL(15,4),
record_adjustment_price DECIMAL(15,4),
charged_by STRING,
health_card_cash_receivable_fee DECIMAL(15,4),
health_card_cash_received_fee DECIMAL(15,4),
health_card_med_type STRING,
is_chain_guest INT,
is_clinic_guest INT,
scene_type INT,
seller_department_id STRING,
nurse_id STRING,
item_doctor_id STRING,
item_doctor_department_id STRING,
item_nurse_id STRING)
USING parquet
PARTITIONED BY (ts STRING)
LOCATION 's3://salunchbucket/data/alluxio/1000/dwd_charge_transaction_record_v_partition_s3';
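The three statements above differ only in the table-name suffix and the LOCATION clause. As a hedged sketch, they could be generated from a single column list so that the schemas cannot drift apart. The `build_ddl` helper is hypothetical, and only the first few columns are listed for brevity.

```python
BASE_NAME = "dwd_charge_transaction_record_v_partition"
DATABASE = "tpcds_text_1000"

# Only the first few columns are listed here for brevity;
# the full column list is the one used in the statements above.
COLUMNS = [
    ("chain_id", "STRING"),
    ("create_time", "STRING"),
    ("id", "STRING"),
    ("type", "INT"),
]

# Storage prefix per backend, taken from the LOCATION clauses above.
LOCATION_PREFIXES = {
    "alluxio": "alluxio:///1000",
    "hdfs": "hdfs:///1000",
    "s3": "s3://salunchbucket/data/alluxio/1000",
}


def build_ddl(backend: str) -> str:
    """Return the CREATE TABLE statement for one storage backend."""
    table = f"{BASE_NAME}_{backend}"
    column_sql = ",\n".join(f"{name} {dtype}" for name, dtype in COLUMNS)
    return (
        f"CREATE EXTERNAL TABLE {DATABASE}.{table} (\n"
        f"{column_sql})\n"
        "USING parquet\n"
        "PARTITIONED BY (ts STRING)\n"
        f"LOCATION '{LOCATION_PREFIXES[backend]}/{table}';"
    )


if __name__ == "__main__":
    for backend in LOCATION_PREFIXES:
        print(build_ddl(backend))
        print()
```

Keeping the column list in one place means the three tables are guaranteed to share a schema, so any difference measured later comes from the storage path rather than from the table definition.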
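In the data-generation script, we randomly generate the id and timestamp fields so that the test data has no hotspots or skew. We then run a Spark SQL loop that uses INSERT ... SELECT to quickly build a dummy dataset with tens of millions of rows.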
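The randomization idea can be sketched in plain Python. The article itself generates the data with Spark SQL. The column subset, the 30-day window starting on 2023-01-01, the date format of the ts partition value, and the helper names below are all assumptions made for illustration.

```python
import random
import uuid
from datetime import datetime, timedelta


def make_dummy_row(rng: random.Random, start: datetime, span_days: int) -> dict:
    """Build one dummy record with a random id and a random timestamp."""
    # A random 128-bit id spreads keys evenly instead of clustering them.
    row_id = uuid.UUID(int=rng.getrandbits(128)).hex
    # A random offset inside the window spreads rows across partitions.
    offset = timedelta(seconds=rng.randrange(span_days * 24 * 3600))
    created = start + offset
    return {
        "id": row_id,
        "create_time": created.strftime("%Y-%m-%d %H:%M:%S"),
        "ts": created.strftime("%Y-%m-%d"),  # assumed format of the partition column
    }


def make_dummy_rows(count: int, seed: int = 0) -> list:
    """Generate `count` rows; a fixed seed keeps the run reproducible."""
    rng = random.Random(seed)
    start = datetime(2023, 1, 1)  # hypothetical start of the test window
    return [make_dummy_row(rng, start, span_days=30) for _ in range(count)]


if __name__ == "__main__":
    for row in make_dummy_rows(3):
        print(row)
```

A small seed set like this can then be multiplied by repeatedly inserting a table's rows back into itself with fresh random ids, which is the effect of the INSERT ... SELECT loop.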
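Here, catalog is the metadata catalog source; in all tests we use the Hive Glue catalog on Amazon EMR. schema is the specific database, that is, the Hive database that holds the three tables created on the S3, HDFS, and Alluxio paths.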
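For reference, a comparison run across the three tables might be driven as in the sketch below. It assumes the Presto CLI is available as `presto-cli` and that the catalog is named `hive`. The `count(*)` query is a placeholder rather than the query used in the tests, and the helper names are hypothetical.

```python
import subprocess
import time

CATALOG = "hive"            # assumed Presto catalog name backed by Glue
SCHEMA = "tpcds_text_1000"  # the Hive database that holds the three tables
TABLES = [
    f"dwd_charge_transaction_record_v_partition_{backend}"
    for backend in ("s3", "hdfs", "alluxio")
]


def presto_command(table: str, sql_template: str = "SELECT count(*) FROM {table}") -> list:
    """Build the presto-cli argument list for one table."""
    return [
        "presto-cli",
        "--catalog", CATALOG,
        "--schema", SCHEMA,
        "--execute", sql_template.format(table=table),
    ]


def time_query(table: str) -> float:
    """Run the query once and return the wall-clock seconds it took."""
    started = time.perf_counter()
    subprocess.run(presto_command(table), check=True, capture_output=True)
    return time.perf_counter() - started


if __name__ == "__main__":
    # Print the commands only; call time_query(table) on a cluster to measure.
    for table in TABLES:
        print(" ".join(presto_command(table)))
```

Running the same statement against each table, ideally several times each, keeps the comparison fair, since the first read of a path may still have to populate the cache.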
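Our tests show that, when Hudi tables are used for ODS ingestion and Flink writes through the Alluxio cluster cache path, Presto can read the latest data after the write, and query efficiency improves by about 3.5x. For DWD and other analytical queries that do not involve ODS read-after-write, we compared Presto query performance on the S3 path, the HDFS path, and the Alluxio cluster path. The Alluxio path is also about 3.5x faster than the S3 path, and about 25% faster even than HDFS built on general-purpose SSD volumes.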
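In summary, introducing an Alluxio cluster cache in a real-time data lake shortens data ingestion time and substantially improves warehouse query efficiency. Readers who are interested can use the sample scripts and code in this article as a starting point for integrating and tuning Alluxio for their own workloads.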