亚马逊AWS官方博客

新增功能 – 使用 Amazon EventBridge 存档和重播事件

事件驱动型架构使用事件在一个或多个应用程序的组件之间共享信息。事件告诉我们“发生了什么事情”,可能是您收到了 API 请求、文件已上传到存储平台或数据库记录已更新。业务事件描述了与您的活动相关的内容,例如已创建新客户账户或付款已成功。

要使用您自己的应用程序、集成的软件即服务 (SaaS) 应用程序和 AWS 服务中的事件将应用程序连接在一起,您可以使用 Amazon EventBridge,它是一个无服务器事件总线,可从事件源中交付实时数据流,然后将该数据路由到 AWS Lambda 之类的目标中。

事件告诉我们,可以与对该信息感兴趣的任何人共享信息。创建新的客户账户后,新添加的服务可以使用此信息,而无需更改现有界面。例如,新的欺诈检测系统可以了解为执行安全检查和评估可能的欺诈活动而创建的所有新客户账户。

有时候您可能需要重新处理过去的事件。在很多使用案例中,重新处理很有用,例如:

  • 修复错误后,您可以重新处理受影响的事件以获得正确的结果。此方法假定您的应用程序可以多次处理同一事件。
  • 发布新功能时,您可以重新处理之前的事件,以将该功能的覆盖范围扩展到过去的数据。例如,添加到您的应用程序中的欺诈检测系统不仅可以访问新账户,还可以访问过去几周或几个月创建的账户。

为了使其操作更方便,我很高兴地分享一个消息,EventBridge 现在可以存档和重播事件:

  • 现在,您可以为发布到事件总线的事件创建加密存档 。您可以存档所有事件或使用 EventBridge 规则所用的模式匹配语法对事件进行筛选。您可以无限期地存储事件,也可以设置保留期,在保留期之后,旧事件将自动从存档中删除。
  • 您还可以重播存储在档案中的事件。事件将根据为事件总线定义的所有规则(但不是其他 AWS 服务所创建的托管规则)或您指定的规则重播。重播的事件包含一个额外的 replay-name 字段,以防您需要识别它们。开始重播时,您需要定义一个时间范围,并且只重播该时间范围内的事件。目前,您只能在存档事件的事件总线中重播事件。

可以对 EventBridge 的所有事件流程进行存档和重播,包括来自 AWS 平台、SaaS 集成的事件以及您自己的自定义事件。

重播期间,您当前的事件吞吐量不受影响,因为 EventBridge 为重播保留了单独的配额。就每秒发布的事件而言,重播的速度与您 当前在该区域的 PutEvents 限制相同。如果您要求提高 PutEvents 服务配额,重播速度将更快。这样,您的正常操作不会受到重播的影响。但是,您应检查处理重播的下游服务的性能和限制,以确保它们能够处理额外的工作负载。

您可以停止正在进行的重播。您无法继续已停止的重播,但您可以从以前停止的重播创建一个新的重播,并将开始时间设置为 lastReplayedEvent 时间戳。

让我们来看看存档和重播在实际中是如何运作的。

创建事件存档
EventBridge 控制台中,我为我的应用程序创建了一个事件总线

然后,我使用简单的事件匹配模式为事件总线创建了一个规则,以便将其收到的所有 DetailType 等于 customerCreated 的事件发送到将 Lambda 函数中,以将事件中的客户数据存储到数据库中。

现在,我选择事件总线,然后在 Actions(操作)菜单中选择 Archive events(存档事件)。

我给存档提供名称和描述,然后仔细检查来源是否是我的事件总线。我选择无限保留期,但是您可以选择要保留的天数,之后事件将自动从存档中删除。

选择 Next(下一步)。我可以选择使用类似于创建规则时使用的模式匹配语法筛选要存档的事件。我决定不添加任何筛选条件,并存档来自源的所有事件。

现在,我使用这个简单的 Python 代码将一些事件放入总线中:

import json
import boto3

EVENT_BUS = 'my-event-bus'

# Create EventBridge client

events = boto3.client('events')

eventDetail  = {
    'customerId': '123',
    'customerName': 'Danilo',
    'customerData': 'More info...'
}

# Put an event
response = events.put_events(
    Entries=[
        {
            'EventBusName': EVENT_BUS,
            'Detail': json.dumps(eventDetail),
            'DetailType': 'customerCreated',
            'Source': 'com.mycompany.myapp'
        }
    ]
)
print(response['Entries'])

我多次运行前面的代码。几分钟后,我看到我发布的活动已存档:

重播事件
过了一段时间,我在 Lambda 函数的代码中发现了一个错误… 当地址超过预期时,该函数不会存储所有数据(是的,这是常有的事 ?)。我修复了代码,以便正确处理新事件。使用 EventBridge,我还可以重播更老的事件来修复我已经处理的事件的结果。

我可以放心地这样做,因为我的应用程序为幂等形式。这意味着它可以在不改变结果的情况下使用相同的输入多次执行。在这种情况下,它会用正确的客户数据覆盖数据库项目。为了构建弹性应用程序,我总是建议设计幂等界面。通过这种方式,您可以轻松地从这样的错误中恢复,也可以放心地忽略重复的输入。

请注意,EventBridge 不提供排序保证,并且重播是以多线程方式执行的,这可能导致事件按照与原始排序顺序不同的顺序交付。

选择我的存档后,我点击 Start new replay(开始新的重播),然后为重播提供名称和描述。

我检查重播的来源是否是我的存档。我选择只重播触发修复错误的 Lambda 函数的规则。

我定义了重播的开始和结束时间(我可以使用本地时间或 UTC)并选择 Start Replay(开始重播)。

好吧,我的存档没那么大。重播开始时,只需几秒钟即可完成。我查看我的数据库,发现客户数据现在是正确的。错误修复了!

现已推出
事件存档和重播现已在除中国大陆和大阪之外的所有商业区域推出。有关更多信息,请参阅 AWS 区域服务列表。您可以通过控制台、AWS 命令行界面 (CLI)AWS 开发工具包AWS CloudFormation 使用这些新功能。

使用 Amazon EventBridge,您只需为您的使用量付费。对于存档和重播,您需要按存档的事件数量、存档使用的存储空间以及重播的消息数量付费。有关成本的更多信息和示例,请参阅 EventBridge 定价页面

要了解更多信息,请参阅文档。快告诉我您打算将这些新功能用在哪些方面吧!

Danilo