在 AWS Glue 作业中写入数据之前或之后如何在 Amazon Redshift 表上运行 SQL 命令?

上次更新日期:2022 年 10 月 27 日

我有一个 AWS Glue 作业,内容是将数据加载到 Amazon Redshift 表中。我想要在 AWS Glue 任务完成前后在 Amazon Redshift 上运行 SQL 命令。

解决方法

将以下参数之一传递到 AWS Glue 中 DynamicFrameWriter class

  • aws_iam_role:提供在另一个 AWS 资源中访问数据的授权。将此参数与附加到 Amazon Redshift 集群中的 AWS Identity and Access Management (IAM) 角色的完全指定 ARN 结合使用(例如,arn:aws:iam::123456789012:role/redshift_iam_role)。有关更多信息,请参阅授权参数
  • preactions:在 COPY 命令前运行的 SQL 命令的以分号隔开的列表。如果命令失败,Amazon Redshift 会引发异常。
    注意:请确保 preaction 参数不包含任何换行符。
  • postactions:在 COPY 命令成功后运行的 SQL 命令的以分号隔开的列表。如果命令失败,Amazon Redshift 会引发异常。
    注意:请确保 postaction 参数不包含任何换行符。
  • extracopyoptions:负载数据时附加到 Amazon Redshift COPY 命令的其他选项列表(例如 TRUNCATECOLUMNS 或 MAXERROR)。

示例场景

截断一个 Amazon Redshift 表后再将记录插入到 AWS Glue 中

使用 preactions 参数。

Python 示例:

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame
= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table target_table;","dbtable": "target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Scala 示例:

val options = JsonOptions(Map(
   "dbtable" -> "target_table",
   "database" -> "redshiftdb",
   "preactions" -> "truncate table target_table;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)

替换以下值:

  • test_red:要使用的目录连接
  • target_table:Amazon Redshift 表
  • s3://s3path:Amazon Redshift 表的临时目录路径

将 Amazon Redshift 表合并到 AWS Glue 中(更新插入)

将数据加载到暂存表后创建合并查询

Python 示例:

post_query="begin;delete from target_table using stage_table where stage_table.id = target_table.id ; insert into target_table select * from stage_table; drop table stage_table; end;"
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;","dbtable": "stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Scala 示例:

val options = JsonOptions(Map(
   "dbtable" -> "stage_table",
   "database" -> "redshiftdb",
   "preactions" -> "drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;",
   "postactions" -> "begin;delete from target_table using stage_table where stage_table.id = target_table.id ; insert into target_table select * from stage_table; drop table stage_table; end;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)

替换以下值:

  • target_table:Amazon Redshift 表
  • test_red:要使用的目录连接
  • stage_table:Amazon Redshift 暂存表
  • s3://s3path:Amazon Redshift 表的临时目录路径

有关更多信息,请参阅使用暂存表执行合并(更新插入)

忽略无效的行

使用 extracopyoptions 参数指定更高的 MAXERROR 值。

Python 示例:

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "testalblog2", "database": "reddb","postactions":"delete from emp1;","extracopyoptions":"MAXERROR 2"},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Scala 示例:

val options = JsonOptions(Map(
   "dbtable" -> "testalblog2",
   "database" -> "reddb",
   "preactions" -> "drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;",
   "postactions" -> "delete from emp1;",
   "extracopyoptions" -> "MAXERROR 2"
   ))
glueContext.getJDBCSink(catalogConnection = "test", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(persons_DyF)

替换以下值:

  • test:要使用的目录连接
  • testalblog2:要负载数据的 Amazon Redshift 表
  • reddb:Amazon Redshift 数据库
  • emp1:当数据加载到 testalblog2 后,要从中删除数据的 Amazon Redshift 表
  • s3://s3path:Amazon Redshift 表的临时目录路径

这篇文章对您有帮助吗?


您是否需要账单或技术支持?