Amazon Web Services ブログ

Neptune Streams を使用したグラフの変化の取り込み



多くのアプリケーションは、Amazon Neptune データベースに保存された項目の変更を、その変更が発生した時点で取り込む機能を利用することができます。Amazon Neptune は、Neptune Streams をサポートするようになりました。これは、Neptune のフルマネージド機能で、グラフに対するすべての変更を、発生順に確実にログに記録し、HTTP REST API を介してこれらの変更を利用可能にします。Neptune Streams は現在、ラボモードで利用可能です。

このブログ投稿では、Neptune Streams 機能を見直し、Neptune が提供するポーリングフレームワークを使用して Neptune と Amazon ElastiCache for Redis データストアを統合するストリームベースのアプリケーションをプロビジョニングし、この同じポーリングフレームワークを使用して独自のストリームベースのグラフアプリケーションを構築する方法を説明します。

Neptune Streams の概要

Neptune Streams を使用して、次のことを行うことができます。

  • ソース Neptune データベースから別の Neptune データベースに変更をレプリケートする
  • ソース Neptune データベースから、Amazon DynamoDB または Amazon Aurora などの別のデータベースに変更をレプリケートする
  • ソース Neptune データベースの変更を Amazon S3 にアーカイブする
  • Amazon Elasticsearch でソース Neptune データベースで変わるコンテンツをインデックス作成する
  • ソース Neptune データベースのグラフの変更に基づいて集積とグラフメトリックスを計算し、それらを Amazon ElastiCache にキャッシュする
  • ソース Neptune データベースの変更を Amazon Kinesis データストリームに公開し、Amazon Kinesis Data Analytics を使用して、グラフデータの変更に関する実用的なインサイトを取得する
  • ソース Neptune データベースのグラフ変更に対応し、データを変更または強化したり、データ制約をアサートするために、グラフデータに対して追加のクエリを発行する

既存の Neptune データベースに対して Neptune Streams を有効にする場合、ストリームからの変更を適用する前に、ソースデータのコピーをターゲットシステムにシードすることができます。ターゲットがソースと同じリージョンにある別の Neptune データベースである場合、スナップショットを使用してターゲットをシードできます。ほかのレプリケーションシナリオの場合、neptune-export を使用することができます。これは、プロパティグラフのデータを CSV または JSON のいずれかに、RDF データを Turtle にエクスポートして、ソースからすぐにターゲットにインポートできるようにデータを取り込みます。

ストリーム レコード

Neptune Streams が有効の場合、Neptune はグラフデータを変更し、同じトランザクションを使用してストリームに変更レコードを追加します。これにより、ストリームがグラフと整合性があり、ストリームで公開された変更がグラフで発生した順序と同じ順序で表示されることが保証されます。

Neptune のグラフデータは、クワッド (4 つ) から構成されます。.Neptune Gremlin または SPARQL エンドポイントへのリクエストは、暗黙のトランザクションのコンテキストで実行されます。通常、書き込み要求は、トランザクションごとに複数の要素 (クワッド) を作成、変更、および削除します。2 つのプロパティをもつ頂点を作成するための Gremlin リクエストの結果、3 つの要素が作成されます。この例は以下のとおりです。

g.addV('Person').property('firstName', 'John').property('lastName', 'Smith')

これらの各要素はストリームの個別のレコードとして表示され、コミット番号がスタンプされ、そのコミット内にそれが属するトランザクションのオペレーション番号が示されます。

