Amazon Web Services ブログ

Amazon が取り組む、Amazon DynamoDB を活用した大規模集計の精度維持

本記事は 2025 年 5 月 19 日に公開された How Amazon maintains accurate totals at scale with Amazon DynamoDB を翻訳したものです。翻訳は Solutions Architect の嶋田 朱里が担当しました。

Amazon の Finance Technologies Tax チーム (FinTech Tax) は、世界中の法域で税額計算、税額控除、納付、報告といった重要なサービスを管理しています。このアプリケーションは、複数の国際マーケットプレイスで年間数十億件の取引を処理しています。

この投稿では FinTech Tax チームが Amazon DynamoDB のトランザクションと条件付き書き込みを使用して、段階的な源泉徴収を実装した方法を紹介します。
これらの DynamoDB の機能を使用することで、拡張性と回復力があるイベント駆動の税額計算サービスを構築し、大規模でもミリ秒レベルのレイテンシーを実現しました。
また、一貫したパフォーマンスを実現しながら、データの正確性を厳密に維持するための設計上の決定と実装の詳細についても探ります。

要件

Amazon は複数の法域にまたがる複雑なフィンテック (金融技術) 分野の税制環境で事業を行っており、さまざまな源泉徴収税の要件を管理する必要があります。同社には、膨大な取引量を処理できる堅牢な税処理ソリューションが必要です。このシステムは、毎日数百万件の取引をリアルタイムで処理し、個人ごとの累積取引額の正確な記録を源泉徴収税計算のために維持する必要があります。主な要件には、段階的な源泉徴収税率を正確に適用すること、および Amazon の既存システムとのシームレスな統合が含まれます。このソリューションはデータの整合性と高可用性を維持し、さまざまな源泉徴収税制度に対する規制遵守をサポートする必要があります。

課題

主な課題は世界中の複雑に絡み合った税法に厳密に準拠することにあります。特に、段階的課税モデルでは、個人の総取引額が財務年度内の特定の閾値を超えるかどうかに基づいて、異なる源泉徴収税率が適用されます。個人の累積取引額が増加し、あらかじめ定義された閾値を超えると、その取引に適用される源泉徴収税率が変更されます。例えば、総額が 100,000 インドルピー (INR) に達するまでは低い税率が適用され、その閾値を超えると、より高い税率が適用されます。

次の図は、累積取引金額の閾値に基づいて税率が段階的に変化する様子を示した、3 段階の税率モデルを示しています。

Tiered withholding every 100k

段階的課税モデルの課題は、源泉徴収についてリアルタイムの計算を行いながら、各個人の累計取引額を正確に追跡・記録管理することにあります。
Amazon は 1 日に数百万件のトランザクションを処理しなければなりません。 さらに、正の取引・負の取引(例:プラスまたはマイナスの会計調整)に関わらず、正しい源泉徴収税率をリアルタイムで適用することが求められます。
これには高い取引量 (個人あたり約 150 トランザクション/秒) を処理しながら、正確な記録を維持できるシステムが必要です。

ソリューションの概要

次の図は Amazon の源泉徴収税計算サービスの全体アーキテクチャです。

Components of AWS architecture

ワークフローは以下のステップで構成されています:

  1. クライアントが Amazon API Gateway に源泉徴収税計算リクエストを送信します。
  2. API Gateway が税額計算 (Tax Computation) AWS Lambda を呼び出します。
  3. 税額計算 Lambda 関数が、DynamoDB の個人の累積トランザクションストア(Cumulative Transaction Store)テーブルを取得します。累積トランザクションストアテーブルは過去の累計値をもとに、ユーザーごとの累積取引金額をリアルタイムで管理します。これにより、段階的な税率を適用するための個人の累積取引金額の合計を正確に追跡できます。
  4. Lambda 関数は取引の詳細と個人の累計金額に基づいて、ルールエンジンライブラリから適用される税率を取得します。取得した税率と取引データをもとに、税額が計算されます。
  5. 計算結果は取引データの監査と履歴管理のために DynamoDB の取引監査ストア (Transaction Audit Store) に格納されます。
  6. 現在の取引金額をもとに、個人の累積取引金額が累積トランザクションストアに更新されます。
  7. DynamoDB 操作中に発生する一時的なエラー (例: ConditionalCheckFailedTransactionConflict) は、Amazon Simple Queue Service (Amazon SQS) キューに送られ、再試行されます。
  8. クライアントエラー (400 Validation Exception、401 Unauthorized、403 Forbidden など) や永続的なサーバー障害によるエラーは、SQS DLQ で処理されます。

