Come posso impedire che un messaggio Amazon SQS richiami la mia funzione Lambda più di una volta?

5 minuti di lettura
0

Ho configurato la mia funzione AWS Lambda per elaborare i messaggi in una coda Amazon Simple Queue Service (Amazon SQS). Ora, alcuni dei miei messaggi Amazon SQS validi vengono ricevuti più volte fino a raggiungere il valore maxReceiveCount. Come posso interrompere le invocazioni di funzioni Lambda duplicate per lo stesso messaggio Amazon SQS?

Breve descrizione

Lambda supporta la consegna del messaggio almeno una volta. In alcuni casi, il meccanismo dei tentativi potrebbe inviare duplicati dello stesso messaggio. Amazon SQS invia quindi i messaggi alla coda dei messaggi non recapitabili, se ne hai configurata una.

Le invocazioni Lambda duplicate per lo stesso messaggio Amazon SQS possono verificarsi per uno dei seguenti motivi:

  • La funzione restituisce un errore o si verifica un timeout.
  • Il servizio Lambda non riesce a eliminare un messaggio dalla coda di Amazon SQS dopo un batch riuscito prima della scadenza del timeout di visibilità.
  • Il servizio Lambda ha inviato l'evento a ma non è riuscito a ricevere una notifica dalla funzione.
  • Un problema temporaneo ha causato la restituzione dello stesso messaggio da parte di Amazon SQS, che viene nuovamente interrogato dal servizio Lambda.
  • La somma della finestra batch e della durata della funzione è superiore al timeout di visibilità della coda di Amazon SQS. Il timeout di visibilità SQS deve essere almeno sei volte il totale del timeout della funzione e del timeout della finestra batch.

Per risolvere questi problemi, attiva Segnala errore elementi batch nel trigger SQS della funzione Lambda. Quindi, crea un codice funzione modulare che esegua iterazioni nel batch, elabori ed elimini i messaggi riusciti e duplicati. La funzione memorizza il messageID dei messaggi riusciti in una tabella Amazon DynamoDB e quindi verifica che il messaggio sia stato elaborato in precedenza.

Importante: la seguente risoluzione rallenta i tempi di elaborazione inviando più richieste a DynamoDB per ogni messaggio in arrivo. Ciò comporta anche costi più elevati dovuti alle chiamate API. Pertanto, assicurati di considerare i tempi e i costi di questa risoluzione per il tuo progetto. Se il tasso di errore per le chiamate Lambda duplicate è basso, i tempi e i costi di questa risoluzione potrebbero superare i vantaggi.

Risoluzione

Per prima cosa, conferma di ricevere lo stesso messaggio più volte controllando l'ID del messaggio. Se ricevi più copie dello stesso messaggio per uno dei motivi elencati in precedenza, i messaggi hanno lo stesso ID. In questo caso, procedi nel seguente modo. Se ricevi più messaggi con lo stesso contenuto ma ID di messaggio diversi, il messaggio viene inviato alla coda più di una volta. In questo caso, controlla il produttore del messaggio.

I passaggi seguenti si applicano solo al trigger SQS di una funzione Lambda. Non funzionano per le richieste di pull manuali.

Creazione di una tabella DynamoDB

La seguente tabella DynamoDB contiene gli ID dei messaggi in modo che una funzione Lambda possa confrontarli per la duplicazione.

  1. Apri la console DynamoDB.
  2. Seleziona Crea tabella.
  3. Nella schermata Crea tabella DynamoDB, imposta i seguenti valori:
    Per Tabella, inserisci ProcessedRecords
    In Chiave primaria, per Chiave di partizione, specifica Record
    Imposta il tipo di dati su String
  4. Specifica le altre impostazioni necessarie per il tuo caso d'uso. Quindi, seleziona Crea.

Creazione di una funzione Lambda

Importante: il codice della funzione Lambda deve essere idempotente. Per le best practice sull'idempotenza ed esempi di logica delle funzioni, consulta Come posso rendere la funzione Lambda idempotente?

Dopo aver creato una tabella DynamoDB, crea una funzione Lambda. Questa funzione confronta i messaggi in arrivo con quelli che in precedenza avevano avuto successo e poi contenuti nella tabella DynamoDB. Se in precedenza un messaggio ha avuto successo, la funzione non consente l'elaborazione dei duplicati. Se i nuovi messaggi univoci hanno esito positivo, vengono inviati alla tabella per il confronto futuro. I messaggi non riusciti vengono ritentati finché non vengono elaborati correttamente o fino a quando il ReceiveCount per un messaggio non supera il valore maxReceiveCount.

Nella tua funzione:

Nell'esempio seguente, la logica della funzione converte il corpo del messaggio in maiuscolo. È scritto sotto il metodo process_message(...):

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])

Informazioni correlate

Perché la funzione Lambda riprova a inviare messaggi Amazon SQS validi e li inserisce nella coda dei messaggi non recapitabili?


AWS UFFICIALE
AWS UFFICIALEAggiornata un anno fa