How do I execute SQL commands on an Amazon Redshift table before or after writing data in an AWS Glue job?

Last updated: 2019-04-17

I have an AWS Glue job that loads data into an Amazon Redshift table. I want to execute SQL commands on Amazon Redshift before or after the job loads the data.

Resolution

Pass one or more of the following parameters in the AWS Glue DynamicFrameWriter class:

  • aws_iam_role: Provides authorization to access data in another AWS resource. Use this parameter with the fully specified ARN of the AWS Identity and Access Management (IAM) role that is attached to the Amazon Redshift cluster (for example, arn:aws:iam::123456789012:role/redshift_iam_role). For more information, see Authorization Parameters. A usage sketch follows this list.
  • preactions: A semicolon-delimited list of SQL commands that are executed before the COPY command. If the commands fail, Amazon Redshift throws an exception.
  • postactions: A semicolon-delimited list of SQL commands that are executed after a successful COPY command. If the commands fail, Amazon Redshift throws an exception.
  • extracopyoptions: A list of additional options to append to the Amazon Redshift COPY command when loading data (for example, TRUNCATECOLUMNS or MAXERROR).
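
The preactions, postactions, and extracopyoptions parameters are demonstrated in the example scenarios below. The following minimal sketch shows how aws_iam_role can be passed in connection_options; the connection name, table, and database are placeholder values, not values from this article:

# Sketch only: my_redshift_connection, my_table, and mydb are placeholders.
# Passing aws_iam_role makes the Amazon Redshift COPY use the attached IAM role.
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame=datasource0, catalog_connection="my_redshift_connection",
    connection_options={"dbtable": "my_table", "database": "mydb", "aws_iam_role": "arn:aws:iam::123456789012:role/redshift_iam_role"},
    redshift_tmp_dir="s3://s3path", transformation_ctx="datasink")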

Example scenarios

Truncate an Amazon Redshift table before inserting records in AWS Glue

Use the preactions parameter, as shown in the following Python example. Replace the following values:

  • test_red: the catalog connection to use
  • target_table: the Amazon Redshift table
  • s3://s3path: the path of the Amazon Redshift table's temporary directory

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame=datasource0, catalog_connection="test_red",
    connection_options={"preactions": "truncate table target_table;", "dbtable": "target_table", "database": "redshiftdb"},
    redshift_tmp_dir="s3://s3path", transformation_ctx="datasink4")
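
In this and the following examples, datasource0 is assumed to be a DynamicFrame that was created earlier in the job, for example from the AWS Glue Data Catalog. A minimal sketch of that setup (the database and table names here are placeholders):

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Sketch: standard AWS Glue job setup; sourcedb and source_table are placeholder names.
sc = SparkContext()
glueContext = GlueContext(sc)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database="sourcedb", table_name="source_table", transformation_ctx="datasource0")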

Merge an Amazon Redshift table in AWS Glue (upsert)

Create a merge query after loading the data into a staging table, as shown in the following Python example. Replace the following values:

  • target_table: the Amazon Redshift table
  • test_red: the catalog connection to use
  • stage_table: the Amazon Redshift staging table
  • s3://s3path: the path of the Amazon Redshift table's temporary directory
post_query="begin;delete from target_table using stage_table where stage_table.id = target_table.id ; insert into target_table select * from stage_table; drop table stage_table; end;"
        
        datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;","dbtable": "stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

For more information, see Use a Staging Table to Perform a Merge (Upsert).
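
The linked topic also describes a variant that updates matching rows instead of deleting and re-inserting them. A sketch of that variant as the postactions query, assuming the same placeholder tables and a join column named id (col1 stands in for whichever columns you need to update):

# Sketch: update-then-insert merge; table and column names are placeholders.
post_query = ("begin;"
    "update target_table set col1 = stage_table.col1 from stage_table where target_table.id = stage_table.id;"
    "insert into target_table select * from stage_table where stage_table.id not in (select id from target_table);"
    "drop table stage_table;"
    "end;")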

Ignore invalid rows

Use the extracopyoptions parameter to specify a higher MAXERROR value, as shown in the following Python example. Replace the following values:

  • test: the catalog connection to use
  • testalblog2: the Amazon Redshift table to load data into
  • reddb: the Amazon Redshift database
  • emp1: the Amazon Redshift table to delete data from after the data is loaded into testalblog2
  • s3://s3path: the path of the Amazon Redshift table's temporary directory

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame=persons_DyF, catalog_connection="test",
    connection_options={"dbtable": "testalblog2", "database": "reddb", "postactions": "delete from emp1;", "extracopyoptions": "MAXERROR 2"},
    redshift_tmp_dir="s3://s3path", transformation_ctx="datasink4")
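
Because extracopyoptions is appended to the COPY command, multiple options can be combined in one space-separated string. A sketch combining the two options mentioned earlier (TRUNCATECOLUMNS and MAXERROR), reusing the placeholder names from the example above:

# Sketch: append multiple COPY options in a single extracopyoptions string.
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(frame=persons_DyF, catalog_connection="test",
    connection_options={"dbtable": "testalblog2", "database": "reddb", "extracopyoptions": "TRUNCATECOLUMNS MAXERROR 2"},
    redshift_tmp_dir="s3://s3path", transformation_ctx="datasink5")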
