Amazon Web Services ブログ

Amazon Kinesis を用いた Databaseの継続的な変更

Emmanuel Espina は、アマゾン ウェブ サービスのソフトウェア開発エンジニアです。

このブログ記事では、Amazon Kinesis を使用して変更をストリーミングすることによって、中央リレーショナルデータベース を他のシステムと統合する方法について説明します。

次の図は、分散システムにおける一般的なアーキテクチャ設計を示しています。これには、「」と呼ばれる中央ストレージと、この中央ストレージを消費するいくつかの派生「衛星」システムが含まれます。

この設計アーキテクチャを使用して、リレーショナルデータベースを中央データストアとして使用し、このシステムのトランザクション機能を利用してデータの整合性を維持することができます。このコンテキストにおける派生システムは、この変化の事実の単一ソースを観察し、それらの変更を変換し、フィルタリングし、最終的にはその内部インデックスを更新する全文検索システムとすることができます。もう 1 つの例は、OLAP クエリに適した列形式ストレージです。一般に、中央リレーショナルシステムの個々の行を変更する際にアクションを取る必要のあるシステムは、派生データストアに適した候補となります。

これらの種類のアーキテクチャの単純な実装では、変更された行を検索するために派生システムが定期的にクエリを発行し、本質的に SELECT ベースのクエリで中央データベースをポーリングします。

このアーキテクチャのより優れた実装となるのが、非同期の更新ストリームを使用するアーキテクチャです。データベースには通常、行のすべての変更が格納されるトランザクションログがあるため、この変更のストリームが外部オブザーバシステムに公開されている場合、これらのシステムにこれらのストリームを添付して行の変更を処理およびフィルタリングできます。ここでは、中央データベースとして MySQL、メッセージバスとして Amazon Kinesis を使用して、このスキーマの基本的な実装をご紹介します。

通常、MYSQL バイナリログは、マスター上のすべての変更を読み取ってローカルに適用する読取りレプリカに公開されます。この記事では、変更をローカルデータベースに適用するのではなく、Amazon Kinesis ストリームに変更を公開する、一般化されたリードレプリカを作成します。

このメソッドの重要な点の 1 つは、コンシューマーが SQL クエリを受け取らないことです。SQL クエリは公開される可能性もありますが、一般的なオブザーバーは、SQL 互換のデータレプリカを維持しない限り、SQL にはあまり関心がありません。代わりに、変更されたエンティティ (行) を 1 つずつ受け取ります。このアプローチの利点は、コンシューマーが SQL を理解する必要はなく、事実の単一ソースは誰が変更を消費するのかを知る必要はないということにあります。これは、さまざまなチームが、必要なデータ形式で調整することなく作業できることを意味します。さらに都合がいいことに、Amazon Kinesis クライアントはが特定の時点から読む機能を備えているため、各コンシューマーは独自のペースでメッセージを処理します。これが、メッセージバスがシステムを統合するための結合されていない方法の 1 つとなる理由です。

この記事で使用されている例では、行フェッチャーは中央データベースに接続する通常の Python プロセスであり、リードレプリカをシミュレートします。

データベースは、Amazon RDS または MySQL の任意のインストールのいずれかになります。RDS の場合、フェッチャープロセスは RDS インスタンスホストにカスタムソフトウェアをインストールすることができないため、別のホスト (たとえば EC2) にインストールする必要があります。外部インストールの場合、フェッチャープロセスはデータベースと同じホストにインストールできます。

マスター MySQL インスタンスの準備

MySQL マスター (真実の単一のソース) は、それが通常のレプリケーションのマスターであるかのように設定する必要があります。個々の変更された行を受け取るには、Binlog を有効にして ROW 形式で作業する必要があります。(そうしないと、SQL クエリだけになってしまいます。)詳細については、MySQL サイトの「バイナリログ」を参照してください。

バイナリログを有効にするには、my.cnf 設定ファイルに次の 2 行を追加します。

log_bin=<path to binlog>
binlog_format=ROW

すべての接続 (たとえば、init_connect または JDBC などのデータベース API を使用) のグローバルまたはセッションレベルで、トランザクション分離レベルをコミット済み読み取りに設定することによって、行ベースのロギングを取得できます。

RDS (MySql 5.6+) を使用している場合は、簡単です。定期的なバックアップを有効にして (バックアップが有効になっていなければバイナリログは無効になります)、パラメータグループ変数 binlog_format を ROW に更新することによって、必要な設定を作成できます。(これは、パラメータグループの RDS ダッシュボードから行うことができます。)