{
  "lastEventId": {
    "commitNum": 1,
    "opNum": 3
  },
  "lastTrxTimestamp": 1570703182847,
  "format": "GREMLIN_JSON",
  "records": [
    {
      "eventId": {
        "commitNum": 1512,
        "opNum": 1
      },
      "data": {
        "id": "86f89fb0-d214-41eb-812c-6f89285d4e1f",
        "type": "vl",
        "key": "label",
        "value": {
          "value": "Person",
          "dataType": "String"
        }
      },
      "op": "ADD"
    },
    {
      "eventId": {
        "commitNum": 1,
        "opNum": 2
      },
      "data": {
        "id": "86f89fb0-d214-41eb-812c-6f89285d4e1f",
        "type": "vp",
        "key": "firstName",
        "value": {
          "value": "John",
          "dataType": "String"
        }
      },
      "op": "ADD"
    },
    {
      "eventId": {
        "commitNum": 1,
        "opNum": 3
      },
      "data": {
        "id": "86f89fb0-d214-41eb-812c-6f89285d4e1f",
        "type": "vp",
        "key": "lastName",
        "value": {
          "value": "Smith",
          "dataType": "String"
        }
      },
      "op": "ADD"
    }
  ],
  "totalRecords": 3
}

ストリームエンドポイント

データベースの SPARQL または Gremlin ストリームエンドポイントに HTTP GET リクエストを発行することで、ストリームからレコードを取得します。クエリ文字列パラメーターを使用すると、読み取りを開始するストリーム内のポイントと、返されるレコードの数をコントロールできます。次の例では、ストリームの先頭にある最初の 3 つのプロパティグラフ変更レコードについて Gremlin ストリームエンドポイントを照会します。

curl "https://<neptune-endpoint>:8182/gremlin/stream?iteratorType=TRIM_HORIZON&limit=3"

TRIM_HORIZON は、ストリームの照会を開始するポイントをコントロールすることができる 3 つのイテレータの 1 つです。 これとその他のパラメータの詳細について、ドキュメント を参照してください。

Neptune Stremas の AWS Lambda との統合

AWS でストリームベースのアプリケーションを構築する際の一般的なパターンは、AWS Lambda 関数を使用して Amazon Kinesis データストリーム、Amazon Kinesis Data Firehose 配信ストリーム、またはAmazon DynamoDB ストリームからのレコードを処理することです。

Neptune Streams のこの最初のリリースでは、CloudFormation テンプレートを通じて Neptune により提供されるポーリングフレームワークを使用して Lambda を Neptune Streams と統合できます。このフレームワークでは、ストリームハンドラーを作成してから、それをポーリングフレームワークで提供される Lambda 関数で登録できます。このフレームワークは AWS Step Functions と DynamoDB ベースのワークフローを使用して、ホストの Lambda の実行をスケジュールし、ストリーム処理のチェックポイントとなります。

Neptune Streams はシャードされていません。これにより、グラフへのすべての変更がストリームに書き込まれ、データベースに適用された順序と同じ順序でストリームリーダーに利用できるようになります。Neptune のポーリングフレームワークは、ホスト Lambda 関数の単一のインスタンスが確実に指定されたユースケースに対するストリームをポーリングするようにします。しかし、ほかのストリームリーダーアプリケーションを使用してストリームに対して異なるストリーム処理ユースケースを同時に実行するために、ポーリングフレームワークの複数のインスタンスを実行することができます。この形式で作成し、同時に使用できるストリームリーダーの数には制限がありません。

イベント指向のメトリクスを構築するための Neptune Streams と AWS Lambda の使用

この投稿で記載されたデモアプリケーションは、ソーシャルネットワークの成長をシミュレーションします。エッジがこのネットワークに追加されるたびに、Neptuneストリームの対応する変更レコードにより、Lambda 関数によってホストされ、ポーリングフレームワークによって実行されるハンドラーが、エッジの「out」または「to」頂点の度数中心性を計算し、結果をRedisのElastiCacheに保存します。

頂点の度数中心性は、頂点に向かう接するエッジの数のカウントです。中心性測定を使用して、グラフ内の各頂点の相対的な重要度を決定できます。ソーシャルネットワークの例では、度数中心性は人の「人気」の指標として解釈できます。人気が高いほど、度数中心性の値が高くなります。

相対的に小さなグラフの場合、これは数百または数千の頂点から成り立っており、単純な Gremlin クエリ (これとほかの中心性計算については「TinkerPop Recipes」を参照のこと) によりすべての頂点の度数中心性を計算できます。

