亚马逊AWS官方博客

使用 Amazon Redshift 进行高性能 ETL 处理的八大最佳实践

ETL (Extract, Transform, Load) 的处理流程能够使您将数据从源系统加载到数据仓库中。通常 ETL 采用批处理或者近实时的数据摄入流程,以保证数据仓库能够为终端用户提供最新的分析数据。Amazon Redshift 是一个快速的、PB 规模的数据仓库,可以轻松地让您做出以数据作为驱动的决策。通过 Amazon Redshift ,您可以使用标准的 SQL ,以最具有成本效益的方式深入了解您的数据。您可以设置任何类型的数据模型,从星型和雪花模式,到简单的非规范化数据表来运行任何分析查询。要运行一个稳固的 ETL 平台,并及时向 Amazon Redshift 交付数据,需要设计能够考虑到 Amazon Redshift 架构的 ETL 流程。当从旧有数据仓库迁移到 Amazon Redshift 时,我们很容易地会采用直接迁移( lift-and-shift )的方式,但从长期的角度可能会导致性能和扩展问题。本文将指导您通过以下最佳实践来确保 ETL 流程最佳的、一致的运行时间:

  • 从多个、均匀大小的文件复制数据
  • 使用工作负载管理( WLM )来改善 ETL 的运行时间
  • 定期执行表维护工作
  • 在单个事务操作中执行多个步骤
  • 批量地加载数据
  • 使用 UNLOAD 命令导出大规模的结果数据集
  • 使用 Amazon Redshift Spectrum 执行 Ad-hoc 的 ETL 处理
  • 使用诊断查询监控每日 ETL 流程的健康状态

1. 从多个、均匀大小的文件复制数据

Amazon Redshift 是一个 MPP(大规模并行处理)架构的数据库,所有的计算节点将摄入数据的工作进行分解和并行化。每个节点进一步细分为片( slice ),每片都有一个或多个专用的内核,并均等地划分处理能力。每个节点的片数取决于集群的节点类型。例如,每个 DS2.XLARGE XLARGE 计算节点有两个切片,而每个 DS2.8XLARGE 计算节点有16个切片。

当您将数据加载到 Amazon Redshift 时,您应该希望每个切片都能有相同的工作量。当你从单个大文件或多个被切分成不同大小的文件中加载数据时,有些切片的工作量会比其他的要多。因此,进程的总体运行速度只会和最慢的切片一样快。在下面的示例中,一个大文件被加载到一个双节点集群中,但结果是只有一个节点“ Compute-0 ”执行了所有的数据摄取工作:

在分割您的数据文件时,请确保它们的大小基本相等——压缩后应在1 MB 到1 GB 之间。文件的数量应该是集群中片数的倍数。另外,强烈建议您使用 gzip 、 lzop 或 bzip2 对加载文件进行单独压缩,以有效地加载大型数据集。

当将多个文件加载到单个表中时,使用单个 COPY 命令来执行表,而不是使用多个 COPY 命令。 Amazon Redshift 将自动地并行化摄取数据。使用单个COPY命令将数据批量加载到表中可以确保集群资源的最佳使用,以及最快的吞吐量。

2. 使用工作负载管理( WLM )来改善 ETL 的运行时间

使用 Amazon Redshift 的工作负载管理( WLM )定义用于不同工作负载的多个队列(例如, ETL 和报告),并管理查询的运行时间。当您将更多的工作负载迁移到 Amazon Redshift 时,如果不适当地设置 WLM ,您的 ETL 处理时间就会变得不稳定。

我建议将 WLM 在所有队列中的总并发数限制在15或更少。这个 WLM 指南帮助您组织和监视 Amazon Redshift 集群的不同队列。

当管理 Amazon Redshift 集群上的不同工作负载时,请考虑下面的队列设置:

  • 创建一个专用于 ETL 进程的队列。为这个队列配置少量的槽位(5个或以下),槽位( slot )是用于处理查询的内存和 CPU 的单位。 Amazon Redshift 是为分析查询而设计的,而不是事务处理。 COMMIT 的成本相对较高,过度使用会导致查询将要等待对提交队列的访问。因为 ETL 是一个密集的 COMMIT 过程,拥有一个单独的队列和少量的插槽有助于缓解这个问题。
  • 为队列中请求额外可用内存。在执行 ETL 查询时,您可以利用 wlm_query_slot_count 来声明特定队列中可用的额外内存。例如,典型的 ETL 过程可能涉及到将原始数据复制到 staging 表中,以便下游 ETL 作业能够运行按每日、每周和每月聚合的转换任务。为了加快复制进程(以便下游任务可以更快地平行开始),可以配置系统表 wlm_query_slot_count 的内存分配。
  • 为报告查询创建单独的队列。在此队列上配置查询监视规则,以进一步管理长时间运行和开销较大的查询。
  •  利用动态内存参数配置。在 ETL 作业完成之后,它们可以将 ETL 中的内存分配给报表队列。

