Wie kann ich verhindern, dass eine Amazon-SQS-Nachricht meine Lambda-Funktion mehr als einmal aufruft?

Letzte Aktualisierung: 2022-11-29

Ich habe meine AWS-Lambda-Funktion für die Verarbeitung von Nachrichten in einer Warteschlange von Amazon Simple Queue Service (Amazon SQS) konfiguriert. Jetzt werden einige meiner gültigen Amazon-SQS-Nachrichten mehrmals bis zum maxReceiveCount empfangen. Wie stoppe ich doppelte Lambda-Funktionsaufrufe für dieselbe Amazon-SQS-Nachricht?

Kurzbeschreibung

Lambda unterstützt die mindestens einmalige Nachrichtenzustellung. In einigen Fällen sendet der Wiederholungsmechanismus möglicherweise Duplikate derselben Nachricht. Amazon SQS sendet die Nachrichten dann an Ihre Warteschlange für unzustellbare Nachrichten, sofern Sie eine konfiguriert haben.

Doppelte Lambda-Aufrufe für dieselbe Amazon SQS-Nachricht können aus einem der folgenden Gründe auftreten:

  • Ihre Funktion gibt einen Fehler oder eine Zeitüberschreitung zurück.
  • Der Lambda-Dienst löscht eine Nachricht nach einem erfolgreichen Batch nicht aus der Amazon-SQS-Warteschlange, bevor das Zeitlimit für die Sichtbarkeit abläuft.
  • Der Lambda-Dienst hat das Ereignis an gesendet, aber keine Bestätigung von der Funktion erhalten.
  • Ein zeitweiliges Problem führte dazu, dass Amazon SQS dieselbe Nachricht zurückgab und sie erneut vom Lambda-Dienst abgefragt wurde.
  • Die Summe aus dem Batch-Fenster und der Funktionsdauer ist länger als das Timeout für die Sichtbarkeit der Amazon-SQS-Warteschlange. Das SQS-Sichtbarkeits-Timeout muss mindestens das Sechsfache der Summe des Funktions-Timeouts und des Batch-Fenster-Timeouts betragen.

Um diese Probleme zu beheben, aktivieren Sie im SQS-Auslöser Ihrer Lambda-Funktion den Fehler des Batch-Elements melden. Erstellen Sie dann einen modularen Funktionscode, der den Stapel durchläuft, erfolgreiche und doppelte Nachrichten verarbeitet und löscht. Die Funktion bestätigt, dass die Nachricht zuvor bearbeitet wurde, nachdem die messageID der erfolgreichen Nachrichten in einer Amazon DynamoDB-Tabelle gespeichert wurde.

Wichtig: Die folgende Lösung verlangsamt die Verarbeitungszeit, da für jede eingehende Nachricht mehrere Anfragen an DynamoDB gesendet werden. Dies führt auch zu höheren Kosten durch API-Aufrufe. Stellen Sie daher sicher, dass Sie den Zeit- und Kostenaufwand für diese Lösung für Ihr Projekt berücksichtigen. Wenn Ihre Fehlerquote bei doppelten Lambda-Anrufen gering ist, können Zeit und Kosten dieser Lösung die Vorteile überwiegen.

Lösung

Bestätigen Sie zunächst, dass Sie dieselbe Nachricht mehrmals erhalten, indem Sie die Nachrichten-ID überprüfen. Wenn Sie aus einem der oben genannten Gründe mehrere Kopien derselben Nachricht erhalten, haben die Nachrichten dieselbe ID. Gehen Sie in diesem Fall wie folgt vor. Wenn Sie mehrere Nachrichten mit demselben Inhalt, aber unterschiedlichen Nachrichten-IDs erhalten, wird die Nachricht mehr als einmal an die Warteschlange gesendet. Überprüfen Sie in diesem Fall Ihren Nachrichtenproduzenten.

Die folgenden Schritte gelten nur für den SQS-Trigger einer Lambda-Funktion. Sie funktionieren nicht für manuelle Pull-Requests.

Eine DynamoDB-Tabelle erstellen

Die folgende DynamoDB-Tabelle enthält Ihre Nachrichten-IDs, sodass eine Lambda-Funktion sie auf Duplizierung vergleichen kann.

  1. Öffnen Sie die DynamoDB-Konsole.
  2. Wählen Sie Tabelle erstellen.
  3. Stellen Sie im Bildschirm DynamoDB-Tabelle erstellen die folgenden Werte ein:
    Geben Sie für TabelleProcessedRecords ein
    Geben Sie unter Primärschlüssel für PartitionsschlüsselAkten ein
    Setzen Sie den Datentyp auf String
  4. Geben Sie nach Bedarf weitere Einstellungen für Ihren Anwendungsfall ein. Wählen Sie dann Erstellen.

Lambda-Funktion erstellen

Wichtig: Der Lambda-Funktionscode muss idempotent sein. Bewährte Methoden für Idempotenz und Beispielfunktionslogik finden Sie unter Wie mache ich mein Lambda idempotent?

Nachdem Sie die DynamoDB-Tabelle erstellt haben, erstellen Sie eine Lambda-Funktion. Diese Funktion vergleicht eingehende Nachrichten mit Nachrichten, die zuvor erfolgreich waren und dann in Ihrer DynamoDB-Tabelle gespeichert wurden. Wenn eine Nachricht zuvor erfolgreich war, lässt die Funktion die Verarbeitung von Duplikaten nicht zu. Wenn neue, eindeutige Nachrichten erfolgreich sind, werden sie zum späteren Vergleich an die Tabelle gesendet. Fehlgeschlagene Nachrichten werden wiederholt, bis sie erfolgreich verarbeitet wurden oder bis der ReceiveCount für eine Nachricht den maxReceiveCount überschreitet.

In deiner Funktion:

Im folgenden Beispiel wandelt die Funktionslogik den Nachrichtentext in Großbuchstaben um. Es wird unter process_message (...) -Methode geschrieben:

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])

War dieser Artikel hilfreich?


Benötigen Sie Hilfe zur Fakturierung oder technischen Support?