g.V().group().by().by(inE().count())

しかし、数千、さらには数百万の頂点を含む大きなグラフの場合、この計算が完了するには長い時間がかかります。同じことが、結果を計算するためにグラフのすべてまたは大部分に触れる必要があるその他の多くの演算にも当てはまります。集計またはグラフメトリックを計算するための演算については、多くの場合、単一の長時間かけて実行される「graph global」クエリを、イベントごとに結果の一部を再計算または入力するイベント指向のソリューションに置き換えることができます。これは頂点またはエッジの追加または削除、またはプロパティ値の変更などによってトリガーされます。これがここで採用したソリューションであり、Neptune Streams は、各イベントに含まれるターゲット頂点の度数中心性の再計算をトリガーする一連のイベントを提供します。

この結果をキャッシュに保存することにより、必要な集計、測定、またはメトリックに直ちにアクセスできるようになります。サンプルアプリケーションを使用すると、ソーシャルネットワークの最も人気のあるトップ 3 のメンバーを照会し、グラフのサイズに関係なく、ミリ秒の応答時間で、中心性測定などを含む詳細を返すことができます。

ソリューションの概要

このブログの記事で紹介されているソリューションは、以下のリソースを作成します。

  • 3 つのプライベートサブネットと 3 つのパブリックサブネットをもつ Neptune VPC
  • 適切なサブネット、パラメータ、およびセキュリティグループを持つ、単一の r4.xlarge インスタンスを含む Neptune クラスター
  • Neptune にデータを書き込むための Neptune ワークロード Lambda 関数
  • 適切なサブネットグループを持つ Redis クラスター用 ElastiCache
  • Lambda、DynamoDB、およびステップ関数ベースの DynamoDB および CloudWatch VPC エンドポイントを使用したポーリングフレームワーク
  • Neptune および ElastiCache クエリコンテンツを含む Amazon SageMaker Jupyter ノートブックインスタンス

  1. Workload Manager AWS Lambda 機能は、Neptune Workload Lambda 関数を開始し、停止します。
  2. Neptune Workload Lambda 関数は、Amazon Neptune データベースに頂点とエッジを書き込み、ソーシャルネットワークの成長をシミュレーションします。
  3. グラフデータの変更は、Neptune ストリームに発行されます。
  4. ポーリングフレームワークの Lambda 関数によりホストされる Stream Handler は、変更記録のバッチに対して Neptune ストリームをポーリングします。それぞれの新しい頂点に対して、Amazon ElastiCache クラスターの頂点数を更新します。それぞれの新しいエッジに対して、Neptune をクエリして、エッジのターゲット頂点の度数中心性を再計算し、その結果を ElastiCache クラスターに発行します。
  5. Amazon SageMaker がホストする Jupyter ノートブックを使用して、ElastiCache と Neptune クラスターにクエリして、頂点カウントチを取得し、頂点の詳細を最高度の基数で見直します。

グレイのボックスのコンポーネントは、ソリューションのコアを表します。グレイのボックスの外にあるコンポーネントは、デモの目的でデータジェネレーターおよびデータクライアントとして機能します。AWS Step Functions ワークフローや Amazon DynamoDB 表などの基礎のポーリングフレームワークコンポーネントが、ダイアグラムから省略されています。

Amazon ElastiCache for Redis は、中心性の計算結果をキャッシュするために選択されました。これらはメモリ内のデータストアであり、読み取りと書き込みの両方でサブミリ秒のレイテンシを提供するためです。フルマネージドサービスとして、一般的な管理タスクを自動化し、高可用性を実現し、アプリケーションの成長に合わせて簡単に拡張できます。ElastiCache は Redis と Memcached 互換エンジンの両方を提供します。Redis エンジンはソートされたセットを提供します。これを使用して、中心性に基づいた高速ランキングを提供するため、このソリューションに選択されました。

