サーバーレスにおけるべき等性の実装 (データストア編)

サーバーレスが気になる開発者に捧ぐ「べき等性」ことはじめ 第 3 回

2021-07-05
デベロッパーのためのクラウド活用方法

福井 厚

このシリーズの 第 2 回 では、クライアントからバックエンドのサービスを利用する際に、なんらかの原因でエラーが発生した場合にクライアント側でリトライ処理が実行されると、リクエストが重複して送られる可能性があることを説明しました。クライアントからキューに対してメッセージを送信するようなサーバーレスのシステムにおいては、リトライ処理によって重複したメッセージが送信されてもメッセージの重複を排除する機能を利用することによってべき等性を実現する方法について解説を行いました。実際には同じメッセージが何度も送信されることを防いでいるので、正確にはべき等とは異なりますが、重複したメッセージをただ一度だけ処理する (Exactly Once) ことで、結果としてべき等性を実現するという考え方を具体的な実装方法と共に紹介しました。


データストアに対するリトライ処理

第 3 回ではこの考え方を進めて、データストアに対する処理でも同様の機能を実装するための方法を紹介します。第 2 回の最後でもお伝えした通り、SQS FIFO や SNS FIFO の機能を利用してメッセージの重複を排除できたとしても、キューからメッセージを取り出してデータストアへ書き込む処理の途中で何らかのエラーが発生した場合、リトライ処理が実行されることになります。ここでは、AWS Lambda (以下 Lambda) と Amazon DynamoDB (以下 DynamoDB) を例として、この問題の対処について考えます (図 1)。

図 1

ご注意

本記事で紹介する AWS サービスを起動する際には、料金がかかります。builders.flash メールメンバー特典の、クラウドレシピ向けクレジットコードプレゼントの入手をお勧めします。

*ハンズオン記事およびソースコードにおける免責事項 »

この図では、①の Lambda 関数は API Gateway へのリクエストのイベントで第 2 回で紹介した SQS FIFO へ重複排除を行うためのメッセージグループ ID と重複排除 ID を設定して SQS FIFO キューへメッセージを送信します。SQS FIFO キューへ送信されたメッセージを処理するために②の Lambda 関数が起動され、DynamoDB へデータを書き込む処理を行うことを想定しています。

Lambda 関数自体はステートレスに実装されているため(第 1 回の記事を参照)、最終的な状態はデータストアである DynamoDB に保存する必要があります。②の Lambda 関数が DynamoDB への書き込みが終了後に何らかの理由でエラーが発生し、Lambda 関数がエラーを返すことによって SQS キューのメッセージを削除できなかった場合、可視性タイムアウトの時間が過ぎるとメッセージは再度 Lambda 関数によって処理されることになります。

ここでは、話を単純にするために、注文する商品は固定で数量のみを指定することとします。クライアント側から注文を送信する場合、クライアント側で生成した注文 ID (UUID) と顧客 ID、数量をメッセージのボディーに含めてリクエストを送信します (クライアントから送信される注文データのメッセージ例 1)。

{
  “order_id”: “616ae274-a023-42a6-a821-78938b8c5fea”,
  “customer_id”: “cust_1234”,
  “order_count”: 2
}
クライアントから送信される注文データのメッセージ例 1

メッセージ内の注文 ID (UUID) をキューの重複排除 ID と DynamoDB のプライマリキーに利用します。②の Lambda 関数が何らかのエラーが原因でキューのメッセージを複数回処理したとしても、DynamoDB への書き込みが単純に同一のプライマリキーを指定してメッセージの値を Put するだけであれば、何回実行されても結果は変わらず、べき等性は担保されます。

DynamoDB では 1 件のアイテムを書き込む場合、DynamoDB の PutItem API をコールします (AWS Python SDK を利用して Python で記述する場合は、put_item メソッドを利用) が、この時、DynamoDB ではプライマリキーに指定した値がテーブル内に存在しなければ新規にアイテムを追加し、存在すればアイテムの内容を置き換えます。

def create_order(dynamodb, tableName, orderId, customerId, count):

    table = dynamodb.Table(tableName)
    try:
        response = table.put_item(
            Item={
                "order_id": orderId,
                "customer_id": customerId,
                "count": Decimal(count),
                "order_created": Decimal(str(datetime.now().timestamp()))
            }
        )
    except:
        print("create an order item error!")

    return response
DynamoDB へのアイテムの登録 (注文 ID がクライアントから送信される場合)

単純にメッセージの値を書き込むのではなく、書き込まれたデータに伴って発生する別の処理がある場合はどうでしょうか。第 1 回の記事では、注文の情報に基づいて配送処理を行う例を挙げました。ここでは、DynamoDB に注文情報が書き込まれた後に DynamoDB Streams によって登録された情報が流れ、その情報を元に③の Lambda 関数が配送指示の処理を行うこととします。