3. 定期执行表维护工作

AAmazon Redshift 是一个列式数据库,它支持对聚合数据进行快速转换。执行常规的表维护可以确保 ETL 工作是可预测的和良好性能。要从 Amazon Redshift 数据库获得最佳性能,您必须确保定期对数据库表进行 VACUUM 和 ANALYZE 操作。VACUUM 和 ANALYZE 实用程序能帮助您自动化地定期进行表维护工作。

  • 使用 VACUUM 对表进行排序并删除标记为已删除的数据

在一个典型的 ETL 刷新过程中,数据表使用 COPY 命令接收新的数据,使用DELETE 删除不需要的数据(冷数据)。 Redshift 将新的行添加到表中的未排序区域,而删除的行只是标记为删除。

DELETE 不会自动回收被删除行占用的空间。添加和删除大量的行可能导致未排序的区域和标记删除的块的数量增长,这会降低对这些表执行查询的性能。

在 ETL 过程完成后,执行 VACUUM 操作可以确保用户查询以一致的方式执行。可以使用 Amazon Redshift Util 的 table_info 脚本找到需要 VACUUM 的数据表的完整列表。

使用以下方法确保及时完成 VACCUM 操作:

  • 使用 wlm_query_slot_count 在 VACUUM 过程中声明 ETL WLM 队列中分配的所有内存
  • DROP 或 TRUNCATE 中间表或 staging 表,从而不用再对它们进行 VACUUM 处理。
  • 如果您的表有只有一个排序列作为复合排序键,请尝试按排序键顺序加载数据。这有助于减少或消除 VACUUM 命令的使用。
  • 考虑使用时间序列,这有助于减少需要 VACUUM 的数据量
  • 使用 ANALYZE 来更新数据库统计信息。

Amazon Redshift 使用基于成本的查询计划器和优化器,通过分析数据表的统计信息来对SQL语句的查询计划做出最佳判断。ETL 完成后的常规统计信息收集确保了用户查询的快速运行,并且每天的 ETL 进程都是高效的。Amazon Redshift 实用程序 table_info 脚本提供了对统计数据更新日期的分析。将统计信息(pct_stats_off)设置少于20%,可以确保SQL查询的有效查询计划。

4. 在单个事务操作中执行多个步骤

ETL 转换逻辑通常跨越多个步骤。由于在 Amazon Redshift 中 COMMITS 是代价较高的,如果每个 ETL 步骤都执行一个 COMMIT,那么多个并发的 ETL 进程可能需要很长时间才能执行完毕。

为了最小化进程中提交的 COMMIT 数量,ETL 脚本中的步骤应该被一个 BEGIN…END 语句包围,以便只有在执行完所有转换逻辑之后才执行一次 COMMIT。例如,这里有一个多步骤 ETL 脚本示例,它仅在末尾执行了一个提交:

Begin
CREATE temporary staging_table;
INSERT INTO staging_table SELECT .. FROM source (transformation logic);
DELETE FROM daily_table WHERE dataset_date =?;
INSERT INTO daily_table SELECT .. FROM staging_table (daily aggregate);
DELETE FROM weekly_table WHERE weekending_date=?;
INSERT INTO weekly_table SELECT .. FROM staging_table(weekly aggregate);
Commit

5.批量地加载数据

Amazon Redshift设计用于存储和查询PB级的数据集。使用Amazon S3,您可以在执行批量COPY操作之前,对来自多个源系统的数据进行分段和累积。以下方法可以有效快速地将这些数据集转移到Amazon Redshift:

  • 使用 manifest 文件摄取跨多个文件的大型数据集。Manifest 文件为 JSON 格式,列出了要加载到 Amazon Redshift中的所有文件。使用 Manifest 文件可以确保 Amazon Redshift 具有从 S3 加载的数据的一致视图,同时还可以确保重复的文件不会导致同一数据被加载超过一次。
  • 使用临时暂存表保存数据进行转换。ETL会话完成后,这些表将自动删除。可以使用 CREATE TEMPORARY TABLE 语法创建临时表,或者在 #TEMP_TABLE 查询中使用 SELECT…INTO #TEMP_TABLE。显式指定 CREATE TEMPORARY TABLE 语句允许您控制分布键、排序键和压缩设置,从而进一步提高性能。
  • 使用 ALTER table APPEND 功能,将数据从 staging 表中交换到目标表中。源表中的数据被移动到目标表中匹配的列中,列的顺序无关紧要。成功地将数据附加到目标表之后,源表变为空。ALTER TABLE APPEND 要比类似的 CREATE TABLE 或 INSERT INTO operation 快得多,因为它不涉及复制或移动数据。

