AWS 기술 블로그

효율적인 Amazon Redshift 감사 로그 검색을 위한 데이터 파이프라인 구성

Amazon Redshift는 엑사바이트 규모의 데이터를 분석하고 복잡한 분석 쿼리를 실행하여 널리 사용되는 클라우드 데이터 웨어하우스입니다.

Amazon Redshift는 보안 및 문제 해결을 목적으로 데이터베이스를 모니터링 할 수 있도록 감사 로그를 제공하고 있습니다. 감사 로그는 Amazon Redshift의 STL 시스템 뷰를 통해  최대 7일 미만의 로그를 조회할 수 있습니다. 만약 7일 이상의 로그를 보관하기 위해서는 Amazon CloudWatchAmazon Simple Storage Service(S3)에 저장해야 합니다.

Amazon S3에 저장된 Amazon Redshift 감사 로그는 서버리스 대화형 분석 서비스인 Amazon Athena를 사용하여  상세하게 분석할 수 있습니다.

이번 게시물에서는 CSV와 텍스트 형태로 저장된 Amazon Redshift 감사 로그를 Amazon Athena에서 정규식 함수를 사용해 조회 시 발생되는 성능 저하 이슈를 해결하는 방법을 알아 보겠습니다. 그리고 Amazon Redshift 감사 로그만으로 분석하는데 한계가 있을 수 있습니다. 예를 들어 Amazon Redshift 클러스터 리소스를 많이 소비하는 사용자별 시간별 롱텀 쿼리 빈도를 분석하는 경우입니다. 다양한 정보를 제공하는 Amazon Redshift 시스템 뷰를 참조하여 Amazon Redshift 감사 로그를 강화하는 방법을 알아 보겠습니다.

Amazon Redshift 감사 로그 이해

Amazon Redshift는 3가지 종류의 감사 로그를 제공합니다.

  • Connection log(연결 로그) – 인증 시도 횟수와 연결 및 연결 해제 정보
  • User activity log(사용자 작업 로그) – 데이터베이스에서 실행된 쿼리 정보
  • User log(사용자 로그) – 데이터베이스 사용자 정의 변경 사항에 대한 정보

Amazon Redshift 감사 로그의 스키마는 관리 안내서에서 확인 할 수 있습니다.

Amazon Redshift 감사 로그를 Amazon CloudWatch에 저장할 경우 준실시간 성으로 로그들이 저장됩니다. Amazon CloudWatch Logs에서 Connection log는 Message 컬럼에 CSV(comma-separated values) 형태로 저장된 내역을 조회 할 수 있습니다.

Amazon S3에는 1시간 단위로 감사 로그들이 분리되어 파일에 저장됩니다.

Connection log가 저장된 파일 내용은 아래와 같습니다.

User activity log는 CSV 형식이 아닌 임의의 문자열로 저장 되며, 실행된 쿼리는 한 줄 또는 여러 줄에 걸쳐 표시됩니다.

데이터 파이프라인 개요

AWS Lambda 함수에서 Amazon Redshift 감사 로그의 조회 속도를 개선하기 위해 Amazon S3에 저장된 Amazon Redshift 감사 로그의 포맷을 CSV와 텍스트에서 Parquet로 변환합니다. 그리고 Amazon Redshift 감사 로그의 분석 활용도를 높이기 위해 Amazon Redshift 시스템 뷰를 참조하여 DML(Data Manipulation Language) 쿼리 수행시간, DDL(Data Definition Language) 쿼리 문을 Amazon Redshift 감사 로그에 추가해서 Amazon S3에 저장하는 데이터 파이프라인을 구성합니다.
이 데이터 파이프라인은 하나의 예시입니다. 따라서 사용자는 분석할 워크로드에 필요한 데이터들을 Amazon Redshift 시스템 뷰를 참조해서 추가하면 됩니다.

[아키텍처]

[흐름도]

  1. Amazon Redshift에서 감사 로그를 Amazon S3에 저장
  2. Amazon S3에 감사 로그가 저장될 때 AWS Lambda 함수 호출
  3. AWS Lambda 함수에서 Amazon S3에 저장된 데이터(파일)와 Amazon Redshift 시스템 뷰의 데이터를 조인하여 Amazon S3에 Parquet 형식으로 저장
  4. Amazon QuickSight에서 Amazon Athena를 통해 Amazon S3에 저장된 데이터를 조회

