Amazon Web Services ブログ

Apache Spark を実行しているAmazon Kinesis Data Firehose と Amazon EMR によるダウンストリームデータ処理の最適化

増え続けるデータを処理し、新しいデータソースを取り込むことは、多くの組織にとって大きな課題となっています。  多くの場合、AWS のお客様は接続中のさまざまなデバイスやセンサーからメッセージを受け取っていますが、それらを詳しく分析する前に、効率的に取り込み、処理する必要があります。  結果として、あらゆる種類のデータが行き着くソリューションが Amazon S3 となるのは当然と言えるでしょう。  ただし、データが Amazon S3 に格納される方法によって、ダウンストリームデータ処理の効率とコストに大きな違いが生じる可能性があります。  具体的に言うと、Apache Spark では少数の大きなファイルを処理する場合に比べて、小さいファイルを数多く処理すると、ファイル操作に負担がかかります。  これらのファイルにはそれぞれ、メタデータ情報のオープン、読み込み、クローズの処理に数ミリ秒のオーバーヘッドがあります。これらのファイルを数多くファイル操作すると、このオーバーヘッドのために処理が遅くなります。このブログ投稿では、Amazon Kinesis Data Firehose を使用して、Amazon S3 に配信する多数の小さいメッセージを大きいメッセージにマージする方法を説明しています。  この結果、Spark を実行している Amazon EMR の処理が高速化します。

Amazon Kinesis Data Streams と同様、Kinesis Data Firehose は最大で 1 MB のメッセージサイズを受信できます。  単一のメッセージが 1 MB を超える場合は、ストリームに配置する前に圧縮できます。  ただし量が多い場合、メッセージのファイルサイズが 1 MB 以下だと通常小さすぎます。  正しいファイルサイズというものはありませんが、多くのデータセットでは 1 MB を指定するとファイルの数とファイル操作が多すぎることになるでしょう。

この投稿では、Amazon S3 にある Apache Spark を使用して、圧縮ファイルを読み込む方法についても説明します。この圧縮ファイルには適切なファイル名拡張子がなく、parquet ファイル形式で Amazon S3 に保存されます。

ソリューションの概要

このブログ記事で従う手順は、次のとおりです。

  1. 仮想プライベートクラウド (VPC) と Amazon S3 バケットを作成する。
  2. Kinesis データストリームおよび Kinesis データストリームからのメッセージを処理する AWS Lambda 関数をプロビジョニングする。
  3. ステップ 2で Lambda 関数から送信されたメッセージを Amazon S3 に配信するよう、Kinesis Data Firehose をプロビジョニングする。このステップでは Amazon EMR クラスターのプロビジョニングも行い、Amazon S3 のデータを処理します 。
  4. Amazon EC2 で実行しているカスタムコードを使って、テストデータを生成する。
  5. Amazon EMR クラスターのマスターインスタンスからサンプルの Spark プログラムを実行し、Amazon S3 からファイルを読み込み、それらを parquet ファイル形式に変換して Amazon S3 に書き戻す。

次の図は、サービスがどのように連携しているかを示しています。

図にある AWS Lambda 関数がメッセージを読み込み、それらにデータを追加して、Amazon Kinesis Data Firehose に送信する前に gzip で圧縮しています。こうした作業を行う理由は、ほとんどの場合、データが Amazon S3 に届く前にデータをいくらかエンリッチ化する必要があるからです。

Amazon Kinesis Data Firehose では受信メッセージを Amazon S3 バケットに配信する前に、より大きなレコードにバッファリングできます。これはバッファサイズ (最大 128 MB) とバッファリング間隔 (最大 900 秒) の 2 つの条件に従って行われます。どちらかの条件が満たされると、レコード配信を開始します。

Apache Spark ジョブは Amazon S3 からメッセージを読み込み、それらを parquet ファイル形式で保存します。parquet ファイル形式では、より効率的なスキャンが提供できるだけでなく、Amazon Athena のようなサービスでアドホッククエリや処理の継続も可能にするカラムナ形式でデータが保存されます。

考慮事項