ソリューションを起動する

次の表で、Launch Stack ボタンの1つを選択して、AWS CloudFormation コンソールからソリューション スタックを起動します。AWS CloudFormation が IAM リソースを作成することを確認し、Create を選択します。スタックを作成するのに約 20 分かかります。

リージョン 表示 起動
米国東部 (バージニア北部) 表示
米国東部 (オハイオ) 表示
米国西部 (オレゴン) 表示
アジアパシフィック (ムンバイ) 表示
アジアパシフィック (ソウル) 表示
アジアパシフィック (シンガポール) 表示
アジアパシフィック (シドニー) 表示
アジアパシフィック (東京) 表示
欧州 (フランクフルト) 表示
欧州 (アイルランド) 表示
欧州 (ロンドン) 表示
欧州 (ストックホルム) 表示

このスタックは 3 つの出力パラメーターを作成します。

  • SageMakerNotebook – ElastiCache と Neptune クラスターをクエリするために使用できる、Amazon SageMaker がホストした Jupyter ノートブックにリンクされます。
  • StartWorkload – AWS Lambda ベースの Neptune ワークロードを開始するために CLI コマンドです
  • StopWorkoad – Neptune ワークロードを停止するために使用できる CLI コマンドです

Jupyter ノートブックを開く

[SageMakerNotebook] リンクをクリックします。Jupyter ウィンドウで、Neptune/neptune-streams ディレクトリを開きます。このディレクトリには 2 つのノートブックがあります。

  • centrality-counts.ipynb – ElastiCache および Neptune クラスターをクエリして、データが Neptune に保存されているソーシャルネットワークの最も人気のあるトップ 3 のメンバーの詳細を取得します。
  • stream-viewer.ipynb – Neptune ストリームのコンテンツを表形式で示し、ストリームで前後に閲覧できるようにします。

centrality-counts ノートブックを開き、[Cell] ドロップダウンメニューから [Run All] を選択します。Neptune ワークロードをまだ開始していない場合、結果は空になります。 空の結果をもつ centrality-counts ノートブックのプレビューを以下に示します。

Neptune ワークロードを開始する

この次のステップでは、ローカルマシンに AWS コマンドラインインターフェイス (CLI) をインストールしていることを想定しています。

ターミナルを開き、CloudFormation 出力から StartWorkload CLI コマンドを実行します。これは、Amazon Neptune データベースに頂点とエッジを書き込み、ソーシャルネットワークの成長をシミュレーションする Neptune Workload Lambda 関数をトリガーします。ワークロードは最初にネットワークに 1000 個の頂点をシードし、その後、頂点とエッジの混合を作成します。

数秒待ってから (ステップ関数ワークフローは現在、ワークロードを開始する直前の期間にストリームレコードが見つからなかったため、アイドル状態になっている可能性があります)、centrality-counts ノートブックを再実行します。今回、ネットワークの最も人気のあるメンバーのいくつかの中心性の詳細が表示されます。ネットワークの最も人気のあるメンバーを含む centrality-counts ノートブックのプレビューを以下に示します。

Neptune ストリームコンテンツを表示する

stream-viewer ノートブックを開き、[Cell] ドロップダウンメニューから [Run All] を選択します。ノートブックには、Neptune ストリームの先頭から 10 件のレコードが表示されます。スライダーを使用して、ビューアをストリーム内の別の場所に配置し、[Next] ボタンを使用して次の 10 件レコードに進みます。 ストリームビューアのプレビューを以下に示します。

コードの概要

demono Neptune ストリームハンドラー関数のコードは、neptune-streams-demo パッケージの neptune_stream_handler.py にあります。このパッケージにはまた、Workload Manager と Neptune Workload Lambda 関数のコードも含まれます。

neptune_stream_handler.py には、次の 3 つのクラスが含まれます。VertexMetricsVertexMetricsService、および NeptuneStreamHandler です。