데이터 파이프라인 구성하기

단계1: Amazon Redshift 감사 로그 설정

Amazon Redshift 감사로그를 Amazon S3에 저장하기 위해서는 Amazon Redshift 콘솔에서 Amazon Redshift 클러스터 > Properties > Edit >  audit logging를 선택 후 아래와 같이 설정하면 됩니다.

[설정 내용]

‘Use existing bucket’을 선택하고 입력한 Amazon S3 버킷에 Amazon Redshift에서 로깅할 수 있도록 아래와 같이 정책을 추가해야 합니다. 다만 신규 버킷을 선택하면 정책이 자동으로 생성됩니다.

[버킷 정책 예시]

  {
      "Sid": "Stmt1335892150622",
      "Effect": "Allow",
      "Principal": {
          "Service": "redshift.amazonaws.com"
      },
      "Action": [
          "s3:GetBucketAcl",
          "s3:PutObject"
      ],
      "Resource": [
          "arn:aws:s3:::aws-svc-logs-4100738*****",
          "arn:aws:s3:::aws-svc-logs-4100738*****/*"
      ]
  }

User activity log를 Amazon S3에 저장하기 위해서는 Amazon Redshift 클러스터가 사용하는 파라미터 그룹내의 “enable_user_activity_logging” 값을 “true”로 변경하고 Amazon Redshift 클러스터를 재기동해야 합니다. 만약 Amazon Redshift 클러스터가 default 파라미터 그룹을 사용하고 있으면 신규로 파라미터 그룹을 생성 후 지정해야 합니다.

일정 시간이 경과 후에 Amazon S3 버킷에 아래와 같은 구조로 Amazon Redshift 감사 로그가 생성된 것을 확인할 수 있습니다.

  • AWSLogs/{AccountID}/redshift/{Region}/{Year}/{Month}/{Day}/{AccountID}_redshift_{Region}_{Redshift ClusterName}_{LogType}_{Timestamp}.gz

단계2: Amazon Athena Redshift connector 구성

Amazon Athena는 Amazon Redshift를 포함한 다양한 데이터 소스에 저장된 데이터를 조회할 수 있는 연합 쿼리(Federated Query) 기능을 제공하고 있습니다. Amazon Athena 콘솔에서 아래와 같이 AWS가 제공하는  AthenaRedshiftConnector(AWS Lambda 함수)를 이용하여 구성하면 됩니다.

“Create Lambda function”을 선택 후 AthenaRedshiftConnector를 생성합니다.

  • Application name: “AthenaRedshiftConnector”
  • SecretNamePrefix: “lab-redshift-cluster-1”
    • Amazon Redshift 클러스터에 접속하기 위하여 Amazon Redshift 클러스터의 JDBC URL에 AWS Secrets manager에 저장된 Secret(사용자명과 암호)을 사용할 경우 Secret을 입력하며, 미 사용시 임의의 값을 입력하면 됩니다. DefaultConnectionString 항목에서 사용 예제를 참고하세요.
  • SpillBucket: “yoosung-****-bucket”
    • Amazon Lambda 함수에서 6MB 한도보다 큰 데이터를 반환할 때 사용되는 Amazon S3 버킷을 입력합니다.
  • DefaultConnectionString: “${DatabaseType}://${NativeJdbcConnectionString}”
    • DefaultConnectionString 값은 접속할 Amazon Redshift 클러스터의 JDBC URL에 접속할 데이터베이스명과 사용자명 그리고 암호를 추가해서 입력합니다.
    • 예제
      • redshift://jdbc:redshift://lab-redshift-cluster-1.clnqtcolgxud.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=admin&password=Admin0000
      • redshift://jdbc:redshift://lab-redshift-cluster-1.clnqtcolgxud.ap-northeast-2.redshift.amazonaws.com:5439/dev?${secret명}
  • LambdaFunctionName: “athena-redshift-connector”
  • SecurityGroupIds: Amazon Lambda 함수가 사용할 Security group
    • Amazon Lambda 함수가 사용할 Security group을 입력하면 됩니다. 추가로 신규 생성되는 Amazon Lambda 함수가 Amazon Redshift 클러스터에 접근할 수 있도록 Amazon Redhisft 클러스터의 Security group이 설정되어 있어야 합니다.
  • SubnetIds: “subnet-08be180fcef5ef521,subnet-0ac3f4e6e5d072ae3,subnet-0c7bc719617687356,subnet-039fea92475a35b74“
    • Amazon Lambda 함수가 배포될 Subnet입니다. 본 블로그에서는 Amazon Redshift 클러스터가 배포된 Subnet ID 리스트를 입력합니다. Amazon Redshift 클러스터에 설정된 Subnet은 아래와 같이 확인 할 수 있습니다.

