亚马逊AWS官方博客

使用Spline收集Spark数据血缘实践

很多企业为满足传统大数据分析T+1的批处理或是实时性的流处理业务场景,往往会选择主流大数据框架Apache Spark结合Amazon Glue和这种完全托管的,无服务器的ETL服务或者Amazon EMR托管大数据平台在亚马逊云上搭建数据湖,但由于目前没有原生基于Amazon Glue实现数据血缘的功能,很多客户需要自己开发或者使用Atlas,但目前Atlas对Spark支持很有限,比如最高只支持到2.4版本,也不支持字段级别的血缘,我们迫切需要找到更优的血缘方案解决Atlas目前的问题。

而针对以上场景,我们会在这篇博客中给出一个开源的解决方案Spline。对比其它开源数据血缘方案,Spline支持Glue和EMR代码级别的血缘包括DataFrame API和SparkSQL, 可以做到从整个数据链路实时捕获追踪血缘。另外Spline还包括支持字段血缘,可以可视化向上向下溯源, 支持Spark Structured Streaming结合最受欢迎的开源数据湖框架如Hudi和Delta Lake,提供统一流批的血缘,同时Spline提供Producer & Consumer API,方便第三方集成和以API形式的二次开发。

本文会带您一步一步地在AWS上部署Spline,提供在Glue和EMR中实现Spark代码级别的血缘,并且结合开源数据湖架构Hudi和Delta Lake来提供批流入湖实例,本文会重点介绍如何使用Spline来捕获,关于使用Glue和EMR的资料相对丰富,可以参考官方文档以及相关博客,本文不再重复。

1. 什么是数据血缘

数据血缘(Data Lineage)又叫做数据起源(Data Provenance) 或者数据家谱(Data Pedigree)。其通常被定义为一种生命周期,主要包含数据的来源以及数据随时间移动的位置。数据血缘是数据治理的最重要组成部分之一,虽然数据目录提供了元数据管理特性和搜索功能,但数据血缘通过更详细地捕捉数据源之间的关系、数据来源以及数据转换和聚合的方式,揭示了数据的生命周期,旨在显示从开始到结束的完整的数据流。数据血缘是理解、记录和可视化数据的过程,因为它从数据源流向消费。这包括数据在整个过程中所经历的所有转换:数据是如何转换的,发生了什么变化,以及为什么发生了变化。有助于数据工程师查看和跟踪数据流,并理解特定指标和数据集的质量以及来源,了解数据集之间的依赖关系,当出现问题时,更好的评估对数据链路的影响并快速定位问题以及修复。

1.1 数据血缘的应用场景

在数据的处理过程中,从数据源头到最终的消费数据,每个环节都可能会导致我们出现数据质量的问题。比如我们数据源本身数据质量不高,在后续的处理环节中如果没有进行数据质量的检测和处理,那么这个数据信息最终流转到我们的目标表,它的数据质量也是有问题的。也有可能在某个环节的数据处理中,我们对数据进行了一些不恰当的处理,导致后续环节的数据质量变得糟糕。因此,对于数据的血缘关系,我们要确保每个环节都要注意数据质量的检测和处理,那么我们后续数据才会有优良的基因,即有很高的数据质量。

数据血缘队对整个数据上下游的用户有不同视角,例如数据工程师和ETL开发人员一般需要验证、自信地处理原始数据包括data cleaning, de-duplicate, data masking, upsert,sorting, aggregation等等操作,识别上游依赖项和下游使用,提高数据质量,评估对服务影响,提高 Data Pipeline SLA和优化调度。对于业务用户和数据平台运维管理部门,他们更关心如何为数据湖消费者建立对数据的信任和透明度, 为数据治理和监管部门提供审计跟踪,如何理解捕获数据源之间的关系,数据经过了哪些关键环节,每个环节是有谁来负责,如何运维排错和故障溯源,如何保证数据安全性和合规性。

2. 什么是Spline

Spline 即 Spark Lineage,是一个专注Spark的数据血缘追踪工具,spline的目标是创建一种简单且高效的方法捕获Spark血缘,同时提供API,方便第三方去扩展和开发。

spline在架构上可以分为四部分

  • Spline Server
  • Spline Agent
  • ArangoDB
  • Spline UI

spline server 是 spline的核心。它通过 producer api 接收来自agent的血缘数据,并将其存储在 ArangoDB 中。另一方面,它为读取和查询血缘数据提供了 Consumer API。消费者 API 由spline UI 使用,但也可以由第三方应用程序使用。

