亚马逊AWS官方博客

十亿级超大表数据库注入 Amazon S3 数据湖的实践

1. 背景

随着企业业务的快速发展,业务系统数据也随之快速增长。数据量过大导致数据库性能下降的同时,数据存储成本也会增加。解决该问题最常见的办法是对业务系统数据进行归档。通过将业务系统中的数据迁移到其他归档数据库中,来维持业务系统的响应速度,同时也要保持对归档数据库的查询能力。另外,归档数据也可以满足合规性和公司相关审计的要求,通过对热门数据和历史数据分层管理,优化访问模式,来提升系统整体效率。

在实际的生产环境中,随着归档数据库和归档表越来越大,我们经常会碰到一些复杂的场景,比如: 归档数据库由于数据量大、归档保留数据时间长、因归档插入数据效率考虑删除部分索引、表索引原始设计不合理等原因,会出现一些没有索引或者索引对查询没有效果的超大表,比如有一些数据表将会超过几十亿行,在业务需要对这些超大表进行查询的时候,会碰到很多的性能问题,甚至会出现因为数据量过大无法在短时间返回查询结果的情况。

为了改善查询性能,通常的优化策略是修改表结构增加索引或者修改索引,而对这些超大表修改、增加索引会导致数据库存储空间增长、表插入数据变慢甚至可能引起的表被锁定等一系列问题。

2. 解决方案

将数据库注入到 Amazon Simple Storage Service(S3)数据湖,利用 Amazon Athena 来查询是一种有效优化超大表查询性能的办法。通过这种方法,我们不仅可以优化数据库性能,降低存储成本,还可以确保数据的长期保存和易于访问。

按照 Amazon S3 数据湖的最佳实践,在处理大规模数据集时,通常会对数据集进行分区,这样不仅可以将一些查询限定在特定的分区,减少每次查询需要扫描的数据量,来提高了查询的效率,更能够降低查询费用。比如通常,我们可以按照时间戳将数据分区,从而快速筛选出满足时间条件的数据,减少扫描的数据量。

将数据库注入到 Amazon S3 的数据湖有很多不同的方式,比如最常见的有使用 AWS Glue 这样的 ETL 服务、自定义脚本抽取数据到 S3 或者使用如 Talend、Informatica 等开源、商业的数据摄取工具。但无论是 ETL 工具、自定义脚本还是 Talend、Informatica 等数据摄取工具,都需要编写和维护代码,尤其是把超大表注入到数据湖中,并给这些数据湖中的表构建合适的分区,我们还需要开发定制化的代码、进行大量测试来保障性能、提升数据摄取效率。

本文将会演示如何使用 AWS 托管的 AWS Database Migration Service(AWS DMS)来构建任务,把包含超大表的数据库表注入到 S3 数据湖,并完成表数据分区,最后利用 Amazon Athena 来优化查询性能。

使用本文介绍的方案,我们将对实际生产环境中 Oracle 数据库多张超过 10 亿行的超大表进行处理,把表数据注入 S3 数据湖中,并对数据进行分区。

我们对 Oracle 数据库中过去三年的历史物流记录进行查询,由于表中有超过 11 亿条数据,该表由于归档效率需要删除了索引,查询单条物流记录响应时间超过 1 个小时,而把数据注入到基于 S3 数据湖并分区后,利用 Amazon Athena 分区查询返回时间只需要 2.3 秒,大大改善了业务体验。

3. 整体架构

本解决方案使用 Amazon RDS 来模拟源数据库,实际生产环境中,我们也可以使用本地数据中心的数据库作为归档的源数据库。您可以使用任何支持的源数据库引擎来构建复制任务,参考数据复制源进行相应的更改即可。

如下面的架构图所示,Amazon RDS 或者本地数据中心的数据库作为源终端节点,Amazon S3 的桶作为目标端点,在复制任务启动后,源端数据库中的表按照配置的任务规则注入到目标 S3 桶中,生成新的 S3 数据文件,这里我们会给复制任务增加后处理流程,在新的 S3 数据文件生成的时候,给文件增加数据分区需要使用的标签。我们配置了 S3 的事件触发,新的数据文件生成会触发预定义的 Lambda 程序,该 Lambda 程序会读取 S3 文件的标签,然后把文件移动到标签值对应的 S3 分区中,从而实现 S3 数据文件的分区功能。

