Amazon Web Services ブログ

AWS Glueを使ったソーススキーマの変更点の特定

現在の多くの組織ではトランザクションデータストア、クリックストリーム、ログデータ、IoT データなど、あらゆる種類の異なるデータソースからこれまでにないほど多くのデータを収集しています。このデータは、構造化データや非構造化データなど、異なる形式であることが多いものです。これらはビッグデータの特徴としてよく言及される 3 つの V (ボリューム、ベロシティ、バラエティ)に含まれます。データから情報を抽出するために、それらのデータは通常、Amazon Simple Storage Service (S3)上に構築されたデータレイクに保存されます。データレイクで利用できるスキーマオンリードという機能を活用することで、データソース上のスキーマやスキーマの変更を気にすることなくデータをデータレイクに取り込むことができます。これにより、データの取り込みやデータパイプラインの構築の高速化が可能になります。

しかし、アプリケーションとの連携を考えると、Amazon QuickSight のようなサービスでのビジネスインテリジェンス (BI) ダッシュボードの構築、Amazon Athena のようなサーバーレスクエリーエンジンを使ったデータ探索など、他の使用事例のために、このデータを活用しているかもしれません。さらに、運用と分析のニーズに合わせて、リレーショナルデータベース、非リレーショナルデータベース、またはデータウェアハウスなどのデータストアにデータを投入するための抽出、変換、ロード(ETL)データパイプラインを構築している場合もあります。このような場合、アプリケーションの障害やダッシュボードやレポートの問題を避けるために、スキーマを前もって定義したり、新しいカラムの追加、既存のカラムの削除、既存のカラムのデータ型の変更、既存のカラムの名前の変更など、スキーマの変更を記録しておく必要があります。

多くのユースケースで、データパイプラインの構築を担当するデータチームはソーススキーマを制御できず、ソーススキーマの変更を特定するソリューションを構築して、それを基にプロセスや自動化を構築する必要があることが分かっています。これには、ソーススキーマに依存するチームへの変更通知の送信、スキーマの変更をすべて記録する監査ソリューションの構築、ソーススキーマの変更を ETL ツールや BI ダッシュボードなどの下流アプリケーションに伝播させるための自動化または変更要求プロセスの構築などが含まれるかもしれません。スキーマのバージョン数を制御するために、古いバージョンのスキーマと新しいスキーマの間で変更が検出されない場合に、古いバージョンのスキーマを削除したい場合があります。

たとえば、さまざまな外部パートナーからフラットファイルの形で請求ファイルを受け取り、これらのファイルに基づいて請求を処理するソリューションを構築したとします。しかし、これらのファイルは外部パートナーから送られてきたものであるため、スキーマやデータ形式をあまり制御することができません。例えば、customer_idclaim_id といったカラムが、あるパートナーによって customeridclaimid に変更されたり、別のパートナーが customer_ageearning といった新しいカラムを追加して、残りのカラムは同じにしている場合があります。このような変更を事前に確認し、カラム名の変更や新しいカラムの追加など、変更に対応したETLジョブの編集を行い、請求を処理できるようにする必要があります。

本ソリューションでは、AWS Glue のクローラーを利用して、データソースのスキーマ変更の捕捉を簡略化する仕組みを紹介します。

ソリューションの概要

このソリューションでは、AWS Glue データクローラーを構築し、既存データに基づいてメタデータを同期します。変更を特定した後、Amazon CloudWatch を使って変更を記録し、Amazon Simple Notification Service (Amazon SNS) を使ってメールにてアプリケーションチームに変更を通知します。この投稿の範囲外ですが、このソリューションを拡張して、下流のアプリケーションやパイプラインに変更を伝播させる自動化の仕組みを構築し、スキーマの変更によって下流のアプリケーションで障害が発生しないようにするなど、他のユースケースに対応することができます。また、比較したスキーマのバージョン間で変更がない場合、古いバージョンのスキーマを削除する方法も紹介します。

イベントドリブンで変更を捕捉したい場合は、Amazon EventBridge を使用することで可能になります。しかし、特定のスケジュールに基づいて、複数のテーブルのスキーマ変更を同時に捕捉したい場合は、この投稿のソリューションを適用することができます。

今回のシナリオでは、スキーマが異なる 2 つのファイルを用意し、スキーマが変更されたデータをシミュレートしています。AWS Glue のクローラーを使って、S3 バケットにあるデータからメタデータを抽出します。次に、AWS Glue の ETL ジョブでスキーマの変更を AWS Glue Data Catalog に反映します。