spline agent 从数据转换管道中捕获沿血缘和元数据,并通过使用 HTTP API (称为 Producer API) ,以标准格式将其发送到spline server,最终血缘数据被处理并以图的形式存储,并且可以通过另一个 REST API (称为 Consumer API)访问。

ArangoDB 是一个原生多模型数据库,兼有key/value键/值对、graph图和document文档数据模型,提供了涵盖三种数据模型的统一的数据库查询语言,并允许在单个查询中混合使用三种模型。基于其本地集成多模型特性,您可以搭建高性能程序,并且这三种数据模型均支持水平扩展。

Spline UI 是可视化渲染数据血缘的endpoint,可以按application绘制作业的表血缘,字段血缘,以及每一个stage的输入输出schema。

3. Spline 部署

我们采用在EC2上以docker compose的方式容器化部署Spline,需提前安装好Docker和Compose。也可以参考在亚马逊云上部署Spline的详细例子:https://github.com/AbsaOSS/spline-getting-started/tree/main/spline-on-AWS-demo-setup

wget https://raw.githubusercontent.com/AbsaOSS/spline-getting-started/main/docker/docker-compose.yml

wget https://raw.githubusercontent.com/AbsaOSS/spline-getting-started/main/docker/.env

sudo docker-compose up

修改.env确保同VPC内可以访问, 拉起4个组件,分别是Spline UI,图数据库arangoDB,以及Spline server,Agent,其中agent会执行多个测试脚本,结果写入Spline Server

启动后进入spline UI查看,我们看到Spline分为Execution Events,Data Sources,Execution Plans几个部分

点击一个测试生成的Execution event,首先显示的是表血缘

点击链接Table的job查看每一步stage的输入输出

点击任意schema中的字段,显示该字段的上下游字段血缘

Spline捕获血缘的内容还是很充分的,接下来我们在Glue中使用spline尝试血缘捕获

4. 动手实验

4.1 Spline 与 Glue Studio的集成

AWS Glue Studio 是一个新图形界面,可以方便地在 AWS Glue 中创建、运行和监控ETL任务。数据人员可以直观地编写数据转换工作流,并在 AWS Glue 的基于 Apache Spark 的无服务器 ETL 引擎上顺畅运行。您可以在任务的每个步骤中检查架构和数据结果。

4.1.1 首先在Glue studio中通过图形化界面托拉拽一个DAG来建一个ETL Job

4.1.2 然后只需点击Job details,添加Dependent JARS path, 加入我们的Spline Agent Jar文件

s3://aws-virginia-spline-only-jar/za.co.absa.spline.agent.spark_spark-3.1-spline-agent-bundle_2.12-0.6.2.jar

4.1.3 在Advanced properties 添加 Job parameters来制定spline相关参数。

Key Value
1 –conf spark.spline.producer.url=http://{Spline_server_IP}:48080/producer  –conf  spark.spline.mode=REQUIRED —conf spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
2 –enable-glue-datacatalog

4.1.4 点击Save,然后点击Run。 作业执行面成功后我们可以看到血缘被Spline成功捕获

spline event页面

可以看到完美捕获血缘

4.2 Spline 与 SparkSQL on Glue的集成

另外,我也可以通过Glue来创建一个以SparkSQL为主的Job

4.2.1  进入Amazon Glue,  首先在数据库里新建一个数仓  prd_dw

4.2.2   点击作业,然后添加作业

本例我们采用将配置写在代码中来实现血缘捕获

4.2.3,在从属JAR路径选添加jars path

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
sc.setSystemProperty('spline.mode','REQUIRED')
sc.setSystemProperty('spline.producer.url','http://{Spline_server_IP}:48080/producer')
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark.conf.set("spark.sql.debug.maxToStringFields",2000)
sc._jvm.za.co.absa.spline.harvester.SparkLineageInitializer.enableLineageTracking(spark._jsparkSession)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# 以上通用

# src->ods
df=spark.sql("select * from s3.aws_virginia_spline_test_data")
df.write\
.format('parquet')\
.mode('overwrite')\
.option('path','s3://neo-aws-ohio-emr-task-abc/ods_order_a')\
.saveAsTable('prd_dw.ods_order_a')

# ods->dwd
df=spark.sql("select * from prd_dw.ods_order_a where caseid>10009")
df.write\
.format('parquet')\
.mode('overwrite')\
.option('path','s3://neo-aws-ohio-emr-task-abc/dwd_order_a')\
.saveAsTable('prd_dw.dwd_order_a')


