Kinesis Data Firehose から Amazon Elasticsearch Service へのクロスアカウントストリーミングをセットアップするにはどうすればよいですか?

最終更新日: 2020 年 4 月 15 日

別アカウントにある Amazon Elasticsearch Service (Amazon ES) クラスターに、データを送信する Amazon Kinesis Data Firehose ストリームをセットアップしたいと考えています。異なるアカウント間でデータリソースをストリーミングするにはどうすればよいですか? 

簡単な説明

異なるアカウント間でのストリーミングを行うように、Kinesis Data Firehose と、それに依存関係がある Amazon Simple Storage Service (Amazon S3) や Amazon CloudWatch などを設定できます。ストリーミングデータ配信は、Amazon ES クラスターがパブリックにアクセス可能であり、 ノード間の暗号化 が無効になっている場合にのみ実行できます。ノード間の暗号化 機能を無効にするには、 きめ細かなアクセスコントロールの有効化 設定を選択解除する必要があります。

Amazon ES クラスターにデータを送信するように Data Firehose ストリームを設定するには、以下の手順を実行します。

1.    アカウント A で Amazon S3 バケットを作成します。

2.    アカウント A で CloudWatch ロググループとログストリームを作成します。

3.    アカウント A で Data Firehose ロールとポリシーを作成します。

4.    アカウント B で Elasticsearch クラスターを作成します。次に、アカウント A の Data Firehose ロールからのデータを許可するポリシーを適用します。

5.    アカウント A で Data Firehose ロールのポリシーを修正し、アカウント B にある Elasticsearch クラスターにデータを送信できるようにします。

6.    アカウント A で Data Firehose を作成します。

7.    クロスアカウントストリームをテストします。

解決方法

アカウント A で Amazon S3 バケットを作成する

アカウント A で S3 バケットを作成します。S3 バケットは Amazon リソースネーム (ARN) を生成します。

注: この S3 バケットからのレコードを取得および保存するためのアクセス許可を、後ほど Data Firehose に付与する際に、この ARN を完全な形で使用します。

アカウント A で CloudWatch ロググループとログストリームを作成する

CloudWatch ロググループを作成するには、次の手順を実行します。

1.    CloudWatch コンソールを開きます。

2.    ナビゲーション ペインで、[ロググループ] をクリックします。

3.    [アクション] をクリックします。

4.    [ロググループの作成] を選択します。

5.    [ロググループ名] に名前を入力します。

6.    [ロググループを作成] ボタンをクリックして、新しいロググループを保存します。

7.    新しく作成したロググループを検索し、選択します。このタスクを行うことで、ログストリームが作成可能になったことが確認されます。

次の手順を実行し、CloudWatch ログストリームを作成します。

1.    [ログストリームの作成] をクリックします。

2.    [ログストリーム名] に名前を入力します。

3.    [ログストリームの作成] をクリックします。このアクションにより、新しく作成したログストリームが保存されます。

重要: CloudWatch ロググループ名と CloudWatch ログストリーム名は、Data Firehose ロールポリシーを作成する際に必要となります。

アカウント A で Data Firehose ロールとポリシーを作成する

1.    AWS Identity and Access Management (IAM) コンソールを開きます。

2.    Data Firehose に以下を許可する IAM ポリシーを作成します: ストリームログの CloudWatch への保存、レコードの S3 への保存、データストリームの Elasticsearch クラスターへの保存。

{
    "Version": "2012-10-17",
    "Statement": [
        {          
            "Effect": "Allow",    
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
    "Resource": [
                "<Bucket ARN>",
                "<Bucket ARN>/*"    
            ]
        },
        {    
            "Effect": "Allow",
                "Action": [
                "logs:PutLogEvents"
            ],           
                "Resource": [
                "arn:aws:logs:<region>:<account-id>:log-group:<log-group-name>:log-stream:<log-stream-name>"           
            ]
        }
    ]
}

注: アカウント B のクラスターを先に作成する必要があるため、Elasticsearch クラスターポリシーのストリーミングに必要なアクセス権限は後で追加します。

3.    作成したポリシーを保存します。

4.    [ロールの作成] をクリックします。

5.    新しく作成したポリシーを Kinesis Data Firehose ロールに追加します。

アカウント B で Elasticsearch クラスターを作成し、アカウント A の Data Firehose ロールからのデータを許可するポリシーを適用する

[パブリックにアクセス可能] 設定を有効にしながら Elasticsearch クラスターを作成します。また、アカウント B の [ノード間の暗号化] 設定が無効になっていることも確認します。[ノード間の暗号化] を無効にするには、[きめ細かなアクセスコントロールの有効化] 設定を選択解除する必要があります。

重要: ロールが Data Firehose を介してストリーミングできるようにするには、セキュリティ設定を構成する必要があります。

[アクセスポリシー] で [カスタムアクセスポリシー] を選択し、[IAM]、[許可] の順にクリックした後、IAM コンソールで取得したユーザー ARN を入力します。

クラスターの作成が完了すると、[Amazon ES Domain ARN] が表示されます。[アクション] を表示して、クラスターのポリシーを変更します。その後、[アクセスポリシーの管理] を次のように変更します。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "<Firehose Role ARN in Account A>"
      },
      "Action": [
        "es:ESHttpPost",
        "es:ESHttpPut"
      ],
      "Resource": [
        "<ES Domain ARN in Account B>",
        "<ES Domain ARN in Account B>/*"
      ]
    },
    {    
      "Effect": "Allow",
      "Principal": {
        "AWS": "<Firehose Role ARN in Account A>"
      },
      "Action": "es:ESHttpGet",
      "Resource": [
        "<ES Domain ARN in Account B>/_all/_settings",
        "<ES Domain ARN in Account B>/_cluster/stats",
        "<ES Domain ARN in Account B>/index-name*/_mapping/type-name",
        "<ES Domain ARN in Account B>/roletest*/_mapping/roletest",
        "<ES Domain ARN in Account B>/_nodes",
        "<ES Domain ARN in Account B>/_nodes/stats",
        "<ES Domain ARN in Account B>/_nodes/*/stats",
        "<ES Domain ARN in Account B>/_stats",
        "<ES Domain ARN in Account B>/index-name*/_stats",
        "<ES Domain ARN in Account B>/roletest*/_stats"
      ]
    }
  ]
}

