在 AWS Glue 作业中写入数据之前或之后如何在 Amazon Redshift 表上运行 SQL 命令?
3 分钟阅读
0
我有一个 AWS Glue 作业,内容是将数据加载到 Amazon Redshift 表中。我想要在 AWS Glue 任务完成前后在 Amazon Redshift 上运行 SQL 命令。
解决方法
将以下参数之一传递到 AWS Glue 中 DynamicFrameWriter class:
- **aws_iam_role:**提供在另一个 AWS 资源中访问数据的授权。将此参数与附加到 Amazon Redshift 集群 AWS Identity and Access Management(IAM)角色中的完全指定 ARN 结合使用。例如,使用 arn:aws:iam::123456789012:role/redshift_iam_role。有关更多信息,请参阅授权参数。
- **preactions:**在 COPY 命令前运行的 SQL 命令的以分号隔开的列表。如果命令失败,Amazon Redshift 会引发异常。
**注意:**请确保 preaction 参数不包含换行符。 - **postactions:**在 COPY 命令成功后运行的 SQL 命令的分号分隔列表。如果命令失败,Amazon Redshift 会引发异常。
**注意:**请确保 postaction 参数不包含换行符。 - **extracopyoptions:**加载数据时附加到 Amazon Redshift COPY 命令的其他选项列表。例如,您可以使用 TRUNCATECOLUMNS 或 MAXERROR。
示例场景
截断一个 Amazon Redshift 表后再将记录插入到 AWS Glue 中
使用 preactions 参数。
参见以下 Python 示例:
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table schema.target_table;","dbtable": "schema.target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
参见以下 Scala 示例:
val options = JsonOptions(Map( "dbtable" -> "schema.target_table", "database" -> "redshiftdb", "preactions" -> "truncate table schema.target_table;" )) glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)
在这些示例中,请务必替换以下值:
- **test_red:**要使用的目录连接
- **schema.target_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 表
- **s3://s3path:**Amazon Redshift 表的临时目录路径
在连接选项中使用 IAM 角色
由于凭证会在 1 小时后过期,因此请在连接选项中使用 IAM 角色来防止长时间运行导致的连接失败。
参见以下 Python 示例:
glueContext.create_dynamic_frame.from_catalog(database = "redshift-database-name", table_name = "redshift-table-name", redshift_tmp_dir = args["TempDir"], additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})
参见以下 Scala 示例:
val connectionOptions = JsonOptions(Map( "url" -> "jdbc:redshift://your_redshift_cluster.us-west-2.redshift.amazonaws.com:5439/database", "dbtable" -> "schema.table", "user" -> "redshift_user", "password" -> "redshift_password", "tempdir" -> "s3://temp_bucket/temp", "aws_iam_role" -> "arn:aws:iam::your_account_id:role/your_role_name" )) val dyf = glueContext.getSource("redshift", connectionOptions) .getDynamicFrame()
将 Amazon Redshift 表合并到 AWS Glue 中(更新插入) 将数据加载到暂存表后创建合并查询。
**注意:**要使合并查询生效,target_table 必须已经存在于 Amazon Redshift 数据库中。
参见以下 Python 示例:
post_query="begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;" datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;","dbtable": "schema.stage_table", "database": "redshiftdb","postactions":post_query}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
参见以下 Scala 示例:
val options = JsonOptions(Map( "dbtable" -> "schema.stage_table", "database" -> "redshiftdb", "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;", "postactions" -> "begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;" )) glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)
在这些示例中,请务必替换以下值:
- **schema.target_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 表
- **test_red:**要使用的目录连接
- **schema.stage_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 暂存表
- **s3://s3path:**Amazon Redshift 表的临时目录路径
有关更多信息,请参阅使用暂存表执行合并(更新插入)。
忽略无效的行
使用 extracopyoptions 参数指定更高的 MAXERROR 值。
参见以下 Python 示例:
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "testalblog2", "database": "reddb","postactions":"delete from emp1;","extracopyoptions":"MAXERROR 2"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
参见以下 Scala 示例:
val options = JsonOptions(Map( "dbtable" -> "testalblog2", "database" -> "reddb", "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;", "postactions" -> "delete from emp1;", "extracopyoptions" -> "MAXERROR 2" )) glueContext.getJDBCSink(catalogConnection = "test", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(persons_DyF)
在这些示例中,请务必替换以下值:
- **schema.target_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 表
- **schema.stage_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 暂存表
- **test:**要使用的目录连接
- **testalblog2:**要负载数据的 Amazon Redshift 表
- **reddb:**Amazon Redshift 数据库
- **emp1:**当数据加载到 testalblog2 后,要从中删除数据的 Amazon Redshift 表
- **s3://s3path:**Amazon Redshift 表的临时目录路径
其他信息
在使用 AWS Glue 4.0 ETL 作业时,您可以使用 Amazon Redshift Spark 连接器(redshift-jdbc42-2.1.0.9)。此连接器具有以下属性:
- 支持基于 IAM 的 JDBC URL。
- 包括性能改进选项,例如 autopushdown、autopushdown.s3_result_cache 和 unload_s3_format。
- 包括可用于临时文件夹中的数据 SSE_KMS 加密选项。AWS Glue 在读取 Amazon Redshift 表时使用此数据。
相关信息
没有评论
相关内容
- AWS 官方已更新 1 年前
- AWS 官方已更新 2 年前