¿Cómo puedo evitar que un mensaje de Amazon SQS invoque la función de Lambda más de una vez?

Última actualización: 29/11/2022

Configuré la función de AWS Lambda para procesar mensajes en una cola de Amazon Simple Queue Service (Amazon SQS). Ahora, algunos de los mensajes válidos de Amazon SQS se reciben varias veces, hasta el maxReceiveCount. ¿Cómo puedo evitar la duplicación de invocaciones de funciones Lambda para el mismo mensaje de Amazon SQS?

Descripción breve

Lambda admite la entrega de mensajes al menos una vez. En algunos casos, el mecanismo de reintento podría enviar duplicados del mismo mensaje. Posteriormente, Amazon SQS envía los mensajes a la cola de mensajes fallidos, si se ha configurado una.

Las invocaciones duplicadas de Lambda para el mismo mensaje de Amazon SQS pueden ocurrir por una de las siguientes razones:

  • La función genera un error o el tiempo de espera se agota.
  • El servicio de Lambda no logra eliminar un mensaje de la cola de Amazon SQS después de un lote exitoso antes de que expire el tiempo de espera de visibilidad.
  • El servicio de Lambda envió el evento, pero no obtuvo una confirmación de la función.
  • Un problema intermitente hizo que Amazon SQS devolviera el mismo mensaje y que el servicio Lambda lo volviera a sondear.
  • La suma del intervalo del lote y la duración de la función es mayor que el tiempo de espera de visibilidad de la cola de Amazon SQS. El tiempo de espera de visibilidad de SQS debe ser al menos seis veces el total del tiempo de espera de la función y el tiempo de espera del intervalo del lote.

Para resolver estos problemas, active la opción Informar de los errores de los elementos del lote en el desencadenador de SQS de la función Lambda. Posteriormente, cree un código de función modular que itere a través del lote, procese y borre los mensajes correctos y duplicados. La función almacena el messageID de los mensajes correctos en una tabla de Amazon DynamoDB y después verifica que el mensaje se haya procesado antes.

Importante: La siguiente resolución reduce el tiempo de procesamiento al enviar varias solicitudes a DynamoDB para cada mensaje entrante. Esto también se traduce en un aumento de los costos de las llamadas a la API. Por lo tanto, asegúrese de considerar el tiempo y el costo de esta resolución para su proyecto. Si el porcentaje de errores de llamadas Lambda duplicadas es bajo, es posible que el tiempo y el costo de esta resolución superen los beneficios.

Resolución

En primer lugar, confirme que está recibiendo el mismo mensaje varias veces comprobando la ID del mensaje. Si recibe varias copias del mismo mensaje por alguno de los motivos mencionados anteriormente, los mensajes tienen el mismo ID. En este caso, siga los pasos que se indican a continuación. Si recibe varios mensajes con el mismo contenido, pero distintos ID de mensajes, el mensaje se envía a la cola más de una vez. En este caso, compruebe su productor de mensajes.

Los siguientes pasos se aplican solo al disparador SQS de una función de Lambda. No funcionan para solicitudes de extracción manual.

Crear una tabla de DynamoDB

La siguiente tabla de DynamoDB contiene los identificadores de los mensajes para que una función de Lambda pueda compararlos para comprobar si están duplicados.

  1. Abra la consola de DynamoDB.
  2. Elija Crear tabla.
  3. En la pantalla Crear tabla de DynamoDB, defina los siguientes valores:
    En Tabla, ingrese ProcessedRecords
    En Clave principal, bajo Clave de partición, ingrese Registros
    Establecer el tipo de datos en Cadena
  4. Ingrese otras configuraciones según sea necesario para el caso de uso. A continuación, elija Crear.

Crear una función Lambda

Importante: El código de la función Lambda debe ser idempotente. Para conocer las prácticas recomendadas de idempotencia y la lógica de la función de ejemplo, consulte ¿Cómo hago que la función de Lambda sea idempotente?

Después de crear una tabla de DynamoDB, cree una función de Lambda. Esta función compara los mensajes entrantes con los mensajes que anteriormente se ejecutaban correctamente y que luego se conservaban en la tabla de DynamoDB. Si un mensaje tuvo éxito anteriormente, la función no permite que se procesen duplicados. Si los mensajes nuevos y únicos tienen éxito, se envían a la tabla para compararlos en el futuro. Los mensajes fallidos se vuelven a intentar hasta que se procesen correctamente o hasta que el ReceiveCount de un mensaje supere el maxReceiveCount.

En la función:

En el siguiente ejemplo, la lógica de la función convierte el cuerpo del mensaje en mayúsculas. Está escrito bajo el método process_message(…):

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])

¿Le resultó útil este artículo?


¿Necesita asistencia técnica o con la facturación?