Amazon SQS 메시지가 내 Lambda 함수를 두 번 이상 호출하지 못하게 하려면 어떻게 해야 하나요?

최종 업데이트 날짜: 2021년 12월 28일

Amazon Simple Queue Service(Amazon SQS) 대기열의 메시지를 처리하도록 AWS Lambda 함수를 구성했습니다. 이제 유효한 Amazon SQS 메시지 중 일부가 maxReceiveCount까지 여러 번 수신됩니다. 동일한 Amazon SQS 메시지에 대한 중복 Lambda 함수 호출을 중지하려면 어떻게 해야 하나요?

간략한 설명

Lambda는 최소 1회 메시지 전송을 지원합니다. 경우에 따라 재시도 메커니즘이 동일한 메시지를 중복으로 보낼 수 있습니다. 그러면 Amazon SQS는 메시지를 배달 못한 편지 대기열로 보냅니다(사용자가 구성한 경우).

동일한 Amazon SQS 메시지에 대한 중복 Lambda 호출은 다음 이유 중 하나로 인해 발생할 수 있습니다.

  • 함수가 오류를 반환하거나 시간 초과되었습니다.
  • 가시성 시간 제한이 만료되기 전에 배치가 성공한 후 Lambda 서비스가 Amazon SQS 대기열에서 메시지를 삭제하지 못합니다.
  • Lambda 서비스가 이벤트를 함수에 전송했지만 함수로부터 승인을 받지 못했습니다.
  • 간헐적인 문제로 인해 Amazon SQS가 동일한 메시지를 반환하고 Lambda 서비스가 이를 다시 폴링했습니다.
  • 배치 기간과 함수 지속 시간의 합계가 Amazon SQS 대기열 가시성 시간 제한보다 깁니다. SQS 가시성 시간 제한은 함수 시간 제한과 배치 기간 시간 제한 합계의 6배 이상이어야 합니다.

이러한 문제를 해결하려면 Lambda 함수의 SQS 트리거에서 배치 항목 실패 보고(Report Batch item failure)를 켭니다. 그런 다음 배치를 반복하여 성공 메시지와 중복 메시지를 처리하고 삭제하는 모듈형 함수 코드를 생성합니다. 이 함수는 성공 메시지의 messageID를 DynamoDB 테이블에 저장한 다음 메시지가 이전에 처리되었는지 확인합니다.

참고: Amazon DynamoDB로 API를 호출하는 데 드는 비용을 고려해야 합니다.

해결 방법

DynamoDB 테이블 생성

  1. DynamoDB 콘솔을 엽니다.
  2. 테이블 생성(Create table)을 선택합니다.
  3. DynamoDB 테이블 생성(Create DynamoDB table) 화면에서 다음과 같은 값을 설정합니다.
    테이블(Table)ProcessedRecords 입력
    기본 키(Primary key) 아래의 파티션 키(Partition key)Records 입력
    데이터 유형을 String으로 설정
  4. 사용 사례에 필요한 다른 설정을 입력합니다. 그런 다음 생성(Create)을 선택합니다.

Lambda 함수 생성

중요: Lambda 함수 코드는 멱등성이어야 합니다. 멱등성 모범 사례 및 예제 함수 논리에 대한 자세한 내용은 Lambda 함수를 멱등성으로 만들려면 어떻게 해야 합니까?를 참조하세요.

DynamoDB 테이블을 생성한 후 Lambda 함수를 생성합니다.

함수에서 다음을 수행합니다.

  • dynamodb:Querydynamodb:PutItem 작업을 허용하는 실행 역할을 추가합니다.
  • (Amazon SQS에서 Lambda를 사용하여) SQS 트리거에서 배치 항목 실패를 보고하여 배치에서 중복 메시지를 식별하고 건너뜁니다.

다음 예제에서는 함수 논리가 메시지 본문을 대문자로 변환합니다. process_message(...) 메서드 아래에 작성됩니다.

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])