Amazon Web Services ブログ

シーメンスが Amazon S3 データレイクの更新用に、フルマネージドのスケジューリングのしくみを構築した方法

シーメンスは 37 万人を超える従業員と 170 年の歴史を持つ、グローバルなテクノロジーをリードする企業です。シーメンスのネットワークとアセットを継続的に監視するシーメンスサイバーディフェンスセンター (CDC) が、同社をサイバー犯罪から保護しています。このときに生じる膨大なデータ負荷を処理するため、CDC は ARGOS と呼ばれる次世代の脅威検出と分析のためのプラットフォームを構築しました。ARGOS はハイブリッドクラウドソリューションで、フルマネージドの AWS のサービスを多用して、ストリーミング、ビッグデータ処理、機械学習を行います。

セキュリティアナリスト、データサイエンティスト、脅威インテリジェンスチーム、インシデントハンドラーなどのユーザーが、ARGOS プラットフォームのデータに継続的にアクセスします。さらに、さまざまな自動コンポーネントがデータを更新、拡張、削除し、情報の充実、データ品質の向上、PII 要件の実施を行ったり、スキーマの進化や追加データの正規化要件のためのデータ変更を行います。データを常に利用可能かつ一貫した状態に保つには、いくつかの課題があります。

このようなシナリオでは、オブジェクトベースのデータレイクは、従来のトランザクションデータベースと比較して、コストの観点では非常に有益ですが、アトミック更新をほとんど許可しない、あるいは極めて複雑でコストのかかる拡張を必要とします。この問題を解決するため、シーメンスは、クエリのパフォーマンスと可用性を損なうことなく、

Amazon S3 ベースのデータレイクでアトミックファイルを更新できるソリューションを設計しました。

この投稿では、S3 データ更新タスク用の使いやすいスケジューリングサービスであるこのソリューションをご紹介します。仮名化、匿名化、機密データの削除など、いくつかの目的のために、シーメンスはこのソリューションを使用しています。この投稿では、このソリューションを使用して、定義した時間が経過した後にデータセットから値を削除する方法を説明します。ソリューションのアーキテクチャは明確に定義されており、スタック全体が 200 行未満のソースコードで構成されているので、データ処理タスクをさらに追加することは難しくはありません。フルマネージドの AWS サービスだけをベースにしているため、運用オーバーヘッドが最小限に抑えられます。

アーキテクチャの概要

この投稿では、クエリメカニズムとしての継続的なデータ取り込みと Amazon Athena を備えた S3 ベースのデータレイクを使用します。取り込み後、定義した時間後の特定の値を自動的に削除することを目指します。Athena を介してデータを使用するアプリケーションとユーザーは、影響を受けません (たとえば、ダウンタイムや重複などのデータ品質に関する問題はありません)。

次の図は、このソリューションのアーキテクチャを示しています。

シーメンスは、以下のサービスとコンポーネントでソリューションを構築しました。

  1. スケジューリングトリガー – 新しいデータ (JSON 形式など) を S3 バケットに継続的にアップロードします。
  2. タスクスケジューリング – 新しいファイルが到着するとすぐに、AWS Lambda 関数が結果の S3 バケット通知イベントを処理します。処理の一部として、Amazon DynamoDBTime to Live (TTL) とその S3 オブジェクトへのパスを指定する新しいアイテムを作成します。
  3. タスク実行トリガー – TTL が期限切れになると、DynamoDB アイテムがテーブルから削除され、DynamoDB ストリームがそのパスで S3 オブジェクトを処理する Lambda 関数をトリガーします。
  4. タスク実行 – Lambda 関数は TTL 有効期限イベントからメタ情報 (関連する S3 パスといったような) を取得し、S3 オブジェクトを処理します。最終的に、古いバージョンが新しい S3 オブジェクトに置き換えられます。
  5. データ使用量 – 手動でさらなる処理する必要がなく、更新したデータを Athena からのクエリに利用できます。更新したデータは読み込み操作の S3 の結果整合性を使用します。

DynamoDB ストリームと TTL について

DynamoDB 向けの TTL では、テーブル内のアイテムの有効期限を定義できるため、アイテムはデータベースから自動的に削除できます。TTL は追加費用なしで使用でき、プロビジョニング済みスループットを使わずに、ストレージの使用や無関係なデータを保存するコストを削減できます。アイテムごとに削除のタイムスタンプを設定できます。このため、テーブルで TTL を有効にすると、ストレージの使用を関連するレコードのみに制限できます。

ソリューションの概要

このソリューションを手動で実施するには、次の手順を実行します。

  1. DynamoDB テーブルを作成し、DynamoDB ストリームを設定する。
  2. TTL レコードを挿入する Lambda 関数を作成する。
  3. ターゲットバケットで S3 イベント通知を設定する。
  4. データ処理タスクを実行する Lambda 関数を作成する。
  5. Athena を使用して、処理済みデータをクエリする。