# dwd->ads
df=spark.sql("select caseid, count(dAge) as num from prd_dw.dwd_order_a  group by caseid")
df.write\
.format('parquet')\
.mode('overwrite')\
.option('path','s3://neo-aws-ohio-emr-task-abc/ads_order_a')\
.saveAsTable('prd_dw.ads_order_a')

job.commit()

查看glue catalog

我们通过Spline UI界面查看到血缘被捕获成功

4.2.4 查看spline UI 血缘

执行DAG

表血缘

字段血缘和每步的schema

4.3 Spline收集Spark Structured Streaming+Delta Lake on Glue的血缘

针对准实时的场景下使用开源Delta Lake结合Amazon Glue的无服务器数据湖架构在企业客户里使用越来越普遍,本例尝试使用spline来统一Delta数据湖中的流和批的血缘。如下图,spline的会通过Agent从数据转换管道ETL工作流中捕获沿血缘,并通过使用 HTTP API (称为 Producer API) ,以标准格式将其发送到spline server,最后Consumer API读取和查询血缘数据。

架构上第一个Job负责从MSK读取原始数据并且把数据以Delta的格式写入原始数据层(raw layer),同时我们会按processing time以年, 月, 天, 小时来分区。数据在进入这一层前没有做数据清洗和处理,并允许重复订单数据,第二个Job从原始数据层读取数据,然后进行数据清洗,实现upsert逻辑然后写入处理层(processed layer)的ODS表供下游消费端查询,这一层类似ODS层, 可以做数据清洗和去重等业务逻辑。同时,我们使用 Spark原生的Run Once trigger特性实现一次性的流式处理,然后停止集群,避免了24*7运行成本。本篇博客主要以第一个红色画框的Job为例子。

4.3.1  新建第一个Job

注意运行程序需要使用到Delta和Spline的两个Jar包。然后向下滚动进入到“安全配置、脚本库和作业参数(可选)”环节,在”Python 库路径”和“从属 JAR 路径”的输入框中分别将前面上传到桶里的两个依赖Jar包的S3路径粘贴进去。如下图所示:(记住,中间要使用逗号分隔)

接下来,在“作业参数”环节,添加三个作业参数。我们需要把 kafka的bootstrap_servers, topic和数据湖S3桶的名称以“作业参数”的形式传给示例程序,以便其可以拼接出Delta数据集的完整路径,这个值会在读写Delta数据集时使用,因为Delta数据集会被写到这个桶里。

以下是第一个Job的脚本示例, 注意目前只可以捕获在Structured Streaming以foreachBatch + DataFrame.write的方式捕获写入的血缘,对原生的writeStream的支持目前正在开发中,具体可以参考:https://github.com/AbsaOSS/spline-spark-agent/issues/41

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# Import the packages
from delta import *
from pyspark.sql.session import SparkSession
from datetime import datetime

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import col, from_json, lit

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'bucket_name', 'bootstrap_servers', 'topic'])

##conf 写code里
sc = SparkContext()
sc.setSystemProperty('spline.mode','REQUIRED')
sc.setSystemProperty('spline.producer.url','http://{Spline_server_IP}/producer')
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.debug.maxToStringFields",2000)
sc._jvm.za.co.absa.spline.harvester.SparkLineageInitializer.enableLineageTracking(spark._jsparkSession)
glueContext = GlueContext(sc)


job = Job(glueContext)
job.init(args['JOB_NAME'], args)


data_bucket = args['bucket_name']
bootstrap_servers = args['bootstrap_servers']
topic = args['topic']

schema = StructType([ \
  StructField("order_id", IntegerType(), True), \
  StructField("order_owner", StringType(), True), \
  StructField("order_value", IntegerType(), True), \
  StructField("timestamp", TimestampType(), True), ])

def insertToDelta(microBatch, batchId):  
  date = datetime.today()
  year = date.strftime("%y")
  month = date.strftime("%m")
  day = date.strftime("%d")
  hour = date.strftime("%H")
  if microBatch.count() > 0:
    df = microBatch.withColumn("year", lit(year)).withColumn("month", lit(month)).withColumn("day", lit(day)).withColumn("hour", lit(hour))
    df.write.partitionBy("year", "month", "day", "hour").mode("append").format("delta").save(f"s3://{data_bucket}/raw/")

