Amazon Web Services ブログ

Amazon Kinesis Data Firehose により、VPC のプライバシー内で Amazon Elasticsearch Service にストリーミングデータを取り込む



今日、新しい Amazon Kinesis Data Firehose 機能を追加します。これにより、Kinesis Data Firehose から Amazon Elasticsearch Service ドメインへの VPC 配信をセットアップできます。Amazon Kinesis Data Streams でカスタムアプリケーションを管理してトラフィックを非公開にしている場合は、Kinesis Data Firehose を使用して、VPC 内の Amazon Elasticsearch Service エンドポイントにデータをロードすることができます。それを行うのに、取り込みと配信のインフラストラクチャに投資、運用、拡張する必要はありません。Kinesis Data Firehose コンソール、AWS CLI、および API からこの新機能の使用を開始するには、宛先として Amazon Elasticsearch Service、VPC がアクセスできる特定のドメインを選択し、サブネットとオプションのセキュリティグループで VPC を設定します。

この機能のご利用前に

Amazon Elasticsearch Service ドメインは、パブリックまたはプライベートエンドポイントを持つことができます。パブリックエンドポイントは、パブリックインターネット上の IP アドレスでバックアップされています。プライベートエンドポイントは、VPC の IP スペース内の IP アドレスでバックアップされています。

Amazon Elasticsearch Service VPC エンドポイントを使用している場合、ストリーミングデータを取り込むには、おそらく Kinesis Data Streams または同様のソリューションを使用することになるでしょう。これは、Amazon Elasticsearch Service VPC ドメインに配信するストリームでカスタムアプリケーションを実行することを意味します。おそらく次のアクションを実行する必要があるでしょう。

  • バッファリングの実装
  • フォーマットの変換
  • 圧縮の実行
  • 変換の適用
  • バックアップの管理
  • 一時的な配信エラーの処理

さらに、このカスタムアプリケーションをビルド、スケーリング、モニタリング、更新、および保守する必要があります。

Amazon Elasticsearch Service VPC エンドポイントへ Kinesis Data Firehose を配信する

Kinesis Data Firehose は、Amazon Elasticsearch Service VPC エンドポイントにデータを配信できるようになりました。この方法により、ストリーミングデータを安全かつ簡単に取り込み、変換し、配信することができます。データの取り込みと配信インフラストラクチャの管理について気をもむ必要はありません。この新機能により、Kinesis Data Firehose は Amazon Elasticsearch Service VPC エンドポイントへ安全な方法で通信できます。VPC 内に存在する Amazon Elasticsearch Service エンドポイントのおかげで、セキュリティがさらに強化されます。

仕組み

Amazon Elasticsearch Service VPC エンドポイントにデータを配信する Kinesis Data Firehose 配信ストリームを作成すると、Kinesis Data Firehose は選択した各サブネットに Elastic Network Interface (ENI) が作成されます。アベイラビリティーゾーンを 1 つのみ使用する場合、Kinesis Data Firehose はエンドポイントを 1 つのサブネットのみに配置します。同様に、Amazon Elasticsearch Service VPC エンドポイントを作成すると、選択したサブネットにエンドポイントが作成されます。Kinesis Data Firehose は ENI を使用して、VPC 内にある Amazon Elasticsearch Service ENI にデータを配信します。次のスクリーンショットは、単一のサブネットを持つ最終的なアーキテクチャの概要を示しています。

このチュートリアルでは、次の 2 つのセキュリティグループがあります。

  • Kinesis Data Firehose エンドポイントの kdf-sec-grp
  • Amazon Elasticsearch Service エンドポイントの es-sec-grp

Kinesis Data Firehose が Amazon Elasticsearch Service VPC エンドポイントにアクセスできるようにするには、セキュリティグループ es-sec-grp が Kinesis Data Firehose が作成した ENI に HTTPS 呼び出しを許可する必要があります。Kinesis Data Firehose は、スループット要件を満たすように ENI を自動的にスケーリングします。Kinesis Data Firehose が ENI をスケーリングするとき、囲んでいるセキュリティグループ kdf-sec-grp の送信ルールがデータストリームを制御します。Amazon Elasticsearch Service セキュリティグループ (es-sec-grp) を設定して、Kinesis Data Firehose セキュリティグループ (kdf-sec-grp) からの HTTPS トラフィックを許可する必要があります。Kinesis Data Firehose セキュリティグループはアウトバウンド HTTPS トラフィックを許可する必要があり、その宛先は Amazon Elasticsearch Service セキュリティグループです。Kinesis Data Firehose VPC 配信では、Firehose セキュリティグループを外部トラフィックに開放する必要はありません。

Kinesis Data Firehose エンドポイントと Amazon Elasticsearch Service エンドポイントに同じセキュリティグループを使用することもできます。両方に同じセキュリティグループを使用する場合は、セキュリティグループの受信ルールが HTTPS トラフィックを許可していることを確認してください。

