亚马逊AWS官方博客
基于 Datahub +Redshift 自动生成字段级血缘
![]() |
一、背景介绍
随着企业数据规模的快速增长,Amazon Redshift 作为一款全托管的 PB 级云数据仓库服务,凭借其 AI 驱动的 MPP 架构和零 ETL 能力,已成为众多企业进行数据分析和业务决策的重要支撑。然而,在数据仓库开发过程中,数据模型日益复杂,数据流转愈发频繁,如何准确追踪和管理数据血缘关系成为一个亟待解决的问题。数据血缘关系描述了数据在系统中的流动和转化路径,尤其是字段级血缘关系的追踪,对企业具有重要价值:
- 影响分析:在进行字段修改或表结构变更时,能够快速识别所有受影响的下游依赖,有效降低变更风险。
- 问题排查:当数据质量出现异常时,可以沿着血缘关系往上追溯,快速定位问题源头。
- 合规审计:通过完整的数据流转路径,确保敏感数据的处理符合数据安全和隐私法规要求。
- 优化性能:通过分析数据血缘图谱,识别数据加工环节的冗余和瓶颈,优化 ETL 流程。
DataHub 作为开源元数据平台,为 Redshift 提供了轻量级的血缘管理方案。但在实际应用中,我们发现其存在解析不稳定、 Redshift Serverless 模式不支持以及对 Hive/Spark/Flink 支持有限等问题。为此,本文将介绍一个基于 DataHub 的改进方案,以解决这些实际痛点。
二、解决方案介绍
为了基于 Redshift+DataHub 建立一套轻量级的元数据+血缘关系平台,作者引入了 SQLLineage 这个 SQL 解析的开源工具,并基于 Lambda 实现无服务器模式的 SQL 解析以及字段级血缘关系生成,最终在 DataHub 上进行元数据以及血缘关系展示,整体架构图如下所示。
![]() |
这个方案中,存在几个关键挑战:
1. 如何自动获取 Redshift 的数仓 SQL
为了基于 SQLLineage 生成血缘关系,则必须先获取到数仓相关的 INSERT SQL。而为了简化获取 SQL 的过程,本方案实现基于 Redshift 的 SYS_QUERY_HISTORY 系统表,增量读取当日的新增 INSERT SQL。同时,考虑到数仓 SQL 版本或者开发调试的场景,可能存在针对相同的目标表,一天中可能有多次写入的情况,本方案会对 SYS_QUERY_HISTORY 按时间排倒序,取最新一条,SQL 命令如下所示。
2. 如何精确地生成字段级血缘
在生成列级别的血缘关系中,如果没有表 Schema 信息,也容易出错。如下示例:
在进行 SQL 解析后,结果如下所示。
您可能会注意到:
在给定的上下文中,col4 可能来自 bar、 baz 或 quux。在没有元数据的情况下,这是 SQL 解析所能做的最好的结果。
所以接下来,本方案中需要获取 SQL 中涉及到的所有表的 Schema 信息。
3. 如何将血缘关系写入 DataHub
DataHub 提供了一套 PythonAPI,用于直接构造元数据,并使用编程方式将该元数据发送到 DataHub。以下是一个写入血缘关系的元数据的简单示例。
4. 如何具备更好的扩展性
在打通了以上所有流程以后,我们也需要思考一下方案本身的扩展性问题。因为真实场景中,可能不仅仅使用 Redshift 来做数仓,也有可能基于 Hive/Spark/Flink 来做数仓开发。因此,本方案中,通过 S3 目录做为一个解耦与体现方案扩展性的地方,如下图所示,将第 2 与第 3 步实现为通用方案,后续无论是对接 EMR 这样的计算引擎,还是 DolphinScheduler、 Airflow 这样的调度引擎,都只需要局部改造,最终获取到 SQL,并写入 S3 即可。
![]() |
三、解决方案部署
本文专注于基于 Redshift 实现端到端的血缘关系生成与展示。
1. 部署 DataHub
如果是出于 Demo 目的,可以使用 DataHub 的快速部署指南,基于 Docker Compose 搭建一个简单的 DataHub Demo 。但如果是出于生产目的,则建议基于 AWS EKS 及配套的 AWS 托管服务,构建一个相对正式、稳定、高可用的环境,部署架构图所下图所示,也可参考详细部署指南进行部署。
![]() |
2. 创建 Lambda Layer
Lambda Layer 的主要好处是代码复用和依赖管理。你可以把常用的代码和依赖库打包成一个 Layer,然后让多个 Lambda 函数共享使用,这样可以减少重复代码并简化维护。在 linux 创建 layer 包,安装方案中两个 Lambda 所需要的依赖,如下所示:
然后进入 Lambda Console 页面,创建 Layer,并上传 layer_content.zip:
![]() |
3. 创建血缘解析 Lambda 函数
血缘解析 Lambda 函数,用于接收 S3 存储桶指定目录的 SQL 文件上传事件,解析 SQL,并调用 DataHub API 生成血缘关系。进入 Lambda 创建页面。这里需要提前准备好一个具备 CloudWatch Logs 读写权限与 S3 只读权限的 Lambda Service Role。
![]() |
为了内网访问 DataHub,需要将该 Lambda 设置了 VPC 模式。同时,需要保证 DataHub 所在服务器的安全组,能接受当前 Lambda 的访问。
![]() |
将如下代码贴入 Lambda 的 Code 页面,然后点击部署按钮。
![]() |
为了让 Lambda 中的代码在执行时,能找到依赖包,所以在当前 Lambda 下添加 此前创建的 Layer,如下图示。
![]() |
因为 SQL 解析,以及调用 DataHub API 执行血缘关系生成的过程,比较消耗资源,也比较耗时,所以尽可能将 Lambda 的内存与超时时间调大一些。
![]() |
接下来,需要设置 Lambda 的环境变量。
- DATAHUB_SERVER_URL:表示 DataHub Server 的地址
- ENGINE_TYPE:表示 SQL 对应的计算引擎类型。便于以后除了支持 Redshift,还可以进一步扩展支持 Hive/Spark/Flink
![]() |
最后,再设置一下 S3 的 Trigger,当有 SQL 文件上传至 S3 存储桶指定目录时,触该 Lambda。
![]() |
4. 创建 SQL 获取 Lambda 函数
SQL 获取 Lambda 函数,用于读取 Redshift 的元数据信息,以及 SYS_QUERY_HISTORY 系统表,获取 SQL INSERT 语句以及对应表的 Schema 信息。目前代码中的执行逻辑是,只获取今日执行成功的 INSERT SQL,以便于后续基于定时调度,每日执行。
进入 Lambda 创建页面,如图进行设置。这里需要事前准备一个具备 CloudWatchLogs 读写权限,S3 读写以及 AWSLambdaVPCAccessExecutionRole 权限的 Lambda 角色。
![]() |
为了内网访问 Redshift,需要将该 Lambda 设置了 VPC 模式。同时,需要保证 Redshift 所在网络的安全组,能接受当前 Lambda 的访问。
![]() |
将如下代码贴入 Lambda 的 Code 页面,然后点击部署按钮。
![]() |
为了让 Lambda 中的代码在执行时,能找到依赖包,所以在当前 Lambda 下添加 此前创建的 Layer,如下图示。
![]() |
因为获取 SQL 的过程,也涉及大量的判断与计算,建议将超时时间和内存尽量调大一些。
![]() |
最后,需要设置该 Lambda 的环境变量。
- REDSHIFT_HOST:Redshift 集群的 Host 地址
- REDSHIFT_PORT:Redshift 集群的访问端口
- REDSHIFT_DATABASE:Redshift 集群中的数据库名
- REDSHIFT_USER:Redshift 集群的访问用户名
- REDSHIFT_PASSWORD:Redshift 集群的访问密码
- PROJECT_NAME:当前生成 SQL 对应的项目名,为了避免多个项目,同时使用本方案时,上传至 S3 的文件名重复,所以需要设置下这里的项目名。如不属于特定项目,可以按实际情况命名即可。
- S3_BUCKET:设置将生成的 SQL 上传至哪个 S3 存储桶。存储桶后面的目录是固定的:sql-scripts/
![]() |
5. 测试和应用
完成上述部署后,我们通过一个 Demo 来演示效果。
首先,在 Redshift 中执行测试 SQL 语句。这是一个基于客户、产品、销售信息的简单数仓开发示例。
然后,可以直接通过 Lambda 函数:datalineage-generatesql 的测试功能,进行执行。
![]() |
最后,再打开 DataHub 的血缘关系展示页面,即可查询到生成的列级别血缘关系。
![]() |
在生产环境中,也可以使用 Amazon EventBridge 来进行日常的定时调度,执行 Lambda 函数:datalineage-generatesql。这里就不展开介绍了。
四、总结
这篇文章介绍了一个基于 DataHub 和 SQLLineage 实现 Redshift 字段级血缘的轻量级解决方案。方案通过两个 Lambda 函数实现:一个用于从 Redshift 获取 SQL 语句并上传至 S3,另一个用于解析 SQL 并生成血缘关系。该方案的主要优势在于:
- 采用无服务器架构,运维成本低;
- 通过 S3 作为中间层实现解耦,具有良好的扩展性;
- 支持精确的字段级血缘追踪;
- 可以方便地扩展到其他计算引擎,如 Hive/Spark/Flink 等。
这个方案为数据团队提供了一个实用的数据血缘追踪工具,有助于数据治理、影响分析、问题排查和性能优化等工作。同时其轻量级和可扩展的特点,也使其非常适合中小型数据团队使用。