Category: Database*


Amazon Kinesis を用いた Databaseの継続的な変更

Emmanuel Espina は、アマゾン ウェブ サービスのソフトウェア開発エンジニアです。

このブログ記事では、Amazon Kinesis を使用して変更をストリーミングすることによって、中央リレーショナルデータベース を他のシステムと統合する方法について説明します。

次の図は、分散システムにおける一般的なアーキテクチャ設計を示しています。これには、「」と呼ばれる中央ストレージと、この中央ストレージを消費するいくつかの派生「衛星」システムが含まれます。

この設計アーキテクチャを使用して、リレーショナルデータベースを中央データストアとして使用し、このシステムのトランザクション機能を利用してデータの整合性を維持することができます。このコンテキストにおける派生システムは、この変化の事実の単一ソースを観察し、それらの変更を変換し、フィルタリングし、最終的にはその内部インデックスを更新する全文検索システムとすることができます。もう 1 つの例は、OLAP クエリに適した列形式ストレージです。一般に、中央リレーショナルシステムの個々の行を変更する際にアクションを取る必要のあるシステムは、派生データストアに適した候補となります。

これらの種類のアーキテクチャの単純な実装では、変更された行を検索するために派生システムが定期的にクエリを発行し、本質的に SELECT ベースのクエリで中央データベースをポーリングします。

このアーキテクチャのより優れた実装となるのが、非同期の更新ストリームを使用するアーキテクチャです。データベースには通常、行のすべての変更が格納されるトランザクションログがあるため、この変更のストリームが外部オブザーバシステムに公開されている場合、これらのシステムにこれらのストリームを添付して行の変更を処理およびフィルタリングできます。ここでは、中央データベースとして MySQL、メッセージバスとして Amazon Kinesis を使用して、このスキーマの基本的な実装をご紹介します。

通常、MYSQL バイナリログは、マスター上のすべての変更を読み取ってローカルに適用する読取りレプリカに公開されます。この記事では、変更をローカルデータベースに適用するのではなく、Amazon Kinesis ストリームに変更を公開する、一般化されたリードレプリカを作成します。

このメソッドの重要な点の 1 つは、コンシューマーが SQL クエリを受け取らないことです。SQL クエリは公開される可能性もありますが、一般的なオブザーバーは、SQL 互換のデータレプリカを維持しない限り、SQL にはあまり関心がありません。代わりに、変更されたエンティティ (行) を 1 つずつ受け取ります。このアプローチの利点は、コンシューマーが SQL を理解する必要はなく、事実の単一ソースは誰が変更を消費するのかを知る必要はないということにあります。これは、さまざまなチームが、必要なデータ形式で調整することなく作業できることを意味します。さらに都合がいいことに、Amazon Kinesis クライアントはが特定の時点から読む機能を備えているため、各コンシューマーは独自のペースでメッセージを処理します。これが、メッセージバスがシステムを統合するための結合されていない方法の 1 つとなる理由です。

この記事で使用されている例では、行フェッチャーは中央データベースに接続する通常の Python プロセスであり、リードレプリカをシミュレートします。

データベースは、Amazon RDS または MySQL の任意のインストールのいずれかになります。RDS の場合、フェッチャープロセスは RDS インスタンスホストにカスタムソフトウェアをインストールすることができないため、別のホスト (たとえば EC2) にインストールする必要があります。外部インストールの場合、フェッチャープロセスはデータベースと同じホストにインストールできます。

マスター MySQL インスタンスの準備

MySQL マスター (真実の単一のソース) は、それが通常のレプリケーションのマスターであるかのように設定する必要があります。個々の変更された行を受け取るには、Binlog を有効にして ROW 形式で作業する必要があります。(そうしないと、SQL クエリだけになってしまいます。)詳細については、MySQL サイトの「バイナリログ」を参照してください。

バイナリログを有効にするには、my.cnf 設定ファイルに次の 2 行を追加します。

log_bin=<path to binlog>
binlog_format=ROW

すべての接続 (たとえば、init_connect または JDBC などのデータベース API を使用) のグローバルまたはセッションレベルで、トランザクション分離レベルをコミット済み読み取りに設定することによって、行ベースのロギングを取得できます。

RDS (MySql 5.6+) を使用している場合は、簡単です。定期的なバックアップを有効にして (バックアップが有効になっていなければバイナリログは無効になります)、パラメータグループ変数 binlog_format を ROW に更新することによって、必要な設定を作成できます。(これは、パラメータグループの RDS ダッシュボードから行うことができます。)

アクセス権限の追加

RDS で作成されたデフォルトのユーザーを使用している場合は、既にこれらのアクセス許可がある可能性があります。そうでない場合は、REPLICATION SLAVE 権限を持つユーザーを作成する必要があります。詳細については、「レプリケーション用のユーザーの作成」を参照してください。

mysql> CREATE USER 'repl'@'%.mydomain.com' IDENTIFIED BY 'slavepass';
mysql> GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%.mydomain.com';

Amazon Kinesis ストリームの作成

Amazon Kinesis ストリームと boto3 クライアントの認証情報が必要です。クライアントの認証情報の詳細については、「Boto 3 ドキュメント」を参照してください。

Amazon Kinesis コンソールを開き、[ストリームの作成] を選択します。

使用するストリームの名前とシャード数を入力します。この例では、1 つのシャードがあります。

数分後、ストリームで行の変更を受け入れる準備が整います。

CLI ユーザーに権限を割り当てる

AWS Identity and Access Management (IAM) を使用して、このストリームにアクセスする CLI ユーザに権限を与えることができます。

この例では、そのユーザーは KinesisRDSIntegration です。ユーザーを作成したり、既存のユーザーを使用することはできますが、Amazon Kinesis ストリームへの書き込み権限を追加する必要があります。

ストリームに固有のポリシーを作成できます。この例では、Amazon Kinesis に完全にアクセスできるスタンダードポリシーを使用しています。

マスターに接続して変更を公開する

Python パブリッシャーが必要とするライブラリをインストールするには、次のコマンドを実行します。

pip install mysql-replication boto3

詳細な手順については、次を参照してください。

https://github.com/noplay/python-mysql-replication

https://boto3.readthedocs.io/en/latest/guide/quickstart.html

ここに、マジックを実行する Python スクリプトがあります。<HOST>、<PORT>、<USER>、<PASSWORD>、および <STREAM_NAME> の各変数を設定値に置き換えることを忘れないでください。

Python

 

import json

import boto3



from pymysqlreplication import BinLogStreamReader

from pymysqlreplication.row_event import (

  DeleteRowsEvent,

  UpdateRowsEvent,

  WriteRowsEvent,

)



def main():

  kinesis = boto3.client("kinesis")



  stream = BinLogStreamReader(

    connection_settings= {

      "host": "<HOST>",

      "port": <PORT>,

      "user": "<USER>",

      "passwd": "<PASSWORD>"},

    server_id=100,

    blocking=True,

    resume_stream=True,

    only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])



  for binlogevent in stream:

    for row in binlogevent.rows:

      event = {"schema": binlogevent.schema,

      "table": binlogevent.table,

      "type": type(binlogevent).__name__,

      "row": row

      }



      kinesis.put_record(StreamName="<STREAM_NAME>", Data=json.dumps(event), PartitionKey="default")

      print json.dumps(event)



if __name__ == "__main__":

   main()

このスクリプトは、変更された各行を JSON 形式でシリアル化された Amazon Kinesis レコードとして公開します。

メッセージを使用する

これで、変更されたレコードを使用する準備が整いました。あらゆるコンシューマーコードが動作します。この記事のコードを使用すると、次の形式でメッセージが表示されます。

