Amazon Web Services ブログ

Apache Kafka 向け Amazon Managed Streaming を使用して、データキャプチャを Neo4j から Amazon Neptune に変更

Neo4j から Amazon Neptune へのポイントインタイムデータ移行を実行した後、進行中の更新をリアルタイムでキャプチャして複製することができます。Neo4j から Neptune へのポイントインタイムグラフデータ移行の自動化については、完全に自動化されたユーティリティを使用した Neo4j グラフデータベースの Amazon Neptune への移行をご参照ください。この記事では、cdc-neo4j-msk-neptune リポジトリのサンプルソリューションを使用して、Neo4j から Neptune へのキャプチャとレプリケーションを自動化する手順について説明します。

変更データキャプチャ (CDC) パターンを使用したデータベースの継続的なレプリケーションにより、データをストリーミングして他のシステムで利用できるようにすることができます。この記事では、最新の変更を Neptune にコピーできるように、CDC を使用して Neo4j からデータをストリーミングすることにより、グラフデータベースを最新化することに専念しています。Strangler パターンイベント傍受戦略を使用して Neo4j を最新化することにより、すべての変更を Neptune に段階的にプッシュし、アプリケーションを変更して Neptune を使用できます。Neptune は、高速で信頼性が高い、完全マネージド型グラフデータベースサービスであり、高度に接続されたデータセットと連携するアプリケーションの構築と実行を容易にします。Neptune の中核にあるのは、何十億もの関係を保存し、ミリ秒単位のレイテンシーでグラフをクエリするために最適化された、専用の高性能グラフデータベースエンジンです。

アーキテクチャの概要

この記事のソリューションは、AWS アカウントでの次のアーキテクチャのデプロイを自動化します。このアーキテクチャは、レプリケーション用の疎結合システムを構築するためにソリューションがプロビジョニングする AWS リソースを示しています。

アーキテクチャには次の要素が含まれます。

  1. Amazon VPC 内のすべての必須 AWS リソースをブートストラップする、エンドユーザーがトリガーする AWS クラウド開発キット (AWS CDK) アプリ
  2. レプリケーション用の Docker コンテナで実行される専用サービスを実行するための Amazon Elastic Compute Cloud (Amazon EC2) インスタンス
  3. このレプリケーションのターゲットとして機能する 1 つのグラフデータベースインスタンスを持つ単一ノードの Neptune DB クラスター
  4. このレプリケーションのパブリッシュ/サブスクライブブローカーとして機能する 2 つのノードを持つ Apache Kafka 向け Amazon Managed Streaming (Amazon MSK) クラスター

サンプルソリューションの起動

このソリューションでは、Neo4j グラフデータベースのノードと関係に加えられた変更が、Neptune にリアルタイムで反映されることを期待できます。

開始するには、GitHub リポジトリから AWS CDK アプリのクローンを作成します。前提条件を満たしていることを確認したら、GitHub の指示通りにソリューションを実行します。

ソリューションの詳細

CDC は、ソースシステム内のデータの変更を識別し、それらの変更に作用するアーキテクチャパターンです。このソリューションでは、Neo4j グラフのデータ変更を特定し、それらを変換して、簡単な 3 ステップのプロセスでターゲットの Neptune グラフを更新することにより作用します。

  1. AWS リソースをプロビジョニングする
  2. 変更されたデータを処理して複製する
  3. エンドツーエンドソリューションをテストする

AWS リソースのプロビジョニング

完全に自動化されたエクスペリエンスを実現するには、必要なリソースをプロビジョニングし、適切な AWS Identity and Access Management (IAM) ロールとポリシーを適用するなど、それらの配管を構成することが重要です。これにより、AWS アカウントで実行とテストを行えます。この自動化は、別個の VPC を作成し、その中でリソースを起動することにより、分離を提供します。これにより、既存の環境への依存関係を気にすることなく、セットアップと破棄を簡単に行うことができます。手順通りにソリューションを実行すると、次のコードのような出力が表示されます。

その結果、AWS アカウントに次のリソースを作成します。

AWS リソース 使用方法
Amazon VPC VPC は、AWS 開発アカウントの残りの部分に影響を与えることなく、ソリューションが確実に作成および破棄されるようにする、分離されたネットワークを作成します。アプリは VPC 内で、2 つのアベイラビリティーゾーンに 1 つのパブリックサブネットと 1 つのプライベートサブネットを作成します。
Amazon EC2 単一の EC2 インスタンスを使用して、Docker コンテナで専用サービスを実行します。
セキュリティグループと IAM ポリシー レプリケーションを機能させるには、EC2 インスタンスが Neptune および Amazon MSK と通信する必要があります。セットアップアプリは、セキュリティグループ、IAM ロール、およびポリシーを作成して、サービスが安全に接続し、相互に通信できるようにします。
Amazon MSK Kafka 向け Neo4j Streams は、ソースデータベースから Kafka にリアルタイムで変更を送信します。Amazon MSK は、このソリューションで Neo4j および Neptune と統合するために使用するフルマネージドの Kafka サービスです。
Neptune この完全マネージド型の AWS グラフデータベースサービスを最新化のターゲットとして使用します。

