亚马逊AWS官方博客

基于 Datahub +Redshift 自动生成字段级血缘

一、背景介绍

随着企业数据规模的快速增长,Amazon Redshift 作为一款全托管的 PB 级云数据仓库服务,凭借其 AI 驱动的 MPP 架构和零 ETL 能力,已成为众多企业进行数据分析和业务决策的重要支撑。然而,在数据仓库开发过程中,数据模型日益复杂,数据流转愈发频繁,如何准确追踪和管理数据血缘关系成为一个亟待解决的问题。数据血缘关系描述了数据在系统中的流动和转化路径,尤其是字段级血缘关系的追踪,对企业具有重要价值:

  1. 影响分析:在进行字段修改或表结构变更时,能够快速识别所有受影响的下游依赖,有效降低变更风险。
  2. 问题排查:当数据质量出现异常时,可以沿着血缘关系往上追溯,快速定位问题源头。
  3. 合规审计:通过完整的数据流转路径,确保敏感数据的处理符合数据安全和隐私法规要求。
  4. 优化性能:通过分析数据血缘图谱,识别数据加工环节的冗余和瓶颈,优化 ETL 流程。

DataHub 作为开源元数据平台,为 Redshift 提供了轻量级的血缘管理方案。但在实际应用中,我们发现其存在解析不稳定、 Redshift Serverless 模式不支持以及对 Hive/Spark/Flink 支持有限等问题。为此,本文将介绍一个基于 DataHub 的改进方案,以解决这些实际痛点。

二、解决方案介绍

为了基于 Redshift+DataHub 建立一套轻量级的元数据+血缘关系平台,作者引入了 SQLLineage 这个 SQL 解析的开源工具,并基于 Lambda 实现无服务器模式的 SQL 解析以及字段级血缘关系生成,最终在 DataHub 上进行元数据以及血缘关系展示,整体架构图如下所示。

这个方案中,存在几个关键挑战:

1. 如何自动获取 Redshift 的数仓 SQL

为了基于 SQLLineage 生成血缘关系,则必须先获取到数仓相关的 INSERT SQL。而为了简化获取 SQL 的过程,本方案实现基于 Redshift 的 SYS_QUERY_HISTORY 系统表,增量读取当日的新增 INSERT SQL。同时,考虑到数仓 SQL 版本或者开发调试的场景,可能存在针对相同的目标表,一天中可能有多次写入的情况,本方案会对 SYS_QUERY_HISTORY 按时间排倒序,取最新一条,SQL 命令如下所示。

SELECT query_text 
FROM SYS_QUERY_HISTORY 
WHERE query_type = 'INSERT' 
AND start_time >= TRUNC(SYSDATE)
ORDER BY end_time DESC;
SQL

2. 如何精确地生成字段级血缘

在生成列级别的血缘关系中,如果没有表 Schema 信息,也容易出错。如下示例:

INSERT INTO foo
SELECT a.col1,
       b.col1     AS col2,
       c.col3_sum AS col3,
       col4,
       d.*
FROM bar a
         JOIN baz b
              ON a.id = b.bar_id
         LEFT JOIN (SELECT bar_id, sum(col3) AS col3_sum
                    FROM qux
                    GROUP BY bar_id) c
                   ON a.id = sq.bar_id
         CROSS JOIN quux d;

INSERT INTO corge
SELECT a.col1,
       a.col2 + b.col2 AS col2
FROM foo a
         LEFT JOIN grault b
              ON a.col1 = b.col1;
SQL

在进行 SQL 解析后,结果如下所示。

$ sqllineage -f test.sql -l column
<default>.corge.col1 <- <default>.foo.col1 <- <default>.bar.col1
<default>.corge.col2 <- <default>.foo.col2 <- <default>.baz.col1
<default>.corge.col2 <- <default>.grault.col2
<default>.foo.* <- <default>.quux.*
<default>.foo.col3 <- c.col3_sum <- <default>.qux.col3
<default>.foo.col4 <- col4
SQL

