How can I prevent an Amazon SQS message from invoking my Lambda function more than once?

5 minute read
0

I configured my AWS Lambda function to process messages in an Amazon Simple Queue Service (Amazon SQS) queue. Now, some of my valid Amazon SQS messages are received multiple times, up to the maxReceiveCount. How do I stop duplicate Lambda function invocations for the same Amazon SQS message?

Short description

Lambda supports at-least-once message delivery. In some cases, the retry mechanism might send duplicates of the same message. Amazon SQS then sends the messages to your dead-letter queue, if you've configured one.

Duplicate Lambda invokes for the same Amazon SQS message can happen for one of the following reasons:

  • Your function returns an error or times out.
  • The Lambda service fails to delete a message from the Amazon SQS queue after a successful batch before the visibility timeout expires.
  • The Lambda service sent the event to but failed to receive an acknowledgement from the function.
  • An intermittent issue caused Amazon SQS to return the same message and it's polled again by the Lambda service.
  • The sum of the batch window and the function duration is longer than your Amazon SQS queue visibility timeout. The SQS visibility timeout must be at least six times the total of the function timeout and the batch window timeout.

To resolve these issues, turn on Report Batch item failure in your Lambda function's SQS trigger. Then, create a modular function code that iterates through the batch, processes, and deletes successful and duplicate messages. The function stores the messageID of the successful messages in an Amazon DynamoDB table and then verifies that the message was processed earlier.

Important: The following resolution slows processing time by sending multiple requests to DynamoDB for each incoming message. This also results in higher costs from API calls. Therefore, make sure you consider the time and cost of this resolution for your project. If your error rate for duplicate Lambda calls is low, then the time and cost of this resolution might outweigh the benefits.

Resolution

First, confirm that you're receiving the same message multiple times by checking the message ID. If you receive multiple copies of the same message due to any of the reasons listed earlier, then the messages have the same ID. In this case, follow the steps below. If you receive multiple messages with the same content but different message IDs, then the message is being sent to the queue more than once. In this case, check your message producer.

The following steps apply only to a Lambda function's SQS trigger. They don't work for manual pull requests.

Create a DynamoDB table

The following DynamoDB table holds your message IDs so that a Lambda function can compare them for duplication.

  1. Open the DynamoDB console.
  2. Choose Create table.
  3. In the Create DynamoDB table screen, set the following values:
    For Table, enter ProcessedRecords
    Under Primary key, for Partition key, enter Records
    Set the data type to String
  4. Enter other settings as needed for your use case. Then, choose Create.

Create a Lambda function

Important: The Lambda function code must be idempotent. For idempotency best practices and example function logic, see How do I make my Lambda function idempotent?

After you create the DynamoDB table, create a Lambda function. This function compares incoming messages with messages that were previously successful and then held in your DynamoDB table. If a message was previously successful, then the function doesn't allow duplicates to process. If new, unique messages are successful, then they're sent to the table for future comparison. Failed messages are retried until they're successfully processed, or until or the ReceiveCount for a message exceeds the maxReceiveCount.

In your function:

In the following example, the function logic converts the message body to upper case. It is written under process_message(...) method:

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])

Related information

Why is my Lambda function retrying valid Amazon SQS messages and placing them in my dead-letter queue?


AWS OFFICIAL
AWS OFFICIALUpdated a year ago