如何设置从 Kinesis Data Firehose 到 Amazon OpenSearch Service 的跨账户流式传输?

5 分钟阅读
0

我想设置一个 Amazon Kinesis Data Firehose 流,将数据发送到另一个账户中的 Amazon OpenSearch Service 集群。

简短描述

设置 Kinesis Data Firehose 及其依赖项,例如 Amazon Simple Storage Service(Amazon S3)和 Amazon CloudWatch,以便在不同账户之间进行流式传输。无论是否开启精细访问控制(FGAC),流式数据传输都适用于可公开访问的 OpenSearch Service 集群。

要设置 Kinesis Data Firehose 流式传输以使其向 OpenSearch Service 集群发送数据,请完成以下步骤:

  1. 在账户 A 中创建 Amazon S3 存储桶。
  2. 在账户 A 中创建 CloudWatch 日志组和日志流。
  3. 在账户 A 中创建 Kinesis Data Firehose 角色和策略。
  4. 在账户 B 中为账户 A 中的 Kinesis Data Firehose 角色创建一个可公开访问的 OpenSearch Service 集群,以便将数据流式传输到账户 A。
  5. (可选)如果 FGAC 已开启,请登录 OpenSearch 控制面板并添加角色映射。
  6. 更新账户 A 中您的 Kinesis Data Firehose 角色的 AWS Identity Access Management(IAM)角色策略,以向账户 B 发送数据。
  7. 在账户 A 中创建 Kinesis Data Firehose 流。
  8. 测试跨账户流式传输数据到 OpenSearch Service 集群。

解决方法

在账户 A 中创建 Amazon S3 存储桶

在账户 A 中创建 S3 存储桶。Amazon S3 存储桶生成一个 Amazon 资源名称(ARN)。

**注意:**完整的 ARN 稍后将用于授予 Kinesis Data Firehose 访问权限,以保存和检索 Amazon S3 存储桶中的记录。

在账户 A 中创建 CloudWatch 日志组和日志流

要创建 CloudWatch 日志组,请完成以下步骤:

  1. 打开 CloudWatch 控制台
  2. 在导航窗格中,选择日志,然后选择日志组
  3. 选择创建日志组
  4. 输入日志组名称。
  5. 选择创建日志组按钮以保存新日志组。
  6. 搜索新创建的日志组,然后将其选中。

要创建 Amazon CloudWatch 日志流,请完成以下步骤:

  1. 选择创建日志流
  2. 输入日志流名称
  3. 选择创建日志流
    **重要事项:**创建 Kinesis Data Firehose 角色策略时,需要提供 CloudWatch 日志组和 CloudWatch 日志流名称。

在账户 A 中创建 Kinesis Data Firehose 角色和策略

  1. 打开 AWS Identity and Access Management(IAM)控制台
  2. 创建一个 IAM 策略,允许 Kinesis Data Firehose 执行以下操作:
    将流日志保存到 CloudWatch
    记录到 Amazon S3
    将数据流式传输到 OpenSearch Service 集群

示例:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:PutObject",
        "s3:PutObjectAcl"
      ],
      "Resource": [
        "<Bucket ARN>",
        "<Bucket ARN>/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:<region>:<account-id>:log-group:/aws/kinesisfirehose/<Firehose Name>:log-stream:*"
      ]
    }
  ]
}

**注意:**稍后,您可以向 OpenSearch Service 集群策略添加流式传输权限。但是,您必须先在账户 B 中创建集群。

  1. 保存该策略。
  2. 选择创建角色
  3. 将策略添加到您的 Kinesis Data Firehose 角色。

在账户 B 中创建可公开访问的 OpenSearch Service 集群,让账户 A 中的 Kinesis Data Firehose 角色能够流式传输数据

  1. 在账户 B 中创建可公开访问的 OpenSearch Service 集群
  2. 记录 OpenSearch Service 域 ARN。您将在以后的步骤中使用该 ARN。
  3. 为您的集群配置安全设置。
    **重要事项:**您必须配置 OpenSearch Service 安全设置,以允许账户 A 中的 Kinesis Data Firehose 角色流式传输数据到您的 OpenSearch Service 集群。

要配置您的安全设置,请执行以下步骤:

  1. 在 OpenSearch Service 中,导航到访问策略

  2. 选择 JSON 定义的访问策略。您的策略必须具有以下权限:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "*"
          },
          "Action": "es:*",
          "Resource": "<ES Domain ARN in Account B>/*",
          "Condition": {
            "IpAddress": {
              "aws:SourceIp": "<Your IP Address for OpenSearch Dashboards access>"
            }
          }
        },
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "<Firehose Role ARN in Account A>"
          },
          "Action": [
            "es:ESHttpPost",
            "es:ESHttpPut"
          ],
          "Resource": [
            "<ES Domain ARN in Account B>",
            "<ES Domain ARN in Account B>/*"
          ]
        },
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "<Firehose Role ARN in Account A>"
          },
          "Action": "es:ESHttpGet",
          "Resource": [
            "<ES Domain ARN in Account B>/_all/_settings",
            "<ES Domain ARN in Account B>/_cluster/stats",
            "<ES Domain ARN in Account B>/index-name*/_mapping/type-name",
            "<ES Domain ARN in Account B>/roletest*/_mapping/roletest",
            "<ES Domain ARN in Account B>/_nodes",
            "<ES Domain ARN in Account B>/_nodes/stats",
            "<ES Domain ARN in Account B>/_nodes/*/stats",
            "<ES Domain ARN in Account B>/_stats",
            "<ES Domain ARN in Account B>/index-name*/_stats",
            "<ES Domain ARN in Account B>/roletest*/_stats"
          ]
        }
      ]
    }

    有关 OpenSearch Service 策略中权限的更多信息,请参阅 Cross-account delivery to an OpenSearch Service destination

  3. (可选)如果您的集群启用了 FGAC,请登录 OpenSearch 控制面板并添加角色映射。角色映射允许 Kinesis Data Firehose 角色向 OpenSearch Service 发送请求。