# Use IAM and SSL for data in transit
options = {
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "AWS_MSK_IAM", 
    "kafka.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
    "kafka.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }  

# Read Source
df = spark \
  .readStream \
  .format("kafka") \
  .options(**options) \
  .option("kafka.bootstrap.servers", bootstrap_servers) \
  .option("subscribe", topic) \
  .option("startingOffsets", "earliest") \
  .option("maxOffsetsPerTrigger", 1000) \
  .load().select(col("value").cast("STRING"))

df2 = df.select(from_json("value", schema).alias("data")).select("data.*")


# Write data as a DELTA TABLE
df3 = df2.writeStream \
  .foreachBatch(insertToDelta) \
  .option("checkpointLocation", f"s3://{data_bucket}/checkpoint/") \
  .trigger(processingTime="60 seconds") \
  .start()

df3.awaitTermination()

job.commit()

4.3.2 查看spline UI 血缘

我们看到由于spline是by事件的记录血缘,因此可以看到每次触发写入都会产生一条血缘信息。我们看到捕获的Delta格式的记录。

执行Dag

表血缘

字段血缘和每步的schema

 

4.4 Spline收集Spark Structured Streaming+ Hudi on EMR的血缘

针对准实时的场景下,基于Hudi等的数据湖框架在企业中使用越来越普遍,本例尝试使用spline来统一数据湖中的流和批血缘

架构上,采用Kafka的connector作为RDS的CDC工具,同步数据湖中的多张表格,EMR.Spark 以 Structured Streaming 的方式消费 Kafka ,并将数据写入S3的Hudi-ODS表,并注册catalog。。

我们以pyspark的形式运行本例的结构流处理

启动pyspark,并带上spline的jar以及配置

SPARK_VERSION=3.1.2
SCALA_VERSION=2.12         
HUDI_VERSION=0.10.0
alias nowts='date +"%s"'
pyspark \
--master yarn \
--deploy-mode client \
--name "`hostname`-`whoami`-`nowts`" \
--packages io.github.spark-redshift-community:spark-redshift_2.12:5.0.3,org.apache.spark:spark-sql-kafka-0-10_$SCALA_VERSION:$SPARK_VERSION,org.apache.hudi:hudi-spark3-bundle_$SCALA_VERSION:$HUDI_VERSION,org.apache.spark:spark-avro_$SCALA_VERSION:$SPARK_VERSION \
--jars s3://aws-virginia-spline-only-jar/za.co.absa.spline.agent.spark_spark-3.1-spline-agent-bundle_2.12-0.6.2.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.hive.convertMetastoreParquet=false' \
--conf 'spark.spline.producer.url=http://{Spline_server_IP}:48080/producer' \
--conf 'spark.spline.mode=REQUIRED' \
--conf 'spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener'

这里我们流式读取kafka多个topic,并自己推断schema且并行写入多张hudi表

from pyspark.sql.functions import explode;
from pyspark.sql.functions import split;
from pyspark.sql.functions import from_json;
spark.sql("SET spark.sql.streaming.metricsEnabled=true").show(10,False);
spark.sql("SET spark.sql.streaming.forceDeleteTempCheckpointLocation=true").show(10,False);
spark.conf.set("spark.sql.streaming.schemaInference",True)
import pyspark.sql.functions as fn;
from pyspark.sql.functions import col, struct, to_json
from pyspark.sql.types import StructType
from pyspark.sql.types import *
from concurrent.futures import ThreadPoolExecutor
import concurrent
KAFKA_BROKER="{KAFKA_ENDPOINT}"
# 定义流DF,读取多张表
kafkaDF = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", f"{KAFKA_BROKER}") \
  .option("subscribePattern", "Aurora-8-unwarp-fix-v11-json.tesla.*") \
  .option("startingOffsets", "earliest") \
  .load()
import functools
def executor_run(table, batch_df):
    tableName = table.split('.')[-1]
    # basePath = "file:///tmp/hudi_trips_cow" # file
    # basePath = "s3a://hudi/main/hudi_trips_cow_v1/" # lakeFS
    # basePath = "s3a://aws-tools-kit/hudi_trips_cow_v1/" # minIO
    basePath = f"s3a://app-util-hudi/spark/{tableName}"  # S3
    hudi_options = {
        'hoodie.table.name': tableName,
        'hoodie.datasource.write.table.name': tableName,
        'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.datasource.write.recordkey.field': 'id',
        'hoodie.datasource.write.partitionpath.field': 'age',
        'hoodie.datasource.write.precombine.field': 'eventTS',
        'hoodie.upsert.shuffle.parallelism': 2,
        'hoodie.insert.shuffle.parallelism': 2,
        'hoodie.bulkinsert.shuffle.parallelism': 2,
        'hoodie.delete.shuffle.parallelism': 2,
        'hoodie.datasource.hive_sync.mode': 'hms',
        'hoodie.datasource.hive_sync.auto_create_database': True,
        'hoodie.datasource.hive_sync.database': 'lake-hudi',
        'hoodie.datasource.hive_sync.table': tableName,
        'hoodie.datasource.hive_sync.partition_fields': 'age',
        'hoodie.datasource.hive_sync.partition_extractor_class': "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
        'hoodie.datasource.hive_sync.enable': "true",
    }
    one_batch_df = batch_df.filter(batch_df.topic == table).withColumn("json", col("value").cast(StringType()) \
                                                                       ).select("json")
    json_schema = spark.read.json(one_batch_df.select("json").rdd.map(lambda row: row.json)).schema
    #print(json_schema)
    df = one_batch_df.withColumn("jsonData", from_json("json", json_schema))
    one_batch_df.show(3)
    df.select("jsonData.*").write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(basePath)
def df_2_hudi_multiple_table(batch_df, batch_id):
    """
    同结构df写入hudi
    """
    tableList = batch_df.select('topic').distinct().rdd.flatMap(lambda x: x).collect()
    print(tableList)
    fs = dict()
    executor_run_partial = functools.partial(executor_run, batch_df=batch_df)
    with ThreadPoolExecutor() as executor:
        #    for table in tableList:
        #        task = executor.submit(executor_run, table)
        #        fs[task] = table
        futures = {executor.submit(executor_run_partial, table) for table in tableList}
        for fut in concurrent.futures.as_completed(futures):
            print(f"The outcome is {fut.result()}")   
kafkaDF.writeStream\
  .option("checkpointLocation", "/home/hadoop/checkpoint-all-multiple-2022-04-01-001/")\
  .trigger(processingTime='60 seconds')\
  .foreachBatch(df_2_hudi_multiple_table)\
  .start().awaitTermination()

由于spline是by事件的记录血缘,因此可以看到每次触发写入都会产生一条血缘信息

可以看到从源Kafka到最终write to s3的每一步,Spline以node来表示每一次对数据集的转换操作(Transformation)

如果选择一个Node, 可以看到从上游到下游转换操作每一步的对字段血缘影响的可视化

点击一个Node, 可以看到其具体的转化操作

5. Spline血缘总结

1. Spline可以完全捕获以Glue, EMR来运行的Job, 并且支持Spark 3.0。

2. Spline可以在不侵入代码的情况下,增加spark作业血缘,极大的降低了老脚本添加血缘的成本。

3. Spline可以支持DataFrame API和SparkSQL的数据血缘,注意:非DataFrame操作的血缘不可捕获。

4. Spline支持Spark Structured Streaming,统一流批血缘。注意:目前只可以捕获在Structured Streaming以foreachBatch + DataFrame.write的方式写入的血缘,对原生的writeStream 的支持目前正在开发中。

5. SpineSpline支持所有JDBC方式的数据血缘捕获,也支持字段级别血缘与字段衍生的可视化,且对数据湖框架支持良好。

6. Spline提供了Producer & Consumer API,方便第三方集成和二次开发。

参考资料

https://absaoss.github.io/spline/

https://github.com/AbsaOSS/spline-getting-started/tree/main/spline-on-AWS-demo-setup

https://github.com/AbsaOSS/spline-spark-agent/tree/develop/examples

https://docs.delta.io/latest/quick-start.html

https://hudi.apache.org/docs/0.10.0/writing_data/

https://en.wikipedia.org/wiki/Data_lineage

https://aws.amazon.com/blogs/big-data/making-etl-easier-with-aws-glue-studio/

本篇作者

孙健

AWS大数据解决方案架构师,负责基于AWS的大数据解决方案的咨询与架构设计,同时致力于大数据方面的研究和推广。在大数据运维调优、容器解决方案,湖仓一体以及大数据企业应用等方面有着丰富的经验。

胡晓度

AWS解决方案架构师,负责跨国企业级客户基于AWS的技术架构设计、咨询和设计优化工作。在加入AWS之前曾就职于电商Farfetch,海外政府IT部门和咨询相关企业,积累了丰富的大数据开发和数据库管理的实践经验。目前主要专注于大数据技术领域研究和AWS云服务在国内和全球的应用和推广