如何跨不同的 Kinesis Data Streams 账户流式传输 SES 通知?

上次更新时间:2020 年 5 月 13 日

我想要设置 Amazon Simple Email Service (Amazon SES) 事件通知以将数据发送到另一个账户中的 Amazon Kinesis 数据流。该如何操作?

简短描述

要设置至另一个账户中的 Amazon Kinesis 数据流的 Amazon SES 事件通知,请执行以下操作:

1.    在 SES 源开始。在 SES 中更新配置设置,以推送到 Amazon Simple Notification Service (Amazon SNS) 主题。更新后的设置会对 AWS Lambda 进行配置,以订阅主题作为触发器。

2.    订阅 SNS。

3.    设置跨账户访问。

4.    设置 Lambda 来将数据发送至 Amazon Kinesis Data Streams。然后,Lambda 可以使用代码从 Amazon SNS 主题中抓取完整事件。Lambda 还会代入另一个账户中的角色,以将记录放入数据流中。

5.    使用 Amazon Kinesis 客户端库 (KCL) 处理流中的记录。

解决方法

在 SES 源开始

创建一个 SES 环境并将您的电子邮件账户添加到列表中。验证电子邮件账户,以通过服务将电子邮件发送至您自己的电子邮件地址。为电子邮件设置配置集,以跟踪测试期间可能发生的所有可能的事件类型。在配置集中,使用默认的跟踪域将目的地设置为 Simple Notification Service (SNS)

注意:您可以在创建配置集时创建主题,也可以在创建配置集之前创建主题。

有关配置集的更多信息,请参阅使用 Amazon SES 配置集

订阅 SNS

要测试已创建的通知主题,您可以在 SNS 主题中临时订阅自己的电子邮件地址。这可以确保您收到来自 SES 的消息通知。请注意,在配置 Lambda 时,您需要参考订阅列表。

重要提示:测试时,只能在 HTML 格式的电子邮件中跟踪任何打开单击事件。这是因为 AWS 嵌入了一个不可见的 GIF 图像,其中包含跟踪令牌。此跟踪令牌可以跟踪打开的任何通知电子邮件,从而将事件的反馈提交回主题。

SES 和 SNS 集成后,您可以通过使用 SES 控制台来验证集成是否成功。在 SES 控制台左侧选择电子邮件地址。然后,选择发送测试电子邮件按钮并将原始选为电子邮件格式。

注意:如果您使用格式选项,则电子邮件将不会以 HTML 格式发送。因此,打开单击电子邮件事件的反馈也将不会发送。

您可以在控制台中使用以下模板:

Subject: Amazon SES Raw Email Test
MIME-Version: 1.0
Content-Type: text/html
X-SES-CONFIGURATION-SET: SESxLMxKIN
<!DOCTYPE html>
<html>
<body>
<h1>This text should be large, because it is formatted as a header in HTML.</h1>
<p>Here is a formatted link: <a href="https://docs.aws.amazon.com/ses/latest/DeveloperGuide/Welcome.html">Amazon Simple Email Service Developer Guide</a>.</p>
</body>
</html>

在此示例模板中,注意配置集名称包含在标头中。包含了配置集名称,以将配置用于所选的事件。此配置集将向您发送电子邮件和通知,以告知收件箱中的电子邮件所采取的所有操作。有关 SNS 的补充阅读,请参阅设置 Amazon SNS 通知

设置跨账户访问

创建设置,以将消息发送到另一个账户。这需要角色和策略来允许您代入另一个账户中的角色。

注意:您必须拥有一个在目标账户中代入的角色。同时还必须创建 Lambda 的执行角色和即将被代入的角色。即将被代入的角色拥有与所需服务互动的权限。

下面是使用 SES 和 Lambda 的账户的示例模板,您需要将其附加到 Lambda 执行角色:

{
    "Version": "2012-10-17",
    "Statement": {
        "Effect": "Allow",
        "Action": "sts:AssumeRole",
        "Resource": "arn:aws:iam::222222222222:role/role-on-source-account"
    }
}

