Category: Amazon Kinesis*


Amazon Kinesis を用いた Databaseの継続的な変更

Emmanuel Espina は、アマゾン ウェブ サービスのソフトウェア開発エンジニアです。

このブログ記事では、Amazon Kinesis を使用して変更をストリーミングすることによって、中央リレーショナルデータベース を他のシステムと統合する方法について説明します。

次の図は、分散システムにおける一般的なアーキテクチャ設計を示しています。これには、「」と呼ばれる中央ストレージと、この中央ストレージを消費するいくつかの派生「衛星」システムが含まれます。

この設計アーキテクチャを使用して、リレーショナルデータベースを中央データストアとして使用し、このシステムのトランザクション機能を利用してデータの整合性を維持することができます。このコンテキストにおける派生システムは、この変化の事実の単一ソースを観察し、それらの変更を変換し、フィルタリングし、最終的にはその内部インデックスを更新する全文検索システムとすることができます。もう 1 つの例は、OLAP クエリに適した列形式ストレージです。一般に、中央リレーショナルシステムの個々の行を変更する際にアクションを取る必要のあるシステムは、派生データストアに適した候補となります。

これらの種類のアーキテクチャの単純な実装では、変更された行を検索するために派生システムが定期的にクエリを発行し、本質的に SELECT ベースのクエリで中央データベースをポーリングします。

このアーキテクチャのより優れた実装となるのが、非同期の更新ストリームを使用するアーキテクチャです。データベースには通常、行のすべての変更が格納されるトランザクションログがあるため、この変更のストリームが外部オブザーバシステムに公開されている場合、これらのシステムにこれらのストリームを添付して行の変更を処理およびフィルタリングできます。ここでは、中央データベースとして MySQL、メッセージバスとして Amazon Kinesis を使用して、このスキーマの基本的な実装をご紹介します。

通常、MYSQL バイナリログは、マスター上のすべての変更を読み取ってローカルに適用する読取りレプリカに公開されます。この記事では、変更をローカルデータベースに適用するのではなく、Amazon Kinesis ストリームに変更を公開する、一般化されたリードレプリカを作成します。

このメソッドの重要な点の 1 つは、コンシューマーが SQL クエリを受け取らないことです。SQL クエリは公開される可能性もありますが、一般的なオブザーバーは、SQL 互換のデータレプリカを維持しない限り、SQL にはあまり関心がありません。代わりに、変更されたエンティティ (行) を 1 つずつ受け取ります。このアプローチの利点は、コンシューマーが SQL を理解する必要はなく、事実の単一ソースは誰が変更を消費するのかを知る必要はないということにあります。これは、さまざまなチームが、必要なデータ形式で調整することなく作業できることを意味します。さらに都合がいいことに、Amazon Kinesis クライアントはが特定の時点から読む機能を備えているため、各コンシューマーは独自のペースでメッセージを処理します。これが、メッセージバスがシステムを統合するための結合されていない方法の 1 つとなる理由です。

この記事で使用されている例では、行フェッチャーは中央データベースに接続する通常の Python プロセスであり、リードレプリカをシミュレートします。

データベースは、Amazon RDS または MySQL の任意のインストールのいずれかになります。RDS の場合、フェッチャープロセスは RDS インスタンスホストにカスタムソフトウェアをインストールすることができないため、別のホスト (たとえば EC2) にインストールする必要があります。外部インストールの場合、フェッチャープロセスはデータベースと同じホストにインストールできます。

マスター MySQL インスタンスの準備

MySQL マスター (真実の単一のソース) は、それが通常のレプリケーションのマスターであるかのように設定する必要があります。個々の変更された行を受け取るには、Binlog を有効にして ROW 形式で作業する必要があります。(そうしないと、SQL クエリだけになってしまいます。)詳細については、MySQL サイトの「バイナリログ」を参照してください。

バイナリログを有効にするには、my.cnf 設定ファイルに次の 2 行を追加します。

log_bin=<path to binlog>
binlog_format=ROW

すべての接続 (たとえば、init_connect または JDBC などのデータベース API を使用) のグローバルまたはセッションレベルで、トランザクション分離レベルをコミット済み読み取りに設定することによって、行ベースのロギングを取得できます。

RDS (MySql 5.6+) を使用している場合は、簡単です。定期的なバックアップを有効にして (バックアップが有効になっていなければバイナリログは無効になります)、パラメータグループ変数 binlog_format を ROW に更新することによって、必要な設定を作成できます。(これは、パラメータグループの RDS ダッシュボードから行うことができます。)

アクセス権限の追加

RDS で作成されたデフォルトのユーザーを使用している場合は、既にこれらのアクセス許可がある可能性があります。そうでない場合は、REPLICATION SLAVE 権限を持つユーザーを作成する必要があります。詳細については、「レプリケーション用のユーザーの作成」を参照してください。

mysql> CREATE USER 'repl'@'%.mydomain.com' IDENTIFIED BY 'slavepass';
mysql> GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%.mydomain.com';

Amazon Kinesis ストリームの作成

Amazon Kinesis ストリームと boto3 クライアントの認証情報が必要です。クライアントの認証情報の詳細については、「Boto 3 ドキュメント」を参照してください。

Amazon Kinesis コンソールを開き、[ストリームの作成] を選択します。

使用するストリームの名前とシャード数を入力します。この例では、1 つのシャードがあります。

数分後、ストリームで行の変更を受け入れる準備が整います。

CLI ユーザーに権限を割り当てる

AWS Identity and Access Management (IAM) を使用して、このストリームにアクセスする CLI ユーザに権限を与えることができます。

この例では、そのユーザーは KinesisRDSIntegration です。ユーザーを作成したり、既存のユーザーを使用することはできますが、Amazon Kinesis ストリームへの書き込み権限を追加する必要があります。

ストリームに固有のポリシーを作成できます。この例では、Amazon Kinesis に完全にアクセスできるスタンダードポリシーを使用しています。

マスターに接続して変更を公開する

Python パブリッシャーが必要とするライブラリをインストールするには、次のコマンドを実行します。

pip install mysql-replication boto3

詳細な手順については、次を参照してください。

https://github.com/noplay/python-mysql-replication

https://boto3.readthedocs.io/en/latest/guide/quickstart.html

ここに、マジックを実行する Python スクリプトがあります。<HOST>、<PORT>、<USER>、<PASSWORD>、および <STREAM_NAME> の各変数を設定値に置き換えることを忘れないでください。

Python

 

import json

import boto3



from pymysqlreplication import BinLogStreamReader

from pymysqlreplication.row_event import (

  DeleteRowsEvent,

  UpdateRowsEvent,

  WriteRowsEvent,

)



def main():

  kinesis = boto3.client("kinesis")



  stream = BinLogStreamReader(

    connection_settings= {

      "host": "<HOST>",

      "port": <PORT>,

      "user": "<USER>",

      "passwd": "<PASSWORD>"},

    server_id=100,

    blocking=True,

    resume_stream=True,

    only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])



  for binlogevent in stream:

    for row in binlogevent.rows:

      event = {"schema": binlogevent.schema,

      "table": binlogevent.table,

      "type": type(binlogevent).__name__,

      "row": row

      }



      kinesis.put_record(StreamName="<STREAM_NAME>", Data=json.dumps(event), PartitionKey="default")

      print json.dumps(event)



if __name__ == "__main__":

   main()

このスクリプトは、変更された各行を JSON 形式でシリアル化された Amazon Kinesis レコードとして公開します。

メッセージを使用する

これで、変更されたレコードを使用する準備が整いました。あらゆるコンシューマーコードが動作します。この記事のコードを使用すると、次の形式でメッセージが表示されます。

