如何防止 Amazon SQS 訊息多次叫用 Lambda 函數?

上次更新日期:2021 年 12 月 28 日

我將 AWS Lambda 功能配置為在 Amazon Simple Queue Service (Amazon SQS) 佇列中處理訊息。現在,我的一些有效的 Amazon SQS 訊息被多次接收到到 maxReceiveCount。如何停止針對相同 Amazon SQS 訊息的重複 Lambda 函數叫用?

簡短描述

Lambda 支援 at-least-once 訊息傳遞。在某些情況下,重試機制可能會重複傳送同一訊息。然後,Amazon SQS 會將訊息傳送至您的無法寄出信件佇列 (如果您已設定)。

由於下列其中一個原因,可能會針對相同的 Amazon SQS 訊息重複叫用 Lambda:

  • 您的函數傳回錯誤或逾時。
  • 成功完成批次處理後,在可見性逾時到期之前,Lambda 服務無法從 Amazon SQS 佇列刪除訊息。
  • Lambda 服務傳送事件至函數,但未能從函數接收確認。
  • 間斷性問題導致 Amazon SQS 傳回相同訊息,並由 Lambda 服務再次輪詢該訊息。
  • 批次處理時段和函數持續時間的總和長於 Amazon SQS 佇列可見性逾時。SQS 可見性逾時必須至少是函數逾時和批次處理時段逾時總和的六倍。

若要解決這些問題,請在 Lambda 函數的 SQS 觸發程序中開啟報告批次處理項目失敗。然後,建立模組化函數程式碼,反覆執行批次處理、處理和刪除成功和重複訊息。該函數將成功訊息的 MessageID 存放在 DynamoDB 資料表中,然後驗證是否提前處理該訊息。

注意:請務必考慮對 Amazon DynamoDB 進行 API 呼叫的成本。

解決方案

建立 DynamoDB 資料表

  1. 開啟 DynamoDB 主控台
  2. 選擇 Create Table (建立資料表)。
  3. Create DynamoDB table (建立 DynamoDB 資料表) 螢幕中,設定以下值:
    對於 Table (資料表),輸入 ProcessedRecords
    Primary key (主要索引鍵) 下,對於 Partition key (分區索引鍵),輸入 Records (記錄)
    將資料類型設定為 String (字串)
  4. 根據您的使用案例視需輸入其他設定。然後選擇 Create (建立)。

建立 Lambda 函數

重要提示︰Lambda 函數程式必須為冪等。如需冪等最佳實務和範例函數邏輯,請參閱如何將我的 Lambda 函數設為冪等?

在建立 DynamoDB 資料表之後,建立一個 Lambda 函數。

在函數中:

  • 新增執行角色以允許 dynamodb:Querydynamodb:PutItem 動作。
  • 在 SQS 觸發程序中報告批次處理項目失敗 (將 Lambda 與 Amazon SQS 結合使用),以識別和略過批次處理中的重複訊息。

在以下範例中,函數邏輯將訊息正文轉換為大寫。在 process_message(...) 方法下編寫:

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])