Kinesis Data Firehose に送信できるレコードのサイズは最大 1,000 KB です。メッセージサイズがこの値よりも大きい場合は、メッセージを Kinesis Data Firehose に送信する前に圧縮するのが最もよい方法です。Kinesis Data Firehose ではメッセージが Kinesis Data Firehose データストリームに書き込まれた後でも、そのメッセージを圧縮することができます。しかし、これはメッセージサイズの制限に対処するものではありません。この圧縮はメッセージが書き込まれた後に行われるためです。Kinesis Data Firehose が圧縮済みメッセージを Amazon S3 に配信すると、ファイル拡張子のないオブジェクトとして書き込まれます。たとえば、メッセージが Kinesis Data Firehose に書き込まれる前に gzip で圧縮されている場合、そのメッセージは .gz 拡張子なしで Amazon S3 に配信されます。ダウンストリーム処理に Apache Spark を使用している場合、「.gz」拡張子が必要なため、これは問題となります。

しかしこのブログで後述する Amazon S3 API オペレーションを使用しファイルを読み込むことで、この問題を克服できます。

前提条件および仮定

このブログ記事で説明している手順に従うには、以下のものが必要です。

  • AWS のサービスへのアクセスを提供する AWS アカウント。
  • AWS Identity and Access Management (IAM) ユーザー (AWS CLI を設定するためのアクセスキーとシークレットアクセスキーがある)。
  • テンプレートとコードは米国東部 (バージニア北部) リージョンでのみ機能するように設定されています。

さらに、次のことに注意してください。

  • 同じ VPC 内のすべてのサービスを設定して、ネットワーキングの考慮事項を簡素化します。
  • 重要: 弊社が提供する AWS CloudFormation テンプレートとサンプルコードは、ハードコードされたユーザー名とパスワード、オープンなセキュリティグループを使用します。これらはテスト目的だけに使用できます。そのままの形で本番用に使用することを目的としたものではありません。^

ソリューションの実装

こちらのダウンロード可能なテンプレートを、シングルクリックデプロイに使用できます。このテンプレートはデフォルトだと、米国東部 (バージニア北部) リージョンで起動するようになっています。別のリージョンに変更しないでください。このテンプレートは米国東部 (バージニア北部) リージョンでのみ機能するように設計されています。コンソールから直接起動するには、[Launch Stack] をクリックします。

このテンプレートには次のパラメータがあります。一部のパラメータにはデフォルト値があり、これらを編集することはできません。これらの定義済みの名前はコード内でハードコードされています。一部のパラメータでは、値を指定する必要があります。次の表に詳細があります。

このパラメータには 以下を提供
StackName スタック名を提供します。

ClientIP

 

SSH を使用したクラスターへの接続を許可されているクライアントの IP アドレス範囲。
FirehoseDeliveryStreamName Amazon Firehose 配信ストリームの名前。デフォルト値は「AWSBlogs-LambdaToFireHose」に設定されています。
InstanceType EC2 インスタンスタイプ。
KeyName ログインへのアクセスを可能にする既存の EC2 キーペアの名前。
KinesisStreamName Amazon Kinesis Stream の名前。デフォルト値は「AWS-Blog-BaseKinesisStream」に設定されています
リージョン AWS リージョン – デフォルトでは us-east-1、つまり米国東部 (バージニア北部) になっています。スクリプトはこのリージョンでのみ機能するように開発されているため、これを変更しないでください。

EMRClusterName

 

EMR クラスターの名前。
S3BucketName アカウントに作成したバケット名。このバケットに固有の名前を付けます。このバケットはメッセージと Spark コードからの出力を保存するのに使用します。

テンプレートの詳細を指定したら、[次へ] を選択します。[オプション] ページで、[次へ] をもう一度選択します。Review ページで、[I acknowledge that AWS CloudFormation might create IAM resources with custom names] と [I acknowledge that AWS CloudFormation might require the following capability: CAPABILITY_AUTO_EXPAND] のチェックボックスをオンにします。次に [Create] をクリックします。