{"table": "Users", "row": {"values": {"Name": "Foo User", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"values": {"Name": "Bar user", "idUsers": 124}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"before_values": {"Name": "Foo User", "idUsers": 123}, "after_values": {"Name": "Bar User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"}

まとめ

このブログ記事では、擬似リードレプリカと Amazon Kinesis を使用して、変更ストリームをデータベースのレコードに公開する方法を説明しました。多くのデータ指向の企業が、これに類似したアーキテクチャを使用しています。この記事で提供されている例は、実際の本番環境には対応していませんが、この統合スタイルを試して、エンタープライズアーキテクチャの拡張機能を改善するために使用できます。最も複雑な部分は、おそらく Amazon Kinesis の背後で既に解決されているものでしょう。接着剤の役割を果たすものを提供する必要があります。

その他のリソース

すべてのソフトウェアエンジニアがリアルタイムデータの統合抽象化について知っておくべきこと

Databus に搭載されているすべてのもの: LinkedIn のスケーラブルで一貫性のある変更データキャプチャプラットフォーム

 

Amazon Kinesis Video Streams – コンピュータビジョン・アプリケーションのためのサーバーレスな動画データの収集と保存

携帯電話、防犯カメラ、ベビーモニター、ドローン、WEBカメラ、車載カメラ、さらには人工衛星まで、これらすべては高輝度で高品質の動画ストリームを生成できます。 住宅、オフィス、工場、都市、街路、高速道路は現在、膨大な数のカメラを備えています。これらのカメラは、洪水などの自然災害時に被害状況の調査を可能にし、公共の安全性を高め、子供が安全かつ健康であることを知らせ、無限に繰り返す「失敗」動画のための一瞬を補足し(個人的な趣味の話です)、身元の判定に役立つデータを集め、交通の問題を解決するなど、様々な場面で活用されます。

この動画データの洪水を扱うことは、言い表せないほど難しいことです。 入力ストリームには、個別に、または何百万という単位でデータが到着します。 ストリームには価値あるリアルタイムなデータが含まれており、遅延したり、一時停止したり、より適切なタイミングで処理するためにデータを脇に置いておいたりすることはできません。生のデータを取得すると、他の課題が発生します。動画データの保存、暗号化、索引作成などが頭に浮かびますね。価値を引き出すこと、つまりコンテンツに深く潜って、そこにあることを理解し、行動を起こすことは、次の大きなステップです。

新しい Amazon Kinesis Video Streams

2017年11月29日、リアルタイムストリーミングサービスであるAmazon Kinesisファミリーの新しいメンバーとして、Amazon Kinesis Video Streamsをご紹介します。 これによって、独自のインフラストラクチャを構築して動かすことなく、何百万ものカメラデバイスからストリーミング動画(または時系列にエンコードされたデータ)を取り込むことができます。 Amazon Kinesis Video Streamsは、入力ストリームを受け入れ、永続的かつ暗号化された形式で保存し、時間に基づいたインデックスを作成し、コンピュータビジョン・アプリケーションの作成を可能にします。 あなたはAmazon Recognition VideoやMXNetTensorFlow、OpenCV、または独自のカスタムコード、つまりクールな新しいロボットや、分析、あなたが考え出すコンシューマー・アプリケーションを支えるあらゆるコードを使用して、入力ストリームを処理することができます。
(more…)

Amazon DynamoDB からのデータストリームを AWS Lambda と Amazon Kinesis Firehose を活用して Amazon Aurora に格納する

Aravind Kodandaramaiah は AWS パートナープログラムのパートナーソリューションアーキテクトです。

はじめに

AWS ワークロードを実行するお客様は Amazon DynamoDBAmazon Aurora の両方を使用していることがよくあります。Amazon DynamoDB は、どのような規模でも、一貫した、数ミリ秒台にレイテンシーを抑える必要のあるアプリケーションに適した、高速で柔軟性の高い NoSQL データベースサービスです。データモデルの柔軟性が高く、パフォーマンスが信頼できるため、モバイル、ウェブ、ゲーム、広告、IoT、他の多くのアプリケーションに最適です。

Amazon Aurora は、MySQL と互換性のあるリレーショナルデータベースエンジンで、オープンソースデータベースのコスト効率性と簡素性を備えた、高性能の商用データベースの可用性とスピードをあわせもったエンジンです。Amazon Aurora は、MySQL よりも最大 5 倍のパフォーマンスを発揮するだけでなく、商用データベースのセキュリティ、可用性、および信頼性を 10 分の 1 のコストで実現します。

DynamoDB と Aurora を連携させるために、カスタムウェブ解析エンジンを構築して、毎秒数百万のウェブクリックが DynamoDB に登録されるようにしたとします。Amazon DynamoDB はこの規模で動作し、データを高速に取り込むことができます。また、このクリックストリームデータを Amazon Aurora などのリレーショナルデータベース管理システム (RDBMS) にレプリケートする必要があるとします。さらに、ストアドプロシージャまたは関数内で SQL の機能を使用して、このデータに対してスライスアンドダイスや、さまざまな方法でのプロジェクションを行ったり、他のトランザクション目的で使用したりするとします。

DynamoDB から Aurora に効率的にデータをレプリケートするには、信頼性の高いスケーラブルなデータレプリケーション (ETL) プロセスを構築する必要があります。この記事では、AWS Lambda Amazon Kinesis Firehose によるサーバーレスアーキテクチャを使用して、このようなプロセスを構築する方法について説明します。

ソリューションの概要

以下の図に示しているのは、このソリューションのアーキテクチャです。このアーキテクチャの背後には、次のような動機があります。

  1. サーバーレス – インフラストラクチャ管理を AWS にオフロードすることで、メンテナンスゼロのインフラストラクチャを実現します。ソリューションのセキュリティ管理を簡素化します。これは、キーやパスワードを使用する必要がないためです。また、コストを最適化します。さらに、DynamoDB Streams のシャードイテレーターに基づいた Lambda 関数の同時実行により、スケーリングを自動化します。
  2. エラー発生時に再試行可能 – データ移動プロセスには高い信頼性が必要であるため、プロセスは各ステップでエラーを処理し、エラーが発生したステップを再試行できる必要があります。このアーキテクチャではそれが可能です。
  3. 同時データベース接続の最適化 – 間隔またはバッファサイズに基づいてレコードをバッファすることで、Amazon Aurora への同時接続の数を減らすことができます。この方法は、接続タイムアウトを回避するのに役立ちます。
  4. 懸念部分の分離 – AWS Lambda を使用すると、データレプリケーションプロセスの各懸念部分を分離できます。たとえば、抽出フェーズを DynamoDB ストリームの処理として、変換フェーズを Firehose-Lambda 変換として、ロードフェーズを Aurora への一括挿入として分離できます。

以下に示しているのは、このソリューションのしくみです。

  1. DynamoDB Streams がデータソースです。DynamoDB Streams を使用すると、DynamoDB テーブル内の項目が変更されたときに、その変更を取得できます。AWS Lambda は、新しいストリームレコードを検出すると、Lambda 関数を同期的に呼び出します。
  2. Lambda 関数は、DynamoDB テーブルに新たに追加された項目をバッファし、これらの項目のバッチを Amazon Kinesis Firehose に送ります。
  3. Firehose は、受け取ったデータを Lambda 関数により変換して、Amazon S3 に配信します。Firehose に対してデータ変換を有効にしていると、Firehose は受け取ったデータをバッファし、バッファしたデータのバッチごとに、指定された Lambda 関数を非同期的に呼び出します。変換されたデータは Lambda から Firehose に返されてバッファされます。
  4. Firehose は変換されたすべてのレコードを S3 バケットに配信します。
  5. Firehose は変換されないすべてのレコードも S3 バケットに配信します。ステップ 4 と 5 は同時に実行されます。Amazon SNS トピックをこの S3 バケットに登録して、以降の通知、修復、再処理に使用できます (通知に関する詳細はこのブログ記事では取り上げません)。
  6. Firehose が変換されたデータを S3 に正常に配信するたびに、S3 はイベントを発行することで Lambda 関数を呼び出します。この Lambda 関数は VPC 内で実行されます。
  7. Lambda 関数は Aurora データベースに接続し、SQL 式を実行して、S3 から直接テキストファイルにデータをインポートします。
  8. Aurora (VPC プライベートサブネット内で実行) は、S3 VPC エンドポイントを使用して S3 からデータをインポートします。

ソリューションの実装とデプロイ
次に、このソリューションを機能させるために必要な手順について説明します。以下の手順では、AWS CloudFormation スタックを起動して一連の AWS CLI コマンドを実行することで、VPC 環境を作成する必要があります。

AWS サービスを使用してこれらの手順を実行している間、AWS サービスの料金が適用されることがあります。

ステップ 1: ソリューションのソースコードをダウンロードする
このブログ記事で概説したソリューションでは、多くの Lambda 関数を使用し、また、多くの AWS Identity and Access Management (IAM) ポリシーおよびロールを作成します。このソリューションのソースコードは以下の場所からダウンロードします。

git clone https://github.com/awslabs/dynamoDB-data-replication-to-aurora.git

このリポジトリには、以下のフォルダ構造があります。このブログ記事の後続の手順を実行するために、lambda_iam フォルダに移動します。

ステップ 2: Firehose 配信用の S3 バケットを作成する
Amazon Kinesis Firehose を使用すると、Amazon S3 にリアルタイムのストリーミングデータを配信できます。そのためには、まず S3 バケットを作成します。次に、レコードの処理に失敗した場合に備えて、変換された最終のレコードとデータバックアップを保存するフォルダを作成します。

aws s3api create-bucket --bucket bucket_name

aws s3api put-object \
--bucket bucket_name \
--key processed/

aws s3api put-object \
--bucket bucket_name
--key tranformation_failed_data_backup/

 

ステップ 3: IAM ポリシー、S3 イベント通知、Firehose-S3 配信設定ファイルを変更する
次に、以下のファイルで、プレースホルダー AWS_REGION、

AWS_ACCOUNT_NUMBER、BUCKET_NAME をそれぞれ、お客様の AWS リージョン ID、AWS アカウント番号、ステップ 2 で作成した S3 バケットの名前に置き換えます。

 

·         aurora-s3-access-Policy.json

·         DynamoDb-Stream-lambda-AccessPolicy.json

·         firehose_delivery_AccessPolicy.json

·         lambda-s3-notification-config.json

·         s3-destination-configuration.json

·         firehose_delivery_trust_policy.json

ステップ 4: CloudFormation を使用して Aurora クラスターを設定する
次に、[Launch Stack] ボタンをクリックして AWS CloudFormation スタックを起動します。CloudFormation テンプレートは、VPC を作成し、その VPC のパブリックおよびプライベートサブネットを設定します。また、このテンプレートは、プライベートサブネット内で Amazon Aurora データベースクラスターを起動し、パブリックサブネット内でパブリック IP を割り当てた踏み台ホストも起動します。


ステップ 5: Aurora DB クラスターを設定する
CloudFormation スタックが完成したら、S3 バケット内のテキストファイルから DB クラスターにデータをロードするように、Aurora クラスターを変更する必要があります。以下に示しているのは、そのための手順です。

Amazon Aurora が Amazon S3 にアクセスできるようにします。そのためには、IAM ロールを作成し、先ほど作成した信頼およびアクセスポリシーをそのロールにアタッチします。

auroraS3Arn=$(aws iam create-role \
    --role-name aurora_s3_access_role \
    --assume-role-policy-document file://aurora-s3-Trust-Policy.json \
    --query 'Role.Arn' \
    --output text)

aws iam put-role-policy \
    --role-name aurora_s3_access_role \
    --policy-name aurora-s3-access-Policy \
    --policy-document file://aurora-s3-access-Policy.json

 

その IAM ロールを Aurora DB クラスターに関連付けます。そのためには、新しい DB クラスターパラメータグループを作成し、その DB クラスターに関連付けます。

aws rds add-role-to-db-cluster \
    --db-cluster-identifier Output AuroraClusterId from CloudFormation Stack \
    --role-arn $auroraS3Arn

aws rds create-db-cluster-parameter-group \
    --db-cluster-parameter-group-name webAnayticsclusterParamGroup \
    --db-parameter-group-family aurora5.6 \
    --description 'Aurora cluster parameter group - Allow access to Amazon S3'

aws rds modify-db-cluster-parameter-group \
    --db-cluster-parameter-group-name webAnayticsclusterParamGroup \
    --parameters "ParameterName=aws_default_s3_role,ParameterValue= $auroraS3Arn,ApplyMethod=pending-reboot"

aws rds modify-db-cluster \
	--db-cluster-identifier Output AuroraClusterId from CloudFormation Stack \
	--db-cluster-parameter-group-name webAnayticsclusterParamGroup

 

プライマリ DB インスタンスを再起動します。

aws rds reboot-db-instance \
--db-instance-identifier Output PrimaryInstanceId from CloudFormationF Stack 

ステップ 6: DynamoDB ストリームと、そのストリームを処理する Lambda 関数を設定する

1.    ストリームを有効にして新しい DynamoDB テーブルを作成します。以降の手順では、AWS Lambda 関数をストリームに関連付けることで、トリガーを作成します。

aws dynamodb create-table \
    --table-name web_analytics \
    --attribute-definitions AttributeName=page_id,AttributeType=S AttributeName=activity_dt,AttributeType=S \
    --key-schema AttributeName=page_id,KeyType=HASH AttributeName=activity_dt,KeyType=RANGE \
    --provisioned-throughput ReadCapacityUnits=50,WriteCapacityUnits=50 \
    --stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE

2.    Lambda 実行ロールを作成します。

DdbStreamLambdaRole=$(aws iam create-role \
    --role-name DdbStreamLambdaRole \
    --assume-role-policy-document file://DynamoDB-Stream-lambda-Trust-Policy.json \
    --query 'Role.Arn' \
    --output text)


aws iam put-role-policy \
    --role-name DdbStreamLambdaRole \
    --policy-name DdbStreamProcessingAccessPolicy \
    --policy-document file://DynamoDb-Stream-lambda-AccessPolicy.json

3.  DynamoDB ストリームを処理する Lambda 関数を作成します。

aws lambda create-function \
    --function-name WebAnalyticsDdbStreamFunction \
    --zip-file fileb://ddbStreamProcessor.zip \
    --role $DdbStreamLambdaRole \
    --handler ddbStreamProcessor.handler \
    --timeout 300 \
    --runtime nodejs4.3

 

4.  その Lambda 関数を DynamoDB ストリームに関連付けることで、トリガーを作成します。

tableStreamArn=$(aws dynamodb describe-table --table-name web_analytics --query 'Table.LatestStreamArn' --output text)

aws lambda create-event-source-mapping \
    --function-name WebAnalyticsDdbStreamFunction \
    --event-source-arn $tableStreamArn \
    --batch-size 100 \
    --starting-position LATEST

ステップ 7: Firehose のデータ変換 Lambda 関数を作成して設定する

Lambda 実行ロールを作成します。

transRole=$(aws iam create-role \
            --role-name firehose_delivery_lambda_transformation_role \
            --assume-role-policy-document  file://firehose_lambda_transformation_trust_policy.json \
            --query 'Role.Arn' --output text)

aws iam put-role-policy \
        --role-name firehose_delivery_lambda_transformation_role \
        --policy-name firehose_lambda_transformation_AccessPolicy \
        --policy-document file://firehose_lambda_transformation_AccessPolicy.json

2.  データ変換 Lambda 関数を作成します。

aws lambda create-function \
    --function-name firehoseDeliveryTransformationFunction \
    --zip-file fileb://firehose_delivery_transformation.zip \
    --role $transRole \
    --handler firehose_delivery_transformation.handler \
    --timeout 300 \
    --runtime nodejs4.3 \
    --memory-size 1024

この関数は、受け取ったストリームのレコードを JSON スキーマに対して検証します。スキーマに一致したら、受け取った JSON レコードを解析し、カンマ区切り値 (CSV) 形式に変換します。

'use strict';

var jsonSchema = require('jsonschema');

var schema = { "$schema": "http://json-schema.org/draft-04/schema#", "type": "object", "properties": { "Hits": { "type": "integer" }, "device": { "type": "object", "properties": { "make": { "type": "string" }, "platform": { "type": "object", "properties": { "name": { "type": "string" }, "version": { "type": "string" } }, "required": ["name", "version"] }, "location": { "type": "object", "properties": { "latitude": { "type": "string" }, "longitude": { "type": "string" }, "country": { "type": "string" } }, "required": ["latitude", "longitude", "country"] } }, "required": ["make", "platform", "location"] }, "session": { "type": "object", "properties": { "session_id": { "type": "string" }, "start_timestamp": { "type": "string" }, "stop_timestamp": { "type": "string" } }, "required": ["session_id", "start_timestamp", "stop_timestamp"] } }, "required": ["Hits", "device", "session"] };

exports.handler = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found

    const output = event.records.map((record) => {

        const entry = (new Buffer(record.data, 'base64')).toString('utf8');

        var rec = JSON.parse(entry);
        console.log('Decoded payload:', entry);
        var milliseconds = new Date().getTime();

        var payl = JSON.parse(rec.payload.S);

        var jsonValidator = new jsonSchema.Validator();
        var validationResult = jsonValidator.validate(payl, schema);
        console.log('Json Schema Validation result = ' + validationResult.errors);

        if (validationResult.errors.length === 0) {

            const result = `${milliseconds},${rec.page_id.S},${payl.Hits},
				${payl.session.start_timestamp},
				${payl.session.stop_timestamp},${payl.device.location.country}` + "\n";

            const payload = (new Buffer(result, 'utf8')).toString('base64');
            console.log(payload);
            success++;

            return {
                recordId: record.recordId,
                result: 'Ok',
                data: payload,
            };
        }
        else {
            failure++;
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed',
                data: record.data,
            }
        }

    });

    console.log(`Processing completed.  Successful records ${success}, Failed records ${failure}.`);
    callback(null, { records: output });

};

ステップ 8: Firehose 配信ストリームを作成し、S3 にデータを配信するように設定する

Amazon S3 ターゲットを使用するとき、Firehose は S3 バケットにデータを配信します。配信ストリームを作成するには、IAM ロールが必要です。Firehose はその IAM ロールを引き受けることで、指定したバケットとキーにアクセスする権限を取得します。Firehose はまた、その IAM ロールを使用することで、Amazon CloudWatch ロググループにアクセスし、データ変換 Lambda 関数を呼び出す権限を取得します。

1.    S3 バケット、キー、CloudWatch ロググループ、データ変換 Lambda 関数にアクセスする権限を付与する IAM ロールを作成します。

aws iam create-role \
    --role-name firehose_delivery_role \
    --assume-role-policy-document  file://firehose_delivery_trust_policy.json

aws iam put-role-policy \
    --role-name firehose_delivery_role \
    --policy-name firehose_delivery_AccessPolicy \
    --policy-document file://firehose_delivery_AccessPolicy.json

 

2.  S3 ターゲット設定を指定して、Firehose 配信ストリームを作成します。

aws firehose create-delivery-stream \
    --delivery-stream-name webAnalytics \
    --extended-s3-destination-configuration='CONTENTS OF s3-destination-configuration.json file' 

3.  AWS マネジメントコンソールにサインインし、Firehose コンソールに移動します。webAnalytics という名前の配信ストリームを選択します。[Details] タブで、[Edit] を選択します。[Data transformation][Enabled] を、[IAM role] [firehose_delivery_role] を選択します。[Lambda function] で、[firehoseDeliveryTransformationFunction] を選択します。次に、[Save] を選択してこの設定を保存します。

ステップ 8: Lambda 関数を作成して、VPC リソースにアクセスするように設定する
S3 バケットから Amazon Aurora にデータをインポートするには、VPC 内のリソースにアクセスするように Lambda 関数を設定します。

1.    Lambda 関数の IAM 実行ロールを作成します。

auroraLambdaRole=$(aws iam create-role \
                --role-name lambda_aurora_role \
                --assume-role-policy-document file://lambda-aurora-Trust-Policy.json \
                --query 'Role.Arn' --output text)

aws iam put-role-policy \
    --role-name lambda_aurora_role \
    --policy-name lambda-aurora-AccessPolicy \
    --policy-document file://lambda-aurora-AccessPolicy.json

2.  プライベートサブネットやセキュリティグループなどの VPC 設定を指定する Lambda 関数を作成します。CLI の実行中に渡される環境変数 AuroraEndpoint、dbUser (データベースユーザー)、dbPassword (データベースパスワード) に正しい値を設定していることを確認します。これらの値については、CloudFormation スタック出力を参照してください。

aws lambda create-function \
    --function-name AuroraDataManagementFunction \
    --zip-file fileb://AuroraDataMgr.zip \
    --role $auroraLambdaRole \
    --handler dbMgr.handler \
    --timeout 300 \
    --runtime python2.7 \
    --vpc-config SubnetIds='Output PrivateSubnets from CloudFormation stack',SecurityGroupIds='Output DefaultSecurityGroupId from CloudFormation stack' \
    --memory-size 1024 \    
    --environment='    
                    {
                        "Variables": {
                            "AuroraEndpoint": "Output AuroraEndpoint from CloudFormation stack",
                            "dbUser": "Database User Name",
                            "dbPassword": "Database Password"
                        }
                    }'

 

Lambda 関数は Aurora データベースに接続します。この関数は、LOAD DATA FROM S3 SQL コマンドを実行して、S3 バケット内のテキストファイルから Aurora DB クラスターにデータをロードします。

import logging import pymysql import boto3 import sys import os
# rds settings
db_name = "Demo"
dbUser = os.environ['dbUser']
dbPassword = os.environ['dbPassword']
AuroraEndpoint = os.environ['AuroraEndpoint']

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3')
try:
    conn = pymysql.connect(AuroraEndpoint, user=dbUser, passwd=dbPassword, db=db_name, connect_timeout=5)
except:
    logger.error("ERROR: Unexpected error: Could not connect to Aurora instance.")
    sys.exit()

logger.info("SUCCESS: Connection to RDS Aurora instance succeeded")

def handler(event, context):

    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        s3location = 's3://' + bucket + '/' + key
        logger.info(s3location)

        sql = "LOAD DATA FROM S3 '" + s3location + "' INTO TABLE Demo.WebAnalytics FIELDS TERMINATED BY ',' " \
              "LINES TERMINATED BY '\\n' (timestarted, page_id, hits, start_time, end_time, country_code);"

        logger.info(sql)

        with conn.cursor() as cur:
            cur.execute(sql)
            conn.commit()
            logger.info('Data loaded from S3 into Aurora')

ステップ 9: S3 イベント通知を設定する
最後に、S3 がイベントを発行することで、前のステップで作成した Lambda 関数を呼び出すように、その関数を設定します。このプロセスの最初の手順は、Lambda 関数を呼び出すアクセス権限を S3 に付与することです。

1.    Lambda 関数を呼び出すアクセス権限を S3 に付与します。

aws lambda add-permission \
    --function-name AuroraDataManagementFunction \
    --action “lambda:InvokeFunction” \
    --statement-id Demo1234 \
    --principal s3.amazonaws.com \
    --source-arn ‘ARN of S3 Bucket created in Step 2’ \
    --source-account ‘AWS Account Id

2.    S3 バケット通知を設定します。

aws s3api put-bucket-notification-configuration \
    --bucket 'Name of S3 Bucket created in step 2' \
    --notification-configuration=' CONTENTS OF lambda-s3-notification-config.json ' 

 

ステップ 10: ソリューションをテストする
最後のステップは、ソリューションをテストすることです。

  1. このブログ記事のソースコードの TestHarness フォルダには、テストハーネスがあります。このテストハーネスは DynamoDB テーブルにデータを読み込みます。まず、TestHarness フォルダに移動し、コマンドノード loadDataToDDb.js を実行します。
  2. Secure Shell (SSH) を使用して、踏み台ホストに接続します。SSH を使用した接続の詳細については、EC2 のドキュメントを参照してください。
  3. 踏み台ホストのブートストラッププロセス中に MySQL クライアントがインストールされたため、以下のコマンドを使用して Aurora データベースに接続できます。パラメータ値を適切に変更していることを確認します。
/usr/bin/mysql -h DatabaseEndpoint -u DatabaseUsername --password=DatabasePassword 

4.    コネクションが成功したら、以下のコマンドを実行します。

 mysql> select count(*) from Demo.WebAnalytics;

このコマンドを実行した後、テーブルにレコードが読み込まれています。

テーブルにレコードが読み込まれていない場合、Firehose はそれらのレコードを S3 に送信する前にバッファしている可能性があります。これを回避するには、1 分くらい後に同じ SQL コードを再試行してください。この間隔は、現在設定されている S3 バッファ間隔の値に基づきます。このコードを再試行した後、Amazon Aurora にレコードが挿入されています。

 

まとめ

DynamoDB Streams と、Amazon Kinesis Firehose のデータ変換関数を使用すると、DynamoDB から Amazon Aurora などのデータソースにデータをレプリケートする強力でスケーラブルな方法を得られます。このブログ記事では、DynamoDB から Aurora へのデータのレプリケートを取り上げましたが、同じ一般的なアーキテクチャパターンを使用すれば、他のストリーミングデータを変換して、Amazon Aurora に取り込むことができます。

さらに、以下の関連するブログ記事を参照してください。

 

Amazon Kinesis Firehose, Amazon Athena, Amazon QuickSightを用いたVPCフローログの分析

多くの業務や運用において、頻繁に更新される大規模なデータを分析することが求められるようになっています。例えばログ分析においては、振る舞いのパターンを認識したり、アプリケーションのフロー分析をしたり、障害調査をしたりするために大量のログの可視化が必要とされます。

VPCフローログAmazon VPCサービス内のVPCに属するネットワークインターフェースを行き来するIPトラフィック情報をキャプチャします。このログはVPC内部に潜む脅威やリスクを認識したり、ネットワークのトラフィック・パターンを調査するのに役立ちます。フローログはAmazon CloudWatchログに格納されます。いったんフローログを作成すれば、Amazon CloudWatchログを用いて見たり取り出したりすることができるようになります。

フローログは様々な業務を助けてくれます。例えば、セキュリティグループのルールを過度に厳しくしすぎたことによって特定のトラフィックがインスタンスに届かない事象の原因調査などです。また、フローログを、インスタンスへのトラフィックをモニタリングするためのセキュリティツールとして使うこともできます。

この記事はAmazon Kinesis FirehoseAWS LambdaAmazon S3Amazon Athena、そしてAmazon QuickSightを用いてフローログを収集し、格納し、クエリを実行して可視化するサーバーレス・アーキテクチャを構成する手順を示します。構成する中で、Athenaにおいてクエリにかかるコストや応答時間を低減させるための圧縮やパーティショニング手法に関するベストプラクティスを学ぶこともできることでしょう。

ソリューションのサマリ

本記事は、3つのパートに分かれています。

  • Athenaによる分析のためにVPCフローログをS3へ格納。このセクションではまずフローログをLambdaとFirehoseを用いてS3に格納する方法と、格納されたデータにクエリを発行するためAthena上のテーブルを作成する方法を説明します。
  • QuickSightを用いてログを可視化。ここではQuickSightとQuickSightのAthenaコネクタを用いて分析し、その結果をダッシュボードを通じて共有する方法を説明します。
  • クエリのパフォーマンス向上とコスト削減を目的とした、Athenaにおけるデータのパーティション化。このセクションではLambda関数を用いてS3に格納されたAthena用のデータを自動的にパーティション化する方法を示します。この関数はFirehoseストリームに限らず、他の手段でS3上に年/月/日/時間のプリフィックスで格納されている場合でも使用できます。

パーティショニングはAthenaにおいてクエリのパフォーマンス向上とコスト削減を実現するための3つの戦略のうちの1つです。他の2つの戦略としては、1つはデータの圧縮、そしてもう1つはApache Parquetなどの列指向フォーマットへの変換があります。本記事では自動的にデータを圧縮する方法には触れますが、列指向フォーマットへの変換については触れません。本ケースのように列指向フォーマットへの変換を行わない場合でも、圧縮やパーティショニングは常に価値のある方法です。さらに大きなスケールでのソリューションのためには、Parquetへの変換も検討して下さい。

VPCフローログを分析するためのサーバレスアーキテクチャ

以下の図はそれぞれのサービスがどのように連携するかを示しています。

VPC_Flowlogs_Ian_Ben

VPCにフローログを作成すると、ログデータはCloudWatchログのロググループとして発行されます。CloudWatchログのサブスクリプションを利用することにより、S3に書き込むためにFirehoseを用いたLambda関数に対して、リアルタイムにログデータイベントを送り込むことが可能になります。

 

いったんS3にログデータが格納され始めれば、Athenaを利用してSQLクエリをアドホックに投入することができます。ダッシュボードを構築したり、画面からインタラクティブにデータを分析したりすることを好む場合には、Athenaに加えQuickSightによるリッチな可視化を簡単に構成できます。

Athenaの分析を目的としたS3へのVPCフローログの送信

この章では、Athenaによるクエリを可能とするためにフローログデータをS3に送信する方法を説明します。この例ではus-east-1リージョンを使用していますが、AthenaとFirehoseが利用できるのであればどのリージョンでも可能です。

Firehoseデリバリーストリームの作成

既存もしくは新しいS3バケットを格納先とするFirehoseデリバリーストリームを作成するためには、この手順を参考にして下さい。ほとんどの設定はデフォルトで問題ありませんが、格納先のS3バケットへの書き込み権限を持つIAMロールを選択し、GZIP圧縮を指定して下さい。デリバリーストリームの名前は‘VPCFlowLogsDefaultToS3’とします。

VPCフローログの作成

まず、この手順に従ってデフォルトVPCのVPCフローログを有効にしましょう。(訳注:デフォルトVPC以外の任意のVPCで構いません。)

Firehoseに書き込むLambda用のIAMロールの作成

Firehoseに書き込むLambda関数を作成する前に、Firehoseにバッチ書き込みを許可するLambda用のIAMロールを作成する必要があります。次のように定義されるインラインアクセスポリシーを組み込んだ‘lambda_kinesis_exec_role’という名前のLambda用ロールを作成して下さい。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "firehose:PutRecordBatch"
            ],
            "Resource": [
                "arn:aws:firehose:*:*:deliverystream/VPCFlowLogsDefaultToS3"
            ]
        }
    ]
}

 

