Amazon Web Services ブログ

AWS Database Migration Serviceを使用し、Amazon Kinesis Data Streamingを実施する

この投稿では、AWS Database Migration Service (AWS DMS) を使用し、Amazon Kinesis Data Streamsに変更データをストリーミングする方法について議論します。

以前の投稿 Load CDC Data では、データ処理アーキテクチャについてリアルタイムで議論しました。その一環として、AWS DMS を使用して Amazon RDS for Microsoft SQL Server のデータベースの変更を取得し、AWS Lambdaを使用して Kinesis Data Streams に送信する方法について扱いました。AWS DMS のターゲットとしての Kinesis Data Streams の開始により、変更データの取得データ(CDC)のストリーミング、分析、および格納がより簡単になりました。DMSは、成功事例を使用してデータからの変更を自動的に収集し、それを Data Streams にストリーミングします。

Kinesis Data Streamsをターゲットとして追加したため、お客様がデータレイクを構築し、データストアからデータを変更してリアルタイムで処理することができます。 データ統合パイプラインで AWS DMS を使用して、ほぼリアルタイムで Kinesis Data Stream にデータを複製できます。このアプローチを使用すれば、アプリケーションを高価なデータベース上に構築しなくても、デカップリングされ、最終的に一貫したデータベースビューを構築が可能になります。

リアルタイムの変更データのアーキテクチャ

数多くの組織がKinesis Data ストリームを使用して変更データを分析し、Webサイト、不正検出、広告、モバイルアプリケーション、IOTなどを監視しています。 この投稿では、AWS DMSを使用して関連データベースの変更データを Kinesis Data Streams にロードする方法について説明します。また、次の図に示すように、データプラットフォームアーキテクチャを Kappa アーキテクチャに展開する方法についても説明します。

上図では、AWS DMS は Kinesis Data Streams のいくつかのソースをターゲットとしてサポートしています。RDBMS または NoSQL からのリアルタイムの変更を AWS DMS dream で Amazon Kinesis Data Streams にロードし、Amazon EMR の Apache Sparkを使用してさらに変換し処理できます。このデータは、Amazon QuickSightAmazon AthenaAmazon CloudSearch、AWS Lambda、KCL アプリケーション または Amazon Kinesis Data Analytics等のさまざまなアプリケーションでの使用が可能になります。当該データは、Amazon Redshiftやバッチプロセスレポートを実行する Amazon DynamoDB 等のデータベースに格納できます。

また、データストリームを Amazon Kinesis Data Firehoseでキャプチャして詳細分析のため、Amazon S3バケツへのロードもできます。たとえば、顧客お問い合わせセンターへのメールや電話をすべて処理して、お客様の苦情により効果的に対応することができます。顧客ケアの従業員は、クレジットカード、ローン、アカウントなどのさまざまな金融部門の顧客記録にアクセスし、より高い可用性を備えた、より迅速でより良い顧客体験を提供することができます。

お客様の Amazon Kinesis サービスの使用方法の他の例をいくつかご紹介します。

  1. Netflix はAmazon Kinesisを使用して、全アプリケーション間の通信を監視し、問題を迅速に検出して修正し、顧客のサービス稼働率と可用性を確保しています。
  2. Zillow は、Amazon Kinesisを使用して、公的記録データとMLSリストを収集しています。その後、Zillowはほぼリアルタイムで住宅価額の見積りを更新し、住宅の買い手と売り手は最新の住宅価額の見積りを得ることができます。
  3. Sonos はAmazon Kinesisを使用して、ワイヤレスハイファイオーディオデバイスから週10億件のイベントを監視し、顧客により良いリスニング体験を提供しています。
  4. 多国籍銀行は、利用統計量を照会して数分以内に詐欺を検出することもできました。銀行ローンとクレジットカードの専門家は、ローンとクレジットカードの処理時間を短縮できます。また、そのシステムからより迅速にクレジットスコアを引き上げたので、より早いターンアラウンドタイムを提供することができます。

AWS DMSのターゲットとしてのKinesis Data Streamsを設定する方法

