Amazon Web Services ブログ

Amazon EMRでS3DistCpを使用してHDFSとAmazon S3間で効率的にデータを移動するための7つのヒント

Amazon S3とHadoop Distributed File System(HDFS)の間で大量のデータを移動する必要があったものの、データセットが単純なコピー操作には大きすぎるということはありませんでしたか? EMRはこれを救うことができます。ペタバイト級のデータの処理と分析に加えて、EMRは大量のデータの移動もできます。

Hadoopエコシステムでは、DistCpがデータを移動するためによく使用されます。 DistCpは、MapReduceフレームワークの上に構築された分散コピー機能を提供します。 S3DistCpは、S3で動作するように最適化されたDistCpの拡張機能であり、いくつかの便利な機能が追加されています。 S3DistCpは、HDFSとS3の間でデータを移動するだけでなく、ファイル操作のスイスアーミーナイフです。この記事では、S3DistCpを使用するための基本的なユースケースから始めて、さらに高度なシナリオまでのヒントについて説明します。

  1. 変換なしにファイルをコピーまたは移動する
  2. ファイル圧縮を変更しつつコピーする
  3. ファイルを段階的にコピーする
  4. 1つのジョブで複数のフォルダをコピーする
  5. パターンに基づいてファイルを集約する
  6. サイズが1TBを超えるファイルをアップロードする
  7. S3DistCpステップをEMRクラスターにサブミットする

1. 変換なしにファイルをコピーまたは移動する

S3またはHDFSのいずれの場合でも、S3DistCpを使用してあるストレージの場所から別の場所にデータをコピーすることがよくあります。この操作の構文は簡単で明瞭です。

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/incoming/hourly_table

ソースの場所には、必ずしもコピーしたくない余分なファイルが含まれている可能性があります。ここでは、正規表現に基づくフィルタを使用して、.log拡張子のファイルのみをコピーするなどの作業を行うことができます。

各サブフォルダには、次のファイルがあります。

$ hadoop fs -ls /data/incoming/hourly_table/2017-02-01/03
Found 8 items
-rw-r--r--   1 hadoop hadoop     197850 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.25845.log
-rw-r--r--   1 hadoop hadoop     484006 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.32953.log
-rw-r--r--   1 hadoop hadoop     868522 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.62649.log
-rw-r--r--   1 hadoop hadoop     408072 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.64637.log
-rw-r--r--   1 hadoop hadoop    1031949 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.70767.log
-rw-r--r--   1 hadoop hadoop     368240 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.89910.log
-rw-r--r--   1 hadoop hadoop     437348 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.96053.log
-rw-r--r--   1 hadoop hadoop        800 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/processing.meta

必要なファイルのみをコピーするには、--srcPatternオプションを使用しましょう:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/incoming/hourly_table_filtered --srcPattern .*\.log

アップロードが正常に終了したら、コピー先の場所のフォルダの内容を確認して、.logで終わるファイルのみがコピーされたことを確認します。

$ hadoop fs -ls s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03
-rw-rw-rw-   1     197850 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.25845.log
-rw-rw-rw-   1     484006 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.32953.log
-rw-rw-rw-   1     868522 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.62649.log
-rw-rw-rw-   1     408072 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.64637.log
-rw-rw-rw-   1    1031949 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.70767.log
-rw-rw-rw-   1     368240 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.89910.log
-rw-rw-rw-   1     437348 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.96053.log

データをコピーする代わりに移動する必要があることがあります。この場合、--deleteOnSuccessオプションを使用できます。AWS CLIで使用したことがある方もいるかもしれませんが、このオプションはaws s3 mvと似ています。ファイルは最初にコピーされ、ソースから削除されます。

$ s3-dist-cp --src s3://my-tables/incoming/hourly_table --dest s3://my-tables/incoming/hourly_table_archive --deleteOnSuccess

前の操作の後、ソースの場所には空のフォルダのみがあり、ターゲットの場所にはすべてのファイルが含まれます。