CloudWatchログからFirehoseに配信するLambda関数の作成

ログイベントをCloudWatchから先に作成したFirehoseデリバリーストリームである‘VPCFlowLogsDefaultToS3’に配信するためのLamdba関数を作成するには、次の手順を実施します。

  1. Lambdaのコンソールから一から作成を選択して新しいLambda関数を作成します。
  2. 基本的な情報ページでは関数に‘VPCFlowLogsToFirehose’と名付けます。さらにロールには先の手順で作成した‘lambda_kinesis_exec_role’を指定します。
  3. 関数コードにおいて、Python 2.7ランタイムを選択し、このGitHub上のコードをコードパネルにコピーします。環境変数にはDELIVERY_STREAM_NAMEという名前の変数を追加します。この変数の値には本手順の最初に作成したデリバリーストリームの名前(‘VPCFlowLogsDefaultToS3’)を設定します。また、タイムアウトは1分にします。

o_vpc-flow_2

Lambda関数に対するCloudWatchサブスクリプションの作成

CloudWatchログにて、次の手順を実施します。

  1. VPCフローログのロググループを選択します(フローログを作成したばかりであればロググループが表示されるまで数分待つ必要がある場合があります)。
  2. アクションからLambdaサービスへのストリーミングの開始を選択します。

  1. 先の手順で作成したLambda関数 ‘VPCFlowLogsToFirehose’ を選択します。
  2. ログの形式は Amazpn VPC フローログ を選択します。