AWS Glue は、分析目的のために複数のソースから多数のデータセットを抽出、変換、ロードするためのサーバーレス環境を提供します。データカタログは、 AWS Glue 内の機能で、異なるデータストアからデータを保存し、アノテーションを付けることで、メタデータの一元的なデータカタログを作成することができます。データストアには、例えば、 Amazon S3 のようなオブジェクトストア、Amazon Aurora PostgreSQL-Compatible Edition のようなリレーショナルデータベース、 Amazon Redshift のようなデータウェアハウスが含まれます。そして、そのメタデータを使用して、もとになるデータに対してクエリを発行して、データを変換できます。データカタログにテーブルを追加するには、クローラーを使用します。クローラーは自動的に新しいデータを発見し、スキーマ定義を抽出し、スキーマの変更を検出し、テーブルをバージョンアップすることができます。また、 Amazon S3 上の Hive スタイルのパーティション(year=YYYY, month=MM, day=DD など)を検出することができます。

Amazon S3 は、自身のデータレイクのストレージとして機能します。 Amazon S3 は、業界をリードするスケーラビリティ、データの可用性、セキュリティ、およびパフォーマンスを提供するオブジェクトストレージサービスです。

このソリューションのアーキテクチャを以下の図に示します。

ワークフローは、以下の手順で行われます。

  1. 最初のデータファイルを S3 バケットの data フォルダにコピーし、 AWS Glue クローラーを実行してデータカタログに新しいテーブルを作成します。
  2. 既存のファイルを data フォルダから archived フォルダに移動します。
  3. スキーマが更新された 2 つ目のデータファイルを data フォルダにコピーし、クローラーを再実行して新しいバージョンのテーブルスキーマを作成します。
  4. AWS Glue ETL ジョブを実行し、新しいバージョンのテーブルスキーマがあるかどうかを確認します。
  5. AWS Glue ジョブは、CloudWatch Logs にスキーマの変更点を旧バージョンのスキーマと一緒にリストアップします。スキーマに変更がなく、古いバージョンを削除するフラグが true に設定されている場合、ジョブは古いバージョンのスキーマも削除します。
  6. AWS Glue ジョブは、スキーマのすべての変更を、Amazon SNS を使用してE メールでアプリケーションチームに通知します。

ソリューションを構築するには、次の手順を実行します。

  1. 新しいデータファイルと処理されたデータファイルを保存するために、dataarchived フォルダを持つ S3 バケットを作成します。
  2. AWS Glue データベースと、data フォルダ内のデータファイルをクロールしてデータベース内に AWS Glue テーブルを作成する AWS Glue クローラーを作成します。
  3. SNS トピックを作成し、E メールのサブスクリプションを追加します。
  4. AWS Glue ETL ジョブを作成し、2 つのバージョンのテーブルスキーマを比較し、古いバージョンのスキーマからの変更点をリストアップし、古いバージョンのスキーマを削除するフラグが true に設定されていれば、古いバージョンのスキーマを削除します。また、このジョブはスキーマの変更をデータチームに通知するために、Amazon SNS でイベントをパブリッシュします。

この投稿では、データファイルを data フォルダから archived フォルダに移動し、クローラーと ETL ジョブをトリガーするステップを手動で実行します。アプリケーションのニーズに応じて、AWS Glue ワークフローでこのプロセスを自動化し、オーケストレーションすることができます。

AWS Glue テーブルのバージョンを直近のスキーマ変更で更新されたバージョンと比較するソリューションを体験するために必要なインフラストラクチャをセットアップしてみましょう。

S3 バケットとフォルダの作成

新しいデータファイルと処理されたデータファイルを保存するために、dataarchived フォルダを持つ S3 バケットを作成するには、次の手順を実行します。

  1. Amazon S3 コンソールで、ナビゲーションペインからバケットを選択します。
  2. バケットの作成を選択します。
  3. バケット名に、DNS に準拠した一意の名前を入力します(例:aws-blog-sscp-ng-202202)。
  4. リージョンには、バケットを配置するリージョンを選択します。
  5. その他の設定はデフォルトのまま、バケットを作成を選択します。
  6. バケットページで、新しく作成したバケットを選択します。
  7. フォルダの作成を選択します。
  8. フォルダ名には、data を入力します。
  9. サーバーサイドの暗号化はデフォルト(無効)のままにしておきます。
  10. フォルダの作成を選択します。
  11. この手順を繰り返して、同じバケットに archived フォルダを作成します。

