Amazon SQS メッセージが Lambda 関数を複数回呼び出さないようにするにはどうすればよいですか?

最終更新日: 2021 年 12 月 28 日

Amazon Simple Queue Service (Amazon SQS) キューのメッセージを処理するために AWS Lambda 関数を設定しました。現在、有効な Amazon SQS メッセージの一部は maxReceiveCount まで複数回受信され、デッドレターキューに入ってしまいます。同じ Amazon SQS メッセージに対する Lambda 関数の重複呼び出しを止めるにはどうすればよいですか?

簡単な説明

Lambda は少なくとも 1 回のメッセージ配信をサポートしています。場合によっては、再試行メカニズムによって同じメッセージが重複して送信されることがあります。Amazon SQS は、デッドレターキューを設定していれば、メッセージをデッドレターキューに送信します。

同じ Amazon SQS メッセージに対する Lambda 呼び出しの重複は、次のいずれかの理由で発生する可能性があります。

  • 関数がエラーを返すか、タイムアウトしました。
  • Lambda サービスは、バッチが成功した後、可視性タイムアウトが期限切れになる前に Amazon SQS キューからメッセージを削除できません。
  • Lambda サービスはイベントをに送信しましたが、関数からの承認を受け取れませんでした。
  • 断続的な問題により、Amazon SQS から同じメッセージが返され、Lambda サービスによって再度ポーリングされました。
  • バッチウィンドウと関数期間の合計が Amazon SQS キューの可視性タイムアウトより長くなっています。SQS 可視性タイムアウトは、関数タイムアウトとバッチウィンドウタイムアウトの合計の 6 倍以上にする必要があります。

これらの問題を解決するには、Lambda 関数の SQS トリガーで [バッチ項目の失敗を報告] をオンにします。次に、バッチを反復処理し、成功したメッセージと重複したメッセージを処理、削除するモジュラー関数コードを作成します。この関数は、成功したメッセージの messageID を DynamoDB テーブルに保存し、メッセージが以前に処理されたことを確認します。

:Amazon DynamoDB に対する API 呼び出しのコストを考慮してください。

解決方法

DynamoDB テーブルを作成する

  1. DynamoDB コンソールを開きます。
  2. テーブルを作成 を選択します。
  3. [Create DynamoDB table] 画面で、次の操作を実行します。
    [テーブル] に「処理済みレコード」と入力します
    [プライマリキー] の [パーティションキー] に「Records
    データ型を String に設定します。
  4. ユースケースに応じて、必要に応じて他の設定を入力します。その後、[Create] を選択します。

Lambda 関数の作成

重要:Lambda 関数コードはべき等である必要があります。冪等のベストプラクティスと関数ロジックの例については、「Lambda 関数を冪等にするにはどうすればよいですか?」を参照してください。

DynamoDB テーブルを作成したら、Lambda 関数を作成します。

あなたの関数では次のようになります。

  • dynamoDB: Query アクションと dynamoDB: PutItem アクションを許可する実行ロールを追加します。
  • SQS トリガーでバッチ項目の失敗 (Lambda と Amazon SQS を使用) を報告し、バッチ内の重複メッセージを識別してスキップします。

次の例では、関数ロジックがメッセージ本文を大文字に変換します。process_message (...) メソッドで書かれています。

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])