您可能会注意到:

<default>.foo.col4 <- col4           #这里并没有解析出col4来自具体哪一张表
<default>.foo.* <- <default>.quux.*  #这里的*并没有展开到具体的列
SQL

在给定的上下文中,col4 可能来自 bar、 baz 或 quux。在没有元数据的情况下,这是 SQL 解析所能做的最好的结果。

所以接下来,本方案中需要获取 SQL 中涉及到的所有表的 Schema 信息。

with psycopg2.connect(**REDSHIFT_CONFIG) as new_conn:
            with new_conn.cursor() as cur:
                db_name = REDSHIFT_CONFIG['database']
                if '.' in table_name and table_name.split('.')[0] == db_name:
                    query = f'SHOW TABLE {table_name};'
                else:
                    query = f'SHOW TABLE {db_name}.{table_name};'

                logger.info(f"执行查询: {query}")

                cur.execute(query)
                ddl= cur.fetchone()[0]
                
                return ddl
SQL

3. 如何将血缘关系写入 DataHub

DataHub 提供了一套 PythonAPI,用于直接构造元数据,并使用编程方式将该元数据发送到 DataHub。以下是一个写入血缘关系的元数据的简单示例。

# Inlined from /metadata-ingestion/examples/library/lineage_emitter_dataset_finegrained_sample.py
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)


def datasetUrn(tbl):
    return builder.make_dataset_urn("redshift", tbl)


def fldUrn(tbl, fld):
    return builder.make_schema_field_urn(datasetUrn(tbl), fld)


fineGrainedLineages = [
    FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=[
            fldUrn("datahub.public.dim_customer", "customer_name")
        ],
        downstreamType=FineGrainedLineageDownstreamType.FIELD,
        downstreams=[fldUrn("datahub.public.dws_sales_summary", "customer_name")],
    ),
]


# this is just to check if any conflicts with existing Upstream, particularly the DownstreamOf relationship
upstream = Upstream(
    dataset=datasetUrn("datahub.public.dim_customer"), type=DatasetLineageType.TRANSFORMED
)

fieldLineages = UpstreamLineage(
    upstreams=[upstream], fineGrainedLineages=fineGrainedLineages
)

lineageMcp = MetadataChangeProposalWrapper(
    entityUrn=datasetUrn("datahub.public.dws_sales_summary"),
    aspect=fieldLineages,
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mcp(lineageMcp)
SQL

4. 如何具备更好的扩展性

在打通了以上所有流程以后,我们也需要思考一下方案本身的扩展性问题。因为真实场景中,可能不仅仅使用 Redshift 来做数仓,也有可能基于 Hive/Spark/Flink 来做数仓开发。因此,本方案中,通过 S3 目录做为一个解耦与体现方案扩展性的地方,如下图所示,将第 2 与第 3 步实现为通用方案,后续无论是对接 EMR 这样的计算引擎,还是 DolphinScheduler、 Airflow 这样的调度引擎,都只需要局部改造,最终获取到 SQL,并写入 S3 即可。

三、解决方案部署

本文专注于基于 Redshift 实现端到端的血缘关系生成与展示。

1. 部署 DataHub

如果是出于 Demo 目的,可以使用 DataHub 的快速部署指南,基于 Docker Compose 搭建一个简单的 DataHub Demo 。但如果是出于生产目的,则建议基于 AWS EKS 及配套的 AWS 托管服务,构建一个相对正式、稳定、高可用的环境,部署架构图所下图所示,也可参考详细部署指南进行部署。

2. 创建 Lambda Layer

Lambda Layer 的主要好处是代码复用和依赖管理。你可以把常用的代码和依赖库打包成一个 Layer,然后让多个 Lambda 函数共享使用,这样可以减少重复代码并简化维护。在 linux 创建 layer 包,安装方案中两个 Lambda 所需要的依赖,如下所示:

#!/bin/bash
#sudo dnf install -y postgresql-devel python3-devel gcc
# 创建工作目录
mkdir -p layer_build
cd layer_build

cat > requirements.txt << EOL
boto3
sqllineage
acryl-datahub
psycopg2-binary
EOL

python3.9 -m venv create_layer
source create_layer/bin/activate
pip install -r requirements.txt
mkdir python
cp -r create_layer/lib python/
zip -r layer_content.zip python
Python

然后进入 Lambda Console 页面,创建 Layer,并上传 layer_content.zip:

3. 创建血缘解析 Lambda 函数

血缘解析 Lambda 函数,用于接收 S3 存储桶指定目录的 SQL 文件上传事件,解析 SQL,并调用 DataHub API 生成血缘关系。进入 Lambda 创建页面。这里需要提前准备好一个具备 CloudWatch Logs 读写权限与 S3 只读权限的 Lambda Service Role。

为了内网访问 DataHub,需要将该 Lambda 设置了 VPC 模式。同时,需要保证 DataHub 所在服务器的安全组,能接受当前 Lambda 的访问。

将如下代码贴入 Lambda 的 Code 页面,然后点击部署按钮。

import json
import boto3
import os
from sqllineage.runner import LineageRunner

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)