VertexMetrics は Redis クライアントを使用して ElastiCache の頂点メトリクスを保存します。これには頂点数と度数中心性の値が含まれます。

class VertexMetrics:
    
    def __init__(self, elasticache_endpoint):
        self.ec = redis.StrictRedis(host=elasticache_endpoint, port=6379)
        
    def most_popular_vertices(self):
        return self.ec.zrevrange('degree_centrality', 0, 2, withscores=True)
        
    def vertex_count(self):
        return self.ec.get('vertex_count')
        
    def set_vertex_count(self, count):
        self.ec.set('vertex_count', count)
        
    def increment_vertex_count(self):
        self.ec.incr('vertex_count')
        
    def decrement_vertex_count(self):
        self.ec.decr('vertex_count')
        
    def update_degree_centrality(self, v_id, centrality):
        self.ec.zadd('degree_centrality', {v_id:centrality})

VertexMetricsService は Gremlin クライアントを使用し、Neptune をクエリして頂点の度数中心性を計算し、VertexMetrics インスタンスを ElastiCache の頂点のメトリクスを計算した度数中心性チで更新するために使用します。

class VertexMetricsService:
    
    def __init__(self, neptune_endpoint, elasticache_endpoint):
        GremlinUtils.init_statics(globals())
        gremlin_utils = GremlinUtils(Endpoints(neptune_endpoint=neptune_endpoint))
        self.vertext_metrics = VertexMetrics(elasticache_endpoint)
        self.neptune_connection = gremlin_utils.remote_connection()
        self.g = gremlin_utils.traversal_source(connection=self.neptune_connection)
        
    def __init_vertex_count(self):
        count = self.g.V().count().next()
        self.vertext_metrics.set_vertex_count(count)

    def __increment_vertex_count(self):
        if self.vertext_metrics.vertex_count() is None:
            self.__init_vertex_count()
        self.vertext_metrics.increment_vertex_count()
        
    def __decrement_vertex_count(self):
        if self.vertext_metrics.vertex_count() is None:
            self.__init_vertex_count()
        self.vertext_metrics.decrement_vertex_count()
        
    def __update_degree_centrality(self, v_id):
        centrality = self.g.V(v_id).inE().count().next()
        self.vertext_metrics.update_degree_centrality(v_id, centrality)
        
    def handle_event(self, op, data):
        
        type = data['type']

        if op == ADD_OPERATION:
            if type == 'vl':
                self.__increment_vertex_count()
            if type == 'e':
                self.__update_degree_centrality(data['to'])
                
        if op == REMOVE_OPERATION:
            if type == 'vl':
                self.__decrement_vertex_count()
            if type == 'e':
                self.__update_degree_centrality(data['to'])
            
        
    def close(self):
        self.neptune_connection.close()

VertexMetricService.handle_event() メソッドは Neptune ストリームの各レコードに対してコールされます。レコードが頂点を追加または削除する演算を記述する場合、サービスはそれぞれ、 VertexMetrics のカウントを増分または減少させます。レコードがエッジを追加または削除させる演算を記述する場合、サービスは Neptune にクエリを返すことでエッジの「out」または「to」頂点の度数中心性を再計算します。このメソッドは、ほかのすべての演算を無視します。

ここでの最後のクラス NeptuneStreamHandler には、次のポーリングフレームワークのホスト Lambda 関数により呼び出される handle_records() ハンドラーメソッドが含まれます。

class NeptuneStreamHandler(AbstractHandler):

    def handle_records(self, stream_log):
        
        params = json.loads(os.environ['AdditionalParams'])
        svc = VertexMetricsService(
            params['neptune_endpoint'],
            params['elasticache_endpoint'])
        
        records = stream_log[RECORDS_STR]
        
        try:
            for record in records:
            
                svc.handle_event(record[OPERATION_STR], record[DATA_STR])
                yield HandlerResponse(
                    record[EVENT_ID_STR][OP_NUM_STR],
                    record[EVENT_ID_STR][COMMIT_NUM_STR],
                    1)
            
        except Exception as e:
            logger.error('Error occurred - {}'.format(str(e)))
            raise e
        finally:
            svc.close()