另外,如下图黄色箭头所示,我们配置的整个任务完成后,会触发 Amazon SNS 的一个订阅事件,该事件会触发预定义的 Lambda 程序,由 Lambda 来启动 Crawler 对 S3 中的数据集进行抓取,并把数据集中的元数据目录和分区信息存入 AWS Glue 的 Data Catalog 中,我们就可以利用 Amazon Athena 对 S3 中的数据进行基于分区的查询了。

接下来,本文将会描述整个方案的详细构建步骤。

4. 先决条件

在此方案中,您应满足以下先决条件:

  • 一个 AWS 账户
  • 一个 RDS Oracle 数据源,一张包含归档时间列的表
  • 一个充当目标端的 S3 桶

5. 构建步骤

5.1 创建复制实例

我们需要使用复制实例来创建复制任务,该实例需要足够的存储空间和处理能力才能执行我们分配的任务,关于如何选择适当的实例,请参阅使用 AWS DMS 复制实例

5.2 创建源终端节点和目标终端节点.

终端节点提供有关数据存储的连接、数据存储类型和位置信息。AWS DMS 使用此信息连接到数据存储并将数据从源端点迁移到目标端点。

本例中,我们使用 RDS Oracle 作为数据源,我们需要为该数据源创建源终端节点。有关源终端节点的创建可以参考如下的链接:https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.html

需要注意的是,为保证复制任务可以正常运行,需要确保上面创建的 DMS 复制实例可以访问数据库服务器的 IP 和端口。我们需要针对性的修改相关的网络配置,比如安全组和网络访问控制列表(network ACL)。

另外,我们使用 S3 作为目标终端节点,在数据文件存入 S3 时使用 Parquet 作为存储格式。Parquet 是一种开源文件格式,它以列式格式高效存储数据,提供不同的编码类型并支持谓词过滤。凭借良好的压缩比和高效的编码,存储在 Parquet 中的数据对比 csv 等文本格式,可以降低 Amazon S3 存储成本。

在创建目标端点的过程中,可以直接在 “端点设置”中,选择“编辑器”,输入下面的 JSON 来完成 S3 目标终端端点对 Parquet 数据格式的支持。

{
    "CompressionType": "GZIP",
    "DataFormat": "parquet",
    "ParquetVersion": "parquet-2-0",
    "EnableStatistics": true,
    "DatePartitionEnabled": false
}

5.3 创建数据复制任务

接下来,我们将会构建复制任务,该复制任务负责把指定的数据表中的数据按照预定义的时间过滤器进行筛选,与此同时该任务还会给注入到 S3 数据湖中数据文件打上标签,标签的名称是注入数据的分区名称,而标签的值标注了此次注入数据的分区数值。

我们在后续的步骤中,将会利用 S3 的事件通知机制,把 S3 数据文件移动到名为标签值的前缀下,为后续的 AWS Glue Crawler 来生成/更新原数据分区目录和分区做准备。

在本例中,我们使用一张在 WMS 架构下的物流信息记录表 LPN,其中 LAST_DTTM 字段为最后更新时间戳字段,本次将会注入“2021-01-01 00:00:00.000000”到“2022-01-01 00:00:00.000000”之间更新的数据到 S3 中,并为这些生成的 S3 文件添加名称为“dttm”、值为 “2021”的标签,业务上的含义是把 2021 年的物流信息记录数据注入到 S3 数据湖。

在 AWS 控制台,进入 DMS 服务,点击创建数据库迁移任务,输入“任务标识符”的名称,选择第一步中创建的“复制实例”,分别选择上面创建的源数据库端点和目标端点,迁移类型选“迁移现有数据”。“任务设置”中“目标表准备模式”选择“不执行任何操作”,“任务日志”选项可以勾选“打开 CloudWatch ”日志选择,并为对应的项目选择日志的级别。

在“表映射”选项中,选择“JSON 编辑器”,输入如下的 JSON 文件,您也可以根据自己的需求,修改为对应的 JSON,可以参考下面的链接:https://docs.aws.amazon.com/zh_cn/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.html