engine_type="hive"

# 库名设置
def datasetUrn(tableName):
    return builder.make_dataset_urn(engine_type, tableName)  # platform = redshift

# 表、列级信息设置
def fieldUrn(tableName, fieldName):
    return builder.make_schema_field_urn(datasetUrn(tableName), fieldName)

def process_sql_from_s3(bucket, key):
    """
    从S3读取SQL文件并处理
    """
    s3_client = boto3.client('s3')
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
        sql = response['Body'].read().decode('utf-8')

        # 获取sql血缘
        result = LineageRunner(sql,dialect=engine_type)

        # 获取sql中的下游表名
        targetTableName = result.target_tables[0].__str__()
        print(result)
        print('===============')
        # 打印列级血缘结果
        result.print_column_lineage()
        print('===============')

        return result
    except Exception as e:
        print(f"读取S3文件错误: {str(e)}")
        raise e

def generateDatahubLineage(lineage):
    # 创建一个map数据结构来存储血缘关系的结果
    lineage_map = {}

    # 遍历列级血缘
    for columnTuples in lineage():
        # 逐个字段遍历
        for i in range(len(columnTuples) - 1):
            # 上游list
            upStreamStrList = []

            # 下游list 类似:datahub.public.dws_sales_summary.customer_name <- datahub.public.dim_customer.customer_name <- datahub.public.raw_sales_data.customer_id
            downStreamStrList = []
            if i + 1 < len(columnTuples):
                upStreamColumn = columnTuples[i]
                downStreamColumn = columnTuples[i + 1]

                upStreamFieldName = upStreamColumn.raw_name.__str__()
                upStreamTableName = upStreamColumn.__str__().replace('.' + upStreamFieldName, '').__str__()

                downStreamFieldName = downStreamColumn.raw_name.__str__()
                downStreamTableName = downStreamColumn.__str__().replace('.' + downStreamFieldName, '').__str__()

                print(f"{upStreamTableName}.{upStreamFieldName}-->{downStreamTableName}.{downStreamFieldName}")

                upStreamStrList.append(fieldUrn(upStreamTableName, upStreamFieldName))
                downStreamStrList.append(fieldUrn(downStreamTableName, downStreamFieldName))
                print(f"{upStreamTableName}.{upStreamFieldName}-->{downStreamTableName}.{downStreamFieldName}")
                fineGrainedLineage = FineGrainedLineage(upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
                                                        upstreams=upStreamStrList,
                                                        downstreamType=FineGrainedLineageDownstreamType.FIELD,
                                                        downstreams=downStreamStrList)

                # 将血缘关系添加到map中
                if downStreamTableName not in lineage_map and downStreamTableName != '':
                    lineage_map[downStreamTableName] = {}
                if upStreamTableName not in lineage_map[downStreamTableName]:
                    lineage_map[downStreamTableName][upStreamTableName] = []

                lineage_map[downStreamTableName][upStreamTableName].append(fineGrainedLineage)

    # 打印血缘关系map
    print(lineage_map)

    return lineage_map