このワンステップソリューションを使用する場合は、ステップ 7 にスキップします。テストデータセットを生成し、Kinesis Data Streams に取り込みます。

各コンポーネントを個別に作成するには、次の手順に従います。

1.AWS CloudFormation テンプレートを使用して Amazon VPC を設定し、Amazon S3 バケットを作成する

このステップでは、VPC、パブリックサブネット、インターネットゲートウェイ、ルートテーブル、セキュリティグループを設定します。セキュリティグループには 2 つのインバウンドアクセスルールがあります。1 つ目のインバウンドルールは、提供済みクライアント IP CIDR 範囲から TCP ポート 22 (SSH) へのアクセスを許可します。2 つ目のインバウンドルールは、同じセキュリティグループ内の任意のホストからの任意の TCP ポートへのアクセスを許可します。次のステップで作成する他のサービスすべてにおいて、この VPC とサブネットを使用しますこれらのリソースに加えて、提供済みバケット名を使用して標準の Amazon S3 バケットも作成し、受信データと処理済みデータを格納します。こちらのダウンロード可能な AWS CloudFormation テンプレートを使用して、以前のコンポーネントを設定できます。コンソールから直接起動するには、[Launch Stack] をクリックします。

このテンプレートには次のパラメータがあります。次の表に詳細があります。

このパラメータには 以下を実行
StackName スタック名を提供します。
S3BucketName 固有の Amazon S3 バケットを提供します。このバケットはお客様のアカウントに作成されます。
ClientIP セキュリティグループの受信ルールに追加されている CIDR IP アドレス範囲を指定します。現在の IP アドレスは「checkip.amazon.com」から入手できます。

テンプレートの詳細を指定したら、[次へ] を選択します。[レビュー] ページで、[作成] を選択します。

スタックの起動が完了すると、次のような出力が返されるはずです。

Key Value
StackName Name
VPCID Vpc-xxxxxxx
SubnetID subnet-xxxxxxxx
SecurityGroup sg-xxxxxxxxxx
S3BucketDomain <S3_BUCKET_NAME>.s3.amazonaws.com
S3BucketARN arn:aws:s3:::<S3_BUCKET_NAME>

この情報を次のステップで使用するので、出力のメモをとっておいてください。AWS マネジメントコンソールまたは次の AWS CLI コマンドを使用して、スタック出力を表示できます。

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

2.  AWS CloudFormation テンプレートを使用して、必要な IAM ロールを作成する

このステップでは、2 つの AWS IAM ロールを設定します。IAM ロールのうち 1 つは AWS Lambda 関数が使用して、Amazon S3 サービス、Amazon Kinesis Data Firehose、Amazon CloudWatch Logs、Amazon EC2 インスタンスへのアクセスを許可します。  もう 1 つの IAM ロールは Amazon Kinesis Data Firehose サービスが使用して、Amazon S3 サービスにアクセスします。こちらのダウンロード可能な CloudFormation テンプレートを使用して、以前のコンポーネントを設定することができます。コンソールから直接起動するには、[Launch Stack] をクリックします。

このテンプレートには次のパラメータがあります。次の表に詳細があります。

このパラメータには 以下を実行
StackName スタック名を提供します。

テンプレートの詳細を指定したら、[次へ] を選択します。[オプション] ページで、[次へ] をもう一度選択します。[Review] ページで、[I acknowledge that AWS CloudFormation might create IAM resources with custom names] チェックボックスをオンにします。[Create] をクリックします。

スタックの起動が完了すると、次のような出力が返されるはずです。

Key Value
LambdaRoleArn arn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-lamdarole
FirehoseRoleArn arn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-firehoserole

スタックの起動が完了すると、作成されたリソースに関する情報が出力されます。この情報を次のステップで使用するので、出力のメモをとっておいてください。AWS マネジメントコンソールまたは次の AWS CLI コマンドを使用して、スタック出力を表示できます。

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

3.AWS CloudFormation テンプレートを使用して Amazon Kinesis Data Firehose データストリームを設定する

