Amazon Web Services ブログ

Amazon S3 オブジェクトの Amazon Kinesis Data Firehose カスタムプレフィックス

2019 年 2 月、アマゾン ウェブ サービス (AWS) は、Amazon Kinesis Data Firehose に Custom Prefixes for Amazon S3 Objects と呼ばれる新機能を発表しました。これにより、顧客はデータレコードが配信される Amazon S3 プレフィックスのカスタム表現を指定できます。 以前は、Kinesis Data Firehose ではリテラルプレフィックスのみを指定できました。次に、このプレフィックスを静的な日付形式のプレフィックスと組み合わせて、固定形式の出力フォルダを作成しました。 お客様は柔軟性を求めていたので、AWS は耳を傾け、実現したのです。

Kinesis Data Firehose は、アプリケーションや IoT デバイスなどのストリーミングソースからイベントデータを消費するために最も一般的に使用されています。  データは通常データレイクに格納されるため、処理して最終的にクエリを実行できます。  Amazon S3 にデータを保存するときは、関連データを分割またはグループ化し、同じフォルダにまとめて保存することをお勧めします。  これにより、パーティション化されたデータをフィルタリングし、各クエリでスキャンされるデータ量を制御することができるため、パフォーマンスが向上しコストが削減されます。

データをグループ化する一般的な方法は日付順です。  Kinesis Data Firehose はデータを自動的にグループ化し、日付に基づいて Amazon S3 の適切なフォルダに保存します。  ただし、Amazon S3 内のフォルダーの命名は、Apache Hive の命名規則と互換性がありません。これにより、AWS Glue クローラーを使用してデータをカタログ化し、ビッグデータツールを使用して分析することが難しくなります。

この記事では、Kinesis Data Firehose が Amazon S3 の出力フォルダーに名前を付ける方法をカスタマイズできる新しい機能について説明します。カスタムプレフィックスの仕組み、ユースケース、そして自分のアカウントで機能を試すためのステップバイステップの説明が含まれています。

Amazon S3 オブジェクトのカスタムプレフィックスの必要性

以前、Kinesis Data Firehose は静的な世界標準時 (UTC) ベースのフォルダー構造を YYYY/MM/DD/HH の形式で作成していました。その後、オブジェクトを Amazon S3 に書き込む前に、提供されたプレフィックスに追加しました。 たとえば、プレフィックス「mydatalake/」を指定した場合、生成されるフォルダ階層は「mydatalake/2019/02/09/13」になるでしょう。  ただし、Hive 命名規則と互換性を持たせるために、フォルダ構造は「/partitionkey=partitionvalue」の形式に従うことが期待されています。 この命名規則を使用すると、AWS Glue クローラーを使用してデータを簡単にカタログ化できるため、適切なパーティション名が得られます。

Amazon AthenaMSCK REPAIR TABLE を実行する、または Amazon EMR で Apache Hive を実行するなど、パーティションを管理する他の方法も可能になります。それにより、単一のステートメントですべてのパーティションを追加できます。 さらに、日付をフォルダに展開する代わりに、「/dt=2019-02-09-13/」のような他の日付ベースのパーティションパターンを使用することもできます。  これは、テーブルが時間の経過とともに大きくなるにつれて維持する必要があるパーティションの総数を減らすのに役立ちます。範囲クエリも単純化されます。 カスタムプレフィックスを指定する機能を提供することで、データを正しいフォルダ構造に入れるための追加の ETL 手順が不要になり、洞察の時間が短縮されます。

Amazon S3 オブジェクトのカスタムプレフィックスの仕組み

