我该如何在 Amazon EMR 中串联 Parquet 文件?

上次更新时间:2020 年 7 月 9 日

我使用 S3DistCp (s3-dist-cp) 并选择 --groupBy 和 --targetSize 选项以 Apache Parquet 格式串联文件。s3-dist-cp 作业完成并且未发生错误,但生成的 Parquet 文件却已损坏。当尝试在应用程序中读取 Parquet 文件时,我看到与以下内容类似的错误消息:

“Expected n values in column chunk at /path/to/concatenated/parquet/file offset m but got x values instead over y pages ending at file offset z”

我该如何在 Amazon EMR 中串联 Parquet 文件?

简短描述

S3DistCp 不支持串联 Parquet 文件。因此应使用 PySpark

解决方法

您不能在 PySpark 中指定目标文件大小,但您可以指定分区的数量。Spark 会将每个分区保存到单独的输出文件中。要估算您需要的分区数量,将数据集的大小除以单个目标文件的大小。

1.    使用安装的 Apache Spark 创建 Amazon EMR 集群

2.    指定您需要的执行程序的数量。该数量由集群的容量和数据集的大小决定。如需更多信息,见 Best practices for successfully managing memory for Apache Spark applications on Amazon EMR

$  pyspark --num-executors number_of_executors

3.    将源 Parquet 文件加载至 Spark DataFrame。它可以是 Amazon Simple Storage Service (Amazon S3) 路径或 HDFS 路径。例如:

df=sqlContext.read.parquet("s3://awsdoc-example-bucket/parquet-data/")

HDFS:

df=sqlContext.read.parquet("hdfs:///tmp/parquet-data/")

4.    对 DataFrame 进行重新分区。在以下示例中,n 是分区的数量。

df_output=df.coalesce(n)

5.    将 DataFrame 保存到目标位置。它可以是 Amazon S3 路径或 HDFS 路径。例如:

df_output.write.parquet("URI:s3://awsdoc-example-bucket1/destination/")

HDFS:

df=sqlContext.read.parquet("hdfs:///tmp/destination/")

6.    验证目标目录中现在有多少文件:

hadoop fs -ls "URI:s3://awsdoc-example-bucket1/destination/ | wc -l"

文件总数应该是第 4 步的 n 值加 1。Parquet 输出提交程序会写入另一个额外的文件,名为 _SUCCESS


这篇文章对您有帮助吗?


您是否需要账单或技术支持?