Amazon Web Services ブログ

AWS Lambda がサポートするイベントソースに Amazon Simple Queue Service を追加

Amazon Simple Queue Service (SQS) を使って AWS Lambda 関数をトリガーできるようになりました。これは私が個人的に 4 年以上前から楽しみにしてきた、重要な機能を提供する特別なアップデートです。皆さん、試用を待ち望んでいることでしょうから、昔話に興味のない方は下記を飛ばしてもらって結構です。

SQS は当社が立ち上げた初めてのサービスで、14 年前の 2004 年、AWS より公開されました。ご参考に、2004 年当時と言えば、商用ハードドライブは最大でも約 60 GB、PHP 5 が現れ、Facebook がちょうど開始したところ、テレビ番組のフレンズはシリーズが終了、Gmail はまだ珍しく、そして私はまだ高校生でした。振り返ってみると、今日の AWS を生み出した理念、つまり完全な管理体制、ネットワークにアクセス可能で、プリペイドで契約維持料なしといった方針は、SQS 開発の初期段階であった当時にも少し垣間見ることができます。現在、SQS は数多くの顧客が非常に大規模に使用しているサービスの中でも最も人気が高く、多くのアプリケーションの基本構成部分の 1 つとなっています。

これに対して、AWS Lambda は 2014 年に開催された AWS re:Invent (私はその日の参加者でした) にてリリースした比較的新しいサービスです。Lambda は、サーバーのプロビジョニングや管理を行わずにコードを実行できるコンピューティングサービスであり、2014 年にサーバーレスの革命を起こしたのです。Web およびモバイルバックエンドから IT ポリシーエンジン、データ処理パイプラインまで、幅広いユースケースに即座に採用されました。現在、Lambda は Node.js、Java、Go、C#、Python のランタイムをサポートしているので、既存のコードベースの変更を最小限に抑え、新しいコードベースを柔軟に構築できます。さらにこの過去 4 年間で、Lambda の機能やイベントソースを多数追加したため、さらに迅速な仕事ができるようになりました。Lambda に SQS のサポートを追加することで、ポーリングサービスの実行や、SQS から SNS へのマッピング作成といった大きな負荷を大幅に削減しました。

どのように機能するかを見てみましょう。

Lambda を SQS からトリガーする

まず、既存の SQS 標準キューが必要なので、作成します。 AWS マネジメントコンソールに行き、SQS を開いて新しいキューを作成します。おもしろい名前を付けてみましょう。現時点では、Lambda トリガーは FIFO キューではなく標準キューのみで動作します。

このキューで、これを処理する Lambda 関数を作成ましょう。Lambda コンソールに移動し、次のようないくつかの Python コードで、メッセージ本文を表示するだけの単純な関数を新しく作成します。

def lambda_handler(event, context):
    for record in event['Records']:
        print(record['body'])

次に、Lambda 関数にトリガーを追加します。これはコンソールの左側にある SQS トリガーを選択することで、コンソールから行うことができます。Lambda および単一の Lambda が処理するレコードの最大数 (最大 10 個で、SQS ReceiveMessage API をベースにしたもの) を呼び出すのに使用したいキューを選択できます。

Lambda は自動的に横並びでスケールアウトし、キュー内の新しいメッセージごとに Lambda 関数を 1 つ呼び出します。例えば、キューに 100 個のメッセージがあり、トリガーを有効にした場合、Lambda は 100 個の関数呼び出しを実行してメッセージを一斉に処理するのです。キューのトラフィックが変動すると、Lambda サービスは、インフライトメッセージの数に基づいてポーリング操作を上下させます。この動作についての詳細は、この投稿の下部にある追加情報をご参照ください。バッチサイズを 10 に設定した場合、10 個の新しい Lambda 関数を呼び出します。この動作を制御するには、自分の関数の同時実行制限を増減させることで可能となります。

関数が正常に返された場合に処理されるメッセージの各バッチに対して、それらのメッセージをキューから削除します。関数のエラーが発生するかタイムアウトすると、キューに設定する可視性タイムアウト後にメッセージがキューに戻ります。ここで気をつけたいのは、SQS から Lambda へのイベントマッピングを作成するのに、Lambda 関数のタイムアウトはキューの可視性タイムアウトよりも短くないといけないことです。

トリガーを追加した後、関数に必要な変更を加えて保存します。SQS コンソールに戻ると、トリガーが登録されていることが分かります。SQS コンソールからトリガーを設定、編集することもできます。

では、AWS CLI を使って、簡単なメッセージをエンキューし、機能をテストしてみましょう。

aws sqs send-message --queue-url https://sqs.us-east-2.amazonaws.com/054060359478/SQS_TO_LAMBDA_IS_FINALLY_HERE_YAY --message-body "hello, world"

Lambda はメッセージを受信し、メッセージペイロードを Amazon CloudWatch ログに表示するコードを実行します。

もちろん、これら機能の全ては、追加設定などが必要のない AWS SAM で作動します。

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Example of processing messages on an SQS queue with Lambda
Resources:
  MySQSQueueFunction:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.6
      CodeUri: src/
      Events:
        MySQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt MySqsQueue.Arn
            BatchSize: 10

  MySqsQueue:
    Type: AWS::SQS::Queue

追加情報

まず、この機能には追加料金はかかりませんが、Lambda サービスは SQS キューを連続してロングポーリングするため、標準の SQS 価格設定でこれらの API コールに対して課金されます。

ですので、ここでは同時実行と Auto Scaling について簡潔ではありますが重点的にお話します。ただし、これらの動作が将来変更される可能性があることに留意してください。Lambda の Auto Scaling 動作は、キューが空の時にポーリングコストを低く抑えると同時に、キューが頻繁に使用されている場合にでも、高いスループットにスケールアップできるように設計されています。SQS イベントソースマッピングを最初に作成し有効化している場合、またはトラフィックがない期間後にメッセージが最初に表示された場合、Lambda サービスは 5 つのパラレルロングポーリング接続を使用して SQS キューのポーリングを開始します。Lambda サービスはインフライトメッセージの数を監視し、この数が増加傾向にあることを検出すると、ポーリング頻度を増加させます。これは、毎分 20 件の ReceiveMessage 要求、および 1 分あたり 60 件の呼び出しでの関数を同時に実行します。キューがビジーの間は、関数同時実行の制限に達するまで、スケーリングを継続します。インフライトメッセージの数が減少するにつれて、Lambda はポーリング頻度を毎分 10 件の ReceiveMessage 要求まで減らし、毎分 30 件の呼び出しで関数を呼び出すために作動している同時実行を減らします。

ドキュメントには、ここの記事の内容よりも多くの情報があり最新です。SQS イベントペイロードの例もそちらで参照できます。詳細は、SQS のドキュメント内にもあります。

この機能に関してフィードバックがあれば、Twitter や下のコメント欄でお知らせください。

Randall