このハンドラーメソッドは、AdditionalParams 環境変数から取得される Neptune と ElastiCache エンドポイントを渡す VertexMetricsService を構築します。AdditionalParams を使用して、カスタム構成情報をハンドラーに渡す方法をこの投稿で後ほど説明します。このハンドラーはその後、ストリームからレコードのバッチを取り出し、各レコードに対して VertexMetricService.handle_event() を呼び出します。

レコードが処理されるたびに、ハンドラーは演算番号を含む HandlerResponse、演算が属するコミット、前回ハンドラーが応答を返してから処理されたレコードの数を返します。このフレームワークは次にこの応答を使用して、そのリースを更新し、その結果次のポーリングでそれを確認することで、ハンドラーに指定したレコードは正常に処理されたばかりのものから後に続きます。

独自のポーリングアプリケーションを作成する

ポーリングアプリケーションを作成するには、ストリームハンドラー を作成し、それを S3 に発行してから、事前定義された CloudFormation テンプレートを使用してポーリングワークフローを作成します。この CloudFormation テンプレートは、Lambda ポーリングフレームワークを作成します。このフレームワークはホスト Lambda 関数を使用してストリームハンドラーを実行し、Step Functions と DynamoDB ベースのワークフローを使用して、ストリーム処理をスケジュールし、チェックポイントとなります。

Lambda ポーリングフレームワークは現在、Python 3.6 で書かれたストリームハンドラー関数のみをサポートします。

ポーリングフレームワークで使用されるストリームハンドラーをオーサリングすることは、Lambda 関数をオーサリングすることに似ています。ストリームハンドラークラスは、ポーリングフレームワークの AbstractHandler から継承し、handle_records() メソッドを実装しなければなりません。以下のコードは、Neptune ストリームの各レコードをログする単純なハンドラーを示します。

import lambda_function
import logging
from commons import *
from handler import AbstractHandler, HandlerResponse

logger = logging.getLogger('MyHandler')
logger.setLevel(logging.INFO)

class MyHandler(AbstractHandler):

  def handle_records(self, stream_log):
        
    records = stream_log[RECORDS_STR]
    
    last_op_num = None
    last_commit_num = None    
    count = 0
    
    try:
      for record in records:
      
        # Handle record
        logger.info(record)
      
        # Update local checkpoint info
        last_op_num = record[EVENT_ID_STR][OP_NUM_STR]
        last_commit_num = record[EVENT_ID_STR][COMMIT_NUM_STR]
      
        count += 1
      
    finally:

      try:   
        yield HandlerResponse(last_op_num, last_commit_num, count)     
      except Exception as e:
        logger.error('Error occurred - {}'.format(str(e)))
        raise e

ポーリングフレームワークのホスト Lambda 関数がハンドラーを実行するとき、関数はハンドラーに StreamRecordsBatchSize CloudFormation パラメーターで指定された番号までストリームレコードのバッチを渡します。以下に CloudFormation パラメーターの詳細について示します。

チェックポイントを作成する

handle_records() メソッドにはハンドラーをストリームレコードのバッチにわたすために使用されるフレームワークである stream_log パラメーターがあります。handle_records() メソッドの中から、正常に処理された最後のレコードの詳細 (その演算とコミットされた数字) とハンドラーが前回応答を返してから処理されたレコード数を含む 1 つまたは複数の HandlerResponse オブジェクトを返すことが期待されます。

上記の例では、バッチのすべてのレコードを処理した後で応答を返します。以前の頂点メトリクスの例では、各レコードの後で応答を返しました。デモコードには、finally() ブロックが含まれ、ハンドラーがレコードの処理中に失敗した場合、ハンドラーが確実に引き続き正常に処理された最後のレコードの詳細を示す応答を返すようにします。結果として、レコード処理の失敗の後で、フレームワークが次にハンドラーを呼び出したときに、ハンドラーは前の呼び出しを終了させたその失敗したレコードから開始されます。

