Kinesis Data Firehose から Amazon OpenSearch Service へのクロスアカウントストリーミングをセットアップするにはどうすればよいですか?

所要時間5分
0

別のアカウントにある Amazon OpenSearch Service クラスターにデータを送信する Amazon Kinesis Data Firehose ストリームをセットアップしたいと考えています。

簡単な説明

Kinesis Data Firehose とその依存関係 (Amazon Simple Storage Service (Amazon S3) や Amazon CloudWatch など) を設定して、異なるアカウント間でストリーミングを行えるようにします。ストリーミングデータ配信は、きめ細かなアクセス制御 (FGAC) が有効になっているかどうかにかかわらず、パブリックにアクセスできる OpenSearch Service クラスターに対して機能します。

OpenSearch Service クラスターにデータを送信するように Kinesis Data Firehose ストリームを設定するには、以下の手順を実行します。

  1. アカウント A で Amazon S3 バケットを作成します。
  2. アカウント A で CloudWatch ロググループとログストリームを作成します。
  3. アカウント A で Kinesis Data Firehose ロールとポリシーを作成します。
  4. アカウント A で Kinesis Data Firehose ロールがデータをストリーミングする先のアカウント B でパブリックにアクセス可能な OpenSearch Service クラスターを作成します。
  5. (オプション) FGAC が有効になっている場合は、OpenSearch Dashboards にログインし、ロールマッピングを追加します。
  6. アカウント A で Kinesis Data Firehose ロールの AWS Identity and Access Management (IAM) ロールポリシーを更新して、アカウント B にデータを送信します。
  7. アカウント A で Kinesis Data Firehose ストリームを作成します。
  8. OpenSearch Service クラスターへのクロスアカウントストリーミングをテストします。

解決策

アカウント A で Amazon S3 バケットを作成する

アカウント A で S3 バケットを作成します。Amazon S3 バケットは Amazon リソースネーム (ARN) を生成します。

**注:**この Amazon S3 バケットからのレコードを取得および保存するためのアクセス許可を、後ほど Kinesis Data Firehose に付与する際に、この ARN を完全な形で使用します。

アカウント A で CloudWatch ロググループとログストリームを作成する

次の手順を実行して、CloudWatch ロググループを作成します。

  1. CloudWatch コンソールを開きます。
  2. ナビゲーションペインで [ログ] を選択し、次に [ロググループ] を選択します。
  3. [ロググループの作成] を選択します。
  4. [ログストリーム名] に名前を入力します。
  5. [ロググループの作成] ボタンをクリックして、新しいロググループを保存します。
  6. 新しく作成したロググループを検索して選択します。

以下のステップを実行して、Amazon CloudWatch ログストリームを作成します。

  1. [ログストリームの作成] を選択します。
  2. [ログストリーム名] に名前を入力します。
  3. [ログストリームの作成] を選択します。
    **重要:**CloudWatch ロググループ名と CloudWatch ログストリーム名は、Kinesis Data Firehose ロールポリシーを作成する際に必要となります。

アカウント A で Kinesis Data Firehose ロールとポリシーを作成する

  1. AWS Identity and Access Management (IAM) コンソールを開きます。
  2. Kinesis Data Firehose に以下の操作を許可する IAM ポリシーを作成します。
    ストリームログを CloudWatch に保存する
    Amazon S3 に記録する
    OpenSearch Service クラスターにデータをストリームする

例:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:PutObject",
        "s3:PutObjectAcl"
      ],
      "Resource": [
        "<Bucket ARN>",
        "<Bucket ARN>/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:<region>:<account-id>:log-group:/aws/kinesisfirehose/<Firehose Name>:log-stream:*"
      ]
    }
  ]
}

**注:**後で OpenSearch Service クラスターポリシーにストリーミングするためのアクセス許可を追加します。ただし、最初にアカウント B でクラスターを作成する必要があります。

  1. 作成したポリシーを保存します。
  2. [ロールを作成] をクリックします。
  3. 作成したポリシーを Kinesis Data Firehose ロールに追加します。

