AWS Glue 작업에서 데이터 쓰기 전후에 Amazon Redshift 테이블에서 SQL 명령을 실행하려면 어떻게 해야 하나요?

4분 분량
0

Amazon Redshift 테이블로 데이터를 로드하는 AWS Glue 작업이 있습니다. AWS Glue 작업이 완료되기 전이나 후에 Amazon Redshift에 대해 SQL 명령을 실행하려고 합니다.

해결 방법

AWS Glue DynamicFrameWriter 클래스에 다음 파라미터 중 하나를 전달합니다.

  • aws_iam_role: 다른 AWS 리소스의 데이터에 액세스할 수 있는 권한을 제공합니다. Amazon Redshift 클러스터에 연결된 AWS Identity and Access Management(IAM) 역할의 완전한 ARN과 함께 이 파라미터를 사용합니다. 예를 들어 arn:aws:iam::123456789012:role/redshift_iam_role을 사용합니다. 자세한 내용은 권한 부여 파라미터를 참조하세요.
  • preactions: COPY 명령 전에 실행되는 세미콜론으로 구분된 SQL 명령 목록입니다. 명령이 실패할 경우 Amazon Redshift에서 예외를 발생시킵니다.
    참고: preaction 파라미터에는 줄 바꿈 문자를 포함할 수 없습니다.
  • postactions: COPY 명령이 성공한 후 실행되는 세미콜론으로 구분된 SQL 명령 목록입니다. 명령이 실패할 경우 Amazon Redshift에서 예외를 발생시킵니다.
    참고: postaction 파라미터에는 줄 바꿈 문자를 포함할 수 없습니다.
  • extracopyoptions: 데이터를 로드할 때 Amazon Redshift COPY 명령에 추가할 추가 옵션 목록입니다. 예를 들어 TRUNCATECOLUMNS 또는 MAXERROR를 사용할 수 있습니다.

예제 시나리오

AWS Glue에 레코드를 삽입하기 전에 Amazon Redshift 테이블 자르기

preactions 파라미터를 사용합니다.

다음 Python 예제를 참조하세요.

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame
= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table schema.target_table;","dbtable": "schema.target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

다음 Scala 예제를 참조하세요.

val options = JsonOptions(Map(
   "dbtable" -> "schema.target_table",
   "database" -> "redshiftdb",
   "preactions" -> "truncate table schema.target_table;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)

이전 예제에서 명령을 다음 값으로 교체해야 합니다.

  • test_red: 사용할 카탈로그 연결
  • schema.target_table: Amazon Redshift 데이터베이스의 스키마 및 Amazon Redshift 테이블
  • s3://s3path: Amazon Redshift 테이블의 임시 디렉터리 경로

연결 옵션에서 IAM 역할 사용

보안 인증 정보는 1시간 후에 만료되므로 연결 옵션에서 IAM 역할을 사용하여 오래 실행되는 연결이 실패하지 않도록 합니다.

다음 Python 예제를 참조하세요.

glueContext.create_dynamic_frame.from_catalog(database = "redshift-database-name",  table_name = "redshift-table-name", redshift_tmp_dir = args["TempDir"], additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})

다음 Scala 예제를 참조하세요.

val connectionOptions = JsonOptions(Map(
      "url" -> "jdbc:redshift://your_redshift_cluster.us-west-2.redshift.amazonaws.com:5439/database",
      "dbtable" -> "schema.table",
      "user" -> "redshift_user",
      "password" -> "redshift_password",
      "tempdir" -> "s3://temp_bucket/temp",
      "aws_iam_role" -> "arn:aws:iam::your_account_id:role/your_role_name" ))

val dyf = glueContext.getSource("redshift", connectionOptions)
          .getDynamicFrame()

AWS Glue에 Amazon Redshift 테이블 병합(upsert) 스테이징 테이블에 데이터를 로드한 후 병합 쿼리를 생성합니다.

참고: 병합 쿼리가 작동하려면 target_table이 Amazon Redshift 데이터베이스에 이미 존재해야 합니다.

다음 Python 예제를 참조하세요.

post_query="begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;","dbtable": "schema.stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

다음 Scala 예제를 참조하세요.

val options = JsonOptions(Map(
   "dbtable" -> "schema.stage_table",
   "database" -> "redshiftdb",
   "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;",
   "postactions" -> "begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)

이전 예제에서 명령을 다음 값으로 교체해야 합니다.

  • schema.target_table: Amazon Redshift 데이터베이스의 스키마 및 Amazon Redshift 테이블
  • test_red: 사용할 카탈로그 연결
  • schema.stage_table: Amazon Redshift 데이터베이스의 스키마 및 Amazon Redshift 스테이징 테이블
  • s3://s3path: Amazon Redshift 테이블의 임시 디렉터리 경로

자세한 내용은 스테이징 테이블을 사용한 병합 실행(Upsert)을 참조하세요.

유효하지 않은 행 무시

extracopyoptions 파라미터를 사용하여 MAXERROR 값을 더 높게 지정합니다.

다음 Python 예제를 참조하세요.

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "testalblog2", "database": "reddb","postactions":"delete from emp1;","extracopyoptions":"MAXERROR 2"},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

다음 Scala 예제를 참조하세요.

val options = JsonOptions(Map(
   "dbtable" -> "testalblog2",
   "database" -> "reddb",
   "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;",
   "postactions" -> "delete from emp1;",
   "extracopyoptions" -> "MAXERROR 2"
   ))
glueContext.getJDBCSink(catalogConnection = "test", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(persons_DyF)

이전 예제에서 명령을 다음 값으로 교체해야 합니다.

  • schema.target_table: Amazon Redshift 데이터베이스의 스키마 및 Amazon Redshift 테이블
  • schema.stage_table: Amazon Redshift 데이터베이스의 스키마 및 Amazon Redshift 스테이징 테이블
  • test: 사용할 카탈로그 연결
  • testalblog2: 데이터를 로드할 Amazon Redshift 테이블
  • reddb: Amazon Redshift 데이터베이스
  • emp1: 데이터를 testalblog2에 로드한 후 데이터를 삭제할 Amazon Redshift 테이블
  • s3://s3path: Amazon Redshift 테이블의 임시 디렉터리 경로

추가 정보

AWS Glue 4.0 ETL 작업을 사용하는 경우 Amazon Redshift Spark 커넥터(redshift-jdbc42-2.1.0.9)를 사용할 수 있습니다. 이 커넥터에는 다음과 같은 속성이 있습니다.

  • IAM 기반 JDBC URL을 지원합니다.
  • autopushdown, autopushdown.s3_result_cacheunload_s3_format과 같은 성능 개선 옵션이 있습니다.
  • 임시 폴더의 데이터에 사용할 수 있는 SSE_KMS 암호화 옵션이 있습니다. AWS Glue는 Amazon Redshift 테이블에서 데이터를 읽을 때 이 데이터를 사용합니다.

관련 정보

COPY

TRUNCATE

데이터 로드 작업

AWS 공식
AWS 공식업데이트됨 일 년 전