AWS Glue 작업에서 데이터 쓰기 전후에 Amazon Redshift 테이블에서 SQL 명령을 실행하려면 어떻게 해야 합니까?

최종 업데이트 날짜: 2021년 8월 16일

Amazon Redshift 테이블로 데이터를 로드하는 AWS Glue 작업이 있습니다. AWS Glue 작업이 완료되기 전이나 후에 Amazon Redshift에 대해 SQL 명령을 실행하려고 합니다.

해결 방법

AWS Glue DynamicFrameWriter 클래스에 다음 파라미터 중 하나를 전달합니다.

  • aws_iam_role: 다른 AWS 리소스의 데이터에 액세스할 수 있는 권한을 제공합니다. Amazon Redshift 클러스터에 연결된 AWS Identity and Access Management(IAM) 역할의 완전한 ARN과 함께 이 파라미터를 사용합니다(예: arn:aws:iam::123456789012:role/redshift_iam_role). 자세한 내용은 권한 부여 파라미터를 참조하세요.
  • preactions: COPY 명령 전에 실행되는 세미콜론으로 구분된 SQL 명령 목록입니다. 명령이 실패할 경우 Amazon Redshift에서 예외를 발생시킵니다.
    참고: preaction 파라미터에는 줄 바꿈 문자를 포함할 수 없습니다.
  • postactions: COPY 명령이 성공한 후 실행되는 세미콜론으로 구분된 SQL 명령 목록입니다. 명령이 실패할 경우 Amazon Redshift에서 예외를 발생시킵니다.
    참고: postaction 파라미터에는 줄 바꿈 문자를 포함할 수 없습니다.
  • extracopyoptions: 데이터를 로드할 때 Amazon Redshift COPY 명령에 추가할 추가 옵션 목록입니다(예: TRUNCATECOLUMNS 또는 MAXERROR).

예제 시나리오

AWS Glue에 레코드를 삽입하기 전에 Amazon Redshift 테이블 자르기

다음 Pyjthon 예제와 같이 preactions 파라미터를 사용합니다. 다음 값을 바꿉니다.

  • test_red: 사용할 카탈로그 연결
  • target_table: Amazon Redshift 테이블
  • s3://s3path: Amazon Redshift 테이블의 임시 디렉터리 경로
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame
= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table target_table;","dbtable": "target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

AWS Glue에 Amazon Redshift 테이블 병합(upsert)

다음 Python 예제와 같이 데이터를 스테이징 테이블에 로드한 후 병합 쿼리를 생성합니다. 다음 값을 바꿉니다.

  • target_table: Amazon Redshift 테이블
  • test_red: 사용할 카탈로그 연결
  • stage_table: Amazon Redshift 스테이징 테이블
  • s3://s3path: Amazon Redshift 테이블의 임시 디렉터리 경로
post_query="begin;delete from target_table using stage_table where stage_table.id = target_table.id ; insert into target_table select * from stage_table; drop table stage_table; end;"
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists stage_table;create table stage_table as select * from target_table where 1=2;","dbtable": "stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

자세한 내용은 스테이징 테이블을 사용한 병합 실행(Upsert)을 참조하세요.

유효하지 않은 행 무시

다음 Python 예제와 같이 extracopyoptions 파라미터를 사용하여 더 높은 MAXERROR 값을 지정합니다. 다음 값을 바꿉니다.

  • test: 사용할 카탈로그 연결
  • testalblog2: 데이터를 로드할 Amazon Redshift 테이블
  • reddb: Amazon Redshift 데이터베이스
  • emp1: 데이터를 testalblog2에 로드한 후 데이터를 삭제할 Amazon Redshift 테이블
  • test_red: 사용할 카탈로그 연결
  • stage_table: Amazon Redshift 스테이징 테이블
  • s3://s3path: Amazon Redshift 테이블의 임시 디렉터리 경로
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "testalblog2", "database": "reddb","postactions":"delete from emp1;","extracopyoptions":"MAXERROR 2"},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

이 문서가 도움이 되었나요?


결제 또는 기술 지원이 필요하세요?