このステップでは受信メッセージの宛先として Amazon S3 を使用し、Amazon Kinesis Data Firehose を設定します。圧縮形式には非圧縮、128 MB サイズのバッファリング、および間隔秒数 300 を選択します。こちらのダウンロード可能な AWS CloudFormation テンプレートを使用して、以前のコンポーネントを設定できます。コンソールから直接起動するには、[Launch Stack] をクリックします。

このテンプレートには次のパラメータがあります。次の表に詳細があります。

このパラメータには 以下を実行
StackName スタック名を提供します。
FirehoseDeliveryStreamName Amazon Kinesis Data Firehose 配信ストリームの名前を入力します。デフォルト値は「AWSBlogs-LambdaToFirehose」に設定されています。
Role ステップ 2 で作成した Kinesis Data Firehose IAM ロール ARN を提供します。
S3BucketARN S3BucketARN を選択します。これはステップ 1 の AWS CloudFormation の出力から取得できます。

テンプレートの詳細を指定したら、[次へ] を選択します。[オプション] ページで、[次へ] をもう一度選択します。[レビュー] ページで、[作成] を選択します。

4.AWS CloudFormation テンプレートを使用して、Kinesis データストリームと Lambda 関数を作成する

このステップでは、Kinesis データストリームと AWS Lambda 関数を設定します。AWS Lambda 関数を使用して、Kinesis データストリームの受信メッセージを処理できます。イベントソースマッピングもこのテンプレートの一部として作成されます。これで、Kinesis データストリームソースの AWS Lambda 関数にトリガーが追加されます。イベントソースマッピング作成の詳細については、「イベントソースマッピングを作成する」をご参照ください。この Kinesis データストリームは 10 シャードで、また Lambda 関数は Java 8 ランタイムで作成されます。ここでは 1920 MB のメモリサイズと 300 秒のタイムアウトを割り当てます。こちらのダウンロード可能な AWS CloudFormation テンプレートを使用して、以前のコンポーネントを設定できます。コンソールから直接起動するには、[Launch Stack] をクリックします。

このテンプレートには次のパラメータがあります。次の表に詳細があります。

このパラメータには 以下を実行
StackName スタック名を提供します。
KinesisStreamName Amazon Kinesis Stream の名前を入力します。デフォルト値は「AWS-Blog-BaseKinesisStream」に設定されています。
Role 2 つ目の AWS CloudFormation テンプレートの一部として、Lambda 関数用に作成した IAM ロールを提供します。2 つ目の AWS CloudFormation テンプレートの出力から値を取得します。
S3Bucket 1 つ目の AWS CloudFormation テンプレートを使用して作成した既存の Amazon S3 バケット名を入力します。ドメイン名は使用しないでください。バケット名のみを使用してください。
リージョン AWS リージョンを選択します。デフォルトでは us-east-1、つまり米国東部 (バージニア北部) になっています。

テンプレートの詳細を指定したら、[次へ] を選択します。[オプション] ページで、[次へ] をもう一度選択します。[レビュー] ページで、[作成] を選択します。

5.AWS CloudFormation テンプレートを使用して、Amazon EMR クラスターを設定する

このステップでは、「Spark」、「Ganglia」、「Hive」アプリケーションを使用して、Amazon EMR 5.16.0 クラスターを設定します。1 つのマスターと 2 つのコアノードでこのクラスターを作成し、r4.xlarge インスタンスタイプを使用します。このテンプレートでは Amazon EMR Hive メタストアに AWS Glue メタストアを使用しています。この Amazon EMR クラスターを使用して、Amazon Kinesis Data Firehose データストリームが作成した Amazon S3 バケット内のメッセージを処理します。こちらのダウンロード可能な AWS CloudFormation テンプレートを使用して、以前のコンポーネントを設定できます。コンソールから直接起動するには、[Launch Stack] をクリックします。

このテンプレートには次のパラメータがあります。次の表に詳細があります。