AWS Glueデータベースとクローラーの作成

ここでは、AWS Glue データベースと、データバケット内のデータファイルをクロールするクローラーを作成し、新しいデータベースに AWS Glue テーブルを作成します。
注)2022年10月20日時点の新UIの手順に対応しました。

  1. AWS Glue のコンソールで、ナビゲーションペインから Databases を選択します。
  2. Add database を選択します。
  3. Name(例:sscp-database)と Description を入力します。
  4. Create Database を選択します。
  5. ナビゲーションペインで Crawlers を選択します。
  6. Create crawler を選択します。
  7. Name には、名前(glue-crawler-sscp-sales-data)を入力します。
  8. Next を選択します。
  9. Add a data source を選択します。
  10. Data source は、 S3 を選択します。
  11. S3 path で、作成した S3 バケットとフォルダ(s3://aws-blog-sscp-ng-202202/data)を選択します。
  12. データストアを繰り返しクロールする場合は、Subsequent crawler runs で、Crawl all sub-folders を選択します。
  13. Add an S3 data source を選択します。
  14. Next を選択します。
  15. Creat new IAM role を選択し、ロールの名前を入力します(例:sscp-blog)。
  16. Next を選択します。
  17. Target database は、AWS Glue データベース(sscp-database)を選択します。
  18. Table name prefix に、プレフィックスを入力します(例:sscp_sales_)。
  19. Advanced options セクションを展開し、 Update the table definition in the data catalog を選択します。
  20. Crawler scheduleFrequency は、On demand を選択します。
  21. その他の設定をデフォルトのままにして、 Next を選択します。
  22. Create crawler を選択して、クローラーを作成します。

SNS のトピックを作成

SNSトピックを作成し、Eメールサブスクリプションを追加するには、次の手順を実行します。

  1. Amazon SNS コンソールで、ナビゲーションペインからトピックを選択します。
  2. トピックの作成を選択します。
  3. タイプスタンダードを選択します。
  4. トピックの名前を入力します (例: NotifySchemaChanges)。
  5. その他の設定をすべてデフォルトのままにして、トピックの作成を選択します。
  6. ナビゲーション ウィンドウで、サブスクリプションを選択します。
  7. サブスクリプションの作成を選択します。
  8. トピック ARN で、作成した SNS トピックの ARN を選択します。
  9. プロトコルE メールを選択します。
  10. エンドポイントには、通知を受け取るメールアドレスを入力します。
  11. その他はデフォルトのまま、サブスクリプションの作成を選択します。サブスクリプション確認の E メールが届くはずです。
  12. メールに記載されているリンクを選択し、確認します。
  13. 先ほどクローラー作成で作成した AWS Glue サービスロール(AWSGlueServiceRole-sscp-blog)に以下の権限ポリシーを追加し、SNS トピックへの公開を許可します。ポリシー内の <$SNSTopicARN> は、実際の SNS トピックの ARN に変更することを忘れないでください。
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowEventPublishing", 
            "Effect": "Allow", 
            "Action": "sns:Publish", 
            "Resource": "<$SNSTopicARN>"
        }
    ]
}

AWS Glue の ETL ジョブを作成

ここでは、あるテーブルの 2 つのスキーマバージョンを比較し、スキーマの変更点をリストアップする AWS Glue ETL ジョブを作成します。スキーマに変更がなく、古いバージョンを削除するフラグが true に設定されている場合、このジョブは古いバージョンをすべて削除します。スキーマに変更がある場合、ジョブは CloudWatch のログに変更をリストアップし、 Amazon SNS でイベントを発行してデータチームに変更を通知します。
注)2022年10月20日時点の新UIの手順に対応しました。

  1. AWS Glue コンソールで、AWS Glue Studio を選択します。
  2. ナビゲーションペインで jobs を選択します。
  3. Python Shell script editorを選択します。
  4. Create を選択して Python Shell ジョブを作成します。
  5. Script タブを選択し、スクリプトエディタのフィールドに次のコードを入力します。
import boto3
import pandas as pd

