Como impedir que uma mensagem do Amazon SQS invoque minha função do Lambda mais de uma vez?

Data da última atualização: 28/12/2021

Configurei minha função do AWS Lambda para processar mensagens em uma fila do Amazon Simple Queue Service (Amazon SQS). Agora, algumas das mensagens válidas do Amazon SQS são recebidas várias vezes até o maxReceiveCount. Como interromper invocações duplicadas de função do Lambda para a mesma mensagem do Amazon SQS?

Breve descrição

O Lambda é compatível com a entrega de mensagens at-least-once. Em alguns casos, o mecanismo de nova tentativa pode enviar duplicatas da mesma mensagem. Em seguida, o Amazon SQS envia as mensagens para sua fila de mensagens não enviadas, caso você tenha configurado.

As invocações duplicadas do Lambda para a mesma mensagem do Amazon SQS podem ocorrer por um destes motivos:

  • Sua função retorna um erro ou atinge o tempo limite.
  • O serviço do Lambda não consegue excluir uma mensagem da fila do Amazon SQS após um lote bem-sucedido antes que o tempo limite de visibilidade expire.
  • O serviço do Lambda enviou o evento para a função, mas não recebeu uma confirmação da função.
  • Um problema intermitente fez com que o Amazon SQS retornasse a mesma mensagem e fosse pesquisado novamente pelo serviço do Lambda.
  • A soma da janela do lote e da duração da função é maior do que o tempo limite de visibilidade da fila do Amazon SQS. O tempo limite de visibilidade do SQS deve ser pelo menos seis vezes o tempo limite total da função e o tempo limite da janela do lote.

Para resolver esses problemas, ative Report Batch item failure (Relatório de falha de item do lote) no acionador SQS da função do Lambda. Em seguida, crie um código de função modular que itera pelo lote, processa e exclui mensagens duplicadas e bem-sucedidas. A função armazena o MessageId das mensagens bem-sucedidas em uma tabela do DynamoDB e verifica se a mensagem foi processada anteriormente.

Observação: considere o custo de fazer chamadas de API para o Amazon DynamoDB.

Resolução

Criar uma tabela do DynamoDB

  1. Abra o console do DynamoDB.
  2. Escolha Create table (Criar tabela).
  3. Na tela Create DynamoDB table (Criar tabela do DynamoDB), defina os seguintes valores:
    Em Table (Tabela), insira ProcessedRecords
    Em Primary key (Chave primária), em Partition key (Chave de partição), insira Records (Registros)
    Defina o tipo de dados como String
  4. Insira outras configurações conforme necessário para seu caso de uso. Em seguida, escolha Create (Criar).

Criar uma função do Lambda

Importante: o código da função do Lambda deve ser idempotente. Para obter as práticas recomendadas de idempotência e exemplos de lógica de função, consulte Como tornar minha função Lambda idempotente?

Depois de criar uma tabela do DynamoDB, crie uma função do Lambda.

Na função:

  • Adicione uma função de execução para permitir as ações DynamoDB:Query e DynamoDB:PutItem.
  • Relate falhas de item de lote (Usar o Lambda com o Amazon SQS) no acionador do SQS para identificar e ignorar mensagens duplicadas no lote.

No exemplo a seguir, a lógica da função converte o corpo da mensagem em maiúsculas. Está escrito sob o método process_message(...):

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])