この新しい機能では、イベントデータから日付やタイムスタンプの値を使用することはできません。また、イベントで他の任意の値を使用することもできません。Kinesis Data Firehose は、ApproximateArrivalTimestamp という内部タイムスタンプフィールドを使用します。 各データレコードには、ストリームがレコードを正常に受信して格納したときに設定される ApproximateArrivalTimestamp (UTC) が含まれています。これは一般的にサーバーサイドタイムスタンプと呼ばれています。Kinesis Data Firehose は、設定されたバッファリングヒントに従って受信レコードをバッファリングし、それらを Amazon S3 宛先の Amazon S3 オブジェクトに配信します。Amazon S3 で作成したオブジェクトには、それぞれが異なる ApproximateArrivalTimestamp を持つ複数のレコードが含まれる可能性があります。 タイムスタンプを評価するとき、Kinesis Data Firehose は、書き込まれている Amazon S3 オブジェクトに含まれる最も古いレコードの ApproximateArrivalTimestamp を使用します。

Kinesis Data Firehose には、配信、AWS Lambda 変換、またはフォーマット変換の失敗が発生したときにレコードを別のエラー出力場所に配信する機能もあります。以前は、エラー出力場所を設定できず、配信失敗の種類によって決定されていました。このリリースでは、エラー出力場所 (ErrorOutputPrefix) も設定できます。この新機能の 1 つの利点は、失敗したレコードを日付分割されたフォルダに分割して簡単に再処理できることです。

それでは、どのようにカスタム Prefix と ErrorOutputPrefix を指定するのでしょうか? !{namespace:value} 形式の式を使用します。ここでは、ネームスペースは firehose またはタイムスタンプのいずれかです。は、ファイアホースネームスペースに対して「random-string」または「error-output-type」のいずれか、あるいは Java DateTimeFormatter 形式のタイムスタンプネームスペースの日付パターンになるでしょう。単一の式では 2 つのネームスペースの組み合わせを使用できますが、ErrorOutputPrefix では !{firehose: error-output-type} のみ使用できます。詳細および例については、「Amazon S3 オブジェクトのカスタムプレフィックス」を参照してください。

Kinesis Data Firehose を使用してストリーミングデータを Amazon S3 に書き込む

このチュートリアルでは、Hive 互換フォルダ構造を使用して Kinesis Data Firehose でストリーミングデータを Amazon S3 に書き込む方法について説明します。  次に、AWS Glue クローラーがどのようにスキーマを推論し、Kinesis Data Firehose で指定した適切なパーティション名を抽出し、それらを AWS Glue Data Catalog でカタログ化できるかを示します。  最後に、パーティションが実際に認識されていることを示すためにサンプルクエリを実行します。

これを実証するために、Python コードを使用してサンプルデータを生成します。  また、Kinesis Data Firehose の Lambda 変換を使用して、強制的に失敗を発生させます。これにより、データをエラー出力場所に保存する方法を示します。このチュートリアルに必要なコードは GitHub のこちらに含まれています。

このチュートリアルでは、以下がこれから構築するアーキテクチャです。

ステップ 1: Amazon S3 バケットを作成する

イベントレコードを配信するために Kinesis Data Firehose で使用する S3 バケットを作成します。米国東部 (バージニア北部) リージョンに Amazon S3 バケットを作成するには、AWS Command Line Interface (AWS CLI) を使用します。 例のバケット名を自分の名前に置き換えることを忘れないでください。

aws s3 mb s3://kdfs3customprefixesexample --region us-east-1

ステップ 2: Lambda 変換 (オプション)

受信イベントには、イベントペイロードに ApproximateArrivalTimestamp フィールドがあります。  これは、Amazon S3 に適切なフォルダ構造を作成するのに十分です。  ただし、データをクエリするときは、フィルタリングと検証を簡単にするためにこのタイムスタンプ値を最上位列として公開すると効果的です。  これを達成するために、データペイロードの最上位フィールドとして ApproximateArrivalTimestamp を追加する Lambda 関数を作成します。データペイロードは、Kinesis Data Firehose が Amazon S3 のオブジェクトとして書き込むものです。さらに、Lambda コードは、「ErrorOutputPrefix」内の式の使用方法を示すために、配信先に指定された「ErrorOutputPrefix」の場所に配信される処理エラーも人工的に生成します。

Lambda 変換関数の IAM ロールを作成する

まず、LambdaBasicRole という Lambda 関数のロールを作成します。 TrustPolicyForLambda.json ファイルは GitHub リポジトリに含まれています。