$ hadoop fs -ls -R s3://my-tables/incoming/hourly_table/2017-02-01/
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/incoming/hourly_table/2017-02-01/00
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/incoming/hourly_table/2017-02-01/01
...
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/incoming/hourly_table/2017-02-01/21
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/incoming/hourly_table/2017-02-01/22


$ hadoop fs -ls s3://my-tables/incoming/hourly_table_archive/2017-02-01/01
-rw-rw-rw-   1     676756 2017-02-19 23:27 s3://my-tables/incoming/hourly_table_archive/2017-02-01/01/2017-02-01.01.27047.log
-rw-rw-rw-   1     780197 2017-02-19 23:27 s3://my-tables/incoming/hourly_table_archive/2017-02-01/01/2017-02-01.01.59789.log
-rw-rw-rw-   1    1041789 2017-02-19 23:27 s3://my-tables/incoming/hourly_table_archive/2017-02-01/01/2017-02-01.01.82293.log
-rw-rw-rw-   1        400 2017-02-19 23:27 s3://my-tables/incoming/hourly_table_archive/2017-02-01/01/processing.meta

ここで覚えておくべき重要なことは、S3DistCpは--deleteOnSuccessフラグを持つファイルだけを削除し、親フォルダが空であっても削除しないことです。

2. ファイル圧縮を変更しつつコピーする

未加工のファイルは、圧縮されていないテキスト形式でS3またはHDFSに格納されることがよくあります。このフォーマットは、ストレージのコストとそのデータに対する分析の実行の両方に最適とは限りません。 S3DistCpは--outputCodecオプションを使用して効率的にデータを保存し、ファイルを即座に圧縮するのに役立ちます:

$ s3-dist-cp --src s3://my-tables/incoming/hourly_table_filtered --dest s3://my-tables/incoming/hourly_table_gz --outputCodec=gz

現在のバージョンのS3DistCpでは、コーデックgzip、gz、lzo、lzop、snappy、およびキーワードnoneおよびkeep(デフォルト)がサポートされています。これらのキーワードの意味は次のとおりです。

none” – 圧縮されていないファイルを保存します。ファイルが圧縮されている場合、S3DistCpはそれらを解凍します。
keep” – ファイルの圧縮は変更せず、そのままコピーします。

ターゲットフォルダ内のgzコーデックで圧縮されたファイルを確認しましょう。

$ hadoop fs -ls s3://my-tables/incoming/hourly_table_gz/2017-02-01/01/
Found 3 items
-rw-rw-rw-   1     78756 2017-02-20 00:07 s3://my-tables/incoming/hourly_table_gz/2017-02-01/01/2017-02-01.01.27047.log.gz
-rw-rw-rw-   1     80197 2017-02-20 00:07 s3://my-tables/incoming/hourly_table_gz/2017-02-01/01/2017-02-01.01.59789.log.gz
-rw-rw-rw-   1    121178 2017-02-20 00:07 s3://my-tables/incoming/hourly_table_gz/2017-02-01/01/2017-02-01.01.82293.log.gz

3. ファイルを段階的にコピーする

実運用では、アップストリームプロセスはファイルをいくつかのリズムで産み落とします。たとえば、新しいファイルが1時間ごと、または毎分作成される可能性があります。ダウンストリームプロセスは、別のスケジュールでそれを取得するように構成できます。

データがS3上にあり、毎日HDFSで処理したいとしましょう。毎回すべてのファイルをコピーしても、うまくスケールできません。幸運なことに、S3DistCpにはそのためのソリューションが組み込まれています。

このソリューションでは、マニフェストファイルを使用します。そのファイルは、S3DistCpがコピーされたファイルを追跡できるようにします。コマンドの例を次に示します。

