How can I prevent an Amazon SQS message from invoking my Lambda function more than once?

Last updated: 2021-12-28

I configured my AWS Lambda function to process messages in an Amazon Simple Queue Service (Amazon SQS) queue. Now, some of my valid Amazon SQS messages are received multiple times, up to the maxReceiveCount. How do I stop duplicate Lambda function invocations for the same Amazon SQS message?

Short description

Lambda supports at-least-once message delivery. In some cases, the retry mechanism might send duplicates of the same message. If the messages still aren't processed successfully after reaching the maxReceiveCount, Amazon SQS sends them to your dead-letter queue, if you've configured one.

Duplicate Lambda invokes for the same Amazon SQS message can happen for one of the following reasons:

  • Your function returns an error or times out.
  • The Lambda service processes a batch successfully but fails to delete the messages from the Amazon SQS queue before the visibility timeout expires.
  • The Lambda service sent the event to the function but didn't receive an acknowledgement.
  • An intermittent issue causes Amazon SQS to return the same message, and the Lambda service polls it again.
  • The sum of the batch window and the function duration is longer than your Amazon SQS queue visibility timeout. The SQS visibility timeout must be at least six times the total of the function timeout and the batch window timeout. For example, with a 30-second function timeout and a 20-second batch window, set the visibility timeout to at least 6 × (30 + 20) = 300 seconds. (See the sketch after this list for setting this value programmatically.)
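
If you want to set the visibility timeout programmatically, the following is a minimal sketch using boto3. The queue URL and the timeout values are placeholders; substitute your own.

import boto3

sqs_client = boto3.client('sqs')

# Placeholder values: a 30-second function timeout and a 20-second batch window.
FUNCTION_TIMEOUT_SECONDS = 30
BATCH_WINDOW_SECONDS = 20

# Set the visibility timeout to at least six times the total.
visibility_timeout = 6 * (FUNCTION_TIMEOUT_SECONDS + BATCH_WINDOW_SECONDS)

sqs_client.set_queue_attributes(
    QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',  # Replace with your queue URL
    Attributes={'VisibilityTimeout': str(visibility_timeout)}
)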

To resolve these issues, turn on Report batch item failures in your Lambda function's SQS trigger. Then, write modular function code that iterates through the batch, processes each message, and lets Lambda delete successful and duplicate messages from the queue. The function stores the messageId of each successfully processed message in an Amazon DynamoDB table and checks that table to verify whether a message was already processed in an earlier invocation.

Note: Be sure to consider the cost for making API calls to Amazon DynamoDB.

Resolution

Create a DynamoDB table

  1. Open the DynamoDB console.
  2. Choose Create table.
  3. In the Create DynamoDB table screen, set the following values:
    For Table name, enter ProcessedRecords
    Under Primary key, for Partition key, enter Records
    Set the data type to String
  4. Enter other settings as needed for your use case. Then, choose Create. (To create the table programmatically instead, see the sketch after these steps.)
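
If you prefer to create the table programmatically, the following is a minimal sketch using boto3. The on-demand billing mode is an assumption; adjust the settings for your use case.

import boto3

dynamodb_client = boto3.client('dynamodb')

# Create the ProcessedRecords table with a single string partition key named Records.
dynamodb_client.create_table(
    TableName='ProcessedRecords',
    AttributeDefinitions=[{'AttributeName': 'Records', 'AttributeType': 'S'}],
    KeySchema=[{'AttributeName': 'Records', 'KeyType': 'HASH'}],
    BillingMode='PAY_PER_REQUEST'  # Assumption: on-demand capacity
)

# Wait until the table is active before using it.
dynamodb_client.get_waiter('table_exists').wait(TableName='ProcessedRecords')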

Create a Lambda function

Important: The Lambda function code must be idempotent. For idempotency best practices and example function logic, see How do I make my Lambda function idempotent?

After you have created a DynamoDB table, create a Lambda function.

In your function:

  • Add permissions to the function's execution role to allow the dynamodb:Query and dynamodb:PutItem actions on the ProcessedRecords table.
  • Turn on reporting batch item failures (see Using Lambda with Amazon SQS) on the SQS trigger so that the function can identify and skip duplicate messages in the batch. A sketch of both settings follows this list.
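
The following is a minimal sketch of both settings using boto3. The role name, policy name, table ARN, and event source mapping UUID are placeholders; replace them with your own values.

import json
import boto3

iam_client = boto3.client('iam')
lambda_client = boto3.client('lambda')

# Placeholder values: replace with your role name, Region, account ID, and event source mapping UUID.
ROLE_NAME = 'my-function-role'
TABLE_ARN = 'arn:aws:dynamodb:us-east-1:123456789012:table/ProcessedRecords'
EVENT_SOURCE_MAPPING_UUID = 'a1b2c3d4-5678-90ab-cdef-EXAMPLE11111'

# Allow the function to query and write to the ProcessedRecords table.
iam_client.put_role_policy(
    RoleName=ROLE_NAME,
    PolicyName='ProcessedRecordsAccess',
    PolicyDocument=json.dumps({
        'Version': '2012-10-17',
        'Statement': [{
            'Effect': 'Allow',
            'Action': ['dynamodb:Query', 'dynamodb:PutItem'],
            'Resource': TABLE_ARN
        }]
    })
)

# Turn on partial batch responses for the SQS trigger.
lambda_client.update_event_source_mapping(
    UUID=EVENT_SOURCE_MAPPING_UUID,
    FunctionResponseTypes=['ReportBatchItemFailures']
)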

In the following example, the function logic converts the message body to uppercase. It is implemented in the process_message(...) method:

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Checks whether the message was already processed in a previous invocation.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName=DDB_TABLE,
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Writes the processed message IDs to the DynamoDB table.
# @input list batch_item_success
#
# @param batch_item_success list of message IDs to record as processed
# @return Boolean
def push_to_dynamoDB(batch_item_success):

    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
            TableName=DDB_TABLE,
            Item={'Records': {'S': message_id}}
        )
    return True

# Processor function that iterates through the messages in the event.
# @input dict Records
#
# @param Records list of SQS records to be processed
# @return list of batch item failures
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id)
        except Exception:
            batch_item_failures.append({"itemIdentifier": message_id})
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):

    # Return the IDs of failed messages in the partial batch response format
    # so that Lambda retries only those messages.
    return {"batchItemFailures": iterate_records(event["Records"])}
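
To sanity-check the handler, you can invoke it with a sample SQS event such as the following. The message IDs and bodies are made up, and running this locally requires AWS credentials and the ProcessedRecords table from the earlier step. The first run returns {"batchItemFailures": []}; a second run with the same event skips both messages as duplicates.

if __name__ == "__main__":
    # Hypothetical test event: two SQS records with made-up message IDs and bodies.
    test_event = {
        "Records": [
            {"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "body": "hello world"},
            {"messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "body": "second message"}
        ]
    }
    print(lambda_handler(test_event, None))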