如何使用 Redshift Spectrum 分析我的审计日志?

2 分钟阅读
0

我想使用 Amazon Redshift Spectrum 来分析我的审计日志。

简短描述

在使用 Redshift Spectrum 之前,请完成以下任务:

1.    启用您的审计日志

**注意:**您的审计日志可能需要一些时间才能出现在您的 Amazon Simple Storage Service(Amazon S3)桶中。

2.    创建一个 AWS Identity and Access Management(IAM)角色

3.    将 IAM 角色关联到您的 Amazon Redshift 集群

要在 Redshift Spectrum 中查询您的审计日志,请创建外部表,然后将其配置为指向(您的文件使用的)公共文件夹。然后,使用隐藏的 $path 列和正则表达式函数创建视图,生成供分析所用的行。

解决方法

要在 Redshift Spectrum 中查询您的审计日志,请按照以下步骤进行操作:

1.    创建外部架构

create external schema s_audit_logs
from data catalog
database 'audit_logs'
iam_role 'arn:aws:iam::your_account_number:role/role_name' create external database if not exists

your_account_number 替换为您的真实账号。对于 role_name,请指定连接到您的 Amazon Redshift 集群的 IAM 角色。

2.    创建外部表

**注意:**在以下示例中,请将 bucket_nameyour_account_idregion 替换为您的桶名称、账户 ID 和 AWS 区域。

创建用户活动日志表:

create external table s_audit_logs.user_activity_log(
	logrecord varchar(max)
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS INPUTFORMAT
	'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
	'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
	's3://bucket_name/logs/AWSLogs/your_account_id/redshift/region'

创建连接日志表:

CREATE EXTERNAL TABLE s_audit_logs.connections_log(
	event varchar(60), recordtime varchar(60),
	remotehost varchar(60), remoteport varchar(60),
	pid int, dbname varchar(60),
	username varchar(60), authmethod varchar(60),
	duration int, sslversion varchar(60),
	sslcipher varchar(150), mtu int,
	sslcompression varchar(70), sslexpansion varchar(70),
	iamauthguid varchar(50), application_name varchar(300))
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS INPUTFORMAT
	'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
	'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://bucket_name/logs/AWSLogs/your_account_id/redshift/region';

创建用户日志表:

create external table s_audit_log.user_log(
	userid varchar(255),
	username varchar(500),
	oldusername varchar(500),
	usecreatedb varchar(50),
	usesuper varchar(50),
	usecatupd varchar(50),
	valuntil varchar(50),
	pid varchar(50),
	xid varchar(50),
	recordtime varchar(50))
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS INPUTFORMAT
	'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
	'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://bucket_name/logs/AWSLogs/your_account_id/redshift/region’;

3.    创建本地架构以查看审计日志:

create schema audit_logs_views

4.    要访问外部表,请使用无架构绑定选项在数据库中创建视图

CREATE VIEW audit_logs_views.v_connections_log AS
select *
FROM s_audit_logs.connections_log
WHERE "$path" like '%connectionlog%'
with no schema binding;

返回的文件受限于隐藏的 $path 列,以匹配 connectionlog 条目。

在以下示例中,隐藏的 $path 列和正则表达式函数限制了为 v_connections_log 返回的文件:

CREATE or REPLACE VIEW audit_logs_views.v_useractivitylog AS
SELECT    logrecord,
          substring(logrecord,2,24) as recordtime,
          replace(regexp_substr(logrecord,'db=[^" "]*'),'db=','') as db,
          replace(regexp_substr(logrecord,'user=[^" "]*'),'user=','') AS user,
          replace(regexp_substr(logrecord, 'pid=[^" "]*'),'pid=','') AS pid,
          replace(regexp_substr(logrecord, 'userid=[^" "]*'),'userid=','') AS userid,
          replace(regexp_substr(logrecord, 'xid=[^" "]*'),'xid=','') AS xid,
          replace(regexp_substr(logrecord, '][^*]*'),']','') AS query
   FROM s_audit_logs.user_activity_log
   WHERE "$path" like '%useractivitylog%'
   with no schema binding;

返回的文件与 useractivitylog 条目相匹配。

**注意:**存在与用户活动日志中的多行查询有关的限制。最佳做法是直接查询列日志记录。

相关信息

使用 Amazon Redshift Spectrum 分析数据库审计日志的安全性和合规性

STL_CONNECTION_LOG

AWS 官方
AWS 官方已更新 1 年前