実装上の考慮事項

トランザクションを受信すると、システムはルールエンジンから導出された閾値に対して個人の累積取引額を評価して、適用される税率を判断します。その後、累積取引金額は累積トランザクションストアに更新され、監査証跡も記録されます。

複数のスレッドが同一個人のデータベースを同時に更新しようとすると、競合が発生します。一般的な 楽観的排他制御 (OCC) の手法は、累積値を読み取り、指定範囲の値に対する税率を計算し、累積値が読み取り以降変更されていないという条件付きでトランザクションを書き込みます。もし値が変更されていた場合はループの最初から処理をやり直します。
トラフィックが多い場合、この再実行が頻繁に発生する可能性があります。

私たちのアプローチは、一般的な OCC パターンを改良したものです。条件の判定を「累積値が最初に読み取った時点の範囲内に留まっているか」のみに絞っています。 累積値が変化しても、その値が閾値を超えない限り、ループを再実行する必要はありません。 この方法により、条件の不一致が少なくなるため、スループットが上がります。個人の累積値がより高い範囲に移行した場合は、書き込み操作が失敗します。 その場合は、更新された値をもとに、読み取りと書き込みを再試行する必要があります。

OCC 戦略とは異なり、このアプローチでは最後の読み取り以降に値が変化していても処理が成功します。これにより、競合を最小限に抑え、スループットを向上させることができます。同時更新(累積合計が閾値を超えるケース)によって条件付き書き込みが失敗し、ConditionalCheckFailedException が発生することがありますが、これは想定された動作であり、データの不整合を示すものではありません。

一時的なエラーを処理し、同じトランザクションの重複処理を防ぐために、クライアント要求トークン (Client Request Token, CRT) を含んだ TransactWriteItems 操作を実行することで、インクリメント操作を冪等性のある状態で行えます。TransactionCanceledException は、エクスポネンシャルバックオフなどのエラー処理メカニズムで処理されます。

この戦略の組み合わせにより、システムはデータの整合性を維持しながら、高いスループットとスケーラビリティを実現できます。 複雑なロック機構が不要になり、従来のOCCソリューションと比べて効率性が向上します。また、大規模な設定やチューニングを必要とせず、さまざまなトランザクション量や同時実行レベルに柔軟に対応できる、高性能なソリューションを提供します。

累積トランザクションストア

累積トランザクションストアテーブルは、特定の個人の取引金額の累積和を維持するために使用されます。以下のデータモデルを使用します:

{
  "indvidual_id": {
    "S": "TIN1" // 累積合計を管理する単位となる一意識別子 
  },
  "cumulative_amount_consumed": { // 使用された金額の累積合計を表す
    "N": "0"
  }
}

税控除対象品目の在庫管理

税額控除監査ストア(Tax Deduction Audit Store)テーブルは、各取引の税控除率の監査記録を保存するために使用されます。以下のデータモデルを使用します:

{
    "transaction_primary_key": {
       "S": "XXX111#2024-01-01T13:05:28" // トランザクションの一意識別子(PartitionKey#SortKey)
    },
    "transaction_amount": {
        "S": "1000".  //トランザクション全体の金額
    },
    "transaction_tax_amount": {
        "S": "100".  //控除される税額
    },
    "transaction_tax_rate":{
        "S":"10".    //このトランザクションに適用される税率(パーセント表記)
    }
    ...
}

条件付き書き込みのコード

次のコードは dynamodb.transact_write_items() を使用して、累積トランザクションストアと取引監査ストアの 2 つの DynamoDB テーブルにまたがるアトミックな条件付き書き込み操作を示しています。累積トランザクションストアから既存のレコードを取得し、現在の取引金額と既存データに基づいて cumulative_amt_consumed (累積消費金額)の更新値を計算します。同時に、取引監査ストアに新しいレコードを記録し、ID、値、税額、税率などのトランザクション詳細を記録します。

