Amazon Web Services ブログ

新機能 – Amazon SQS 標準キューに拡張されたデッドレターキュー管理エクスペリエンス

何十万人ものお客様が Amazon Simple Queue Service (SQS) を使用してメッセージベースのアプリケーションを構築し、マイクロサービス、分散システム、サーバーレスアプリケーションの分離とスケーリングを行っています。キューコンシューマーがメッセージを正常に処理できない場合は、メッセージをデッドレターキュー (DLQ) に保存するように SQS を設定できます。

ソフトウェア開発者やアーキテクトであれば、DLQ 内の消費されなかったメッセージを調べて確認し、処理できなかった理由を解明して、パターンを特定し、コードエラーを解決して、最終的に元のキューでこれらのメッセージを再処理したいと考えるでしょう。これらの消費されなかったメッセージのライフサイクルは、エラー処理ワークフローの一部であり、多くの場合、手作業で行うため時間がかかります。

2021 年 12 月 1 日(米国時間)は、SQS 標準キュー用に拡張された新しい DLQ 管理エクスペリエンスの一般提供が開始されたことを発表しました。この新しいエクスペリエンスを使用すると、消費されなかったメッセージを DLQ からソースキューに簡単にリドライブできます。

この新機能は SQS コンソールで使用できます。この機能を利用すれば、エラー処理ワークフローの重要なフェーズである、処理エラーの特定と解決に集中できます。この新しい開発エクスペリエンスでは、消費されていないメッセージのサンプルを簡単に調べて、ワンクリックで元のキューに戻すことができ、カスタムコードの作成、保守、保護を行う必要もありません。また、この新しいエクスペリエンスでは、メッセージのリドライブを一括して行うことができ、全体的なコストが削減されます。

DLQ と Lambda プロセッサのセットアップ
既に DLQ のセットアップに習熟している場合は、セットアップをスキップして、すぐに新しい DLQ リドライブエクスペリエンスをご利用いただけます

まず、ソースキューとデッドレターキューの 2 つのキューを作成します。

ソースキューを編集し、[デッドレターキュー] セクションを設定します。ここでは、DLQ を選択し、[最大受信数] を設定します。これは、メッセージが DLQ に送られる前に再処理される回数です。このデモでは、これを 1 に設定しました。この場合、失敗したメッセージはすべて直ちに DLQ に送られます。実際の環境では、要件に応じて、また実際のアプリケーションでの失敗が何を意味するのかに応じて、より大きな数値を設定する必要がある場合があります。

また、DLQ を編集して、私のソースキューだけがこの DLQ を使用できるようにしています。この設定はオプションです。この [Redrive allow policy] (リドライブの許可ポリシー) が無効になっている場合、どの SQS キューでもこの DLQ を使用できます。1 つの DLQ を複数のキューに再利用したい場合があります。しかし、通常は、コストに影響を与えずにリドライブフェーズを簡素化するために、ソースキューごとに独立した DLQ をセットアップすることがベストプラクティスと考えられています。キューの数ではなく、API コールの数に基づいて課金されることに留意してください。

DLQ が正しくセットアップされたら、次にプロセッサが必要です。AWS Lambda を使用して、簡単なメッセージコンシューマーを実装してみましょう

Python で記述された Lambda 関数は、受信メッセージのバッチを反復処理し、メッセージ本文から 2 つの値を取得し、その 2 つの値の合計を出力します。

import json

def lambda_handler(event, context):
    for record in event['Records']:
        payload = json.loads(record['body'])

        value1 = payload['value1']
        value2 = payload['value2']

        value_sum = value1 + value2
        print("the sum is %s" % value_sum)
        
    return "OK"

上記のコードは、合計可能な 2 つの整数値が各メッセージの本文に含まれていると想定しており、検証やエラー処理は一切行っていません。ご想像のとおり、これは後で問題になります。

メッセージを処理する前に、SQS からのメッセージの読み取りおよびトリガーの設定を行うための十分なアクセス許可をこの Lambda 関数に付与する必要があります。IAM アクセス許可には、AWSLambdaSQSQueueExecutionRole という名前のマネージドポリシーを使用します。このポリシーは、sqs:ReceiveMessagesqs:DeleteMessagesqs:GetQueueAttributes を呼び出すためのアクセス許可を付与します。

Lambda コンソールを使用して SQS トリガーをセットアップします。SQS コンソールからも同じことができます。