정상적으로 데이터 소스가 생성되면 배포 상태가 “Create complete”로 표시 됩니다.

이제 Amazon Athena에서 Amazon Athena Redshift connector 가 정상적으로 동작되는지 확인해 보겠습니다. 데이터 소스에서 AmazonRedshift를 선택 후 데이터베이스 리스트가 조회되면 정상입니다. 사용자의 Amazon Redshift 클러스터 환경에 따라 데이터베이스와 테이블 리스트가 상이할 수 있습니다.

단계 3: Amazon Redshift 감사 로그 테이블 생성

Amazon Redshift 클러스터에서 Amazon S3에 저장한 감사 로그를 Amazon Athena에서 표준 쿼리로 조회할 수 있도록 Amazon Athena의 Query editor에서 “redshift_log” 테이블을 생성합니다. 생성된 테이블 정보는 Glue data catalog에서 관리합니다.

[Redshift 감사 로그 원본 테이블]

CREATE EXTERNAL TABLE redshift_log (
  logrecord STRING
)
PARTITIONED BY (date string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ''
  ESCAPED BY '\\'
  LINES TERMINATED BY '\n'
LOCATION 's3://aws-svc-logs-4100738*****/AWSLogs/4100738*****/redshift/ap-northeast-2/'
TBLPROPERTIES (
  'projection.enabled'='true',
  'projection.date.type'='date',
  'projection.date.format'='yyyy/MM/dd',
  'projection.date.range'='NOW-1YEARS,NOW',
  'storage.location.template'='s3://aws-svc-logs-4100738*****/AWSLogs/4100738*****/redshift/ap-northeast-2/${date}/'
);

Location과 storage.location.template 값은 Redshift 감사 로그 파일이 저장된 Amazon S3 버킷명과 프리픽스로 입력합니다.

감사 로그 원본 테이블은 “yyyy/mm/dd” 구조의 파티션을 가지고 있으며, TBLPROPERTIES절에  파티션 프로젝선을 선언하여 고도로 분할된 테이블의 쿼리 처리 속도를 높이고 파티션 관리를 자동화하였습니다.

데이터 파이프라인에서 가공한 Parquet 형식의 Amazon Redshift 감사 로그들을 저장할 테이블을 생성합니다.

[connectionlog 테이블]

CREATE EXTERNAL TABLE redshift_connectionlog (
  event            string,
  recordtime       string,
  remotehost       string,
  remoteport       string,
  pid              string,
  dbname           string,
  username         string,
  authmethod       string,
  duration         string,
  sslversion       string,
  sslcipher        string,
  mtu              string,
  sslcompression   string,
  sslexpansion     string,
  iamauthguid      string,
  application_name string,
  os_version       string,
  driver_version   string,
  plugin_name      string,
  protocol_version string,
  sessionid        string,
  compression      string
)
PARTITIONED BY (date string)
STORED AS PARQUET
LOCATION 's3://aws-svc-logs-4100738*****/AWSLogs-Curated/redshift_connectionlog/'
TBLPROPERTIES (
  'classification' = 'parquet',
  'compressionType' = 'snappy',
  'projection.enabled'='true',
  'projection.date.type'='date',
  'projection.date.format'='yyyy/MM/dd',
  'projection.date.range'='NOW-1YEARS,NOW',
  'storage.location.template'='s3://aws-svc-logs-4100738*****/AWSLogs-Curated/redshift_connectionlog/${date}/'
);

[user activity log 테이블]

CREATE EXTERNAL TABLE redshift_useractivitylog (
  recordtimestamp timestamp,
  db                         string,
  user                       string,
  pid                        string,
  userid                     string,
  queryid                    string,
  xid                        string,
  durationtime               string,
  starttime                  timestamp,
  endtime                    timestamp,
  aborted                    string,
  concurrency_scaling_status string,
  query_type                 string,
  query                      string
)
PARTITIONED BY (date string)
STORED AS PARQUET
LOCATION 's3://aws-svc-logs-4100738*****/AWSLogs-Curated/redshift_useractivitylog/'
TBLPROPERTIES (
  'classification' = 'parquet',
  'compressionType' = 'snappy',
  'projection.enabled'='true',
  'projection.date.type'='date',
  'projection.date.format'='yyyy/MM/dd',
  'projection.date.range'='NOW-1YEARS,NOW',
  'storage.location.template'='s3://aws-svc-logs-4100738*****/AWSLogs-Curated/redshift_useractivitylog/${date}/'
  );

[user log 테이블]

CREATE EXTERNAL TABLE redshift_userlog (
  userid      string,
  username    string,
  oldusername string,
  action      string,
  usecreatedb string,
  usesuper    string,
  usecatupd   string,
  valuntil    string,
  pid         string,
  xid         string,
  recordtime  string,
  query       string
)
PARTITIONED BY (date string)
STORED AS PARQUET
LOCATION 's3://aws-svc-logs-4100738*****/AWSLogs-Curated/redshift_userlog/'
TBLPROPERTIES (
  'classification' = 'parquet',
  'compressionType' = 'snappy',
  'projection.enabled'='true',
  'projection.date.type'='date',
  'projection.date.format'='yyyy/MM/dd',
  'projection.date.range'='NOW-1YEARS,NOW',
  'storage.location.template'='s3://aws-svc-logs-4100738*****/AWSLogs-Curated/redshift_userlog/${date}/'
  );

단계4: Amazon Redshift 감사 로그 강화

Amazon Redshift는 시스템 작동 방식에 대한 정보가 저장되어 있는 많은  시스템 테이블과 뷰를 제공하고 있습니다. 이를 활용하면 Amazon Redshift 감사 로그를 효율적으로 분석할 수 있도록 강화 할 수 있습니다.

아래는 Amazon Redshift 클러스터 리소스를 많이 소비하는 사용자별 시간별 롱텀 쿼리 빈도를 분석하기 위해 user activity log를 강화하고, 사용자가 수행한 DDL 쿼리를 파악하기 위해 user activity log와 user log를 강화하는 활용 예제입니다.

Amazon Redshift user activity log를 강화하기 위하여 추가된 컬럼들은 아래와 같습니다.

  • queryid: Amazon Redshift stl_query 뷰의 queryid 컬럼
  • durationtime: stl_query뷰의 endtime 컬럼 – stl_query 뷰의 starttime 컬럼 (단위 초)
  • aborted: stl_query 뷰의 aborted 컬럼
  • concurrency_scaling_status: stl_query 뷰의 concurrency_scaling_status 컬럼
  • query_type: 쿼리 타입 (SELECT/INSERT/UPDATE/DELETE)
  • query: user activity log의 query 또는 stl_query 뷰의 querytxt 컬럼

user activity log의 query 컬럼이 아래와 같이 멀티 라인으로 구성된 경우에 파싱 과정에서 첫번째 라인만 인식되는 문제점을 해소하기 위하여 stl_query 뷰의 querytxt 컬럼으로 대체하였습니다.

[쿼리 문]

SELECT a.recordtimestamp,
       a.db,
       a.user,
       a.pid,
       a.userid,
       cast(b.query as varchar) as queryid,
       a.xid,
       CASE WHEN starttime is not null THEN format('%,.2f', cast(to_milliseconds(endtime - starttime) as real)/1000 )
            ELSE '' END as durationtime,
       b.starttime,
       b.endtime,
       cast(b.aborted as varchar) as aborted,
       cast(b.concurrency_scaling_status as varchar) as concurrency_scaling_status,
       CASE WHEN b.querytxt is not null
            THEN CASE WHEN regexp_extract(upper(b.querytxt), '(^| )SELECT ') is not null AND regexp_extract(upper(b.querytxt), ' FROM ') is not null
                      THEN 'SELECT'
                      ELSE  trim(regexp_extract(upper(b.querytxt), '(^| )INSERT |(^| )UPDATE |(^| )DELETE '))
                 END
            ELSE CASE WHEN regexp_extract(upper(a.query), '(^| )SELECT ') is not null AND regexp_extract(upper(a.query), ' FROM ') is not null
                      THEN 'SELECT'
                      ELSE  trim(regexp_extract(upper(a.query), '(^| )INSERT |(^| )UPDATE |(^| )DELETE '))
                 END
       END as query_type,
       CASE WHEN b.querytxt is not null THEN b.querytxt
            ELSE a.query
       END as query,
       a.date
 FROM (SELECT date_parse(regexp_extract(logrecord, '\d+-\d+-\d+T\d+:\d+:\d+Z UTC'), '%Y-%m-%dT%TZ UTC') AS recordtimestamp,
              regexp_extract(logrecord, '\[ db=(.*?) user=(.*?) pid=(\d+) userid=(\d+) xid=(\d+) \]', 1) AS db,
              regexp_extract(logrecord, '\[ db=(.*?) user=(.*?) pid=(\d+) userid=(\d+) xid=(\d+) \]', 2) AS user,
              regexp_extract(logrecord, '\[ db=(.*?) user=(.*?) pid=(\d+) userid=(\d+) xid=(\d+) \]', 3) AS pid,
              regexp_extract(logrecord, '\[ db=(.*?) user=(.*?) pid=(\d+) userid=(\d+) xid=(\d+) \]', 4) AS userid,
              regexp_extract(logrecord, '\[ db=(.*?) user=(.*?) pid=(\d+) userid=(\d+) xid=(\d+) \]', 5) AS xid,
              regexp_extract(logrecord, 'LOG: (.*)', 1) AS query,
              date
         FROM redshift_log
        WHERE date = '2023/08/17' AND
              "$path" = 's3://aws-svc-logs-4100738*****/AWSLogs/4100738*****/redshift/ap-northeast-2/2023/08/17/4100738*****_redshift_ap-northeast-2_lab-redshift-cluster-1_useractivitylog_2023-08-17T00:00.gz' AND
              date_parse(regexp_extract(logrecord, '\d+-\d+-\d+T\d+:\d+:\d+Z UTC'), '%Y-%m-%dT%TZ UTC') is not null
      ) a LEFT OUTER JOIN AmazonRedshift.pg_catalog.stl_query b
                       ON cast(a.pid as integer) = b.pid AND
                          cast(a.xid as bigint) = b.xid AND
                          substr(a.query,1,10) = substr(b.querytxt,1,10);

이 쿼리는 user activity log를 강화한 예제입니다. 쿼리를 Amazon Athena에서 실행하기 위해서는 Where절의 date와 $path 조건값을 Amazon S3에 생성된 Amazon Redshift user activity log의 S3 URI로 변경해야 합니다.

user log를 강화하기 위하여 추가된 컬럼은 아래와 같습니다.

[쿼리 문]

SELECT a.userid,     a.username,   a.oldusername, a.action, a.usecreatedb,
       a.usesuper,   a.usecatupd,  a.valuntil,    a.pid,    a.xid,
       a.recordtime, b.query_text, a.date
 FROM (SELECT regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*)', 1) AS userid,
              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*)', 2) AS username,
              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*)', 3) AS oldusername,
              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*?)\|(.*)', 4) AS action,
              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*)', 5) AS usecreatedb,
              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(\d+)\|(.*)', 6) AS usesuper,
              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(\d+)\|(\d+)\|(.*)', 7) AS usecatupd,
              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(\d+)\|(\d+)\|(.*?)\|(.*)', 8) AS valuntil,
              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(\d+)\|(\d+)\|(.*?)\|(\d+)\|(.*)', 9) AS pid,
              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(\d+)\|(\d+)\|(.*?)\|(\d+)\|(\d+)\|(.*)', 10) AS xid,
              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(\d+)\|(\d+)\|(.*?)\|(\d+)\|(\d+)\|(.*)', 11) AS recordtime,
              date
         FROM redshift_log
        WHERE date = '2023/08/21' AND
             "$path" = 's3://aws-svc-logs-4100738*****/AWSLogs/4100738*****/redshift/ap-northeast-2/2023/08/21/4100738*****_redshift_ap-northeast-2_lab-redshift-cluster-1_userlog_2023-08-21T01:00.gz'
      ) a LEFT OUTER JOIN AmazonRedshift.pg_catalog.sys_query_history b
                       ON cast(a.xid as bigint) = b.transaction_id

