亚马逊AWS官方博客

在 Amazon EMR 上使用 S3DistCp 在 HDFS 和 Amazon S3 之间高效迁移数据的七个技巧

Original URL:https://aws.amazon.com/blogs/big-data/seven-tips-for-using-s3distcp-on-amazon-emr-to-move-data-efficiently-between-hdfs-and-amazon-s3/

对于 Amazon EMR 客户来说,尽管在 Amazon S3 中直接处理数据早已稀松平常,但有时您可能需要将数据从 S3 复制到 Amazon EMR 集群上的 Hadoop 分布式文件系统 (HDFS) 中。此外,您的某个使用案例还可能需要在存储桶或区域之间迁移大量数据。在这类使用案例中,简单的复制操作对大型数据集来说显然不适用。Amazon EMR 可以提供这方面的帮助。它提供了一个实用程序 S3distCp,用以帮助将数据从 S3 迁移到其他 S3 位置或集群上的 HDFS。在 Hadoop 生态系统中, DistCp 通常用于迁移数据。DistCp 提供了基于 MapReduce 框架的分布式复制功能。 S3DistCp 是 DistCp 的扩展,它进行了优化使得其可以和S3结合使用,并新增了一些实用功能。除了在 HDFS 和 S3 之间迁移数据之外,S3DistCp 在文件处理方面同样得心应手。在本文中,我们将介绍以下使用 S3DistCp 的技巧。我们会从基本使用案例开始,然后逐渐过渡到更高级的应用场景:1. 无需转换即可复制或移动文件
2. 即时复制和更改文件压缩
3. 增量复制文件
4. 在一次作业中复制多个文件夹
5. 根据模式聚合文件
6. 上传大于 1TB 的文件
7. 将 S3DistCp step提交到 EMR 集群

1.无需转换即可复制或移动文件

据我们观察,客户经常使用 S3DistCp 将数据从一个存储位置复制到另一个存储位置(在 S3 和 HDFS 之间来回复制)。这一操作的语法简单明了:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/incoming/hourly_table

源位置可能包含我们无需复制的其他文件。因此,我们可以使用基于正则表达式的过滤器来执行某些操作,例如仅复制扩展名为 .log 的文件。

 

每个子文件夹都有以下文件:

$ hadoop fs -ls /data/incoming/hourly_table/2017-02-01/03
Found 8 items
-rw-r--r--   1 hadoop hadoop     197850 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.25845.log
-rw-r--r--   1 hadoop hadoop     484006 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.32953.log
-rw-r--r--   1 hadoop hadoop     868522 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.62649.log
-rw-r--r--   1 hadoop hadoop     408072 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.64637.log
-rw-r--r--   1 hadoop hadoop    1031949 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.70767.log
-rw-r--r--   1 hadoop hadoop     368240 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.89910.log
-rw-r--r--   1 hadoop hadoop     437348 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.96053.log
-rw-r--r--   1 hadoop hadoop        800 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/processing.meta

要仅复制所需文件,请使用 --srcPattern 选项:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/incoming/hourly_table_filtered --srcPattern .*\.log

成功上传文件后,我们检查一下目标位置中的文件夹内容,以确认是否仅复制了以 .log 结尾的文件:

$ hadoop fs -ls s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03
-rw-rw-rw-   1     197850 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.25845.log
-rw-rw-rw-   1     484006 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.32953.log
-rw-rw-rw-   1     868522 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.62649.log
-rw-rw-rw-   1     408072 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.64637.log
-rw-rw-rw-   1    1031949 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.70767.log
-rw-rw-rw-   1     368240 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.89910.log
-rw-rw-rw-   1     437348 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.96053.log

有时,不是要复制数据,而是要移动数据。在这种情况下,我们可以使用 –deleteOnSuccess 选项。此选项类似于 aws s3 mv,您以前可能在 AWS CLI 中使用过。首先复制文件,然后从源位置删除文件:

$ s3-dist-cp --src s3://my-tables/incoming/hourly_table --dest s3://my-tables/incoming/hourly_table_archive --deleteOnSuccess

完成上述操作后,源位置仅包含空文件夹,目标位置则包含所有文件。

$ hadoop fs -ls -R s3://my-tables/incoming/hourly_table/2017-02-01/
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/incoming/hourly_table/2017-02-01/00
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/incoming/hourly_table/2017-02-01/01
...
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/incoming/hourly_table/2017-02-01/21
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/incoming/hourly_table/2017-02-01/22