6. 使用 UNLOAD 来提取大型结果数据集

使用 SELECT 获取大量的行代价比较高,并且需要很长的执行时间。当从 Amazon Redshift 集群获取大量数据时, leader 节点必须暂时保存数据,直到获取操作完成。此外,数据需要按顺序数据,这将导致更长的运行时间。因此, leader 节点会变得很热,这不仅会影响正在执行的查询,还会影响用来创建执行计划和管理整个集群资源的资源。下面是一个大型 SELECT 语句的示例。请注意,leader 节点正在执行大部分工作来将行输出:

使用 UNLOAD 直接提取大型结果数据集到 S3。当数据在 S3 中之后,可以与多个下游系统进行共享。默认情况下, UNLOAD 将根据集群中的分片数将数据并行写入多个文件。所有的计算节点都会参与到将数据快速卸载到 S3 中的操作。

如果您正在使用 Amazon Redshift Spectrum 提取数据,您可以使用 MAXFILESIZE  参数,将文件保存为150MB大小。与上面所提到的技巧类似,拥有许多大小相同的文件可以确保 Redshift Spectrum 可以并行地最大化完成任务

7. 使用 Redshift Spectrum 进行 Ad hoc 的 ETL 处理

诸如数据回填、促销活动和特殊交易日期之类的事件可能导致需要处理额外的数据量,从而影响 Amazon Redshift 集群中的数据刷新时间。为了帮助消除数据量和吞吐量的峰值,我们建议在 S3 中暂存数据。在 S3 中组织数据之后, Redshift Spectrum 允许您使用标准 SQL 直接查询数据。通过这种方式,您可以不用将数据导入到集群中,从而减小了因为集群扩容导致的额外开销。

有关开始使用和优化 Redshift Spectrum 功能的技巧, 请参阅 10 Best Practices for Amazon Redshift Spectrum

8. 使用诊断查询监控每日 ETL 流程的健康状态

定期监控 ETL 过程的运行状况有助于从一开始就识别出性能问题的原因, 避免之后对集群的重大影响。以下脚本可用来提供对 ETL 健康状况的监控:

脚本 何时使用(当。。。) 解决方案
commit_stats.sql – Commit queue statistics from past days, showing largest queue length and queue time first 当在同时执行多个 DML 操作如 INSERT/UPDATE/COPY/DELETE 时出现运行时间数倍延长的时候 为 ETL 过程设计独立的 WLM 队列,并将并发数降到5以下
copy_performance.sql –  Copy command statistics for the past days 每日的 COPY 操作需要更多时间来执行时

参考 best practices for the COPY command

根据进入的数据量来分析数据增长趋势,同时考虑通过扩展集群来达到期望的 SLA

table_info.sql – Table skew and unsorted statistics along with storage and key information 数据转换步骤需要更长执行时间时 设计定期的 VACCUM 任务来消除未排列的行并删掉已声明删除的数据块,从而让数据转换的 SQL 脚本执行得更有效率
考虑重新进行表设计来避免数据倾斜
v_check_transaction_locks.sql – Monitor transaction locks 与在 ETL 之后运行时相比,在特定表上插入/更新/复制/删除(INSERT/UPDATE/COPY/DELETE)操作不会获得及时响应 可能是多个 DML 语句在同一个目标表上同时在不同的事务上运行导致。可以设置 ETL 作业依赖项,以便对相同的目标表执行串行操作。
v_get_schema_priv_by_user.sql – Get the schema that the user has access to 报告的用户可以查考中间表 为报告和 ETL 用户各自创建独立的数据库用户组,使用 GRANT 命令分配相应的对象访问权限。
v_generate_tbl_ddl.sql – Get the table DDL 当你打算做数据的回填,需要创建一张与目标表相同结构的空数据表时 利用这个脚本生成DDL命令来协助数据回填。
v_space_used_per_tbl.sql – monitor space used by individual tables 当 Redshift 数据仓库的使用空间增速超过平常时

分析那些数据量增速高于正常的数据表,考虑通过 UNLOAD 命令将数据归档到 S3,并使用 Spectrum 来查询

使用 unscanned_table_summary.sql 来查找未使用的表并进行归档或者删除

top_queries.sql – Return the top 50 time consuming statements aggregated by its text ETL 转换的时间比平时要长 分析转换时间最长的 SQL 语句,并使用 EXPLAIN 来找到优化查询计划的方法

amazon-redshift-utils 资料库中还有其他一些有用的脚本。AWS Lambda Utility Runner 定期运行这些脚本的一个子集,从而允许您自动地监视 ETL 进程。

示例 ETL 过程