쿼리가 실행되기 위해서는 Where절의 date와 $path 조건값으로 Amazon S3에 생성된 Amazon Redshift user log의 S3 URI로 변경해야 합니다.

단계5: AWS Lambda 생성

Amazon S3에 Amazon Redshift 감사 로그가 저장될 때 Amazon S3에 저장된 데이터(파일)와 Amazon Redshift 시스템 뷰를 조인하여 Amazon S3에 Parquet로 저장하는 AWS Lambda 함수를 아래와 같이 생성합니다.

신규 생성한 AWS Lambda 함수의 코드를 아래와 같이 작성하고 배포합니다. 코드내의 Database는 Amazon Athena의 Database명입니다. OUTPUT은 Amazon Athena의 Query editor Settings에서 지정한 Query result location 값입니다.

[Code]

import boto3
import time

print('Loading function')

s3 = boto3.client('s3')

DATABASE = 'default'
OUTPUT = "s3://aws-athena-query-results-ap-northeast-2-4100738*****
RETRY_COUNT = 50


def lambda_handler(event, context):
    # print("Received event: " + json.dumps(event, indent=2))

    # Get the object from the event and show its content type
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    print("Bucket: " + bucket)
    print("Key: " + key)

    try:
        path = f"s3://{bucket}/{key}"
        temp = key.split('/')
        date_partkey = f"{temp[4]}/{temp[5]}/{temp[6]}"

        print("Path: " + path)
        print("Date: " + date_partkey)

        if "connectionlog" in path:
            insert_dml = (
                "INSERT INTO redshift_connectionlog "
                "SELECT regexp_extract(logrecord, '(.*?)\|(.*)', 1) AS event, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*)', 2) AS recordtime, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*)', 3) AS remotehost, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*)', 4) AS remoteport, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*)', 5) AS pid, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*)', 6) AS dbname, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*)', 7) AS username, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(.*)', 8) AS authmethod, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*)', 9) AS duration, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*)', 10) AS sslversion, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*)', 11) AS sslcipher, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(\d+)\|(.*)', 12) AS mtu, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*)', 13) AS sslcompression, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*)', 14) AS sslexpansion, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(.*)', 15) AS iamauthguid, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*)', 16) AS application_name, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*)', 17) AS os_version, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*)', 18) AS driver_version, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*)', 19) AS plugin_name, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*)', 20) AS protocol_version, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*)', 21) AS sessionid, "
                "       regexp_extract(logrecord, '(.*?)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(\d+)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*?)\|(.*)', 22) AS compression, "
                "       date "
                "  FROM redshift_log "
                f" WHERE date = '{date_partkey}' AND "
                f"       \"$path\" = '{path}' "
            )
        elif "useractivitylog" in path:
            insert_dml = (
                "INSERT INTO redshift_useractivitylog "
                "SELECT a.recordtimestamp, "
                "       a.db, "
                "       a.user, "
                "       a.pid, "
                "       a.userid, "
                "       cast(b.query as varchar) as queryid, "
                "       a.xid, "
                "       CASE WHEN starttime is not null THEN format('%,.2f', cast(to_milliseconds(endtime - starttime) as real)/1000 )  "
                "            ELSE '' END as durationtime, "
                "       b.starttime,  "
                "       b.endtime,  "
                "       cast(b.aborted as varchar) as aborted,  "
                "       cast(b.concurrency_scaling_status as varchar) as concurrency_scaling_status, "
                "       CASE WHEN b.querytxt is not null "
                "            THEN CASE WHEN regexp_extract(upper(b.querytxt), '(^| )SELECT ') is not null AND regexp_extract(upper(b.querytxt), ' FROM ') is not null "
                "                      THEN 'SELECT' "
                "                      ELSE  trim(regexp_extract(upper(b.querytxt), '(^| )INSERT |(^| )UPDATE |(^| )DELETE ')) "
                "                 END "
                "            ELSE CASE WHEN regexp_extract(upper(a.query), '(^| )SELECT ') is not null AND regexp_extract(upper(a.query), ' FROM ') is not null "
                "                      THEN 'SELECT' "
                "                      ELSE  trim(regexp_extract(upper(a.query), '(^| )INSERT |(^| )UPDATE |(^| )DELETE '))  "
                "                 END "
                "       END as query_type, "
                "       CASE WHEN b.querytxt is not null THEN b.querytxt "
                "            ELSE a.query  "
                "       END as query, "
                "       a.date "
                " FROM (SELECT date_parse(regexp_extract(logrecord, '\d+-\d+-\d+T\d+:\d+:\d+Z UTC'), '%Y-%m-%dT%TZ UTC') AS recordtimestamp, "
                "              regexp_extract(logrecord, '\[ db=(.*?) user=(.*?) pid=(\d+) userid=(\d+) xid=(\d+) \]', 1) AS db, "
                "              regexp_extract(logrecord, '\[ db=(.*?) user=(.*?) pid=(\d+) userid=(\d+) xid=(\d+) \]', 2) AS user, "
                "              regexp_extract(logrecord, '\[ db=(.*?) user=(.*?) pid=(\d+) userid=(\d+) xid=(\d+) \]', 3) AS pid, "
                "              regexp_extract(logrecord, '\[ db=(.*?) user=(.*?) pid=(\d+) userid=(\d+) xid=(\d+) \]', 4) AS userid, "
                "              regexp_extract(logrecord, '\[ db=(.*?) user=(.*?) pid=(\d+) userid=(\d+) xid=(\d+) \]', 5) AS xid, "
                "              regexp_extract(logrecord, 'LOG: (.*)', 1) AS query,  "
                "              date  "
                "         FROM redshift_log "
                f"        WHERE date = '{date_partkey}' AND "
                f"              \"$path\" = '{path}' AND "
                "               date_parse(regexp_extract(logrecord, '\d+-\d+-\d+T\d+:\d+:\d+Z UTC'), '%Y-%m-%dT%TZ UTC') is not null "
                "      ) a LEFT OUTER JOIN AmazonRedshift.pg_catalog.stl_query b "
                "                       ON cast(a.pid as integer) = b.pid AND "
                "                          cast(a.xid as bigint) = b.xid AND "
                "                          substr(a.query,1,10) = substr(b.querytxt,1,10) "
            )
        elif "userlog" in path:
                "INSERT INTO redshift_userlog "
                "SELECT a.userid,     a.username,   a.oldusername, a.action, a.usecreatedb, "
                "       a.usesuper,   a.usecatupd,  a.valuntil,    a.pid,    a.xid, "
                "       a.recordtime, b.query_text, a.date "
                " FROM (SELECT regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*)', 1) AS userid, "
                "              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*)', 2) AS username, "
                "              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*)', 3) AS oldusername, "
                "              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*?)\|(.*)', 4) AS action, "
                "              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(.*)', 5) AS usecreatedb, "
                "              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(\d+)\|(.*)', 6) AS usesuper, "
                "              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(\d+)\|(\d+)\|(.*)', 7) AS usecatupd, "
                "              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(\d+)\|(\d+)\|(.*?)\|(.*)', 8) AS valuntil, "
                "              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(\d+)\|(\d+)\|(.*?)\|(\d+)\|(.*)', 9) AS pid, "
                "              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(\d+)\|(\d+)\|(.*?)\|(\d+)\|(\d+)\|(.*)', 10) AS xid, "
                "              regexp_extract(logrecord, '(\d+)\|(.*?)\|(.*?)\|(.*?)\|(\d+)\|(\d+)\|(\d+)\|(.*?)\|(\d+)\|(\d+)\|(.*)', 11) AS recordtime, "
                "              date "
                "         FROM redshift_log "
                f"       WHERE date = '{date_partkey}' AND "
                f"             \"$path\" = '{path}' "
                "      ) a LEFT OUTER JOIN AmazonRedshift.pg_catalog.sys_query_history b "
                "                       ON cast(a.xid as bigint) = b.transaction_id "                  else:
            raise Exception("Invalid filename")

        print(insert_dml)

        # start uery execution
        client = boto3.client('athena')
        response = client.start_query_execution(
            QueryString = insert_dml,
            QueryExecutionContext= {
                'Database': DATABASE
            },
            ResultConfiguration={
                'OutputLocation': OUTPUT,
            }
        )

        # get query execution id
        query_execution_id = response['QueryExecutionId']
        print(query_execution_id)

        # get execution status
        for i in range(1, 1 + RETRY_COUNT):
            # get query execution
            query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
            query_execution_status = query_status['QueryExecution']['Status']['State']

            if query_execution_status == 'SUCCEEDED':
                print("STATUS: " + query_execution_status)
                break

            if query_execution_status == 'FAILED':
                print("ErrorMessage: " + query_status['QueryExecution']['Status']['AthenaError']['ErrorMessage'])
                raise Exception("STATUS: " + query_execution_status)

            else:
                print("STATUS: " + query_execution_status)
                time.sleep(i)
        else:
            client.stop_query_execution(QueryExecutionId=query_execution_id)
            raise Exception('TIME OVER')

        # get query results
        result = client.get_query_results(QueryExecutionId=query_execution_id)
        print(result)

        return result
    except Exception as e:
        print(e)
        raise e

