如何防止 Amazon SQS 消息多次调用我的 Lambda 函数?

上次更新日期:2021 年 12 月 28 日

我配置了 AWS Lambda 函数以处理 Amazon Simple Queue Service (Amazon SQS) 队列中的消息。现在,我的一些有效 Amazon SQS 消息在达到 maxReceiveCount 之前会被多次接收。如何停止对同一 Amazon SQS 消息的 Lambda 函数重复调用?

简短描述

Lambda 支持至少一次消息传递。在某些情况下,重试机制可能会发送相同消息的副本。然后,Amazon SQS 会将消息发送到死信队列(如果您已经配置了死信队列)。

对同一 Amazon SQS 消息重复调用 Lambda 可能由于以下原因之一:

  • 函数返回错误或超时。
  • Lambda 服务成功完成批处理后,无法在可见性超时到期之前从 Amazon SQS 队列中删除消息。
  • Lambda 服务发送了事件,但未能收到函数的确认。
  • 间歇性的问题导致 Amazon SQS 返回相同的消息,Lambda 服务再次轮询该消息。
  • 批处理窗口和函数持续时间之和长于 Amazon SQS 队列可见性超时。SQS 可见性超时必须至少是函数超时和批处理窗口超时总和的六倍。

要解决这些问题,请在 Lambda 函数的 SQS 触发器中启用报告批处理项目故障。然后,创建模块化函数代码,在批处理、处理和删除成功消息和重复消息中进行迭代。该函数将成功消息的 messageID 存储在 DynamoDB 表中,然后验证消息是否已在前面处理。

注意:请务必考虑对 Amazon DynamoDB 进行 API 调用的成本。

解决方法

创建 DynamoDB 表

  1. 打开 DynamoDB 控制台
  2. 选择 Create table(创建表)。
  3. Create DynamoDB table(创建 DynamoDB 表)屏幕上,设置以下值:
    对于 Table(表),输入 ProcessedRecords
    Primary key(主键)下,对于 Partition key(分区键),输入 Records
    将数据类型设置为 String(字符串)
  4. 根据您使用案例的需要输入其他设置。然后,选择 Create (创建)

创建 Lambda 函数

重要提示:Lambda 函数代码必须是幂等的。有关幂等性最佳实践和示例函数逻辑,请参阅如何让我的 Lambda 函数幂等?

创建 DynamoDB 表后,创建 Lambda 函数。

在您的函数中:

  • 添加执行角色以允许 dynamodb:Querydynamodb:PutItem 操作。
  • 在 SQS 触发器中报告批处理项目故障(使用 Lambda 和 Amazon SQS),以识别和跳过批处理中的重复消息。

在下面的示例中,函数逻辑会将消息正文转换为大写。该函数使用 process_message(...) 方法编写:

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])

这篇文章对您有帮助吗?


您是否需要账单或技术支持?