アカウント A で Kinesis Data Firehose ロールがデータをストリーミングできるよう、アカウント B でパブリックにアクセス可能な OpenSearch Service クラスターを作成する

  1. アカウント B でパブリックにアクセス可能な OpenSearch Service クラスターを作成します。
  2. OpenSearch Service ドメイン ARN を記録します。後の手順で ARN を使用します。
  3. クラスターのセキュリティを設定します。
    **重要:**アカウント A で Kinesis Data Firehose ロールが OpenSearch Service クラスターにストリーミングできるようにするには、OpenSearch Service のセキュリティの設定を行う必要があります。

次の手順を実行して、セキュリティの設定を行います。

  1. OpenSearch Serviceで、[アクセスポリシー] に移動します。

  2. JSON 定義のアクセスポリシーを選択します。ポリシーには、次のアクセス許可が必要です。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "*"
          },
          "Action": "es:*",
          "Resource": "<ES Domain ARN in Account B>/*",
          "Condition": {
            "IpAddress": {
              "aws:SourceIp": "<Your IP Address for OpenSearch Dashboards access>"
            }
          }
        },
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "<Firehose Role ARN in Account A>"
          },
          "Action": [
            "es:ESHttpPost",
            "es:ESHttpPut"
          ],
          "Resource": [
            "<ES Domain ARN in Account B>",
            "<ES Domain ARN in Account B>/*"
          ]
        },
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "<Firehose Role ARN in Account A>"
          },
          "Action": "es:ESHttpGet",
          "Resource": [
            "<ES Domain ARN in Account B>/_all/_settings",
            "<ES Domain ARN in Account B>/_cluster/stats",
            "<ES Domain ARN in Account B>/index-name*/_mapping/type-name",
            "<ES Domain ARN in Account B>/roletest*/_mapping/roletest",
            "<ES Domain ARN in Account B>/_nodes",
            "<ES Domain ARN in Account B>/_nodes/stats",
            "<ES Domain ARN in Account B>/_nodes/*/stats",
            "<ES Domain ARN in Account B>/_stats",
            "<ES Domain ARN in Account B>/index-name*/_stats",
            "<ES Domain ARN in Account B>/roletest*/_stats"
          ]
        }
      ]
    }

    OpenSearch Service ポリシーにおけるアクセス許可の詳細については、「サービス送信先への OpenSearch クロスアカウント配信」を参照してください。

  3. (オプション) クラスターで FGAC が有効になっている場合は、OpenSearch Dashboards にログインし、ロールマッピングを追加します。ロールマッピングにより、Kinesis Data Firehose ロールが OpenSearch Service にリクエストを送信できるようになります。

以下の手順を実行して、OpenSearch Dashboards にログインしてロールマッピングを追加します。

  1. ダッシュボードを開きます。
  2. [セキュリティ] タブを選択します。
  3. [ロール] を選択します。
  4. [all_access] ロールを選択します。
  5. [マッピングされたユーザー] タブを選択します。
  6. [マッピングを管理] を選択します。
  7. [バックエンドロール] セクションで、Kinesis Data Firehose ロールを入力します。
  8. [マップ] を選択します。

アカウント A で Kinesis Data Firehose ロールの IAM ロールポリシーを更新して、アカウント B にデータを送信する

アカウント A の Kinesis Data Firehose ロールからアカウント B の OpenSearch Service クラスターにデータを送信するには、次のようにして Kinesis Data Firehose ポリシーを更新します。

例:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:PutObject",
        "s3:PutObjectAcl"
      ],
      "Resource": [
        "<Bucket ARN>",
        "<Bucket ARN>/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:<region>:<account-id>:log-group:/aws/kinesisfirehose/<Firehose Name>:log-stream:*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "es:ESHttpPost",
        "es:ESHttpPut",
        "es:DescribeDomain",
        "es:DescribeDomains",
        "es:DescribeDomainConfig"
      ],
      "Resource": [
        "<Domain ARN in Account B>",
        "<Domain ARN in Account B>/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "es:ESHttpGet"
      ],
      "Resource": [
        "<Domain ARN in Account B>/_all/_settings",
        "<Domain ARN in Account B>/_cluster/stats",
        "<Domain ARN in Account B>/index-name*/_mapping/superstore",
        "<Domain ARN in Account B>/_nodes",
        "<Domain ARN in Account B>/_nodes/stats",
        "<Domain ARN in Account B>/_nodes/*/stats",
        "<Domain ARN in Account B>/_stats",
        "<Domain ARN in Account B>/index-name*/_stats"
      ]
    }
  ]
}

