亚马逊AWS官方博客

新增功能 – 增强了 Amazon SQS 标准队列的死信队列管理体验

成千上万的客户使用 Amazon Simple Queue Service (SQS) 构建基于消息的应用程序,以分离和扩展微服务、分布式系统和无服务器应用程序。当队列使用者无法成功处理消息时,可以配置 SQS 以将其存储在死信队列 (DLQ) 中。

作为软件开发人员或架构师,您可能希望检查和查看 DLQ 中未处理完的消息,以找出无法处理这些消息的原因,确定模式,解决代码错误,并最终在原始队列中重新处理这些消息。这些未处理完的消息的生命周期也属于错误处理工作流的一部分,并且通常需要手动处理,非常耗时。

我很高兴地宣布,针对 SQS 标准队列的全新增强型 DLQ 管理体验正式推出,让您可以轻松地将未处理完的消息从 DLQ 重新传输到源队列。

这项新功能可在 SQS 控制台中使用,帮助您专注于错误处理工作流中更重要的阶段,包括识别和解决处理错误。通过这一全新的开发体验,您可以轻松地检查未处理完的消息的样例,并且只需单击即可将其移回原始队列,而无需编写、维护和保护任何自定义代码。这种全新的体验还可以批处理重新传输消息,从而降低总体成本。

DLQ 和 Lambda 处理器设置
如果您已经熟悉 DLQ 设置,则可以跳过该设置,直接进入全新的 DLQ 重新传输体验

首先,创建两个队列:源队列和死信队列。

编辑源队列并配置死信队列部分。在这里,选择 DLQ 并配置最大接收次数,也就是消息在发送到 DLQ 之前要重新处理的次数。在本次演示中,将其设置为 1。这意味着每条失败的消息都会立即发送到 DLQ。在现实环境中,您可能需要根据自己的需求和故障对应用程序的影响,设置一个更高的数字。

再次编辑 DLQ,以确保仅允许我的源队列使用此 DLQ。此配置为可选项:禁用此允许重新传输策略时,任何 SQS 队列都可以使用此 DLQ。在某些情况下,您可能需要对多个队列重复使用单个 DLQ。但通常认为的最佳实践是为每个源队列设置独立的 DLQ,以在不影响成本的情况下简化重新传输阶段。请记住,收费基于 API 调用的次数,而不是队列的数量。

DLQ 设置正确后,就需要一个处理器。我们使用 AWS Lambda 实现一个简单的消息处理程序。

Python 编写的 Lambda 函数将遍历该批次的传入消息,从消息正文中获取两个值,并打印这两个值的总和。

import json

def lambda_handler(event, context):
    for record in event['Records']:
        payload = json.loads(record['body'])

        value1 = payload['value1']
        value2 = payload['value2']

        value_sum = value1 + value2
        print("the sum is %s" % value_sum)
        
    return "OK"
Python

上面的代码假设每条消息的正文都包含两个可以求和的整数值,而无需处理任何验证或处理错误。可以想象到,一段时间以后这将产生问题。

在处理任何消息之前,必须授予此 Lambda 函数足够的权限,以便从 SQS 读取消息并配置其触发器。对于 IAM 权限,使用名为 AWSLambdaSQSQueueExecutionRole 的托管式策略,授予调用 sqs:ReceiveMessagesqs:DeleteMessagesqs:GetQueueAttributes 的权限。

使用 Lambda 控制台设置 SQS 触发器。也可以通过 SQS 控制台实现同样的功能。

现在已经可以在 SQS 控制台中使用源队列的发送和接收消息来处理新消息。在消息正文中写入 {"value1": 10, "value2": 5},然后选择发送消息

查看 Lambda 函数的 CloudWatch Logs 时,可以看到调用成功。

START RequestId: 637888a3-c98b-5c20-8113-d2a74fd9edd1 Version: $LATEST
the sum is 15
END RequestId: 637888a3-c98b-5c20-8113-d2a74fd9edd1
REPORT RequestId: 637888a3-c98b-5c20-8113-d2a74fd9edd1	Duration: 1.31 ms	Billed Duration: 2 ms	Memory Size: 128 MB	Max Memory Used: 39 MB	Init Duration: 116.90 ms	
Bash

借助 DLQ Redrive 进行故障排除
现在,如果有其他生成器开始以错误的格式发布消息,该怎么办? 例如,{"value1": "10", "value2": 5}。这里第一个数字是字符串,很可能在处理器中出现问题。

实际上,在 CloudWatch Logs 中可以看到:

START RequestId: 542ac2ca-1db3-5575-a1fb-98ce9b30f4b3 Version: $LATEST
[ERROR] TypeError: can only concatenate str (not "int") to str
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 8, in lambda_handler
    value_sum = value1 + value2
END RequestId: 542ac2ca-1db3-5575-a1fb-98ce9b30f4b3
REPORT RequestId: 542ac2ca-1db3-5575-a1fb-98ce9b30f4b3	Duration: 1.69 ms	Billed Duration: 2 ms	Memory Size: 128 MB	Max Memory Used: 39 MB	
Bash

为了找出出错消息中的错误,可以使用全新的 SQS 重新传输功能,在死信队列中选择 DLQ 重新传输

对消息使用轮询,并从 DLQ 获取所有未处理完的消息。

然后选择未处理完的消息进行检查。

问题很清楚,更新处理代码就可以正确处理此问题。在理想情况下,这种上游问题应该在消息生成器中解决。但我们假设无法控制上游系统,而处理这种新类型的消息又对业务至关重要。

因此,可以更新处理逻辑如下:

import json

def lambda_handler(event, context):
    for record in event['Records']:
        payload = json.loads(record['body'])
        value1 = int(payload['value1'])
        value2 = int(payload['value2'])
        value_sum = value1 + value2
        print("the sum is %s" % value_sum)
        # do some more stuff
        
    return "OK"
Python

现在,代码已经可以处理未处理完的消息,将开启新的从 DLQ 重新传输到源队列的任务。

原定设置下,SQS 会将未处理完的消息重新传输到源队列。但也可以指定其他目的地,并提供自定义传输速度来设置每秒的最大消息数。

在控制台中监控重新传输的状态,等待重新传输任务完成。这个新的部分将始终显示最新的重新传输任务的状态。

消息已移回源队列,并已由 Lambda 函数成功处理。在 CloudWatch Logs 中,看起来一切正常。

START RequestId: 637888a3-c98b-5c20-8113-d2a74fd9edd1 Version: $LATEST
the sum is 15
END RequestId: 637888a3-c98b-5c20-8113-d2a74fd9edd1
REPORT RequestId: 637888a3-c98b-5c20-8113-d2a74fd9edd1	Duration: 1.31 ms	Billed Duration: 2 ms	Memory Size: 128 MB	Max Memory Used: 39 MB	Init Duration: 116.90 ms	
Bash

今天即可使用,无需额外成本
您今天就可以开启利用新的 DLQ 重新传输体验来简化开发和故障排除工作流,无需任何额外成本。这一全新的控制台体验已在所有提供 SQS 的 AWS 区域推出,我们期待听到您的反馈。

Alex