亚马逊AWS官方博客

AWS Glue 扩展 Apache Spark 作业以及数据分区的最佳实践

Original URL : https://aws.amazon.com/blogs/big-data/best-practices-to-scale-apache-spark-jobs-and-partition-data-with-aws-glue/

AWS Glue 提供了一个无服务器环境,用于准备(提取和转换)和从各种来源加载大量数据集,它通过 Apache Spark ETL 作业进行分析和数据处理。本系列博文讨论了帮助 Apache Spark 应用程序和 Glue ETL 作业的开发人员、大数据架构师、数据工程师和业务分析师自动扩展在 AWS Glue 上运行的数据处理作业的最佳实践。

本系统的第一篇博文讨论管理数据处理作业扩展的两项关键的 AWS Glue 功能。第一项让您可以为大型可分割数据集的Apache Spark 应用程序进行水平扩展。第二项则允许您在新 AWS Glue 工作线程类型的帮助下对内存密集型 Apache Spark 应用程序进行垂直扩展。该博文还将介绍在 AWS Glue中,针对采用 Amazon Kinesis Data Firehose 的流应用程序中提取的大量小文件,如何来扩展 Apache Spark 应用程序。最后,文章将介绍对 AWS Glue 作业如何利用在 Amazon S3 上大型数据集的分区结构,来缩短 Apache Spark 应用程序的执行时间。

理解 AWS Glue 工作线程类型

AWS Glue 具有三种工作线程类型,来帮助客户选择能满足其作业延迟与成本要求的配置。这些工作线程,也被称作 数据处理单元 (DPU),分别为标准型、G.1X 和 G.2X 配置。

标准型工作线程由 16 GB 内存,4 个 vCPU 的计算容量,以及 50 GB 附加 EBS 存储和两个 Spark 执行程序组成。G.1X 工作线程由 16 GB 内存,4 个 vCPU,以及 64 GB 附加 EBS 存储和一个 Spark 执行程序组成。G.2X 工作线程所分配的内存、磁盘空间和 vCPU 是 G.1X 工作线程类型的两倍,并拥有一个 Spark 执行程序。如需关于 AWS Glue 工作线程类型的更多详细信息,见与 AWS Glue 作业有关的文档。

不管是哪种工作线程类型,可被用于水平扩展的计算并行性(不同 DPU 的 Apache Spark)都是相同的。例如,标准型和 G.1X 工作线程都映射到 1 个 DPU,每个都可运行八个并发任务。G.2X 工作线程映射到 2 个 DPU,每个可运行 16 个并发任务。因此,拥有高度数据并行性的计算密集型 AWS Glue 作业可以从水平扩展(更多标准型或 G.1X 工作线程)中获益。需要较大内存或充足磁盘空间来存储中间混洗输出的 AWS Glue 作业可以从垂直扩展(更多 G.1X 或 G.2X 工作线程)中获益。

适用于可分割数据集的水平扩展

AWS Glue 会在从使用 AWS Glue DynamicFrame 的 S3 读取常见的原生格式(如 CSV 和 JSON)和现代文件格式(如 Parquet 和 ORC)时自动支持文件分割。如需关于 DynamicFrame 的更多信息,见在 AWS Glue 中处理分区数据

文件分割是 Spark 任务可在 AWS Glue 工作线程上独立读取与处理的文件的一部分。在默认的情况下,会为以换行符分隔的原生格式启用文件分割,允许 Apache Spark 作业在 AWS Glue 上运行,以便在多个执行程序中对计算进行并行化。使用中型(数百 MB)或大型(数 GB)文件处理大型可分割数据集的 AWS Glue 作业可以从水平扩展中获益,并且通过添加更多 AWS Glue 工作线程加快运行。

文件分割还有利于基于块的压缩格式,如 bzip2。您可以在文件分割边界读取每个压缩块,并独立对其进行处理。不可分割的压缩格式(如 gzip)无法从文件分割中获得好处。要水平扩展读取不可分割文件或压缩格式的作业,使用多个中型大小文件准备输入数据集。

 

每个文件分割(图中的蓝色方块)都从 S3 读取,并反序列化到 AWS Glue DynamicFrame 分区,然后由 Apache Spark 任务(图中的齿轮图标)对其进行处理。反序列化分区尺寸可以明显大于磁盘上的 64 MB 文件分割,尤其是采用不可分割压缩格式(如 gzip)的高度压缩可分割文件格式(如 Parquet 或大文件)。反序列化分区通常不会在内存中缓存,而且仅在 Apache Spark 转换的延迟计算需要时构建,因此不会对 AWS Glue 工作线程产生任何内存压力。如需关于延迟计算的更多信息,见 Apache Spark 网站上的 RDD 编程指南