追加パラメーター

多くのハンドラーでは、正しく機能するために、追加の構成が必要になります。たとえば、私たちのデモハンドラーの場合、Neptune をクエリしてそれぞれのエッジのターゲット頂点の度数中心性を計算し、その結果を ElastiCache インスタンスに保存しますが、Neptune と ElastiCache クラスターの両方に対して、エンドポイント情報を必要とします。

ハンドラーには、AdditionalParams CloudFormation パラメーターを通じて追加のパラメーターを指定できます。CloudFormation テンプレートに指定するストリング値は、AdditionalParams 環境変数経由でハンドラーからアクセスできます。

デモアプリケーションの場合、JSON オブジェクトとして Neptune および ElastiCache エンドポイントの値を指定するために選択しました。

{
  "elasticache_endpoint": "neptune-streams-xxx.euw2.cache.amazonaws.com",
  "neptune_endpoint": "neptune-streams-xxx.eu-west-2.neptune.amazonaws.com"
}

ハンドラーを発行する

ストリームハンドラーの作成を終了したら、ZIP アーカイブの形式でデプロイパッケージを作成し、S3 にそれをアップロードしてください。

CloudFormation

ストリームハンドラーをインストールし、実行するためには、ポーリングフレームワークのルート CloudFormation テンプレートを使用して CloudFormation スタックを作成します。

リージョン 表示 起動
米国東部 (バージニア北部) 表示
米国東部 (オハイオ) 表示
米国西部 (オレゴン) 表示
アジアパシフィック (ムンバイ) 表示
アジアパシフィック (ソウル) 表示
アジアパシフィック (シンガポール) 表示
アジアパシフィック (シドニー) 表示
アジアパシフィック (東京) 表示
欧州 (フランクフルト) 表示
欧州 (アイルランド) 表示
欧州 (ロンドン) 表示
欧州 (ストックホルム) 表示

テンプレートには 20 以上のパラメーターがあり、それは多様なユースケースに対して構成できます。一部のパラメーターはオプションで、いくつかにはデフォルト値がありますが、環境に特有の値をいくつか入力する必要があります。

環境

  • ApplicationName を指定します。これは、CloudFormation テンプレートにより作成されたリソースを参照するために使用されます。デモアプリケーションは、vertex-metrics と呼ばれます。
  • Lambda ポーリングフレームワークが実行される VPC のネットワーキングの詳細を指定します。これは、通常、Neptune VPC と同じ VPC です。必要なネットワーキングパラメーターには、VPCSubnetIdsSecurityGroupIds、および RouteTableIds があります。

Neptune

  • NeptuneStreamEndpoint を指定します。これは、https://<cluster>:<port>/gremlin/stream または https://<cluster>:<port>/sparql/stream の形式になります。
  • ソース Neptune データベースで IAM database authentication が有効の場合、IAMAuthEnabledOnSourceStreamtrue に設定し、StreamDBClusterResourceId を指定します。

ハンドラー

  • ハンドラーパッケージには、LambdaS3Bucket および LambdaS3Key を指定します。
  • StreamRecordsHandler の名前を指定します。デモアプリケーションでは、NeptuneStreamHandler クラスは ZIP アーカイブのルートの neptune_stream_handler ファイルにあります。したがって、StreamRecordsHandler 値は NeptuneStreamHandler です。

