AWS Glue ジョブにおけるデータの書き込みの前か後に、Amazon Redshift テーブルで SQL コマンドを実行するにはどうすればよいですか?

最終更新日: 2019 年 4 月 17 日

Amazon Redshift テーブルにデータを読み込む AWS Glue ジョブがあります。AWS Glue ジョブが完了する前か後に Amazon Redshift で SQL コマンドを実行したいです。

解決方法

AWS Glue の DynamicFrameWriter クラスで、以下に示すパラメータのいずれかを渡します。

  • aws_iam_role: 別の AWS リソースにあるデータへのアクセスを認可します。Amazon Redshift クラスターにアタッチされている AWS Identity and Access Management (IAM) ロール (arn:aws:iam::123456789012:role/redshift_iam_role 等) の完全に規定された ARN と共にこのパラメータを使用します。詳細については、「認証パラメータ」を参照してください。
  • preactions: COPY コマンドに先立って実行される SQL コマンドのセミコロン区切りリスト。コマンドが失敗した場合、Amazon Redshift は例外をスローします。
  • postactions: COPY コマンドが完了した後に実行される SQL コマンドのセミコロン区切りリスト。コマンドが失敗した場合、Amazon Redshift は例外をスローします。
  • extracopyoptions: データの読み込み時、Amazon Redshift COPY コマンドに追加するオプションのリスト (TRUNCATECOLUMNS または MAXERROR 等)。

サンプルシナリオ

AWS Glue でレコードを挿入する前に Amazon Redshift テーブルを切り取る

以下に示す Python の例を参考に、preactions パラメータを使用します。以下の値を置き換えます。

  • test_red: 使用するカタログ接続
  • target_table: 当該の Amazon Redshift テーブル
  • s3://s3path: Amazon Redshift テーブルの一時ディレクトリのパス
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame
= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table target_table;","dbtable": "target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

AWS Glue で Amazon Redshift テーブルをマージする (アップサート)

以下に示す Python の例を参考に、ステージングテーブルへのデータの読み込み後にマージクエリを作成します。以下の値を置き換えます。

  • target_table: 当該の Amazon Redshift テーブル
  • test_red: 使用するカタログ接続
  • stage_table: 当該の Amazon Redshift ステージングテーブル
  • s3://s3path: Amazon Redshift テーブルの一時ディレクトリのパス
post_query="begin;delete from target_table using stage_table where stage_table.id = target_table.id ; insert into target_table select * from stage_table; drop table stage_table; end;"
        
        datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;","dbtable": "stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

詳細については、「ステージングテーブルを使用したマージ (アップサート) の実行」を参照してください。

無効な行を無視する

以下に示す Python の例を参考に、extracopyoptions パラメータを使用して MAXERROR により高い値を指定します。以下の値を置き換えます。

  • test: 使用するカタログ接続
  • testalblog2: データを読み込む Amazon Redshift テーブル
  • reddb: 当該の Amazon Redshift データベース
  • emp1: testalblog2 にデータを読み込んだ後、データを削除する Amazon Redshift テーブル
  • test_red: 使用するカタログ接続
  • stage_table: 当該の Amazon Redshift ステージングテーブル
  • s3://s3path: Amazon Redshift テーブルの一時ディレクトリのパス
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "testalblog2", "database": "reddb","postactions":"delete from emp1;","extracopyoptions":"MAXERROR 2"},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

この記事は役に立ちましたか?

改善できることはありますか?


さらにサポートが必要な場合