我如何设置从 Kinesis Data Firehose 到 Amazon Elasticsearch Service 的跨账户流式传输?

上次更新时间:2020 年 11 月 24 日

我想要设置 Amazon Kinesis Data Firehose 流,以将数据发送到另一个账户中的 Amazon Elasticsearch Service (Amazon ES) 集群。如何在不同的账户间流式传输我的数据资源?

简短描述

您可以设置 Kinesis Data Firehose 及其依赖项,如 Amazon Simple Storage Service (Amazon S3) 和 Amazon CloudWatch,以在不同账户间进行流式传输。只有当 Amazon ES 集群可被公开访问且禁用了节点到节点加密时,才能进行流数据传输。要关闭节点到节点加密功能,必须取消选中启用精细访问控制设置。

要设置 Data Firehose 流以使其将数据发送至 Amazon ES 集群,请执行以下步骤:

1.    在账户 A 中创建 Amazon S3 存储桶。

2.    在账户 A 中创建一个 CloudWatch 日志组和日志流。

3.    在账户 A 中创建 Data Firehose 角色和策略。

4.    在账户 B 中创建一个 Elasticsearch 集群并应用一个策略来允许来自账户 A 中 Data Firehose 角色的数据。

5.    修改账户 A 中 Data Firehose 角色的策略,以便它可以将数据发送到账户 B 中的 Elasticsearch 集群。

6.    在账户 A 中创建 Data Firehose。

7.    测试跨账户流。

分辨率

在账户 A 中创建 Amazon S3 存储桶

在账户 A 中创建 S3 存储桶。S3 存储桶将生成 Amazon 资源名称 (ARN)。

注意:稍后使用完整的 ARN 授予 Data Firehose 访问权限,以便保存和检索此 S3 存储桶中的记录。

在账户 A 中创建一个 CloudWatch 日志组和日志流

要创建 CloudWatch 日志组,请执行以下步骤:

1.    打开 CloudWatch 控制台

2.    在导航窗格中,选择日志组

3.    选择操作

4.    选择创建日志组

5.    输入日志组名称。

6.    选择创建日志组按钮以保存您的新日志组。

7.    搜索您新创建的日志组,然后选中它。此任务完成将验证您现在可以创建日志流。

要创建 CloudWatch 日志流,请执行以下步骤:

1.    选择创建日志流

2.    输入日志流名称

3.    选择创建日志流。此操作将保存您新创建的日志流。

重要提示:创建 Data Firehose 角色策略时,需要 CloudWatch 日志组和 CloudWatch 日志流名称。

在账户 A 中创建 Data Firehose 角色和策略

1.    导航至 AWS Identity and Access Management (IAM) 控制台

2.    创建一个 IAM 策略,以允许 Data Firehose 将流日志保存到 CloudWatch,记录保存到 S3,且将数据流保存到 Elasticsearch 集群:

{
    "Version": "2012-10-17",
    "Statement": [
        {          
            "Effect": "Allow",    
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
    "Resource": [
                "<Bucket ARN>",
                "<Bucket ARN>/*"    
            ]
        },
        {    
            "Effect": "Allow",
                "Action": [
                "logs:PutLogEvents"
            ],           
                "Resource": [
                "arn:aws:logs:<region>:<account-id>:log-group:<log-group-name>:log-stream:<log-stream-name>"           
            ]
        }
    ]
}

注意:稍后您可以附加适当的权限以流式处理 Elasticsearch 集群策略,因为账户 B 中的集群必须先创建。

3.    保存策略。

4.    选择创建角色

5.    将新创建的策略添加到您的 Kinesis Data Firehose 角色。

在账户 B 中创建一个 Elasticsearch 集群并应用一个策略来允许来自账户 A 中 Data Firehose 角色的数据

创建一个启用了可公开访问设置的 Elasticsearch 集群。另外,请确保禁用账户 B 的节点到节点加密设置。要禁用节点到节点加密,必须取消选中启用精细访问控制设置。

重要提示:您必须将安全设置配置为允许您的角色通过 Data Firehose 进行流式传输。

访问策略中,依次选择自定义访问策略IAM允许,然后在您的 IAM 控制台中输入用户 ARN。

集群创建后,Amazon ES 域 ARN 应出现。导航到操作来更改集群的策略。然后,将您的管理访问策略修改为以下内容:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "<Firehose Role ARN in Account A>"
      },
      "Action": [
        "es:ESHttpPost",
        "es:ESHttpPut"
      ],
      "Resource": [
        "<ES Domain ARN in Account B>",
        "<ES Domain ARN in Account B>/*"
      ]
    },
    {    
      "Effect": "Allow",
      "Principal": {
        "AWS": "<Firehose Role ARN in Account A>"
      },
      "Action": "es:ESHttpGet",
      "Resource": [
        "<ES Domain ARN in Account B>/_all/_settings",
        "<ES Domain ARN in Account B>/_cluster/stats",
        "<ES Domain ARN in Account B>/index-name*/_mapping/type-name",
        "<ES Domain ARN in Account B>/roletest*/_mapping/roletest",
        "<ES Domain ARN in Account B>/_nodes",
        "<ES Domain ARN in Account B>/_nodes/stats",
        "<ES Domain ARN in Account B>/_nodes/*/stats",
        "<ES Domain ARN in Account B>/_stats",
        "<ES Domain ARN in Account B>/index-name*/_stats",
        "<ES Domain ARN in Account B>/roletest*/_stats"
      ]
    }
  ]
}