신규 생성한 AWS Lambda 함수는Amazon S3에 저장된 Amazon Redshift 감사 로그 파일의 크기에 따라 처리 속도가 상이합니다. Timeout 값을 최대 값인 15분으로 변경합니다.

신규 생성한 AWS Lambda 함수의 실행 롤에 AmazonS3FullAccess, AmazonAthenaFullAccess, CloudWatchLogsFullAccess 정책과 인라인 정책을 추가 합니다.

AWS Lambda 함수에 설정된 Role name을 선택하여 AWS IAM 콘솔로 이동 후 아래와 같이 3개의 정책을 추가하면 됩니다.

“Create inline policy”를 선택하고 Policy editor를 JSON 모드로 변경 후 아래의 정책을 추가하고 저장하면 됩니다. 정책내의 Resource 값은 단계 2에서 생성한 Athena Redshift connector의 Lambda 함수입니다.

[inline policy]

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement1",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "arn:aws:lambda:ap-northeast-2:41007389****:function:athena-redshift-connector"
            ]
        }
    ]
}

Amazon S3 버킷에 Amazon Redshift 감사로그 파일이 저장된 시점에 신규 생성한 AWS Lambda 함수가 호출되도록 아래와 같이 설정합니다.

단계6: Amazon Redshift 감사 로그 조회