Athenaでのテーブル作成

Amazon Athenaは、特に他のインフラストラクチャをプロビジョニングしたり管理することなく、標準的なSQLによりS3に格納されたデータを問い合わせることを可能にします。AthenaはCSV、JSON、ParquetやORCなど様々な標準的なデータ形式を扱え、問合せ前にそれらのデータを変換する必要はありません。スキーマを定義し、そしてAWSマネジメントコンソールのクエリエディタやAthena JDBCドライバを使用したプログラムからクエリを実行するだけです。

AthenaはHiveメタストアと互換性のあるデータカタログにデータベースとテーブル定義を格納します。本記事では、フローログを1つのテーブル定義として作成します。テーブルに対するDDLについてはこのセクションの後半で述べます。DDL実行前に、次の点に注意しておいて下さい。

    • DDL上の<bucket_and_prefix>はFirehoseからフローログの書き込み先として実際に作成したS3バケットの名前に書き換えます(プリフィックスも含めるのを忘れずに)。
    • CREATE TABLE定義にEXTERNALというキーワードが含まれています。もしこのキーワードを除くと、Athenaはエラーを返します。EXTERNALキーワードはS3に格納されている元データに影響を与えずに、データカタログへテーブルメタデータを格納されるようにします。もし外部テーブルを削除したとしても、カタログからはメタデータが削除されますが、S3上のデータは維持されます。
    • vpc_glow_logsテーブルのカラムはフローログレコード上のフィールドにマッピングされます。フローログレコードはスペース区切りの文字列を含みます。各行のフィールドを解析するために、Athenaはどのようにデータを扱うべきかを示すカスタムライブラリとしてシリアライズ-デシリアライズクラス、またはSerDeを用います。
  • ここでは、スペース区切りのフローログレコードを解析するためにSerDe用の正規表現を指定します。この正規表現は”input.regex”というSerDeのプロパティによって提供されます。