$ s3-dist-cp --src s3://my-tables/incoming/hourly_table --dest s3://my-tables/processing/hourly_table --srcPattern .*\.log --outputManifest=manifest-2017-02-25.gz --previousManifest=s3://my-tables/processing/hourly_table/manifest-2017-02-24.gz

このコマンドは、2つのマニフェストファイルをパラメータ、--outputManifestおよび--previousManifestとして使用します。最初のファイルには、コピーされたすべてのファイル(古いものと新しいもの)のリストが含まれ、2番目には以前にコピーされたファイルのリストが含まれています。このようにして、操作の全履歴を再作成し、実行ごとにどのファイルがコピーされたかを確認することができます。

$ hadoop fs -text s3://my-tables/processing/hourly_table/manifest-2017-02-24.gz > previous.lst
$ hadoop fs -text s3://my-tables/processing/hourly_table/manifest-2017-02-25.gz > current.lst
$ diff previous.lst current.lst
2548a2549,2550
> {"path":"s3://my-tables/processing/hourly_table/2017-02-25/00/2017-02-15.00.50958.log","baseName":"2017-02-25/00/2017-02-15.00.50958.log","srcDir":"s3://my-tables/processing/hourly_table","size":610308}
> {"path":"s3://my-tables/processing/hourly_table/2017-02-25/00/2017-02-25.00.93423.log","baseName":"2017-02-25/00/2017-02-25.00.93423.log","srcDir":"s3://my-tables/processing/hourly_table","size":178928}

S3DistCpは、指定されたパス/tmp/mymanifest.gzを使用してローカルファイルシステムにファイルを作成します。コピー操作が終了すると、そのマニフェストを<DESTINATION LOCATION>に移動します。

4. 1つのジョブで複数のフォルダをコピーする

複数のフォルダをコピーする必要があるとします。通常、コピーする必要のあるフォルダと同じ数のコピージョブを実行します。 S3DistCpを使用すると、コピーを一度に実行できます。私たちが必要とするのは、接頭辞のリストを含むファイルを準備し、これをツールのパラメータとして使用することだけです。

$ s3-dist-cp --src s3://my-tables/incoming/hourly_table_filtered --dest s3://my-tables/processing/sample_table --srcPrefixesFile file://${PWD}/folders.lst

この場合、folders.lstファイルには次の接頭辞が含まれています。

$ cat folders.lst
s3://my-tables/incoming/hourly_table_filtered/2017-02-10/11
s3://my-tables/incoming/hourly_table_filtered/2017-02-19/02
s3://my-tables/incoming/hourly_table_filtered/2017-02-23

その結果、ターゲットの場所には要求されたサブフォルダのみが存在します。

$ hadoop fs -ls -R s3://my-tables/processing/sample_table
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-10
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-10/11
-rw-rw-rw-   1     139200 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-10/11/2017-02-10.11.12980.log
...
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-19
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-19/02
-rw-rw-rw-   1     702058 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-19/02/2017-02-19.02.19497.log
-rw-rw-rw-   1     265404 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-19/02/2017-02-19.02.26671.log
...
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-23
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-23/00
-rw-rw-rw-   1     310425 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-23/00/2017-02-23.00.10061.log
-rw-rw-rw-   1    1030397 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-23/00/2017-02-23.00.22664.log
...

5. パターンに基づいてファイルを集約する

Hadoopは、S3またはHDFSからの多数の小さなファイルではなく、より少ない数の大きなファイルを読み込むことに最適化されています。 S3DistCpを使用すると、小さなファイルを選択したサイズの少数の大きなファイルに集約できます。これにより、パフォーマンスとコストの両方の分析を最適化できます。

次の例では、小さいファイルを大きなファイルに結合します。これを行うには、--groupByオプションを指定した正規表現を使用します。

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/processing/daily_table --targetSize=10 --groupBy=’.*/hourly_table/.*/(\d\d)/.*\.log’

ターゲットフォルダを見て、対応するソースフォルダと比較してみましょう。

