Amazon Web Services ブログ

AWS Glue パーティションインデックスを使用したクエリパフォーマンスの向上

本記事はAmazon Web Services, Senior Big Data Architect である 関山 宜孝、Senior Software Development Engineerである Sachet Saurabh、Software Development Manager である Vikas Malik によって投稿されたものです。

 

クラウド上にデータレイクを作成する場合、データカタログは、メタデータを一元化し、ユーザーがデータを表示、検索、クエリ実行できるようにするために不可欠です。昨今の急激なデータ量増加に伴い、データレイクの価値を維持するためには、データレイアウトを最適化し、クラウドストレージ上のメタデータを維持することがより一層重要になっています。
パーティショニングは、さまざまな分析エンジンでデータを効率的にクエリ実行できるように、データ・レイアウトを最適化するための重要な手法として登場しました。データは、1 つ以上の列の個別の値に基づいて、階層ディレクトリ構造に編成されます。時間の経過とともに、数十万のパーティションがテーブルに追加され、その結果クエリが遅くなります。AWS Glue Data Catalog でカタログ化され、非常に多くのパーティションで構成されたテーブルのクエリ処理を高速化するために、 AWS Glue パーティションインデックスを利用できます。
パーティションインデックスは、 Amazon EMRAmazon Redshift Spectrum、および AWS Glue の抽出、変換、ロード (ETL) ジョブ (Spark DataFrame) のクエリで使用できます。パーティションを多用した AWS Glue Data Catalog テーブルでパーティションインデックスが有効になっている場合、これらすべてのクエリエンジンが高速化されます。パーティションインデックスを新しいテーブルと既存のテーブルの両方に追加できます。この記事では、パーティションインデックスの使用方法について実演し、非常に多くのパーティションで構成されたデータを操作するときに、パーティションインデックスで得られる利点について説明します。

パーティションインデックス

AWS Glue パーティションインデックスは、全体的なデータ転送と処理を削減し、クエリ処理時間を短縮するための重要な設定です。AWS Glue データカタログでは、 GetPartitions API を使用してテーブル内のパーティションを取得します。API は、リクエストで指定された式に一致するパーティションを返します。テーブルにパーティションインデックスが存在しない場合は、テーブルのすべてのパーティションがロードされ、getPartitions リクエストでユーザーが指定したクエリ式を使用してフィルタリングされます。インデックスのないテーブルでは、パーティション数が増えるにつれて、クエリの実行に時間がかかります。インデックスを使用すると、 getPartitions リクエストは、テーブル内のすべてのパーティションをロードするのではなく、パーティションのサブセットを取得しようとします。

パーティションインデックスの主な利点は次のとおりです。

  • クエリパフォーマンスの向上
  • GetPartitions API 呼び出しが少なくなった結果、同時実行性が高まる
  • コスト節約:
    • 分析エンジンコスト (クエリのパフォーマンスは Amazon EMR および AWS Glue ETL の料金に関連しています)
    • AWS Glue データカタログ API リクエストコスト

AWS CloudFormation でリソース設定

この記事は、クイックセットアップのための AWS CloudFormation テンプレートを提供します。中身を確認し、ニーズに合わせてカスタマイズできます。このスタックがデプロイするリソースの一部は、使用時にコストが発生します。
CloudFormation テンプレートは次のリソースを生成します。

AWS Lake Formation のアクセス権限を使用している場合は、AWS CloudFormation を実行している IAM ユーザーまたはロールに(データカタログでデータベースを作成するため)必要なアクセス権限があることを確認する必要があります。
テーブルは、 Amazon Simple Storage Service (Amazon S3) パブリックバケットにあるサンプルデータを使用します。最初は、これらの AWS Glue データカタログテーブルにはパーティションインデックスが設定されていません。
リソースを作成するには、次の手順を実行します。

  1. CloudFormation コンソールにサインインします。
  2. Launch Stack [スタックの起動]を選択します。
  3. Next [次へ] を選択します。
  4. DatabaseName は、デフォルトのままにしておきます。
  5. Next [次へ] を選択します。
  6. 次のページで、Next [次へ] を選択します。
  7. 最後のページの詳細を確認し、 I acknowledge that AWS CloudFormation might create IAM resources [AWS CloudFormation によって IAM リソースがカスタム名で作成される場合があることを承認します。]を選択します。
  8. Create [スタックの作成] を選択します。