既存の配信ストリームの場合、宛先エンドポイントを変更できます。新しい宛先は、同じ VPC、サブネット、セキュリティグループ内でアクセスできる必要があります。VPC、サブネット、セキュリティグループのいずれかを変更するには、配信ストリームを再作成する必要があります。

既存のすべての Kinesis Data Firehose 制限がこの機能に適用されます。たとえば、割り当て増加リクエストを送信することで、アカウントごとのデフォルトである 50 配信ストリームを増やすことができます。また、Kinesis Data Firehose は、配信ストリームごとに VPC 宛先サブネットあたり 1 つ以上の ENI を作成します。Kinesis Data Firehose は、実際のスループットに基づいて、必要に応じて ENI の数を自動的にスケーリングします。配信ストリームあたりのデフォルトのスループット制限は 5 MB/秒です (リージョンによって異なります)。サポートケースを送信することにより、この制限の引き上げをリクエストできます。

利用可能な ENI が十分あることを確認してください。デフォルトでは、VPC にはリージョンごとに 5000 ENI の割り当てがあります。詳細については、「Amazon VPC クォータ」を参照してください。

Kinesis Data Firehose のようなマネージドサービスを使用する利点は、基礎的な配管になる点ではなく、データの価値に集中できる点にあります。配信ストリームから Amazon Elasticsearch Service ドメインへのデータ配信の頻度を設定できます。Kinesis Data Firehose は、Amazon ES に配信する前に受信データをバッファーします。Amazon Elasticsearch Service のバッファーサイズ (1 MB〜100 MB) またはバッファーインターバル (60〜900 秒) の値を設定でき、最初に満たされた条件が Amazon Elasticsearch Service へのデータ配信をトリガーします。Amazon Elasticsearch Service の宛先でデータ配信が失敗した場合、配信ストリームを作成するときに 0〜7,200 秒の間で再試行期間を指定できます。Amazon Elasticsearch Service エンドポイントへのデータ配信が失敗した場合、Kinesis Data Firehose は指定された期間、データ配信を再試行します。再試行期間後、Kinesis Data Firehose はデータの現在のバッチをスキップして、次のバッチに移動します。スキップされたドキュメントは、手動のバックフィルに使用できる Amazon S3 バケットの elasticsearch_failed フォルダに移動します。

サイジングの詳細については、「Amazon Elasticsearch Service の使用を開始する: T シャツサイズのドメイン」を参照してください。。

ソリューションの概要

この新機能の使用方法を示すために、この記事では、Kinesis Data Firehose コンソールで利用できるストックデモデータを使用して、VPC内 の Amazon Elasticsearch Service エンドポイントに配信します。次の図は、ワークフローを示しています。

このユースケースは、株価表示データを配信ストリーム (A) に送信するプロデューサーをシミュレートします。AWS Lambda 関数 (B) を使用してタイムスタンプを株式レコードに追加し、Kibana 視覚化を作成できるようにします。Kinesis Data Firehose は、株価レコードを VPC 内の Amazon Elasticsearch Service エンドポイント (C) にストリーミングします。最後に、Kibana (D) を使用してデータを視覚化できます。

この記事では、AWS マネジメントコンソールを使用してこのソリューションを実装していますが、AWS CLI を使用することもできます。

セキュリティグループを作成する

最初に 2 つのセキュリティグループを作成します。1 つは Amazon Elasticsearch Service VPC エンドポイント (es-sec-grp) 用で、もう 1 つは配信ストリーム (kdf-sec-grp) 用です。最初にルールなしでセキュリティグループを作成します。セキュリティグループを作成した後、インバウンドルールとアウトバウンドルールを設定します。次のテーブルは、これらのルールをまとめたものです。

Amazon Elasticsearch Service VPC エンドポイントを作成する

VPC 内に Amazon Elasticsearch Service エンドポイントを作成するには、次の手順を実行します。

  1. Amazon Elasticsearch Service コンソールで、[Create a new Domain] を選択します。
  2. [Deployment Type and Latest Version] では、[Development and Testing] を選択します。
  3. [Next] を選択します。
  4. Amazon Elasticsearch Service エンドポイントに名前を付けます。
  5. インスタンスタイプを選択します。

この記事では、m5.xlarge.elasticsearch を使用します。本番環境では、適切なサイズのインスタンスタイプを選択します。この記事では、ノードの数を 1 のままにしますが、2 に設定するがベストプラクティスです。

  1. ノードあたりの EBS ストレージサイズを 100 GiB に設定します。
  2. 残りの設定はデフォルトのままにして、[Next] を選択します。
  3. Amazon Elasticsearch Service エンドポイントの VPC とプライベートサブネット、および以前に作成した Amazon Elasticsearch Service のセキュリティグループを選択します (es-sec-grp)。
  4. Kibana にアクセスするには、きめ細かいアクセスを選択します。
  5. [Create Master User] を選択します。

