在 AWS Glue 作业中写入数据之前或之后如何在 Amazon Redshift 表上运行 SQL 命令?

3 分钟阅读
0

我有一个 AWS Glue 作业,内容是将数据加载到 Amazon Redshift 表中。我想要在 AWS Glue 任务完成前后在 Amazon Redshift 上运行 SQL 命令。

解决方法

将以下参数之一传递到 AWS Glue 中 DynamicFrameWriter class

  • **aws_iam_role:**提供在另一个 AWS 资源中访问数据的授权。将此参数与附加到 Amazon Redshift 集群 AWS Identity and Access Management(IAM)角色中的完全指定 ARN 结合使用。例如,使用 arn:aws:iam::123456789012:role/redshift_iam_role。有关更多信息,请参阅授权参数
  • **preactions:**在 COPY 命令前运行的 SQL 命令的以分号隔开的列表。如果命令失败,Amazon Redshift 会引发异常。
    **注意:**请确保 preaction 参数不包含换行符。
  • **postactions:**在 COPY 命令成功后运行的 SQL 命令的分号分隔列表。如果命令失败,Amazon Redshift 会引发异常。
    **注意:**请确保 postaction 参数不包含换行符。
  • **extracopyoptions:**加载数据时附加到 Amazon Redshift COPY 命令的其他选项列表。例如,您可以使用 TRUNCATECOLUMNS 或 MAXERROR。

示例场景

截断一个 Amazon Redshift 表后再将记录插入到 AWS Glue 中

使用 preactions 参数。

参见以下 Python 示例:

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame
= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table schema.target_table;","dbtable": "schema.target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

参见以下 Scala 示例:

val options = JsonOptions(Map(
   "dbtable" -> "schema.target_table",
   "database" -> "redshiftdb",
   "preactions" -> "truncate table schema.target_table;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)

在这些示例中,请务必替换以下值:

  • **test_red:**要使用的目录连接
  • **schema.target_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 表
  • **s3://s3path:**Amazon Redshift 表的临时目录路径

在连接选项中使用 IAM 角色

由于凭证会在 1 小时后过期,因此请在连接选项中使用 IAM 角色来防止长时间运行导致的连接失败。

参见以下 Python 示例:

glueContext.create_dynamic_frame.from_catalog(database = "redshift-database-name",  table_name = "redshift-table-name", redshift_tmp_dir = args["TempDir"], additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})

参见以下 Scala 示例:

val connectionOptions = JsonOptions(Map(
      "url" -> "jdbc:redshift://your_redshift_cluster.us-west-2.redshift.amazonaws.com:5439/database",
      "dbtable" -> "schema.table",
      "user" -> "redshift_user",
      "password" -> "redshift_password",
      "tempdir" -> "s3://temp_bucket/temp",
      "aws_iam_role" -> "arn:aws:iam::your_account_id:role/your_role_name" ))

val dyf = glueContext.getSource("redshift", connectionOptions)
          .getDynamicFrame()

将 Amazon Redshift 表合并到 AWS Glue 中(更新插入) 将数据加载到暂存表后创建合并查询

**注意:**要使合并查询生效,target_table 必须已经存在于 Amazon Redshift 数据库中。

参见以下 Python 示例:

post_query="begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;","dbtable": "schema.stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

参见以下 Scala 示例:

val options = JsonOptions(Map(
   "dbtable" -> "schema.stage_table",
   "database" -> "redshiftdb",
   "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;",
   "postactions" -> "begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)

在这些示例中,请务必替换以下值:

  • **schema.target_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 表
  • **test_red:**要使用的目录连接
  • **schema.stage_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 暂存表
  • **s3://s3path:**Amazon Redshift 表的临时目录路径

有关更多信息,请参阅使用暂存表执行合并(更新插入)

忽略无效的行

使用 extracopyoptions 参数指定更高的 MAXERROR 值。

参见以下 Python 示例:

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "testalblog2", "database": "reddb","postactions":"delete from emp1;","extracopyoptions":"MAXERROR 2"},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

参见以下 Scala 示例:

val options = JsonOptions(Map(
   "dbtable" -> "testalblog2",
   "database" -> "reddb",
   "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;",
   "postactions" -> "delete from emp1;",
   "extracopyoptions" -> "MAXERROR 2"
   ))
glueContext.getJDBCSink(catalogConnection = "test", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(persons_DyF)

在这些示例中,请务必替换以下值:

  • **schema.target_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 表
  • **schema.stage_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 暂存表
  • **test:**要使用的目录连接
  • **testalblog2:**要负载数据的 Amazon Redshift 表
  • **reddb:**Amazon Redshift 数据库
  • **emp1:**当数据加载到 testalblog2 后,要从中删除数据的 Amazon Redshift 表
  • **s3://s3path:**Amazon Redshift 表的临时目录路径

其他信息

在使用 AWS Glue 4.0 ETL 作业时,您可以使用 Amazon Redshift Spark 连接器(redshift-jdbc42-2.1.0.9)。此连接器具有以下属性:

  • 支持基于 IAM 的 JDBC URL。
  • 包括性能改进选项,例如 autopushdownautopushdown.s3_result_cacheunload_s3_format
  • 包括可用于临时文件夹中的数据 SSE_KMS 加密选项。AWS Glue 在读取 Amazon Redshift 表时使用此数据。

相关信息

COPY

TRUNCATE

数据负载运营

AWS 官方
AWS 官方已更新 1 年前