Amazon Web Services ブログ

AWS Glue のパーティション分割されたデータを使用した作業

AWS Glue は、Hive 形式で整理されたデータセットの高度な取扱いをサポートします。AWS Glue クローラーは、自動的に Amazon S3 データのパーティションを識別します。AWS Glue ETL (Extract/Transform/Load) ライブラリは、DynamicFrame を取り扱う際、ネイティブでパーティションをサポートします。DynamicFrame はデータの分散コレクションを表します。スキーマを指定する必要はありません。DynamicFrame の作成時に述語をプッシュダウンしてパーティションをフィルタリングし、高コストの S3 への呼び出しを回避できます。また、Apache Spark DataFrame に変換することなく直接パーティショニングされたディレクトリに書き込む機能のサポートを追加しました。

パーティショニングは、データセットを様々なビッグデータシステムで効率的にクエリできるように、それらを整理するための重要な手法となっています。データは、ひとつまたは複数の列の重複しない値に基づいて、階層ディレクトリ構造にまとめられます。たとえば、Amazon S3 のアプリケーションログを、年別、月別、日別の内訳で日付ごとにパーティショニングするとします。すると、1 日分に相当するデータがs3://my_bucket/logs/year=2018/month=01/day=23/のようなプレフィックスの下に配置されます。

Amazon AthenaAmazon Redshift Spectrum、AWS Glue などのシステムでは、これらのパーティションを使用して値ごとにデータをフィルタリングできるため、Amazon S3 への不要な呼び出しを行う必要がありません。このため、読み取る必要のあるパーティションが少ないアプリケーションのパフォーマンスが大幅に改善します。

本投稿では、AWS Glue を使用してパーティショニングされたデータセットを効率的に処理する方法を紹介します。まず、クローラーを設定して、パーティショニングされたデータセットを自動的にスキャンし、AWS Glue データカタログにテーブルとパーティションを作成する方法を説明します。次に、パーティショニングされたデータを取り扱うための AWS Glue ETL ライブラリの機能をいくつか紹介します。SQL 式やユーザー定義の関数を使用してパーティションをフィルタリングし、Amazon S3 からの不要なデータのリスト作成や読み取りを回避することができます。また、ETL ライブラリでは、Spark SQL DataFrame に依存することなく、AWS Glue DynamicFrame をパーティションに直接記述する機能のサポートを追加しました。

それでは、始めましょう。

パーティショニングされたデータのクローリング

この例では、以前の AWS Glue の Scala サポートに関する投稿で紹介したものと同じ GitHub アーカイブのデータセットを使用します。このデータは GitHub アーカイブ で一般公開されており、GitHub サービスに対するすべての API リクエストのための JSON レコードを含んでいます。2017 年 1 月からの 1 か月分のアクティビティを含むサンプルデータセットは以下の場所で入手可能です:

s3://aws-glue-datasets-<region>/examples/githubarchive/month/data/

<region> の箇所は、 us-east-1 など、利用中の AWS リージョンに置き換えます。このデータセットは年別、月別、日別にパーティショニングされているため、実際ファイルのパスは以下のようになります:

s3://aws-glue-datasets-us-east-1/examples/githubarchive/month/data/2017/01/01/part1.json

このデータをクロールするには、AWS Glue 開発者ガイド の説明に従うか、提供された AWS CloudFormation テンプレートを使用します。このテンプレートでは、以下のような内容を含むスタックを作成します:

  • AWS Glue リソースにアクセスする権限を持つ IAMロール
  • githubarchive_month という名前の AWS Glue データカタログのデータベース
  • GitHub データセットをクロールするクローラーの設定
  • AWS Glue 開発エンドポイント (次のセクションでデータ変換に使用します)

このテンプレートを実行するには、次のセクションで出力データを書き込む S3 バケットとプレフィックスを提供する必要があります。このテンプレートで作成するロールは、このバケットのみに書き込む権限を持ちます。また、開発エンドポイントに接続するため SSH 公開鍵を提供する必要があります。SSH の鍵作成について詳しくは、開発エンドポイントチュートリアルをご覧ください。AWS CloudFormation スタックの作成後、AWS Glue コンソールからクローラーを実行できます。

クローラーはタイプやスキーマを推論することに加え、自動的にデータセットのパーティション構造を識別し、AWS Glue データカタログに入力します。これでデータが正確にグループ化されて論理テーブルとなり、パーティション列が AWS Glue ETL ジョブや Amazon Athena などのクエリエンジンのクエリで利用可能な形になります。

テーブルをクロールした後は、AWS Glue コンソールでテーブルに移動し、 [View partitions (パーティションを表示)] を選択してパーティションを表示できます。パーティションは以下のように表示されます:

Hive 形式では、 key=val フォームのパーティショニングされたパスについてはクローラーが自動的に列の名前を入力します。この場合は、GitHub データが 2017/01/01 の形式のディレクトリに保存されているため、クローラーは partition_0partition_1 などのデフォルトの名前を使用します。これらの名前は AWS Glue コンソールで簡単に変更できます。テーブルに移動し、 [Edit schema (スキーマを編集)] を選択して、partition_0 を year、partition_1 を month、partition_2 を day に変更します:

データセットをクロールし、パーティションに適切な名前を付けました。ここで AWS Glue ETL ジョブのパーティショニングされたデータの取り扱い方を見てみましょう。

データの変換とフィルタリング

まず AWS Glue ETL ライブラリについては、AWS Glue 開発エンドポイントと Apache Zeppelin ノートブックを使用することができます。AWS Glue 開発エンドポイントでは、Apache Spark および AWS Glue ETL ライブラリを使用してスクリプトを構築、実行する対話型環境を提供します。これはデバッグと探索的解析に役立ちます。また繰り返しのジョブに移行する前にスクリプトを開発、テストするために使用できます。

前のセクションで AWS CloudFormation テンプレートを実行した場合は、アカウントに partition-endpoint という名前の開発エンドポイントがすでにあるはずです。または、こちらの開発エンドポイントチュートリアルの説明に従って作業することもできます。いずれの場合も、ローカルまたは EC2 インスタンスで Apache Zeppelin ノートブックを設定する必要があります。開発エンドポイントおよびノートブックについて詳しくは、AWS Glue 開発者ガイドをご覧ください。

以下の例はすべてプログラミング言語 Scala で記述されていますが、わずかな変更ですべて Python に実装可能です。

パーティショニングされたデータセットを読み取る

最初にデータセットを読み取り、パーティションがどのようにスキーマに反映されるのか確認しましょう。 まず、この例で必要になるクラスをいくつかインポートし、GlueContext を設定します。これはデータの読み取り、書き込みに使用するメインのクラスです。

Zeppelin の パラグラフ で以下の内容を実行します。パラグラフとは実行可能なコードの単位です:

%spark 
import com.amazonaws.services.glue.DynamicFrame       import com.amazonaws.services.glue.DynamicRecord
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.JsonOptions   import org.apache.spark.SparkContext
import java.util.Calendar
import java.util.GregorianCalendar
import scala.collection.JavaConversions._

@transient val spark: SparkContext = SparkContext.getOrCreate()
val glueContext: GlueContext = new GlueContext(spark)

これは単純ですが、2 つ注意点があります: まず、各パラグラフは、Scala であることを示すため、 %spark という行で始める必要があります。2 点目は、シリアライゼーションの問題を回避するため、 spark 変数には @transient を付ける必要があることです。これは Zeppelin ノートブックで実行する場合のみ必要となります。

次に、GitHub データを DynamicFrame に読み込みますこれは AWS Glue スクリプトで使用される主要なデータ構造で、データの分散コレクションを表しています。DynamicFrame は Spark DataFrame に似ていますが、ETL 変換のための追加機能がある点が異なります。 DynamicFrames については、AWS Glue が Scala スクリプトをサポートおよび AWS Glue API ドキュメントの投稿で詳しく説明されています。

以下のスニペットは、先程クロールしてスキーマをプリントしたデータカタログのテーブルを参照して DynamicFrame を作成します:

%spark

val githubEvents: DynamicFrame = glueContext.getCatalogSource(
database = "githubarchive_month",
tableName = "data"
).getDynamicFrame()

githubEvents.schema.asFieldList.foreach { field =>
println(s"${field.getName}: ${field.getType.getType.getName}")
}

また、 githubEvents.printSchema() を使用してスキーマ全体をプリントすることもできます。ただし今回はスキーマ全体がかなり大きいため、トップレベルの列のみプリントしました。このパラグラフは、標準的なサイズの AWS Glue 開発エンドポイントで実行に 5 分ほどかかります。実行後、以下のような出力を確認できます:

id: string
type: string
actor: struct
repo: struct
payload: struct
public: boolean
created_at: string
year: string
month: string
day: string
org: struct

各レコードに、 yearmonthday のパーティション列が自動的に追加されています。

パーティション列によるフィルタリング

データをパーティショニングする理由のひとつは、パーティションのサブセットでの作業を簡単にすることです。パーティション列によるデータのフィルタリング方法を見てみましょう。ここでは GitHub の週末のアクティビティを参照して、ユーザーが自由時間に何を作っているのか見てみることにします。これを行うには、対象のイベントを選択するため事前に作成した githubEvents DynamicFrame でフィルター変換を使用するという方法があります:

%spark