$ hadoop fs -ls s3://my-tables/incoming/hourly_table_archive/2017-02-01/01
-rw-rw-rw-   1     676756 2017-02-19 23:27 s3://my-tables/incoming/hourly_table_archive/2017-02-01/01/2017-02-01.01.27047.log
-rw-rw-rw-   1     780197 2017-02-19 23:27 s3://my-tables/incoming/hourly_table_archive/2017-02-01/01/2017-02-01.01.59789.log
-rw-rw-rw-   1    1041789 2017-02-19 23:27 s3://my-tables/incoming/hourly_table_archive/2017-02-01/01/2017-02-01.01.82293.log
-rw-rw-rw-   1        400 2017-02-19 23:27 s3://my-tables/incoming/hourly_table_archive/2017-02-01/01/processing.meta

需要特别注意的是:S3DistCp 在带有–deleteOnSuccess 标志时会删除文件;即使父文件夹为空,它也不会将其删除。

2.即时复制和更改文件压缩

原始文件通常以未压缩的文本格式进入 S3 或 HDFS。无论是存储成本还是对该数据进行分析,此格式都不甚理想。S3DistCp 可以使用 –outputCodec 选项帮助您在线高效地存储数据和压缩文件:

$ s3-dist-cp --src s3://my-tables/incoming/hourly_table_filtered --dest s3://my-tables/incoming/hourly_table_gz --outputCodec=gz

S3DistCp 的当前版本支持编解码器 gzip、gz、lzo、lzop 和 snappy 以及关键字 nonekeep(默认值)。这些关键字含义如下:

  • none”– 保存为未压缩的文件。如果文件已压缩,则 S3DistCp 会将其解压缩。
  • keep”–不更改文件压缩形态,按原样复制。

我们检查一下目标文件夹中的文件,这些文件现在已经用 gz 编解码器压缩了:

$ hadoop fs -ls s3://my-tables/incoming/hourly_table_gz/2017-02-01/01/
Found 3 items
-rw-rw-rw-   1     78756 2017-02-20 00:07 s3://my-tables/incoming/hourly_table_gz/2017-02-01/01/2017-02-01.01.27047.log.gz
-rw-rw-rw-   1     80197 2017-02-20 00:07 s3://my-tables/incoming/hourly_table_gz/2017-02-01/01/2017-02-01.01.59789.log.gz
-rw-rw-rw-   1    121178 2017-02-20 00:07 s3://my-tables/incoming/hourly_table_gz/2017-02-01/01/2017-02-01.01.82293.log.gz

3.增量复制文件

在实际操作中,上游进程会以某种节奏产生新的文件。例如,新文件可能每小时或每分钟创建一次。可以配置下游进程,按不同的日程安排接收文件。

假设数据传输到 S3 上,我们希望每天在 HDFS 上对其进行处理。每次复制所有文件并不能很好地扩展。幸运的是,S3DistCp 内置了应对此问题的解决方案。

对于此解决方案,我们使用清单文件。该文件允许 S3DistCp 跟踪复制的文件。以下是命令示例:

$ s3-dist-cp --src s3://my-tables/incoming/hourly_table --dest s3://my-tables/processing/hourly_table --srcPattern .*\.log --outputManifest=manifest-2017-02-25.gz --previousManifest=s3://my-tables/processing/hourly_table/manifest-2017-02-24.gz

该命令将两个清单文件视为参数:outputManifestpreviousManifest。第一个包含所有已复制文件(旧文件和新文件)的列表,第二个包含之前复制的文件的列表。如此一来,我们可以重新创建完整的操作历史记录,并查看每次运行期间复制了哪些文件:

$ hadoop fs -text s3://my-tables/processing/hourly_table/manifest-2017-02-24.gz > previous.lst
$ hadoop fs -text s3://my-tables/processing/hourly_table/manifest-2017-02-25.gz > current.lst
$ diff previous.lst current.lst
2548a2549,2550
> {"path":"s3://my-tables/processing/hourly_table/2017-02-25/00/2017-02-15.00.50958.log","baseName":"2017-02-25/00/2017-02-15.00.50958.log","srcDir":"s3://my-tables/processing/hourly_table","size":610308}
> {"path":"s3://my-tables/processing/hourly_table/2017-02-25/00/2017-02-25.00.93423.log","baseName":"2017-02-25/00/2017-02-25.00.93423.log","srcDir":"s3://my-tables/processing/hourly_table","size":178928}

S3DistCp 使用路径 /tmp/mymanifest.gz 在本地文件系统中创建文件。复制操作完成后,它会将清单文件移至 <DESTINATION LOCATION>。

4.在一次作业中复制多个文件夹

假设我们需要复制多个文件夹。通常,我们运行的复制作业与需要复制的文件夹一样多。使用 S3DistCp,可以一次性完成复制。我们只需准备一个带有前缀列表的文件,并将其用作工具参数:

$ s3-dist-cp --src s3://my-tables/incoming/hourly_table_filtered --dest s3://my-tables/processing/sample_table --srcPrefixesFile file://${PWD}/folders.lst

在这种情况下,folders.lst 文件包含以下前缀:

$ cat folders.lst
s3://my-tables/incoming/hourly_table_filtered/2017-02-10/11
s3://my-tables/incoming/hourly_table_filtered/2017-02-19/02
s3://my-tables/incoming/hourly_table_filtered/2017-02-23

最终,目标位置仅包含请求的子文件夹:

$ hadoop fs -ls -R s3://my-tables/processing/sample_table
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-10
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-10/11
-rw-rw-rw-   1     139200 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-10/11/2017-02-10.11.12980.log
...
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-19
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-19/02
-rw-rw-rw-   1     702058 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-19/02/2017-02-19.02.19497.log
-rw-rw-rw-   1     265404 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-19/02/2017-02-19.02.26671.log
...
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-23
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-23/00
-rw-rw-rw-   1     310425 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-23/00/2017-02-23.00.10061.log
-rw-rw-rw-   1    1030397 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-23/00/2017-02-23.00.22664.log
...

5.根据模式聚合文件

经过优化后,Hadoop 可以从 S3 或 HDFS 中读取较少数量的大文件,而不再读取大量小文件。您可以使用 S3DistCp 将小文件聚合为较少的指定大小的大文件,这样可以优化分析性能和成本。

在下面的示例中,我们将小文件合并为较大的文件。为此,我们使用带有 –groupBy 选项的正则表达式。

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/processing/daily_table --targetSize=10 --groupBy=’.*/hourly_table/.*/(\d\d)/.*\.log’

我们看一下目标文件夹,并将它们与相应的源文件夹进行比较:

$ hadoop fs -ls /data/incoming/hourly_table/2017-02-22/05/
Found 8 items
-rw-r--r--   1 hadoop hadoop     289949 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.11125.log
-rw-r--r--   1 hadoop hadoop     407290 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.19596.log
-rw-r--r--   1 hadoop hadoop     253434 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.30135.log
-rw-r--r--   1 hadoop hadoop     590655 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.36531.log
-rw-r--r--   1 hadoop hadoop     762076 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.47822.log
-rw-r--r--   1 hadoop hadoop     489783 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.80518.log
-rw-r--r--   1 hadoop hadoop     205976 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.99127.log
-rw-r--r--   1 hadoop hadoop        800 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/processing.meta

 

$ hadoop fs -ls s3://my-tables/processing/daily_table/2017-02-22/05/
Found 2 items
-rw-rw-rw-   1   10541944 2017-02-28 05:16 s3://my-tables/processing/daily_table/2017-02-22/05/054
-rw-rw-rw-   1   10511516 2017-02-28 05:16 s3://my-tables/processing/daily_table/2017-02-22/05/055

如您所见,我们将七个数据文件合并为两个接近 10MB(要求的大小)的文件。*.meta 文件已被筛选掉,因为 --groupBy 模式的工作方式与 -srcPattern 类似。我们建议文件大小超过默认的数据块大小,EMR 上的默认数据块大小为 128MB。

最终文件的名称由 --groupBy 中使用的正则表达式中的组加某些数字组成,以确保名称的唯一性。该模式必须至少定义一个组。

我们再来看一个例子。这次,我们希望文件名由三部分组成:年、月和文件扩展名(在本例中为 .log)。以下是更新后的命令:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/processing/daily_table --targetSize=10 --groupBy=’.*/hourly_table/.*(2017-).*/(\d\d)/.*\.(log)’

现在,我们以不同的方式命名了最终文件:

$ hadoop fs -ls s3://my-tables/processing/daily_table/2017-02-22/05/
Found 2 items
-rw-rw-rw-   1   10541944 2017-02-28 05:16 s3://my-tables/processing/daily_table/2017-02-22/05/2017-05log4
-rw-rw-rw-   1   10511516 2017-02-28 05:16 s3://my-tables/processing/daily_table/2017-02-22/05/2017-05log5

如您所见,最终文件的名称由正则表达式 (2017-)(\d\d)(log) 中的 3 组组合而成。

您偶尔可能会收到以下错误:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/processing/daily_table_2017 --targetSize=10 --groupBy=’.*/hourly_table/.*(2018-).*/(\d\d)/.*\.(log)’
...
17/04/27 15:37:45 INFO S3DistCp.S3DistCp: Created 0 files to copy 0 files
...
Exception in thread “main” java.lang.RuntimeException: Error running job
	at com.amazon.elasticmapreduce.S3DistCp.S3DistCp.run(S3DistCp.java:927)
	at com.amazon.elasticmapreduce.S3DistCp.S3DistCp.run(S3DistCp.java:705)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
	at com.amazon.elasticmapreduce.S3DistCp.Main.main(Main.java:22)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