$ hadoop fs -ls /data/incoming/hourly_table/2017-02-22/05/
Found 8 items
-rw-r--r--   1 hadoop hadoop     289949 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.11125.log
-rw-r--r--   1 hadoop hadoop     407290 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.19596.log
-rw-r--r--   1 hadoop hadoop     253434 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.30135.log
-rw-r--r--   1 hadoop hadoop     590655 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.36531.log
-rw-r--r--   1 hadoop hadoop     762076 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.47822.log
-rw-r--r--   1 hadoop hadoop     489783 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.80518.log
-rw-r--r--   1 hadoop hadoop     205976 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.99127.log
-rw-r--r--   1 hadoop hadoop        800 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/processing.meta
$ hadoop fs -ls s3://my-tables/processing/daily_table/2017-02-22/05/
Found 2 items
-rw-rw-rw-   1   10541944 2017-02-28 05:16 s3://my-tables/processing/daily_table/2017-02-22/05/054
-rw-rw-rw-   1   10511516 2017-02-28 05:16 s3://my-tables/processing/daily_table/2017-02-22/05/055

ご覧のように、7つのデータファイルは、要求された10MBに近いサイズで2つに結合されました。 --groupByパターンは--srcPatternと同じように動作するため、*.metaファイルは除外されました。ファイルをデフォルトのブロックサイズ(EMRでは128MB)よりも大きくすることをお勧めします。

最終的なファイルの名前は、--groupByで使用される正規表現のグループと、その名前を一意にするための数字で構成されます。パターンには少なくとも1つのグループが定義されている必要があります。

もう1つの例を考えてみましょう。今度は、年、月、ファイル拡張子(この場合は.log)の3つの部分からファイル名を作成します。ここに更新されたコマンドがあります:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/processing/daily_table_2017 --targetSize=10 --groupBy=’.*/hourly_table/.*(2017-).*/(\d\d)/.*\.(log)’

これで別の方法で名前が付けられた最終的なファイルが作成されました。

$ hadoop fs -ls s3://my-tables/processing/daily_table_2017/2017-02-22/05/
Found 2 items
-rw-rw-rw-   1   10541944 2017-02-28 05:16 s3://my-tables/processing/daily_table/2017-02-22/05/2017-05log4
-rw-rw-rw-   1   10511516 2017-02-28 05:16 s3://my-tables/processing/daily_table/2017-02-22/05/2017-05log5

ご覧のように、最終的なファイルの名前は、正規表現(2017-)、(\d\d)、(log)の3つのグループを連結したものです。

場合によっては、次のようなエラーが発生することがあります。

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/processing/daily_table_2017 --targetSize=10 --groupBy=’.*/hourly_table/.*(2018-).*/(\d\d)/.*\.(log)’
...
17/04/27 15:37:45 INFO S3DistCp.S3DistCp: Created 0 files to copy 0 files
... 
Exception in thread “main” java.lang.RuntimeException: Error running job
	at com.amazon.elasticmapreduce.S3DistCp.S3DistCp.run(S3DistCp.java:927)
	at com.amazon.elasticmapreduce.S3DistCp.S3DistCp.run(S3DistCp.java:705)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
	at com.amazon.elasticmapreduce.S3DistCp.Main.main(Main.java:22)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
…

この場合、キーとなる情報は「Created 0 files to copy 0 files」にあります。 --groupByオプションの正規表現がソースの場所にあるファイルと一致しないため、S3DistCpはコピーするファイルが見つかりませんでした。

この問題の理由はさまざまです。たとえば、指定されたパターンの間違いである可能性があります。前述の例では、2018年のファイルがありません。もう1つの一般的な理由は、S3DistCpコマンドをステップとして送信したときのパターンのエスケープが正しくないことです。

6. サイズが1TBを超えるファイルをアップロードする