有关 Elasticsearch 策略内的权限的更多信息,请参阅至 Amazon ES 目的地的跨账户传输

修改账户 A 中 Data Firehose 角色的策略,以便它可以将数据发送到账户 B 中的 Elasticsearch 集群。

修改 Data Firehose 策略,以便它能够将数据发送到 Elasticsearch 集群:

{
    "Version": "2012-10-17",
    "Statement": [
        {          
            "Effect": "Allow",    
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": [
                "<Bucket ARN>",
                "<Bucket ARN>/*"    
            ]
        },
        {    
            "Effect": "Allow",
                "Action": [
                "logs:PutLogEvents"
            ],           
                "Resource": [
                "arn:aws:logs:<region>:<account-id>:log-group:<log-group-name>:log-stream:<log-stream-name>"
            ]
        },    
         {
            "Effect": "Allow",
            "Action": [
                "es:ESHttpPost",
                "es:ESHttpPut",
                "es:DescribeElasticsearchDomain",    
                "es:DescribeElasticsearchDomains",
                "es:DescribeElasticsearchDomainConfig"
            ],
            "Resource": [
                "<ES Domain ARN in Account B>",
                "<ES Domain ARN in Account B>/*"
            ]
        },
        {        
            "Effect": "Allow",
            "Action": [
                "es:ESHttpGet"
            ],
            "Resource": [
                "<ES Domain ARN in Account B>/_all/_settings",
                "<ES Domain ARN in Account B>/_cluster/stats",
                "<ES Domain ARN in Account B>/index-name*/_mapping/superstore",
                "<ES Domain ARN in Account B>/_nodes",
                "<ES Domain ARN in Account B>/_nodes/stats",
                "<ES Domain ARN in Account B>/_nodes/*/stats",
                "<ES Domain ARN in Account B>/_stats",
                "<ES Domain ARN in Account B>/index-name*/_stats"
            ]
        }
    ]
}

有关将 Data Firehose 数据传输到您的 Elasticsearch 集群的更多信息,请参阅授予 Kinesis Data Firehose 访问 Amazon ES 目的地的权限

在账户 A 中创建 Data Firehose

要创建一个具有跨账户 Elasticsearch 集群的 Data Firehose,请使用并配置 AWS 命令行界面 (AWS CLI)

检查以确保您的 AWS CLI 处于最新状态:

aws --version

注意:如果在运行 AWS CLI 命令时收到错误,请确保您使用的是最新的 AWS CLI 版本

更新 AWS CLI 后,使用以下内容常见一个名为 input.json 的文件:

{
    "DeliveryStreamName": "<Firehose name>",
    "DeliveryStreamType": "DirectPut",
    "ElasticsearchDestinationConfiguration": {
        "RoleARN": "<Firehose Role ARN of Account A>",
        "ClusterEndpoint": "<ES Domain cluster Endpoint of Account B>",
        "IndexName": "local",
        "TypeName": "TypeName",
        "IndexRotationPeriod": "OneDay",
        "BufferingHints": {
            "IntervalInSeconds": 60,
            "SizeInMBs": 50
        },
        "RetryOptions": {
            "DurationInSeconds": 60
        },
        "S3BackupMode": "FailedDocumentsOnly",
        "S3Configuration": {
            "RoleARN": "<Firehose Role ARN of Account A>",
            "BucketARN": "<S3 Bucket ARN of Account A>",
            "Prefix": "",
            "BufferingHints": {
                "SizeInMBs": 128,
                "IntervalInSeconds": 128
            },
            "CompressionFormat": "UNCOMPRESSED"
        },
        "CloudWatchLoggingOptions": {
            "Enabled": true,
            "LogGroupName": "<Log group name>",
            "LogStreamName": "<Log stream name>"
        }
    }
}

确保在 ClusterEndpoint 属性字段中正确输入终端节点值。

注意:类型已在 Elasticsearch 版本 7.x 中弃用。对于 Elasticsearch 版本 7.x,请确保删除 TypeName 属性。

然后,在与 input.json 文件位置相同的目录中运行以下 AWS CLI 命令:

aws firehose create-delivery-stream --cli-input-json file://input.json

这将在账户 A 中创建 Data Firehose,并在账户 B 中创建 Elasticsearch 集群中。

测试跨账户流

使用 Kinesis Data Generator 将记录流式传输到账户 A 中的 Data Firehose。

Kinesis Data Generator (KDG) 每秒都会生成很多记录。此工作效率等级使 Amazon ES 有足够的数据点来确定记录结构的正确映射。

下面是 Kinesis Data Generator 中使用的模板结构:

{
    "device_id": {{random.number(5)}},
    "device_owner": "{{name.firstName}}  {{name.lastName}}",
    "temperature": {{random.number(
        {
            "min":10,
            "max":150
        }
    )}},
    "timestamp": "{{date.now("DD/MMM/YYYY:HH:mm:ss Z")}}"
}

要验证跨账户流传输是否成功,请检查 Elasticsearch 集群中的索引选项卡下是否有名为“local”的本地索引。

注意:Amazon ES 可能需要几分钟才能确定正确的映射。