スタックの作成には最大 5 分かかります。スタックが完了すると、table_with_indextable_without_indexという 2 つのデータカタログテーブルが作成されます。どちらのテーブルも同じ S3 バケットを指しており、データは 42 年(1980 ~ 2021)以上のyearmonthday、およびhourの列に基づいて非常に多くのパーティションで構成されています。合計で 367,920 個のパーティションがあり、各パーティションには 1 つの JSON ファイル data.json があります。次のセクションでは、パーティションインデックスがこれらのサンプルテーブルでどのように機能するかを説明します。

AWS Glue コンソールでパーティションインデックスの設定

パーティションインデックスはいつでも作成できます。パーティションインデックスを使用して新しいテーブルを作成する場合は、 PartitionIndex オブジェクトのリストを使用して CreateTable API を呼び出すことができます。既存のテーブルにパーティションインデックスを追加する場合は、 CreatePartitionIndex API 呼び出しを行います。AWS Glue コンソールでこれらのアクションを実行することもできます。テーブルには最大 3 つのパーティションインデックスを作成できます。
CloudFormation テンプレートで作成したテーブル table_with_index に新しいパーティションインデックスを設定しましょう。

  1. AWS Glue コンソールで、Tables [テーブル] を選択します。
  2. テーブル table_with_index を選択します。
  3. Partitions and indices を選択します。
  4. Add new index を選択します。
  5. Index name に、year-month-day-hour を入力します。
  6. Selected keys from schema で、yearmonthday、およびhourを選択します。
  7. Add index を選択します。

新しく作成されたパーティションインデックスの Status 列には、ステータスが Creating として表示されます。パーティションインデックスがActiveになるまで待ちます。パーティションの数が増えるとインデックス作成にかかる時間も長くなり、そしてこのテーブルには 367,920 個のパーティションがあるため、プロセスには約 1 時間かかります。

これで、テーブル table_with_index でパーティションインデックスの準備ができました。テーブルに対してクエリを実行するときに、さまざまな分析エンジンからこのインデックスを使用できます。テーブル table_without_indexにはパーティションインデックスが設定されていないため、標準的な動作はこのテーブルで確認することができます。

関心に応じて、以下のセクションをフォロー(またはスキップ)できます。

式を使用して GetPartitions API 呼び出し

さまざまなクエリエンジンからパーティションインデックスを使用する前に、 AWS コマンドラインインターフェイス (AWS CLI) を使用して GetPartitions API を呼び出して、違いを確認してみましょう。AWS CLIget-partitions コマンドは、必要に応じて複数の GetPartitions API 呼び出しを実行します。このセクションでは、 time コマンドを使用して各テーブルの継続時間を比較し、デバッグログを使用して各テーブルの API 呼び出し数を比較します。

  1. テーブル table_without_index に対して、year='2021' and month='04' and day='01'という式を使用してget-partitions コマンドを実行します。
    $ time aws glue get-partitions --database-name partition_index --table-name table_without_index --expression "year='2021' and month='04' and day='01'"
    ...
    real    3m57.438s
    user    0m2.872s
    sys    0m0.248s
    

コマンドには約4分かかりました。4 つのうち 3 つのパーティション列しか使用していないことに注意してください。

  1. GetPartitionsAPI 呼び出しの数を取得するために、デバッグログを付けて同じコマンドを実行してします。
    $ aws glue get-partitions --database-name partition_index --table-name table_without_index --expression "year='2021' and month='04' and day='01'" --debug 2>get-partitions-without-index.log
    $ cat get-partitions-without-index.log | grep x-amz-target:AWSGlue.GetPartitions | wc -l
         737

