AWS Glue ジョブでデータを書き込む前後に、SQL コマンドを Amazon Redshift テーブルで実行するにはどうすればよいですか?

所要時間3分
0

Amazon Redshift テーブルにデータをロードする AWS Glue ジョブがあります。AWS Glue ジョブが完了する前か後に Amazon Redshift で SQL コマンドを実行したいと考えています。

解決方法

AWS Glue の DynamicFrameWriter クラスで、以下に示すパラメータのいずれかを渡します。

  • aws_iam_role: 別の AWS リソースにあるデータへのアクセスを認可します。Amazon Redshift クラスターにアタッチされている AWS Identity and Access Management (IAM) ロールの、完全に指定された ARN と共にこのパラメータを使用します。例えば、arn:aws:iam::123456789012:role/redshift_iam_role を使用します。詳細については、「認証パラメータ」を参照してください。
  • preactions: COPY コマンドに先立って実行される SQL コマンドのセミコロン区切りリスト。コマンドが失敗した場合、Amazon Redshift は例外をスローします。
    注: preaction パラメータに改行文字が含まれていないことを確認してください。
  • postactions: COPY コマンドが完了した後に実行される SQL コマンドのセミコロン区切りリスト。コマンドが失敗した場合、Amazon Redshift は例外をスローします。
    注: postaction パラメータに改行文字が含まれていないことを確認してください。
  • extracopyoptions: データをロードするときに Amazon Redshift の COPY コマンドに追加する追加オプションのリストです。例えば、TRUNCATECOLUMNS や MAXERROR を使用できます。

サンプルシナリオ

AWS Glue でレコードを挿入する前に Amazon Redshift テーブルを切り詰める

preactions パラメータを使用します。

次の Python の例を参照してください。

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame
= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table schema.target_table;","dbtable": "schema.target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

次の Scala の例を参照してください。

val options = JsonOptions(Map(
   "dbtable" -> "schema.target_table",
   "database" -> "redshiftdb",
   "preactions" -> "truncate table schema.target_table;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)

これらの例では、必ず次の値を置き換えてください。

  • test_red: 使用するカタログ接続
  • schema.target_table: Amazon Redshift データベースのスキーマと Amazon Redshift テーブル
  • s3://s3path: Amazon Redshift テーブルの一時ディレクトリのパス

接続オプションで IAM ロールを使用する

認証情報は 1 時間後に期限切れになるため、接続オプションで IAM ロールを使用して、長時間実行されている接続が失敗しないようにしてください。

次の Python の例を参照してください。

glueContext.create_dynamic_frame.from_catalog(database = "redshift-database-name",  table_name = "redshift-table-name", redshift_tmp_dir = args["TempDir"], additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})

次の Scala の例を参照してください。

val connectionOptions = JsonOptions(Map(
      "url" -> "jdbc:redshift://your_redshift_cluster.us-west-2.redshift.amazonaws.com:5439/database",
      "dbtable" -> "schema.table",
      "user" -> "redshift_user",
      "password" -> "redshift_password",
      "tempdir" -> "s3://temp_bucket/temp",
      "aws_iam_role" -> "arn:aws:iam::your_account_id:role/your_role_name" ))

val dyf = glueContext.getSource("redshift", connectionOptions)
          .getDynamicFrame()

AWS Glue で Amazon Redshift テーブルをマージする (アップサート) データをステージングテーブルにロードした後、マージクエリを作成します。

**注:**マージクエリが機能するためには、 target_table が既に Amazon Redshift データベースに存在している必要があります。

次の Python の例を参照してください。

post_query="begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;","dbtable": "schema.stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

次の Scala の例を参照してください。

val options = JsonOptions(Map(
   "dbtable" -> "schema.stage_table",
   "database" -> "redshiftdb",
   "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;",
   "postactions" -> "begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)

これらの例では、必ず次の値を置き換えてください。

  • schema.target_table: Amazon Redshift データベースのスキーマと Amazon Redshift テーブル
  • test_red: 使用するカタログ接続
  • schema.stage_table: Amazon Redshift データベースのスキーマと Amazon Redshift ステージングテーブル
  • s3://s3path: Amazon Redshift テーブルの一時ディレクトリのパス

詳細については、「ステージングテーブルを使用したマージ (アップサート) の実行」を参照してください。

有効でない行を無視する

extracopyoptions パラメータを使用して、より大きい MAXERROR 値を指定します。

次の Python の例を参照してください。

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "testalblog2", "database": "reddb","postactions":"delete from emp1;","extracopyoptions":"MAXERROR 2"},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

次の Scala の例を参照してください。

val options = JsonOptions(Map(
   "dbtable" -> "testalblog2",
   "database" -> "reddb",
   "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;",
   "postactions" -> "delete from emp1;",
   "extracopyoptions" -> "MAXERROR 2"
   ))
glueContext.getJDBCSink(catalogConnection = "test", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(persons_DyF)

これらの例では、必ず次の値を置き換えてください。

  • schema.target_table: Amazon Redshift データベースのスキーマと Amazon Redshift テーブル
  • schema.stage_table: Amazon Redshift データベースのスキーマと Amazon Redshift ステージングテーブル
  • test: 使用するカタログ接続
  • testalblog2: データを読み込む Amazon Redshift テーブル
  • reddb: 当該の Amazon Redshift データベース
  • emp1: testalblog2 にデータをロードした後、データを削除する Amazon Redshift テーブル
  • s3://s3path: Amazon Redshift テーブルの一時ディレクトリのパス

追加情報

AWS Glue 4.0 ETL ジョブを使用する場合は、Amazon Redshift Spark コネクタ (redshift-jdbc42-2.1.0.9) を使用できます。このコネクタには次のプロパティがあります。

  • IAM ベースの JDBC URL をサポートします。
  • autopushdownautopushdown.s3_result_cacheunload_s3_format などのパフォーマンス改善オプションが含まれます。
  • 一時フォルダのデータに使用できる SSE_KMS 暗号化オプションが含まれます。AWS Glue は Amazon Redshift テーブルから読み取るときにこのデータを使用します。

関連情報

COPY

TRUNCATE

データのロード操作

AWS公式
AWS公式更新しました 1年前
コメントはありません