この記事では、HTTP 基本認証が有効になっている内部ユーザーデータベースを使用しています。本番環境では、IAM ロールを使用して、適切なきめ細かいアクセスを設定します。詳細については、「Amazon Elasticsearch Service のきめ細かなアクセスコントロール」を参照してください。

  1. [Allow open access to Domain] を選択します。

セキュリティグループは、すでに IP ベースのアクセスポリシーを実施しています。この手順により、VPC 内のリソースへの Amazon Elasticsearch Service エンドポイントへのアクセスが開き、Amazon Elasticsearch Service エンドポイントからインターネットにアクセスできなくなります。Amazon Elasticsearch Service エンドポイントのセキュリティをさらに強化するには、IAM ユーザーまたはロールを指定するアクセスポリシーを使用します。ドメインへのアクセス制御の詳細については、[Amazon Elasticsearch Service での Identity and Access Management] を参照してください。

  1. [Next] を選択します。
  2. 設定を確認し、[Confirm] を選択します。

次のスクリーンショットは、Amazon Elasticsearch Service エンドポイント VPC 設定の様子を例示したものです。

レコード変換用の Lambda 関数を作成する

Lambda 関数を作成して、タイムスタンプをデータフィードに追加します。次の手順を実行します。

  1. Lambda コンソールで、[Create Function] を選択します。
  2. [Author from scratch] を選択します。
  3. 関数に名前を付けます。たとえば、tmakAddTSToStream
  4. ランタイムとして Python 3.7 を選択します。
  5. [Create] をクリックします。

次のコードは Lambda 関数用です (基本設定セクションで、タイムアウトを 3 秒 から 45 秒 に変更します)。

import base64
import json
from datetime import datetime

def lambda_handler(event, context):
    
    send_back = []
    now = datetime.utcnow().isoformat()

    for record in event['records']:
        stock_rec = json.loads(base64.b64decode(record['data']))
        stock_rec["timestamp"] = now
       
        record_w_ts = {
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(json.dumps(stock_rec).encode('utf-8') + b'\n').decode('utf-8')
        }
        send_back.append(record_w_ts)

    return {'records': send_back} 

Kinesis Data Firehose 配信ストリームを作成する

配信ストリームを作成するには、次の手順を実行します。

  1. Kinesis Data Firehose コンソールの [Data Firehose] で、[Create Delivery Stream] を選択します。
  2. ストリームの名前を入力します。たとえば、tmak-kdf-stock-delivery-stream
  3. ソースには、[Direct PUT or other sources] を選択します。
  4. [Next] を選択します。
  5. [Data transformation] では、[Enabled] を選択します。
  6. 作成した Lambda 関数を選択します。
  7. [Next] を選択します。
  8. 配信ストリームの宛先として Amazon Elasticsearch Service を選択します。
  9. [Index] には、stockdata と入力します。

VPC セクションは自動的に入力されます。必ず Kinesis Data Firehose 用に作成したセキュリティグループ (kdf-sec-grp) を使用してください。

  1. [Backup Mode] では、[Failed records only] を選択します。

既存の S3 バケットを選択するか、新しいバケットを作成できます。次のスクリーンショットは、配信ストリーム設定の例を示しています。

  1. [Next] を選択します。
  2. バッファリング設定を確認し、タグを設定してストリームを識別します。

VPC 宛先に配信する配信ストリームには、ENI を管理し、VPC とサブネットをリストする権限が必要です。コンソールには、必要なすべての権限を含むテンプレートに基づいて新しいロールを作成するオプションがあります。すでに作成している場合は、既存のロールを使用することもできます。

  1. [Next] を選択します。
  2. 設定を確認し、[Create Stream] を選択します。

ストリームのステータスが Active と表示されるまで数分かかる場合があります。次のスクリーンショットを参照してください。

Amazon EC2 コンソールの [Network and Security] で、Kinesis Data Firehose と Amazon ES によって VPC に作成されたエンドポイントを確認できます。次のスクリーンショットを参照してください。

Kinesis Data Firehose 向けに Kibana のきめ細かなアクセスを設定する

Amazon Elasticsearch Service エンドポイントに株式データを配信するには、Kinesis Data Firehose にアクセス許可を付与する必要があります。これは、Kibana コンソールまたは API を介して実行できます。詳細については、Open Distro for Elasticsearch ウェブサイトの API を参照してください。

Amazon Elasticsearch Service エンドポイントへのアクセスを制御する方法の詳細については、「Amazon Elasticsearch Service ドメインへのアクセスを制御する方法」を参照してください。