パーティションインデックスが使用されない場合、737 の GetPartitions API 呼び出しがありました。

  1. 次に、同じ式で table_with_index に対して get-partitions コマンドを実行します。
    $ time aws glue get-partitions --database-name partition_index --table-name table_with_index --expression "year='2020' and month='07' and day='01' and hour='09'"
    ...
    real    0m2.697s
    user    0m0.442s
    sys    0m0.163s

コマンドはわずか2.7秒しかかかりませんでした。必要なパーティションがどれだけ素早く返却されたかを確認できます。

  1. GetPartitionsAPI 呼び出しの数を取得するために、デバッグログを付けて同じコマンドを実行します。
    $ aws glue get-partitions --database-name partition_index --table-name table_with_index --expression "year='2021' and month='04' and day='01'" --debug 2>get-partitions-with-index.log
    $ cat get-partitions-with-index.log | grep x-amz-target:AWSGlue.GetPartitions | wc -l
           4
    

パーティションインデックスが使用されるときには、 GetPartitions API 呼び出しは 4 回しかありませんでした。

Amazon EMR で Apache Spark を使用したテーブルのクエリ

このセクションでは、Amazon EMR で Apache Spark を使用したテーブルのクエリについて説明します。

  1. Apache Spark で新しい EMR クラスターを起動します。

手順については、「Amazon EMR の設定」を参照してください。AWS Glue データカタログをメタストアとして指定する必要があります。この例では、デフォルトの EMR クラスタ(リリース:emr-6.2.0、3 つの m5.xlarge ノード)を使用します。

  1. SSH を使用して EMR ノードに接続します
  1. EMR ノードで spark-sql コマンドを実行して、Spark SQL の対話型シェルを起動します。
    $ spark-sql
  1. partition_index.table_without_index に対して次の SQL を実行します。
    spark-sql> SELECT count(*), sum(value) FROM partition_index.table_without_index WHERE year='2021' AND month='04' AND day='01';
    24    13840.894731640636
    Time taken: 35.518 seconds, Fetched 1 row(s)

クエリには 35 秒かかりました。特定のパーティションでのみレコードを集約しましたが、パーティションが多数あり、 GetPartitions API 呼び出しに時間がかかるため、クエリに時間がかかりました。

それでは、 table_with_index に対して同じクエリを実行して、パーティションインデックスがもたらすメリットを確認しましょう。

  1. partition_index.table_with_index に対して次の SQL を実行します。
    spark-sql> SELECT count(*), sum(value) FROM partition_index.table_with_index WHERE year='2021' AND month='04' AND day='01';
    24    13840.894731640636
    Time taken: 2.247 seconds, Fetched 1 row(s)

クエリの時間はわずか 2 秒でした。クエリ実行時間の違いの理由は、パーティションインデックスがあり、GetPartitions呼び出しの数が少ないということです。

次のグラフは、パーティションインデックスの有無によるクエリ計画時間の詳細なメトリックスを示しています。インデックスを使用したクエリ計画時間は、インデックスがない場合に比べるとはるかに短くなります。

Apache Spark でのメトリックスの比較の詳細については、この記事の最後にある「付録 2」を参照してください。

Redshift Spectrum を使用したテーブルのクエリ

Redshift Spectrum でクエリを実行するには、次の手順を実行します。

  1. 新しい Redshift クラスターを起動します

Redshift Spectrum と Amazon Redshift クエリエディタを使用するには、クラスターの IAM ロールを設定する必要があります。この例では、dc2.large、1ノードを選択します。クラスターをバケットの場所と同じリージョンに配置する必要があるため、 us-east-1 リージョンでクラスターを起動する必要があります。

  1. Redshift クエリエディタで接続します。手順については、「クエリエディタを使用したデータベースのクエリ」を参照してください。
  2. Redshift Spectrumで使用する partition_index データベースの外部スキーマを作成します (<your IAM role ARN>は設定したIAM ロール ARN に置き換えます)。
    create external schema spectrum from data catalog 
    database 'partition_index' 
    iam_role '<your IAM role ARN>'
    create external database if not exists;
  1. spectrum_schema.table_without_indexに対して次の SQL を実行します。
    SELECT count(*), sum(value) FROM spectrum.table_without_index WHERE year='2021' AND month='04' AND day='01'