アクセス権限の追加

RDS で作成されたデフォルトのユーザーを使用している場合は、既にこれらのアクセス許可がある可能性があります。そうでない場合は、REPLICATION SLAVE 権限を持つユーザーを作成する必要があります。詳細については、「レプリケーション用のユーザーの作成」を参照してください。

mysql> CREATE USER 'repl'@'%.mydomain.com' IDENTIFIED BY 'slavepass';
mysql> GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%.mydomain.com';

Amazon Kinesis ストリームの作成

Amazon Kinesis ストリームと boto3 クライアントの認証情報が必要です。クライアントの認証情報の詳細については、「Boto 3 ドキュメント」を参照してください。

Amazon Kinesis コンソールを開き、[ストリームの作成] を選択します。

使用するストリームの名前とシャード数を入力します。この例では、1 つのシャードがあります。

数分後、ストリームで行の変更を受け入れる準備が整います。

CLI ユーザーに権限を割り当てる

AWS Identity and Access Management (IAM) を使用して、このストリームにアクセスする CLI ユーザに権限を与えることができます。

この例では、そのユーザーは KinesisRDSIntegration です。ユーザーを作成したり、既存のユーザーを使用することはできますが、Amazon Kinesis ストリームへの書き込み権限を追加する必要があります。

ストリームに固有のポリシーを作成できます。この例では、Amazon Kinesis に完全にアクセスできるスタンダードポリシーを使用しています。

マスターに接続して変更を公開する

Python パブリッシャーが必要とするライブラリをインストールするには、次のコマンドを実行します。

pip install mysql-replication boto3

詳細な手順については、次を参照してください。

https://github.com/noplay/python-mysql-replication

https://boto3.readthedocs.io/en/latest/guide/quickstart.html

ここに、マジックを実行する Python スクリプトがあります。<HOST>、<PORT>、<USER>、<PASSWORD>、および <STREAM_NAME> の各変数を設定値に置き換えることを忘れないでください。

Python

 

import json

import boto3



from pymysqlreplication import BinLogStreamReader

from pymysqlreplication.row_event import (

  DeleteRowsEvent,

  UpdateRowsEvent,

  WriteRowsEvent,

)



def main():

  kinesis = boto3.client("kinesis")



  stream = BinLogStreamReader(

    connection_settings= {

      "host": "<HOST>",

      "port": <PORT>,

      "user": "<USER>",

      "passwd": "<PASSWORD>"},

    server_id=100,

    blocking=True,

    resume_stream=True,

    only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])



  for binlogevent in stream:

    for row in binlogevent.rows:

      event = {"schema": binlogevent.schema,

      "table": binlogevent.table,

      "type": type(binlogevent).__name__,

      "row": row

      }



      kinesis.put_record(StreamName="<STREAM_NAME>", Data=json.dumps(event), PartitionKey="default")

      print json.dumps(event)



if __name__ == "__main__":

   main()

このスクリプトは、変更された各行を JSON 形式でシリアル化された Amazon Kinesis レコードとして公開します。

メッセージを使用する

これで、変更されたレコードを使用する準備が整いました。あらゆるコンシューマーコードが動作します。この記事のコードを使用すると、次の形式でメッセージが表示されます。

{"table": "Users", "row": {"values": {"Name": "Foo User", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"values": {"Name": "Bar user", "idUsers": 124}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"before_values": {"Name": "Foo User", "idUsers": 123}, "after_values": {"Name": "Bar User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"}

まとめ

このブログ記事では、擬似リードレプリカと Amazon Kinesis を使用して、変更ストリームをデータベースのレコードに公開する方法を説明しました。多くのデータ指向の企業が、これに類似したアーキテクチャを使用しています。この記事で提供されている例は、実際の本番環境には対応していませんが、この統合スタイルを試して、エンタープライズアーキテクチャの拡張機能を改善するために使用できます。最も複雑な部分は、おそらく Amazon Kinesis の背後で既に解決されているものでしょう。接着剤の役割を果たすものを提供する必要があります。

その他のリソース

すべてのソフトウェアエンジニアがリアルタイムデータの統合抽象化について知っておくべきこと

Databus に搭載されているすべてのもの: LinkedIn のスケーラブルで一貫性のある変更データキャプチャプラットフォーム