# Input Paramaters:  
# catalog_id - Your AWS Glue Catalg Id - it is same as your AWS account ID
# db_name - name of your AWS Glue Database in your Glue Data catalog_id
# table_name - name of the table in your AWS Glue Database that you would like to check of change in schema
# topic_arn - ARN of the SNS topic to publish the changes in table schema
# versions_to_compare - Two versions that customer would want to compare. 0 is the lastes version and 1 in the version prior to the latest version
# delete_old_versions - If True and there are no changes in the versions compared, job would delete all old versions except for the latest "number_of_versions_to_retain" versions 
# number_of_versions_to_retain - if delete_old_versions is True and there are no changes in the versions compared, the job would delete all old versions except for the latest "number_of_versions_to_retain" versions

catalog_id = '&lt;$catalog_id&gt;'
db_name='&lt;$db_name&gt;'
table_name='&lt;$table_name&gt;'
topic_arn='&lt;$sns_topic_ARN&gt;'
versions_to_compare=[0,1]
delete_old_versions = False
number_of_versions_to_retain = 2

columns_modified = []

# Function to compare the name and type of columns in new column list with old column list to 
# find any newly added column and the columns with changed data type
def findAddedUpdated(new_cols_df, old_cols_df, old_col_name_list):
    for index, row in new_cols_df.iterrows():
        new_col_name = new_cols_df.iloc[index]['Name']
        new_col_type = new_cols_df.iloc[index]['Type']

        # Check if a column with same name exist in old table but the data type has chaged
        if new_col_name in old_col_name_list:
            old_col_idx = old_cols_df.index[old_cols_df['Name']==new_col_name][0]
            old_col_type = old_cols_df.iloc[old_col_idx]['Type']

            if old_col_type != new_col_type:
                columns_modified.append(f"Data type changed for '{new_col_name}' from '{old_col_type}' to '{new_col_type}'")
        # If a column is only in new column list, it a newly added column
        else:
            columns_modified.append(f"Added new column '{new_col_name}' with data type as '{new_col_type}'")

# Function to iterate through the list of old columns and check if any column doesn't exist in new columns list to find out dropped columns
def findDropped(old_cols_df, new_col_name_list):
    for index, row in old_cols_df.iterrows():
        old_col_name = old_cols_df.iloc[index]['Name']
        old_col_type = old_cols_df.iloc[index]['Type']

        #check if column doesn't exist in new column list  
        if old_col_name not in new_col_name_list:
            columns_modified.append(f"Dropped old column '{old_col_name}' with data type as '{old_col_type}'")

# Function to publish changes in schema to a SNS topic that can be subscribed to receive email notifications when changes are detected
def notifyChanges(message_to_send):
    sns = boto3.client('sns')
    # Publish a simple message to the specified SNS topic
    response = sns.publish(
        TopicArn=topic_arn,   
        Message=message_to_send,  
        Subject="DWH Notification: Changes in table schema"
    )
    
# Function to convert version_id to int to use for sorting the versions
def version_id(json):
    try:
        return int(json['VersionId'])
    except KeyError:
        return 0

# Function to delete the table versions
def delele_versions(glue_client, versions_list, number_of_versions_to_retain):
    print("deleting old versions...")
    if len(versions_list) &gt; number_of_versions_to_retain:
        version_id_list = []
        for table_version in versions_list:
            version_id_list.append(int(table_version['VersionId']))
        # Sort the versions in descending order
        version_id_list.sort(reverse=True)
        versions_str_list = [str(x) for x in version_id_list]
        versions_to_delete = versions_str_list[number_of_versions_to_retain:]
        
        del_response = glue_client.batch_delete_table_version(
            DatabaseName=db_name,
            TableName=table_name,
            VersionIds=versions_to_delete
        )
        return del_response

# Calling glue API to get the list of table versions. The solution assums that number of version in the table are less than 100. If you have more than 100 versions, you should use pagination and loop through each page.  
glue = boto3.client('glue')
response = glue.get_table_versions(
    CatalogId=catalog_id,
    DatabaseName=db_name,
    TableName=table_name,
    MaxResults=100
)
table_versions = response['TableVersions']
table_versions.sort(key=version_id, reverse=True)

version_count = len(table_versions)
print(version_count)