不过,在内存中对分区进行显式缓存或在 AWS Glue ETL 脚本或 Apache Spark 应用程序中将其溢出到本地磁盘可能导致内存不足 (OOM) 或磁盘空间不足异常。AWS Glue 可以通过使用较大的 AWS Glue 工作线程类型支持此类使用案例,为 AWS Glue ETL 作业垂直扩展 DPU 实例。

使用较大的工作线程类型垂直扩展 Apache Spark 作业

支持 AWS Lake Formation 的各种 AWS Glue ETL 作业、Apache Spark 应用程序和新的机器学习 (ML) Glue 转换有较高的内存和磁盘要求。运行这些工作负载可能会给执行引擎造成很大的内存压力。由于 OOM 或磁盘空间不足异常,这样的内存压力可能导致作业失败。您可能会遇到关于内存和磁盘空间的 Yarn 异常。

超出 Yarn 内存开销

Apache Yarn 要负责分配运行您的 Spark 应用程序所需的集群资源。应用程序包括 Spark 驱动程序和多个执行程序 JVM。除了分配每个执行程序运行作业所需的内存,Yarn 还会分配额外的开销内存,以适应 JVM 开销、interned 字符串和 JVM 所需的其他元数据。配置参数 spark.yarn.executor.memoryOverhead 默认为总执行程序内存的 10%。内存密集型操作(如合并大型表或处理在特定列值分布中有偏斜的数据集)可能超出内存阈值,并导致出现以下错误消息:

18/06/13 16:54:29 ERROR YarnClusterScheduler: Lost executor 1 on ip-xxx:
Container killed by YARN for exceeding memory limits.5.5 GB of 5.5 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.

磁盘空间

Apache Spark 使用 Glue 工作线程上的本地磁盘从超出由 spark.memory.fraction 配置参数所定义堆空间的内存溢出数据。在对作业各阶段进行排序或混洗期间,Spark 会将中间数据写入到本地磁盘,之后它才能在不同的工作线程之间交换此类数据。在无磁盘空间时,由于以下异常,作业可能会失败:

java.io.IOException: No space left on device
UnsafeExternalSorter: Thread 20 spilling sort data of 141.0 MB to disk (90 times so far)

AWS Glue 作业指标

在最常见的情况下,这是作业所处理的数据集发生明显偏斜时的结果。您也可以通过使用 AWS Glue 作业指标对不同 Apache Spark 执行程序的执行时间线进行监控时识别偏斜。如需更多信息,见调试要求苛刻的阶段和落后任务

以下 AWS Glue 作业指标图表显示了 AWS Glue ETL 作业中不同执行程序的执行时间线和内存配置文件。其中一个执行程序(红线)因为处理较大分区而落后,而且在作业持续的大多数时间主动消耗内存。

借助于 AWS Glue 的垂直扩展功能,内存密集型 Apache Spark 作业可以使用具有较大内存和磁盘空间的 AWS Glue 工作线程,以帮助解决此两种常见的失败原因。使用 AWS Glue 作业指标,您还可以通过为正在运行的作业检查驱动和执行程序的内存使用情况,调试 OOM 并确定理想的工作线程类型。如需更多信息,见调试 OOM 异常和作业异常

一般而言,执行内存密集型操作的作业可以从 G.1X 工作线程类型中获益,而使用 AWS Glue 的 ML 转换或类似 ML 工作负载的作业也可以从 G.2X 中获得好处。

通过Apache Spark UI监控AWS Glue 作业

您还可以使用适用于 Spark UI 的 AWS Glue 支持,通过对 Spark 执行的有向无环图 (DAG) 进行可视化以检查与扩展您的 AWS Glue ETL 作业,同时监控要求苛刻的阶段、大量混洗,并检查 Spark SQL 查询计划。如需更多信息,见使用 Apache Spark Web UI 监控作业

以下关于 Spark UI 的 Spark SQL 查询计划显示了 ETL 作业的 DAG,此类作业从 S3 读取两个表,执行会导致 Spark 混洗的外联结,并且以 Parquet 格式将结果写入到 S3。

从计划中可见,合并转换的 Spark 混洗和后续的排序操作将占用大部分作业执行时间。通过 AWS Glue 垂直扩展,每个 AWS Glue 工作线程会将更多 Spark 任务置于同一个地方,因此减少网络中的数据交换数量。

扩展以处理大量小文件

AWS Glue ETL 作业可能会从 S3 读取数千或数百万个文件。这对于 Kinesis Data Firehose 或将数据写入 S3 的流应用程序来说十分常见。Apache Spark 驱动程序可能在尝试读取大量文件时遇到内存不足的问题。当出现此问题时,您会看到以下错误消息:

# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
# Executing /bin/sh -c "kill -9 12039"...

Apache Spark v2.2 可以管理在标准型 AWS Glue 工作线程类型上的大约 650000 个文件。要处理更多文件,AWS Glue 会提供根据每个 AWS Glue 工作线程的 Spark 任务以较大的组读取输入文件的选项。如需更多信息,见以较大的组读取输入文件