$ aws iam create-role --role-name KDFLambdaBasicRole --assume-role-policy-document file://TrustPolicyForLambda.json

ロールが作成されたら、管理された Lambda 基本実行ポリシーをそれにアタッチします。

$ aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole --role-name KDFLambdaBasicRole

Lambda 関数

Lambda 関数を作成するには、Python Kinesis Data Firehose のブループリント「General Firehose Processing」から始めて修正します。レコードの構造と返されるべき内容の詳細については、「Amazon Kinesis Data Firehose Data Transformation」を参照してください。

Python ファイルを圧縮したら、AWS CLI を使用して Lambda 関数を作成します。CreateLambdaFunctionS3CustomPrefixes.json ファイルGitHubリポジトリに含まれています。

aws lambda create-function --zip-file "fileb://lambda_function.zip" --cli-input-json file://CreateLambdaFunctionS3CustomPrefixes.json

ステップ 3:配信ストリーム

次に、Kinesis Data Firehose 配信ストリームを作成します。 createdeliverystream.json ファイルGitHub リポジトリに含まれています。

 aws firehose create-delivery-stream --cli-input-json file://createdeliverystream.json

前の設定では、「ExtendedS3DestinationConfiguration」要素で Prefix と ErrorOutputPrefix を定義しました。「S3BackupConfiguration」要素についても同じことを定義しました。「ProcessingConfiguration」要素が「Disabled」に設定されている場合、「ExtendedS3DestinationConfiguration」要素の ErrorOutputPrefix パラメータは一貫性のためにのみ存在します。それ以外の点では意味がありません。

ハイブスタイルのパーティション分割と互換性のあるフォルダ構造になるプレフィックスを選択しました。以下が使用したプレフィックスです。

“fhbase/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/”

Kinesis Data Firehose は、まず Amazon S3 バケットのすぐ下に「fhbase」という名前のベースフォルダを作成します。次に、Java DateTimeFormatter を使用して、式 !{timestamp:YYYY}、!{timestamp:MM}、!{timestamp:dd}、および !{timestamp:HH} を年、月、日と時間に評価します。たとえば、UTC で 2019-02-09T16:13:01.000000Z である UNIX エポックタイムの 1549754078390 の ApproximateArrivalTimestamp は、「yea=2019」、「month=02」、「day=09」、および「hour=16」に評価します。  したがって、データレコードが配信される Amazon S3 内の場所は「fhbase/year=2019/month=02/day=09/hour=16/」に評価されます。

同様に、ErrorOutputPrefix 「fherroroutputbase/!{firehose:random-string}/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/」という名前のベースフォルダが S3 バケットのすぐ下に作成されます。式 !{firehose:random-string} は、「ztWxkdg3Thg」のように 11 文字のランダムな文字列に評価されます。  同じ式でこれを複数回使用すると、すべてのインスタンスが新しいランダムな文字列に評価されます。式 !{firehose:error-output-type} は次のいずれかに評価されます。

  1. Lambda 変換の配信に失敗した場合は「processing-failed」
  2. Amazon ES 宛先の配信に失敗した場合は「elasticsearch-failed」
  3. Splunk 宛先の配信に失敗した場合は「splunk-failed」
  4. データフォーマットの変換に失敗した場合は「format-conversion-failed」

そのため、Lambda 変換の配信失敗レコードを含む Amazon S3 オブジェクトの場所は、fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/09/ に評価される可能性があります。

作成した配信ストリームを記述するために aws firehose describe-delivery-stream --delivery-stream-name KDFS3customPrefixesExample を実行できます。

次に、配信ストリームの保存データの暗号化を有効にします。

aws firehose start-delivery-stream-encryption --delivery-stream-name KDFS3customPrefixesExample

