Come posso impedire che un messaggio Amazon SQS richiami la mia funzione Lambda più di una volta?

Ultimo aggiornamento: 28-12-2021

Ho configurato la mia funzione AWS Lambda per elaborare i messaggi in una coda Amazon Simple Queue Service (Amazon SQS). Ora, alcuni dei miei messaggi Amazon SQS validi vengono ricevuti più volte fino a raggiungere il valore maxReceiveCount. Come posso interrompere le invocazioni di funzioni Lambda duplicate per lo stesso messaggio Amazon SQS?

Breve descrizione

Lambda supporta la consegna del messaggio almeno una volta. In alcuni casi, il meccanismo dei tentativi potrebbe inviare duplicati dello stesso messaggio. Amazon SQS invia quindi i messaggi alla coda dei messaggi non recapitabili, se ne hai configurata una.

Le invocazioni Lambda duplicate per lo stesso messaggio Amazon SQS possono verificarsi per uno dei seguenti motivi:

  • La funzione restituisce un errore o si verifica un timeout.
  • Il servizio Lambda non riesce a eliminare un messaggio dalla coda di Amazon SQS dopo un batch riuscito prima della scadenza del timeout di visibilità.
  • Il servizio Lambda ha inviato l'evento a ma non è riuscito a ricevere una notifica dalla funzione.
  • Un problema temporaneo ha causato la restituzione dello stesso messaggio da parte di Amazon SQS, che viene nuovamente interrogato dal servizio Lambda.
  • La somma della finestra batch e della durata della funzione è superiore al timeout di visibilità della coda di Amazon SQS. Il timeout di visibilità SQS deve essere almeno sei volte il totale del timeout della funzione e del timeout della finestra batch.

Per risolvere questi problemi, attiva Segnala errore elementi batch nel trigger SQS della funzione Lambda. Quindi, crea un codice funzione modulare che esegua iterazioni nel batch, elabori ed elimini i messaggi riusciti e duplicati. La funzione memorizza il messageID dei messaggi riusciti in una tabella DynamoDB e quindi verifica che il messaggio sia stato elaborato in precedenza.

Nota: assicurati di considerare il costo per effettuare chiamate API ad Amazon DynamoDB.

Risoluzione

Creazione di una tabella DynamoDB

  1. Apri la console DynamoDB.
  2. Seleziona Crea tabella.
  3. Nella schermata Crea tabella DynamoDB, imposta i seguenti valori:
    Per Tabella, inserisci ProcessedRecords
    In Chiave primaria, per Chiave di partizione, specifica Record
    Imposta il tipo di dati su String
  4. Specifica le altre impostazioni necessarie per il tuo caso d'uso. Quindi, seleziona Crea.

Creazione di una funzione Lambda

Importante: il codice della funzione Lambda deve essere idempotente. Per le best practice sull'idempotenza ed esempi di logica delle funzioni, consulta Come posso rendere la funzione Lambda idempotente?

Dopo aver creato una tabella DynamoDB, crea una funzione Lambda.

Nella tua funzione:

  • Aggiungi un ruolo di esecuzione per consentire le azioni dynamodb:Query e dynamodb:PutItem.
  • Segnala gli errori degli elementi batch (Utilizzo di Lambda con Amazon SQS) nel trigger SQS per identificare e ignorare i messaggi duplicati nel batch.

Nell'esempio seguente, la logica della funzione converte il corpo del messaggio in maiuscolo. È scritto sotto il metodo process_message(...):

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])