transact_write_items() メソッドは、取引トランザクションストアテーブルへの更新操作と取引監査ストアテーブルへの put 操作を 1 つのトランザクションとして実行します。
2 つの操作がともに成功すれば、両方のテーブルに変更がコミットされます。そうでない場合は、トランザクション全体がロールバックされ、データの整合性が保たれます。

SAMPLE_TIN = 'TIN1' # 累積トランザクションストアにおける一意の識別子を表す
 SAMPLE_AMOUNT = 5000  # transact_write_items で処理される売上値を表す
 SAMPLE_TRANSACTION_ID = 'XXX111'
 DEFAULT_TAX_RATE = 10 # 既定の税率(パーセンテージ値)
 LOWER_TAX_RATE = 5 # 低い方の税率(パーセンテージ値)

 RETRYABLE_ERRORS = (
    'TransactionConflictException',
    'ConditionalCheckFailedException',
    'ProvisionedThroughputExceededException',
    'ThrottlingException',
    'ServiceUnavailableException',
    'InternalServerErrorException'
)

 MAX_RETRIES = 3 
 RETRY_DELAY = 0.1  # 秒

 def send_to_error_queue(error_message, is_retryable, transaction_id):
    queue_url = 'TransientErrorQueue' if is_retryable else 'NonTransientErrorQueue'
    message_body = {
        'error_message': error_message,
        'transaction_id': transaction_id 
    }
    try:
        sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message_body)
        )
    except Exception as e:
        print(f"Failed to send message to {queue_url}: {str(e)}")

 def process_transaction(tin, amount, transaction_id):
    for attempt in range(MAX_RETRIES + 1):
        try:
            response = dynamodb.get_item(TableName='CumulativeTransactionStore', Key={'cumulativeStore_primary_key': {'S': tin}})
            item = response.get('Item')

            if not item:
                print("Record not found.")
                return 

            cumulative_amount_consumed = int(item.get('cumulative_amount_consumed', {}).get('N', '0'))
            threshold_value = int(item.get('threshold_value', {}).get('N', '0'))
            current_amount = amount 

            if (cumulative_amount_consumed + current_amount < threshold_value):
                update_expression = 'SET cumulative_amount_consumed = cumulative_amount_consumed + :val, tax_rate = :tax_rate'
                tax_rate = DEFAULT_TAX_RATE 
                max_value = threshold_value 
                min_value = 0 
            else:
                update_expression = 'SET cumulative_amount_consumed = cumulative_amount_consumed + :val, tax_rate = :tax_rate'
                tax_rate = LOWER_TAX_RATE 
                max_value = sys.maxsize 
                min_value = threshold_value 

            expression_attribute_values = {
                ':val': {'N': str(current_amount)},
                ':tax_rate': {'N': str(tax_rate)},
                ':lo': {'N': str(min_value)},
                ':hi': {'N': str(max_value)}
            }

            dynamodb.transact_write_items(
                TransactItems=[ 
                    {
                        'Update': {
                            'TableName': 'CumulativeTransactionStore',
                            'Key': {'cumulativeStore_primary_key': {'S': tin}},
                            'UpdateExpression': update_expression,
                            'ConditionExpression': 'cumulative_amount_consumed < :hi AND cumulative_amount_consumed >= :lo',
                            'ExpressionAttributeValues':  expression_attribute_values,
                        }
                    },
                    {
                        'Put': {
                            'TableName': 'TaxDeductionAuditStore',
                            'Item': {
                                'transactionID': {'S': transaction_id},
                                'transaction_amount': {'N': str(amount)},
                                'transaction_tax_amount': {'N': str(amount * tax_rate / 100)}
                            }
                        }
                    }
                ],
                ClientRequestToken=transaction_id 
            )
            print(f"Transaction processed successfully on attempt {attempt + 1}")
            return  # Success, exit the function 

        except Exception as e:
            error_code = e.response['Error']['Code'] 
            error_message = f"Error accessing DynamoDB: {error_code} - {e.response['Error']['Message']}"
            is_retryable = error_code in RETRYABLE_ERRORS 

            if is_retryable and attempt < MAX_RETRIES:
                print(f"Retryable error occurred on attempt {attempt + 1}. Retrying...")
                time.sleep(RETRY_DELAY * (2 ** attempt))  # Exponential backoff 
            else:
                send_to_error_queue(error_message, is_retryable, transaction_id)

    # If we've exhausted all retries 
    error_message = f"Max retries ({MAX_RETRIES}) exceeded. Last error: {error_message}"
    send_to_error_queue(error_message, True, transaction_id)

