Amazon Web Services ブログ

realtor.com® が Amazon S3 から Amazon DynamoDB へのデータのアップロードを最大化する方法



このお客様記事は、realtor.com® のデータテクノロジー担当 VP である Arup Ray 氏と AWS ソリューションアーキテクト Daniel Whitehead によるものです。Arup Ray 氏は Amazon のソフトウェア開発エンジニアである Anil Pillai に対し、以前 realtor.com® のシニアプリンシパルエンジニアを務めていたときの本プロジェクトに対する彼の先駆的な貢献について感謝の意を示したいと話しておられます。

Move, Inc. によって運営される realtor.com® は、それ自体の言葉を借りると、「住宅の買い手、売り手、そしてそれを夢見る人のための信頼のおけるリソースであり、全米の売り出し中物件の堅牢なデータベース、および人々が住宅を購入するまでのすべてのステップを、自信を持って進めていくために役立つ情報、ツール、そして専門知識を提供しています。」

Realtor.com® では、データと分析が住宅購入プロセスをより簡単に、かつ実り多いものとするための重要な部分となっています。お客様が物件を検索するとき、私たちはお客様にとって最も関心がある事柄の属性を特定し、地域内のよく似た家についてお客様の関心により適した推奨を生成するためにそのデータを使います。

パーソナライズされた家の推奨は、お客様が夢見る家を見つける上で極めて重要です。realtor.com® が、その推奨エンジンの基盤であるカスタマー分析データセットを保存するための柔軟なスキーマを可能にする NoSQL データベース、Amazon DynamoDB を活用するするのはこのためです。これらのデータセットは、複数のアップストリームサービスからのデータを集約することによって作成および更新され、realtor.com の分析エンジンに取り込まれます。

毎晩何千万もの更新が行われるので、もし realtor.com®PutItem API を使って DynamoDB に各アイテムを順次的にアップロードしたとすれば、処理に数時間かかるでしょう。その代わりに、realtor.com® はデータセットをセグメント化して BatchWrite API (25 の同時データストリームで 10-MB ファイルを同時にアップロードすることができるもの) を活用しており、これによって realtor.com のデータ取り込みは数時間から数分に短縮されます。

この記事では、realtor.com®Amazon AthenaAWS Glue を使って何百 GB ものデータセットを Amazon S3 から DynamoDB に並列的にアップロードした方法について説明します。 このシステムは、realtor.com 推奨の市場投入までの時間を向上させ、パーソナライゼーションサービスの API を数時間から数分に短縮しました。

ソリューションの概要

大まかに説明すると、このソリューションには次のステップが含まれています。

  1. アップストリームソースからデータが収集され、ファイルが生成される。このファイルには何百万ものレコードが含まれており、毎晩バッチジョブの出力を経て S3 に保存される。
  2. ファイルが S3 バケットに投入されると、バケットに投入されるオブジェクトによって、大きなファイルを 25 の小さなチャンクに分割する Athena クエリがトリガーされます。これらのチャンクの各行には、Athena の Create Table As 関数を利用する 16 MB のデータがあります。
  3. Athena クエリの実行が終わったら、DynamoDB にデータをパラレルにアップロードする複数の Spark ワーカーによる AWS Glue ジョブが開始されます。
  4. このプロセスが完了したら、ステージングバケットにあるファイルが削除されます。

以下の図は、このワークフローを示したものです。

このソリューションは、AWS CLI、S3、Athena、AWS Glue、および DynamoDB を使用します。このパイプラインの構築には関連するコストが発生します。

データストアの準備

最初のステップは、データの送信先となる DynamoDB テーブルの作成です。次の手順を完了してください。

  1. DynamoDB コンソールで [テーブルの作成] を選択します。
  2. [テーブル名] には target_table と入力します。
  3. [プライマリキー] には pk と入力します。
  4. [デフォルト設定の使用] を選択して、[テーブルの作成] を選択します。
    以下のスクリーンショットは、手順 1~4 を示しています。
  5. [テーブルの作成] を選択します。
  6. 以下のスクリーンショットにあるように、作成したテーブルを選択します。
  7. [キャパシティー] を選択します。
  8. 以下のスクリーンショットにあるように、[書き込みキャパシティー] の [プロビジョニングされた最小キャパシティー] に 5000 と入力します。
  9. [保存] を選択します。