Athenaのクエリエディタに以下のDDLを入力し、Run Queryをクリックして下さい。

CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs (
Version INT,
Account STRING,
InterfaceId STRING,
SourceAddress STRING,
DestinationAddress STRING,
SourcePort INT,
DestinationPort INT,
Protocol INT,
Packets INT,
Bytes INT,
StartTime INT,
EndTime INT,
Action STRING,
LogStatus STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
    "input.regex" = "^([^ ]+)\\s+([0-9]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([0-9]+)\\s+([0-9]+)\\s+([^ ]+)\\s+([^ ]+)$") 
LOCATION 's3://<bucket_and_prefix>/';

 

Athenaでのデータ問合せ

テーブルを作成した後は、テーブル名の隣にある小さなアイコンからPreview tableを実行し、レコードセットのサンプルを確認しましょう(訳注:VPCフローログをクリーニングせずに投入しているため、サンプリングされる箇所によってはnullのレコードが表示される場合もあります)。

o_vpc-flow_5

フローログを調査するために様々なクエリを実行することができます。ここでは例として、拒否されたトラフィックの中からソースIPアドレスのトップ25を取得してみます。

SELECT sourceaddress, count(*) cnt
FROM vpc_flow_logs
WHERE action = 'REJECT'
GROUP BY sourceaddress
ORDER BY cnt desc
LIMIT 25;

o_vpc-flow_6

QuickSightによるログの可視化

QuickSightはわずか数度のクリックでAthena上のテーブルを可視化することを可能にします。AWSアカウントによってQuickSight上にサインアップすることができ、1GBのSPICEキャパシティ無料利用枠を持つユーザアカウント1つを取得できます。

QuickSightをAthenaに接続させる前に、ここで説明されている手順を参考に、QuickSightに対するAthena及び関連するS3バケットへのアクセス権限の付与を確認します。

QuickSightにログインし、Manage dataNew data setと選択します。データソースとしてAthenaを選びます。

o_vpc-flow_7

データソースに“AthenaDataSource”と名前を付けます。defaultスキーマ→vpc_flow_logsテーブルと選択します。

o_vpc-flow_8

Edit/Preview dataを選択します。starttimeとendtimeのデータは数値形式よりも日付形式が適しています。これらの2つのフィールドはフローログのキャプチャウィンドウとシステムに到達した時刻をUnix時間で表現しています。

o_vpc-flow_9

ここでSave and visualizeを選択します。

異なるstart timeのキャプチャウィンドウとその時の送信バイト数を見てみましょう。StartTimeとBytesをフィールドリストから選択することでそれが可能です。覚えておいて頂きたいのは、QuickSightはこれを自動的に通信量のタイムチャートとして可視化することです。異なる日付/時間単位への変更はとても簡単です。

vpc-flow_10

これは、1日の中の通信で大きなスパイクがあった例です。この例はプロットされている他の日と比べてこの日に大量の通信が発生していることを教えてくれます。

o_vpc-flow_11

データに含まれるポート、IPアドレスその他のファセットを通じて通信許可/拒否に関するリッチな分析を行うことも簡単です。作られた分析結果をダッシュボードとして同じ組織の他のQuickSightユーザに共有することもできます。

o_vpc-flow_12

クエリのパフォーマンス向上とコスト削減のためのデータのパーティション化

ここまで述べてきたソリューションでは、S3に対して頻繁にGZIP圧縮されたフローログを配置します。Firehoseはこれらのファイルをデリバリーストリームの作成時に指定したバケット内に /year/month/day/hour というキーで格納します。Athenaでvpc_flow_logsテーブルを作成した時に使用していた外部テーブル定義には、この時系列キースペース内にある全てのファイルが含まれています。

Athenaはスキャンされたデータの量に応じて、クエリ毎に課金されます。ここまでのソリューションでは、クエリを発行する毎にS3に配置されたすべてのファイルがスキャンされます。VPCフローログの数が増加するに従い、データのスキャン量も増加し、クエリの応答時間やコストの両方に影響を与えることになります。

データの圧縮、パーティショニング、そして列指向フォーマットへの変換によって、クエリのコストを削減し、パフォーマンスを向上させることが可能です。Firehoseにて、既に圧縮された状態でS3へデータを配置するように設定されています。ここではパーティショニングに着目します。(Apache Parquetのような列指向フォーマットへの変換については本記事の対象外とします)

テーブルをパーティショニングすることで、クエリ発行時のデータスキャン量を制限することができます。多くのテーブルに置いて、特に発行するクエリの大半に時間ベースでの範囲制限が含まれる場合には、時間別での分割が有効です。AthenaはHiveのパーティショニング形式を使用します。これにより、パーティショニングの体系が直接反映されたkey-valueペアを含む名前付けをされたフォルダにパーティションが分割されます(詳しくはAthenaのドキュメントを参照)。