{"table": "Users", "row": {"values": {"Name": "Foo User", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"values": {"Name": "Bar user", "idUsers": 124}}, "type": "WriteRowsEvent", "schema": "kinesistest"}
{"table": "Users", "row": {"before_values": {"Name": "Foo User", "idUsers": 123}, "after_values": {"Name": "Bar User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"}

まとめ

このブログ記事では、擬似リードレプリカと Amazon Kinesis を使用して、変更ストリームをデータベースのレコードに公開する方法を説明しました。多くのデータ指向の企業が、これに類似したアーキテクチャを使用しています。この記事で提供されている例は、実際の本番環境には対応していませんが、この統合スタイルを試して、エンタープライズアーキテクチャの拡張機能を改善するために使用できます。最も複雑な部分は、おそらく Amazon Kinesis の背後で既に解決されているものでしょう。接着剤の役割を果たすものを提供する必要があります。

その他のリソース

すべてのソフトウェアエンジニアがリアルタイムデータの統合抽象化について知っておくべきこと

Databus に搭載されているすべてのもの: LinkedIn のスケーラブルで一貫性のある変更データキャプチャプラットフォーム

 

Amazon Aurora Multi-Master のプレビュー申し込み開始

Amazon Aurora Multi-Master は、複数のアベイラビリティゾーンにわたって複数のRead/Writeマスターインスタンスを作成することができます。これによって、リードレプリカで現在できることと同様に、アプリケーションは1つのクラスター内の複数のデータベースインスタンスを読み書きできるようになります。

Multi-MasterクラスターはAuroraの高可用性を向上させます。複数インスタンス内の1つが落ちたとしても、クラスター内のその他のインスタンスに即座に引き継がれます。インスタンス障害やAZ全体障害が起きたとしても、アプリケーションのダウンタイムほぼゼロで、読み書きの可用性が維持されます。

現在のSingle-MasterのAuroraは、1つのクラスター内に1台の書き込みインスタンスと最大15台の昇格可能なリードレプリカをサポートし、書き込みインスタンスはr4.16xlargeで秒間200,000書き込みを実行できます。Aurora Multi-Master であれば、より高い書き込みスループットを必要とするワークロードであっても、マスターインスタンスを追加することで書き込みを水平方向にスケールアウトさせることができます。

このプレビューはMySQL互換のAuroraで使用でき、サインアップフォームに記入することで参加できます。

Aamazon Aurora はオープンソースデータベースのシンプルさとコスト効率、およびハイエンド商用データベースのパフォーマンスと可用性を両立した、完全マネージドなリレーショナルデータベースです。

Aurora Multi-Master プレビュー: サインアップ

(翻訳はSA柴田が担当しました。原文はこちら

新機能- Amazon DynamoDBにGlobal TablesとOn-Demand Backupが追加されました

AWSの多くのお客様にDynamoDBは広く、ミッションクリティカルな用途に使われています。金融、E-コマース、広告、IoT、そしてゲームなど様々な用途で使われており、数百万リクエスト/秒のスループットとテラバイトのデータと数兆のアイテムを格納しています。

本日、我々は2つの強力な新機能を皆様に紹介出来る事に喜びを感じています。

Global Tables – 今から新しくテーブルを作る時に全自動で2つのリージョン間、若しくはそれ以上のリージョン間で同期されるマルチマスタのテーブルを数回のクリックで簡単に作成する事が出来ます。簡単かつ素早く構築が出来る事と、大規模なスケールをアプリケーションに提供出来るためグローバルスケールのユーザーに対応することがレプリケーションの管理無しに可能になります。

On-Demand Backup – 一回のクリックで、今利用しているDynamoDBのテーブルのフルバックアップを取得する事が今から可能です。これはパフォーマンスや可用性などに影響がありません。バックアップ取得中もアプリケーションはオンラインのまま実行され、設定したプロビジョンドキャパシティ応じて読み書きが可能です。バックアップは長期間の保持、アーカイブを行い、監査要件などで必要な場合にも役立ちます。

Global Tables

DynamoDBでは既にデータは3つのAZにレプリケーションされ耐久性と高い可用性のストレージを提供しています。今日からGlobal Tablesを使うことによって、あなたのテーブルのデータは複数のリージョンへのレプリケーションを数クリックで実現出来ます。多くのグローバルで提供しているアプリケーションに必要な素早い書き込み、読み込み性能をこのスケールによって実現が可能です。

アプリケーションのコードは変更する必要がありません。書き込みリクエストと結果整合性の読み込みリクエストであればそれぞれのリージョンで作成されるendpointにリクエストを送れば大丈夫です(もし強い一貫性の読み込みが必要であれば一つのendpointに書き込みと読み込みリクエストを一つのendpointに集約する必要があります)。裏側では、DynamoDBはマルチマスター書き込みを実装し、特定のアイテムへの最後の書き込みが確実に行われるようにします。あなたがGlobal Tablesを使う時、全てのアイテムは最新の書き込み時刻を表すタイムスタンプ属性を含む様になります。アイテムへのアップデートがあった場合他のリージョンへ非同期で伝播し、DynamoDB Streams経由で完了までに1秒程度で終わります(あなたがこの状況を追跡するために新しくTimeToReplicateReplicationQueueBacklogメトリクスが追加されます。)。
(more…)

Amazon Neptune – フルマネージドのグラフデータベースサービス

現代の生活を可能にするために必要な全てのデータ構造やアルゴリズムの中でも、グラフは日々世界を変えています。ビジネスからは、複雑な関係性を持つリッチなデータが生まれ続け、また取り込まれ続けています。しかし開発者は未だにトラディショナルなデータベースの中でグラフのような複雑な関係性を扱うことを強要されています。必然的に、そのような関係性-リレーションシップが追加されるにつれ、パフォーマンスは劣化し、いらいらするくらい高コストで複雑なクエリとなっていきます。我々はそのようなモダンで複雑性が日々高まるようなデータセットやリレーションシップ、パターンを簡単に扱えるようにしたいと考えました。

Hello, Amazon Neptune

2017年11月29日、我々は限定プレビュー版のAmazon Neptuneを発表します。Amazon Neptuneにより、高度に接続されたデータセット間のリレーションシップから簡単に洞察を得ることができます。Neptuneのコア部分は、数十億ものリレーションシップが格納可能で、グラフに対してミリ秒レベルの遅延となるよう最適化された、専用の、高性能なグラフデータベースエンジンです。フルマネージドなデータベースとして提供されることで、Neptuneはお客様をメンテナンスやパッチ適用、バックアップ/リストアなどの退屈なオペレーションから解放し、アプリケーションに集中できるようにします。高速なフェイルオーバー、Point-in-Timeリカバリ、そしてマルチAZでの実装など高可用性のための各種機能も備えるサービスです。最大15個のリードレプリカによりクエリのスループットを秒間10万件レベルまでスケールさせることも可能です。Amazon NeptuneはAmazon VPC内で動作し、データを暗号化して保管でき、保管時や転送時にデータの整合性について完全に制御することができます。

(more…)

In The Works – Amazon Aurora Serverless

既にご存知の通り、Amazon AuroraはMySQL互換とPostgreSQL互換があり、マネージドサービスで自動的に64TBまでスケールアップするデータベースストレージ’を備えています。Auroraデータベースインスタンスを作成する際に必要なインスタンスサイズの選択や、リードスループットを向上させるためにリードレプリカを作成するかどうかのオプションを選択します。処理の需要やクエリ数の増減に応じて、インスタンスサイズやリードレプリカの数を必要に応じて変更可能です。このモデルはリクエスト数や負荷などのワークロードが予測し易い場合はうまくいきます。

しかし、場合によっては1日や1週間の間に数時間、もしくは数分間だけリクエストがスパイクするようなワークロードの割り込みがあったり予測が難しいケースがあります。セールや1回だけもしくは不定期イベント、オンラインゲームや日時・週次のレポーティング、dev/test、新規アプリケーションなどが当てはまります。適切なキャパシティに調整し続けるためには多くの作業が必要です、そのため安定している状態を基準として支払いを行うほうが懸命です。

(more…)

Scaling Your Amazon RDS Instance Vertically and Horizontally

Marie Yap はアマゾン ウェブ サービスのソリューションアーキテクトです。

Amazon RDSは、マネージド型サービスとして、リレーショナルデータベースのスケーリングを処理し、データベースがアプリケーションやアプリケーションの増加する要求に対応できるようにします。

このブログ記事では、RDS インスタンスを縦横に拡大縮小する方法について説明します。ほぼ同じ数の読み取りと書き込みを使用するアプリケーションの増加する要求に対応するために、垂直方向に拡大縮小することができます。また、読み取りが重いアプリケーションの場合は、水平方向に拡大縮小することもできます。

垂直スケーリング
データベースの高い負荷を処理するために、ボタンを押すだけでマスターデータベースを垂直方向にスケールアップできます。現在、RDS MySQL、PostgreSQL、MariaDB、Oracle、または Microsoft SQL Server インスタンスのサイズを変更する際に、18 種類以上のインスタンスサイズを選択できます。Amazon Aurora では、5 つのメモリ最適化インスタンスサイズを選択できます。インスタンスタイプを幅広く選択することで、データベースサーバーに最適なリソースとコストを選択できます。

以下は、RDS インスタンスをスケールアップする際の考慮事項です。

  • スケールを変更する前に、商用エンジン (SQL Server、Oracle) の正しいライセンスを取得していることを確認してください (特に、ライセンス持ち込み (BYOL) が必要な場合)。重要なことは、商用エンジンの場合はライセンスによって制限されていることです。ライセンスは、通常 CPU ソケットまたはコアに関連付けられています。
  • 変更をいつ適用するかを決めます。変更をすぐに適用するか、インスタンスで指定されたメンテナンス期間中に変更を適用するかを選択できます。
  • ストレージとインスタンスのタイプは切り離されています。データベースインスタンスを上下にスケールしたときは、ストレージサイズは同じままで、変更の影響を受けません。DB インスタンスを個別に変更して、割り当てられたストレージスペースを増やすか、ストレージタイプを変更して (一般目的 SSD からプロビジョニング IOPS SSD などに) パフォーマンスを向上させることができます。
  • スタンバイデータベースが最初にアップグレードされた後で、新しくサイズの変更されたデータベースでフェイルオーバーが発生するため、Multi-AZ 環境でスケールアップする場合のダウンタイムは最小限に抑えられます。シングル AZ インスタンスは、スケール操作中は使用できません。

インスタンスのタイプを変更するには、RDS コンソールの [インスタンスの操作] メニューから [変更] を選択します。

次に、新しい DB インスタンスクラスを選択します。

最後に、変更をすぐに適用するかどうかを決定します。変更をすぐに適用するには、[変更] ページの一番下にある [すぐに適用] チェックボックスを選択します。変更をすぐに適用しないと、定義した優先メンテナンスウィンドウ中に変更がスケジュールされます。

水平スケーリング
マスターデータベースを垂直方向に拡張するだけでなく、読み取りレプリカを使用してデータベースを水平方向に拡大することによって、読み取りが重いデータベースのパフォーマンスを向上させることもできます。RDS MySQL、PostgreSQL、MariaDB には最大 5 つのリードレプリカがあり、Amazon Aurora には最大 15 のリードレプリカがあります。

リードレプリカを使用すると、マスターデータベースと同期する読み取り専用コピーを作成できます。より良いパフォーマンスを得るために、リードレプリカをユーザーにより近い別の AWS Region に配置することもできます。また、リードレプリカをマスターに昇格させることで、リードレプリカを使用してデータベースの可用性を高め、災害時の迅速なリカバリーを行うことができます。ただし、リードレプリカは、Multi-AZ が提供する高可用性と自動フェイルオーバー機能の代替となるものではありません。

現在、RDS リードレプリカは、クエリまたは接続の透過的なロードバランシングをサポートしています。各レプリカには固有のドメインネームサービス (DNS) エンドポイントがあり、アプリケーションはレプリカエンドポイントに接続してロードバランシングを実装できます。では、アプリケーションで RDS リードレプリカを認識させる方法のオプションを見てみましょう。

アプリケーションがネイティブの MySQL ドライバを使用している場合は、アプリケーションに大きな変更を加えることなく、読み取り/書き込み分割と読み取り専用のエンドポイントロードバランシングを実行できる MySQL Connector があります。たとえば、PHP アプリケーションをお持ちの場合は、MySQL ネイティブドライバの PHP Mysqlnd レプリケーションとロードバランシングプラグインを使用できます。

MySQL Connector を使用することに加えて、アプリケーションとデータベースサーバの間にロードバランサを追加することができます。アプリケーションに単一のデータベースエンドポイントが表示されるように、この追加を行います。このアプローチでは、アプリケーションのデータベース接続文字列を絶えず更新することなく、ロードバランサの背後にあるリードレプリカを透過的に追加または削除できるより動的な環境が可能になります。また、スクリプトを使用してカスタムヘルスチェックを実行することもできます。

図に示すように、トランスポートまたはレイヤ 4 ロードバランサを MySQL Connector とともに使用できます。現在、Elastic Load Balancing (ELB) ロードバランサは、RDS インスタンスへのトラフィックのルーティングをサポートしていません。したがって、多くの人が使用するオープンソースのソフトウェアベースのロードバランサである HAProxy などのオプションを検討することもできます。このソリューションでは、1 つのポートで読み込みクエリを受信し、もう 1 つのポートで書き込みクエリを受信するように HAProxy を構成できます。

もう 1 つの選択肢は、レイヤ 7 の SQL 対応ロードバランサを使用することです。複雑なルールを使用してデータベースにクエリを転送することができます。このタイプのロードバランサは、マルチステートメントで読み書きスプリットを適切に実行する方法を理解する、MySQL Connector よりも洗練された機能を備えています。このソリューションは、分散データベース環境でスケーリングの問題を処理するため、アプリケーション層のスケーリングを処理する必要がないため、アプリケーション自体にはほとんど変更が加えられません。これを実現するには、MaxScale、ProxySQL、MySQL Proxy などのオープンソースソリューションと商用ソリューションがあります。そのうちのいくつかは AWS Marketplace にあります。

まとめ
要約すると、アプリケーションの増加するニーズに対応するために、RDS 構成を拡張または縮小することができます。RDS はデータベースのスケーリングに大いに役立つため、お客様はアプリケーションやアプリケーションにより集中できるようになります。

 

Amazon DynamoDB からのデータストリームを AWS Lambda と Amazon Kinesis Firehose を活用して Amazon Aurora に格納する

Aravind Kodandaramaiah は AWS パートナープログラムのパートナーソリューションアーキテクトです。

はじめに

AWS ワークロードを実行するお客様は Amazon DynamoDBAmazon Aurora の両方を使用していることがよくあります。Amazon DynamoDB は、どのような規模でも、一貫した、数ミリ秒台にレイテンシーを抑える必要のあるアプリケーションに適した、高速で柔軟性の高い NoSQL データベースサービスです。データモデルの柔軟性が高く、パフォーマンスが信頼できるため、モバイル、ウェブ、ゲーム、広告、IoT、他の多くのアプリケーションに最適です。

Amazon Aurora は、MySQL と互換性のあるリレーショナルデータベースエンジンで、オープンソースデータベースのコスト効率性と簡素性を備えた、高性能の商用データベースの可用性とスピードをあわせもったエンジンです。Amazon Aurora は、MySQL よりも最大 5 倍のパフォーマンスを発揮するだけでなく、商用データベースのセキュリティ、可用性、および信頼性を 10 分の 1 のコストで実現します。

DynamoDB と Aurora を連携させるために、カスタムウェブ解析エンジンを構築して、毎秒数百万のウェブクリックが DynamoDB に登録されるようにしたとします。Amazon DynamoDB はこの規模で動作し、データを高速に取り込むことができます。また、このクリックストリームデータを Amazon Aurora などのリレーショナルデータベース管理システム (RDBMS) にレプリケートする必要があるとします。さらに、ストアドプロシージャまたは関数内で SQL の機能を使用して、このデータに対してスライスアンドダイスや、さまざまな方法でのプロジェクションを行ったり、他のトランザクション目的で使用したりするとします。

DynamoDB から Aurora に効率的にデータをレプリケートするには、信頼性の高いスケーラブルなデータレプリケーション (ETL) プロセスを構築する必要があります。この記事では、AWS Lambda Amazon Kinesis Firehose によるサーバーレスアーキテクチャを使用して、このようなプロセスを構築する方法について説明します。

ソリューションの概要

以下の図に示しているのは、このソリューションのアーキテクチャです。このアーキテクチャの背後には、次のような動機があります。

  1. サーバーレス – インフラストラクチャ管理を AWS にオフロードすることで、メンテナンスゼロのインフラストラクチャを実現します。ソリューションのセキュリティ管理を簡素化します。これは、キーやパスワードを使用する必要がないためです。また、コストを最適化します。さらに、DynamoDB Streams のシャードイテレーターに基づいた Lambda 関数の同時実行により、スケーリングを自動化します。
  2. エラー発生時に再試行可能 – データ移動プロセスには高い信頼性が必要であるため、プロセスは各ステップでエラーを処理し、エラーが発生したステップを再試行できる必要があります。このアーキテクチャではそれが可能です。
  3. 同時データベース接続の最適化 – 間隔またはバッファサイズに基づいてレコードをバッファすることで、Amazon Aurora への同時接続の数を減らすことができます。この方法は、接続タイムアウトを回避するのに役立ちます。
  4. 懸念部分の分離 – AWS Lambda を使用すると、データレプリケーションプロセスの各懸念部分を分離できます。たとえば、抽出フェーズを DynamoDB ストリームの処理として、変換フェーズを Firehose-Lambda 変換として、ロードフェーズを Aurora への一括挿入として分離できます。

以下に示しているのは、このソリューションのしくみです。

  1. DynamoDB Streams がデータソースです。DynamoDB Streams を使用すると、DynamoDB テーブル内の項目が変更されたときに、その変更を取得できます。AWS Lambda は、新しいストリームレコードを検出すると、Lambda 関数を同期的に呼び出します。
  2. Lambda 関数は、DynamoDB テーブルに新たに追加された項目をバッファし、これらの項目のバッチを Amazon Kinesis Firehose に送ります。
  3. Firehose は、受け取ったデータを Lambda 関数により変換して、Amazon S3 に配信します。Firehose に対してデータ変換を有効にしていると、Firehose は受け取ったデータをバッファし、バッファしたデータのバッチごとに、指定された Lambda 関数を非同期的に呼び出します。変換されたデータは Lambda から Firehose に返されてバッファされます。
  4. Firehose は変換されたすべてのレコードを S3 バケットに配信します。
  5. Firehose は変換されないすべてのレコードも S3 バケットに配信します。ステップ 4 と 5 は同時に実行されます。Amazon SNS トピックをこの S3 バケットに登録して、以降の通知、修復、再処理に使用できます (通知に関する詳細はこのブログ記事では取り上げません)。
  6. Firehose が変換されたデータを S3 に正常に配信するたびに、S3 はイベントを発行することで Lambda 関数を呼び出します。この Lambda 関数は VPC 内で実行されます。
  7. Lambda 関数は Aurora データベースに接続し、SQL 式を実行して、S3 から直接テキストファイルにデータをインポートします。
  8. Aurora (VPC プライベートサブネット内で実行) は、S3 VPC エンドポイントを使用して S3 からデータをインポートします。

ソリューションの実装とデプロイ
次に、このソリューションを機能させるために必要な手順について説明します。以下の手順では、AWS CloudFormation スタックを起動して一連の AWS CLI コマンドを実行することで、VPC 環境を作成する必要があります。

AWS サービスを使用してこれらの手順を実行している間、AWS サービスの料金が適用されることがあります。

ステップ 1: ソリューションのソースコードをダウンロードする
このブログ記事で概説したソリューションでは、多くの Lambda 関数を使用し、また、多くの AWS Identity and Access Management (IAM) ポリシーおよびロールを作成します。このソリューションのソースコードは以下の場所からダウンロードします。

git clone https://github.com/awslabs/dynamoDB-data-replication-to-aurora.git

このリポジトリには、以下のフォルダ構造があります。このブログ記事の後続の手順を実行するために、lambda_iam フォルダに移動します。

ステップ 2: Firehose 配信用の S3 バケットを作成する
Amazon Kinesis Firehose を使用すると、Amazon S3 にリアルタイムのストリーミングデータを配信できます。そのためには、まず S3 バケットを作成します。次に、レコードの処理に失敗した場合に備えて、変換された最終のレコードとデータバックアップを保存するフォルダを作成します。

aws s3api create-bucket --bucket bucket_name

aws s3api put-object \
--bucket bucket_name \
--key processed/

aws s3api put-object \
--bucket bucket_name
--key tranformation_failed_data_backup/

 

ステップ 3: IAM ポリシー、S3 イベント通知、Firehose-S3 配信設定ファイルを変更する
次に、以下のファイルで、プレースホルダー AWS_REGION、

AWS_ACCOUNT_NUMBER、BUCKET_NAME をそれぞれ、お客様の AWS リージョン ID、AWS アカウント番号、ステップ 2 で作成した S3 バケットの名前に置き換えます。

 

·         aurora-s3-access-Policy.json

·         DynamoDb-Stream-lambda-AccessPolicy.json

·         firehose_delivery_AccessPolicy.json

·         lambda-s3-notification-config.json

·         s3-destination-configuration.json

·         firehose_delivery_trust_policy.json

ステップ 4: CloudFormation を使用して Aurora クラスターを設定する
次に、[Launch Stack] ボタンをクリックして AWS CloudFormation スタックを起動します。CloudFormation テンプレートは、VPC を作成し、その VPC のパブリックおよびプライベートサブネットを設定します。また、このテンプレートは、プライベートサブネット内で Amazon Aurora データベースクラスターを起動し、パブリックサブネット内でパブリック IP を割り当てた踏み台ホストも起動します。


ステップ 5: Aurora DB クラスターを設定する
CloudFormation スタックが完成したら、S3 バケット内のテキストファイルから DB クラスターにデータをロードするように、Aurora クラスターを変更する必要があります。以下に示しているのは、そのための手順です。

Amazon Aurora が Amazon S3 にアクセスできるようにします。そのためには、IAM ロールを作成し、先ほど作成した信頼およびアクセスポリシーをそのロールにアタッチします。

auroraS3Arn=$(aws iam create-role \
    --role-name aurora_s3_access_role \
    --assume-role-policy-document file://aurora-s3-Trust-Policy.json \
    --query 'Role.Arn' \
    --output text)

aws iam put-role-policy \
    --role-name aurora_s3_access_role \
    --policy-name aurora-s3-access-Policy \
    --policy-document file://aurora-s3-access-Policy.json

 

その IAM ロールを Aurora DB クラスターに関連付けます。そのためには、新しい DB クラスターパラメータグループを作成し、その DB クラスターに関連付けます。

aws rds add-role-to-db-cluster \
    --db-cluster-identifier Output AuroraClusterId from CloudFormation Stack \
    --role-arn $auroraS3Arn

aws rds create-db-cluster-parameter-group \
    --db-cluster-parameter-group-name webAnayticsclusterParamGroup \
    --db-parameter-group-family aurora5.6 \
    --description 'Aurora cluster parameter group - Allow access to Amazon S3'

aws rds modify-db-cluster-parameter-group \
    --db-cluster-parameter-group-name webAnayticsclusterParamGroup \
    --parameters "ParameterName=aws_default_s3_role,ParameterValue= $auroraS3Arn,ApplyMethod=pending-reboot"

aws rds modify-db-cluster \
	--db-cluster-identifier Output AuroraClusterId from CloudFormation Stack \
	--db-cluster-parameter-group-name webAnayticsclusterParamGroup

 

プライマリ DB インスタンスを再起動します。

aws rds reboot-db-instance \
--db-instance-identifier Output PrimaryInstanceId from CloudFormationF Stack 

ステップ 6: DynamoDB ストリームと、そのストリームを処理する Lambda 関数を設定する

1.    ストリームを有効にして新しい DynamoDB テーブルを作成します。以降の手順では、AWS Lambda 関数をストリームに関連付けることで、トリガーを作成します。

aws dynamodb create-table \
    --table-name web_analytics \
    --attribute-definitions AttributeName=page_id,AttributeType=S AttributeName=activity_dt,AttributeType=S \
    --key-schema AttributeName=page_id,KeyType=HASH AttributeName=activity_dt,KeyType=RANGE \
    --provisioned-throughput ReadCapacityUnits=50,WriteCapacityUnits=50 \
    --stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE

2.    Lambda 実行ロールを作成します。

DdbStreamLambdaRole=$(aws iam create-role \
    --role-name DdbStreamLambdaRole \
    --assume-role-policy-document file://DynamoDB-Stream-lambda-Trust-Policy.json \
    --query 'Role.Arn' \
    --output text)


aws iam put-role-policy \
    --role-name DdbStreamLambdaRole \
    --policy-name DdbStreamProcessingAccessPolicy \
    --policy-document file://DynamoDb-Stream-lambda-AccessPolicy.json

3.  DynamoDB ストリームを処理する Lambda 関数を作成します。

aws lambda create-function \
    --function-name WebAnalyticsDdbStreamFunction \
    --zip-file fileb://ddbStreamProcessor.zip \
    --role $DdbStreamLambdaRole \
    --handler ddbStreamProcessor.handler \
    --timeout 300 \
    --runtime nodejs4.3

 

4.  その Lambda 関数を DynamoDB ストリームに関連付けることで、トリガーを作成します。

tableStreamArn=$(aws dynamodb describe-table --table-name web_analytics --query 'Table.LatestStreamArn' --output text)

aws lambda create-event-source-mapping \
    --function-name WebAnalyticsDdbStreamFunction \
    --event-source-arn $tableStreamArn \
    --batch-size 100 \
    --starting-position LATEST

ステップ 7: Firehose のデータ変換 Lambda 関数を作成して設定する

Lambda 実行ロールを作成します。

transRole=$(aws iam create-role \
            --role-name firehose_delivery_lambda_transformation_role \
            --assume-role-policy-document  file://firehose_lambda_transformation_trust_policy.json \
            --query 'Role.Arn' --output text)

aws iam put-role-policy \
        --role-name firehose_delivery_lambda_transformation_role \
        --policy-name firehose_lambda_transformation_AccessPolicy \
        --policy-document file://firehose_lambda_transformation_AccessPolicy.json

2.  データ変換 Lambda 関数を作成します。

aws lambda create-function \
    --function-name firehoseDeliveryTransformationFunction \
    --zip-file fileb://firehose_delivery_transformation.zip \
    --role $transRole \
    --handler firehose_delivery_transformation.handler \
    --timeout 300 \
    --runtime nodejs4.3 \
    --memory-size 1024

この関数は、受け取ったストリームのレコードを JSON スキーマに対して検証します。スキーマに一致したら、受け取った JSON レコードを解析し、カンマ区切り値 (CSV) 形式に変換します。

'use strict';

var jsonSchema = require('jsonschema');

var schema = { "$schema": "http://json-schema.org/draft-04/schema#", "type": "object", "properties": { "Hits": { "type": "integer" }, "device": { "type": "object", "properties": { "make": { "type": "string" }, "platform": { "type": "object", "properties": { "name": { "type": "string" }, "version": { "type": "string" } }, "required": ["name", "version"] }, "location": { "type": "object", "properties": { "latitude": { "type": "string" }, "longitude": { "type": "string" }, "country": { "type": "string" } }, "required": ["latitude", "longitude", "country"] } }, "required": ["make", "platform", "location"] }, "session": { "type": "object", "properties": { "session_id": { "type": "string" }, "start_timestamp": { "type": "string" }, "stop_timestamp": { "type": "string" } }, "required": ["session_id", "start_timestamp", "stop_timestamp"] } }, "required": ["Hits", "device", "session"] };

exports.handler = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found

    const output = event.records.map((record) => {

        const entry = (new Buffer(record.data, 'base64')).toString('utf8');

        var rec = JSON.parse(entry);
        console.log('Decoded payload:', entry);
        var milliseconds = new Date().getTime();

        var payl = JSON.parse(rec.payload.S);

        var jsonValidator = new jsonSchema.Validator();
        var validationResult = jsonValidator.validate(payl, schema);
        console.log('Json Schema Validation result = ' + validationResult.errors);

        if (validationResult.errors.length === 0) {

            const result = `${milliseconds},${rec.page_id.S},${payl.Hits},
				${payl.session.start_timestamp},
				${payl.session.stop_timestamp},${payl.device.location.country}` + "\n";

            const payload = (new Buffer(result, 'utf8')).toString('base64');
            console.log(payload);
            success++;

            return {
                recordId: record.recordId,
                result: 'Ok',
                data: payload,
            };
        }
        else {
            failure++;
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed',
                data: record.data,
            }
        }

    });

    console.log(`Processing completed.  Successful records ${success}, Failed records ${failure}.`);
    callback(null, { records: output });

};

ステップ 8: Firehose 配信ストリームを作成し、S3 にデータを配信するように設定する

Amazon S3 ターゲットを使用するとき、Firehose は S3 バケットにデータを配信します。配信ストリームを作成するには、IAM ロールが必要です。Firehose はその IAM ロールを引き受けることで、指定したバケットとキーにアクセスする権限を取得します。Firehose はまた、その IAM ロールを使用することで、Amazon CloudWatch ロググループにアクセスし、データ変換 Lambda 関数を呼び出す権限を取得します。

1.    S3 バケット、キー、CloudWatch ロググループ、データ変換 Lambda 関数にアクセスする権限を付与する IAM ロールを作成します。

aws iam create-role \
    --role-name firehose_delivery_role \
    --assume-role-policy-document  file://firehose_delivery_trust_policy.json

aws iam put-role-policy \
    --role-name firehose_delivery_role \
    --policy-name firehose_delivery_AccessPolicy \
    --policy-document file://firehose_delivery_AccessPolicy.json

 

2.  S3 ターゲット設定を指定して、Firehose 配信ストリームを作成します。

aws firehose create-delivery-stream \
    --delivery-stream-name webAnalytics \
    --extended-s3-destination-configuration='CONTENTS OF s3-destination-configuration.json file' 

3.  AWS マネジメントコンソールにサインインし、Firehose コンソールに移動します。webAnalytics という名前の配信ストリームを選択します。[Details] タブで、[Edit] を選択します。[Data transformation][Enabled] を、[IAM role] [firehose_delivery_role] を選択します。[Lambda function] で、[firehoseDeliveryTransformationFunction] を選択します。次に、[Save] を選択してこの設定を保存します。

ステップ 8: Lambda 関数を作成して、VPC リソースにアクセスするように設定する
S3 バケットから Amazon Aurora にデータをインポートするには、VPC 内のリソースにアクセスするように Lambda 関数を設定します。

1.    Lambda 関数の IAM 実行ロールを作成します。

auroraLambdaRole=$(aws iam create-role \
                --role-name lambda_aurora_role \
                --assume-role-policy-document file://lambda-aurora-Trust-Policy.json \
                --query 'Role.Arn' --output text)

aws iam put-role-policy \
    --role-name lambda_aurora_role \
    --policy-name lambda-aurora-AccessPolicy \
    --policy-document file://lambda-aurora-AccessPolicy.json

2.  プライベートサブネットやセキュリティグループなどの VPC 設定を指定する Lambda 関数を作成します。CLI の実行中に渡される環境変数 AuroraEndpoint、dbUser (データベースユーザー)、dbPassword (データベースパスワード) に正しい値を設定していることを確認します。これらの値については、CloudFormation スタック出力を参照してください。

aws lambda create-function \
    --function-name AuroraDataManagementFunction \
    --zip-file fileb://AuroraDataMgr.zip \
    --role $auroraLambdaRole \
    --handler dbMgr.handler \
    --timeout 300 \
    --runtime python2.7 \
    --vpc-config SubnetIds='Output PrivateSubnets from CloudFormation stack',SecurityGroupIds='Output DefaultSecurityGroupId from CloudFormation stack' \
    --memory-size 1024 \    
    --environment='    
                    {
                        "Variables": {
                            "AuroraEndpoint": "Output AuroraEndpoint from CloudFormation stack",
                            "dbUser": "Database User Name",
                            "dbPassword": "Database Password"
                        }
                    }'

 

Lambda 関数は Aurora データベースに接続します。この関数は、LOAD DATA FROM S3 SQL コマンドを実行して、S3 バケット内のテキストファイルから Aurora DB クラスターにデータをロードします。

import logging import pymysql import boto3 import sys import os
# rds settings
db_name = "Demo"
dbUser = os.environ['dbUser']
dbPassword = os.environ['dbPassword']
AuroraEndpoint = os.environ['AuroraEndpoint']

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3')
try:
    conn = pymysql.connect(AuroraEndpoint, user=dbUser, passwd=dbPassword, db=db_name, connect_timeout=5)
except:
    logger.error("ERROR: Unexpected error: Could not connect to Aurora instance.")
    sys.exit()

logger.info("SUCCESS: Connection to RDS Aurora instance succeeded")

def handler(event, context):

    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        s3location = 's3://' + bucket + '/' + key
        logger.info(s3location)

        sql = "LOAD DATA FROM S3 '" + s3location + "' INTO TABLE Demo.WebAnalytics FIELDS TERMINATED BY ',' " \
              "LINES TERMINATED BY '\\n' (timestarted, page_id, hits, start_time, end_time, country_code);"

        logger.info(sql)

        with conn.cursor() as cur:
            cur.execute(sql)
            conn.commit()
            logger.info('Data loaded from S3 into Aurora')

ステップ 9: S3 イベント通知を設定する
最後に、S3 がイベントを発行することで、前のステップで作成した Lambda 関数を呼び出すように、その関数を設定します。このプロセスの最初の手順は、Lambda 関数を呼び出すアクセス権限を S3 に付与することです。

1.    Lambda 関数を呼び出すアクセス権限を S3 に付与します。

aws lambda add-permission \
    --function-name AuroraDataManagementFunction \
    --action “lambda:InvokeFunction” \
    --statement-id Demo1234 \
    --principal s3.amazonaws.com \
    --source-arn ‘ARN of S3 Bucket created in Step 2’ \
    --source-account ‘AWS Account Id

2.    S3 バケット通知を設定します。

aws s3api put-bucket-notification-configuration \
    --bucket 'Name of S3 Bucket created in step 2' \
    --notification-configuration=' CONTENTS OF lambda-s3-notification-config.json ' 

 

ステップ 10: ソリューションをテストする
最後のステップは、ソリューションをテストすることです。

  1. このブログ記事のソースコードの TestHarness フォルダには、テストハーネスがあります。このテストハーネスは DynamoDB テーブルにデータを読み込みます。まず、TestHarness フォルダに移動し、コマンドノード loadDataToDDb.js を実行します。
  2. Secure Shell (SSH) を使用して、踏み台ホストに接続します。SSH を使用した接続の詳細については、EC2 のドキュメントを参照してください。
  3. 踏み台ホストのブートストラッププロセス中に MySQL クライアントがインストールされたため、以下のコマンドを使用して Aurora データベースに接続できます。パラメータ値を適切に変更していることを確認します。
/usr/bin/mysql -h DatabaseEndpoint -u DatabaseUsername --password=DatabasePassword 

4.    コネクションが成功したら、以下のコマンドを実行します。

 mysql> select count(*) from Demo.WebAnalytics;

このコマンドを実行した後、テーブルにレコードが読み込まれています。

テーブルにレコードが読み込まれていない場合、Firehose はそれらのレコードを S3 に送信する前にバッファしている可能性があります。これを回避するには、1 分くらい後に同じ SQL コードを再試行してください。この間隔は、現在設定されている S3 バッファ間隔の値に基づきます。このコードを再試行した後、Amazon Aurora にレコードが挿入されています。

 

まとめ

DynamoDB Streams と、Amazon Kinesis Firehose のデータ変換関数を使用すると、DynamoDB から Amazon Aurora などのデータソースにデータをレプリケートする強力でスケーラブルな方法を得られます。このブログ記事では、DynamoDB から Aurora へのデータのレプリケートを取り上げましたが、同じ一般的なアーキテクチャパターンを使用すれば、他のストリーミングデータを変換して、Amazon Aurora に取り込むことができます。

さらに、以下の関連するブログ記事を参照してください。

 

OracleDBからPostgreSQLへの移行

 

Knievel Co は、アマゾン ウェブ サービスのデータベースエンジニアです。

このブログ記事では、Oracle データベースを PostgreSQL に移行する方法の概要について説明します。データベース移行の2つの主要部分は、スキーマの変換とデータの複製です。)AWS スキーマ変換ツール (AWS SCT) と AWS Database Migration Service (AWS DMS) を使用して、これら 2 つの部分に取り組む方法について説明します。

SCT と DMSについて説明する前に、予備的な手順を実行する必要があります。これらは、すべての移行に役立つことが判明しています。移行を容易にする方法の 1 つは、移行の前に、通常更新フェーズと呼ばれるものを行うことです。このフェーズでは、Oracle データベース内のオブジェクトのインベントリを作成し、いくつかの決定を下します。

最初に、不要になったオブジェクトを削除します。オブジェクトの移行にかかる時間は誰も気にかけませんが、無駄にしないでください。また、不要になった履歴データを削除することもできます。一時的なテーブルや過去のメンテナンス時のテーブルのバックアップコピーなど、不要なデータを複製するために時間を無駄にすることはありません。次に、LOB、CLOB、LONG などに保存されているフラットファイルおよび長い文字列を Amazon S3 または Amazon Dynamo DB に移動します。このプロセスではクライアントソフトウェアの変更が必要となりますが、データベースが簡素化されサイズが削減されることで、システム全体がより効率的になります。最後に、PL/SQL パッケージとプロシージャを移動します。特にビジネスロジックを含むものをクライアントソフトウェアに戻してみます。これらのオブジェクトは、SCT が変換できない場合は手動で変更する必要があります。

次の手順は、異なるデータベースエンジン (この場合は Oracle から PostgreSQL へ) に移行するための手順です。別のプラットフォームに移動していない場合は、データベースを移動するためのより適切なネイティブツールやその他のテクニックがあります。

  1. ターゲットデータベースでスキーマを作成します。
  2. ターゲットデータベースの外部キーとセカンダリインデックスを削除し、トリガーを無効にします。
  3. データを複製するためのDMSタスクを設定します (全ロードと変更データキャプチャ (CDC))。
  4. 全ロードフェーズが完了したらタスクを停止し、外部キーとセカンダリインデックスを再作成します。
  5. DMS タスクを有効にします。
  6. ツールとソフトウェアを移行し、トリガーを有効にします。

ターゲットデータベースでスキーマを作成します。

移行するスキーマを確認して、移行を開始します。このケースでは、AWS スキーマ変換ツール (AWS SCT) を使用して分析を実行します。アプリケーションを起動するときは、ソースが Oracle でターゲットが PostgreSQL となる新しいプロジェクトを作成する必要があります。再接続したら、左側で移行するスキーマの名前を選択します。スキーマ名を右クリックし、[スキーマの変換] を選択します。次に、[View / Assessment Report View] を選択します。

AWS SCT 評価レポートは、Oracle データベースを PostgreSQL に変換するために必要な作業の高レベルな概要を示します。以下に示しているのは、評価レポートの具体的な例です。

 

このレポートは、各オブジェクトタイプごとに手動で変換するために必要な手作業を示しています。一般的に、パッケージ、プロシージャ、関数には解決すべきいくつかの問題があります。AWS SCT では、これらのオブジェクトを修正する理由を説明し、その方法のヒントを示します。

スキーマが自動的に変換されない場合の、問題を解決するためのヒントを次に示します。

  • ソースの Oracle データベースのオブジェクトを変更して、AWS SCT がそれらをターゲットの PostgreSQL に変換できるようにします。
  • スキーマをそのまま変換し、AWT SCT によって生成されたスクリプトを手動で変更してから、それらをターゲットの PostgreSQL データベースに適用してみてください。
  • 変換できないオブジェクトを無視して、機能を別の AWS サービスまたは同等のものに置き換えます。

スキーマの変換機能を改善すると、反復プロセスを経てレポートとスキーマを再生成できます。[Action Items] ビューには、変換プロセスを実行する際に生じる問題のリストが表示されます。変換されたスキーマの結果が満足のいくものであれば、それらをターゲットの PostgreSQL データベースに適用することができます。

ターゲットデータベースのスキーマを参照し、列のデータ型、オブジェクト名などを簡単ににチェックすることをお勧めします。ソースおよびターゲットのデータ型の詳細については、「AWS Database Migration Service リファレンス」を参照してください。 また、Oracle から PostgreSQL への変換であるため、オブジェクト名が Oracle では大文字で、PostgreSQL では小文字であることが問題となります。

外部キー制約とセカンダリインデックスを削除する。トリガーを無効にする

ターゲット上に必要なスキーマを揃えるするには、ソースから実際のデータを移行するためにスキーマを準備する必要があります。ここでは、AWS Database Migration Service (AWS DMS) を使用します。DMSには、全ロードと変更データキャプチャ (CDC) の 2 つのフェーズがあります。全ロードフェーズでは、テーブルは順不同でロードされます。したがって、ターゲットで制約を有効にすると、いくつかの外部キーで制約違反が発生します。また、全ロード時には表のレプリケーションが遅くなる可能性があるため、セカンダリインデックスを無効にする必要があります。これは、レコードがロードされるときにインデックスを維持する必要があるためです。

ターゲットの PostgreSQL データベースで、クエリを実行してデータベーステーブルの外部キー制約に DDL を生成し、出力を保存します。これを行うための多くのサンプルクエリをオンラインで見つけることができます。次のような情報が表示されます。これを実行すると、後で外部キー制約を再作成するための DDL が提供されます。

ALTER TABLE <テーブル名> ADD CONSTRAINT <制約名> FOREIGN KEY(キー列)REFERENCES <親テーブル名>(キー列)MATCH FULL;

同様に、DDL 生成クエリを実行して、ターゲットデータベース上のすべての外部キー制約を削除します。

ALTER TABLE <テーブル名> DROP CONSTRAINT <制約名>;

ここで、セカンダリインデックスについても同じことを行います。つまり、コマンドを作成し結果を生成してから、セカンダリインデックスを削除します。

次に、トリガーを無効にします。

ALTER TABLE <テーブル名> DISABLE TRIGGER ALL;

ID 列でシーケンスを使用する場合は、ターゲットでシーケンスを作成するときに、次の値をソースデータベースよりも高く設定することをお勧めします。十分なギャップを残して、移行カットオーバーの日付で値がソースのデータベースの値よりも高くなっていることを確認してください。このアプローチによって、移行後のシーケンス ID の競合が回避されます。

DMS タスクを設定してデータをレプリケートする

ターゲットの PostgreSQL データベースでスキーマの準備ができました。これでデータをレプリケートする準備が整いました。これは DMS が入る場所となります。DMS の素晴らしい点は、各テーブルのデータをレプリケートするのではなく、移行する準備ができるまで CDC モードでデータを最新の状態で保持することです。

ソース Oracle データベースの準備:

  • Oracle ログインに必要な権限を確認します。
  • DMS が Oracle ソースデータベースから変更を取得するために必要なサプリメンタルロギングを設定します。
  • ソース Oracle データベースの準備の詳細については、DMS ドキュメントを参照してください。

AWS コンソールで DMS を起動します。最初に、レプリケーションインスタンスを作成する必要があります。レプリケーションインスタンスが DMS タスクを実行します。このインスタンスは、ソース Oracle データベースとターゲット PostgreSQL データベースの両方に接続する中間サーバーです。適切なサイズのサーバーを選択します。特に、複数のタスクの作成、多数のテーブルの移行、またはその両方を行う場合は、適切なサーバーを選択します。

次に、ソースデータベースのエンドポイントとターゲットデータベースのエンドポイントを作成します。Oracle データベースと PostgreSQL データベースの適切な接続情報をすべて入力します。各エンドポイントの作成が完了する前に、接続テストが成功した後で必ずスキーマの更新オプションを選択し、テストを実行 してください。

これで、タスクを作成する準備ができました。タスク名を入力し、作成したレプリケーションインスタンス、およびソースおよびターゲットエンドポイントを選択します。[移行タイプ] で、[既存データの移行と進行中の変更のレプリケート] を使用します。スキーマを事前に作成するためにAWS SCTを使用しているため、[ターゲットテーブル準備モード][何もしない] または [切り詰め] を選択します。

オプション [全ロードの完了後にタスクを停止する] で、[キャッシュされた変更の適用後に停止] を選択します。完全ロードが完了し、キャッシュされた変更が適用された後で、タスクを一時的に停止します。キャッシュされた変更は、テーブル全体のロードプロセスが実行されている間に発生し蓄積された変更です。これは、CDC が適用される直前のステップです。

できれば、ソース Oracle データベースを更新して、LOB を S3、DynamoDB、または別の同様のサービスに移動します。そうでない場合は、LOB を処理する方法についていくつかのオプションがあります。すべてのテーブルの LOB 全体をレプリケートする場合は、[レプリケーションに LOB 列を含める] で、[完全 LOB モード] を選択します。LOB を特定の長さまでレプリケートするのみの場合は、[制限付き LOB モード] を選択します。移行する LOB の長さは、[最大 LOB サイズ (KB)] テキストボックスで指定します。

最後に、[ログ作成の有効化] を選択して、タスクで発生したエラーや警告を確認し、問題のトラブルシューティングを行えるようにすることをお勧めします。[タスクの作成] を選択します。

次に、[テーブルマッピング] で移行するスキーマを選択し、[選択ルールの追加] を選択します。[JSON] タブを選択します。[JSON の編集を有効にする] チェックボックスを選択します。次に、次の JSON 文字列を入力し、スキーマ名 DMS_SAMPLE を使用するスキーマ名に置き換えます。

 

{

 "rules": [

  {

   "rule-type": "selection",

   "rule-id": "1",

   "rule-name": "1",

   "object-locator": {

    "schema-name": "DMS_SAMPLE",

    "table-name": "%"

   },

   "rule-action": "include"

  },

  {

   "rule-type": "transformation",

   "rule-id": "6",

   "rule-name": "6",

   "rule-action": "convert-lowercase",

   "rule-target": "schema",

   "object-locator": {

    "schema-name": "%"

   }

  },

  {

   "rule-type": "transformation",

   "rule-id": "7",

   "rule-name": "7",

   "rule-action": "convert-lowercase",

   "rule-target": "table",

   "object-locator": {

    "schema-name": "%",

    "table-name": "%"

   }

  },

  {

   "rule-type": "transformation",

   "rule-id": "8",

   "rule-name": "8",

   "rule-action": "convert-lowercase",

   "rule-target": "column",

   "object-locator": {

    "schema-name": "%",

    "table-name": "%",

    "column-name": "%"

   }

  }

 ]

}

この JSON 文字列は、PostgreSQL の場合、スキーマ名、テーブル名、列名を小文字に変換します。

タスクが作成されると、自動的に開始されます。タスクを選択して[ステータス] タブをクリックすると、DMS コンソールを使用して進行状況を監視できます。全ロードが完了しキャッシュされた変更が適用されると、タスクは自動的に停止します。

全ロードフェーズが完了したらタスクを停止し、外部キーとセカンダリインデックスを再作成します。

テーブルのロードが完了しました。今度は、ログを確認してタスクにエラーがないことを確認するのがよいでしょう。

タスクの次のフェーズは、ソースデータベースで発生した順序で変更を適用する CDC です。このアプローチは、親テーブルがターゲットデータベース上の子テーブルの前に更新されるため、外部キーを再作成できることを意味します。

必要に応じて生成されたスクリプトを調整して、以前に削除された外部キーとセカンダリインデックスを再作成します。セカンダリインデックスはタスクのこのフェーズで重要となります。このフェーズは重要です。なぜなら、where 句を使用してソースデータベースに対して行われた更新は、ターゲットデータベース上のインデックスのルックアップでもあるからです。更新に欠落しているインデックスがある場合、これらの更新は全テーブルスキャンとして実行されます。

ソースからのデータを更新できるため、移行の切り替えまでトリガーを有効にしないでください。

DMS タスクを有効にする

これで外部キーとセカンダリインデックスが戻ったので、DMS タスクを有効にすることができます。DMS コンソールに移動して、[タスク] を選択します。リストでタスクを選択し、[開始/再開] を選択します。[開始] オプションを選択し、[タスクの開始] を選択します。

ツールとソフトウェアを移行し、トリガーを有効にします。

最後に、カットオーバーポイントがここにあります。ツールとソフトウェアの接続がソースデータベースへのアクセスを停止し、DMS タスクが最後のデータ変更をターゲットデータベースにレプリケートしたら、DMS コンソールで DMS タスクを停止します。次に、ターゲットデータベースでトリガーを有効にします。

ALTER TABLE <テーブル名> ENABLE TRIGGER ALL;

最後のステップは、アプリケーションを新しいターゲット PostgreSQL データベースに再配置して再起動することです。完了です。

役立つヒント

スキーマを変換しやすくするには、AWS SCT を使用して評価レポートを取得し、アクション項目を反復処理します。ターゲット PostgreSQL スキーマの最終バージョンに到達するまで、ターゲットスキーマを複数回生成する必要があります。
新しいスキーマのクエリが新しいプラットフォームで機能するようにするには、ターゲットシステムでアプリケーションをテストします。AWS SCT は、アプリケーションクエリを PostgreSQL に変換することもできます。詳細については、AWS SCT ドキュメントを参照してください。また、ターゲットシステムの負荷テストを実装して、ターゲットサーバーのサイズとデータベース設定が正しいことを確認します。
前に概要を説明した実際の移行手順を実践し、プロセスを合理化します。上記の手順は単なる出発点に過ぎず、各データベースは一意のものです。

詳細情報

また、次を検討することをお勧めします。

AWS SCT および AWS DMS のドキュメント
DMS のステップバイステップチュートリアル
DMS のベストプラクティス
GitHub の DMS サンプルデータベース
関連するブログ記事: SQL を使用して Oracle から PostgreSQL へユーザー、ロール、および許可をマップする

Amazon ElastiCache for Redis を使ったChatアプリの開発

Sam Dengler は、アマゾン ウェブ サービスのソリューションアーキテクトです。

このブログ記事では、チャットアプリケーションに関連する概念とアーキテクチャのパターンについて説明します。また、チャットクライアントとサーバーの実装の詳細、サンプルのチャットアプリケーションを AWS アカウントに展開する方法についても説明します。

背景情報

チャットアプリケーションを構築するには、クライアントがチャットルームの他の参加者に再配信されるメッセージを送信できる通信チャネルが必要となります。この通信は、一般に publish-subscribe パターン (PubSub) を使用して実装されます。このパターンでは、メッセージが中央トピックチャネルに送信されます。関係者は、このチャンネルをサブスクライブして更新の通知を受けることができます。このパターンでは、発行者の知識なしに受信者のグループを拡大または縮小できるように、発行者と受信者を切り離しています。

PubSubは、クライアントが WebSockets を使用して通信するバックエンドサーバーに実装されます。WebSockets は、クライアントとサーバー間で双方向にストリーミングされるデータのチャネルを提供する永続的な TCP 接続です。単一サーバアーキテクチャでは、1 つの PubSub アプリケーションが発行者と受信者の状態を管理し、WebSocket を介してクライアントにメッセージを再配布することもできます。次の図は、単一サーバー PubSub アーキテクチャ上の 2 つのクライアント間でメッセージが WebSocket を通過するパスを示しています。

単一サーバーアーキテクチャは、通信フローを説明するのに役立ちます。しかし、ほとんどのソリューションビルダーはマルチサーバーアーキテクチャで設計したいと考えています。マルチサーバーアーキテクチャは、信頼性を高め、伸縮性を高め、クライアントの数が増えるにつれてアプリケーションを水平的に拡大するのに役立ちます。

マルチサーバーアーキテクチャでは、クライアントはサーバープールにトラフィックを転送するロードバランサーに対して WebSocket 接続を行います。これらのサーバーは、WebSocket 接続とそれを経由してストリーミングされるデータを管理します。WebSocket 接続が PubSub アプリケーションサーバーとの間で確立されると、その接続は永続化され、データは両方向のアプリケーションにストリームされます。ロードバランサーは、WebSocket 接続のリクエストを健全なサーバーに配信します。つまり、2 つのクライアントが異なるアプリケーションサーバーに WebSocket 接続を確立できます。

 

複数のアプリケーションがクライアントの WebSocket 接続を管理するため、アプリケーションはメッセージを再配布するためにそれらの間で通信する必要があります。この通信が必要なのは、メッセージが WebSocket を介して 1 つのアプリケーションサーバーにストリームアップされ、別のアプリケーションサーバーに接続されたクライアントにストリームダウンされる必要があるためです。クライアント接続を管理しているアプリケーションから PubSub ソリューションを外部に出すことで、アプリケーション間の共有通信の要件を満たすことができます。

 

次の図は、マルチサーバー PubSub アーキテクチャ上の 2 つのクライアント間でメッセージが WebSocket を通過するパスを示しています。永続的な接続は、各クライアントと WebSocket サーバー間のロードバランサーを通じて確立されます。また、永続的な接続は、WebSocket サーバーと PubSub サーバー間で、すべてのクライアント間で共有されるサブスクリプショントピックごとに確立されます。

 

 

カスタムの PubSub ソリューションも可能ですが、既存のソフトウェアアプリケーションを使用してこの機能を提供することもできます。Redis は、高速なオープンソースのインメモリ型データストアおよびキャッシュで、PubSub をサポートしています。Amazon ElastiCache for Redis は、Redis 対応のインメモリサービスです。使いやすく、Redis の性能を利用でき、もっとも要求の厳しいアプリケーションに対応できる可用性、信頼性、パフォーマンスを提供します。

Using ElastiCache for Redis and the WebSocket support found in the that is part of Elastic Load Balancing の一部である Application Load Balancer にある ElastiCache for Redis と WebSocket サポートを使用して、サンプルのチャットアプリケーションを構築する方法を説明します。アプリケーションには、Node.js と AWS Elastic Beanstalk に基づくバックエンドと、Vue.js ウェブクライアントがあります。サンプルアプリケーションのすべてのコードは、すべて elasticache-redis-chatapp GitHub リポジトリにあります。

アーキテクチャ

次の図は、Redis、Application Load Balancer、Node.js Elastic Beanstalk アプリケーション、および Vue.js ウェブクライアント用の ElastiCache を使用した AWS の最終的なアーキテクチャを示しています。

チャットアプリケーションの実装を高いレベルで見てみましょう。

実装の概要

サンプルチャットアプリケーションは、以下のスクリーンショットに示す共有チャットルームで通信するメンバーとメッセージで構成されています。

このサンプルアプリケーションでは、メンバー登録、プロフィール管理、ログインを拒否しています。代わりに、ブラウザでチャットアプリケーションを開くと、ユーザーの代わりにランダムなユーザー名とアバターでメンバーが生成されます。この名前とアバターは、左側のメンバーリストに表示されます。他のメンバーがブラウザでアプリケーションを開いて参加したり離したりすると、その名前がウェブアプリケーションに表示されます。メンバーは、他のウェブクライアントに再配信されメインのチャットウィンドウに表示されるメッセージを送信できます。

次に、Vue.js ウェブクライアントの詳細を調べ、Node.js バックエンドアプリケーションを確認します。

Vue.js ウェブクライアント

ウェブクライアントは、ビューレイヤを管理する Vue.js、UI 用の Bootstrap、および WebSocket 通信用の Socket.io を使用して実装されています。初心者向けに複雑さを軽減するために、JavaScript バンドルは使用していません。ただし、本稼働アプリケーションでは webpack または類似のソフトウェアを検討する必要があります。Vue.js は、基礎となるデータモデルの更新に基づいて UI の変更をレンダリングします。これらのフレームワークとライブラリを、最新の単一ページの代表的なウェブアプリケーションとして選択しました。ただし、コミュニティには多くの類似した選択肢があり、毎日多くのものが出現しています。

次に、Vue.js アプリケーションコンポーネントを設定する HTML マークアップのコードスニペットと、メンバーを表示するイテレーターを示します。機能に焦点を当てるために、いくつかの中間的なマークアップと CSS スタイルを削除しています。。完全なサンプルは GitHub リポジトリにあります。

<html>
<body>
    <div id=”app”>
        <li v-for="(value, key) in members">
            <img v-bind:src="value.avatar">
            <small>{{ value.username }}</small>
        </li>
    </div>

v-for パラメータは、イテレーターを定義するために使用されます。この場合は、この後で説明するメンバーオブジェクトデータモデルのキー値タプルです。反復されるループ内で Mustache テンプレートを使用して、各メンバーオブジェクトにアクセスし、ユーザー名を表示します。Mustache テンプレートは HTML 属性内では機能しないため、メンバーのアバター画像 URL を解決するためには v-bind 引数を使用する必要があります。

Vue.js は、スマート DOM の差の計算に基づいて UI の変更を最小限にレンダリングします。このアプローチにより、基になる Vue.js データモデルの状態変更に専念できます。サンプルアプリケーションでは、HTML 内で JavaScript コードをインライン展開しています。しかし、本番稼働用システムでは、外部の .vue ファイルを使用して UI コンポーネントをモジュール化し、ビルド時に webpack を使用して変換する可能性があります。Socket.io ライブラリも初期化されており、多少カバーされています。

<script src="js/vue/2.1.10/vue.min.js"></script>
<script src="js/socket.io/1.7.2/socket.io.min.js"></script>
<script>
    var socket = io();

    new Vue({
        el: '#app',
        data: {
            message: '',
            messages: [],
            members: {}
        }

ここでは、Vue.js アプリケーションを宣言し、アプリケーション ID で HTML div 要素にバインドしました。また、次の 3 つのデータモデルを宣言しました。

  • message: フォームに入力されたメッセージテキスト
  • messages: メッセージのリスト。メッセージを追加するだけなので、配列が使用されます。
  • messages: メンバーのリスト。メンバーがチャットルームを離れたときにメンバーを見つけて削除できるように、オブジェクトが使用されています。

マークアップおよび Vue.js アプリケーション宣言に加えて、ウェブクライアントは WebSocket 接続を確立し、サブスクライブするトピックを宣言し、それらのトピックに発行されたメッセージが前に宣言されたデータモデルをどのように変更するかを確立します。このアプリケーションでは、コミュニケーションのための 5 つのトピックを確立します。それぞれのトピックが、トリガーイベントと対応するアクションとともに示されます。

[メッセージ]

トリガー: メッセージがチャットルームに送信される。

アクション: メッセージテキストおよびメンバーメタデータを使用して、メッセージのリストを更新します。

[member_add]

トリガー: メンバーがチャットルームに参加する。

アクション: メンバーのユーザー名とパスワードをメンバーのリストに追加します。

[member_delete]

トリガー: メンバーがチャットルームを離れる。

アクション: メンバーの一覧からメンバーを削除します。

[message_history]

トリガー: クライアントがメッセージのリストを初期化。

アクション: メッセージのリストを、最近の履歴メッセージの切り詰められたリストとして設定します。

[member_history]

トリガー: クライアントがメンバーのリストを初期化。

アクション: メンバーリストをチャットルームに参加しているメンバーのリストとして設定します。

 

以下に、これらのメソッドを実装するための JavaScript コードを示します。以前の Vue.js コードをリファレンスポイントとして維持しています。

new Vue({
    el: '#app',
    data: {
        message: '',
        messages: [],
        members: {}
    },
    methods: {
        send: function() {
            socket.emit('send', this.message);
            this.message = '';
        },
    mounted: function() {
        socket.on('messages', function(message) {
            this.messages.push(message);
        }.bind(this));

        socket.on('member_add', function(member) {
            Vue.set(this.members, member.socket, member);
        }.bind(this));

        socket.on('member_delete', function(socket_id) {
            Vue.delete(this.members, socket_id);
        }.bind(this));

        socket.on('message_history', function(messages) {
            this.messages = messages;
        }.bind(this));

        socket.on('member_history', function(members) {
                    this.members = members;
        }.bind(this));
    }

以前のコードスニペットで宣言された Socket.io オブジェクトは、前述のトピックごとに 1 つずつ、socket.on を使用してデータトピックにサブスクライブします。メッセージがトピックに発行されると、コールバック関数が実行されます。データモデルは、アクション (セット、追加、削除) とターゲットデータモデル (配列、オブジェクト) に従って更新されます。バインド (this) 文が追加され、Vue.js データモデルがコールバック関数スコープに挿入されます (詳細については、Function.prototype.bind を参照してください)。

最後は、メッセージフォームの送信を処理する Vue.jsメソッドです。Vue.js は、フォーム提出をメソッドにバインドする便利なメソッドを提供します。このメソッドは、WebSocket 上にメッセージテキストを発行し、メッセージを空の文字列に設定します。この文字列は、Vue.js バインディングを使用して UI を更新します。

Node.js バックエンドアプリケーション

ここでは、ウェブクライアントの基本について説明しました。次に、Node.js バックエンドアプリケーションについて見ていきましょう。PubSub を使ってデータを保持し、WebSocket メッセージを再配布するために Redis がどのように使用されるかを見ていきます。

Redis と WebSockets の設定

ウェブクライアントがブラウザで開かれると、PubSub アプリケーションで WebSocket が確立されます。接続すると、アプリケーションは既存のメンバーとメッセージを新しいクライアントに発行するために、いくつかのデータをアセンブルする必要があります。また、新しいチャットルーム参加者について他のクライアントを更新する必要があります。次に、HTTP アプリケーションと WebSocket の宣言を示すコードスニペットを示します。

var express = require('express');
var app = express();
var http = require('http').Server(app);
var io = require('socket.io')(http);
var port = process.env.PORT || 3000;

WebSocket リスナーの作成に加えて、アプリケーションは Redis クラスタへの複数の接続も確立する必要があります。 Redis データモデルを更新してトピックにメッセージを発行するには、1 つの接続が必要です。 トピックのサブスクリプションごとに追加の接続が必要になります。

var Redis = require('ioredis');
var redis_address = process.env.REDIS_ADDRESS || 'redis://127.0.0.1:6379';

var redis = new Redis(redis_address);
var redis_subscribers = {};

function add_redis_subscriber(subscriber_key) {
    var client = new Redis(redis_address);

    client.subscribe(subscriber_key);
    client.on('message', function(channel, message) {
        io.emit(subscriber_key, JSON.parse(message));
    });

    redis_subscribers[subscriber_key] = client;
}
add_redis_subscriber('messages');
add_redis_subscriber('member_add');
add_redis_subscriber('member_delete');

このコードスニペットでは、
Redis コマンドチャネルは ioredis JavaScript クライアントを使用して確立されています。
また関数は、新しいトピックのチャンネルを初期化するためにも定義され、
チャンルのトピックについてキー入力されたすべての受信者のハッシュに追加されます。
各サブスクリプションチャネルは同じように機能します。

  • Redis サブスクリプションチャンネルで JSON 文字列メッセージを受信します。
  • JSON 文字列を JavaScript オブジェクトに解析します。
  • Redis PubSub と同じトピックを使用して、JavaScript オブジェクトを WebSocket 接続に発行します。

この後で説明するように、JavaScript オブジェクトは、Redis データ型の値として保存され、PubSub トピックに発行されるときに JSON 文字列にシリアル化されることが重要です。WebSocket を介して発行する前に、JSON 文字列を JavaScript オブジェクトに逆シリアル化して戻す必要があります。Socket.io ライブラリがクライアントと通信するときには、オブジェクトをシリアル化したり逆シリアル化したりするため、この逆シリアル化が生じる必要があります。

ウェブクライアントがブラウザのチャットルームに参加すると、クライアントは WebSocket への新しい接続を確立します。次のような関数を定義することによって、この接続が確立されたときにアクションを実行できます。

io.on('connection', function(socket) {

... application business logic ... 

}

ソケットは、クライアントへの各 WebSocket 接続を識別するために使用される ID プロパティを含むオブジェクトです。socket.id 値を使用して、メンバーを識別します。この識別により、Redis データモデルからメンバーを見つけて削除することができます。また、member_delete トピックを使用しているすべてのチャットルームクライアントにメンバーの削除を伝えることもできます。以下で説明する他の関数は、このコールバック関数のコンテキストにあります。

次のセクションでは、新しいクライアントが WebSocket を介して Node.js バックアップアプリケーションに接続すると何が起こるかを見ていきます。

新しいクライアント接続の初期化

新しいクライアントがチャットルームに参加すると、いくつかのことが起こります。

現在のメンバーリストが取得されます。
これがクライアントの再接続でない限り、ランダムなユーザー名とアバター URL で新しいメンバーが作成され、Redis Hash に保存されます。
最近の履歴メッセージの切り詰められたリストが検索されます。
これらのタスクを達成するためのコードを見てみましょう。まず、次のコードがあります。

var get_members = redis.hgetall('members').then(function(redis_members) {
    var members = {};
    for (var key in redis_members) {
        members[key] = JSON.parse(redis_members[key]);
    }
    return members;
});

ioredis JavaScript クライアントは、非同期実行処理で Promises を使用します。HGETALL (‘members’) 呼び出しは、キー ‘members’に格納されているハッシュのすべてのキーと値を返します。Redis はハッシュデータ型をサポートしていますが、1 レベルの深さしかありません。ハッシュの値は文字列でなければなりません。コールバック関数は、次のチェーンで逆シリアル化されたハッシュのキーと値のペアを反復して、メンバーを初期化します。

var initialize_member = get_members.then(function(members) {
    if (members[socket.id]) {
        return members[socket.id];
    }

    var username = faker.fake("{{name.firstName}} {{name.lastName}}");
    var member = {
        socket: socket.id,
        username: username,
        avatar: "//api.adorable.io/avatars/30/" + username + '.png'
    };
    
    return redis.hset('members', socket.id, JSON.stringify(member)).then(function() {
        return member;
    });
});

initialize_member Promise 関数は、メンバーが再接続ソケットであるかどうかをまずチェックします。再接続ソケットでない場合は、Faker を使用してランダムなユーザー名で新しいメンバーが生成されますこのユーザー名から、Adorable Avatars サービスを使用してランダムなアバター URL が生成されます。

クライアント初期化の最後のステップは、過去の最近のメッセージの切り捨てられたリストを取得することです。これを行うには、ソート対象セットと呼ばれる別の Redis データ型を利用することができます。このタイプは Redis セットに似ていますが、セット内の各要素のランクを含みます。ソート対象セットは、リーダーボードの一般的なデータ型です。タイムスタンプがランクとして使用されている場合は、時間順に並べられた要素のコレクションを格納するためにも使用できます。

var get_messages = redis.zrange('messages', -1 * channel_history_max, -1).then(function(result) {
    return result.map(function(x) {
       return JSON.parse(x);
    });
});

We use a Redis method on the Sorted Set, called ZRANGE というソート対象セットでは、ランクに基づいて要素のリストを返す Redis メソッドを使用します。要素は最低スコアから最高スコアまで並べられます。したがって、初期化時に取得するメッセージの最大数 (-1 * channel_history_max) まで最後の要素 (-1) を取得する必要があります。繰り返しになりますが、各要素は JSON 文字列としてシリアル化されていますので、要素を JavaScript オブジェクトに対して逆シリアル化する必要があります。

要約すると、新しいクライアントがチャットルームに参加すると、いくつかのことが起こります。

  1. 現在のメンバーリストが取得されます。
  2. これがクライアントの再接続でない限り、ランダムなユーザー名とアバター URL で新しいメンバーが作成され、Redis Hash に保存されます。
  3. 最近の履歴メッセージの切り詰められたリストが検索されます。

これらの手順をそれぞれを確認しました。ここで、初期化とクライアントへのストリームデータの完了方法を見ていきましょう。ioredis は Promise を使用するため、非同期の実行を連鎖させて、すべてが完了するのを待ってから Promise.all を使用して結果を処理できます。

Promise.all([get_members, initialize_member, get_messages])
    .then(function(values) {
        var members = values[0];
        var member = values[1];
        var messages = values[2];

...

)};

これですべての必要なデータが得られたので、WebSocket 接続を使用してデータをストリーミングして新しいクライアントを初期化し、新しいメンバーがチャットルームに参加したことをすべてのメンバーに伝える必要があります。

io.emit('member_history', members);
io.emit('message_history', messages);
redis.publish('member_add', JSON.stringify(member));

Socket.io の emit メソッドを使用して、初期化中のクライアントにメッセージとメンバーのリストをストリーミングします。1 つの WebSocket を使用して複数のメッセージを送信できます。ここでは、トピック (member_history, message_history) は先ほどのクライアントコードでレビューしたトピックリスナーに対応しています。新しいメンバーはすべての参加者に伝えなければなりません。これを行うには、Redis コマンドチャネルを使用して、シリアル化された JSON 文字列を member_add トピックに発行します。すでに行っているように、WebSocket を使用して同じトピックをリッスンしているクライアントにメッセージを再配布するため、3 つの Redis トピックを設定しました。

次のセクションでは、チャットルームで送信されたメッセージを処理するハンドラを設定する方法を見ていきましょう。

メッセージの処理

新しいクライアントが初期 WebSocket 接続を完了すると、新しいクライアントによって送信されるメッセージ用のメッセージハンドラも定義する必要があります。メッセージは、メッセージテキスト、メンバーのユーザー名、メンバーのアバター、メッセージの作成タイムスタンプで構成されます。

socket.on('send', function(message_text) {
    var date = moment.now();
    var message = JSON.stringify({
        date: date,
        username: member['username'],
        avatar: member['avatar'],
        message: message_text
    });

    redis.zadd('messages', date, message);
    redis.publish('messages', message);
});

ZADD コマンドとメッセージ作成タイムスタンプをランクとして使用して、ソート対象セットに格納されたメッセージ履歴にメッセージを追加します。最後に、Redis コマンドチャネルを使用してメッセージトピックを発行しました。Redis/WebSockets の再配布は以前に定義しています。

クライアントを初期化し、チャットルームで送信されたメッセージを処理する方法について説明しました。最後に、クライアントがチャットルームを離れるときの対処方法を見てみましょう。

切断処理

Socket.io は、クライアントがサーバーに接続するときに確立された WebSocket のハートビートを作成します。ハートビートが失敗すると、クライアントで切断イベントが発生します。

socket.on('disconnect', function() {
    redis.hdel('members', socket.id);
    redis.publish('member_delete', JSON.stringify(socket.id));
});

クライアントが切断すると、メンバーの初期化中に実行されたアクションが取り消されます。まず、Redis HDEL メソッドを使用して、クライアントの WebSocket ソケット ID を使用するメンバーハッシュ Redis データ型からクライアントを削除します。同じメソッドをクライアントをハッシュに追加するのに使用します。すべての参加者に対してチャットルームに加わった新しいメンバーを通知するように、メンバーがチャットルームを離れることをすべての参加者に通知する必要があります。これは、member_delete Redis トピックを使用して行います。このトピックは、WebSocket を使用して残りのクライアントに再配布されます。

これでコードの確認は完了です。次に、AWS CloudFormation を使用してアプリケーションスタックを AWS にデプロイする方法を確認します

AWS CloudFormation を使用したアプリケーションスタックのデプロイ

CloudFormation は、開発者やシステム管理者は、関連する AWS リソースのコレクションを簡単に作成および管理する方法を提供します。CloudFormation は、整った予測可能な方法でリソースを提供し、更新します。チャットアプリケーションの CloudFormation スタックを起動するには、次のボタンをクリックします。

CloudFormation スクリプトは、Elastic Beanstalk 環境、アプリケーション、および構成テンプレートを作成します。Redis の ElastiCache クラスタと、ロードバランサ、アプリケーションサーバー、Redis クラスタの Amazon EC2 セキュリティグループも作成します。このようにして、アーキテクチャレイヤ間の最小権限セキュリティ構成にベストプラクティスを使用します。

ElisiCache for Redis の設定スニペットに関する 1 つの注意点AWS::EC2::SecurityGroup では進入セキュリティルールのインライン指定が可能ですが、そうすることで、CacheCluster と SecurityGroup の間に循環参照が作成されてしまいます。次のスニペットに示すように、進入ルールを別の AWS::EC2::SecurityGroupIngress に分割して循環参照を破棄する必要があります。

リソース:

RedisCluster:
    Type: AWS::ElastiCache::CacheCluster
    Properties:
      CacheNodeType:
        Ref: ClusterNodeType
      VpcSecurityGroupIds:
        - !GetAtt CacheSecurityGroup.GroupId
      Engine: redis
      NumCacheNodes: 1
  CacheSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Cache security group
  CacheSecurityGroupIngress:
    Type: AWS::EC2::SecurityGroupIngress
    Properties:
      GroupId: !GetAtt CacheSecurityGroup.GroupId
      IpProtocol: tcp
      FromPort: !GetAtt RedisCluster.RedisEndpoint.Port
      ToPort: !GetAtt RedisCluster.RedisEndpoint.Port
      SourceSecurityGroupId: !GetAtt ApplicationSecurityGroup.GroupId

次に、WebSocket をサポートするための Elastic Beanstalk Nginx プロキシ設定の設定を変更する方法を確認してみましょう。

WebSocket サポートのための AWS Elastic Beanstalk の Nginx 設定

AWS Elastic Beanstalk は、Java、.NET、PHP、Node.js、Python、Ruby、Go および Docker を使用して開発されたウェブアプリケーションやサービスを、Apache、Nginx、Passenger、IIS など使い慣れたサーバーでデプロイおよびスケーリングするための、使いやすいサービスです。

Elastic Beanstalk は、Elastic Load Balancer (ELB) と Application Load Balancer (ALB) の両方をサポートします。弊社のクライアントとサーバーは WebSocket を使用して通信するので、WebSockets サポートのための ALB を構成しますサンプルの Node.js バックエンドアプリケーションでは、Node.js ベースの Elastic Beanstalk の事前構成済みアプリケーションスタックを選択します。アプリケーションコードの前で、ウェブ層プロキシとして Nginx を使用します。

WebSocket がサポートされていない場合、Socket.io にはポーリング戦略に戻る手段があります。ただし、単純に ALB と Nginx に設定を変更することで、WebSocket のサポートを有効にし、サーバーからクライアントへのプッシュベースのデータストリームを使用することができます。ALB で WebSocket サポートを有効にするには、クライアントが 2 つの連続した HTTP リクエストを行って接続をアップグレードして Websocket を使用したときに、同じインスタンスが応答するように、スティッキーセッションを有効化する必要があります。Nginx で WebSocket サポートを有効にするには、Elastic Beanstalk の .ebextensions メカニズムを使用して、Nginx の設定を少し変更する必要があります。コンテナコマンドは、アプリケーションアーカイブが展開された後、アクティブアプリケーションとしてインストールされる前に、アプリケーションに変更を導入する方法を提供します。

container_commands:
  enable_websockets:
    command: |
      sed -i '/\s*proxy_set_header\s*Connection/c \
              proxy_set_header Upgrade $http_upgrade;\
              proxy_set_header Connection "upgrade";\
      ' /tmp/deployment/config/#etc#nginx#conf.d#00_elastic_beanstalk_proxy.conf

前述のコードスニペットは、Nginx 設定ファイルを所定の位置で変更します。/tmp/deployment/config/#etc#nginx#conf.d#00_elastic_beanstalk_proxy.conf。これは、sed コマンドを使用して「プロキシセットヘッダー」行を検索し、WebSocket をサポートする設定に置き換えます。アプリケーションがインストールされると、Elastic Beanstalk は設定ファイルを /etc/nginx/conf.d/00_elastic_beanstalk_proxy.conf にコピーします。この手順を実行すると、Nginx サービスが再起動され変更がアクティブになります。

まとめ

このブログ記事では、publish-subscribe パターンについて見てきました。また、それを ElastiCache for Redis 内で使用して、チャットアプリケーションの複数のクライアントの双方向ストリーミング通信をサポートする方法についても説明しました。

リマインダーとして、awslabs GitHub リポジトリにこのサンプルアプリケーションの完全なソースがあります。このサンプルアプリケーションを起動したら、ユーザー認証、添付ファイル、または自分のチャットや PubSub アプリケーションに役立つその他の機能を追加して、チャットアプリケーションに独自のアイデアを組み込んで拡張することをお勧めします。

Amazon Aurora Under the Hood: クオーラムメンバーシップ

Anurag Guptaは幾つものデザインのヘルプを行ったAmazon Auroraを含むAWSが提供するデータベースサービスの責任者です。このシリーズではAnuragがAuroraを支える技術やデザインについて説明します。

この記事は、Amazon Auroraがどのようにクオーラムを使用するのかをお話する4回シリーズの最後です。最初の記事では、障害が発生した場合に必要なクォーラムのメリットとメンバの最小数について説明しました。2回目の記事では、読み書きを行う際に利用するネットワーク帯域の増加を避けるために、ロギング、キャッシュの状態、および非破壊的な書き込みを使用する方法について説明しました。3回目の記事では、より高度なクォーラムモデルを使用して複製コストを削減する方法について説明しました。クォーラムに関するこの最後の記事では、クォーラムメンバーシップの変更を管理する際にAmazon Auroraが問題を回避する方法について説明します。

クオーラムメンバーシップの変更を管理するテクニック
マシンは故障します。クオーラムメンバの1つが破損すると、ノードを交換することによってクオーラムを修復する必要があります。これは複雑な決定になります。 クォーラムの他のメンバーは、障害のあるメンバに一時的なレイテンシーの増加が発生したか、再起動のための短期間の可用性低下が発生したか、または永久にダウンしたかどうかを判断できません。 ネットワークパーティションにより、複数のメンバーグループが同時にお互いに隔離を実行出来ます。

ノードごとに大量の永続状態を管理している場合、クォーラムを修復するための状態の再複製には長い時間がかかります。 そのような場合、障害のあるメンバーが復帰できる場合に備えて修復を開始することについて慎重に行う必要があります。 多くのノードで状態をセグメント化することで、修復時間を最適化することができます。 しかし、これは失敗の可能性を高めます。

Auroraでは、データベースボリュームを10GBのチャンクに分割し、3つのアベイラビリティゾーン(AZ)に分散した6つのコピーを使用します。 現在の最大データベースサイズが64TBなので、プロテクショングループは6,400個、セグメント数は38,400個です。 このスケールでは破損は一般的に発生する可能性があります。 メンバーシップの変更を管理する一般的な方法は、一定期間リースを使用し、各リースでメンバーシップを確保するためにPaxosなどのコンセンサスプロトコルを使用することです。 しかし、Paxosは処理負荷のかかるプロトコルであり、最適化されたバージョンでは多数の障害が発生します。

障害を管理するためにクオーラムセットを利用する
Auroraはメンバーシップの変更を管理するために、ロギング、ロールバック、コミットなどのクォーラムセットとデータベース技術を使用します。 A、B、C、D、E、Fの6つのセグメントを持つプロテクショングループを考えてみましょう。この場合、書き込みクォーラムはこの6組のうち4つのメンバーであり、読み取りクォーラムは3つのメンバーです。 前回の記事でご紹介したように、Auroraのクオーラムはこれよりも複雑ですが、今は単純に考えてみることにします。

Auroraの読み書きはそれぞれ、メンバーシップエポックを使用します。これは、メンバーシップの変更ごとに単調に増加する値です。 現在のメンバーシップエポックよりも古いエポックにある読み取りと書き込みは拒否されます。そのような場合、クオーラムメンバーシップの状態をリフレッシュする必要があります。 これは、概念的には、REDOログ内のlog sequence numbers(LSN)の概念に似ています。 エポックナンバーおよび関連する変更記録は、メンバーシップに順序付けられたシーケンスを提供します。 メンバーシップエポックを変更するには、データ書き込みと同様に書き込みクォーラムを満たす必要があります。 現在のメンバーシップの読み込みには、データの読み込みと同様のリードクオーラムが必要です。

ABCDEFのプロテクショングループの話を続けましょう。セグメントFが破損した可能性があるとし、新しいセグメントGを導入する必要があると考えてください。一時的な障害に遭遇する可能性があり、迅速に復帰する可能性があります。またはリクエストを処理しているかもしれませんが、なんらかの理由で検出出来ない可能性があります。また、Fが復活したかどうかを確認するのを待ちたくはありません。クオーラムが損なわれて2回目の障害が発生する可能性が増加だけです。

これを解決するためにクォーラムセットを使用します。 私たちはABCDEFからABCDEGに直接メンバーシップの変更をすることはありません。代わりに、メンバーシップのエポックを増やし、クォーラムセットをABCDEFとABCDEGに移動します。書き込みはABCDEFの6つのコピーのうち4つから正常に行われなければならず、またABCDEGの6つのコピーのうち4つからackが返る必要があります。 ABCDEのどの4つのメンバーは両方とも書き込みクォーラムを満たしています。 読み取り/修復クォーラムは同じように動作し、ABCDEFからの3つのackとABCDEGから3つのackが必要です。ABCDEからの3つのいずれかが両方を満たします。

データがノードG上に完全に移動され、Fを取り除くと決定した場合、メンバーシップエポックの変更を行い、クォーラムセットをABCDEGに変更します。エポックの使用は、コミットLSNがREDO処理のために行うのと同様に、これをアトミックに行います。このエポックの変更は、現在の書き込みクォーラムが満たされている必要があり、他のアップデートと同様に、ABCDEFの6つのうち4つと、ABCDEGの6つのうちの4つからのackが必要です。Gが利用可能になり前に再びノードFが利用可能になると、変更を元に戻しメンバーシップエポックの変更をABCDEFに戻します。完全に健全なクオーラムに戻るまで、いかなる状態やセグメントも破棄しません。

このクォーラムへの読み書きは、メンバーシップの変更中に、変更前または変更後と同じように行われることに注意してください。 クォーラムメンバーシップへの変更は、読み取りまたは書き込みをブロックしません。失効したメンバーシップ情報を持つ呼び出し元は、状態をリフレッシュして正しいクォーラムセットに要求を再発行します。また、クオーラムメンバーシップの変更は、読み取り操作と書き込み操作の両方に対して非ブロッキングです。

もちろん、Fの代わりにGへデータを移動しクオーラムを修復している間にABCDEGのいずれかが破損する可能性もあります。多くのメンバーシップ変更プロトコルはメンバーシップの変更中に障害を柔軟に処理しません。クォーラムセットとエポックでは、簡単です。Eも破損してHに置き換えられる場合を考えてみましょう。ABCDEFとABCDEGとABCDFHとABCDGHのクオーラムに移動するだけです。単一障害と同様に、ABCDへの書き込みはこれらのすべてを満たします。メンバーシップの変更は、読み取りと書き込みの失敗と同じ範囲になります。

まとめ
クォーラムセットをメンバーシップの変更に使用することにより、Auroraは小さなセグメントを使用することができます。これにより、Mean Time To Repair(MTTR)および複数の障害に対する可能性を削減することで、耐久性が向上します。また、お客様のコストを削減します。Auroraのボリュームは必要に応じて自動的に増加し、小さなセグメントでは少しずつ増加します。クォーラムセットを使用することで、メンバーシップの変更が行われている間も読み取りと書き込みが継続できるようになります。

メンバーシップの決定を元に戻すことができれば、積極的にクオーラムを変更することができます。障害のあったメンバーが返ってくると、いつでも変更を元に戻すことができます。いくつかの他のシステムでは、リースが期限切れとなり、クオーラムメンバシップを再確立する必要があるため、定期的な停止が発生します。Auroraは、リースが期限切れになるまでメンバーシップの変更操作を延期するという耐久性の犠牲を払わず、クオーラムメンバシップが確立されている間に読み込み、書き込み、またはコミットを遅らせるというパフォーマンス上のペナルティも発生しません。

Auroraは、さまざまな分野で進歩を遂げています。データベースと分散システムを統合するアプローチは、これらの多くの中核を成しています。クォーラムをどのように使用するかについてのこの連載をご覧いただき、ご自身のアプリケーションやシステムを設計する方法について考えるときに役立てて頂けると思います。今回使用した手法は広く適用可能ですが、スタックの多くの要素にに対して適用する必要があります。

もしご質問などありまししたら、コメントもしくは aurora-pm@amazon.comにご連絡下さい。

翻訳は星野が担当しました (原文はこちら)