在 AWS Glue 作业中写入数据之前或之后如何在 Amazon Redshift 表上运行 SQL 命令?
上次更新日期:2022 年 10 月 27 日
我有一个 AWS Glue 作业,内容是将数据加载到 Amazon Redshift 表中。我想要在 AWS Glue 任务完成前后在 Amazon Redshift 上运行 SQL 命令。
解决方法
将以下参数之一传递到 AWS Glue 中 DynamicFrameWriter class:
- aws_iam_role:提供在另一个 AWS 资源中访问数据的授权。将此参数与附加到 Amazon Redshift 集群中的 AWS Identity and Access Management (IAM) 角色的完全指定 ARN 结合使用(例如,arn:aws:iam::123456789012:role/redshift_iam_role)。有关更多信息,请参阅授权参数。
- preactions:在 COPY 命令前运行的 SQL 命令的以分号隔开的列表。如果命令失败,Amazon Redshift 会引发异常。
注意:请确保 preaction 参数不包含任何换行符。 - postactions:在 COPY 命令成功后运行的 SQL 命令的以分号隔开的列表。如果命令失败,Amazon Redshift 会引发异常。
注意:请确保 postaction 参数不包含任何换行符。 - extracopyoptions:负载数据时附加到 Amazon Redshift COPY 命令的其他选项列表(例如 TRUNCATECOLUMNS 或 MAXERROR)。
示例场景
截断一个 Amazon Redshift 表后再将记录插入到 AWS Glue 中
使用 preactions 参数。
Python 示例:
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame
= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table target_table;","dbtable": "target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
Scala 示例:
val options = JsonOptions(Map(
"dbtable" -> "target_table",
"database" -> "redshiftdb",
"preactions" -> "truncate table target_table;"
))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)
替换以下值:
- test_red:要使用的目录连接
- target_table:Amazon Redshift 表
- s3://s3path:Amazon Redshift 表的临时目录路径
将 Amazon Redshift 表合并到 AWS Glue 中(更新插入)
将数据加载到暂存表后创建合并查询。
Python 示例:
post_query="begin;delete from target_table using stage_table where stage_table.id = target_table.id ; insert into target_table select * from stage_table; drop table stage_table; end;"
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;","dbtable": "stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
Scala 示例:
val options = JsonOptions(Map(
"dbtable" -> "stage_table",
"database" -> "redshiftdb",
"preactions" -> "drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;",
"postactions" -> "begin;delete from target_table using stage_table where stage_table.id = target_table.id ; insert into target_table select * from stage_table; drop table stage_table; end;"
))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)
替换以下值:
- target_table:Amazon Redshift 表
- test_red:要使用的目录连接
- stage_table:Amazon Redshift 暂存表
- s3://s3path:Amazon Redshift 表的临时目录路径
有关更多信息,请参阅使用暂存表执行合并(更新插入)。
忽略无效的行
使用 extracopyoptions 参数指定更高的 MAXERROR 值。
Python 示例:
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "testalblog2", "database": "reddb","postactions":"delete from emp1;","extracopyoptions":"MAXERROR 2"},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
Scala 示例:
val options = JsonOptions(Map(
"dbtable" -> "testalblog2",
"database" -> "reddb",
"preactions" -> "drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;",
"postactions" -> "delete from emp1;",
"extracopyoptions" -> "MAXERROR 2"
))
glueContext.getJDBCSink(catalogConnection = "test", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(persons_DyF)
替换以下值:
- test:要使用的目录连接
- testalblog2:要负载数据的 Amazon Redshift 表
- reddb:Amazon Redshift 数据库
- emp1:当数据加载到 testalblog2 后,要从中删除数据的 Amazon Redshift 表
- s3://s3path:Amazon Redshift 表的临时目录路径