def lambda_handler(event, context):
    try:
        print(event)
        # 从事件中获取S3桶和文件信息
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = event['Records'][0]['s3']['object']['key']

        # 从环境变量获取DataHub服务器地址
        datahub_server = os.environ['DATAHUB_SERVER_URL']
        global engine_type
        engine_type = os.environ['ENGINE_TYPE']

        # 处理SQL文件
        result = process_sql_from_s3(bucket, key)

        # 获取列级血缘
        lineage = result.get_column_lineage
        lineage_map = generateDatahubLineage(lineage)

        for downStreamTableName, upStreamDict in lineage_map.items():
            upStreamsList = []
            total_fineGrainedLineageList = []
            for upStreamTableName, fineGrainedLineageList in upStreamDict.items():
                fineGrainedLineageList = lineage_map[downStreamTableName][upStreamTableName]
                total_fineGrainedLineageList.extend(fineGrainedLineageList)

                print(f"下游表名: {downStreamTableName}, 上游表名: {upStreamTableName}, 细粒度血缘: {fineGrainedLineageList}")
                upstream = Upstream(
                    dataset=datasetUrn(upStreamTableName), type=DatasetLineageType.TRANSFORMED
                )
                upStreamsList.append(upstream)

            fieldLineages = UpstreamLineage(
                upstreams=upStreamsList, fineGrainedLineages=total_fineGrainedLineageList
            )

            lineageMcp = MetadataChangeProposalWrapper(
                entityUrn=datasetUrn(downStreamTableName),  # 下游表名
                aspect=fieldLineages
            )

            blank_fieldLineages = UpstreamLineage(
                upstreams=[], fineGrainedLineages=total_fineGrainedLineageList
            )
            blank_lineageMcp = MetadataChangeProposalWrapper(
                entityUrn=datasetUrn(downStreamTableName),  # 下游表名
                aspect=blank_fieldLineages
            )

            # 调用datahub REST API
            emitter = DatahubRestEmitter(datahub_server)

            # clear the existing linage for the downstreamTable
            emitter.emit_mcp(blank_lineageMcp)

            # set the latest lineage
            emitter.emit_mcp(lineageMcp)

        return {
            'statusCode': 200,
            'body': json.dumps('SQL血缘分析完成')
        }

    except Exception as e:
        print(f"处理过程中发生错误: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps(f'处理失败: {str(e)}')
        }
JSON

为了让 Lambda 中的代码在执行时,能找到依赖包,所以在当前 Lambda 下添加 此前创建的 Layer,如下图示。

因为 SQL 解析,以及调用 DataHub API 执行血缘关系生成的过程,比较消耗资源,也比较耗时,所以尽可能将 Lambda 的内存与超时时间调大一些。

接下来,需要设置 Lambda 的环境变量。

  • DATAHUB_SERVER_URL:表示 DataHub Server 的地址
  • ENGINE_TYPE:表示 SQL 对应的计算引擎类型。便于以后除了支持 Redshift,还可以进一步扩展支持 Hive/Spark/Flink

最后,再设置一下 S3 的 Trigger,当有 SQL 文件上传至 S3 存储桶指定目录时,触该 Lambda。

4. 创建 SQL 获取 Lambda 函数

SQL 获取 Lambda 函数,用于读取 Redshift 的元数据信息,以及 SYS_QUERY_HISTORY 系统表,获取 SQL INSERT 语句以及对应表的 Schema 信息。目前代码中的执行逻辑是,只获取今日执行成功的 INSERT SQL,以便于后续基于定时调度,每日执行。