Elasticsearch ポリシーにおけるアクセス許可の詳細については、「Amazon ES 送信先へのクロスアカウント間の配信」をご参照ください。

アカウント A の Data Firehose ロールでポリシーを修正し、アカウント B の Elasticsearch クラスターにデータを送信できるようにする

Elasticsearch クラスターにデータを送信できるように、Data Firehose ポリシーを修正します。

{
    "Version": "2012-10-17",
    "Statement": [
        {          
            "Effect": "Allow",    
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": [
                "<Bucket ARN>",
                "<Bucket ARN>/*"    
            ]
        },
        {    
            "Effect": "Allow",
                "Action": [
                "logs:PutLogEvents"
            ],           
                "Resource": [
                "arn:aws:logs:<region>:<account-id>:log-group:<log-group-name>:log-stream:<log-stream-name>"
            ]
        },    
         {
            "Effect": "Allow",
            "Action": [
                "es:ESHttpPost",
                "es:ESHttpPut",
                "es:DescribeElasticsearchDomain",    
                "es:DescribeElasticsearchDomains",
                "es:DescribeElasticsearchDomainConfig"
            ],
            "Resource": [
                "<ES Domain ARN in Account B>",
                "<ES Domain ARN in Account B>/*"
            ]
        },
        {        
            "Effect": "Allow",
            "Action": [
                "es:ESHttpGet"
            ],
            "Resource": [
                "<ES Domain ARN in Account B>/_all/_settings",
                "<ES Domain ARN in Account B>/_cluster/stats",
                "<ES Domain ARN in Account B>/index-name*/_mapping/superstore",
                "<ES Domain ARN in Account B>/_nodes",
                "<ES Domain ARN in Account B>/_nodes/stats",
                "<ES Domain ARN in Account B>/_nodes/*/stats",
                "<ES Domain ARN in Account B>/_stats",
                "<ES Domain ARN in Account B>/index-name*/_stats"
            ]
        }
    ]
}

Elasticsearch クラスターへの Data Firehose データ配信の詳細については、「Amazon ES の送信先へのアクセス権を Kinesis Data Firehose に付与する」をご参照ください。

アカウント A で Data Firehose を作成する

AWS CLI を使って設定を行い、クロスアカウントの Elasticsearch クラスターを使う Data Firehose を作成します。

次のコマンドを使用して、AWS CLI が最新であることを確認します。

aws --version

AWS CLI が更新されたら、次の内容で input.json という名前のファイルを作成します。

{
    "DeliveryStreamName": "<Firehose name>",
    "DeliveryStreamType": "DirectPut",
    "ElasticsearchDestinationConfiguration": {
        "RoleARN": "<Firehose Role ARN of Account A>",
        "ClusterEndpoint": "<ES Domain cluster Endpoint of Account B>",
        "IndexName": "local",
        "TypeName": "TypeName",
        "IndexRotationPeriod": "OneDay",
        "BufferingHints": {
            "IntervalInSeconds": 60,
            "SizeInMBs": 50
        },
        "RetryOptions": {
            "DurationInSeconds": 60
        },
        "S3BackupMode": "FailedDocumentsOnly",
        "S3Configuration": {
            "RoleARN": "<Firehose Role ARN of Account A>",
            "BucketARN": "<S3 Bucket ARN of Account A>",
            "Prefix": "",
            "BufferingHints": {
                "SizeInMBs": 128,
                "IntervalInSeconds": 128
            },
            "CompressionFormat": "UNCOMPRESSED"
        },
        "CloudWatchLoggingOptions": {
            "Enabled": true,
            "LogGroupName": "<Log group name>",
            "LogStreamName": "<Log stream name>"
        }
    }
}

ClusterEndpoint 属性フィールドに、エンドポイントの値が正しく入力されていることを確認します。

注: Type は Elasticsearch バージョン 7.x で非推奨となっています。Elasticsearch バージョン 7.x では、 TypeName 属性を削除する必要があります。

その後、 input.json ファイルが置かれているのと同じディレクトリで、次の AWS CLI コマンドを実行します。

aws firehose create-delivery-stream --cli-input-json file://input.json

これにより、アカウント A に Data Firehose が作成され、アカウント B には Elasticsearch クラスターが作成されます。

クロスアカウントストリームをテストする

Kinesis Data Generator を使用して、アカウント A の Data Firehose にレコードをストリーミングします。

Kinesis Data Generator は 毎秒大量のレコードを生成します。これにより、レコード構造のマッピングを正しく検出するのに十分なデータポイントが Amazon ES に送られます。

Kinesis Data Generator で使用されるテンプレート構造は次のとおりです。

{
    "device_id": {{random.number(5)}},
    "device_owner": "{{name.firstName}}  {{name.lastName}}",
    "temperature": {{random.number(
        {
            "min":10,
            "max":150
        }
    )}},
    "timestamp": "{{date.now("DD/MMM/YYYY:HH:mm:ss Z")}}"
}

クロスアカウントストリーミングが成功したことを確認するには、Elasticsearch クラスターの [インデックス] タブで、「local」という名前のローカルインデックスを探します。

注: Amazon ES が正しいマッピングを検出するまでには数分かかることがあります。