Firehoseにより作成されたこのフォルダ構造(例:s3://my-vpc-flow-logs/2017/01/14/09/)は、Hiveパーティショニング形式(例:s3://my-vpc-flow-logs/dt=2017-01-14-09-00/)とは異なります。しかし、ALTER TABLE ADD PARTITION文を用いると、手動でパーティションを追加してそのパーティションをデリバリーストリームによって作成されたキー空間の一部分に割り当てることができます。

ここでは、S3で新しいファイルを受信した際にALTER TABLE ADD PARTITION文を実行するためにLambda関数とAthena JDBCドライバを使い、自動的にFirehoseのデリバリーストリームに対する新しいパーティションを作成する方法を示します。

Athenaにてクエリを実行するLamba用のIAMロールの作成

Lambda関数を作成する前に、Lambdaに対してAthena上でクエリを実行することを許可するためのIAMロールを作成する必要があります。ユーザーガイドも参考に、次のように定義されるインラインアクセスポリシーを組み込んだ ‘lambda_athena_exec_role’ という名前のロールを作成して下さい。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "athena:RunQuery",
                "athena:GetQueryExecution",
                "athena:GetQueryExecutions",
                "athena:GetQueryResults"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::aws-athena-query-results-*"
            ]
        }
    ]
}

パーティション化されたvpc_flow_logテーブルの作成

前の手順でAthena上に定義したvpc_flow_log外部テーブルはパーティション化されていません。‘IngestDateTime’という名前のパーティションを付したテーブルを作成するために、元のテーブルを削除して、次に示すDDLにてテーブルを再作成します。

DROP TABLE IF EXISTS vpc_flow_logs;

CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs (
Version INT,
Account STRING,
InterfaceId STRING,
SourceAddress STRING,
DestinationAddress STRING,
SourcePort INT,
DestinationPort INT,
Protocol INT,
Packets INT,
Bytes INT,
StartTime INT,
EndTime INT,
Action STRING,
LogStatus STRING
)
PARTITIONED BY (IngestDateTime STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
    "input.regex" = "^([^ ]+)\\s+([0-9]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([0-9]+)\\s+([0-9]+)\\s+([^ ]+)\\s+([^ ]+)$") 
LOCATION 's3://<bucket_and_prefix>/';

Athenaのパーティションを生成するためのラムダ関数の作成

次の手順でLamda関数を作成します。

  1. GitHub上にあるLambda Javaのプロジェクトをクローンします。
  2. READMEファイルの手順に従って、ソースをコンパイルし、出力された.jarファイルをS3のバケットにコピーします。
  3. Lambdaコンソールから関数の作成を開始し、一から作成を選択します。
  4. 基本的な情報ページでは、名前に‘CreateAthenaPartitions’、既存のロールとして’lambda_athena_exec_role’を選択し、関数の作成をクリックします。
  5. 関数コードページに遷移しますランタイムからJava 8を選択します。
  6. コードエントリタイプは、Amazon S3からのファイルのアップロードを選択します。S3リンクのURLには、先ほどアップロードした.jarファイルへのURLを入力します。
  7. ハンドラには’com.amazonaws.services.lambda.CreateAthenaPartitionsBasedOnS3Event::handleRequest’と入力します。
  8. このLambda関数のためには、いくつかの環境変数を設定する必要があります。
  • PARTITION_TYPE: [Month, Day, Hour]のうちどれかの値を設定します。この環境変数はオプションです。もし設定しなかった場合、Lambda関数はデフォルトとして日次でパーティションを作成します。例:’Hour’
  • TABLE_NAME: <データベース名>.<テーブル名>という形式で指定します。必須です。例:‘default.vpc_flow_logs’.
  • S3_STAGING_DIR: クエリの出力が書き込まれるAmazon S3のロケーション。(Lambda関数はDDL文だけを実行していますが、Athenaはその結果もS3に対して書き込みます。前の手順で作成したIAMポリシーでは、クエリ出力バケット名は‘aws-athena-query-results-’で開始されると仮定しています)
  • ATHENA_REGION: Athenaが動作するリージョンです。例:’us-east-1′

o_vpc-flow_13

  1. 最後に、タイムアウトを1分に設定して保存します。

S3上の新しいオブジェクトをLambda関数に通知するための設定

VPCフローログデータを含むS3バケットのプロパティページにおいて、Eventsペインを開き、通知の追加を押して新たな通知を作成します。

  • 名前: FlowLogDataReceived
  • イベント: ObjectCreated(All)
  • 送信先: Lambda function
  • Lambdaドロップボックスから‘CreateAthenaPartitions’を選択

これで、FirehoseによりS3バケットにファイルが配置される度に、‘CreateAthenaPartitions’ 関数がトリガーされます。この関数は新たに受信したオブジェクトのキーを解析します。キーの year/month/day/hour という部分に基づいて、関数作成の際に環境変数で指定したPARTITION_TYPEの設定(Month/Day/Hour)に従い、そのファイルがどのパーティションに属するかを判断します。次に、このパーティションが既に存在するかどうかをAthenaに問い合わせます。存在しなければ新たにパーティションを作成し、S3のキー空間の関連部分にマッピングします。

このロジックを詳しく見てみましょう。時間単位のパーティションを作成するために ‘CreateAthenaPartitions’ 関数を作成し、Firehoseによりちょうどフローログデータのファイルが s3://my-vpc-flow-logs/2017/01/14/07/xxxx.gz に配信されたと仮定します。

新しいファイルのS3キーを見て、Lambda関数はそれが’2017-01-14-07’という時間単位のパーティションに属すると推測します。Athenaを確認すると、そのようなパーティションはまだ存在していないため、次のDDL文を実行します。

ALTER TABLE default.vpc_flow_logs ADD PARTITION (IngestDateTime='2017-01-14-07') LOCATION 's3://my-vpc-flow-logs/2017/01/14/07/';

もしLambda関数が日次のパーティションとして作成されていた場合には、新しいパーティションは‘s3://my-vpc-flow-logs/2017/01/14/’ とマッピングされます。月次であれば ‘s3://my-vpc-flow-logs/2017/01/’ となります。
パーティションは、各ログがS3に取り込まれた日時を表しています。これは、各パーティションに含まれる個々のレコードのStartTimeやEndTimeの値よりも後の時間であることに注意して下さい。

Athenaを用いたパーティション化されたデータへの問合せ

これで、問合せはパーティションを利用できるようになりました。

SELECT sourceaddress, count(*) cnt
FROM vpc_flow_logs
WHERE ingestdatetime > '2017-01-15-00'
AND action = 'REJECT'
GROUP BY sourceaddress
ORDER BY cnt desc
LIMIT 25;

過去3時間以内に取り込まれたデータを問い合わせるには、次のような問合せを実行します(ここでは時間単位のパーティショニングを行っていると仮定します)。

SELECT sourceaddress, count(*) cnt
FROM vpc_flow_logs
WHERE date_parse(ingestdatetime, '%Y-%m-%d-%H') >= 
      date_trunc('hour', current_timestamp - interval '2' hour)
AND action = 'REJECT'
GROUP BY sourceaddress
ORDER BY cnt desc
LIMIT 25;

次の2つのスクリーンショットが示しているのは、パーティションを利用することでスキャンするデータ量を減らせるということです。そうすることで、クエリのコストと応答時間が削減できます。1つめのスクリーンショットはパーティションを無視したクエリの結果です。
vpc-flow_1o_6
2つめのスクリーンショットは、WHERE句でパーティションの使用を指示したクエリの結果です。
o_vpc-flow_17
このように、パーティションを用いることで2つめのクエリは半分の時間で済み、データのスキャン量は最初のクエリの1/10以下にできたことがわかります。

結論

以前は、ログ分析というのは特定のクエリのためだけに大規模なデータを準備しなければならなかったり、大規模なストレージやマシンリソースを準備したり運用したりする必要がありました。Amazon AthenaとAmazon QuickSightによって、ログデータの発行から格納、分析、そして視覚化まで、より柔軟性高く実現することができるようになっています。クエリを実行してデータを視覚化するのに必要なインフラストラクチャに焦点を当てるのではなく、ログの調査に集中できるようになるのです。

著者について

ben_snively_90
Ben Snively – a Public Sector Specialist Solutions Architect. 彼は政府や非営利団体、教育組織の顧客におけるビッグデータや分析プロジェクトについて、AWSを用いたソリューションの構築を通じた支援に従事しています。また、彼は自宅にIoTセンサーを設置してその分析も行っています。

 

Ian_pic_1_resized
Ian Robinson – a Specialist Solutions Architect for Data and Analytics. 彼はヨーロッパ・中東・アジア地域において顧客が持つデータを結びつけることによる価値の創出のためにAWSを利用することを支援しています。また彼は現在、1960年代のダーレク(Dalek)の複製を修復する活動をしています。
(翻訳はSA五十嵐が担当しました。原文はこちら)

関連文書

Analyze Security, Compliance, and Operational Activity Using AWS CloudTrail and Amazon Athena

Amazon Kinesis Streams のサーバーサイド暗号化

昨今ではスマートホーム、ビッグデータ、IoT デバイス、携帯電話、ソーシャルネットワーク、チャットボット、ゲームコンソールなどが一般的に普及しており、ストリーミングデータはごく普通のことになりました。Amazon Kinesis Streams は、何千ものストリーミングデータソースから毎時間ごとにテラバイト単位のデータをキャプチャ、処理、分析、保存できるカスタムアプリケーションの構築を可能にしています。Amazon Kinesis Streams では、アプリケーションが同じ Kinesis ストリームから同時にデータを処理することができるので、並列処理システムを構築することができます。たとえば、処理済みのデータを Amazon S3 だけに送信するようにし、Amazon Redshift で複雑な分析を行ったり、AWS Lambda を使用する堅牢なサーバーレスストリーミングソリューションを構築することもできます。