进入 Lambda 创建页面,如图进行设置。这里需要事前准备一个具备 CloudWatchLogs 读写权限,S3 读写以及 AWSLambdaVPCAccessExecutionRole 权限的 Lambda 角色。

为了内网访问 Redshift,需要将该 Lambda 设置了 VPC 模式。同时,需要保证 Redshift 所在网络的安全组,能接受当前 Lambda 的访问。

将如下代码贴入 Lambda 的 Code 页面,然后点击部署按钮。

import psycopg2
import boto3
import logging
from sqllineage.runner import LineageRunner
from typing import Optional, List, Dict, Any
import os

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 从环境变量获取Redshift连接配置
REDSHIFT_CONFIG = {
    "host": os.environ['REDSHIFT_HOST'],
    "port": int(os.environ['REDSHIFT_PORT']),
    "database": os.environ['REDSHIFT_DATABASE'],
    "user": os.environ['REDSHIFT_USER'],
    "password": os.environ['REDSHIFT_PASSWORD']
}

project_name = os.environ['PROJECT_NAME']
bucket_name = os.environ['S3_BUCKET']

def get_table_ddl(conn, table_name: str) -> Optional[str]:
    """获取Redshift表的DDL"""
    try:
        # 创建新的连接
        with psycopg2.connect(**REDSHIFT_CONFIG) as new_conn:
            with new_conn.cursor() as cur:
                db_name = REDSHIFT_CONFIG['database']
                if '.' in table_name and table_name.split('.')[0] == db_name:
                    query = f'SHOW TABLE {table_name};'
                else:
                    query = f'SHOW TABLE {db_name}.{table_name};'

                logger.info(f"执行查询: {query}")

                cur.execute(query)
                ddl = cur.fetchone()[0]  # 修复了变量赋值的空格问题
                # 替换DISTSTYLE AUTO为分号
                ddl = ddl.replace("DISTSTYLE AUTO;", ";")
                return ddl
    except Exception as e:
        logger.error(f"获取表 {table_name} DDL失败: {str(e)}")
        return None

def get_today_insert_queries(conn) -> List[str]:
    """获取今天执行的所有INSERT语句"""
    try:
        with conn.cursor() as cur:
            query = """
            SELECT query_text 
            FROM SYS_QUERY_HISTORY 
            WHERE query_type = 'INSERT' 
            AND start_time >= TRUNC(SYSDATE)
            AND status = 'success'  -- 只获取执行成功的语句
            ORDER BY end_time DESC;
            """
            cur.execute(query)
            return [row[0] for row in cur.fetchall()]
    except Exception as e:
        logger.error(f"获取INSERT语句失败: {str(e)}")
        return []

def save_queries_to_s3(insert_queries: List[str]) -> None:
    """将INSERT语句保存到本地文件并上传到S3"""
    try:
        processed_tables = set()  # 用于记录已处理的目标表
        ddl_tables = set()  # 用于记录已获取过DDL的表
        # 为DDL查询创建独立连接
        with psycopg2.connect(**REDSHIFT_CONFIG) as ddl_conn:
            # Lambda中使用/tmp目录进行临时文件存储
            local_path = f'/tmp/sql_queries_{project_name}.sql'

            with open(local_path, 'w', encoding='utf-8') as f:
                for query in insert_queries:
                    # 将\n转换为实际换行符
                    query = query.replace('\\n', '\n')
                    query = f"{query};"  # 加上分号结尾

                    # 分析SQL依赖的表
                    result = LineageRunner(query, dialect="redshift")
                    tables = result.source_tables
                    logger.info(f"SQL涉及的表: {tables}")

                    # 获取目标表
                    target_tables = result.target_tables
                    logger.info(f"目标表: {target_tables}")

                    should_skip = False

                    table_str = target_tables[0].__str__()
                    if table_str in processed_tables:
                        logger.info(f"目标表 {table_str} 已处理过,跳过该SQL")
                        should_skip = True
                    processed_tables.add(table_str)

                    if should_skip:
                        continue

                    # 获取每张表的DDL
                    for table in tables:
                        # 将table对象转换为字符串
                        table_name = table.__str__()
                        # 检查表是否已经获取过DDL
                        if table_name in ddl_tables:
                            logger.info(f"表 {table_name} 的DDL已获取过,跳过")
                            continue
                        ddl_tables.add(table_name)
                        ddl = get_table_ddl(ddl_conn, table_name)

                        if ddl:  # 修复了缩进问题
                            f.write(f"\n-- DDL for table {table_name}:\n{ddl}\n")
                        else:
                            f.write(f"\n-- Failed to get DDL for table {table_name}\n")
                        f.write("-" * 80 + "\n")

                    f.write(f"{query}\n{'-' * 80}\n")

            # 上传到S3
            s3_key = f'sql-scripts/sql_queries_{project_name}.sql'
            s3_client = boto3.client('s3')
            s3_client.upload_file(local_path, bucket_name, s3_key)

            logger.info(f"查询已保存到本地文件: {local_path}")
            logger.info(f"文件已上传到S3: s3://{bucket_name}/{s3_key}")

    except Exception as e:
        logger.error(f"保存查询失败: {str(e)}")
        raise

