如何防止 Amazon SQS 訊息多次叫用 Lambda 函數?

上次更新日期:2022 年 11 月 29 日

我將 AWS Lambda 函數設定為在 Amazon Simple Queue Service (Amazon SQS) 佇列中處理訊息。現在,我的一些有效的 Amazon SQS 訊息被多次接收到到 maxReceiveCount。如何停止針對相同 Amazon SQS 訊息的重複 Lambda 函數叫用?

簡短描述

Lambda 支援 at-least-once 訊息傳遞。在某些情況下,重試機制可能會重複傳送同一訊息。然後,Amazon SQS 會將訊息傳送至您的無效字母佇列 (如果您已設定)。

由於下列其中一個原因,可能會針對相同的 Amazon SQS 訊息重複叫用 Lambda:

  • 您的函數傳回錯誤或逾時。
  • 成功完成批次處理後,在可見性逾時到期之前,Lambda 服務無法從 Amazon SQS 佇列刪除訊息。
  • Lambda 服務傳送事件至函數,但未能從函數接收確認。
  • 間斷性問題導致 Amazon SQS 傳回相同訊息,並由 Lambda 服務再次輪詢該訊息。
  • 批次處理時段和函數持續時間的總和長於 Amazon SQS 佇列可見性逾時。SQS 可見性逾時必須至少是函數逾時和批次處理時段逾時總和的六倍。

若要解決這些問題,請在 Lambda 函數的 SQS 觸發程序中開啟報告批次處理項目失敗。然後,建立模組化函數程式碼,反覆執行批次處理、處理和刪除成功和重複訊息。該函數將成功訊息的 messageID 存放在 Amazon DynamoDB 資料表中,然後驗證是否提前處理該訊息。

重要事項:下列解決方案會對每個傳入訊息傳送多個請求至 DynamoDB,以減緩處理時間。這也會導致 API 呼叫的成本更高。因此,請務必為您的專案考慮此解決方案的時間和成本。若重複 Lambda 呼叫的錯誤率很低,則此解決方案的時間和成本可能會超過效益。

解決方式

首先,通過檢查訊息 ID來確認您多次收到相同的訊息。若因先前列出的任何原因而收到相同訊息的多個副本,則該訊息具有相同的 ID。於此狀況下,請依照下列步驟作業。若您收到多個具有相同內容但不同訊息 ID 的訊息,則該訊息會多次傳送至佇列。於此狀況下,請檢查您的訊息產生者。

下列步驟僅適用於 Lambda 函數的 SQS 觸發程序。其不適用於手動提取請求。

建立 DynamoDB 資料表

下列 DynamoDB 資料表會保留您的訊息 ID,則 Lambda 函數可加以比較以進行複製。

  1. 開啟 DynamoDB 主控台
  2. 選擇 Create Table (建立資料表)。
  3. Create DynamoDB table (建立 DynamoDB 資料表) 螢幕中,設定以下值:
    對於 Table (資料表),輸入 ProcessedRecords
    Primary key (主索引鍵) 下,對於 Partition key (分區索引鍵),輸入 Records (記錄)
    將資料類型設定為 String (字串)
  4. 根據您的使用案例視需輸入其他設定。然後選擇 Create (建立)。

建立 Lambda 函數

重要提示︰Lambda 函數程式必須為冪等。如需等冪性最佳實務和範例函數邏輯,請參閱如何將我的 Lambda 函數設為等冪?

在您建立 DynamoDB 資料表之後,建立一個 Lambda 函數。此函數會將傳入訊息與先前成功,然後保留於 DynamoDB 資料表中的訊息進行比較。若訊息先前已成功,則函數不允許處理重複項目。若新且唯一的訊息成功,則會將其傳送至資料表以供未來比較。失敗的訊息會重試,直至成功處理完畢,或是訊息的ReceiveCount 超過 maxReceiveCount 為止。

在函數中:

在以下範例中,函數邏輯將訊息正文轉換為大寫。在 process_message(...) 方法下編寫:

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])

此文章是否有幫助?


您是否需要帳單或技術支援?