如何防止 Amazon SQS 訊息多次叫用 Lambda 函數?
上次更新日期:2022 年 11 月 29 日
我將 AWS Lambda 函數設定為在 Amazon Simple Queue Service (Amazon SQS) 佇列中處理訊息。現在,我的一些有效的 Amazon SQS 訊息被多次接收到到 maxReceiveCount。如何停止針對相同 Amazon SQS 訊息的重複 Lambda 函數叫用?
簡短描述
Lambda 支援 at-least-once 訊息傳遞。在某些情況下,重試機制可能會重複傳送同一訊息。然後,Amazon SQS 會將訊息傳送至您的無效字母佇列 (如果您已設定)。
由於下列其中一個原因,可能會針對相同的 Amazon SQS 訊息重複叫用 Lambda:
- 您的函數傳回錯誤或逾時。
- 成功完成批次處理後,在可見性逾時到期之前,Lambda 服務無法從 Amazon SQS 佇列刪除訊息。
- Lambda 服務傳送事件至函數,但未能從函數接收確認。
- 間斷性問題導致 Amazon SQS 傳回相同訊息,並由 Lambda 服務再次輪詢該訊息。
- 批次處理時段和函數持續時間的總和長於 Amazon SQS 佇列可見性逾時。SQS 可見性逾時必須至少是函數逾時和批次處理時段逾時總和的六倍。
若要解決這些問題,請在 Lambda 函數的 SQS 觸發程序中開啟報告批次處理項目失敗。然後,建立模組化函數程式碼,反覆執行批次處理、處理和刪除成功和重複訊息。該函數將成功訊息的 messageID 存放在 Amazon DynamoDB 資料表中,然後驗證是否提前處理該訊息。
重要事項:下列解決方案會對每個傳入訊息傳送多個請求至 DynamoDB,以減緩處理時間。這也會導致 API 呼叫的成本更高。因此,請務必為您的專案考慮此解決方案的時間和成本。若重複 Lambda 呼叫的錯誤率很低,則此解決方案的時間和成本可能會超過效益。
解決方式
首先,通過檢查訊息 ID來確認您多次收到相同的訊息。若因先前列出的任何原因而收到相同訊息的多個副本,則該訊息具有相同的 ID。於此狀況下,請依照下列步驟作業。若您收到多個具有相同內容但不同訊息 ID 的訊息,則該訊息會多次傳送至佇列。於此狀況下,請檢查您的訊息產生者。
下列步驟僅適用於 Lambda 函數的 SQS 觸發程序。其不適用於手動提取請求。
建立 DynamoDB 資料表
下列 DynamoDB 資料表會保留您的訊息 ID,則 Lambda 函數可加以比較以進行複製。
- 開啟 DynamoDB 主控台。
- 選擇 Create Table (建立資料表)。
- 在 Create DynamoDB table (建立 DynamoDB 資料表) 螢幕中,設定以下值:
對於 Table (資料表),輸入 ProcessedRecords
在 Primary key (主索引鍵) 下,對於 Partition key (分區索引鍵),輸入 Records (記錄)
將資料類型設定為 String (字串) - 根據您的使用案例視需輸入其他設定。然後選擇 Create (建立)。
建立 Lambda 函數
重要提示︰Lambda 函數程式必須為冪等。如需等冪性最佳實務和範例函數邏輯,請參閱如何將我的 Lambda 函數設為等冪?
在您建立 DynamoDB 資料表之後,建立一個 Lambda 函數。此函數會將傳入訊息與先前成功,然後保留於 DynamoDB 資料表中的訊息進行比較。若訊息先前已成功,則函數不允許處理重複項目。若新且唯一的訊息成功,則會將其傳送至資料表以供未來比較。失敗的訊息會重試,直至成功處理完畢,或是訊息的ReceiveCount 超過 maxReceiveCount 為止。
在函數中:
- 新增執行角色以允許 dynamodb:Query 和 dynamodb:PutItem 動作。
- 在 SQS 觸發程序中報告批次處理項目失敗,以識別和略過批次處理中的重複訊息。
在以下範例中,函數邏輯將訊息正文轉換為大寫。在 process_message(...) 方法下編寫:
import boto3
dynamodb_client = boto3.client('dynamodb')
DDB_TABLE = 'ProcessedRecords'
# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
return dynamodb_client.query(
TableName='ProcessedRecords',
Select='COUNT',
KeyConditionExpression='Records = :Records',
ExpressionAttributeValues={
':Records': {'S': message_id}
}
)["Count"] != 0
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
return body.upper()
# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
for message_id in batch_item_success:
response = dynamodb_client.put_item(
TableName = DDB_TABLE,
Item={ 'Records': {'S':message_id}
}
)
return True
# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
batch_item_failures = []
batch_item_success = []
for record in Records:
message_id = record["messageId"]
print("Message: " + message_id)
if is_duplicate_message(message_id):
print("Message duplicate: " + message_id)
continue
try:
process_message(record["body"])
batch_item_success.append(message_id)
except:
batch_item_failures.append({"itemIdentifier": message_id})
push_to_dynamoDB(batch_item_success)
return batch_item_failures
def lambda_handler(event, context):
return iterate_records(event["Records"])