Amazon Web Services ブログ

Database Migration Service を使用して、リレーショナルデータベースから Amazon Kinesis に CDC データをロードする

多くの大企業は、タイムリーな情報を得るために、データ処理をバッチ処理からリアルタイム処理に移行しています。そうすることの課題は、リアルタイムのデータ処理アーキテクチャが着信データストリームに追いつけなければならないということです。これには、強固な耐障害性と伸縮性が必要です。

このブログ記事では、リアルタイムのデータ処理アーキテクチャと、Amazon RDS for SQL Server データベースへの変更をキャプチャして Amazon Kinesis Data Streams に送信する方法について解説します。Amazon S3 および AWS Lambda をデータの初期ロードに使用する方法と、AWS Database Migration Service を進行中のレプリケーションに使用する方法を示します。
AWS では、Amazon Kinesis Data StreamsAWS Database Migration Services (DMS)AWS Lambda などのデータ処理を適正に行うためのサービスを数多く提供しています。DMS は、既存のデータを移行し、進行中の変更データをソースシステムからターゲットシステムにレプリケートするのに役立ちます。

前提条件および仮定

  • このソリューションを自分で使用するには、AWS のサービスへのアクセスを提供する AWS アカウントが必要です。
  • サービスは us-east-1 リージョンで作成する必要があり、ネットワーキングの考慮事項を簡素化するために us-east-1 リージョンの同じ VPC で設定する必要があります。
  • Lambda 関数のコンパイル済み Java コードは、この Amazon Repository にある ZIP ファイルに含まれています。S3 バケットを作成し、ZIP ファイルをアップロードします。CloudFormation スタックを作成する際、この S3 バケットの名前を指定する必要があります。
  • RDS データベースのデフォルトのマスターユーザー名は、AWSUser です。CloudFormation スタックを作成する際、マスターパスワードを選択します。

AWS CloudFormation を使用する場合は、関連リソースをスタックと呼ばれる単一のユニットとして管理します。スタックの作成、更新、および削除によって、リソースのコレクションを作成、更新、および削除します。スタック内のすべてのリソースは、スタックの AWS CloudFormation テンプレートによって定義されます。

ソリューションの実行手順

このソリューションを実行するためのコードと CloudFormation テンプレートは、この Amazon GitHub Repository にあります。

提供するのは以下です。

  1. 公開されているスナップショットから取得する Amazon RDS for SQL Server データベース。
  2. Lambda 関数を呼び出す権限を持つ DMS の S3 バケット。
  3. DMS の IAM ロール。
  4. AWS Database Migration Services (AWS DMS)。
  5. Amazon Kinesis Data Stream。
  6. Lambda 関数の IAM ロール。
  7. ソース S3 バケットからターゲット Kinesis Data Stream にデータをロードするための Lambda 関数
  8. Kinesis Data Stream からデータを読み込むための Lambda 関数。

次のアーキテクチャ図は、Database Migration Service を使用して、RDS for SQL Server から Kinesis Data Stream へのデータの流れを示しています。

ステップ 1: CloudFormation Stack を作成する

この Amazon Repository から CloudFormation Script をダウンロードします。

以下に示すように AWS コンソールの CloudFormation ページに移動し、[スタックの作成] をクリックします。

以下に示すように CDCCloudFormation.yml ファイルをアップロードしてます。
次の画面では、以下を提供してください。

  • スタック名
  • CodeS3Bucket の名前 (これは Lambda コード ZIP ファイルのバケットです)
  • データベース用のマスターユーザーパスワード
  • TargetS3BucketName (これは DMS 用のターゲットバケットです)。S3 バケットごとに異なるバケット名を選択します。世界的にユニークなものです。

以下に示すように [次へ] をクリックします。

[Review Details (詳細の確認)] ページが表示されるまで、[次へ] をクリックします。IAM リソースの作成を確認し、以下に示すように [作成] をクリックします。

CloudFormation スタックが完了したら、リソースが作成されたことを確認します。CloudFormation スタックが完了しない場合は、AWS マネジメントコンソール で CloudFormation スタックのイベントを確認します。

ステップ 2: 公開されているスナップショットから Amazon RDS for SQL Server データベースを確認する

CloudFormation スクリプトが完了したら、RDS に SQL Server データベースが作成されます。AWS マネジメントコンソールを確認して、以下に示すように RDS インスタンスが作成されたことを確認できます。