S3マルチパートアップロードを行うときのデフォルトのアップロードチャンクサイズは128MBです。ファイルが1TBを超えると、パートの総数は10,000を超えることがあります。そのような多数のパートは、ジョブを非常に長い時間稼働させたり、失敗さえすることがあります。

この場合、各パーツのサイズを増やすことでジョブのパフォーマンスを向上させることができます。 S3DistCpでは、 --multipartUploadChunkSizeオプションを使用してこれを行うことができます。

サイズが約200GBのいくつかのファイルでどのように動作するかをテストしてみましょう。デフォルトのパーツサイズでは、HDFSからS3にコピーするのに約84分かかります。

デフォルトのパーツサイズを1,000MBに増やすことができます:

$ time s3-dist-cp --src /data/gb200 --dest s3://my-tables/data/S3DistCp/gb200_2 --multipartUploadChunkSize=1000
...
real    41m1.616s

最大パートサイズは5GBです。大きなパートはアップロード中に失敗する可能性が高く、必ずしもプロセスを高速化するとは限りません。最大のパートサイズで同じジョブをしてみましょう:

time s3-dist-cp --src /data/gb200 --dest s3://my-tables/data/S3DistCp/gb200_2 --multipartUploadChunkSize=5000
...
real    40m17.331s

7. S3DistCpステップをEMRクラスターにサブミットする

S3DistCpツールは、いくつかの方法で実行できます。まず、前述の例のように、マスターノードにSSHで入り、ターミナルウィンドウでコマンドを実行します。このアプローチは多くのユースケースで便利ですが、HDFS上にデータがあるクラスタを作成することもできます。これを行うには、クラスタを作成するときにAWS Management Consoleで直接ステップを実行します。

コンソールの追加手順ダイアログボックスでは、次の方法でフィールドを入力することができます。

  • ステップタイプカスタムJAR
  • 名前:S3DistCp Step
  • JARの場所:command-runner.jar
  • 引数s3-dist-cp --src s3://my-tables/incoming/hourly_table --dest /data/input/hourly_table --targetSize 10 --groupBy .*/hourly_table/.*(2017-).*/(\d\d)/.*\.(log)
  • 失敗時の操作次へ

パターンの周りに引用符を追加していないことに注目してください。ターミナルウィンドウでbashを使用していたときは引用符が必要でしたが、ここでは使用しません。コンソールは、エスケープし、引数をクラスタのコマンドに転送します。

もう1つの一般的な使用例は、S3DistCpを反復的に実行することです。既存のクラスタには常に新しいステップをサブミットできます。ここでの構文は、前の例と少し異なります。引数はカンマで区切ります。複雑なパターンの場合、ステップオプション全体をシングルクォーテーションで囲みます。

aws emr add-steps --cluster-id j-ABC123456789Z --steps 'Name=LoadData,Jar=command-runner.jar,ActionOnFailure=CONTINUE,Type=CUSTOM_JAR,Args=s3-dist-cp,--src,s3://my-tables/incoming/hourly_table,--dest,/data/input/hourly_table,--targetSize,10,--groupBy,.*/hourly_table/.*(2017-).*/(\d\d)/.*\.(log)'

まとめ

この記事では、S3DistCpがどのように機能し、その最も有用な機能をいくつかハイライトしました。 S3DistCpを使用してさまざまなサイズの未加工のファイルを最適化する方法と、異なるファイルを場所間で選択的にコピーする方法について説明しました。 SSH、AWS Management Console、およびAWS CLIからツールを使用するためのいくつかのオプションについても触れました。

ご質問やご提案がありましたら、ぜひご意見をお寄せください。

次のステップ

あなたの新しい知識をさらに次のレベルに!下のポストをクリックして、Amazon Athenaのクエリパフォーマンスを向上させるためのヒントを学びましょう。
Amazon AthenaのパフォーマンスチューニングTipsトップ10

 

原文:Seven Tips for Using S3DistCp on Amazon EMR to Move Data Efficiently Between HDFS and Amazon S3(翻訳:半場光晴)