実は、この場合でも PutItem API で DynamoDB テーブルの特定のアイテムを上書きした時に、上書きする前と後ですべてのアトリビュートの値がまったく同じ場合は、DynamoDB Streams に更新情報が流れることはありません。そのため誤って複数回配送指示が行われることもありません (これは前回説明した通り FIFO でリクエストの順序性が守られていることが前提です)。


注文 ID をバックエンドで生成する

では、注文 ID をクライアント側では生成せず、バックエンド側の②の Lambda 関数で注文 ID を生成して書き込んでいる場合はどうでしょうか。クライアント側はリクエストの送信時に注文日時をミリ秒単位の Unix エポック時間形式でメッセージに含めて、顧客 ID と注文日時を組み合わせて重複排除 ID として利用しているとします。クライアント側から送信されるメッセージは、「クライアントから送信される注文データのメッセージ例 2」のようになります。

{
  “order_created”: 1623247469.726968,
  “customer_id”: “cust_1234”,
  “order_count”: 2
}
クライアントから送信される注文データのメッセージ例 2

さて、この時に先ほどの説明と同様に何らかの原因で②の Lambda 関数の処理でエラーが発生し、キューのメッセージが複数回処理されてしまったとします。今回は②の Lambda 関数で注文 ID (UUID) を生成して DynamoDB に書き込むため、異なる注文 ID を持つアイテムが複数登録されてしまいます。注文の情報が重複して書き込まれるのと同じ内容で複数の配送指示が出されることになります。

def create_order2(dynamodb, tableName, orderCreated, customerId, count):

    table = dynamodb.Table(tableName)
    try:
        response = table.put_item(
            Item={
                “order_id”: str(uuid.uuid4()), //バックエンドでIDを生成
                "customer_id": customerId,
                "count": Decimal(count),
                "order_created": Decimal(orderCreated)
            }
        )
    except:
        print("create an order item error!")

    return response
DynamoDB へのアイテムの登録 (注文 ID がバックエンドで生成される場合)

このような場合の対応策として考えられる一つの方法として、DynamoDB 側のテーブル設計を変更し、プライマリキーを order_id の代わりにパーティションキーに customer_id、ソートキーに order_created を指定したプライマリーキーに変更します。その上で、put_item を実行する時の条件として同じ customer_id と order_created のデータが存在しないことを指定してアイテムの書き込みを行います。

def create_order3(dynamodb, tableName, customerId, count, orderCreated):

    table = dynamodb.Table(tableName)

    response=None

    try:
        response = table.put_item(
            Item={
                "order_id": str(uuid.uuid4()),
                "customer_id": customerId,
                "count": Decimal(count),
                "order_created": Decimal(orderCreated)
            },
            ConditionExpression=
              'attribute_not_exists(customer_id) AND attribute_not_exists(order_created)'
        )
    
    except ClientError as e:
        print(e)
    
    return response
DynamoDB へのアイテムの登録 (顧客 ID と注文日時の重複をチェック)

このように条件指定することで、同じ顧客 ID と注文日時のデータが既に存在する場合は、ConditionalCheckFailedException の例外が発生し、新しいアイテムが作成されることを防ぐことができます。

今回は DynamoDB を例に取り上げましたが、リレーショナルデータベースでも考え方は同じです。プライマリキーに指定する一意の値の扱いがポイントになります。リレーショナルデータベースでは、プライマリキーやユニークインデックスによって値の重複を排除する機能が提供されていますし、where 句に条件を指定して SQL 文を実行することでも重複を排除することができます。


さらに考慮すべきこと

本記事では、データストアに対して複数回同じデータを処理することについて考えました。次回はこの考えを広げて、複数の処理を連続して実行するバッチ処理や、マイクロサービスアーキテクチャにおいて、個別のサービスがそれぞれのデータストアに結果を保存 (コミット) しながら、複数のサービスで連携して一連の処理を完結するようなロングトランザクションについて検討します。どうぞお楽しみに。


builders.flash メールメンバーへ登録することで
AWS のベストプラクティスを毎月無料でお試しいただけます

筆者プロフィール

福井 厚
アマゾン ウェブ サービス ジャパン合同会社
シニアソリューションアーキテクト サーバーレススペシャリスト

2015 年からアマゾンウェブサービスジャパンでソリューションアーキテクトとして活動。サーバーレススペシャリストとして日々モダンアプリケーション開発とサーバーレスの活用の技術支援を行なっています。

AWS のベストプラクティスを毎月無料でお試しいただけます

さらに最新記事・デベロッパー向けイベントを検索

AWS を無料でお試しいただけます

AWS 無料利用枠の詳細はこちら ≫
5 ステップでアカウント作成できます
無料サインアップ ≫
ご不明な点がおありですか?
日本担当チームへ相談する