Amazon Web Services ブログ
AWS Lambda でのカスタムチェックポイントによるバッチ処理の最適化
AWS Lambdaは、Amazon Kinesis Data StreamsやAmazon DynamoDB Streamsなどのソースから取得した複数メッセージをバッチ処理できます。通常の操作では、処理を行う関数は1つのバッチから次のバッチに移動して、ストリームからのメッセージを消費します。
ただし、バッチ内のアイテムの1つでエラーが発生すると、そのバッチ内の同じメッセージ群の一部が再処理される可能性があります。新しいカスタムチェックポイント機能により、失敗したメッセージを含むバッチの処理方法をより詳細に制御できるようになりました。
このブログ記事では、バッチ失敗時のデフォルトの動作と、このエラー状態に対処するために開発者が使用可能なオプションについて説明します。また、この新しいチェックポイント機能の使用方法について説明し、ストリーム処理を行う関数内でこの機能を使用する利点についても説明します。
概要
Lambda関数を使用してストリームからのメッセージを消費する場合、batch sizeプロパティは各イベントで渡されるメッセージの最大数を制御します。
ストリームは、チェックポイントと現在のイテレータという2つの内部ポインタを管理します。チェックポイントは、正常に処理された最後の既知のアイテムの位置です。現在のイテレータは、ストリーム内の次に読み取り操作が行われる位置です。成功例のオペレーションとして、ストリームに対するバッチサイズが10の2つのバッチ処理を次に示します。
- Lambda関数に配信された最初のバッチには、アイテム1~10が含まれています。この関数は、これらのアイテムをエラーなく処理します。
- チェックポイントはアイテム11に移動します。Lambda関数に配信される次のバッチには、11~20のアイテムが含まれています。
デフォルトのオペレーションでは、バッチ全体の処理は成功または失敗のどちらかとなります。1 つのアイテムが処理に失敗し、関数がエラーを返す場合、そのバッチは失敗します。その後、最大再試行回数に達するまで、バッチ全体が再試行されます。これにより、同じ失敗が複数回発生し、個々のメッセージに関して不必要な再処理が発生する可能性があります。
イベントソースマッピングでBisectBatchOnFunctonErrorプロパティを有効にすることもできます。バッチが失敗した場合、呼び出し元のサービスは失敗したバッチを2つに分割し、分割された半分のバッチを別々に再試行します。バッチ内のアイテムが一つとなるか、メッセージが正常に処理されるまで、プロセスは再帰的に続行されます。たとえば、アイテム番号5が失敗する10個のメッセージのバッチでは、次のように処理が行われます。
- バッチ1は失敗します。バッチ2と3に分割されます。
- バッチ2は失敗し、バッチ3は成功します。バッチ2はバッチ4と5に分割されます。
- バッチ4は失敗し、バッチ5は成功します。バッチ4はバッチ6と7に分割されます。
- バッチ6は失敗し、バッチ7は成功します。
これにより、失敗するメッセージが1つあるバッチでメッセージを処理する方法が提供されますが、関数が複数回呼び出されます。この例では、メッセージ番号4は成功する前に4回処理されます。
新しいカスタムチェックポイント機能を使用すると、失敗したメッセージのシーケンス識別子を返すことができます。これにより、ストリームの処理を継続する方法をより正確に制御できます。たとえば、6番目のメッセージが失敗する10個のメッセージのバッチでは、次のようになります。
- Lambda関数は、アイテム1~10のメッセージをバッチ処理します。6番目のメッセージは失敗し、Lambda関数は失敗したシーケンス識別子を返します。
- ストリーム内のチェックポイントは、失敗したメッセージの位置に移動します。バッチは6~10のメッセージに対してのみ再試行されます。
これまでのストリーム処理動作
次の例では、Amazon DynamoDB ストリームから呼び出されるLambda関数を見てみましょう。チェックポイントに関する動作は同じとなるため、必要に応じてAmazon Kinesis Data Streamsを使用することもできます。イベントソースマッピングはバッチサイズが10アイテムに設定されているため、ストリーム内の10メッセージが1回のLambda関数呼び出しのEventペイロードに渡されます。
次のNode.jsスクリプトを使用して、DynamoDBテーブルに10個のアイテムのバッチを生成します。
const AWS = require('aws-sdk')
AWS.config.update({ region: 'us-east-1' })
const docClient = new AWS.DynamoDB.DocumentClient()
const ddbTable = 'ddbTableName'
const BATCH_SIZE = 10
const createRecords = async () => {
// Create envelope
const params = {
RequestItems: {}
}
params.RequestItems[ddbTable] = []
// Add items to batch and write to DDB
for (let i = 0; i < BATCH_SIZE; i++) {
params.RequestItems[ddbTable].push({
PutRequest: {
Item: {
ID: Date.now() + i
}
}
})
}
await docClient.batchWrite(params).promise()
}
const main = async() => await createRecords()
main()
このスクリプトを実行すると、DynamoDBテーブルに10個のアイテムがあり、DynamoDBストリームに追加されて処理されます。
処理を行うLambda関数は、次のコードを使用します。これにはFAILED_MESSAGE_NUM
という定数が含まれており、イベントバッチ内の対応するインデックスを持つメッセージにエラーを起こします。
exports.handler = async (event) => {
console.log(JSON.stringify(event, null, 2))
console.log('Records: ', event.Records.length)
const FAILED_MESSAGE_NUM = 6
let recordNum = 1
let batchItemFailures = []
event.Records.map((record) => {
const sequenceNumber = record.dynamodb.SequenceNumber
if ( recordNum === FAILED_MESSAGE_NUM ) {
console.log('Error! ', sequenceNumber)
throw new Error('kaboom')
}
console.log('Success: ', sequenceNumber)
recordNum++
})
}
このコードは、ストリームイベントの各レコードで提供されるDynamoDBアイテムのシーケンス番号を使用します。
イベントソースマッピングのデフォルト設定では、メッセージ6が失敗すると、バッチ全体が失敗します。その後、バッチ全体が複数回再試行されます。これは、関数のCloudWatch Logsに表示されます。
次に、関数のイベントトリガーでbisect-on-error機能を有効にします。最初の呼び出しは前の例と同じように失敗しますが、5つのメッセージのバッチで2つの呼び出しが後に続いて発生します。元のバッチは二等分されます。これらのバッチの処理は正常に完了します。
カスタムチェックポイントの設定
最後に、カスタムチェックポイント機能を有効にします。これは、DynamoDBトリガーの[Report batch item failures] チェックボックスをオンにして、Lambda関数コンソールで設定します。
処理する Lambda 関数を次のコードで更新します。
exports.handler = async (event) => {
console.log(JSON.stringify(event, null, 2))
console.log('Records: ', event.Records.length)
const FAILED_MESSAGE_NUM = 4
let recordNum = 1
let sequenceNumber = 0
try {
event.Records.map((record) => {
sequenceNumber = record.dynamodb.SequenceNumber
if ( recordNum === FAILED_MESSAGE_NUM ) {
throw new Error('kaboom')
}
console.log('Success: ', sequenceNumber)
recordNum++
})
} catch (err) {
// Return failed sequence number to the caller
console.log('Failure: ', sequenceNumber)
return { "batchItemFailures": [ {"itemIdentifier": sequenceNumber} ] }
}
}
このバージョンのコードでは、各メッセージの処理は try...catch
ブロックに囲まれます。処理が失敗すると、関数は残りのメッセージの処理を停止します。JSON オブジェクト内で失敗したメッセージのシーケンス番号を返します。
{
"batchItemFailures": [
{
"itemIdentifier": sequenceNumber
}
]
}
呼び出し側サービスは、指定されたシーケンス番号でチェックポイント値を更新します。batchitemFailures 配列が空の場合、呼び出し側はすべてのメッセージが正しく処理されたものとみなします。batchitemFailures 配列に複数のアイテムが含まれている場合、最小のシーケンス番号がチェックポイントとして使用されます。
この例では、Lambda 関数で FAILED_MESSAGE_NUM
定数を4に変更しています。これにより、各バッチの4番目のメッセージがエラーを発生させます。DynamoDB テーブルに10個のアイテムを追加すると、処理を行う関数の CloudWatch logsに以下が表示されます。
カスタムチェックポイントを使用して、10個のメッセージのストリームは以下のように処理されます。
- 最初の呼び出しでは、10個のメッセージすべてがバッチに入っています。4番目のメッセージでエラーが発生します。この関数は、チェックポイントとしてこの位置を返します。
- 2回目の呼び出しでは、メッセージ4~10がバッチ内に含まれます。メッセージ 7 はエラーを発生し、そのシーケンス番号がチェックポイントとして返されます。
- 3回目の呼び出しでは、バッチに7~10のメッセージが含まれています。メッセージ10はエラーを発生し、シーケンス番号が戻されたチェックポイントになります。
- 最後の呼び出しには、正常に処理されたメッセージ10のみが含まれます。
この方法を使用すると、後続の呼び出しは、前に正常に処理されたメッセージを取得しません。
まとめ
Lambda 関数でのストリーム処理のデフォルトの動作により、メッセージのバッチ全体を成功または失敗させることができます。また、バッチ二等分機能を使用して、単一のメッセージが失敗した場合にバッチを繰り返し再試行することもできます。カスタムチェックポイントを使用して、失敗したメッセージの処理をより詳細に制御できるようになりました。
この記事では、3つの異なる処理モードについて説明し、失敗したメッセージを処理するためのコード例を示しました。ユースケースに応じて、ワークロードに適したモードを選択できます。これにより、不要な Lambda関数呼び出しを減らし、失敗を含むバッチで同じメッセージの再処理を防ぐことができます。
この機能の使用方法の詳細については、DynamoDB および Kinesis Streams の開発者ドキュメントを参照してください。サーバーレス技術を使用した構築の詳細については、Serverless Landをご覧ください。
翻訳はソリューションアーキテクト福本が担当しました。原文はこちらです。