…

在本例中,重要信息包含在 Created 0 files to copy 0 files 中。S3DistCpd 找不到要复制的文件,因为 --groupBy 选项中的正则表达式与源位置的任何文件都不匹配。

导致这一问题的原因有多种。例如,可能是指定模式存在错误。在前面的示例中,我们没有 2018 年的文件。另一个常见的原因是,当我们以step的形式提交 S3DistCp 命令时,模式的转义不正确,后文会讨论这一问题的解决方法。

6.上传大于 1TB 的文件

执行 S3 分段上传时,默认的上传区块大小为 128MB。当文件超过 1TB 时,分段总数可能会超过 10000。如此庞大的分段数量可能会使作业运行时间过长甚至失败。

在这种情况下,您可以通过增加每个分段的大小来提升作业性能。在 S3DistCp 中,可以使用 --multipartUploadChunkSize 选项来执行此操作。

我们测试一下它在处理大约 200GB 的多个文件时效果如何。使用默认分段大小,将文件从 HDFS 复制到 S3 大约需要 84 分钟。

我们可以将默认分段大小增加到 1000MB:

$ time s3-dist-cp --src /data/gb200 --dest s3://my-tables/data/S3DistCp/gb200_2 --multipartUploadChunkSize=1000
...
real    41m1.616s

最大分段大小为 5GB。请注意:分段越大,上传过程中出现故障的可能性就更高,并且不一定会加快上传过程。我们以最大分段大小来运行同一作业:

time s3-dist-cp --src /data/gb200 --dest s3://my-tables/data/S3DistCp/gb200_2 --multipartUploadChunkSize=5000
...
real    40m17.331s

7.提交S3DistCp step到 EMR 集群

您可以通过多种方式运行 S3DistCp 工具。首先,您可以像前面的示例那样,通过 SSH 连接到主节点并在终端窗口中执行命令。这种方法对于许多使用案例来说可能很方便,但是,有时您可能想要创建一个在HDFS上包含某些数据的集群。在创建集群时,您可以通过在 AWS 管理控制台直接提交一个步骤来实现这一操作。

在控制台“添加步骤”对话框中,我们按以下方法填写字段:

  • 步骤类型:Custom JAR
  • 名称*:S3DistCp Step
  • JAR 位置command-runner.jar
  • 参数s3-dist-cp --src s3://my-tables/incoming/hourly_table --dest /data/input/hourly_table --targetSize 10 --groupBy .*/hourly_table/.*(2017-).*/(\d\d)/.*\.(log)
  • 失败操作继续

注意,我们没有在模式两边添加引号。在终端窗口中使用 bash 时,我们需要加引号,但在这里不需要。控制台负责转义并将参数传递给集群上的命令。

另一个常见的使用案例是周期性地或在某个事件上运行 S3DistCp。我们随时可以向现有集群提交新步骤。这里的语法与之前的示例略有不同。我们用逗号来分隔参数。如果是复杂的模式,我们用单引号将整个step选项括起来:

aws emr add-steps --cluster-id j-ABC123456789Z --steps 'Name=LoadData,Jar=command-runner.jar,ActionOnFailure=CONTINUE,Type=CUSTOM_JAR,Args=s3-dist-cp,--src,s3://my-tables/incoming/hourly_table,--dest,/data/input/hourly_table,--targetSize,10,--groupBy,.*/hourly_table/.*(2017-).*/(\d\d)/.*\.(log)'

小结

本文向您介绍了 S3DistCp 的基本工作原理,并重点介绍了一些最为实用的功能。其中包括如何使用 S3DistCp 优化不同大小的原始文件,以及如何有选择地在各位置间复制不同的文件。此外,我们还介绍了通过 SSH、AWS 管理控制台和 AWS CLI 使用工具的几个选项。

如果您有任何问题或建议,请在评论区留言。


后续步骤

继续丰富您的知识储备! 单击下面的文章,了解在 Amazon Athena 中提升查询性能的十大技巧。

Amazon Athena 的十大性能优化技巧


关于作者

Illya Yalovyy 是 Amazon Web Services 的高级软件开发工程师。 他致力于开发 EMR 前沿功能,并积极参与 Apache Hive、Apache Zookeeper、Apache Sqoop 等开源项目。他的业余时间全部贡献给了家庭和孩子。