変更されたデータの処理と複製

プロビジョニングする EC2 インスタンスは次のサービスを実行します。

  • startup-service – この Docker コンテナは、Neptune および Amazon MSK エンドポイントを決定します。
  • neo4j-service – この Docker コンテナは Neo4j バージョン 4.0.0 を実行し、apoc バージョン 4.0.0.6 および neo4j-streams バージョン 4.0.0 プラグインがインストールされています。このサービスは、次のデフォルト値へのすべての変更を公開するように構成されています。これらのデフォルト値を変更する方法については、GitHub リポジトリの指示通りに進めてください。

    ノード 関係 Amazon MSK トピック名
    Person{*} ACTED_IN{*} movie-topic
    Movie{*)
  • kafka-topic-service – この Docker コンテナは、新しい Amazon MSK トピックを作成します。neo4j-service は変更されたデータをこのトピックにパブリッシュし、transformation-service はこのトピックにサブスクライブして変更されたデータを取得します。カスタム構成を作成することにより、auto.create.topics.enable を使用して新しいトピックを自動的に作成するように Amazon MSK を構成することもできます。
  • transformation-service – Neptune のプロパティグラフは、Neo4j のプロパティグラフと非常によく似ており、頂点の複数のラベルや複数値プロパティ (リストではなくセット) のサポートが含まれます。Neo4j では、重複する値を含む単純なタイプの同種リストがノードとエッジの両方のプロパティとして保存することを可能にします。他方で、Neptune は、頂点プロパティにセットおよび単一のカーディナリティを、エッジプロパティに単一のカーディナリティを、それぞれ提供します。transformation-service は、Neo4j から変更されたデータを受け入れてから、Neptune のグラフデータモデルに変換するように設計されています。

データフローアーキテクチャ

次の図は、データフローアーキテクチャと、これらのサービスが互いにどのように連携するかを示しています。

データフローには次の手順が含まれます。

  • インスタンスのユーザーデータシェルスクリプトは、docker-compose を使用して 4 つの Docker コンテナを起動します。ユーザーデータスクリプトの使用は、インスタンスの起動時に起動スクリプトを実行する一般的なパターンです。このソリューションでは、それを使用してサービスを起動および構成します。
  • 最初に開始するサービスは startup-service です。MSK クラスターエンドポイントアドレスの AWS CloudFormation 記述スタックをクエリするには、このサービスが必要です。クラスターが作成されるまでクラスターエンドポイントは利用できないため、これは別の手順として必要です。エンドポイントアドレスを取得した後、サービスはそれをクエリして、Kafka Bootstrap および Zookeeper のアドレスとポートを取得します。これらのアドレスを使用して、変更を Amazon MSK に送信できるように Neo4j Streams プラグインを構成します。
  • startup-service は CloudFormation スタックに Neptune エンドポイントをクエリします。Amazon CDK スタックは Neptune クラスターエンドポイントを出力しますが、これはランタイム出力であり、スタックの実行中は使用できません。
  • kafka-topic-service は、Amazon MSK に新しいトピックを作成します。
  • neo4j-service で実行されている Neo4j グラフデータベースが実行する Cypher スクリプトを受け取ると、変更されたデータを Amazon MSK トピックに公開します。インタラクティブなユーザーまたは Neo4j グラフに書き込むその他のサービスが操作を実行できます。
  • Amazon MSK トピックにサブスクライブしている transformation-service は、データを受信し、Neo4j のデータモデルから Neptune データに変換することで処理します。
  • transformation-service は、変換されたデータを Neptune にプッシュします。

エンドツーエンドソリューションのテスト

次の図は、ソリューションのエンドツーエンドのテストを実行する手順を示しています。

この記事で実行していく手順を次に示します。

  1. EC2 インスタンスに SSH で接続します。
  2. 次のシェルスクリプトを実行して、neo4j-service Docker コンテナに入ります。
    docker container exec -it neo4j-service cypher-shell
  3. neo4j プロンプトで、次の Cypher スクリプトを実行します。
    CREATE (TheMatrix:Movie {title:'The Matrix', released:1999, tagline:'Welcome to the Real World'});
    CREATE (Keanu:Person {name:'Keanu Reeves', born:1964});
    CREATE (Keanu)-[:ACTED_IN {roles:['Neo']}]->(TheMatrix);

    このサービスは、すべてのデバッグ情報をローカルファイルに保存します。

  4. 省略可能なステップとして、ログを表示するには、次のシェルスクリプトを実行します。
    docker container logs transformation-service
  5. 次のシェルスクリプトを実行して、すべてのクエリを Neptune に送信するように構成された Apache TinkerPop Gremlin コンソールを起動します (この手順では、Neptune グラフがソースの変更と同期していることを確認します)。
    docker run -it -e NEPTUNE_HOST --entrypoint /replace-host.sh sanjeets/neptune-gremlinc-345
  6. Gremlin プロンプトで、次のシェルスクリプトを順番に実行します。
    :remote console
    g.V().count()

ソリューションの拡張

