我如何在写入数据到 AWS Glue 作业前后在 Amazon Redshift 表上执行 SQL 命令?

上次更新时间:2019 年 4 月 17 日

我有一个 AWS Glue 作业,内容是将数据加载到 Amazon Redshift 表中。我想要在 AWS Glue 作业完成前后在 Amazon Redshift 上执行 SQL 命令。

解决方法

将以下参数之一传递到 AWS Glue 中 DynamicFrameWriter class

  • aws_iam_role:提供在另一个 AWS 资源中访问数据的授权。将此参数与附加到 Amazon Redshift 集群中的 AWS Identity and Access Management (IAM) 角色的完全指定 ARN 结合使用(例如,arn:aws:iam::123456789012:role/redshift_iam_role)。有关更多信息,请参阅授权参数
  • preactions:在 COPY 命令前执行的 SQL 命令的以分号隔开的列表。如果命令失败,Amazon Redshift 会引发异常。
  • postactions:在 COPY 命令成功后执行的 SQL 命令的以分号隔开的列表。如果命令失败,Amazon Redshift 会引发异常。
  • extracopyoptions:加载数据时附加到 Amazon Redshift COPY 命令的其他选项列表(例如 TRUNCATECOLUMNS 或 MAXERROR)。

示例场景

截断一个 Amazon Redshift 表后再将记录插入到 AWS Glue 中

按下面的 Python 示例所示使用 preactions 参数。替换以下值:

  • test_red:要使用的目录连接
  • target_table:Amazon Redshift 表
  • s3://s3path:Amazon Redshift 表的临时目录路径
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame
= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table target_table;","dbtable": "target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

将 Amazon Redshift 表合并到 AWS Glue 中(更新插入)

将数据加载到暂存表后创建 merge query,如下面的 Python 示例所示。替换以下值:

  • target_table:Amazon Redshift 表
  • test_red:要使用的目录连接
  • stage_table:Amazon Redshift 暂存表
  • s3://s3path:Amazon Redshift 表的临时目录路径
post_query="begin;delete from target_table using stage_table where stage_table.id = target_table.id ; insert into target_table select * from stage_table; drop table stage_table; end;"
        
        datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;","dbtable": "stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

有关更多信息,请参阅使用暂存表执行合并(更新插入)

忽略无效的行

使用 extracopyoptions 参数指定较高的 MAXERROR 值,如下面的 Python 示例所示。替换以下值:

  • test:要使用的目录连接
  • testalblog2:要加载数据的 Amazon Redshift 表
  • reddb:Amazon Redshift 数据库
  • emp1:当数据加载到 testalblog2 后,要从中删除数据的 Amazon Redshift 表
  • test_red:要使用的目录连接
  • stage_table:Amazon Redshift 暂存表
  • s3://s3path:Amazon Redshift 表的临时目录路径
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "testalblog2", "database": "reddb","postactions":"delete from emp1;","extracopyoptions":"MAXERROR 2"},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

这篇文章对您有帮助吗?

您觉得我们哪些地方需要改进?


需要更多帮助?