Comment puis-je empêcher un message Amazon SQS d'appeler ma fonction Lambda plusieurs fois ?

Dernière mise à jour : 28/12/2021

J'ai configuré ma fonction AWS Lambda pour traiter les messages dans une file d'attente Amazon Simple Queue Service (Amazon SQS). À présent, certains de mes messages Amazon SQS valides sont reçus plusieurs fois, jusqu'à la limite maxReceiveCount. Comment puis-je arrêter les appels de fonctions Lambda en double pour le même message Amazon SQS ?

Brève description

Lambda prend en charge la distribution des messages au moins une fois. Dans certains cas, le mécanisme de nouvelle tentative peut envoyer des doublons du même message. Amazon SQS envoie ensuite les messages à votre file d'attente de lettres mortes si elle est configurée.

Les appels Lambda en double pour le même message Amazon SQS peuvent intervenir pour l'une des raisons suivantes :

  • Votre fonction renvoie une erreur ou un délai d'expiration.
  • Le service Lambda ne parvient pas à supprimer un message de la file d'attente Amazon SQS à la suite d'un traitement en lot réussi avant l'expiration du délai de visibilité.
  • Le service Lambda a envoyé l'événement à cette fonction, mais n'a pas reçu d'accusé de réception de celle-ci.
  • Un problème intermittent a provoqué le renvoi du même message par Amazon SQS, qui a été à nouveau interrogé par le service Lambda.
  • La somme de la fenêtre de lot et de la durée de la fonction est supérieure à votre délai de visibilité de la file d'attente Amazon SQS. Le délai de visibilité SQS doit être au moins six fois supérieur au délai total de la fonction et de celui de la fenêtre de lot.

Pour résoudre ces problèmes, activez l'option Signaler l'échec d'élément de lot dans le déclencheur SQS de votre fonction Lambda. Créez ensuite un code de fonction modulaire qui parcourt le lot, traite et supprime les messages reçus et les messages en double. La fonction stocke le messageID des messages reçus dans une table DynamoDB et vérifie ensuite que le message a été traité plus tôt.

Remarque : assurez-vous de prendre en compte le coût des appels d'API vers Amazon DynamoDB.

Résolution

Créer une table DynamoDB

  1. Ouvrez la console DynamoDB.
  2. Sélectionnez Créer une table.
  3. Dans l'écran Créer une table DynamoDB, définissez les valeurs suivantes :
    Pour Table, saisissez ProcesssedRecords
    Sous Clé primaire, pour Clé de partition, saisissez Registres
    Définissez le type de données sur String
  4. Saisissez d'autres paramètres selon les besoins de votre cas d'utilisation. Ensuite, choisissez Créer.

Créer une fonction Lambda

Important : le code de la fonction Lambda doit être idempotent. Pour connaître les bonnes pratiques en matière d'idempotence et accéder à un exemple de logique de fonction, veuillez consulter la section Comment rendre ma fonction Lambda idempotente ?

Après avoir créé une table DynamoDB, créez une fonction Lambda.

Dans votre fonction :

  • Ajoutez un rôle d'exécution pour autoriser les actions DynamoDB:Query et DynamoDB:PutItem.
  • Signalez les échecs d'éléments de lot (à l'aide de Lambda avec Amazon SQS) dans le déclencheur SQS pour identifier et ignorer les messages en double dans le lot.

Dans l'exemple suivant, la logique de fonction convertit le corps du message en majuscules. Le message est écrit sous la méthode process_message (...) :

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])