¿Cómo puedo evitar que un mensaje de Amazon SQS invoque la función Lambda más de una vez?

Última actualización: 28 de diciembre de 2021

Configuré la función de AWS Lambda para procesar mensajes en una cola de Amazon Simple Queue Service (Amazon SQS). Ahora, algunos de los mensajes válidos de Amazon SQS se reciben varias veces, hasta el maxReceiveCount. ¿Cómo puedo evitar la duplicación de invocaciones de funciones Lambda para el mismo mensaje de Amazon SQS?

Descripción breve

Lambda admite la entrega de mensajes al menos una vez. En algunos casos, el mecanismo de reintento podría enviar duplicados del mismo mensaje. Posteriormente, Amazon SQS envía los mensajes a la cola de mensajes fallidos, si se ha configurado una.

Las invocaciones duplicadas de Lambda para el mismo mensaje de Amazon SQS pueden ocurrir por una de las siguientes razones:

  • La función genera un error o el tiempo de espera se agota.
  • El servicio de Lambda no logra eliminar un mensaje de la cola de Amazon SQS después de un lote exitoso antes de que expire el tiempo de espera de visibilidad.
  • El servicio de Lambda envió el evento, pero no obtuvo una confirmación de la función.
  • Un problema intermitente hizo que Amazon SQS devolviera el mismo mensaje y que el servicio Lambda lo volviera a sondear.
  • La suma del intervalo del lote y la duración de la función es mayor que el tiempo de espera de visibilidad de la cola de Amazon SQS. El tiempo de espera de visibilidad de SQS debe ser al menos seis veces el total del tiempo de espera de la función y el tiempo de espera del intervalo del lote.

Para resolver estos problemas, active la opción Informar de los errores de los elementos del lote en el desencadenador de SQS de la función Lambda. Posteriormente, cree un código de función modular que itere a través del lote, procese y borre los mensajes exitosos y duplicados. La función almacena el messageID de los mensajes correctos en una tabla de DynamoDB y después verifica que el mensaje se haya procesado antes.

Nota: Asegúrese de tener en cuenta el costo de realizar llamadas a la API de Amazon DynamoDB.

Resolución

Crear una tabla de DynamoDB

  1. Abra la consola de DynamoDB.
  2. Elija Crear tabla.
  3. En la pantalla Crear tabla de DynamoDB, defina los siguientes valores:
    En Tabla, ingrese ProcessedRecords
    En Clave principal, bajo Clave de partición, ingrese Registros
    Establecer el tipo de datos en Cadena
  4. Ingrese otras configuraciones según sea necesario para el caso de uso. A continuación, elija Crear.

Crear una función Lambda

Importante: El código de la función Lambda debe ser idempotente. Para conocer las prácticas recomendadas de idempotencia y la lógica de la función de ejemplo, consulte ¿Cómo hago que la función Lambda sea idempotente?

Después de crear una tabla de DynamoDB, cree una función Lambda.

En la función:

  • Agregue un rol de ejecución para permitir las acciones dynamodb:Query y dynamodb:PutItem.
  • Informe de errores en los elementos del lote (Utilizar Lambda con Amazon SQS) en el desencadenador de SQS para identificar y omitir los mensajes duplicados en el lote.

En el siguiente ejemplo, la lógica de la función convierte el cuerpo del mensaje en mayúsculas. Está escrito bajo el método process_message(…):

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])