この記事では、テーブルへのエントリのスケールアップに DynamoDB の Auto Scaling 機能を使用します。BatchWrite 操作のために十分なスループットを提供し、テーブルで実行される書き込みを日常業務の一部として再現するために、最初はこれを最小 5,000 に設定する必要があります。これによって、DynamoDB テーブルにあるすべてのアイテムが書き込まれるまで書き込み量を増加させる最大スループットにテーブルをスケールすることが可能になります。

データセットの作成

パイプラインをシミュレートするため、この記事では New York City Taxi and Limousine Commission (TLC) Trip Record Data のサブセットを使用します。 DynamoDB テーブルと同じリージョンにある S3 バケットが必要です。詳細については、「バケットの作成」を参照してください。

最初のステップは、データセットのファイルのバケットへのコピーです。バケットにファイルをアップロードできるように、アクセス許可とアクセスコントロールをセットアップする必要があります。CLI にシークレットキーとアクセスキーを設定したら、以下の手順を完了してください。

  1. AWS Command Line Interface で設定されたターミナルに以下のコマンドを入力します。
    aws s3 cp s3://nyc-tlc/trip\ data/yellow_tripdata_2009-01.csv s3://<YOUR BUCKET>       
  1. データが S3 に投入されたら、Athena コンソールを開きます。
  2. [テーブルの作成] を選択します。
  3. [From S3 bucket data] を選択します。
  4. [データベース] には input_db と入力します。
  5. [テーブル名] には input_raw と入力します。
  6. [入力データセットの場所] には、データをコピーした S3 バケットの場所を入力します。
    以下のスクリーンショットは、手順 5~7 を示しています。
  7. [次へ] を選択します。
  8. データの形式画面では、以下のスクリーンショットにあるように、[カスタム区切り記号のテキストファイル] を選択します。
  9. [次へ] を選択します。
  10. 列画面の [列名] には、以下にあるように s3_data と入力します。
  11. [次へ] を選択します。
  12. パーティションにはデフォルトをそのまま使用して、[テーブルの作成] を選択します。
    クエリエディタが以下のコードに似たクエリで更新されます。

    CREATE EXTERNAL TABLE IF NOT EXISTS input_db.input_raw (
    s3_data string
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    WITH SERDEPROPERTIES (
    'serialization.format' = ' ',
    'field.delim' = ' ',
    'collection.delim' = '',
    'mapkey.delim' = ''
    ) LOCATION 's3://<YOUR BUCKET>/'
    TBLPROPERTIES ('has_encrypted_data'='false');
  1. [クエリの実行] を選択します。
    結果として生成されるテーブルは、1,400 万行を超えるデータセットから Vendor NamePickup Date の 2 つの値をプルします。これで、このパイプラインの Raw データが作成されました。次のステップは、DynamoDB へのアップロードのためのデータの準備です。
    このデータセットには一意の識別子がないため、データから一意のパーティションキーを作成する必要があります。プライマリキーを作成するには、Athena からの Raw データを使って md5 ハッシュを作成し、それを 16 進数に変換して、行に一意の識別子を付与します。別個の演算子を使用することによって、データ内に重複が発生しないことを確実にできます。一意のレコードを持つクリーンなデータがある場合は、このプロセスをデータセットに適用する必要はありません (これは、realtor.com® のパイプラインにも含まれていません)。
  1. Athena コンソールのクエリエディタに移動し、以下のコードを入力します。
    CREATE table input_for_ddb AS
    SELECT DISTINCT to_hex(md5(to_utf8(s3_data))) AS primary_key,
             replace(substring(trim(s3_data),
             1,
             20),
             '', '') AS attribute
    FROM input_raw
    WHERE length(trim(s3_data)) > 5

    これによって、準備したデータを使った新しいテーブルが作成されます。

    次のステップは、ntile を使ったデータのシャーディングです。これは、順序付けられたパーティションの行を同等のグループに分散するための Windows 関数です。これによって、データセットが小さなチャンクに分割され、DynamoDB にアップロードする機能が最大活用されます。

  1. 以下のコードを入力します。
    CREATE table dynamodb_shards AS
    SELECT primary_key,
             ntile(1000)
        OVER (order by primary_key) ntile_value
    FROM input_for_ddb

    データを準備するための最後のステップは、先ほど作成した 2 つのテーブルからのデータを結合し、DynamoDB にプッシュされる最終的なデータセットを作成するクエリの実行です。

    CREATE table push_to_ddb_data AS
    SELECT dynamo_shards_table.ntile_value,
             ARRAY['primary_key','attribute' ] AS meta_array, array_agg(ARRAY[ coalesce(case
        WHEN length(trim(cast(input_for_ddb_table.primary_key AS varchar)))=0 THEN
        NULL
        ELSE cast(input_for_ddb_table.primary_key AS varchar)
        END ,'NULL'), coalesce(case
        WHEN length(trim(cast(attribute AS varchar)))=0 THEN
        NULL
        ELSE cast(attribute AS varchar)
        END ,'NULL') ]) AS data_array
    FROM input_for_ddb AS input_for_ddb_table
    JOIN dynamodb_shards dynamo_shards_table on(input_for_ddb_table.primary_key = dynamo_shards_table.primary_key)
    GROUP BY  1

プロセスの実行

このクエリが終了したら、以下のステップを完了します。

  1. AWS Glue コンソールの [ETL] で [ジョブ] を選択します。
  2. [ジョブの追加] を選択します。
  3. AWS Glue のジョブプロパティの設定画面で、ジョブに S3toDynamoDB という名前を付けます。
  4. [IAM ロールを作成します] を選択します。
  5. [ロール] を選択します。
  6. [ロールの作成] を選択します。
  7. ロールの作成画面で、[Glue] を選択します。
  8. [次のステップ: アクセス権限] を選択します。
  9. [ポリシーの作成] を選択します。新しいウィンドウが開きます。
  10. [JSON] を選択します。
  11. 以下のポリシーを入力します (お使いの AWS アカウント番号を入力するようにしてください)。
    {
        "Version": "2012-10-17",
        "Statement": [
            { 
                "Sid": "GlueScriptPermissions",
                "Effect": "Allow",
                "Action": [
                    "athena:BatchGetQueryExecution",
                    "athena:GetQueryExecution",
                    "athena:GetQueryResults",
                    "athena:GetQueryResultsStream",
                    "athena:GetWorkGroup",
                    "dynamodb:BatchWriteItem",
                    "glue:GetTable",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:PutObject"
                ],
                "Resource": [
                    "arn:aws:athena:*:<YOURACCOUNTNUMBER>:workgroup/*",
                    "arn:aws:dynamodb:*:<YOURACCOUNTNUMBER>:table/target_table",
                    "arn:aws:glue:*:<YOURACCOUNTNUMBER>:catalog",
                    "arn:aws:glue:*:<YOURACCOUNTNUMBER>:database/input_db",
                    "arn:aws:glue:*:<YOURACCOUNTNUMBER>:table/input_db/push_to_ddb_data",
                    "arn:aws:s3:::<YOURBUCKET>/*"
                ]
            },
            {
                "Sid": "Logs",
                "Effect": "Allow",
                "Action": [
                    "athena:GetCatalogs",
                    "logs:Create*",
                    "logs:Put*"
                ],
                "Resource": "*"
            },
            {
                "Sid": "Passrole",
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": "arn:aws:iam::<YOURACCOUNTNUMBER>:role/AWSGlueServiceRole"
            }
        ]
    }
  12. [次のステップ: タグ] を選択します。
  13. [次のステップ:確認] を選択します。
  14. ポリシーに AWSGlueServicePolicy という名前を付けます。
  15. [作成] を選択します。
  16. ロールウィンドウで、新しく作成したポリシーを検索します。
  17. [次のステップ: タグ] を選択します。
  18. [次のステップ:確認] を選択します。
  19. ロールに AWSGlueServiceRole という名前を付けます。
  20. AWS Glue ジョブ用に、ドロップダウンでこの新しいロールを見つけます。
  21. [Type] には [Spark] を選択します。
  22. [Glue version] には [Spark 2.2 Python 2] を選択します。
  23. [このジョブ実行] には [ユーザーが作成する新しいスクリプト] を選択します。
  24. [セキュリティ設定、スクリプトライブラリおよびジョブパラメータ] の [最大容量] に 30 を入力します。
  25. 他はすべてデフォルトのままにしておきます。
  26. [次へ] を選択します。
  27. 次の画面で、[ジョブを保存してスクリプトを編集する] を選択します。
  28. 次の画面で、以下のコードを入力します (<AWS Region> を運用しているリージョンに変更するようにしてください)。
    #---- Glue PySpark script
    
    
    from __future__ import print_function
    import boto3
    import time
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    
    #------------ Input section -----
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    
    DYNAMO_TABLE_NAME = 'target_table'
    athena_db_name = 'input_db'
    athena_table_name = 'push_to_ddb_data'
    
    
    def generate_data(meta_list, data_list):
        for data in data_list:
            yield dict(zip(meta_list, data))
    
    
    def push2ddb_batch_put(meta_list, data_list):
        try:
            dynamodb = boto3.resource('dynamodb', region_name=<AWS REGION>)
            table = dynamodb.Table(DYNAMO_TABLE_NAME)
            with table.batch_writer() as batch:
                for data in generate_data(meta_list, data_list):
                    ndata = {k: v for k, v in data.items() if v != "NULL"}
                    batch.put_item(
                        Item=ndata
                    )
    
            return len(data_list)
        except Exception as err:
            print("Error: while inserting data to DynamoDB...{err}".format(err=err))
            raise err
    
    
    def insert_into_ddb(line):
        return push2ddb_batch_put(line['meta_array'], line['data_array'])
    
    
    if __name__ == "__main__":
        try:
            # ---- Create the Glue Context and using Glue Context create a data frame
            glueContext = GlueContext(SparkContext.getOrCreate())
            src_data = glueContext.create_dynamic_frame.from_catalog(database=athena_db_name, table_name=athena_table_name)
            print("Count: ", src_data.count())
            src_data.printSchema()
    
            df1 = src_data.toDF()
            df1.cache()
            df1.show(10)
            start_time = time.time()
            print(df1.rdd.map(insert_into_ddb).collect())
            print(time.time() - start_time)
    
        except Exception as err:
            print("There is an error while inserting data to DDB...{err}".format(err=err))
            raise err
  29. [保存] を選択します。
    このスクリプトでは、先ほど Athena で作成した準備済みのデータを読み込みます。作成されたチャンクが、DynamoDB BatchWriteItem API に加えて、AWS Glue Spark ワーカーの並列処理機能を使って DynamoDB にロードされます。
  30. [ジョブの実行] を選択します。

AWS Glue ジョブのプロビジョニングにはしばらく時間がかかりますが、実行中になると、AWS Glue クラスターからのログが表示されるようになります。

AWS Glue ジョブが終了するまで、スループットが時間の経過とともに最大化され、動的にスケールアップされます。DynamoDB の [メトリックス] タブにあるメトリクスは、以下のスクリーンショットに似たものになります。

DynamoDB の Auto Schaling 機能は、AWS Glue ジョブが DynamoDB テーブルにアップロードする書き込みの数に基づいてスケーリングを開始しました。特定のしきい値において、テーブルでの 70% の使用率という目標に対応するために、テーブルが書き込みキャパシティーの量を増加させました。AWS Glue には、AWS SDK 内で BatchWrite コールの再試行ロジックを使用した複数の同時ライターがあり、スロットルリクエストがあったとしても、ジョブを正常に完了させることで最終的には DynamoDB テーブルに書き込まれるようになっていました。前出のスクリーンショットは、1,400 万アイテムを 30 分未満で DynamoDB にアップロードしたことを示しています。

realtor.com のケースでは、これが日中のある時点で実行されるバッチジョブです。AWS Glue と DynamoDB を使用することによって、realtor.com® は DynamoDB に書き込まれる必要があるデータの量に応じて動的にスケールアップし、ジョブが完了した後はインフラストラクチャを管理しなくてもスケールダウンするシステムを持つことができました。

まとめ

この記事では、以下の実行方法をご紹介しました。

  • データを投入する DynamoDB テーブルのセットアップ。
  • 取り込まれた Raw データを AWS Glue が並列化できる形式に整えるための Athena を使った複数クエリの実行。
  • DynamoDB への並列的なアップロードのために、イベントによるトリガー、またはスケジュールに従ってオンデマンドに呼び出すことができる AWS Glue ジョブのセットアップ。

realtor.com® は、Athena と AWS Glue を DynamoDB パイプラインに拡張して、システムを動的にスケールすることを可能にしながら全体的な管理を低減させました。これによって realtor.com の分析プロファイルの更新にかかる時間が短縮され、ユーザーが夢見る家を見つけるための支援をより多く提供できるようになります。

 


著者について

 

Arup Ray 氏は realtor.com のエンジニアリング担当 VP で、データテクノロジーチームを率いています。realtor.com のデータチームは、データを住宅購入者と不動産プロフェッショナルにとって実用的なものとするために AWS のテクノロジーを使用しています。

 

 

 

Daniel Whitehead は、アマゾン ウェブ サービスのソリューションアーキテクトです