Amazon Web Services ブログ

AWS Lambda 関数のイベントソースフィルタリング

この投稿は、 Filtering event sources for AWS Lambda functions を翻訳したものです。原文は Serverless の Principal Specialist Solutions Architect である Heeki Park によって書かれています。

AWS Lambda 関数にイベントソースが設定されている場合、Lambda サービスはメッセージまたはレコードごとに Lambda 関数をトリガーします。正確な動作は、イベントソースの選択とイベントソースマッピングの設定によって異なります。イベントソースマッピングは、Lambda サービスがイベントソースからのメッセージまたはレコードをどのように処理するかを定義します。

AWS は 2021/11/26 に、Lambda 関数を呼び出す前にメッセージをフィルタリングする機能を発表しました。フィルタリングは、 Amazon Kinesis データストリーム、Amazon DynamoDB ストリーム、および Amazon SQS のイベントソースでサポートされています。これにより、Lambda 関数に対するリクエストが減り、コードが単純化され、全体的なコストが削減されます。

概要

現場に多くの車両を持つ物流会社を考えてみましょう。各車両はセンサーと 4G/5G 接続が有効になっており、テレメトリデータを Kinesis Data Streams に出力できます。

  • 1 つのシナリオとして、機械学習モデルを使用し、各テレメトリデータのペイロードに基づいて車両の健全性を推測します。これについては Lambda 料金ページの例 2 で説明されています。
  • 別のシナリオでは、タイヤの空気圧が低い場合にのみ関数を呼び出します。

タイヤの空気圧が低い場合、車両が戻ったときにタイヤをチェックするよう、会社からメンテナンスチームに通知します。このプロセスでは、倉庫に十分な予備交換品があるかどうかがチェックされます。購入チームに追加のタイヤを購入するよう通知することもできます。

アプリケーションはメッセージのストリームに応答し、タイヤの空気圧( tire_pressure 配列の要素で表されている )が 32 psi 未満の場合はビジネスロジックを実行します。現場の各車両は、次のようなテレメトリを発信します。

{
    "time": "2021-11-09 13:32:04",
    "fleet_id": "fleet-452",
    "vehicle_id": "a42bb15c-43eb-11ec-81d3-0242ac130003",
    "lat": 47.616226213162406,
    "lon": -122.33989110734133,
    "speed": 43,
    "odometer": 43519,
    "tire_pressure": [41, 40, 31, 41],
    "weather_temp": 76,
    "weather_pressure": 1013,
    "weather_humidity": 66,
    "weather_wind_speed": 8,
    "weather_wind_dir": "ne"
}

車両群からのすべてのメッセージを処理するには、次の例で示すように fleet_id に一致するフィルターを設定します。Lambda サービスは、受信したペイロード全体に対してフィルターパターンを適用します。

Kinesis および DynamoDB ストリームにおけるペイロードのスキーマは、Kinesis レコードイベントの例の kinesis 属性の下に示されています。Kinesis または DynamoDB ストリームのフィルターを構築するときは、 data 属性の下でペイロードをフィルタリングします。SQS におけるペイロードのスキーマは、 SQS メッセージイベントの例の Records 配列に示されています。SQS を使用する場合、 body 属性の下でペイロードをフィルタリングします。

{
    "data": {
        "fleet_id": ["fleet-452"]
    }
}

特定の車両に関連するすべてのメッセージを処理するには、その vehicle_id だけにフィルターを設定します。ここでは fleet_id を保持し、両方のフィルター条件に一致する例を示しています。

{
    "data": {
        "fleet_id": ["fleet-452"],
        "vehicle_id": ["a42bb15c-43eb-11ec-81d3-0242ac130003"]
    }
}

タイヤ空気圧が 32 psi 未満の場合に限り、その車両群に関連付けられているすべてのメッセージを処理するには、次のパターンを設定します。このパターンは、tire_pressure 下の配列にある 32 未満の値に一致します。

{
    "data": {
        "fleet_id": ["fleet-452"],
        "tire_pressure": [{"numeric": ["<", 32]}]
    }
}

AWS CLI コマンドを使用して、このフィルタ条件でイベントソースマッピングを作成するには、次のコマンドを実行します。

aws lambda create-event-source-mapping \
--function-name fleet-tire-pressure-evaluator \
--batch-size 100 \
--starting-position LATEST \
--event-source-arn arn:aws:kinesis:us-east-1:0123456789012:stream/fleet-telemetry \
--filter-criteria '{"Filters": [{"Pattern": "{\"tire_pressure\": [{\"numeric\": [\"<\", 32]}]}"}]}'

CLI でフィルタ条件の Pattern の値を正しく指定するには、二重引用符をエスケープする必要があります。

また、 AWS サーバーレスアプリケーションモデル (AWS SAM) テンプレートを使用して、このフィルター条件でイベントソースマッピングを作成するには、次のスニペットを使用します。

Events: 
  TirePressureEvent: 
    Type: Kinesis    
    Properties: 
      BatchSize: 100
      StartingPosition: LATEST
      Stream: "arn:aws:kinesis:us-east-1:0123456789012:stream/fleet-telemetry"
      FilterCriteria:
        Filters: 
          - Pattern: '{"data": {"tire_pressure": [{"numeric": ["<", 32]}]}}'