または AWS コンソールを使用して配信ストリームを作成します

  1. ソースを選択します。この例では、Direct PUT を使用します。
  2. Lambda 変換を使用して着信レコードを変換するかどうかを選択します。[Enabled] を選択し、先ほど作成した Lambda 関数の名前を選択しました。

  1. 宛先を選択します。Amazon S3 宛先を選択しました。

  1. Amazon S3 バケットを選択します。この演習の前半で作成した Amazon S3 バケットを選択しました。

  1. Amazon S3 プレフィックスと Amazon S3 エラープレフィックスを指定します。これは、AWS CLI の入力 JSON のコンテキストで前述した「Prefix」と「ErrorOutputPrefix」に対応しています。

  1. 生の (変換前の) レコードを別の Amazon S3 の場所にバックアップするかどうかを選択します。[Enabled] を選択し、同じバケットを指定しました (別のバケットも選択できます)。また、変換後のレコードとは異なるプレフィックスを指定しました。ベースフォルダは異なりますが、その下のフォルダ構造は同じです。これにより、AWS Glue クローラーを使用してこの場所をクロールするか、この場所を指す Athena または Redshift Spectrum で外部テーブルを作成することがより効率的になります。

  1. Amazon S3 宛先にバッファリングヒントを指定します。私は 1 MB と 240 秒を選びました。
  2. [S3 Compression and encryption settings] を選択します。変換されたレコードの場所には圧縮を選択しませんでした。サービス管理されている AWS KMS カスタマーマスターキー (CMK) を使用して、休止中の Amazon S3 ロケーションを暗号化することにしました。
  3. Cloudwatch でのエラーログ記録を有効にするかどうかを選択します。[Enabled] を選択しました。
  4. 代わりに Kinesis Data Firehose がリソースにアクセスするために引き受ける IAM ロールを指定します。新しい画面を表示するには、[Create new] または [Choose] を選択します。[Create a new IAM role] を選択し、ロールに名前を付けて、[Allow] を選択します。
  5. [Create Delivery Stream] を選択します。

配信ストリームが作成されて有効になりました。これでイベントを送ることができます。

 サンプルデータでテストする

サンプルデータを生成するために Python コードを使用しました。生成されたデータの構造は以下のとおりです。