次のスクリーンショットは、出力結果を示しています。

クエリには 3 分以上かかりました。

  1. spectrum_schema.table_with_indexに対して次の SQL を実行します。
    SELECT count(*), sum(value) FROM spectrum.table_with_index WHERE year='2021' AND month='04' AND day='01'

次のスクリーンショットは、出力結果を示しています。

インデックスを使用したテーブルのクエリにはわずか 8 秒しかかかっておらず、これはインデックスのないテーブルよりもはるかに高速です。

AWS Glue ETL を使用したテーブルのクエリ

AWS Glue 開発エンドポイントと Amazon SageMaker ノートブックを起動しましょう。

  1. AWS Glue コンソールを開き、Dev endpoints [開発エンドポイント] を選択します。
  2. Add endpoint [エンドポイントの追加] を選択します。
  3. Development endpoint name [開発エンドポイント名] に、 partition-index と入力します。
  4. IAM role [IAM ロール] で、IAM ロールを選択します。

ロールの詳細については、「AWS Glue リソースのアクセス権限の管理」を参照してください。

  1. Security configuration, script libraries, and job parameters (optional) [セキュリティ構成、スクリプトライブラリ、およびジョブパラメータ (任意)] Number of workers [作業者タイプ] で、G.1X を選択します。
  2. Number of workers [作業者数] 4 と入力します。
  3. Dependent jar path [依存 JARS パス] に、 s3://crawler-public/json/serde/json-serde.jar と入力します。
  4. Catalog options (optional) [カタログオプション (任意)]Use Glue data catalog as the Hive metastore を選択します。
  5. Next [次へ] を選択します。
  6. Networking [ネットワーキング] では、デフォルト選択のまま、Next [次へ] を選択します(Skip networking configuration [ネットワーキング情報をスキップ]が選択されている)。
  7. Add an SSH public key (Optional) [SSH 公開キーの追加 (省略可能)] を空白のままにして、Next [次へ] を選択します。
  8. Finish [完了] を選択します。
  9. 開発エンドポイントのpartition-index READY と表示されるのを待ちます。

エンドポイントの準備が完了するまで最大 10 分かかる場合があります。

  1. 開発エンドポイント partition-index を選択し、Actions [アクション] Create SageMaker notebook [SageMaker ノートブックの作成] を選択します。
  2. Notebook name [ノートブック名] partition-index と入力します。
  3. Create an IAM role [IAM ロールの作成] を選択します。
  4. IAM role [IAM ロール] partition-index と入力します。
  5. Create notebook [ノートブックを作成] を選択します。
  6. ノートブックの aws-glue-partition-index のステータスが [準備完了] と表示されるのを待ちます。

ノートブックの準備が完了するまで最大で 3 分かかる場合があります。

  1. ノートブックの aws-glue-partition-index を選択し、Open notebook [ノートブックを開く] を選択します。
  2. NewSparkMagic (PySpark) を選択します。
  3. table_without_index に対して次のコードスニペットを入力し、セルを実行します。
    %%time
    %%sql
    SELECT count(*), sum(value) FROM partition_index.table_without_index WHERE year='2021' AND month='04' AND day='01'

次のスクリーンショットは、出力結果を示しています。

クエリには 3 分かかりました。

  1. partition_index.table_with_index に対して次のコードスニペットを入力し、セルを実行します。
    %%time
    %%sql
    SELECT count(*), sum(value) FROM partition_index.table_with_index WHERE year='2021' AND month='04' AND day='01'

次のスクリーンショットは、出力結果を示しています。

セルの実行はわずか7秒でした。インデックスを使用したテーブルのクエリは、インデックスがないテーブルよりも高速です。

クリーンアップ

最後のステップで、リソースをクリーンアップします。

  1. CloudFormation スタックを削除します。
  2. EMR クラスタを削除します。
  3. Amazon Redshift クラスターを削除します。
  4. AWS Glue 開発エンドポイントと SageMaker ノートブックを削除します。