このパラメータには 以下を実行
EMRClusterName EMR クラスターの名前を入力します。
ClusterSecurityGroup 1 つ目の AWS CloudFormation テンプレートの一部として作成したセキュリティグループ ID を選択します。
ClusterSubnetID 1 つ目の AWS CloudFormation テンプレートの一部として作成したサブネット ID を選択します。
AllowedCIDR クラスターへの接続を許可されているクライアントの IP アドレス範囲を指定します。
KeyName Amazon EMR クラスターにアクセスするための既存の EC2 キーペアの名前を入力します。

テンプレートの詳細を指定したら、[次へ] を選択します。[オプション] ページで、[次へ] をもう一度選択します。[レビュー] ページで、[作成] を選択します。

スタックの起動が完了すると、次のような出力が返されるはずです。

Key Value
EMRClusterMaster ssh hadoop@ec2-XX-XXX-XXX-XXX.us-east-1.compute.amazonaws.com -i <KEY_PAIR_NAME>.pem

この情報を次のステップで使用するので、出力のメモをとっておいてください。AWS マネジメントコンソールまたは次の AWS CLI コマンドを使用して、スタック出力を表示できます。

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

6.AWS CloudFormation テンプレートを使用して、テストデータを生成するための Amazon EC2 インスタンスを作成する

このステップでは Amazon EC2 インスタンスを設定し、open-jdk バージョン 1.8 をインストールします。この EC2 インスタンスを作成する AWS CloudFormation スクリプトは、さらに 2 つのステップを実行します。まず open-jdk バージョン 1.8 をダウンロードし、インストールします。2 番目に Java プログラムの jar ファイルを EC2 インスタンスの ec2-user ホームディレクトリにダウンロードします。この Java プログラムを使用すると、約 900 KB のテストデータメッセージが生成されます。次に、それらを以前のステップで作成した Kinesis データストリームに送信します。Java jar ファイル名は次のようになります。「sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar

こちらのダウンロード可能な AWS CloudFormation テンプレートを使用して、以前のコンポーネントを設定できます。コンソールから直接起動するには、[Launch Stack] をクリックします。

このテンプレートには次のパラメータがあります。次の表に詳細があります。

このパラメータには 以下を実行
EC2SecurityGroup 最初の AWS CloudFormation テンプレートから作成したセキュリティグループ ID を選択します。
EC2Subnet 最初の AWS CloudFormation テンプレートから作成したサブネットを選択します。
InstanceType 提供済みインスタンスタイプを選択します。デフォルトでは、r4.4xlarge インスタンスが選択されます。
KeyName EC2 インスタンスへの SSH アクセスを有効にする既存の EC2 キーペアの名前。

テンプレートの詳細を指定したら、[次へ] を選択します。[オプション] ページで、[次へ] をもう一度選択します。[Review] ページで、[I acknowledge that AWS CloudFormation might create IAM resources with custom names] のオプションを選択し、[Create button] をクリックします。

スタックの起動が完了すると、次のような出力が返されるはずです。

Key Value
EC2Instance ssh ec2-user@<Public-IP> -i <KEY_PAIR_NAME>.pem

この情報を次のステップで使用するので、出力のメモをとっておいてください。AWS マネジメントコンソールまたは次の AWS CLI コマンドを使用して、スタック出力を表示できます。

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

7.テストデータセットを生成して Kinesis Data Streams に取り込む

以前の AWS CloudFormation スタックをすべて正常に作成したら、ステップ 6 で作成した EC2 インスタンスにログインします。CloudFormation スタックテンプレートの出力に示されているように、「ssh」コマンドを使用します。このテンプレートは「sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar」ファイルをコピーします。これを使用して、テストデータを生成し、Amazon Kinesis Data Streams に送信します。このサンプルの Kinesis プロデューサーに対応するコードは、こちらの Git リポジトリにあります。

EC2 インスタンスのセキュリティグループがご自身の IP アドレスからの SSH ポート 22 (インバウンド) を許可することを確認します。許可しない場合は、セキュリティグループのインバウンドアクセスを更新します。

  • ssh ec2-user@<Public IP Address of the EC2 Instance> -i <SSH_KEY_PAIR_NAME>.pem

以下のコマンドを実行して、テストデータを生成します。

$ cd;

 

$ ls -ltra sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar

-rwxr-xr-x 1 ec2-user ec2-user 27536802 Oct 29 21:19 sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar

 

$java -Xms1024m -Xmx25600m -XX:+UseG1GC -cp sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar com.optimize.downstream.entry.Main 10000

 

この Java プログラムは PutRecords API メソッドを使用しており、1 つの HTTP リクエストで多数のレコードを送信できます。詳細については、AWS ブログをご参照ください。上記の Java プログラムを実行すると、メッセージを Kinesis Data Stream に送信中であることを示す以下の出力が表示されます。

“Starting producer and consumer.....
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 0
Producer Thread # 9 is going to sleep mode for 500 ms.
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 1
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 2
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 3
::
::
Record sent to Kinesis Stream.Record size is ::: 5042850 KB
Sending a record to Kinesis Stream with 5 messages grouped together.
Record sent to Kinesis Stream.Record size is ::: 5042726 KB
Sending a record to Kinesis Stream with 5 messages grouped together.
Record sent to Kinesis Stream.Record size is ::: 5042729 KB”

サンプルの Kinesis プロデューサー jar を実行すると、メッセージ数が 10,000 になることに注意してください。このプログラムはテストデータメッセージを生成するもので、負荷テストツールに代わるものではありません。この記事で紹介するユースケースを説明するために作成したものです。

すべてのメッセージが生成され、Amazon Kinesis Data Streams に送信されると、プログラムが正常に終了します。

サンプルの JSON 入力メッセージフォーマットは次のとおりです。

   "processedDate":"2018/10/30 19:05:19",
   "currentDate":"2018/10/30 19:05:07",
   "hashDeviceId":"0c2745e4-c2d6-4d43-8339-9c2401e80e92",
   "deviceId":"94581b5f-a117-484a-8e3c-4fcc2dbd53b7",
   "accelerometerSensorList":[  
      {  
         "accelerometer_Y":8,
         "gravitySensor_X":5,
         "accelerometer_X":9,
         "gravitySensor_Z":4,
         "accelerometer_Z":1,
         "gravitySensor_Y":5,
         "linearAccelerationSensor_Z":3,
         "linearAccelerationSensor_Y":9,
         "linearAccelerationSensor_X":9
      },
      {  
         "accelerometer_Y":1,
         "gravitySensor_X":3,
         "accelerometer_X":5,
         "gravitySensor_Z":5,
         "accelerometer_Z":7,
         "gravitySensor_Y":9,
         "linearAccelerationSensor_Z":6,
         "linearAccelerationSensor_Y":5,
         "linearAccelerationSensor_X":3
      },
 {
   …
 },
 {
   …
 },
 :
 :
   ],
   "tempSensorList":[  
      {  
         "kelvin":585.4928040286752,
         "celsius":43.329574923775425,
         "fahrenheit":50.13864584530086
      },
      {  
         "kelvin":349.95625855125814,
         "celsius":95.68423052685313,
         "fahrenheit":7.854854574219985
      },
 {
   …
 },
 {
   …
 },
 :
 :
 
   ],
   "illuminancesSensorList":[  
      {  
         "illuminance":44.65135784368194
      },
      {  
         "illuminance":98.15404017082403
      },
 {
   …
 },
 {
   …
 },
 :
 :
   ],
   "gpsSensorList":[  
      {  
         "altitude":4.38273213294682,
         "heading":7.416314616289915,
         "latitude":5.759723677991661,
         "longitude":1.4732885894731842
      },
      {  
         "altitude":9.816473807569487,
         "heading":5.118919157684835,
         "latitude":3.581361614110458,
         "longitude":1.3699272610616127
      },
 {
   …
 },
 {
   …
 },
 :
 :
   }

 

Kinesis Data Stream コンソールにログインしてから、ステップ 4 で作成した Kinesis データストリームを選択します。  グラフを表示するには、[Monitor] タブをクリックします。十分なデータを生成するために、データ生成ユーティリティを少なくとも 15 分は実行してください。

8.AWS Lambda を使用した Kinesis Data Streams のメッセージ処理

