Comment analyser mes journaux d'audit à l'aide d'Amazon Redshift Spectrum ?

Date de la dernière mise à jour : 19/08/2020

Je souhaite analyser mes journaux d'audit à l'aide d'Amazon Redshift Spectrum. Comment interroger les journaux d'audit ?

Brève description

Avant de commencer à utiliser Redshift Spectrum, effectuez les tâches suivantes :

1.    Activez vos journaux d'audit.

Remarque : un certain temps peut être nécessaire pour que vos journaux d'audit apparaissent dans votre compartiment Amazon Simple Storage Service (Amazon S3).

2.    Créez un compte utilisateur AWS Identity and Access Management (IAM)

3.    Associez le rôle IAM (RoleB) à votre cluster Amazon Redshift.

Pour interroger vos journaux d'audit dans Redshift Spectrum, créez des tables externes et configurez-les pour qu’elles pointent vers un dossier commun (utilisé par vos fichiers). Ensuite, utilisez la colonne $path cachée et la fonction regex pour créer des vues, afin de générer les lignes de votre analyse.

Solution

Pour interroger vos journaux d'audit dans Redshift Spectrum, effectuez les opérations suivantes :

1.    Créez un schéma externe :

create external schema s_audit_logs 
from data catalog 
database 'audit_logs' 
iam_role 'arn:aws:iam::your_account_number:role/role_name' create external database if not exists

Remplacez votre_numéro_de_compte par votre numéro de compte. Pour nom_du_rôle, spécifiez le rôle IAM associé à votre cluster Amazon Redshift.

2.    Créez les tables externes:

create external table s_audit_logs.user_activity_log(
    logrecord varchar(max)
)
 ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY '|' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://bucket_name/logs/AWSLogs/your_account_id/redshift/region'

Dans cet exemple, vous créez une table de journal d'activité utilisateur. Remplacez nom_du_compartiment, ID_de_votre_compte et région par votre nom de compartiment, à votre ID de compte et votre région.

3.    Créez une table de journal des connexions :

CREATE EXTERNAL TABLE s_audit_logs.connections_log(
  event varchar(60),  recordtime varchar(60), 
  remotehost varchar(60),  remoteport varchar(60), 
  pid int,  dbname varchar(60), 
  username varchar(60),  authmethod varchar(60), 
  duration int,  sslversion varchar(60), 
  sslcipher varchar(150),  mtu int, 
  sslcompression varchar(70),  sslexpansion varchar(70), 
  iamauthguid varchar(50),  application_name varchar(300))
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY '|' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://bucket_name/logs/AWSLogs/your_account_id/redshift/region';

Remplacez nom_du_compartiment, ID_de_votre_compte et région par votre nom de compartiment, à votre ID de compte et votre région.

4.    Créez un schéma local pour afficher les journaux d'audit :

create schema audit_logs_views;

5.    Créez des vues dans une base de données (à l'aide de l'option WITH NO SCHEMA BINDY ) pour accéder aux tables externes :

CREATE VIEW audit_logs_views.v_connections_log AS
select *
FROM s_audit_logs.connections_log
WHERE "$path" like '%connectionlog%'
with no schema binding;

Les fichiers renvoyés sont restreints par la colonne $path masquée pour correspondre aux entrées du journal des connexions.

Dans l'exemple suivant, la colonne $path masquée et la fonction regex sont utilisées pour restreindre les fichiers renvoyés pour journal_connexions_v :

CREATE or REPLACE VIEW audit_logs_views.v_useractivitylog AS
SELECT    logrecord,
          substring(logrecord,2,24) as recordtime,
          replace(regexp_substr(logrecord,'db=[^" "]*'),'db=','') as db,
          replace(regexp_substr(logrecord,'user=[^" "]*'),'user=','') AS user,
          replace(regexp_substr(logrecord, 'pid=[^" "]*'),'pid=','') AS pid,
          replace(regexp_substr(logrecord, 'userid=[^" "]*'),'userid=','') AS userid,
          replace(regexp_substr(logrecord, 'xid=[^" "]*'),'xid=','') AS xid,
          replace(regexp_substr(logrecord, '][^*]*'),']','') AS query
   FROM s_audit_logs.user_activity_log
   WHERE "$path" like '%useractivitylog%'
   with no schema binding;

Les fichiers renvoyés correspondent aux entrées useractivitylog .

Remarque : il existe une limitation liée aux requêtes multi-lignes dans les journaux d'activité des utilisateurs. Par conséquent, il est recommandé d'interroger directement les enregistrements du journal des colonnes.