残りのハンドラーパラメーターはすべてオプションです。

  • ハンドラーが追加パラメーターを必要とする場合、AdditionalParams ストリング値を指定します。上述のとおり、デモアプリケーションで、Neptune と ElastiCache エンドポイントを含む JSON オブジェクトをハンドラーに渡します。
  • LambdaMemorySizeLambdaRuntimeLambdaLoggingLevel では Lambda ランタイムを設定できるようになります。現在、Lambda ポーリングフレームワークは Python 3.6 のみをサポートしています。デフォルトで、LambdaMemorySize は 128 MB に設定されています。
  • ハンドラーがその他の AWS リソースにアクセスするために追加の許可が必要な場合、追加の IAM マネージドポリシーをホスト Lambda 関数実行ロールにアタッチできます。ManagedPolicies への追加アクセス権を含むポリシーの Amazon リソースネーム (ARN) のカンマ区切りのリストを指定します。
  • StreamRecordsBatchSizeMaxPollingIntervalMaxPollingWaitTime は、バッチごとに取得するレコードの数、秒数で示したホスト Lambda 関数のタイムアウト値、およびストリームの連続するポーリング間の秒数で示した待ち時間をコントロールできます。ホスト Lambda 関数は、実行時間が MaxPollingInterval の約 90% に達するまで、ストリームからのバッチの処理を継続します。ポーリングをもう一度行う前に最大 MaxPollingWaitTime 待ちます。連続ポーリングの場合は、MaxPollingWaitTime0 にセットしてください。

ポーリングフレームワーク

次の残りのすべての CloudFormation パラメーターはオプションです。

  • DDBReadCapacity および DDBWriteCapacity は、チェックポイントを行うために使用される DynamoDB テーブルにプロビジョニングされたスループットをコントロールします。両方のパラメーターのデフォルト値は 5 です。
  • StepFunctionFallbackPeriod および StepFunctionFallbackPeriodUnit は、Step Function の失敗の回復期間をコントロールします。
  • CreateDDBVPCEndPoint および CreateMonitoringEndPoint は、VPC に必要な DynamoDB および CloudWatch エンドポイントを作成します。両方のパラメーターは、デフォルトで true です。エンドポイントがすでに VPC に存在する場合、対応する CloudFormation パラメーターを false にセットします。
  • ポーリングが連続して 3 回以上失敗するたびに通知を受けたい場合は、CreateCloudWatchAlarmtrue にセットして、NotificationEmail を指定します。デフォルトにより、CreateCloudWatchAlarmfalse にセットします。

はじめは多くのパラメーターがあるように見えますが、多くのアプリケーションでは、ApplicationName、VPC、SubnetIds、SecurityGroupIds、RouteTableIds、LambdaS3Bucket、LambdaS3Key、および StreamRecordsHandler の値のみを指定することが必要です。

まとめ

Neptune Streams は、グラフデータで変更が起こったときに容易に反応できるようになります。ストリームハンドラーを使用して、グラフデータベースのイベントに反応して、グラフデータの変更をその他の AWS マネージドサービスに発行することができます。この投稿では、Neptune CloudFormation テンプレートにより頂点メトリクスを計算し、Amazon ElastiCache にその結果を発行するストリームハンドラーをホストするために指定される AWS Lambda ポーリングフレームワークを使用する方法を示しました。独自のハンドラーを作成することにより、イベント指向のグラフアプリケーションを素早く、容易に構築できます。

その他のリソース

CloudFormation テンプレート、Jupyter ノートブック、ストリームハンドラー、および Lambda 関数を含むこのデモのコードは、Amazon Neptune サンプル GitHub リポジトリ からダウンロードできます。

Amazon Neptune リソース ホームページには、ドキュメント、ブログ投稿、ビデオ、コード リポジトリ、およびサンプルとツールへのリンクが含まれています。

データベースの設計を開始する前に、「グラフデータベースを使用するためのAWS リファレンスアーキテクチャ」を参照してください。ここでは、グラフモードとクエリ言語の選択肢を示し、リファレンスのデプロイアーキテクチャの例を参照しています。

 

 


著者について

 

Ian Robinson は、AWS のデータベースサービスカスタマーアドバイザリーチームのメンバーです。彼は「Graph Databases」、「REST in Practice」(両方ともに O’Reilly)と「REST: From Research to Practice」(Springer)と「Service Design Patterns(Addison-Wesley)」の共著者です。