下面是使用数据流的账户的示例模板,它应该附加到 Lambda 代入的新角色中:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::111111111111:role/my-lambda-executionrole-source-account"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

此模板定义了 Lambda 执行角色的信任关系。Lambda 执行角色代入了将把数据传输到 Kinesis 数据流的角色。

注意:Lambda 代入的角色必须拥有将记录放入 Kinesis 数据流的权限。

设置 Lambda 以将数据发送至 Kinesis Data Streams

您可以通过使用类似于以下内容的模板设置 Lambda 来将数据发送至 Data Streams:

from __future__ import print_function
import base64 #Kinesis only accepts base64 encoded payloads. 
import boto3  #Allow programmatic access to AWS services.
import json
print('Loading function')
def lambda_handler(event, context):
    # Setting up the assumeRole for cross-account
    sts_connection = boto3.client('sts')
    acct_b = sts_connection.assume_role(
        RoleArn="arn:aws:iam::[ADD_ACCOUNT_ID]:role/AssumeLambdaCrossAccount", #TODO: Add relevant role ARN
        RoleSessionName="cross_acct_lambda"
    )
    print(acct_b)
    
    ACCESS_KEY = acct_b['Credentials']['AccessKeyId']
    SECRET_KEY = acct_b['Credentials']['SecretAccessKey']
    SESSION_TOKEN = acct_b['Credentials']['SessionToken']
    
    #Gets the SNS message that was posted (cliks, opens, sends, deliveries etc.)
    message = event['Records'][0]['Sns']['Message'] #Fetching the message in the JSON structure received.
    print("From SNS: " + message) #Checking the message received from SNS.
    
    #TODO:Generate a random partitionKey for kinesis. 
    #If a random partitionKey is not used in a multi shard stream it will lead to performance problems
    #PartitionKey = RandomKey
    
    print("Add data to cross-account stream")
    kinesis_client = boto3.client('kinesis',region_name="us-east-1",aws_access_key_id=ACCESS_KEY,aws_secret_access_key=SECRET_KEY,aws_session_token=SESSION_TOKEN) #Initiates the kinesis service with cross-account credentials.. 
    kinesis_client.put_record(StreamName="CrossAccountTest",Data=json.dumps(message),PartitionKey="partitionKey")

在此 Python 示例代码中,使用您的流名称和区域替换 RoleArn,以适应您的环境。此函数从 SES 中接收 SNS 消息,从而代入目标账户中的角色。然后,代入的角色将消息放入目标流中。

要将 Lambda 配置为 SNS 主题的订阅者,您可以遵照以下步骤:

1.    登录 Amazon SNS 控制台

2.    在导航窗格中,选择主题

3.    在主题页面上,选择一个主题。

4.    在订阅部分中,选择创建订阅

5.    在创建订阅页面上,验证所选的主题 ARN

6.    选择 AWS Lambda 作为您的协议类型。

7.    输入您的函数的 Amazon 资源名称 (ARN) 以建立终端节点。

8.    选择创建订阅

有关将 Lambda 用作 SNS 订阅者的更多信息,请参阅通过将 AWS Lambda 函数作为订阅者,来使用 Amazon SNS 进行系统到系统消息收发

如果将消息发布至 Lambda 订阅的 SNS 主题,已发布消息的有效负载将调用 Lambda。有关如何创建示例消息历史记录存储的更多信息,请参阅通过 Amazon SNS 调用 AWS Lambda 函数

注意:如果您不想设置跨账户访问,可以删除凭证部分并将其写入本地 Kinesis 数据流。有关将 Kinesis 与 Lambda 和 Python 结合使用的更多信息,请参阅 AWS Boto3 文档中的 Kinesis

使用 KCL 处理记录

您可以使用 KCL 来处理 Kinesis 数据流中的记录。当数据处于 Kinesis 数据流之后,您可以构建自定义应用程序并实施 Kinesis Data Analytics 来进行分析和可视化。


这篇文章对您有帮助吗?

我们可以改进什么?


需要更多帮助?