def filterWeekend(rec: DynamicRecord): Boolean = {
  def getAsInt(field: String): Int = {
    rec.getField(field) match {
      case Some(strVal: String) => strVal.toInt
      // フィルター変換が例外をキャッチして、レコードをエラーとして記録します。
      case _ => throw new IllegalArgumentException(s"Unable to extract field $field")
    }
  }

  val (year, month, day) = (getAsInt("year"), getAsInt("month"), getAsInt("day"))
  val cal = new GregorianCalendar(year, month - 1, day) // カレンダーの月が 0 から開始します。
  val dayOfWeek = cal.get(Calendar.DAY_OF_WEEK)

  dayOfWeek == Calendar.SATURDAY || dayOfWeek == Calendar.SUNDAY
}

val filteredEvents = githubEvents.filter(filterWeekend)
filteredEvents.count

このスニペットでは、Java の Calendar クラスを使用する filterWeekend 関数を、パーティション列 (year、month、day) が週末になるレコードを識別するよう定義しています。 このコードを実行すると、全 29,160,561 イベント中、2017 年 1 月の週末には 6,303,480 の GitHub イベントが発生していたことがわかります。これは順当な結果のようです。イベントの約 22 パーセントが週末に発生し、その月の日の約 29 パーセント (31 日中 9 日) が週末に当たるためです。つまり、GitHub ユーザーは週末の利用がやや少ないものの、たくさんのアクティビティがあるということです。

パーティション列で述語をプッシュダウン

フィルター変換をこのように使用する場合の主な問題点は、ごく一部のみが必要であっても、Amazon S3 のデータセットのファイルをすべてリストし、読み取る必要があることです。1 か月分のデータなら対処できるでしょう。しかし、さらに多量のデータを処理する場合、読み取ってはすぐに廃棄するレコードに時間を費やすばかりになります。

この問題に対処するため、最近のリリースでは AWS Glue データカタログで指定されたパーティション列での述語のプッシュダウンがサポートされました。クラスター内のエグゼキューターでデータを読み取り DynamicFrame をフィルタリングする代わりに、カタログで利用可能なパーティションのメタデータに直接フィルターを適用します。こうすることで、処理が必要な S3 パーティションのみをリストし、読み取ることができます。

これを行うには、Spark SQL 述語を追加パラメータとして getCatalogSource メソッドに指定します。この述語は、フィルタリングするパーティション列のみを使っていれば、SQL 式 でも ユーザー定義の関数でも構いません。これはカタログに保存されたメタデータに適用することに注意してください。スキーマのその他のフィールドにはアクセス権がありません。

以下のスニペットでは、この機能を使用して週末に発生するパーティションのみを読み取る方法を示しています:

%spark

val partitionPredicate =
    "date_format(to_date(concat(year, '-', month, '-', day)), 'E') in ('Sat', 'Sun')"

val pushdownEvents = glueContext.getCatalogSource(
   database = "githubarchive_month",
   tableName = "data",
   pushDownPredicate = partitionPredicate).getDynamicFrame()

ここでは、SparkSQL 文字列の  concat 関数を使用して、日付の文字列を構築しています。その文字列を to_date 関数を使用して日付オブジェクトに変換し、 date_format 関数を使用して「E」パターンで日付を 3 文字の曜日 (Mon、Tue など) に変換します。これらの関数や Spark SQL 式、ユーザー定義関数全体について詳しくは、 Spark SQL ドキュメントおよび関数一覧をご覧ください。

なお、 pushdownPredicate パラメータは Python でも利用可能です。Python の対応する呼び出しは以下の通りです:

glue_context.create_dynamic_frame.from_catalog(
    database = "githubarchive_month",
    table_name = "data",
    push_down_predicate = partitionPredicate)

述語のプッシュダウンによるパフォーマンスの影響は、各 Zeppelin パラグラフ用にレポートされる実行時間で確認できます。Scala フィルター関数を使用した最初のアプローチでは 2 分半かかりました。

このバージョンではプッシュダウンリストを使用しており、データ読み取り量がはるかに少ないため、完了まで わずか 24 秒でした。5 倍の高速化です。

もちろん、これが役立つかどうかはフィルターの設定次第です。除外するパーティションが多いほど、さらに改善が見られるでしょう。

Amazon S3 パスの Hive 形式パーティショニングに加え、Parquet と ORC ファイル形式で各ファイルをパーティショニングし、列の値を示すデータブロックにできます。各ブロックは、列の最大値/最小値など、中に含むレコードの統計データも保存します。AWS Glue は、Hive 形式パーティションとこれらの形式のブロックパーティション両方の述語プッシュダウンをサポートしています。データ読み取り中、不要な S3 パーティションを切り詰め、Parquet および ORC 形式の列統計データで読み取り不要とされたブロックをスキップします。

追加の変換

これでデータセットの読み取りとフィルタリングが完了したため、追加の変換でデータを整理できます。 たとえば、以前の AWS Glue 投稿で説明したように、感情分析を導入することもできます。