ソリューションを自動的にデプロイする場合は、これらの手順をスキップし、提供されている AWS Cloudformation テンプレートを使用します。

前提条件

このチュートリアルを完了するには、次のものが必要です。

DynamoDB を作成し、DynamoDB ストリームを設定する

まず、時間ベースのトリガー設定から始めます。S3 通知、DynamoDB ストリーム、Lambda 関数を使用して、両サービスを統合します。DynamoDB テーブルには、定義した時間後に処理するアイテムが保存されます。

次の手順を実行します。

  1. DynamoDB コンソールで、[create a table] をクリックします。
  2. [Table name] で objects-to-process と入力します。
  3. [Primary key] でパスを入力し、[String] をクリックします。
  4. テーブルを選択し、[Manage TTL] (テーブルの詳細の下の「Time to live attribute」の横にある) をクリックします。
  5. [TTL attribute] で ttl と入力します。
  6. [DynamoDB Streams] で [Enable with view type New and old images] をクリックします。

DynamoDB TTL を数値以外の属性で有効にできますが、DynamoDB TTL は数値の属性でのみ機能することに注意してください。

DynamoDB TTL には分刻みの正確性はありません。有効期限が切れたアイテムは通常、有効期限の 48 時間以内に削除されますが、実際の TTL 値から 10〜30 分ほどの誤差が発生する場合があります。TTL の詳細については、「有効期限: 仕組み」をご参照ください。

Lambda 関数を作成して、TTL レコードを挿入する

