How do I run SQL commands on an Amazon Redshift table before or after writing data in an AWS Glue job?

Last updated: 2022-10-27

I have an AWS Glue job that loads data into an Amazon Redshift table. I want to run SQL commands on Amazon Redshift before or after the job loads the data.

Resolution

Pass one or more of the following parameters in the AWS Glue DynamicFrameWriter class; a combined sketch follows this list:

  • aws_iam_role: Provides authorization to access data in another AWS resource. Use this parameter with the fully specified ARN of the AWS Identity and Access Management (IAM) role that's attached to the Amazon Redshift cluster (for example, arn:aws:iam::123456789012:role/redshift_iam_role). For more information, see Authorization parameters.
  • preactions: A semicolon-delimited list of SQL commands that are run before the COPY command. If the commands fail, then Amazon Redshift throws an exception.
    Note: Be sure that the preactions parameter doesn't contain any newline characters.
  • postactions: A semicolon-delimited list of SQL commands that are run after a successful COPY command. If the commands fail, then Amazon Redshift throws an exception.
    Note: Be sure that the postactions parameter doesn't contain any newline characters.
  • extracopyoptions: A list of additional options to append to the Amazon Redshift COPY command when loading data (for example, TRUNCATECOLUMNS or MAXERROR).
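
For orientation, here's a minimal sketch that passes all four parameters through connection_options in a single write. This is an illustrative composite, not a specific recipe: the frame (datasource0), connection (test_red), table and database names, role ARN, SQL statements, and S3 path are placeholder values to replace with your own.

datasink = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=datasource0,
    catalog_connection="test_red",
    connection_options={
        "dbtable": "target_table",
        "database": "redshiftdb",
        # IAM role attached to the Amazon Redshift cluster (placeholder ARN)
        "aws_iam_role": "arn:aws:iam::123456789012:role/redshift_iam_role",
        # Runs before the COPY command (placeholder SQL)
        "preactions": "truncate table target_table;",
        # Runs after a successful COPY command (placeholder SQL)
        "postactions": "drop table if exists stage_table;",
        # Appended to the COPY command
        "extracopyoptions": "TRUNCATECOLUMNS MAXERROR 2",
    },
    redshift_tmp_dir="s3://s3path",
    transformation_ctx="datasink",
)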

Example scenarios

Truncate an Amazon Redshift table before inserting records in AWS Glue

Use the preactions parameter.

Python example:

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=datasource0,
    catalog_connection="test_red",
    connection_options={
        "preactions": "truncate table target_table;",
        "dbtable": "target_table",
        "database": "redshiftdb",
    },
    redshift_tmp_dir="s3://s3path",
    transformation_ctx="datasink4",
)

Scala example:

val options = JsonOptions(Map(
    "dbtable" -> "target_table",
    "database" -> "redshiftdb",
    "preactions" -> "truncate table target_table;"
))
glueContext.getJDBCSink(
    catalogConnection = "test_red",
    options = options,
    redshiftTmpDir = "s3://s3path",
    transformationContext = "datasink4"
).writeDynamicFrame(datasource0)

Replace the following values:

  • test_red: the catalog connection to use
  • target_table: the Amazon Redshift table
  • s3://s3path: the Amazon S3 path of the temporary directory that the job uses to stage data

Merge an Amazon Redshift table in AWS Glue (upsert)

Use the preactions parameter to create a staging table, and then use the postactions parameter to run a merge query after the data is loaded into the staging table.

Python example:

pre_query = (
    "drop table if exists stage_table;"
    "create table stage_table as select * from target_table where 1=2;"
)
post_query = (
    "begin;"
    "delete from target_table using stage_table where stage_table.id = target_table.id;"
    "insert into target_table select * from stage_table;"
    "drop table stage_table;"
    "end;"
)
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=datasource0,
    catalog_connection="test_red",
    connection_options={
        "preactions": pre_query,
        "dbtable": "stage_table",
        "database": "redshiftdb",
        "postactions": post_query,
    },
    redshift_tmp_dir="s3://s3path",
    transformation_ctx="datasink4",
)

Scala example:

val options = JsonOptions(Map(
    "dbtable" -> "stage_table",
    "database" -> "redshiftdb",
    "preactions" -> "drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;",
    "postactions" -> "begin;delete from target_table using stage_table where stage_table.id = target_table.id;insert into target_table select * from stage_table;drop table stage_table;end;"
))
glueContext.getJDBCSink(
    catalogConnection = "test_red",
    options = options,
    redshiftTmpDir = "s3://s3path",
    transformationContext = "datasink4"
).writeDynamicFrame(datasource0)

Replace the following values:

  • target_table: the Amazon Redshift table
  • test_red: the catalog connection to use
  • stage_table: the Amazon Redshift staging table
  • s3://s3path: the Amazon S3 path of the temporary directory that the job uses to stage data

For more information, see Use a staging table to perform a merge (upsert).
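
The preactions and postactions strings can't contain newline characters, which makes long statements like this merge query awkward to write inline. One workaround (a minimal sketch, assuming the same table names as above) is to compose the SQL as a multi-line string and collapse the whitespace before passing it:

merge_sql = """
begin;
delete from target_table using stage_table where stage_table.id = target_table.id;
insert into target_table select * from stage_table;
drop table stage_table;
end;
"""
# Collapse all whitespace runs (including newlines) to single spaces,
# so the string is safe to pass as the postactions value.
post_query = " ".join(merge_sql.split())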

Ignore rows that aren't valid

Use the extracopyoptions parameter to specify a higher MAXERROR value; a sketch that combines multiple COPY options follows the examples below.

Python example:

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=persons_DyF,
    catalog_connection="test",
    connection_options={
        "dbtable": "testalblog2",
        "database": "reddb",
        "postactions": "delete from emp1;",
        "extracopyoptions": "MAXERROR 2",
    },
    redshift_tmp_dir="s3://s3path",
    transformation_ctx="datasink4",
)

Scala example:

val options = JsonOptions(Map(
    "dbtable" -> "testalblog2",
    "database" -> "reddb",
    "postactions" -> "delete from emp1;",
    "extracopyoptions" -> "MAXERROR 2"
))
glueContext.getJDBCSink(
    catalogConnection = "test",
    options = options,
    redshiftTmpDir = "s3://s3path",
    transformationContext = "datasink4"
).writeDynamicFrame(persons_DyF)

Replace the following values:

  • test: the catalog connection to use
  • testalblog2: the Amazon Redshift table to load data into
  • reddb: the Amazon Redshift database
  • emp1: the Amazon Redshift table to delete the data from, after the data is loaded into testalblog2
  • s3://s3path: the Amazon S3 path of the temporary directory that the job uses to stage data
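
Because the extracopyoptions value is appended directly to the COPY command, you can combine several COPY options in one space-separated string. The following minimal sketch reuses the placeholder names from the example above and both truncates over-length column values and tolerates up to two load errors:

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=persons_DyF,
    catalog_connection="test",
    connection_options={
        "dbtable": "testalblog2",
        "database": "reddb",
        # Standard COPY syntax: truncate over-length values and allow
        # up to 2 rejected rows before the load fails.
        "extracopyoptions": "TRUNCATECOLUMNS MAXERROR 2",
    },
    redshift_tmp_dir="s3://s3path",
    transformation_ctx="datasink4",
)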