您可以通过使用 AWS Glue 文件分组减少从启动某项 Apache Spark 任务到处理每个文件的多余并行性。此方法降低了 Spark 驱动程序上 OOM 异常的几率。要配置文件分组,您需要设置 groupFilesgroupSize 参数。以下代码示例使用 ETL 脚本中的 AWS Glue DynamicFrame API 和这些参数:

dyf = glueContext.create_dynamic_frame_from_options("s3",
    {'paths': ["s3://input-s3-path/"],
    'recurse':True,
    'groupFiles': 'inPartition',
    'groupSize': '1048576'},
    format="json")

您可以在 Hive 式 S3 分区 (inPartition) 或多个 S3 分区中 (acrossPartition) 设置 groupFiles 对文件进行分组。在大多数情况下,在分区内分组足以减少并发 Spark 任务的数量,以及 Spark 驱动程序的内存占用。在基准测试中,通过 inPartition 分组选项进行配置的 AWS Glue ETL 作业在处理分布于 160 个不同 S3 分区的 320000 个 JSON 小文件时比原生的 Apache Spark v2.2 大约快七倍。Apache Spark 会将大部分时间用于构建内存内索引,同时列出 S3 文件并安排大量短时间运行的任务,以对每个文件进行处理。在启用 AWS Glue 分组后,基准 AWS Glue ETL 作业可以使用标准型 AWS Glue 工作线程类型处理超过 100 万个文件。

groupSize 是一个可选字段,让您可以配置每个 Spark 任务作为一个 AWS Glue DynamicFrame 分区能够读取和处理的数据量。若在运行作业前了解文件大小的分布,用户可以设置 groupSizegroupSize 参数允许您控制 AWS Glue DynamicFrame 分区的数量,进而控制输出文件的数量。然而,使用非常小或非常大的 groupSize 可能分别导致明显的任务并行性,或集群的利用不足。

AWS Glue 会在输入文件数量或任务并行性超过 50000 阈值时默认自动开启分组,而无需任何手动配置。groupFiles 参数的默认值为 inPartition,因此每个 Spark 任务仅读取相同 S3 分区内的文件。AWS Glue 会自动计算 groupSize 参数,并对其进行配置以减少多余并行性,然后通过并行运行的足够多的 Spark 任务充分利用集群计算资源。

数据分区和下推谓词

数据分区已成为数据集整理的一项重要的技术,因此可对其进行高效查询的大数据类型也有很多。分层目录结构基于一个或多个列的唯一值对数据进行整理。例如,您可以按照日期对 S3 内应用程序日志加以分区,并按照年、月、日进行细分。对应单天数据的文件会收到与下列类似的前缀:

s3://my_bucket/logs/year=2018/month=01/day=23/

分区列的谓词下推

AWS Glue 支持下推谓语,为分区列定义筛选标准以填充 AWS Glue 数据目录中的表。您可以在分区列上以 WHERE 子句的格式提供 SQL 谓语,而不是在执行时读取所有数据和筛选结果。例如,假设以年份列对表进行分区并运行 SELECT * FROM table WHERE year = 2019year 代表分区列,而 2019 为筛选标准。

AWS Glue 仅从 S3 分区列出与读取满足谓语而且处理所需的文件。

要做到这一点,使用 Spark SQL 表达式语言作为除 AWS Glue DynamicFrame getCatalogSource 方法以外的其他参数指定谓语。只要此谓语仅使用分区列进行筛选,它可以是任何 SQL 表达式,或由用户定义的计算结果为布尔值的函数。

此示例演示了此功能,以及以年、月、日进行分区的 Github 事件数据集。以下代码示例仅读取与发生于周末的事件相关的此类 S3 分区:

%spark

val partitionPredicate ="date_format(to_date(concat(year, '-', month, '-', day)), 'E') in ('Sat', 'Sun')"

val pushdownEvents = glueContext.getCatalogSource(
   database = "githubarchive_month",
   tableName = "data",
   pushDownPredicate = partitionPredicate).getDynamicFrame()

您可以在这里使用 SparkSQL 字符串 concat 函数来构建日期字符串。to_date 函数会将其转换成日期对象,而采用 “E” 模式的 date_format 函数会将该日期转换成以三个字符表示的星期几(如,Mon 或 Tue)。如需关于这些函数、Spark SQL 表达式和由用户定义函数的更多一般性信息,见 Apache Spark 网站上的 Spark SQL、DataFrame 和数据集指南函数列表

当对 AWS Glue 数据目录分区进行修剪时,AWS Glue ETL 作业会有明显的性能提升。它会缩短 Spark 查询引擎运行时在 S3 中列出文件,并且读取与处理数据所需的时间。您可以使用更高选择性的谓语排除其他分区,从而实现进一步的提升。