まとめ

この記事では、パーティションインデックスの使用方法と、さまざまなクエリエンジンでのクエリの高速化方法について説明しました。数百万ものパーティションがある場合、パフォーマンス上の利点は大幅に高くなります。パーティションインデックスの詳細については、「パーティションインデックスの使用 」を参照してください。


付録 1: AWS CLI を使用したパーティションインデックスのセットアップ

AWS CLI を使用する場合は、次の create-partition-index コマンドを実行してパーティションインデックスを設定します。

$ aws glue create-partition-index --database-name partition_index --table-name table_with_index --partition-index Keys=year,month,day,hour,IndexName=year-month-day-hour

パーティションインデックスのステータスを取得するには、次の get-partition-indexes コマンドを実行します。

$ aws glue get-partition-indexes --database-name partition_index --table-name table_with_index
{
    "PartitionIndexDescriptorList": [
        {
            "IndexName": "year-month-day-hour",
            "Keys": [
                {
                    "Name": "year",
                    "Type": "string"
                },
                {
                    "Name": "month",
                    "Type": "string"
                },
                {
                    "Name": "day",
                    "Type": "string"
                },
                {
                    "Name": "hour",
                    "Type": "string"
                }
            ],
            "IndexStatus": "CREATING"
        }
    ]
}

付録 2: Apache Spark でのブレークダウンメトリクスの比較

クエリ計画時間のブレークダウンメトリクスの比較に興味がある場合は、次の Scala コードスニペットを使用して SQL リスナーを登録できます。

spark.listenerManager.register(new org.apache.spark.sql.util.QueryExecutionListener {
  override def onSuccess(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, durationNs: Long): Unit = {
    val metricMap = qe.tracker.phases.mapValues { ps => ps.endTimeMs - ps.startTimeMs }
    println(metricMap.toSeq)
  }
  override def onFailure(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, exception: Exception): Unit = {}
})

spark-shell を使用する場合は、次のようにリスナーを登録できます。

$ spark-shell
...
scala> spark.listenerManager.register(new org.apache.spark.sql.util.QueryExecutionListener {
     |   override def onSuccess(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, durationNs: Long): Unit = {
     |     val metricMap = qe.tracker.phases.mapValues { ps => ps.endTimeMs - ps.startTimeMs }
     |     println(metricMap.toSeq)
     |   }
     |   override def onFailure(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, exception: Exception): Unit = {}
     | })

次に、インデックスを使用せずに同じクエリを実行して、ブレークダウンメトリクスを取得します。

scala> spark.sql("SELECT count(*), sum(value) FROM partition_index.table_without_index WHERE year='2021' AND month='04' AND day='01'").show()
Vector((planning,208), (optimization,29002), (analysis,4))
+--------+------------------+
|count(1)|        sum(value)|
+--------+------------------+
|      24|13840.894731640632|
+--------+------------------+

この例では、EMR クラスタに同じセットアップを使用します(リリース:emr-6.2.0、3 つの m5.xlarge ノード)。コンソールには、次の行が追加されています。

Vector((planning,208), (optimization,29002), (analysis,4)) 

Apache Spark のクエリ計画メカニズムには、analysis、 optimization、physical planning( plannning として表示)の3つのフェーズがあります。この行は、クエリ計画にanalysisに 4 ミリ秒、optimizationに 29,002 ミリ秒、physical planningに 208 ミリ秒かかったことを意味しています。

インデックスを使用して同じクエリを実行してみましょう。

scala> spark.sql("SELECT count(*), sum(value) FROM partition_index.table_with_index WHERE year='2021' AND month='04' AND day='01'").show()
Vector((planning,7), (optimization,608), (analysis,2))                          
+--------+------------------+
|count(1)|        sum(value)|
+--------+------------------+
|      24|13840.894731640634|
+--------+------------------+

クエリ計画では、analysisに 2 ミリ秒、optimizationに 608 ミリ秒、physical planningに 7 ミリ秒かかりました。

原文はこちらです。
本ブログは Solutions Architect の池田が翻訳しました。