単純化するため、 ApplyMapping 変換を使用してデータセットからいくつかの列を抽出することもできます:

%spark

val projectedEvents = pushdownEvents.applyMapping(Seq(
  ("id", "string", "id", "long"),
  ("type", "string", "type", "string"),
  ("actor.login", "string", "actor", "string"),
  ("repo.name", "string", "repo", "string"),
  ("payload.action", "string", "action", "string"),
  ("org.login", "string", "org", "string"),
  ("year", "string", "year", "int"),
  ("month", "string", "month", "int"),
  ("day", "string", "day", "int")
))

ApplyMapping は投影や型変換を実行する柔軟な変換方法です。この例では、 actor.login などいくつかのフィールドのネストを解除し、トップレベルの actor フィールドにマッピングします。また、 id 列を長い整数のパーティション列に変換します。

パーティショニングされたデータの書き出し

最後のステップは、変換したデータセットを Amazon S3 に書き出し、Amazon Athena などの他のシステムで処理可能にすることです。デフォルトでは、DynamicFrame 書き出し時にはパーティショニングされていません。出力ファイルはすべて、指定の出力パス下のトップレベルに書き出されます。最近まで、DynamicFrame をパーティションに書き出すには、書き出し前に Spark SQL DataFrame に変換するしかありませんでした。DynamicFrames は、キーの順番によるパーティショニングをネイティブでサポートするようになりました。

これは、シンク作成時に追加の  partitionKeys オプションのパスを渡すことで実行できます。たとえば以下のコードでは、先程の Parquet 形式で S3 の  type フィールドでパーティショニングされたディレクトリに作成したデータセットを書き出します。

%spark

glueContext.getSinkWithFormat(
    connectionType = "s3",
    options = JsonOptions(Map("path" -> "$outpath", "partitionKeys" -> Seq("type"))),
    format = "parquet")
  .writeDynamicFrame(projectedEvents)

ここでは、 $outpath が S3 内の出力用基本プレースホルダーになります。また、 partitionKeys パラメータも Python の connection_options ディクショナリで指定できます:

glue_context.write_dynamic_frame.from_options(
    frame = projectedEvents,
    connection_options = {"path": "$outpath", "partitionKeys": ["type"]},
    format = "parquet")

この書き込みを実行すると、個々のレコードから type フィールドが削除され、ディレクトリ構造にエンコードされます。これを実際に確認する場合、AWS CLI から aws s3 ls コマンドを使用して出力パスをリストできます:

PRE type=CommitCommentEvent/
PRE type=CreateEvent/
PRE type=DeleteEvent/
PRE type=ForkEvent/
PRE type=GollumEvent/
PRE type=IssueCommentEvent/
PRE type=IssuesEvent/
PRE type=MemberEvent/
PRE type=PublicEvent/
PRE type=PullRequestEvent/
PRE type=PullRequestReviewCommentEvent/
PRE type=PushEvent/
PRE type=ReleaseEvent/
PRE type=WatchEvent/

予想通り、個別のイベントタイプごとにパーティションが作成されました。 この例ではひとつの値でパーティションを作成しましたが、もちろん必須ではありません。たとえば、元の年、月、日のパーティションを残したい場合は、 partitionKeys オプションを  Seq(“year”, “month”, “day”) と設定するだけで可能です。

結論

この投稿では、AWS Glue のパーティショニングされたデータの取り扱い方を説明しました。パーティショニングは、大型のデータセットを最大限に利用するためには必須のテクニックです。AWS ビッグデータのエコシステムには、Amazon Athena や Amazon Redshift Spectrum など、パーティションを活用してクエリ処理を高速化する多数のツールがあります。AWS Glue はクロール、フィルタリング、パーティショニングされたデータの書き出しのメカニズムを提供します。これで Amazon S3 のデータをお好みの形に構成し、ビッグデータアプリケーションを最大限活用することができます。

ぜひお試しください。


その他の参考資料

この投稿がお役に立ちましたら、AWS Glue が Scala スクリプトをサポートおよび AWS Glue の Relationalize 変換でネストされた JSON のクエリを簡略化の投稿もご覧ください。


Ben Sowell は AWS Glue のシニアソフトウェア開発エンジニアです。彼は ETL システムに 5 年以上取り組み、ユーザーのデータから潜在的な力を引き出すお手伝いをしています。余暇には読書と、ベイエリアの散策を楽しんでいます。

 

 

 

Mohit Saxena は AWS Glue のシニアソフトウェア開発エンジニアです。彼はクラウドのデータの効率的な管理を目指して、スケーラブルな分散システム構築に熱心に取り組んでいます。映画鑑賞や最新テクノロジーについての本を読むことが好きです。