Kinesis Streams では消費者が複数のストリーミングユースケースを利用できるようにしていますが、今後は Kinesis Streams でサーバー側の暗号化 (SSE) をサポートすることにより、移動中のデータをより効率的に保護できるようになりました。この新しい Kinesis Streams の機能により、データのセキュリティを強化したり、組織のデータストリーミングで必要となる様々な規制とコンプライアンス要件を満たすことができます。
Kinesis Streams は Payment Card Industry Data Security Standard (PCI DSS) のコンプライアンスプログラムで AWS 対象範囲内サービスの 1 つになっているほどです。PCI DSS は、主要な金融機関が設立した PCI Security Standards Council が管轄する専有情報のセキュリティ基準です。PCI DSS コンプライアンスは、カード所有者のデータやサービスプロバイダを含む機密性の高い認証データを保存、処理、転送する機関すべてに適用されます。AWS Artifact を使用して、PCI DSS Attestation of Compliance や Responsibility Summary をリクエストすることができます。Kinesis Streams とのコンプライアンスにおけるメリットはこの他にもあります。Kinesis Streams は AWS GovCloud の FedRAMP にも準拠しています。FedRAMP は Federal Risk and Authorization Management Program の略で、米国政府全体のプログラムであり、クラウド製品およびサービス向けのセキュリティ評価、認証、継続的なモニタリングに関する標準化されたアプローチを提供するものです。AWS サービスの FedRAMP コンプライアンスに関する詳細についてはこちらをご覧ください。

 

さあ、いかがでしょう? 興味が湧いてきましたか?今すぐ始めてみましょう!でもとりあえず、後もう少し説明してみます。Kinesis Streams の SSE に触れたので、Kinesis におけるサーバー側の暗号化の流れについて解説しておきます。PutRecord または PutRecords API を使用して Kinesis Stream に含んだデータレコードやパーティションキーは AWS Key Management Service (KMS) マスターキーを使用して暗号化されています。AWS Key Management Service (KMS) のマスターキーを使用し、Kinesis Streams は 256 ビットの Advanced Encryption Standard (AES-256 GCM アルゴリズム) を使って受信データに暗号化を追加します。

新規または既存のストリームに Kinesis Streams でサーバー側の暗号化を有効にするには、Kinesis マネジメントコンソールを使用、または利用可能な AWS SDK の 1 つを使います。また、ストリームの暗号化の履歴を監査したり、Kinesis Streams コンソールで特定のストリームの暗号化ステータスの検証や、PutRecord または GetRecord トランザクションが AWS CloudTrail サービスを使用して暗号化されているか確認することができます。

チュートリアル: Kinesis Streams でのサーバー側の暗号化

Kinesis Streams でサーバー側の暗号化の演習をしてみましょう。まず [Amazon Kinesis console] にアクセスし [Streams console] オプションを選択します。

Kinesis Streams コンソールにアクセスしたら、既存の Kinesis ストリームの 1 つにサーバー側の暗号を追加または新しい Kinesis ストリームを作成することができます。このチュートリアルでは、Kinesis ストリームを作成したいので [Create Kinesis stream] ボタンを選択します。

ストリーム名を「KinesisSSE-stream」にし、ストリームにシャードを 1 つ割り当てます。ストリームのデータ容量は、そのストリームで特定されたシャード数をもとに計算されることを忘れないでください。コンソールで [Estimate the number of shards you’ll need ] ドロップダウンを使用するか、こちらでストリームのシャード数を予測する計算に関する詳細をご覧ください。[Create Kinesis stream] ボタンをクリックして、ストリームの作成を完了します。

KinesisSSE-stream を作成したら、ダッシュボードを選択し [Actions] ドロップダウンで [Details] オプションを選択します。


KinesisSSE-stream の [Details] ページに [Server-side encryption] セクションが表示されるようになります。このセクションで [Edit] ボタンを選択します。

これで、[Enabled] ラジオボタンを選択し AWS KMS のマスターキーでストリームにサーバー側の暗号化を有効にすることができます。このボタンを選択すると、どの AWS KMS マスターキーを KinesisSSE-stream のデータの暗号化に使用するか選択できます。Kinesis サービスが生成した KMS のマスターキーまたは (Default) aws/kinesis、もしくはすでに作成してある KMS マスターキーの 1 つを選択することができます。この例では、デフォルトのマスターキーを選びます。後は [Save] ボタンをクリックするだけです。


これで完了です。 次のスクリーンショットで表示されているように、約 20 秒後にはサーバー側の暗号化が Kinesis ストリームに追加され、ストリームに送信された受信データはすべて暗号化されるようになります。サーバー側の暗号化では、暗号化が有効になって初めて受信データが暗号化される点にご注意ください。サーバー側の暗号化が有効になる前の Kinesis ストリームにある既存のデータは暗号化されません。

まとめ

AWS KMS キーを使用してサーバー側の暗号化を有効にしている Kinesis Streams では、ストリームに送られてくるストリーミングデータの暗号化の自動化を行いやすくします。AWS マネジメントコンソールまたは AWS SDK を使用して、Kinesis ストリームのサーバー側の暗号化の開始、停止、更新ができます。Kinesis のサーバー側の暗号化に関する詳細や AWS Key Management Service または Kinesis Streams のレビューに関する詳細については「Amazon Kinesis の入門ガイド (Amazon Kinesis getting started guide)」、「AWS Key Management Service の開発者ガイド (AWS Key Management Service developer guide)」または「Amazon Kinesis の製品ページ (Amazon Kinesis product page)」をご覧ください。

ストリーミングをぜひご活用ください。

Tara

Amazon Kinesis Data Generatorを使用してストリーミングデータソリューションをテストする

ストリーミングデータソリューションを構築する場合、ほとんどのお客様は、本番データと同様のデータを使用してストリーミングデータソリューションをテストしたいと考えています。この、データを作成してソリューションにストリーミングすることは、ソリューションをテストする際の最も退屈な作業かもしれません。

Amazon Kinesis StreamsAmazon Kinesis Firehoseを使用すると、数十万のソースから1時間にテラバイト級のデータを連続的に捉えて保存できます。 Amazon Kinesis Analyticsでは、標準SQLを使用してリアルタイムでこのデータを分析および集計することができます。 AWS Management Console(またはAWS CLIまたはAmazon Kinesis APIを使用したいくつかのコマンド)で数回クリックするだけで、Amazon KinesisストリームまたはFirehose配信ストリームを簡単に作成できます。ただし、テストデータの連続したストリームを生成するには、AWS SDKまたはCLIを使用してAmazon Kinesisにテストレコードを送信することで、連続して実行されるカスタムプロセスまたはスクリプトを作成する必要があります。この作業はソリューションを適切にテストするために必要ですが、複雑さと開発時間とテスト時間が長くなることを意味します。

テストデータを生成してAmazon Kinesisに送信するユーザーフレンドリーなツールがあれば素晴らしいとは思いませんか?そこで、Amazon Kinesis Data Generator(KDG)の出番です。

(more…)

Kinesis Firehoseを使用してApache WebログをAmazon Elasticsearch Serviceに送信する

ElasticsearchLogstash、および、Kibana(ELK)スタックを所有して運用する多くのお客様が、他の種類のログの中でもApache Webログを読み込んで可視化しています。 Amazon Elasticsearch Serviceは、AWSクラウドにElasticsearchとKibanaを提供しており、セットアップと運用が簡単です。 Amazon Kinesis Firehoseは、Amazon Elasticsearch ServiceにApache Webログ(またはその他のログデータ)をサーバーレスで確実に配信します。

Firehoseを使用すると、Firehose内のレコードを変換するAWS Lambda関数への自動呼び出しを追加できます。これらの2つのテクノロジーを使用すると、既存のELKスタックを効果的かつ簡単に管理することができます。

この記事では、最初にAmazon Elasticsearch Serviceドメインを設定する方法を説明します。次に、事前ビルドされたLambda関数を使用してApache Webログを解析するFirehoseストリームを作成して接続する方法を示します。最後に、Amazon Kinesis Agentでデータをロードし、Kibanaで可視化する方法を示します。

(more…)

AWS上でApache Flinkを使用してリアルタイムストリーム処理パイプラインを構築する

今日のビジネス環境では、多様なデータソースが着実に増加していく中で、データが継続的に生成されています。したがって、このデータを継続的にキャプチャ、格納、および処理して、大量の生データストリームを実用的な洞察に素早く繋げることは、組織にとって大きな競争上のメリットになっています。

Apache Flinkは、このようなストリーム処理パイプラインの基礎を形成するのに適したオープンソースプロジェクトです。ストリーミングデータの継続的な分析に合わせたユニークな機能を提供しています。しかし、Flinkを基にしたパイプラインの構築と維持には、物理​​的なリソースと運用上の努力に加え、かなりの専門知識が必要になることがよくあります。

この記事では、Amazon EMRAmazon KinesisAmazon Elasticsearch Serviceを使用してApache Flinkを基にした、一貫性のあるスケーラブルで信頼性の高いストリーム処理パイプラインの参照アーキテクチャの概要を説明します。 AWSLabs GitHubリポジトリは、実際に参照アーキテクチャを深く理解するために必要なアーティファクトを提供します。リソースには、サンプルデータをAmazon Kinesisストリームに取り込むプロデューサアプリケーションと、リアルタイムでデータを分析し、その結果をAmazon ESに可視化するためのFlinkプログラムが含まれています。

(more…)

AWS KMSを使用してAmazon Kinesisレコードを暗号化および復号する