データストリームをAWS DMSの変更データターゲットとして設定し、ストリーミングデータを簡単に開始できます。まず、最小限のアクセスで IAM ロールを作成します。

{
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Action": [
 "kinesis:PutRecord",
 "kinesis:PutRecords",
 "kinesis:DescribeStream"
 ],
 "リソース": <streamArn>
 }
 ]
}
信頼関係
{
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Principal": {
 "Service": "dms.amazonaws.com"
 },
 "Action": "sts:AssumeRole"
 }
 ]
}

IAM ロールの定義後、AWS DMSで ソースポイントターゲットエンドポイントリプリケーションインスタンス を設定します。ソースはデータを移動するデータベースで、ターゲットはデータを移動する対象のデータベースです。この場合、ソースデータベースは Amazon RDS上の Oracle データベースで、ターゲットデータベースは Kinesis データストリームです。レプリケーションインスタンスは移行タスクを処理し、VPC 内のソースエンドポイントとターゲットエンドポイントにアクセスする必要があります。

AWS Management Consoleにログインして Kinesisを選択し、Kinesis ストリームを作成します。

AWS Database Migration Service コンソールに移動して、ソースとターゲットのエンドポイントを作成します。

以下に示すように、ソースエンドポイントを設定します。

以下に示すように、ターゲットエンドポイントを設定します。

データを Amazon S3にロードする Amazon Kinesis Firehose の配信ストリームを設定します。

ソースと Kinesis エンドポイント間を移行するタスクを作成します。

こちらにオブジェクトマッピングのJSON ファイルの例を示します:

{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "rule-action": "include",
            "object-locator": {
                "schema-name": "DMS_SAMPLE",
                "table-name": "PLAYER"
            }
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "2",
            "rule-action": "map-record-to-record",
            "target-table-name": "PLAYER",
            "object-locator": {
                "schema-name": "DMS_SAMPLE",
                "table-name": "PLAYER"
            },
            "mapping-parameters": {
                "partition-key-type": "schema-table"
            }
        }
    ]
}

タスクが実行され、Kinesis data stream へのデータのロード開始後は、そのデータの Amazon S3 バケツを点検します。

こちらに Oracle ソースデータベースのサンプルデータを示します:

ID SPORT_TEAM_ID LAST_NAME      FIRST_NAME                     FULL_NAME

—————————————————————–

6951       81 Koyie                           Hill                            Koyie Hill

 

SQL>

 

Kinesis データストリームからのサンプルデータ:

{
"data": {
"ID": 6951,
"SPORT_TEAM_ID": 111,
"LAST_NAME": "Koyie",
"FIRST_NAME": " Hill",
"FULL_NAME": "Koyie Hill"
},
"metadata": {
"timestamp": "2018-11-06T18:09:06.363956Z",
"record-type": "data",
"operation": "load",
"partition-key-type": "schema-table",
"schema-name": "DMS_SAMPLE",
"table-name": "PLAYER"
}
}

次のコマンドを使用して、Kinesisデータストリームから直接データを取得することもできます。

SHARD_ITERATOR=$(aws kinesis get-shard-iterator —shard-id shardId-000000000000 —shard-iterator-type TRIM_HORIZON —stream-name kramya-test-integ —query 'ShardIterator')

aws kinesis get-records —shard-iterator $SHARD_ITERATOR

結論

Amazon Kinesis データストリームを AWS DMS ターゲットとして使用して、変更データを関連データベースから直接に Amazon Kinesis データストリームにストリームする強力な方法が可能になりました。この方法を使用して、AWS DMSがサポートするソースから変更データをストリーミングし、リアルタイムデータ処理を実行できます。ストリーミングをお楽しみください!

ご不明な点がございましたら、下記へコメントをお寄せください。

 


著者について

Udayasimha Theepireddyは アマゾン ウェブ サービスのデータベースクラウドアーキテクトです。AWS のお客様と協力して、データベースの移行や大規模なデータプロジェクトに関する指導と技術支援を提供しています。

 

 



Ramya Kaushikは、アマゾン ウェブ サービスの Database Migration Service (DMS) & Schema Conversion Tool (SCT) チーム でデータベースエンジニアを務めています。