# Main execution 
 try:
    process_transaction(SAMPLE_TIN, SAMPLE_AMOUNT, SAMPLE_TRANSACTION_ID)
 except Exception as e:
    print(f"Transaction processing failed: {str(e)}")

結果

システムのパフォーマンス評価では、実行時間を 30 秒に固定し、スレッド数を変えながら一連のテストを実施しました。
各実行後に累積トランザクションストアをゼロにリセットすることで、さまざまな負荷条件下でのシステムの動作を包括的に分析しました。

1 スレッドから 130 スレッドにスケールアップするにつれて、処理されたトランザクション数が一貫して増加したことから、システムが大規模な並列処理の場面においても高い並行性を効果的に処理できることが示されました。
しかし、この処理能力の向上には一時的な競合の増加が伴いました。これは、大規模な並列処理の場面において、パフォーマンスと競合管理のトレードオフを浮き彫りにしています。

一時的なアクセスの競合は、複数のトランザクションが同時に同じアイテムを更新しようとしたときに発生し、一部のトランザクションがキャンセルされることになります。このデータが示すのは、スレッド数を増やしても競合管理のオーバーヘッドが増大するため、スループットが大幅には向上しなくなるということです。

次のグラフはスレッド数とトランザクションメトリクスの相関関係を示しています。
これにより、スループットと競合率が同時実行スレッドの増加に伴ってどのように変化するかがわかります。

TPS peaking at about 200/sec

結論

この投稿では、Amazon Fintech チームが DynamoDB の強力な条件付き書き込み機能を使用することで、段階的税率アプリケーション向けのシンプルかつ高いスケーラビリティを持つソリューションを実装した方法を紹介しました。
この手法を採用し、まれに発生する ConditionalCheckFailedException を先に見越して処理することで、大量の同時トランザクションが発生するシナリオにおいても、高いスループットとスケーラビリティを実現しながら、データの一貫性を維持することができます。

この手法は、同時リクエスト数が増加するにつれボトルネックになりがちな楽観的ロックの必要性をスマートに排除しています。代わりに、Amazon Fintech システムは DynamoDB の組み込みの同時アクセス制御メカニズムを活用し、高負荷状況でも一貫したデータと効率的な更新を可能にしています。

拡張性のあるトランザクション処理システムを独自に実装するには、DynamoDB の 条件付き更新 機能を確認してください。詳しいガイダンスが必要な場合は、DynamoDB の ドキュメント を参照するか、AWS サポートにお問い合わせください。

著者について

Jason Hunter はカリフォルニア在住の Amazon DynamoDB 専任のプリンシパルソリューションアーキテクトです。2003 年から NoSQL データベースに携わっています。Java、オープンソース、XML への貢献で知られています。DynamoDB の投稿Jason Hunter が書いた他の投稿は、AWS Database Blog で見つけることができます。

Balajikumar Gopalakrishnan は、Amazon Finance Technology の Principal Engineer です。
2013 年から Amazon に在籍し、Amazon の顧客の生活に直接影響を与える技術を通じて、実世界の課題を解決してきました。
仕事以外では、ハイキング、絵画、家族と過ごすことを楽しんでいます。また、映画好きでもあります。

Jay Joshi は、Amazon Finance Technology のソフトウェア開発エンジニアです。
2020 年から Amazon に在籍し、主に世界各地の法域での税額計算とレポーティングのためのプラットフォームの構築に従事しています。
仕事以外では、家族や友人と過ごしたり、新しい料理の行き先を探索したり、バドミントンをするのが好きです。

Arjun Choudhary は 2019 年からAmazon の Finance Technology 部門でソフトウェア開発エンジニアとして働いています。主な業務は、グローバルな法人税の源泉徴収プラットフォームの開発です。仕事以外では、小説を読んだり、クリケットやバレーボールをしたりして楽しんでいます。