Amazon Athena의 Query editor에서 AWS Lambda 함수에서 생성한 감사 로그를 조회 할 수 있습니다.

결론

데이터 파이프라인을 구성하여 Amazon Redshift 감사 로그를 배치(1시간)로 CSV나 텍스트 형식에서 Parquet 형식으로 변경하여 조회 성능을 개선하는 방법을 알아 보았습니다. 그리고 Amazon Redshift 시스템 뷰의 유용한 데이터를 Amazon Redshift 감사 로그에 추가하여 Amazon S3에 저장하는 방법을 알아 보았습니다.

이제 사용자는 Amazon Athena에서 Ad-Hoc 쿼리 형태로 사용자별 사용 패턴, 사용자별 리소스 사용 추이 그리고 악성 쿼리등를 조회 할 수 있습니다. 또한 Amazon QuickSight를 사용하면 강화된 Amazon Redshift 감사 로그를 다양한 관점에서 시각화 하여 공유할 수도 있습니다.

YooSung Jeon

YooSung Jeon

전유성 솔루션즈 아키텍트는 통신/공공 산업군에서 데이터 분석과 다양한 오픈소스 활용 경험을 바탕으로 DNB(Digital Native Business) 고객을 대상으로 고객의 비즈니스 성과를 달성하도록 최적의 아키텍처를 구성하는 역할을 수행하고 있습니다.