SQL Server の変更データキャプチャ (CDC) も有効にする必要があります。詳細については、Amazon RDS ユーザーガイドの「変更データキャプチャを使用する」をご覧ください。まず、exec msdb.dbo.rds_cdc_enable_db 'CustomerDB' を実行して、データベースの CDC を有効にします。次に、以下のストアドプロシージャを実行してテーブル用の CDC を有効にします。

exec sys.sp_cdc_enable_table   
   @source_schema = N'dbo'
,  @source_name = N'CustomerInformation'
,  @role_name = N'admin'

SQL Server Management Studio 経由で SQL RDS インスタンスにアクセスできない場合は、セキュリティグループ内で RDS インスタンスが待機しているポートを開きます。このブログで提供されている CloudFormation テンプレートを使用している場合、使用するポート番号は 1433 です。

ステップ 3: DMS の S3 バケットを確認する

CDC データ CSV ファイルに対して、Database Migration Service 用のターゲット S3 バケットが作成されます。AWS マネジメントコンソールを確認して、以下に示すように バケットが作成されたことを確認できます。

ステップ 4: DMS Database Migration Service を設定する

AWS Database Migration Service タスクには次の 3 つのエンティティが必要です。

ソースはデータを移動するデータベース、ターゲットはデータを移動するデータベースです。この場合、ソースデータベースは RDS 上の SQL Server で、ターゲットデータベースは S3 バケットです。レプリケーションインスタンスは移行タスクを処理し、VPC 内のソースエンドポイントとターゲットエンドポイントにアクセスする必要があります。レプリケーションインスタンスは、ソースデータベースとターゲットデータベース間の接続を開始し、データを転送し、初期データロード時にソースデータベースで発生したすべての変更をキャッシュします。

CloudFormation スクリプトが正常に完了したら、レプリケーションインスタンス、タスク、およびエンドポイントが作成されていることを確認します。次に示すように DMS タスクを確認して開始します。

ステップ 5: Lambda 関数を検証する

CloudFormation スクリプトが完了したら、コンソールで 2 つの Lambda 関数の作成を確認します。最初の関数は、Database Migration Service によって生成された S3 上の CSV ファイルを読み取ります。2 番目の関数は、Kinesis Data Stream からデータを消費します。

コンソール上で、以下に示す 2 つの Lambda 関数をご覧ください。

Lambda 関数のコンパイル済み Java コードは、この Amazon GitHub Repository にある ZIP ファイルに含まれています。コードを使用するには、S3 バケットを作成し、 CloudFormation スタックを作成するときにその名前を指定します。その後、S3 バケットに ZIP ファイルをアップロードします。

Database Migration Service が CSV データを S3 にロードすると、Kinesis Data Stream が読み込まれます。consumekinesis ストリームの Lambda 関数は、新しいデータのストリームをポーリングします。以下に示すように consumekinesis の Log Group の CloudWatchで、consumekinesis Lambda 関数で消費されたデータを見ることができます。

最初の Lambda 関数の Java コードは、S3 でオブジェクトが作成された後に Lambda が S3 によってトリガーされたときに実行されます。その後、S3 イベントを使用してオブジェクトのメタデータとコンテンツを取得します。CSV ファイルの内容と Kinesis Data Stream を処理します。Lambda が Kinesis Data Stream に発行された新しいレコードを見つけると、2 番目の Lambda 関数の Java コードが実行されます。この場合、Lambda 関数は、ストリームに発行された新しいレコードの Kinesis Data Stream をポーリングします。

ユースケースに合わせてどちらの Lambda 関数のコードも自由に変更してください。

概要

このブログ記事では、Database Migration Service を使用して、CDC データをリレーショナルデータベースから Amazon Kinesis にロードするエンドツーエンドソリューションを示しました。このソリューションを使用すると、リアルタイムのデータ変更を分析できます。たとえば、詐欺行為を監視したり、反マネーロンダリングに関する法規を充足したりすることができます。

このブログ投稿に関するコメントは、以下のコメントセクションからお送りください。この投稿のソリューションの実行に関して質問がございましたら、「コメント」セクションに書き込むこともできます。


著者について

Zafar Kapadia は、アマゾン ウェブ サービスのクラウ​​ドアプリケーションアーキテクトです。アプリケーション開発と最適化プロジェクトを担当しています。また、熱心なクリケット選手であり、さまざまなローカルリーグでプレーしています。

 

 

 

Udayasimha Theepireddy は アマゾン ウェブ サービスのデータベースクラウドアーキテクトです。AWS のお客様と協力して、データベースの移行や大規模なデータプロジェクトに関する指導と技術支援を提供しています。