def lambda_handler(event, context):
    """Lambda处理函数"""
    try:
        # 查询INSERT语句使用独立连接
        with psycopg2.connect(**REDSHIFT_CONFIG) as conn:
            insert_queries = get_today_insert_queries(conn)
            if not insert_queries:
                logger.warning("未找到今天的INSERT语句")
                return {
                    'statusCode': 200,
                    'body': '未找到今天的INSERT语句'
                }

        # 保存查询时使用新的连接
        save_queries_to_s3(insert_queries)

        return {
            'statusCode': 200,
            'body': 'SQL查询已成功保存到S3'
        }

    except Exception as e:
        logger.error(f"处理SQL失败: {str(e)}")
        return {
            'statusCode': 500,
            'body': f'处理失败: {str(e)}'
        }
JSON

为了让 Lambda 中的代码在执行时,能找到依赖包,所以在当前 Lambda 下添加 此前创建的 Layer,如下图示。

因为获取 SQL 的过程,也涉及大量的判断与计算,建议将超时时间和内存尽量调大一些。

最后,需要设置该 Lambda 的环境变量。

  • REDSHIFT_HOST:Redshift 集群的 Host 地址
  • REDSHIFT_PORT:Redshift 集群的访问端口
  • REDSHIFT_DATABASE:Redshift 集群中的数据库名
  • REDSHIFT_USER:Redshift 集群的访问用户名
  • REDSHIFT_PASSWORD:Redshift 集群的访问密码
  • PROJECT_NAME:当前生成 SQL 对应的项目名,为了避免多个项目,同时使用本方案时,上传至 S3 的文件名重复,所以需要设置下这里的项目名。如不属于特定项目,可以按实际情况命名即可。
  • S3_BUCKET:设置将生成的 SQL 上传至哪个 S3 存储桶。存储桶后面的目录是固定的:sql-scripts/

5. 测试和应用

完成上述部署后,我们通过一个 Demo 来演示效果。

首先,在 Redshift 中执行测试 SQL 语句。这是一个基于客户、产品、销售信息的简单数仓开发示例。

-- 创建示例表:原始数据表
DROP TABLE IF EXISTS raw_sales_data;
CREATE TABLE raw_sales_data (
                                sales_id INT,
                                product_id INT,
                                customer_id INT,
                                sales_date DATE,
                                sales_amount DECIMAL(10, 2)
);

-- 创建示例表:维度表 - 产品
DROP TABLE IF EXISTS dim_product;
CREATE TABLE dim_product (
                             product_id INT,
                             product_name VARCHAR(255),
                             product_category VARCHAR(255)
);