最初に作成する Lambda 関数は、タスクをスケジューリングするためのものです。この Lambda 関数は、入力として S3 通知を受け取り、S3 パス (s3://<bucket>/<key> など) をもう一度作成し、S3 パスと TTL (秒単位) の 2 つの属性を持つ DynamoDB に新しいアイテムを作成します。同様の S3 通知イベント構造の詳細については、「Lambda関数をテストする」をご参照ください。

Lambda 関数をデプロイするには、Lambda コンソールで Python 3.7 ランタイムと次のコードを使って、NotificationFunction という名前の関数を作成します。

import boto3, os, time

# Put here a new parameter for TTL, default 300, 5 minutes
default_ttl = 300

s3_client = boto3.client('s3')
table = boto3.resource('dynamodb').Table('objects-to-process')

def parse_bucket_and_key(s3_notif_event):
    s3_record = s3_notif_event['Records'][0]['s3']
    return s3_record['bucket']['name'], s3_record['object']['key']

def lambda_handler(event, context):
    try:
        bucket_name, key = parse_bucket_and_key(event)
        head_obj = s3_client.head_object(Bucket=bucket_name, Key=key)
        tags = s3_client.get_object_tagging(Bucket=bucket_name, Key=key)
        if(head_obj['ContentLength'] > 0 and len(tags['TagSet']) == 0):
            record_path = f"s3://{bucket_name}/{key}"
            table.put_item(Item={'path': record_path, 'ttl': int(time.time()) + default_ttl})
    except:
        pass # Ignore

ターゲットバケットで S3 イベント通知を設定する

データセットを保存するためのデータレイクとして S3 を使用すると、S3 のスケーラビリティ、セキュリティ、パフォーマンスを活用できます。さらに、S3 イベント通知を使って、バケット内のオブジェクトの作成や削除などの S3 関連のイベントをキャプチャできます。これらのイベントは、Lambda などの他の AWS のサービスに転送できます。

S3 イベント通知を設定するには、次の手順を実行します。

  1. S3 コンソールで、data-bucket という名前の S3 バケットを作成します。
  2. バケットをクリックして、[Properties] に移動します。
  3. [Advanced Settings] で [Events]、[add a notification] の順にクリックします。
  4. [Name] で MyEventNotification と入力します。
  5. [Events] で [All object create events] をクリックします。
  6. [Prefix] で dataset/ と入力します
  7. [Send to] で [Lambda Functions] をクリックします。
  8. [Lambda] で [NotificationFunction] をクリックします。

この設定では、以前に定義したデータセット内で発生するイベントにスケジュールを制限します。詳細については、「S3 バケットのイベント通知を有効化および設定する方法」をご覧ください。

データ処理タスクを実行する Lambda 関数を作成する

これで、DynamoDB テーブルのレコードを削除するための時間ベースのトリガーが作成されました。しかし、システムの削除が発生して、変更が DynamoDB ストリームに記録されると、それ以上のアクションは実行されません。Lambda はストリームをポーリングしてこれらの変更レコードを検出し、アクティビティ (INSERTMODIFYREMOVE) に従ってそれらを処理する関数をトリガーできます。

Lambda は DynamoDB ストリームの TTL 機能を使用してタスクの実行をトリガーするので、この投稿では削除済みアイテムだけを取り上げます。Lambda には、アイテムを単独で処理するか、あるいは処理努力を他の場所 (AWS Glue ジョブ、または Amazon SQS キューなど) に転送するかを選択できる柔軟性があります。

この投稿では、Lambda を直接使用して S3 オブジェクトを処理しています。この Lambda 関数では、次のタスクを実行します。

  1. DynamoDB アイテムの S3 パス属性から S3 オブジェクトを取得する。
  2. オブジェクトのデータを変更する。
  3. 古い S3 オブジェクトを更新されたコンテンツで上書きし、オブジェクトに processed タグを付ける。

次の手順を実行します。

  1. Lambda コンソールで、ランタイムとして Python 3.7 と次のコードを使用して、関数を作成 (JSONProcessingFunction という名前の) します。
    import os, json, boto3
    from functools import partial
    from urllib.parse import urlparse
    
    s3 = boto3.resource('s3')
    
    def parse_bucket_and_key(s3_url_as_string):
        s3_path = urlparse(s3_url_as_string)
        return s3_path.netloc, s3_path.path[1:]
    
    def extract_s3path_from_dynamo_event(event):
        if event["Records"][0]["eventName"] == "REMOVE":
            return event["Records"][0]["dynamodb"]["Keys"]["path"]["S"]
    
    def modify_json(json_dict, column_name, value):
        json_dict[column_name] = value
        return json_dict
        
    def get_obj_contents(bucketname, key):
        obj = s3.Object(bucketname, key)
        return obj.get()['Body'].iter_lines()
    
    clean_column_2_func = partial(modify_json, column_name="file_contents", value="")
    
    def lambda_handler(event, context):
        s3_url_as_string = extract_s3path_from_dynamo_event(event)
        if s3_url_as_string:
            bucket_name, key = parse_bucket_and_key(s3_url_as_string)
            updated_json = "\n".join(map(json.dumps, map(clean_column_2_func, map(json.loads, get_obj_contents(bucket_name, key)))))
            s3.Object(bucket_name, key).put(Body=updated_json, Tagging="PROCESSED=True")
        else:
            print(f"Invalid event: {str(event)}")
  2. Lambda 関数設定のウェブページで、[Add trigger] をクリックする。
  3. [Trigger Configuration] で [DynamoDB] をクリックする。
  4. [DynamoDB table] で [objects-to-process] をクリックする。
  5. [Batch size] で 1 と入力する。
  6. [Batch window] で 0 と入力する。
  7. [Starting position] で [Trim horizon] をクリックする。
  8. [Enable trigger] をクリックする。

DynamoDB テーブルで表される各 S3 オブジェクトは通常大きいため、batch size = 1 を使用します。これらのファイルが小さい場合は、より大きなバッチサイズを使用できます。バッチサイズは基本的に、Lambda 関数が一度に処理するファイル数になります。

S3 の新しいオブジェクト (バージョニング対応バケットにある) はオブジェクト作成イベントを作成するので、そのキーが既に存在する場合でも、タスクスケジュール Lambda 関数がタスク実行関数が作成するオブジェクト作成イベントを無視することを確認する必要があります。キーが存在しない場合は、無限ループを作成します。この投稿では、S3 オブジェクトのタグを使用します。タスク実行関数がオブジェクトを処理すると、processed タグが追加されます。この後の実行では、タスクスケジューリング機能はこれらのオブジェクトを無視します。

Athena を使用して、処理済みデータをクエリする

最後のステップで、Athena がデータをクエリするためのテーブルを作成します。これは、手動で行うことも、データから直接スキーマを推測し、テーブルを自動的に作成する AWS Glue クローラを使用して行うこともできます。この投稿では、スキーマの変更を処理し、新しいパーティションを自動的に追加できるため、クローラを使用しています。次のコードを使用して、このクローラを作成します。

aws glue create-crawler --name data-crawler \ 
--role <AWSGlueServiceRole-crawler> \
--database-name data_db \
--description 'crawl data bucket!' \
--targets \
"{\
  \"S3Targets\": [\
    {\
      \"Path\": \"s3://<data-bucket>/dataset/\"\
    }\
  ]\
}"

<AWSGlueServiceRole-crawler><data-bucket>AWSGlueServiceRole の名前と S3 バケットで、それぞれ置き換えます。

クロールプロセスが完了したら、データのクエリを開始できます。基になるデータが透過的に更新している間、Athena コンソールを使って、テーブルとやり取りが可能です。次のコードをご参照ください。

SELECT * FROM data_db.dataset LIMIT 1000

自動セットアップ

次の AWS CloudFormation テンプレートを使用して、この投稿で解説しているソリューションを AWS アカウントに作成できます。次のリンクを選択して、テンプレートを起動します。

CloudFormation テンプレートには、次のようなパラメータがあります。

  • Stack namedata-updater-solution などのスタックの分かりやすい名前。
  • Bucket name – ソリューションに使用する S3 バケットの名前。スタック作成プロセスで、このバケットが作成されます。
  • Time to Live – DynamoDB テーブルのアイテムを期限切れにする秒数。参照済み S3 オブジェクトは、アイテムの有効期限が切れると処理されます。

スタックの作成には最大数分かかります。AWS CloudFormation の [Resources] タブを確認して更新し、実行中のプロセスをモニタリングします。

スタックがCREATE_COMPLETE の状態を示したら、ソリューションの使用を開始できます。

ソリューションのテスト

ソリューションをテストするには、mock_uploaded_data.json データセット (Mockaroo データジェネレータで作成した) をダウンロードします。ユースケースは、ユーザーがファイルをアップロードできるウェブサービスです。アップロード後の定義した時間にそれらのファイルを削除して、ストレージとクエリのコストを削減することを目指しています。これを行うには、提供されたコードは属性 file_contents を探し、その値を空の文字列で置き換えます。

これで、data-bucket S3 バケット (dataset/ プレフィックス下にある) に新しいデータをアップロードできます。NotificationFunction Lambda 関数はアップロード用の結果のバケット通知イベントを処理し、DynamoDB テーブルに新しいアイテムが表示されます。定義済み TTL 時間のすぐ後に、JSONProcessingFunction Lambda 関数がデータを処理し、Athena クエリを介して結果の変更を確認できます。

また、この S3 オブジェクトに対応する DynamoDB アイテムが DynamoDB テーブルに存在せず、S3 オブジェクトに processed タグがある場合、S3 オブジェクトが正常に処理されたことを確認できます。

まとめ

この投稿では、シンプルでフルマネージドのスケジューリングのしくみを使って、定義した時間が経過した後、S3 でオブジェクトを自動的に再処理する方法を説明しました。ストレージに S3 を使用しているため、元のオブジェクトと処理済みオブジェクトの両方に同一のキー (名前) を使用するだけで、S3 の結果整合性モデルを自動的に活用できます。これで、データが重複または欠落しているクエリ結果を回避できます。さらに、S3 はファイル転送が正常に完了した場合にのみ新しいオブジェクトバージョンを作成するので、不完全にまたは一部分だけアップロードされたオブジェクトはデータの不整合を引き起こしません。

これまでに Spark を使って、オブジェクトを 1 時間ごとに処理したことがある方がいるかもしれません。この方法では、処理する必要があるオブジェクトをモニタリングし、ステージング領域でオブジェクトを移動および処理し、実際の宛先に戻す必要があります。この方法の主な欠点は、Spark の並列性のせいでファイルが異なる名前と内容で生成されるため、最終ステップに起こります。これは、データセット内の直接的なファイルの置換を防ぎますが、移動操作中にデータをクエリすると、ダウンタイムまたは潜在的なデータの重複が発生します。加えて、コピーおよび削除の各操作が潜在的に失敗する可能性があるため、部分的に処理される可能性のあるデータを手動で処理する必要があります。

運用の観点から見ると、AWS サーバーレスサービスではインフラストラクチャが簡素化します。サーバーレスサービスのスケーラビリティを従量課金プランと組み合わせて、低コストの POC を開始すれば、迅速に、しかもすべてを最小限のコードをベースに、本番環境へと拡張できます。

1 時間ごとの Spark ジョブと比較すると、コストを最大で 80% 削減できる可能性があります。この結果、サーバーレスでのソリューションはより安価でシンプルとなります。

Karl Fuchs 氏、Stefan Schmidt 氏、Carlos Rodrigues 氏、João Neves 氏、Eduardo Dixo 氏、そして Marco Henriques 氏からの、この投稿のコンテンツに関する貴重なフィードバックに感謝します。

 


著者について

Pedro Completo Bento はシーメンス CDC に勤務するシニアビッグデータエンジニアです。 リスボンの Instituto SuperiorTécnico でコンピューターサイエンスの修士号を取得しています。フルスタック開発者としてキャリアをスタートさせ、後にビッグデータの専門家となりました。AWS を使って、クラウド上に極めて信頼性と性能が高いスケーラブルなシステムを構築し、コストを抑えています。余暇には友人とボードゲームをするのが好きです。

 

 

Arturo Bayo はアマゾン ウェブ サービスのビッグデータコンサルタントです。EMEA のエンタープライズのお客様に、データに基づいたカルチャーを推進しています。 AWS のお客様やパートナーと連携しながら、ビジネスインテリジェンスとデータレイクのプロジェクトで専門的なガイダンスを行い、データと分析に関する革新的ソリューションを構築しています。