{'sector': 'HEALTHCARE', 'price': 194.07, 'ticker_symbol': 'UFG', u'EventTime': '2019-02-12T07:10:52.649000Z', 'change': 20.56}
{'sector': 'HEALTHCARE', 'price': 124.01, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:53.745000Z', 'change': 3.32}
{'sector': 'MANUFACTURING', 'price': 26.95, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:54.864000Z', 'change': 24.53}

データを生成してKinesis Data Firehose にプッシュするためのサンプルコードは、GitHub リポジトリに含まれています。

Kinesis Data Firehose 配信ストリームへのイベントの送信を開始した後、オブジェクトは Amazon S3 の指定されたプレフィックスの下に表示され始めるはずです。

Lambda の呼び出しエラーと、Lambda 変換エラーの ErrorOutputPrefix の場所にあるファイルの外観を説明するため、Lambda 関数を呼び出すための「firehose_delivery_role」に対して許可を与えませんでした。次のファイルは、ErrorOutputPrefix で指定された場所に現れました。

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/FxvO2Tf9MQP/processing-failed/2019/02/12/

2019-02-12 16:57:24     260166 KDFS3customPrefixesExample-1-2019-02-12-16-53-20-5262db81-0f3a-48bf-8fc6-2249124923ff

これは、前述したエラーファイルの内容の一部です。

{"attemptsMade":4,"arrivalTimestamp":1549990400391,"errorCode":"Lambda.InvokeAccessDenied","errorMessage":"Access was denied.Ensure that the access policy allows access to the Lambda function.","attemptEndingTimestamp":1549990478018,"rawData":"eyJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE4Ny45NCwgInRpY2tlcl9zeW1ib2wiOiAiVUZHIiwgIkV2ZW50VGltZSI6ICIyMDE5LTAyLTEyVDE2OjUzOjE5Ljk5MzAwMFoiLCAiY2hhbmdlIjogOS4yNn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

「firehose_delivery_role」に適切な権限を与えた後、データオブジェクトは Amazon S3 宛先に指定された「Prefix」の場所に現れました。

aws s3 ls s3://kdfs3customprefixesexample/fhbase/year=2019/month=02/day=12/hour=17/

2019-02-12 17:17:26    1392167 KDFS3customPrefixesExample-1-2019-02-12-17-14-51-fc63e8f6-7421-491d-8417-c5002fca1722

2019-02-12 17:18:39    1391946 KDFS3customPrefixesExample-1-2019-02-12-17-16-43-e080a18a-3e1e-45ad-8f1a-98c7887f5430

また、Lambda 変換の Lambda コードは 10 パーセントのレコードに対して失敗のステータスを設定したため、Lambda 変換エラーの ErrorOutputPrefix の場所に表示されました。

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/12/

2019-02-12 17:25:54     180092 KDFS3customPrefixesExample-1-2019-02-12-17-21-53-3bbfe7c0-f505-47d0-b880-797ce9035f73

これは、エラーファイルの内容の一部です。

{"attemptsMade":1,"arrivalTimestamp":1549992113419,"errorCode":"Lambda.ProcessingFailedStatus","errorMessage":"ProcessingFailed status set for record","attemptEndingTimestamp":1549992138424,"rawData":"eyJ0aWNrZXJfc3ltYm9sIjogIlFYWiIsICJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE3LjUyLCAiY2hhbmdlIjogMTcuNTUsICJFdmVudFRpbWUiOiAiMjAxOS0wMi0xMlQxNzoyMTo1My4zOTY2NDdaIn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

これで AWS Glue クローラーを作成する準備が整いました。AWS Glue Data Catalog の使用の詳細については、「AWS Glue Data Catalog へのデータ入力」を参照してください。

  1. AWS Glue コンソールで [Crawlers] に移動し、[Add Crawler] を選択します。

  1. クローラーに関する情報を追加して、[Next] を選択します。
  2. [Include Path] で、Amazon S3 宛先に入力した Amazon S3 バケット名を指定します。Kinesis Data Firehose 配信ストリームを作成したときに使用した静的プレフィックスも含めます。カスタムプレフィックス式は含めないでください。
  3. [Next] を選択します。

  1. [Next, No, Next] を選択します。
  2. AWS Glue が使用する IAM ロールを指定します。新しい IAM ロールを作成することにしました。[Next] を選択します。
  3. クローラーを実行するスケジュールを指定します。[Run it on Demand] を選択しました。[Next] を選択します。
  4. クローラーがクロールし検出したテーブルを追加する場所を指定します。デフォルトのデータベースを選択しました。[Next] を選択します。

  1. [終了] を選択します。
  1. クローラーが作成され、実行する準備が整いました。[Run crawler] を選択します。

  1. AWS Glue コンソールで、[Tables] に移動します。ベースフォルダの名前でテーブルが作成されたことがわかります。[fhbase] を選択します。

クローラーはテーブルとそのプロパティを検出してデータを入力しました。

発見したスキーマが表示されます。クローラーは、プレフィックス式で指定されたフォルダ構造に基づいてパーティションを識別して作成しました。

Amazon Athena コンソールを開き、ドロップダウンメニューからデフォルトデータベースを選択します。次のクエリを [New query1] ウィンドウに入力し、[Run query] を選択します。

SELECT * FROM "default"."fhbase"

where year = '2019' and day = '12' and hour = '17'

order by approxarrtimestamputcfh desc

Amazon Athena が fhbase テーブルをパーティションテーブルとして認識したことがわかります。クエリは、クエリ内のパーティションを利用して結果をフィルタリングできます。

結論

この記事が示すように、Amazon S3 オブジェクトのカスタムプレフィックスは、Kinesis Data Firehose が Amazon S3 のデータレコードと失敗レコードを配信するフォルダ構造をカスタマイズするための大幅な柔軟性を実現しています。Amazon S3 でフォルダ構造と名前を制御することで、データの検出、カタログ化、およびアクセスが簡単になります。その結果、より便宜に洞察を得ることができ、クエリのコストをより適切に管理できるようになります。

 


著者について

Rajeev Chakrabarti は Kinesis のスペシャリストソリューションアーキテクトです