# checking if the version of table to compare exists. You would need pass the numbers of versions to compare to the job. 
if version_count &gt; max(versions_to_compare):

    new_columns = table_versions[versions_to_compare[0]]['Table']['StorageDescriptor']['Columns']
    new_cols_df = pd.DataFrame(new_columns)

    old_columns = table_versions[versions_to_compare[1]]['Table']['StorageDescriptor']['Columns']
    old_cols_df = pd.DataFrame(old_columns)

    new_col_name_list =  new_cols_df['Name'].tolist()
    old_col_name_list =  old_cols_df['Name'].tolist()
    findAddedUpdated(new_cols_df, old_cols_df, old_col_name_list)
    findDropped(old_cols_df, new_col_name_list)
    if len(columns_modified) &gt; 0: 
        email_msg = f"Following changes are identified in '{table_name}' table of '{db_name}' database of your Datawarehouse. Please review.\n\n"
        print("Job completed! -- below is list of changes.")
        for column_modified in columns_modified:
            email_msg += f"\t{column_modified}\n"

        print(email_msg)
        notifyChanges(email_msg)
    else:
        if delete_old_versions:
            delele_versions(glue, table_versions,number_of_versions_to_retain)
        print("Job completed! -- There are no changes in table schema.")
else:
    print("Job completed! -- Selected table doesn't have the number of versions selected to compare. Please verify the list passed in 'versions_to_compare'")
 
       
  1. Job details のタブを選択します。
  2. ジョブの名前を入力します(例:find-change-job-sscp)。
  3. IAM Role は、AWS Glue のサービスロール(AWSGlueServiceRole-sscp-blog)を選択します。
  4. その他はデフォルトのまま、Save を選択します。

ソリューションのテスト

これまでの手順で、このソリューションを実行するためのインフラストラクチャを構成しました。では、実際に動かしてみましょう。まず、最初のデータファイルをアップロードし、クローラーを実行して、データカタログに新しいテーブルを作成します。 注)2022年10月20日時点の新UIの手順に対応しました。
  1. 以下の内容で salesdata01.csv という名前の CSV ファイルを作成します。
ordertime,region,rep,item,units,unitcost
2022-01-06,US-West,Jones,Pencil,95,1.99
2022-01-06,US-Central,Kivell,Binder,50,19.99
2022-01-07,US-Central,Jardine,Pencil,36,4.99
2022-01-07,US-Central,Gill,Pen,27,19.99
2022-01-08,US-East,Sorvino,Pencil,56,2.99
2022-01-08,US-West,Jones,Binder,60,4.99
2022-01-09,US-Central,Andrews,Pencil,75,1.99
2022-01-10,US-Central,Jardine,Pencil,90,4.99
2022-01-11,US-East,Thompson,Pencil,32,1.99
2022-01-20,US-West,Jones,Binder,60,8.99
  1. Amazon S3 コンソールで、data フォルダに移動し、CSV ファイルをアップロードします。
  2. AWS Glue コンソールで、ナビゲーションペインから Crawlers を選択します。
  3. クローラーを選択し、Run を選択します。クローラーが完了するまで数分かかります。AWS Glue のデータベース(sscp-database)にテーブル(sscp_sales_data)が追加されます。
  4. ナビゲーションペインの Tables を選択して、作成されたテーブルを確認します。ここで、data フォルダの既存のファイルを archived フォルダに移動します。
  5. Amazon S3 コンソールで、data フォルダに移動します。
  6. アップロードしたファイル(salesdata01.csv)を選択し、アクションメニューから移動を選択します。
  7. ファイルを archived フォルダに移動します。スキーマを更新した2つ目のデータファイルを data フォルダにコピーし、クローラーを再実行します。
  8. 次のコードで salesdata02.csv という CSV ファイルを作成します。前のバージョンからの変更点は以下の通りです。
    1. region カラムのデータを地域名から何らかのコードに変更(例:データ型を文字列から BIGINT に変更する)。
    2. rep カラムを削除
    3. 新しいカラム total を追加