本例中,定义的表映射规则可以分为两部分:

  • rule-type 为 selection 部分,负责对数据的筛选,我们利用 filter 来筛选最后更新时间 LAST_DTTM 在“2021-01-01 00:00:00.000000”和“2022-01-01 00:00:00.000000”之间的数据。
  • rule-type 为 post-processing 部分,将会给 S3 数据文件增加名称为“dttm”、值为“2021”的标签。
{
  "rules": [{
      "rule-type": "selection",
      "rule-id": 453584433,
      "rule-name": "rule-name-453584433",
      "object-locator": {
        "schema-name": "wms",
        "table-name": "lpn"
      },
      "rule-action": "include",
      "filters": [{
        "filter-type": "source",
        "column-name": "LAST_DTTM",
        "filter-conditions": [{
          "filter-operator": "between",
          "start-value": "2021-01-01 00:00:00.000000",
          "end-value": "2022-01-01 00:00:00.000000"
        }]
      }]
    },
    {
      "rule-type": "post-processing",
      "rule-id": 553584433,
      "rule-name": "post-rule-name-553584433",
      "rule-action": "add-tag",
      "object-locator": {
        "schema-name": "wms",
        "table-name": "lpn"
      },
      "tag-set": [{
        "key": "dttm",
        "value": "2021"
      }]
    }
  ]
}

5.4 创建数据分区移动函数

在 5.3 中,我们为注入到 S3 数据湖中的数据文件定义了标签,该标签用来作为数据文件实际的分区目录。在这一步中,我们将会创建 Lambda 函数, 把这些定义了标签的数据文件移动到到指定的 S3 分区目录下。

首先,创建一个 IAM 角色,选择 “亚马逊云科技服务”为“可信实体类型”,选择“Lambda”为使用案例。编辑下面的 IAM 策略,替换 Resource 中的“REPLACE_S3_ARN”为目标 S3 桶的 ARN,把该策略和 AWS 托管策略 “AWSLambdaBasicExecutionRole” 附加到该角色中。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:DeleteObjectTagging",
                "s3:GetObjectTagging",
                "s3:PutObjectTagging",
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "REPLACE_S3_ARN/*",
                "REPLACE_S3_ARN"
            ]
        }
    ]
}

创建 Lambda 函数,输入 “函数名称”,“Python 3.10”为运行时,使用上面创建的角色作为默认执行角色。可以参考链接 Lambda 函数代码

该代码部分主要包含两个函数 getTagValue 和 lambda_handler,其中 getTagValue 会找到“dttm”字段对应的标签值,lambda_handler 会获取到变更的文件信息,并把文件移动到“dttm”标签值对应的分区目录下。

import json
import re
import boto3
import urllib.parse

DatePattern = re.compile(r'\d\d\d\d')
s3 = boto3.resource('s3')
s3Client = boto3.client('s3');

def lambda_handler(event, context):
    # TODO implement
    print(event['Records']);
    
    affectedRecords = event['Records']
    for affectedRecord in affectedRecords:
        bucketName = affectedRecord['s3']['bucket']['name'];
        objectKey = affectedRecord['s3']['object']['key'];
        objectPrefix = objectKey[0: objectKey.rfind('/')]
        if DatePattern.search(objectPrefix):
            print('Record has date, Date=%s', objectKey)
        else:
            copy_source = {
                'Bucket': bucketName,
                'Key': objectKey
            }
            tagValue = getTagValue(bucketName, objectKey);
            date = '1970-01'
            if tagValue != 'None':
                date = tagValue[0:4]
            else: 
                print('tagValue is not right, %s,%s', bucketName, objectKey);
                break;
            
            fileName = objectKey.split('/')[-1];
            destinationKey = '/'.join(objectKey.split('/')[:-1]) + '/dttm=' + date + '/' + fileName;
            print('destinationKey:', destinationKey);
            s3.meta.client.copy(copy_source, bucketName, destinationKey);
            s3.Object(bucketName, destinationKey).wait_until_exists()
            s3.Object(bucketName, objectKey).delete();
            print('move data successfully destinationKey:', destinationKey);
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