前述で設定した AWS Lambda 関数 (name:LambdaForProcessingKinesisRecords) も使用し、Kinesis データストリームからのメッセージを処理します。この Lambda 関数は各メッセージの内容を読み込み、「追加データ」を加えます。 これは、Kinesis データストリームからの受信メッセージを読み込み、メッセージサイズが 1 MBを超えるという情報が追加されていることを示しています。 追加情報を加えて受信メッセージをエンリッチ化する、こうしたユースケースがお客様の中でいくつか見受けられます。AWS Lambda 関数は受信メッセージに追加データを加えた後、それらを Amazon Kinesis Data Firehose に送信します。Kinesis Data Firehose は 1 MB 未満のメッセージのみを受け入れるため、送信する前にメッセージを圧縮する必要があります。Lambda 関数では Kinesis Data Firehose に送信する前に、gzip を使用してメッセージを圧縮しています。各メッセージの圧縮に加えて、メッセージを分離するためにそれらを圧縮した後、各メッセージに改行文字 (“/n”) を追加します。

Kinesis Data Firehose の作成中、バッファサイズを 128 MB に設定します。バッファ時間は 900 秒です。これは、受信した圧縮メッセージをより大きなメッセージにマージし、提供済み Amazon S3 バケットに送信するのに役立ちます。

AWS Lambda 関数は Kinesis Data Streams の元のメッセージを読み込んだ後に、そのメッセージに次の内容を追加します。

"testAdditonalDataList": [
  {
    "dimesnion_X": 9,
    "dimesnion_Y": 2,
    "dimesnion_Z": 2
  },
  {
    "dimesnion_X": 3,
    "dimesnion_Y": 10,
    "dimesnion_Z": 5
  }
  {
    …
  },
  {
    …
  },
  :
  :
]

Kinesis Data Firehose への送信前にメッセージを圧縮しないと、このエラーメッセージが Amazon CloudWatch Logs にスローされます。

これは AWS Lambda 関数のメッセージを圧縮するコードスニペットです。完全なコードは、こちらの Git リポジトリにあります。

private String sendToFireHose(String mergedJsonString)
{
    PutRecordResult res = null;
    try {
        //To Firehose -
        System.out.println("MESSAGE SIZE BEFORE COMPRESSION IS : " + mergedJsonString.toString().getBytes(charset).length);
        System.out.println("MESSAGE SIZE AFTER GZIP COMPRESSION IS : " + compressMessage(mergedJsonString.toString().getBytes(charset)).length);
        PutRecordRequest req = new PutRecordRequest()
                .withDeliveryStreamName(firehoseStreamName);

        // Without compression - Send to Firehose
        //Record record = new Record().withData(ByteBuffer.wrap((mergedJsonString.toString() + "\r\n").getBytes()));

        // With compression - send to Firehose
        Record record = new Record().withData(ByteBuffer.wrap(compressMessage((mergedJsonString.toString() + "\r\n").getBytes())));
        req.setRecord(record);
        res = kinesisFirehoseClient.putRecord(req);
    }
    catch (IOException ie) {
        ie.printStackTrace();
    }
    return res.getRecordId();
}

提供されたバケットをチェックして、メッセージがバケットに流れているかどうかを確認できます。Amazon S3 バケットには次のような例が表示されるはずです。

Kinesis Data Firehose が生成したファイルで、拡張子がないものがあります。デフォルトでは、圧縮オプションを選択しない限り、Kinesis Data Firehose は Amazon S3 バケットで生成されるファイルに拡張子を付けません。しかしこのユースケースでは、非圧縮の入力メッセージのサイズが 1 MB を超えているため、Kinesis Data Firehose への送信前に圧縮しています。メッセージはすでに圧縮されているので、Kinesis Data Firehose でメッセージを二重に圧縮することになります。ダウンストリームの Spark アプリケーションではこれを処理できないため、今回は圧縮オプションを選択していません。

9.Amazon EMR で Apache Spark プログラムを使用して、データを読み込み、parquet ファイル形式に変換する