在写入 S3 之前及过程中对数据进行分区

在从 AWS Glue DynamicFrame 将结果写出时,数据默认不会被分区,而会在指定输出路径下的顶层写入全部输出文件。AWS Glue 会通过在创建接收器时传递 partitionKeys 选项,启用对 DynamicFrame 的分区。例如,以下代码示例会按照类型列并以 Parquet 格式将数据集写出到 S3 分区:

%spark

glueContext.getSinkWithFormat(
    connectionType = "s3",
    options = JsonOptions(Map("path" -> "$outpath", "partitionKeys" -> Seq("type"))),
    format = "parquet").writeDynamicFrame(projectedEvents)

在此示例中,$outpath 是 S3 中基本输出路径的占位符。partitionKeys 参数对应在 S3 中用于对输出进行分区的列的名称。当您执行写入操作时,它会从个别记录中移除类型列,然后采用目录结构对其进行编码。要进行演示,您可以使用以下 AWS CLI 的 aws s3 ls 命令列出输出路径:

PRE type=CommitCommentEvent/
PRE type=CreateEvent/
PRE type=DeleteEvent/
PRE type=ForkEvent/
PRE type=GollumEvent/
PRE type=IssueCommentEvent/
PRE type=IssuesEvent/
PRE type=MemberEvent/
PRE type=PublicEvent/
PRE type=PullRequestEvent/
PRE type=PullRequestReviewCommentEvent/
PRE type=PushEvent/
PRE type=ReleaseEvent/
PRE type=WatchEvent/

如需更多信息,见 AWS CLI 命令参考中的 aws . s3 . ls

一般来说,您应该为较低基数,而且最常用于筛选或对查询结果进行分组的 partitionKeys 选择列。例如,在分析 AWS CloudTrail 日志时,我们经常查找发生在某个日期范围内的事件。因此,按照年、月、日对 CloudTrail 数据进行分区可以提升查询性能,并且减少扫描所需的数据数量以便返回结果。

对输出进行分区可以起到一石二鸟的作用。首先,它能缩短最终用户查询的执行时间。其次,在将多项作业整合到一个数据管道时拥有适当的分区方案有助于避免下游 AWS Glue ETL 作业中成本高昂的 Spark 混洗操作。如需更多信息,见在 AWS Glue 中处理分区数据

S3 或 Hive 式分区与 Spark RDD 或 DynamicFrame 分区不同。Spark 分区关乎 Spark 或 AWS Glue 如何将大型的数据集细分为较小且更易管理的块,以方便并行读取与应用转换。AWS Glue 工作线程在内存中管理此类型分区。您可以在作业执行期间的任何时间点及将数据写入 S3 之前在 DynamicFrame 上使用 repartitioncoalesce 函数,从而进一步控制 Spark 分区。您可以通过 repartition 函数明确指定分区的总数或选择对数据进行分区的列,以设置分区的数量。

使用 reparation 或 coalesce 函数对数据集进行重新分区经常会导致 AWS Glue 工作线程交换(混洗)数据,进而可能影响到作业的运行时间并增加内存压力。对比之下,通过 Hive 式分区将数据写入 S3 不需要任何数据混洗,而只需在每个工作线程节点上对其进行本地排序。S3 中非 Hive 式分区的输出文件的数量粗略与 Spark 分区的数量相对应。相反,S3 中 Hive 式分区的输出文件的数量可能因为分区键在每个 AWS Glue 工作线程上的分布而有所差异。

结论

本文介绍了如何在 AWS Glue 上为计算和内存密集型作业扩展您的 ETL 作业与 Apache Spark 应用程序。AWS 通过数据集和不同类型 AWS Glue 工作线程的并行性缩短了作业执行时间并高效地管理内存。它还可以自动调整工作负载和集群的并行性,从而帮助您克服处理众多小文件的挑战。AWS Glue ETL 作业使用 AWS Glue 数据目录,并借助于谓语下推启用无缝的分区修剪。此外,它还通过下游 Apache Spark 应用程序和其他分析引擎(如 Amazon AthenaAmazon Redshift)让您可以对 S3 中的数据集进行高效分区,从而加快查询。我们希望您可以在 AWS Glue 上为您的 Apache Spark 应用程序尝试这些最佳实践。

本系列中的第二篇博文将向您介绍如何利用 AWS Glue 功能来批量处理大型的历史数据集,并且在 S3 数据湖中进行增量处理。它还将演示如何使用自定义的 AWS Glue Parquet 写入器实现更快的作业执行。

 


关于作者

Mohit Saxena 是 AWS Glue 的技术主管。他热衷于构建可扩展的分布式系统,以有效地管理云中的数据。他喜欢看电影和阅读有关最新技术的内容。