下面的 ETL 过程强化了本文中讨论的一些最佳实践。请考虑下面的四步处理每日 ETL 工作流的方法。其中 RDBMS 源系统中的数据在 S3 中进行缓存,然后加载到 Amazon Redshift 集群中。Amazon Redshift 集群用于计算每日、每周和每月的聚合,再把结果数据集卸载到 S3,在 S3 中可以进一步处理这些聚合,并可使用许多不同的工具(包括 Redshift Spectrum 和 Amazon Athena)向最终用户提供报告服务。下面我们分步骤进一步进行解释:

第一步:将数据从 RDBMS 源系统中导出到 S3 存储桶

在这个 ETL 过程中,数据提取操作每隔1小时从源系统提取一次变化的数据,并将其按小时分为多个文件缓存在 S3 目录中。例如,分级的 S3 文件夹如下所示:

 [ec2-user@ip-172-81-1-52 ~]$ aws s3 ls s3://<<S3 Bucket>>/batch/2017/07/02/
2017-07-02 01:59:58   81900220 20170702T01.export.gz
2017-07-02 02:59:56   84926844 20170702T02.export.gz
2017-07-02 03:59:54   78990356 20170702T03.export.gz
…
2017-07-02 22:00:03   75966745 20170702T21.export.gz
2017-07-02 23:00:02   89199874 20170702T22.export.gz
2017-07-02 00:59:59   71161715 20170702T23.export.gz

将数据放置到多个大小均匀的文件中,在  Amazon Redshift 集群中使用所有资源执行 COPY 命令来摄取此数据。此外,通过压缩文件(gzipped)进一步减少拷贝时间。

第二步:将数据缓存到 Redshift 数据表中进行清洗操作

可以使用基于JSON的清单文件来完成获取数据。使用清单文件确保 S3 最终的一致性问题可以被消除,并提供了在需要时对任何文件进行删除的机会。以下为示例文件 manifest20170702.json 的内容:

{
  "entries": [
    {"url":" s3://<<S3 Bucket>>/batch/2017/07/02/20170702T01.export.gz", "mandatory":true},
    {"url":" s3://<<S3 Bucket>>/batch/2017/07/02/20170702T02.export.gz", "mandatory":true},
    …
    {"url":" s3://<<S3 Bucket>>/batch/2017/07/02/20170702T23.export.gz", "mandatory":true}
  ]
}

可以使用以下命令获取数据:

SET wlm_query_slot_count TO <<max available concurrency in the ETL queue>>;
COPY stage_tbl FROM 's3:// <<S3 Bucket>>/batch/manifest20170702.json' iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole' manifest;

由于下游的 ETL 进程依赖于这个 COPY 命令来完成,所以使用 wlm_query_slot_count 来声明队列可用的所有内存。这将帮助 COPY 命令尽快完成。

第三步:将数据进行转换并创建每日、每周和每月的数据集并导入到目标表中

数据被放置在“stage_tbl”中,从这里可以将数据转换为每日、每周和每月的聚合并加载到目标表中。以下任务展示了一个典型的每周流程:

Begin
INSERT into ETL_LOG (..) values (..);
DELETE from weekly_tbl where dataset_week = <<current week>>;
INSERT into weekly_tbl (..)
  SELECT date_trunc('week', dataset_day) AS week_begin_dataset_date, SUM(C1) AS C1, SUM(C2) AS C2
	FROM   stage_tbl
GROUP BY date_trunc('week', dataset_day);
INSERT into AUDIT_LOG values (..);
COMMIT;
End;

如上所示,将多个步骤合并到一个事务中并执行单个提交,可以减少提交队列上的冲突。

第四步:将每日数据集导入到 S3 数据湖的存储桶中

转换后的结果现在被导入到另一个 S3 桶中,在那里可以进一步处理它们,并使用许多不同的工具(包括 Redshift  Spectrum 和 Amazon Athena)向最终用户报告。以下为示例命令:

unload ('SELECT * FROM weekly_tbl WHERE dataset_week = <<current week>>’) TO 's3:// <<S3 Bucket>>/datalake/weekly/20170526/' iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole';

总结

Amazon Redshift允许您轻松地在云上操作PB级的数据仓库。本文总结了在Amazon Redshift中原生运行可扩展的ETL的最佳实践,并演示了摄入和转换数据的有效方法,以及密切监视整个ETL过程的技巧。同时本文还演示了一个典型的将数据转换进Amazon Redshift集群的最佳实践。

本篇作者

Thiyagarajan Arumugam

AWS 大数据架构师。 加入 AWS 之前在 Amazon.com 负责数据库解决方案。

译者介绍

屈铭

AWS 中国专业服务团队大数据咨询顾问,曾供职于亚马逊电商和澳大利亚智能交通研究机构,拥有多年电商平台和智慧供应链的数据分析经验。现任职于 AWS 中国专业服务团队,主要为客户提供云上大数据平台设计、数据湖架构设计、数据仓库解决方案、数据建模等咨询服务。