詳細については、「Amazon Data Firehose にパブリック OpenSearch サービスの送信先へのアクセス権を付与する」を参照してください。

アカウント A で Kinesis Data Firehose ストリームを作成する

OpenSearch Service クラスターへのクロスアカウントアクセスを用いて Kinesis Data Firehose ストリームを作成するには、AWS コマンドラインインターフェイス (AWS CLI) を使用して設定します。

AWS CLI が最新であることを確認してください。

aws --version

注: AWS CLI コマンドの実行時にエラーが発生する場合は、「AWS CLI エラーのトラブルシューティング」を参照してください。また、AWS CLI の最新バージョンを使用していることを確認してください。

次の内容を含む input.json という名前のファイルを作成します。

{
  "DeliveryStreamName": "<Firehose Name>",
  "DeliveryStreamType": "DirectPut",
  "ElasticsearchDestinationConfiguration": {
    "RoleARN": "",
    "ClusterEndpoint": "",
    "IndexName": "local",
    "TypeName": "TypeName",
    "IndexRotationPeriod": "OneDay",
    "BufferingHints": {
      "IntervalInSeconds": 60,
      "SizeInMBs": 50
    },
    "RetryOptions": {
      "DurationInSeconds": 60
    },
    "S3BackupMode": "FailedDocumentsOnly",
    "S3Configuration": {
      "RoleARN": "",
      "BucketARN": "",
      "Prefix": "",
      "BufferingHints": {
        "SizeInMBs": 128,
        "IntervalInSeconds": 128
      },
      "CompressionFormat": "UNCOMPRESSED",
      "CloudWatchLoggingOptions": {
        "Enabled": true,
        "LogGroupName": "/aws/kinesisfirehose/<Firehose Name>",
        "LogStreamName": "S3Delivery"
      }
    },
    "CloudWatchLoggingOptions": {
      "Enabled": true,
      "LogGroupName": "/aws/kinesisfirehose/<Firehose Name>",
      "LogStreamName": "ElasticsearchDelivery"
    }
  }
}

ClusterEndpoint 属性フィールドに、エンドポイントの値が正しく入力されていることを確認します。

**注:**Elasticsearch バージョン 7.x ではタイプが非推奨となっているため、input.json ファイルから TypeName 属性を削除する必要があります。

その後、input.json ファイルが置かれているのと同じディレクトリで、次のコマンドを実行します。

aws firehose create-delivery-stream --cli-input-json file://input.json

このコマンド構文は、アカウント B の OpenSearch Service クラスターへの送信先を使用して、アカウント A で Kinesis Data Firehose ストリームを作成します。

OpenSearch Service クラスターへのクロスアカウントストリーミングをテストする

Kinesis Data Generator (KDG) を使用して、アカウント A で Kinesis Data Firehose ストリームにレコードをストリーミングします。

KDG は毎秒大量のレコードを生成します。この生産性レベルにより、レコード構造のマッピングについて正しく判断するのに十分なデータポイントを OpenSearch Service に送ることができます。

Kinesis Data Generator で使用されるテンプレート構造は次のとおりです。

{
    "device_id": {{random.number(5)}},
    "device_owner": "{{name.firstName}}  {{name.lastName}}",
    "temperature": {{random.number(
        {
            "min":10,
            "max":150
        }
    )}},
    "timestamp": "{{date.now("DD/MMM/YYYY:HH:mm:ss Z")}}"
}

クロスアカウントストリーミングが成功したかどうかは、クラスターの [インデックス] タブの下のインデックスエントリで確認できます。ここでは、現在の日付で接頭辞「local」を使用してインデックス名があるかどうかをチェックします。これは、OpenSearch Dashboards にレコードが存在するかどうかをチェックしても確認できます。

注: OpenSearch Service が正しいマッピングについて判断するには数分かかります。

関連情報

Creating an Amazon Kinesis Data Firehose delivery stream

Writing to Kinesis Data Firehose using Kinesis Data Streams

AWS公式
AWS公式更新しました 5ヶ月前
コメントはありません

関連するコンテンツ