-- 创建示例表:维度表 - 客户
DROP TABLE IF EXISTS dim_customer;
CREATE TABLE dim_customer (
                              customer_id INT,
                              customer_name VARCHAR(255),
                              customer_region VARCHAR(255)
);

-- 创建示例表:事实表 - 销售
DROP TABLE IF EXISTS fact_sales;
CREATE TABLE fact_sales (
                            sales_id INT,
                            product_id INT,
                            customer_id INT,
                            sales_date DATE,
                            sales_amount DECIMAL(10, 2)
);

-- 创建示例表:DWS 层 - 销售汇总
DROP TABLE IF EXISTS dws_sales_summary;
CREATE TABLE dws_sales_summary (
                                   customer_name VARCHAR(255),
                                   sales_date DATE,
                                   total_sales_amount DECIMAL(10, 2),
                                   total_sales_count INT
);




---------------------


-- 插入示例数据到维度表 - 产品
INSERT INTO datahub.public.dim_product (product_id, product_name, product_category)
SELECT DISTINCT product_id, 'Product ' || product_id, 'Category ' || (product_id % 5)
FROM datahub.public.raw_sales_data;

-- 插入示例数据到维度表 - 客户
INSERT INTO datahub.public.dim_customer (customer_id, customer_name, customer_region)
SELECT DISTINCT customer_id, 'Customer ' || customer_id, 'Region ' || (customer_id % 10)
FROM datahub.public.raw_sales_data;

-- 插入示例数据到事实表 - 销售
INSERT INTO datahub.public.fact_sales (sales_id, product_id, customer_id, sales_date, sales_amount)
SELECT sales_id, product_id, customer_id, sales_date, sales_amount
FROM datahub.public.raw_sales_data;

-- 插入示例数据到 DWS 层 - 销售汇总
INSERT INTO datahub.public.dws_sales_summary (customer_name, sales_date,total_sales_amount, total_sales_count)
SELECT c.customer_name, sales_date,SUM(f.sales_amount), COUNT(*)
FROM datahub.public.fact_sales f
         JOIN datahub.public.dim_customer c ON f.customer_id = c.customer_id
GROUP BY c.customer_name,sales_date;
SQL

然后,可以直接通过 Lambda 函数:datalineage-generatesql 的测试功能,进行执行。

最后,再打开 DataHub 的血缘关系展示页面,即可查询到生成的列级别血缘关系。

在生产环境中,也可以使用 Amazon EventBridge 来进行日常的定时调度,执行 Lambda 函数:datalineage-generatesql。这里就不展开介绍了。

四、总结

这篇文章介绍了一个基于 DataHub 和 SQLLineage 实现 Redshift 字段级血缘的轻量级解决方案。方案通过两个 Lambda 函数实现:一个用于从 Redshift 获取 SQL 语句并上传至 S3,另一个用于解析 SQL 并生成血缘关系。该方案的主要优势在于:

  • 采用无服务器架构,运维成本低;
  • 通过 S3 作为中间层实现解耦,具有良好的扩展性;
  • 支持精确的字段级血缘追踪;
  • 可以方便地扩展到其他计算引擎,如 Hive/Spark/Flink 等。

这个方案为数据团队提供了一个实用的数据血缘追踪工具,有助于数据治理、影响分析、问题排查和性能优化等工作。同时其轻量级和可扩展的特点,也使其非常适合中小型数据团队使用。

本篇作者

张盼富

亚马逊云科技解决方案架构师,从业十三年,先后经过历云计算、供应链金融、电商等多个行业,担任过高级开发、架构师、产品经理、开发总监等多种角色,有丰富的大数据应用与数据治理经验。加入亚马逊云科技后,致力于通过大数据+AI 技术,帮助企业加速数字化转型。

黄霄

亚马逊云科技数据分析解决方案架构师,专注于大数据解决方案架构设计,具有多年大数据领域开发和架构设计经验。

柯俊雄

亚马逊云科技解决方案架构师,专注于数据分析/容器化领域。