前出のスクリーンショットのところで述べたように、Kinesis Data Firehose はデフォルトでは Amazon S3バ ケットに書き込まれるファイルの拡張子を生成しません。よって、Apache Spark を使ってファイルを読み込む際に問題が発生します。ファイルが圧縮されている場合、デフォルトでは Apache Spark は有効なファイル名拡張子をチェックします。この場合では gzip 圧縮は <filename>.gz を探し出し、正常に読み込みます。

この問題を解決するには、Amazon S3 API の操作、特に AmazonS3Client クラスを使用してすべての Amazon S3 キーを一覧表示し、Spark の paralleliz メソッドを使用してファイルの内容を読み込みます。ファイルの内容を読み終えたら、GZipInputStream クラスを使用して解凍します。以下のコードスニペットを確認してください。完全なコードは Git リポジトリにあります。

val allLinesRDD = spark.sparkContext.parallelize(s3ObjectKeys).flatMap
{ key => Source.fromInputStream
  (
   new GZipInputStream(s3Client.getObject(bucketName, key).getObjectContent:   InputStream)
  ).getLines 
}

var finalDF = spark.read.json(allLinesRDD).toDF()

Amazon EMR クラスターの作成が正常に完了したら、次のコマンドを使用して Amazon EMR マスターマシンにログインします。AWS CloudFormation スタック 5 (ステップ 5) の出力パラメータ「EMRClusterMaster」から「ssh」ログインコマンドを取得します。

  • ssh hadoop@ec2-XX-XX-XX-XX.compute-1.amazonaws.com -i <KEYPAIR_NAME>.pem
  • Amazon EMRマ スターマシンに接続するため、セキュリティポート 22 が開いていることを確認してください。

次の Spark 送信コマンドを使用して、Spark プログラムを実行します。

spark-submit --class com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet --master yarn --deploy-mode client s3://aws-bigdata-blog/artifacts/aws-blog-optimize-downstream-data-processing/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar <S3_BUCKET_NAME> fromfirehose/<YEAR>/ output-emr-parquet/

前出の Spark コマンドで、S3_BUCKET_NAME と YEAR の値を変更します。

引数番号 プロパティ Value
1 –class com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet
2 –master yarn
3 –deploy-mode client
4 s3://aws-bigdata-blog/artifacts/aws-blog-avoid-small-files/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar
5 S3_BUCKET_NAME AWS CloudFormation テンプレートの一部として作成した Amazon S3 バケット名。ソースファイルはこのバケットに作成されます。
6 <INPUT S3 LOCATION> “fromfirehose/<YYYY>/”.入力ファイルは作成したバケット下の、この Amazon S3 キーに作成されます。「YYYY」は今年を表します。たとえば「fromfirehose/2018 /」のようになります。
7 <OUTPUT S3 LOCATION> 上記の Amazon S3 バケット下に作成される出力ディレクトリ名を指定します。例: 「output-emr-parquet/」

 

プログラムの実行が終了したら、Amazon S3 の出力場所を確認して parquet 形式のファイルを確認できます。

移行後のクリーンアップ

このソリューションを完了してテストした後、タスクを停止し、AWS CloudFormation スタックを削除してリソースをクリーンアップします。作成済み Amazon S3 バケットにファイルがある場合には、スタックの削除は失敗します。AWS CloudFormation テンプレートを削除する前に、作成済み Amazon S3 バケットをクリーンアップしたことを確認します。

結論

この記事では、受信メッセージを Amazon Kinesis Data Firehose に送信することで、Amazon S3 で小さなファイルが作成されないようにするプロセスについて説明しました。さらに、Amazon Spark と Amazon EMR クラスターを使用して、データを parquet 形式で読み込み、保存するプロセスも実行しました。

 


著者について

Srikanth Kodali の写真Srikanth Kodali アマゾン ウェブ サービスのシニアIoT データアナリティクスアーキテクトです。 AWS のお客様のご協力のもとに、IoT データと分析ソリューションの構築に関する指導や技術支援を行い、AWS を使用する際にソリューションがより有益となるようお客様をサポートしています。