Come posso impedire che un messaggio Amazon SQS richiami la mia funzione Lambda più di una volta?
Ultimo aggiornamento: 28-12-2021
Ho configurato la mia funzione AWS Lambda per elaborare i messaggi in una coda Amazon Simple Queue Service (Amazon SQS). Ora, alcuni dei miei messaggi Amazon SQS validi vengono ricevuti più volte fino a raggiungere il valore maxReceiveCount. Come posso interrompere le invocazioni di funzioni Lambda duplicate per lo stesso messaggio Amazon SQS?
Breve descrizione
Lambda supporta la consegna del messaggio almeno una volta. In alcuni casi, il meccanismo dei tentativi potrebbe inviare duplicati dello stesso messaggio. Amazon SQS invia quindi i messaggi alla coda dei messaggi non recapitabili, se ne hai configurata una.
Le invocazioni Lambda duplicate per lo stesso messaggio Amazon SQS possono verificarsi per uno dei seguenti motivi:
- La funzione restituisce un errore o si verifica un timeout.
- Il servizio Lambda non riesce a eliminare un messaggio dalla coda di Amazon SQS dopo un batch riuscito prima della scadenza del timeout di visibilità.
- Il servizio Lambda ha inviato l'evento a ma non è riuscito a ricevere una notifica dalla funzione.
- Un problema temporaneo ha causato la restituzione dello stesso messaggio da parte di Amazon SQS, che viene nuovamente interrogato dal servizio Lambda.
- La somma della finestra batch e della durata della funzione è superiore al timeout di visibilità della coda di Amazon SQS. Il timeout di visibilità SQS deve essere almeno sei volte il totale del timeout della funzione e del timeout della finestra batch.
Per risolvere questi problemi, attiva Segnala errore elementi batch nel trigger SQS della funzione Lambda. Quindi, crea un codice funzione modulare che esegua iterazioni nel batch, elabori ed elimini i messaggi riusciti e duplicati. La funzione memorizza il messageID dei messaggi riusciti in una tabella DynamoDB e quindi verifica che il messaggio sia stato elaborato in precedenza.
Nota: assicurati di considerare il costo per effettuare chiamate API ad Amazon DynamoDB.
Risoluzione
Creazione di una tabella DynamoDB
- Apri la console DynamoDB.
- Seleziona Crea tabella.
- Nella schermata Crea tabella DynamoDB, imposta i seguenti valori:
Per Tabella, inserisci ProcessedRecords
In Chiave primaria, per Chiave di partizione, specifica Record
Imposta il tipo di dati su String - Specifica le altre impostazioni necessarie per il tuo caso d'uso. Quindi, seleziona Crea.
Creazione di una funzione Lambda
Importante: il codice della funzione Lambda deve essere idempotente. Per le best practice sull'idempotenza ed esempi di logica delle funzioni, consulta Come posso rendere la funzione Lambda idempotente?
Dopo aver creato una tabella DynamoDB, crea una funzione Lambda.
Nella tua funzione:
- Aggiungi un ruolo di esecuzione per consentire le azioni dynamodb:Query e dynamodb:PutItem.
- Segnala gli errori degli elementi batch (Utilizzo di Lambda con Amazon SQS) nel trigger SQS per identificare e ignorare i messaggi duplicati nel batch.
Nell'esempio seguente, la logica della funzione converte il corpo del messaggio in maiuscolo. È scritto sotto il metodo process_message(...):
import boto3
dynamodb_client = boto3.client('dynamodb')
DDB_TABLE = 'ProcessedRecords'
# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
return dynamodb_client.query(
TableName='ProcessedRecords',
Select='COUNT',
KeyConditionExpression='Records = :Records',
ExpressionAttributeValues={
':Records': {'S': message_id}
}
)["Count"] != 0
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
return body.upper()
# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
for message_id in batch_item_success:
response = dynamodb_client.put_item(
TableName = DDB_TABLE,
Item={ 'Records': {'S':message_id}
}
)
return True
# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
batch_item_failures = []
batch_item_success = []
for record in Records:
message_id = record["messageId"]
print("Message: " + message_id)
if is_duplicate_message(message_id):
print("Message duplicate: " + message_id)
continue
try:
process_message(record["body"])
batch_item_success.append(message_id)
except:
batch_item_failures.append({"itemIdentifier": message_id})
push_to_dynamoDB(batch_item_success)
return batch_item_failures
def lambda_handler(event, context):
return iterate_records(event["Records"])
Informazioni correlate
Questo articolo è stato utile?
Hai bisogno di supporto tecnico o per la fatturazione?