Amazon Elasticsearch Service エンドポイントは VPC 内にあり、Kibana にアクセスするには、最初に VPC に接続する必要があります。このプロセスはネットワーク設定によって異なりますが、VPN または企業ネットワークへの接続などがあるでしょう。この記事では、VPC のリモートデスクトップ EC2 インスタンスパブリックサブネットを作成します。新しく作成されたセキュリティグループ (rdp-sec-grp) がインスタンスを保護します。es-sec-grp セキュリティグループを変更し、rdp-sec-grp からのインバウンド RDP トラフィックを許可して、Kibana URL にアクセスできるようにすることができます。次の図は、このアーキテクチャを示しています。

Kinesis Data Firehose は、Amazon Elasticsearch Service エンドポイントにデータを送信する前に、配信ロールを使用して HTTP (Signature Version 4) リクエストに署名します。Amazon Elasticsearch Service は、ロール、ユーザー、およびマッピングを使用して、きめ細かなアクセス制御アクセス許可を管理します。このセクションでは、Kinesis Data Firehose のロールを作成し、権限を設定する方法について説明します。

このセクションで作成するロールは、IAM ロールとは異なります。詳細については、「主要な概念」を参照してください。

次の手順を実行します。

  1. Kibana に移動します (Amazon Elasticsearch Service コンソールで URL を確認できます)。
  2. Amazon Elasticsearch Service エンドポイントを作成したときにセットアップしたマスターユーザーとパスワードを入力します。
  3. [Security] で、[Roles] を選択します。
  4. [Add New Role] を選択します。
  5. ロールに名前を付けます。たとえば、firehose-role
  6. クラスターの権限については、cluster_composite_opscluster_monitor を追加します。
  7. [Index permissions] で、[Index Patterns] を選択し、stockdata* と入力します。
  8. [Permissions] で、crudcreate_index、および manage という 3 つのアクショングループを追加します。
  9. [Save Role Definition] を選択します。

次のステップでは、Kinesis Data Firehose が使用する IAM ロールを、作成したロールにマッピングします。

  1. [Security] で、[Role Mappings] を選択します。
  2. 作成したロール (firehose-role) を選択します。
  3. [Backend Roles] では、[Add Backend Role] を選択します。
  4. Kinesis Data Firehose が使用するロールの IAM ARN (arn:aws:iam::123456789012:role/firehose_stream_role_name) を入力します。

配信ストリーム ARN は、Kinesis Data Firehose コンソールで確認できます。

Kinesis Data Firehose を介した株式データのストリーミング

株式データをストリーミングするには、次の手順を実行します。

  1. Kinesis Data Firehose コンソールで、作成したストリームを選択します。
  2. [Test with demo data] を選択します。
  3. [Start sending demo data] を選択します。

すべてが正常に機能している場合、「Demo data is being sent to your delivery stream」というメッセージ が表示されます。数分待ってから、[Stop sending demo data] を選択します。

データの分析と視覚化

データを分析して視覚化するには、次の手順を実行します。

  1. Kibana コンソールで、[Management] を選択します。
  2. [Index patterns] を選択します。
  3. [Index pattern] には、[stockdata*] と入力します。
  4. [Next] を選択します。
  5. [Time ]フィルターフィールドで、[timestamp] を選択します。
  6. [Visualize] を選択します。
  7. 新しい視覚化を作成し、[Line] を選択します。
  8. [Index pattern] では、[stockdata*] を選択します。
  9. [Y-Axis] では、Aggregation=AverageField=price を選択します。
  10. [X-Axis] では、[Aggregation=Data Histogram]、[Field=timestamp]、[Interval=seconds] を選択します。
  11. [X-Axis] では、[Add Sub-buckets] を選択します。
  12. [Split Series] を選択します。
  13. Sub-Aggregation=TermsField=ticker_symbol.keyword を設定します。
  14. [Apply Changes] を選択します。

次のスクリーンショットは、視覚化の例を示しています。

生データを表示するには、Kibana ダッシュボードで [Discover] を選択します。次のスクリーンショットを参照してください。

まとめ

この記事では、Kinesis Data Firehose を使って Amazon Elasticsearch Service エンドポイントを VPC 内に移動する方法を示しました。さらに、Amazon Elasticsearch Service エンドポイントへのパブリックアクセスを有効にして保護する必要はありません。Amazon Elasticsearch Service エンドポイントをインターネットに公開することに消極的だったけれども、データをストリーミングしたい場合は、Kinesis Data Firehose を使用して行うことができるようになりました。

 


著者について

Tarik Makota は、アマゾン ウェブ サービスのプリンシパルソリューションアーキテクトです。彼は、米国北東部の AWS のお客様に技術的ガイダンス、設計アドバイス、および思想的リーダーシップを提供しています。彼はロチェスター工科大学でソフトウェア開発および管理の修士号を取得しています。