Wie kann ich verhindern, dass eine Amazon-SQS-Nachricht meine Lambda-Funktion mehr als einmal aufruft?

Letzte Aktualisierung: 28.12.2021

Ich habe meine AWS-Lambda-Funktion für die Verarbeitung von Nachrichten in einer Warteschlange von Amazon Simple Queue Service (Amazon SQS) konfiguriert. Jetzt werden einige meiner gültigen Amazon-SQS-Nachrichten mehrmals bis zum maxReceiveCount empfangen. Wie stoppe ich doppelte Lambda-Funktionsaufrufe für dieselbe Amazon-SQS-Nachricht?

Kurzbeschreibung

Lambda unterstützt die mindestens einmalige Nachrichtenzustellung. In einigen Fällen sendet der Wiederholungsmechanismus möglicherweise Duplikate derselben Nachricht. Amazon SQS sendet die Nachrichten dann an Ihre Warteschlange für unzustellbare Nachrichten, sofern Sie eine konfiguriert haben.

Doppelte Lambda-Aufrufe für dieselbe Amazon SQS-Nachricht können aus einem der folgenden Gründe auftreten:

  • Ihre Funktion gibt einen Fehler oder eine Zeitüberschreitung zurück.
  • Der Lambda-Dienst löscht eine Nachricht nach einem erfolgreichen Batch nicht aus der Amazon-SQS-Warteschlange, bevor das Zeitlimit für die Sichtbarkeit abläuft.
  • Der Lambda-Dienst hat das Ereignis an gesendet, aber keine Bestätigung von der Funktion erhalten.
  • Ein zeitweiliges Problem führte dazu, dass Amazon SQS dieselbe Nachricht zurückgab und sie erneut vom Lambda-Dienst abgefragt wurde.
  • Die Summe aus dem Batch-Fenster und der Funktionsdauer ist länger als das Timeout für die Sichtbarkeit der Amazon-SQS-Warteschlange. Das SQS-Sichtbarkeits-Timeout muss mindestens das Sechsfache der Summe des Funktions-Timeouts und des Batch-Fenster-Timeouts betragen.

Um diese Probleme zu beheben, aktivieren Sie im SQS-Auslöser Ihrer Lambda-Funktion den Fehler des Batch-Elements melden. Erstellen Sie dann einen modularen Funktionscode, der den Stapel durchläuft, erfolgreiche und doppelte Nachrichten verarbeitet und löscht. Die Funktion speichert die MessageID der erfolgreichen Nachrichten in einer DynamoDB-Tabelle und überprüft dann, ob die Nachricht zuvor verarbeitet wurde.

Hinweis: Denken Sie daran, die Kosten für API-Aufrufe an Amazon DynamoDB zu berücksichtigen.

Auflösung

Eine DynamoDB-Tabelle erstellen

  1. Öffnen Sie die DynamoDB-Konsole.
  2. Wählen Sie Tabelle erstellen.
  3. Stellen Sie im Bildschirm DynamoDB-Tabelle erstellen die folgenden Werte ein:
    Geben Sie für TabelleProcesssedRecords ein
    Geben Sie unter Primärschlüssel für PartitionsschlüsselAkten ein
    Setzen Sie den Datentyp auf String
  4. Geben Sie nach Bedarf weitere Einstellungen für Ihren Anwendungsfall ein. Wählen Sie dann Erstellen.

Lambda-Funktion erstellen

Wichtig: Der Lambda-Funktionscode muss idempotent sein. Bewährte Methoden für Idempotenz und Beispielfunktionslogik finden Sie unter Wie mache ich mein Lambda idempotent?

Nachdem Sie eine DynamoDB-Tabelle erstellt haben, erstellen Sie eine Lambda-Funktion.

In deiner Funktion:

  • Fügen Sie eine Ausführungsrolle hinzu, um die Aktionen dynamoDB: Query und DynamoDB: PutItem zuzulassen.
  • Melden Sie Fehler bei Batchelementen (Verwendung von Lambda mit Amazon SQS) im SQS-Trigger, um doppelte Nachrichten im Stapel zu identifizieren und zu überspringen.

Im folgenden Beispiel wandelt die Funktionslogik den Nachrichtentext in Großbuchstaben um. Es wird unter process_message(...)-Methode geschrieben:

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])