これで、SQS コンソールでソースキューに対して [メッセージを送受信] を使用して新しいメッセージを処理する準備ができました。メッセージ本文に {"value1": 10, "value2": 5} と入力して、[メッセージを送信] を選択します。

Lambda 関数の CloudWatch Logs を見ると、呼び出しが成功したことがわかります。

START RequestId: 637888a3-c98b-5c20-8113-d2a74fd9edd1 Version: $LATEST
the sum is 15
END RequestId: 637888a3-c98b-5c20-8113-d2a74fd9edd1
REPORT RequestId: 637888a3-c98b-5c20-8113-d2a74fd9edd1	Duration: 1.31 ms	Billed Duration: 2 ms	Memory Size: 128 MB	Max Memory Used: 39 MB	Init Duration: 116.90 ms	

DLQ リドライブによるトラブルシューティング
では、別のプロデューサーが間違った形式のメッセージを発行し始めた場合はどうなるでしょうか。 例えば、{"value1": "10", "value2": 5} などです。最初の数字は文字列ですが、これは私のプロセッサでは問題になる可能性が非常に高いです。

実際、CloudWatch Logs を見ると、次のようになっています。

START RequestId: 542ac2ca-1db3-5575-a1fb-98ce9b30f4b3 Version: $LATEST
[ERROR] TypeError: can only concatenate str (not "int") to str
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 8, in lambda_handler
    value_sum = value1 + value2
END RequestId: 542ac2ca-1db3-5575-a1fb-98ce9b30f4b3
REPORT RequestId: 542ac2ca-1db3-5575-a1fb-98ce9b30f4b3	Duration: 1.69 ms	Billed Duration: 2 ms	Memory Size: 128 MB	Max Memory Used: 39 MB	

問題のあるメッセージのどこに問題があるのかを解明するために、新しい SQS リドライブ機能を使用し、デッドレターキューで [DLQ redrive] (DLQ リドライブ) を選択します。

[メッセージをポーリング] を使用し、DLQ から消費されていないメッセージをすべて取得します。

そして、消費されていないメッセージを選択して調べます。

問題がはっきりしたので、このケースを適切に処理するために、処理コードを更新することにしました。理想としては、これは上流の問題であり、メッセージプロデューサーで修正されるべきです。しかしここでは、私がそのシステムをコントロールできず、この新しいタイプのメッセージを処理することがビジネスにとって非常に重要であると仮定しましょう。

そのため、処理ロジックを次のように更新します。

import json

def lambda_handler(event, context):
    for record in event['Records']:
        payload = json.loads(record['body'])
        value1 = int(payload['value1'])
        value2 = int(payload['value2'])
        value_sum = value1 + value2
        print("the sum is %s" % value_sum)
        # do some more stuff
        
    return "OK"

これで、消費されていないメッセージをコードで処理する準備が整ったので、DLQ からソースキューへの新しいリドライブタスクを開始します。

デフォルトでは、SQS は、消費されていないメッセージをソースキューにリドライブします。しかし、別の宛先を指定して、1 秒あたりの最大メッセージ数を設定するカスタム速度を指定することもできます。

コンソールでリドライブのステータスを監視して、リドライブタスクが完了するのを待ちます。この新しいセクションには、最新のリドライブタスクのステータスが常に表示されます。

メッセージはソースキューに戻され、Lambda 関数によって正常に処理されました。CloudWatch Logs を見ると、すべて正常のようです。

START RequestId: 637888a3-c98b-5c20-8113-d2a74fd9edd1 Version: $LATEST
the sum is 15
END RequestId: 637888a3-c98b-5c20-8113-d2a74fd9edd1
REPORT RequestId: 637888a3-c98b-5c20-8113-d2a74fd9edd1	Duration: 1.31 ms	Billed Duration: 2 ms	Memory Size: 128 MB	Max Memory Used: 39 MB	Init Duration: 116.90 ms	

今すぐ追加コストなしで利用可能
追加コストなしで、今すぐ新しい DLQ リドライブエクスペリエンスをご利用いただけます。開発やトラブルシューティングのワークフローの簡素化にご活用ください。この新しいコンソールエクスペリエンスは、SQS を利用可能なすべての AWS リージョンでご利用いただけます。皆様からのフィードバックをお待ちしております。

Alex

原文はこちらです。