AWS SAM テンプレートの場合、フィルター条件の Pattern はJSONを受け取るため、エスケープされた二重引用符で囲む必要はありません(CLIの Pattern には文字列が渡されています)。

Lambda は EventBridge と同じ方法でメッセージをフィルタリングするため、フィルター作成の詳細については、EventBridge のイベントパターンルールの例を参照してください。

イベントフィルタリングによるコスト削減

このようなフィルター条件を使ってイベントソースを設定することで、Lambda 関数が呼び出されるメッセージの数を減らすことができます。

現場に 10,000 台の車両が配備されている Lambda 料金ページの例 2 では、各車両が 1 時間に 1 回テレメトリを発信しています。車両は毎月 10,000 * 24 * 31 = 7,440,000 個のメッセージを発信し、同じ回数の Lambda 呼び出しをトリガーします。関数には 256 MB のメモリを設定し、平均実行時間は 100 ミリ秒です。この例では、車両は 31 日に 1 回、タイヤの空気圧が低いというテレメトリを発信します。

フィルタリングを行わないと、アプリケーションのコストは次のようになります。

  • 月間リクエスト料金 → 744万 * $0.20/百万 = $1.49
  • 毎月の計算時間 (秒) → 744万 * 0.1 秒 = 74.4万秒
  • 毎月の計算 (ギガバイト秒) → 256MB/1024MB * 74.4万秒 = 18.6万 ギガバイト秒
  • 毎月のコンピューティング料金 → 18.6 万 ギガバイト秒 * $0.0000166667 = $3.10
  • 月間合計請求額 = $1.49 + $3.10 = $4.59

フィルタリングを使用すると、アプリケーションのコストは次のようになります。

  • 月間リクエスト料金 → (744万/31) * $0.20/百万 = $0.05
  • 毎月の計算時間 (秒) → (744万/31) * 0.1 秒 = 2.4万秒
  • 毎月の計算 (ギガバイト秒) → 256MB/1024MB * 2.4万秒 = 0.6万 ギガバイト秒
  • 毎月のコンピューティング料金 → 0.6 万 ギガバイト秒 * $0.0000166667 = $0.10
  • 月間合計請求額 = $0.05 + $0.10 = $0.15

フィルタリングを使用することで、コストは $4.59 から $0.15 となり、96.7% のコスト削減になります。

イベントフィルタリングの設計と実装

コスト削減に加え、 Lambda 関数の運用効率が向上しました。これは、メッセージの配列を繰り返し処理するフィルター処理がなくなったためです。Lambda サービスは、ソースから受信したメッセージをフィルタリングしてから、バッチ処理して関数呼び出しのペイロードとして送信します。操作の順序は次の通りです。

Event flow with filtering

フィルタリングによるイベントフロー

フィルタ基準を設計する際には、追加で考慮すべきことがいくつかあることに留意してください。イベントソースマッピングでは、最大 5 つのパターンを使用できます。各パターンは最大 2048 文字まで入力できます。Lambda サービスがメッセージを受信し、パターンでフィルタリングすると、通常のイベントソースの動作に従ってバッチがいっぱいになります。

たとえば、最大バッチサイズが 100 レコードに設定され、最大バッチウインドウが 10 秒に設定されている場合、Lambda サービスはこれら 2 つの条件のいずれかが満たされるまで、レコードをフィルタリングしてバッチに蓄積します。バッチ処理期間中にフィルター条件を満たすレコードが 100 件ある場合、Lambda サービスはペイロードでフィルタリングされた 100 件のレコードを含む関数をトリガーします。

バッチウィンドウ中にフィルター条件を満たすレコードが 100 件未満の場合、10 秒のバッチウィンドウが終了する間にフィルター処理されたレコードを使用して関数をトリガーします。レイテンシーの要件に合わせて、バッチウィンドウを設定してください。

Lambda サービスはフィルターされたメッセージを無視し、正常に処理されたものとして扱います。Kinesis Data Streams および DynamoDB ストリームの場合、イテレータはイベントソースマッピングで送信されたレコードを超えて進みます。

SQS の場合、メッセージは追加の処理なしでキューから削除されます。SQS では、除外されたメッセージが不要であることを確認してください。たとえば、複数の SQS キューがサブスクライブされた Amazon SNS トピックがあるとします。これらの SQS キューをそれぞれ消費する Lambda 関数は、メッセージの異なるサブセットを処理します。SNS でフィルターを使用することもできますが、その場合、メッセージパブリッシャーは送信するメッセージに属性を追加する必要があります。代わりに、SQS のイベントソースマッピングでフィルターを使用できます。この場合、フィルターはメッセージペイロードに直接適用されるため、パブリッシャーは変更を加える必要がなくなります。

まとめ

Lambda は、定義した条件に基づいてメッセージをフィルタリングする機能をサポートするようになりました。これにより、関数が処理するメッセージの数が減り、コストが削減され、コードが単純化されます。

開発者は、イベント駆動型アーキテクチャを通過するメッセージの一部を利用する、特定ユースケース用のアプリケーションを作成できるようになりました。これにより、 Lambda 関数の計算効率を最適化できます。

この機能の詳細については、 AWS Lambda 開発者ガイドをご覧ください。

翻訳は Solutions Architect 木村が担当しました。原文はこちらです。