コンプライアンスやデータセキュリティの要件が厳しいお客様は、AWSクラウド内での保存中や転送中など、常にデータを暗号化する必要があります。この記事では、保存中や転送中もレコードを暗号化しながら、Kinesisを使用してリアルタイムのストリーミングアプリケーションを構築する方法を示します。

Amazon Kinesisの概要

Amazon Kinesisプラットフォームを使用すると、要求に特化したストリーミングデータを分析または処理するカスタムアプリケーションを構築できます。 Amazon Kinesisは、ウェブサイトクリックストリーム、金融取引、ソーシャルメディアフィード、ITログ、トランザクショントラッキングイベントなど、何十万ものソースから1時間につき数テラバイトのデータを連続的にキャプチャして保存できます。
Amazon Kinesis Streamsは、HTTPSを使用してクライアント間でデータを暗号化し、転送されているレコードの盗聴を防止します。ただし、HTTPSで暗号化されたレコードは、データがサービスに入ると解読されます。このデータは24時間保管され(最大168時間まで延長可能)、アプリケーションの処理、再処理、処理遅延の際の巻き取りに対して十分なゆとりが確保されています。

ウォークスルー

Amazon Kinesis Producer Library(KPL)、Kinesis Consumer Library(KCL)、AWS KMSaws-encryption-sdkを使用してサンプルKinesisプロデューサおよびコンシューマアプリケーションへの暗号化と復号を行います。この記事で使用されているKinesisレコードの暗号化と復号に使用される方法とテクニックは、あなたのアーキテクチャに簡単に再現できます。いくつか制約があります:

  • AWSは、暗号化と復号のためのKMS APIリクエストの使用料金を請求します。詳しくは、「AWS KMSの料金」を参照してください。
  • Amazon Kinesis Analyticsを使用して、このサンプルアプリケーションのクライアントによって暗号化されたレコードのAmazon Kinesis Streamにクエリすることはできません。
  • アプリケーションでレイテンシの低い処理が必要な場合は、レイテンシに多少の上乗せがあることに注意してください。

次の図は、ソリューションのアーキテクチャを示しています。

(more…)

Amazon Kinesis Analytics - SQL を使用してリアルタイムにストリーミングデータを処理

ご存知の通り、Amazon Kinesis では、AWS Cloud でリアルタイムにストリーミングデータを操作するプロセスが大幅に簡素化されています。独自のプロセスおよび短期のストレージインフラストラクチャを設定および実行する代わりに、ただ Kinesis ストリームまたは Kinesis Firehose を作成し、そこにデータを流し込むように設定してから、それを処理または分析するアプリケーションを構築するのみです。Kinesis ストリームと Kinesis Firehose を使用してストリーミングデータソリューションを構築するのは比較的簡単なのですが、さらに簡素化したいと考えました。作業開発者、データサイエンティスト、または SQL 開発者であるかどうかにかかわらず、お客様がウェブアプリケーション、テレメトリ、およびセンサーレポートの大量のクリックストリームを、接続されたデバイス、サーバーログなどからすべてリアルタイムに標準クエリ言語を使用して処理できるようにしたいと考えました。

Amazon Kinesis Analytics
本日より、Amazon Kinesis Analytics がご利用いただけるようになりました。ストリーミングデータに対して SQL クエリを継続的に実行し、データが届くと同時にフィルタリング、変換、および集約できるようになりました。インフラストラクチャで時間を無駄にするのではなく、データの処理とそこからのビジネス価値の抽出に注力できます。強力なエンドツーエンドのストリームプロセスパイプラインを 5 分で構築できます。SQL クエリより複雑なものを作成する必要はありません。
データベーステーブルに対する一連の SQL クエリの実行を考慮する場合、クエリは通常、非常に素早く追加および削除されるのに対して、データはほぼ静的なままです。常に行が追加、変更および削除されますが、これは特定の時点で実行する単一のクエリを考慮する際、一般的には関係ありません。Kinesis Analytics のクエリをストリーミングデータに対して実行すると、このモデルと正反対のことが生じます。クエリは長時間実行され、新しいレコード、監視、またはログエントリが届くと同時に、データは毎秒何度も変化します。一度この仕組みを把握すると、クエリプロセスモデルが非常に理解しやすいことが分かります。レコードが届くと同時に処理する、持続的なクエリを構築するのです。所定のクエリで処理されるレコードのセットを制御するには、プロセス「ウィンドウ」を使用します。Kinesis Analytics では、3 つの異なるタイプのウィンドウがサポートされています。

タンブリングウィンドウは定期的なレポートに使用されます。タンブリングウィンドウを使用して、時間の経過と共にデータをまとめることができます。おそらく毎秒数千~数百万ものリクエストを受信するので、1 分ごとの受信数を知りたいと思うでしょう。現在のタンブリングウィンドウが閉じると、その後に次のウィンドウが開始します。ウィンドウがいっぱいになるたびに、新しい結果が生成されます。

スライディングウィンドウは、モニタリングとその他のタイプのトレンド検出に使用されます。例えば、スライディングウィンドウを使用してリアルタイムで動くエラー率の平均をコンピューティングできます。レコードがウィンドウに入り、レコードがその中にあるかぎり結果に反映され、ウィンドウは進行します。新しいレコードがウィンドウに入るたびに、新しい結果が生成されます。ウィンドウのサイズを調整して、結果の精度を制御できます。

カスタムウィンドウは、適切なグループが厳密に時間に基づいていない場合に使用されます。クリックストリームデータまたはサーバーログを処理している場合、カスタムウィンドウを使用してセッション化として知られるアクションを実行できます。つまり、各ユーザーが実行する最初と最後のアクション (受信データ内のセッション ID により識別される) によって、各クエリをバインドできます。各ユーザーが閲覧したページ数またはサイトで費やした時間をコンピューティングするクエリを作成できます。

これらすべてはいくらか複雑に聞こえるかもしれませんが、実装するのは非常に簡単です。Kinesis Analytics では、受信レコードのサンプルを分析し、最適なスキーマを提案します。それをそのまま使用することも、微調整して実際のデータモデルをさらに反映することもできます。スキーマが定義されると、組み込み SQL エディターを使用できます (ライブデータに対する構文チェックと簡単なテストを含む)。クエリの結果を Amazon S3Amazon RedshiftAmazon Elasticsearch Service、または Amazon Kinesis Stream を含む最大 4 つの送信先にルーティングするように、Kinesis Analytics を設定できます。初めての Amazon Kinesis Analytics アプリケーションを構築する場合、SQL ステートメントのペアを作成する必要があります (より複雑なアプリケーションには多くを使用する場合がありますが、必要なのは起動と実行の 2 つのみです)。

これは、中間 SQL 結果を保存するアプリケーション内のストリームを作成するためのステートメントです (ストリームは、選択および挿入でき、継続的にアップデートされる SQL テーブルのようなものです)。

1 つのアプリケーション内のストリームから SQL クエリを選択し、別のアプリケーション内のストリームに挿入します。

また、S3 を送信元とするデータを参照するよう、SQL ステートメントもレコードに追加できます。これはレコードを強化または変更し、追加のさらに詳細な情報を含める時に役立ちます。

動作中の Amazon Kinesis Analytics
動作中の Amazon Kinesis Analytics を数分見てみましょう。
Amazon Kinesis Analytics コンソールにログインし、Create new application をクリックします。名前とアプリケーションの説明を記入します。

データソース、クエリ、および送信先を管理できるようになりました。

既存の入力ストリームの 1 つを選択できます。

または、新しいストリームを設定できます (こちらを実行します)。

Create demo stream をクリックし、サンプルの株式相場表示機のデータが入力されるストリームを作成します。これには 30~40 秒かかります。Kinesis Analytics はストリームを参照し、スキーマを提案します。それをそのまま承認することも、微調整することもできます。

それから SQL エディターに移動します。アプリケーションの起動を提案しています。良いアイデアだと思うので、同意して Yes をクリックし、アプリケーションを起動します。

これが実際の SQL エディターです。

クエリをゼロから作成することも、テンプレートを使用することもできます。

継続的なフィルターを選びました。これが SQL です。

確認し、アグリーメントを承認し、Save and run SQL をクリックしました。数秒以内に結果が流れ始め、コンソールに表示されました。

SQL エディターを使用して、部門料金列を削除するようクエリを変更し、クエリを再度実行しました。この作業を行ってみて、CREATE STREAM ステートメントから列を削除する必要があることを学びました (振り返ってみると明白なのですが、これは長い 1 日の最後の作業でした)。こちらが修正した結果セットです。

ほとんどの場合、次のステップでは結果を新規または既存のストリームにルーティングすることになるでしょう。これはコンソールから実行できます。

数回のクリックとほんの少しの入力のみで、本番環境規模の株式相場表示機のストリームを処理できる Amazon Kinesis Analytics アプリケーションを作成できました。本番環境で使用される前に、この「デモ」には何の変更も必要ありません。素晴らしいです。

詳細 & お試し
いつものように、このすばらしい新サービスについては、やっと分かり始めたばかりです。詳細については、新しい投稿をご覧ください。Writing SQL on Streaming Data with Amazon Kinesis Analytics上記のステップは 5 分またはそれ以下で再現できるはずです。ぜひお試しされるようにお勧めします。アプリケーションを作成し、SQL クエリをカスタマイズし、大規模なストリーミングデータの処理方法を学んでください。

今すぐご利用可能
今すぐ Amazon Kinesis Analytics をご利用いただけます。今日からストリーミングデータに対してクエリの実行を開始できます。

Jeff