ordertime,region,item,units,unitcost,total
2022-02-01,01,Pencil,35,4.99,174.65
2022-02-01,03,Desk,2,125,250
2022-02-02,01,Pen Set,16,15.99,255.84
2022-02-02,03,Binder,28,8.99,251.72
2022-02-03,01,Pen,64,8.99,575.36
2022-02-03,01,Pen,15,19.99,299.85
2022-02-06,03,Pen Set,96,4.99,479.04
2022-02-10,03,Pencil,67,1.29,86.43
2022-02-11,01,Pen Set,74,15.99,183.26
2022-02-11,03,Binder,46,8.99,413.54
  1. Amazon S3 バケット上で、data フォルダにファイルをアップロードします。
  2. AWS Glue コンソールで、ナビゲーションペインの Crawlers を選択します。
  3. クローラーを選択し、Run を選択します。クローラーの実行には約 2 分かかります。以前に作成したテーブル (sscp_sales_data) のスキーマを更新します。
  4. Tables ページで新しいバージョンのテーブルが作成されていることを確認します。ここで AWS Glue ETL ジョブを実行し、新しいバージョンのテーブルスキーマがあるかどうかを確認し、CloudWatch Logs に旧バージョンのスキーマとの変更点をリストアップします。
  5. AWS Glue コンソールで、ナビゲーションペインから Jobs を選択します。
  6. ジョブ (find-change-job-sscp) を選択し、Script タブを選択します。
  7. スクリプト内のジョブの以下の入力パラメータを、設定に合わせて変更します。

  1. Save を選択します。
  2. スクリプトエディタを閉じます。
  3. ジョブを再度選択し、デフォルトのパラメータはすべてそのままにして、Run Job を選択します。
  4. ジョブの状態を監視するには、ナビゲーションペインから monitoring を選択し、Job runs にてジョブを確認します。
  5. ジョブが完了したら、View Cloud Watch Logs を選択します。

ログには、AWS Glue ジョブで確認された変更が表示されるはずです。

また、スキーマの変更に関する詳細が記載されたメールも受信しているはずです。以下は、受信したメールの例です。

これで、AWS Glue ETL ジョブによって特定されたスキーマの変更を確認し、S3 バケットから下流のアプリケーションにデータを伝播させるジョブを実行する前に、スキーマの変更に応じて下流のデータストアに変更を加えることができます。例えば、Amazon Redshift のテーブルがある場合、ジョブがすべてのスキーマ変更をリストアップした後、Amazon Redshift データベースに接続し、これらのスキーマ変更を行う必要があります。本番システムでスキーマ変更を行う前に、組織で設定された変更要求プロセスに従うことを忘れないでください。

以下の表は、Apache Hive と Amazon Redshift のデータ型に対するマッピングのリストです。他のデータストアの同様のマッピングを見つけ、下流のデータストアを更新することができます。

提供された Python コードは、スキーマの変更を比較するためのロジックを担当します。スクリプトは、AWS Glue データカタログ ID、AWS Glue データベース名、AWS Glue テーブル名のパラメータを受け取ります。

Hive Data Types Description Amazon Redshift Data Types Description
TINYINT 1 byte integer . .
SMALLINT Signed two-byte integer SMALLINT Signed two-byte integer
INT Signed four-byte integer INT Signed four-byte integer
BIGINT Signed eight-byte integer BIGINT Signed eight-byte integer
DECIMAL . . .
DOUBLE . . .
STRING . VARCHAR, CHAR .
VARCHAR 1 to 65355, available starting with Hive 0.12.0 VARCHAR .
CHAR 255 length, available starting with Hive 0.13.0 CHAR .
DATE year/month/day DATE year/month/day
TIMESTAMP No timezone TIME Time without time zone
. . TIMETZ Time with time zone
ARRAY/STRUCTS . SUPER .
BOOLEAN . BOOLEAN .
BINARY . VARBYTE Variable-length binary value

クリーンアップ

ソリューションの体験が終わったら、このウォークスルーの一部として作成したリソースを削除してください。
注)下記リソースを削除しない場合、継続的な課金がなされますのでご注意ください。

  • AWS Glue ETL ジョブ (find-change-job-sscp)
  • AWS Glue クローラ (glue-crawler-sscp-sales-data)
  • AWS Glue テーブル (sscp_sales_data)
  • AWS Glue データベース (sscp-database)
  • クローラーと ETL ジョブ用の IAM ロール (AWSGlueServiceRole-sscp-blog)
  • data フォルダと archived フォルダにあるすべてのファイルと、それらが格納されていた S3 バケット (aws-blog-sscp-ng-202202)
  • SNS トピックとサブスクリプション (NotifySchemaChanges)

結論

この投稿では、AWS サービスを組み合わせてソースデータのスキーマ変更を検出し、それを使って下流のデータストアを変更したり、ETL ジョブを実行したりして、データパイプラインの障害を回避する方法を紹介しました。ソースデータのスキーマを理解しカタログ化するために AWS Glue を、スキーマの変更を特定するために AWS Glue API を、そして変更についてチームに通知するために Amazon SNS を使用しました。また、AWS Glue API を使用して、ソースデータスキーマの古いバージョンを削除する方法を紹介しました。データレイクのストレージ層として、Amazon S3 を使用しました。
こちらで、AWS Glue の詳細について説明しています。


原文は、こちらです。