要登录 OpenSearch 控制面板并添加角色映射,请完成以下步骤:

  1. 打开控制面板。
  2. 选择安全选项卡。
  3. 选择角色
  4. 选择 all_access 角色。
  5. 选择已映射的用户选项卡。
  6. 选择管理映射
  7. 后端角色部分中,输入 Kinesis Data Firehose 角色。
  8. 选择映射

更新账户 A 中您的 Kinesis Data Firehose 角色的 IAM 角色策略,以向账户 B 发送数据

要将账户 A 中的 Kinesis Data Firehose 角色中的数据发送到账户 B 中的 OpenSearch Service 集群,请更新 Kinesis Data Firehose 策略。

示例:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:PutObject",
        "s3:PutObjectAcl"
      ],
      "Resource": [
        "<Bucket ARN>",
        "<Bucket ARN>/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:<region>:<account-id>:log-group:/aws/kinesisfirehose/<Firehose Name>:log-stream:*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "es:ESHttpPost",
        "es:ESHttpPut",
        "es:DescribeDomain",
        "es:DescribeDomains",
        "es:DescribeDomainConfig"
      ],
      "Resource": [
        "<Domain ARN in Account B>",
        "<Domain ARN in Account B>/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "es:ESHttpGet"
      ],
      "Resource": [
        "<Domain ARN in Account B>/_all/_settings",
        "<Domain ARN in Account B>/_cluster/stats",
        "<Domain ARN in Account B>/index-name*/_mapping/superstore",
        "<Domain ARN in Account B>/_nodes",
        "<Domain ARN in Account B>/_nodes/stats",
        "<Domain ARN in Account B>/_nodes/*/stats",
        "<Domain ARN in Account B>/_stats",
        "<Domain ARN in Account B>/index-name*/_stats"
      ]
    }
  ]
}

有关更多信息,请参阅 Grant Kinesis Data Firehose access to an Amazon OpenSearch Service destination

在账户 A 中创建 Kinesis Data Firehose 流

要创建可跨账户访问 OpenSearch Service 集群的 Kinesis Data Firehose 流,请使用和配置 AWS 命令行界面(AWS CLI)

检查以确保您的 AWS CLI 是最新的:

aws --version

**注意:**如果您在运行 AWS CLI 命令时收到错误,请参阅排查 AWS CLI 错误。此外,请确保您使用的是最新版本的 AWS CLI

使用以下内容创建名为 input.json 的文件:

{
  "DeliveryStreamName": "<Firehose Name>",
  "DeliveryStreamType": "DirectPut",
  "ElasticsearchDestinationConfiguration": {
    "RoleARN": "",
    "ClusterEndpoint": "",
    "IndexName": "local",
    "TypeName": "TypeName",
    "IndexRotationPeriod": "OneDay",
    "BufferingHints": {
      "IntervalInSeconds": 60,
      "SizeInMBs": 50
    },
    "RetryOptions": {
      "DurationInSeconds": 60
    },
    "S3BackupMode": "FailedDocumentsOnly",
    "S3Configuration": {
      "RoleARN": "",
      "BucketARN": "",
      "Prefix": "",
      "BufferingHints": {
        "SizeInMBs": 128,
        "IntervalInSeconds": 128
      },
      "CompressionFormat": "UNCOMPRESSED",
      "CloudWatchLoggingOptions": {
        "Enabled": true,
        "LogGroupName": "/aws/kinesisfirehose/<Firehose Name>",
        "LogStreamName": "S3Delivery"
      }
    },
    "CloudWatchLoggingOptions": {
      "Enabled": true,
      "LogGroupName": "/aws/kinesisfirehose/<Firehose Name>",
      "LogStreamName": "ElasticsearchDelivery"
    }
  }
}

确保在 ClusterEndpoint 属性字段中正确输入了端点值。

**注意:**Elasticsearch 版本 7.x 已弃用类型。对于 Elasticsearch 版本 7.x,从 input.json 文件中移除 TypeName 属性。

然后,在 input.json 文件所在目录中运行以下命令:

aws firehose create-delivery-stream --cli-input-json file://input.json

此命令语法在账户 A 中创建 Kinesis Data Firehose 流,并将目标设置为账户 B 中的 OpenSearch Service 集群。

测试跨账户流式传输数据到 OpenSearch Service 集群

使用 Kinesis Data Generator(KDG)将记录流式传输到账户 A 的 Kinesis Data Firehose 流中。

KDG 每秒生成许多记录。这种生成速率使 OpenSearch Service 有足够的数据点来确定记录结构的正确映射。

以下是 Kinesis Data Generator 中使用的模板结构:

{
    "device_id": {{random.number(5)}},
    "device_owner": "{{name.firstName}}  {{name.lastName}}",
    "temperature": {{random.number(
        {
            "min":10,
            "max":150
        }
    )}},
    "timestamp": "{{date.now("DD/MMM/YYYY:HH:mm:ss Z")}}"
}

要验证跨账户流式传输是否成功,请查看集群索引选项卡下的索引条目。检查是否存在使用当前日期且前缀为“local”的索引名称。您还可以检查这些记录是否存在于 OpenSearch 控制面板中。

**注意:**OpenSearch Service 需要几分钟才能确定正确的映射。

相关信息

Creating an Amazon Kinesis Data Firehose delivery stream

Writing to Kinesis Data Firehose using Kinesis Data Streams

AWS 官方
AWS 官方已更新 4 个月前