このソリューションには、疎結合アーキテクチャがあります。transformation-service を独自のサービスに置き換える場合は、Docker コンテナに新しい実装を提供することで簡単に行うことができます。Docker 構成ファイル 02-docker-compose.yml を変更して、transformation-service を置き換える必要があります。

同様に、ソリューションの他のサービスを置き換えることができます。たとえば、Neo4j Docker コンテナを置き換えることができます。Docker コンテナで Gremlin コンソールを使用する代わりに、必要に応じて、ライブコードと説明テキストを含む完全マネージド型インタラクティブな開発環境である Jupyter ノートブックを使用して、Neptune データベースに簡単かつすばやくクエリを実行できます。ノートブックは、Amazon SageMaker を通じてホストおよび請求されます。

ソリューションのスケーリング

このソリューションのモジュール式アーキテクチャにより、transformation-service を個別にスケーリングして、高スループットの変更データキャプチャ要件を満たすことができます。また、Amazon Neptune を監視することで、必要に応じてスケールアップまたはスケールダウンできるようになります。次のパターンは、このソリューションを実際のシナリオで大規模に実行するのに役立ちます。

Amazon MSK による transformation-service のスケーリング

簡単にするために、このソリューションでは単一の Kafka コンシューマーと単一のパーティションを使用しています。このソリューションを拡張する場合は、次のアーキテクチャに示すように、コンシューマーグループに複数のパーティションと複数のコンシューマーを作成することができます。transformation-service コンテナの複数のインスタンスを起動できるようにすることで、ソースデータベースからの大量の CDC を処理します。新しいアーキテクチャは次の図のようになります。

Neptune が負荷に応じてスケーリングする方法

Neptune DB クラスターとインスタンスは、ストレージ、インスタンス、読み取りの 3 つの異なるレベルでスケーリングします。最適化によっては、Neptune クラスターを綿密に監視した後、前述のスケーリングレベルを個別に微調整できます。

Neptune のモニタリング

次のスクリーンショットは、Neptune コンソールのダッシュボードビューとしてデフォルトで使用できるさまざまなメトリックを示しています。

CDC のパフォーマンスを監視するには (たとえば、Gremlin または SPARQL クエリを含む未加工のリクエストとペイロードを検査するため)、次の変更を加えることができます。

ソリューションの実行コスト

次の表は、us-west-2 でオンデマンド料金を設定してこのソリューションを実行した場合に 1 時間ごとの見積もりに関する概要を示しています。cdk.json ファイルでインスタンスタイプのデフォルトを変更すると、コストが変わります。ストレージ、I/O、およびデータ転送速度は、計算を簡略化するために行われた仮定です。すべての料金情報はこの記事の執筆時点のものであり、今後変更される可能性があります。詳細情報を得るか完全な計算を実行するには、各サービスの料金ページをご参照ください。

サービス インスタンスタイプ (A) EBS ストレージ (B) データ転送 (C) 1 時間あたりの料金 1 時間あたりの推定コスト (A+B+C)
Amazon EC2 t3a.xlarge 100 GB 同一 AZ 内無料 Amazon EC2 の料金を見る 0.1504 USD + 0.01 USD + 0 USD = 0.1604 USD
サービス インスタンスタイプ (A) 保存されたデータ (B) データ転送 (C) I/O (D) 1 時間あたりの料金 1 時間あたりの推定コスト (A+B+C+D)
Neptune db.r5.large 100 GB 同一 AZ 内無料 1 百万未満 Amazon Neptune の料金を見る 0.348 USD + 0.01 USD + 0 USD + 0.20 USD = 0.558 USD
サービス インスタンスタイプ (A) 保存されたデータ (B) データ転送 (C) 1 時間あたりの料金 1 時間あたりの推定コスト (A+B+C)
Amazon MSK kafka.m5.large 100 GB 同一 AZ 内無料 Amazon MSK の料金を見る 0.21 USD + 0.01 USD + 0 USD = 0.22 USD

Amazon Elastic Block Store (Amazon EBS) 、Neptune、および Amazon MSK のストレージの計算は、毎月の GB に基づいています。次の計算は、1 時間あたりの料金に基づく内訳です。

毎月 1 GB あたり 100 GB のストレージを 1 時間分計算 
= 0.10 USD x 100 x 1/ (24 x 30) ~ 0.01 USD

1 時間あたりのコストを小数点以下 2 桁まで切り上げて概算すると、0.16 USD + 0.56 USD + 0.22 USD = 0.94 USD になります。

まとめ

この記事では、Neo4j の既存のユーザーが、Amazon MSK を使用して CDC から Neptune へのストリーミングをシンプルな手順で簡単に自動化するためのソリューションを提供します。記事では、スケーリング、拡張、監視の方法について説明しています。最後に、このようなソリューションを実行するための 1 時間あたりのコストを計算する方法についても説明します。早速、始めましょう! このソリューションをご自分の AWS アカウントで実行するか、Amazon Neptune の詳細をお読みになって、グラフデータベースのニーズにどのように役立つかを確認してください。

 


著者について

 

Sanjeet Sahay 氏は、アマゾン ウェブ サービスのシニアパートナーソリューションズアーキテクトです。