def getTagValue(bucketName, objectKey):
    # 获取对象标签
    response = s3Client.get_object_tagging(Bucket=bucketName, Key=objectKey);
    
    # 获取指定标签的值
    tag_value = None
    for tag in response['TagSet']:
        if tag['Key'] == 'dttm':
            tag_value = tag['Value']
            return tag_value;
            break
    return tag_value;

5.5 创建事件驱动机制,在数据注入后,完成分区处理

前面的两步中,我们分别完成了数据注入和分区函数的构建,本步骤将会创建事件驱动机制,待数据写入 S3 后,通过配置 S3 的事件通知来触发 Lambda 函数完成 S3 文件的移动。

打开 S3 的控制台,选择对应的 S3 桶,选中“属性”按钮,点击“创建事件通知”,输入使用的“前缀”,本例为“archive”,后缀“.parquet”,事件类型勾选 “s3:ObjectCreated:Put”、“s3:ObjectCreated:Post”和“s3:ObjectCreated:CompleteMultipartUpload”,如下图所示。

目标栏选择刚才创建的 Lambda 函数作为目标 Lambda 函数。

5.6 创建 Crawler 对 S3 目标桶的数据目录进行爬取

关于如何利用 AWS Glue 创建 Crawler 已经有很多的介绍,本文将不对此进行详细说明,您可以按照 AWS 提供的官方文档或者如下的指导来创建一个 Glue 的 Crawler 程序,对目标 S3 桶中数据存储目录进行爬取,Crawler 将会在 Glue 数据目录中创建一个或者多张表。参考如下的链接:

https://docs.aws.amazon.com/zh_cn/glue/latest/dg/add-crawler.html
https://aws.amazon.com/cn/blogs/china/use-aws-glue-amazon-s3-build-datalake/

5.7 处理任务订阅事件,自动更新元数据目录与分区信息

通常情况下,在数据注入 S3 数据湖任务完成后,我们需要手动启动 Crawler 来完成元数据目录与分区信息的更新,本例中,我们将利用 AWS DMS 复制任务中的事件订阅来触发 Lambda 函数调用 Crawler 完成元数据目录与分区信息的更新。

接下来,我们创建一个 Lambda,该 Lambda 将会处理订阅了 DMS 复制任务的 SNS 主题,当 DMS 复制任务状态有变更时,该 Lambda 将会触发。这里我们使用该 Lambda 启动了上一步中创建的 Crawler 来对目标 S3 桶的数据集进行爬取,当然我们也可以利用该 Lambda 做一些其他的任务,比如设置 DMS 任务失败的处理等等。可以参考链接 Lambda 函数代码

import json
import pymysql
import boto3

glue_client = boto3.client("glue")

def lambda_handler(event, context):
    messages = event['Records'][0]['Sns']['Message']
    print(messages)
    dmsEvent = json.loads(messages)
    if 'http://docs.aws.amazon.com/dms/latest/userguide/CHAP_Events.html#DMS-EVENT-0079' in dmsEvent['Event ID']:
        print('start crawler.')
        startCrawler()
    else:
        print("unknown message, ignore.")
    return {
        'statusCode': 200,
        'body': json.dumps('Task complete!')
    }

def startCrawler():
    crawler_name = "wmss3"
    try:
        # 启动AWS Glue Crawler
        response = glue_client.start_crawler(Name=crawler_name)
        return f"成功启动 AWS Glue Crawler: {crawler_name}"
    except Exception as e:
        return f"启动 AWS Glue Crawler 失败: {str(e)}"

如上述代码描述,如果数据复制任务成功完成,会发送事件 ID 为 DMS-EVENT-0079 的 SNS 消息,在收到该消息后,我们可以在 Lambda 中启动 Crawler,待该脚本执行完成,元数据目录与表分区信息将会更新。

接下来,我们需要创建了一个 Amazon SNS 主题,把上面创建的 Lambda 作为并为该主题的订阅终端。然后,我们可以利用该主题配置 DMS 复制任务的事件订阅,需要注意的是,我们需要在“选择特定事件类别”中需要选择“state change”,该事件订阅将会确保 DMS 复制任务状态变更时发送通知给该 SNS 主题。

关于复制任务事件订阅的流程可以参考如下文档:https://docs.aws.amazon.com/zh_cn/dms/latest/userguide/CHAP_Events.html

5.8 启动数据入湖任务

进入 AWS 控制台,选择“数据库迁移任务”, 选择对应的任务,点击“操作”,选择“重启/恢复”按钮来运行任务,等任务运行完成后,我们可以在目标的 S3 桶中的分区目录下看到对应的数据文件。

待复制任务运行结束后,我们可以看到 Crawler 被触发,在爬网程序运行完成后,我们可以在 AWS Glue 的元数据目录中找到该表,也能够看到该表有一个“dttm”字段值为“2021”分区。

5.9 数据入湖

按照前面的步骤,我们成功把 WMS 库 LPN 表中 2021 年的数据注入到 S3 数据湖中并对表数据进行了分区。如果把其他年份的数据也注入到数据湖,直接修改步骤 5.3 中的开始时间、结束时间和分区值,重新启动入湖任务即可完成。

经过测试,我们把最近 3 年超过 11 亿记录导入到数据湖中,整个数据入数据湖的时间大约在 1.5 小时左右,原表占用的数据库表空间大小为 215GiB,但是在 S3 的存储空间为 19.4GiB,这得益于 Parquet 的列式存储带来的高压缩和编码效率的提升。

5.10 查询性能对比

利用本例中的方法,我们把实际生成环境中配置为 m5.4xlarge 的 Oracle 归档数据库中多张超过 10 亿行数据的表注入到基于 S3 的数据湖中,按照时间对这些表进行分区。

平时的生产环境中,业务部门需要对其中一张包含 279 字段、1,115,415,346 行数据的表进行查询,该表中存储了从 2021 年到 2023 年 8 月的所有物流信息记录。

查询的语句对 2021 年 1 月的一条物流单号情况进行查询,Oracle 数据库返回查询结果需要超过 1 小时。

select * from wms.lpn 
where wave_nbr= '20210101090700110000122112190909’
and LAST_DTTM between '2021-01-01 00:00:00' and '2021-02-01 00:00:00'

在使用了本例的方法后,我们使用带分区条件的 SQL 语句在 Athena 中运行查询,返回时间仅为 2.3 秒,极大的改善了业务体验。

select * from AwsDataCatalog.wms.lpn 
where wave_nbr= '20210101090700110000122112190909’
and LAST_DTTM between '2021-01-01 00:00:00' and '2021-02-01 00:00:00' and dttm = ‘2021’

根据我们上面的优化步骤,我们在数据入湖的过程中,进行了如下的优化:

  • 采用 Parquet 列式压缩存储数据到 S3 数据湖,本例中,数据压缩比超过 1:11。
  • 利用 Lambda 和 S3 事件通知对有标签的数据进行分区,让查询能减少数据的扫描,从而提升查询效率、节省成本。
  • Athena 使用的 Presto 分布式执行引擎,能够自动并发的执行查询,并动态地扩展资源以使查询快速运行,为 S3 数据查询提供更快的速度。

6. 结论

利用这篇博客提供的方法,我们将十亿级超大表注入到 Amazon S3 构建的数据湖中,并对数据进行了分区处理,使用 Athena 对数据进行查询时,提高了数据查询的效率,能够更好的支持业务部门对数据进行分析和查询的需求。

另外,本案例中仅使用了仅托管的 DMS服务、SNS、无服务器的计算引擎 Lambda、无服务器的查询引擎 Athena 来构建完整的流程,减少了代码开发维护的工作量和操作复杂性。通过对 Parquet 格式进行列式压缩存储并对数据进行分区,减少 Athena 查询过程中扫描的数据量,也能够做到很好的成本节省。

7. 参考链接

本篇作者

刘殿明

SF-DSC IT 产品高级架构师, 主要负责 SF-DSC 公司的数据库产品运维、数据分析、EDI 产品运维。

程红波

亚马逊云科技解决方案架构师,负责企业级客户的架构咨询及设计优化,同时致力于容器和无服务器技术在国内和全球企业客户的应用和推广。

郭松

亚马逊云科技解决方案架构师,负责企业级客户的架构咨询及设计优化,同时致力于 AWS IoT 和存储服务在国内和全球企业客户的应用和推广。加入亚马逊云科技之前在 EMC 研发中心担任系统工程师,对企业级存储应用的高可用架构,方案及性能调优有深入研究。