Comment analyser les journaux de flux Amazon VPC à l'aide d'Amazon Athena ?

Date de la dernière mise à jour : 16/04/2021

Je souhaite analyser mes journaux de flux Amazon Virtual Private Cloud (Amazon VPC) à l'aide d'Amazon Athena.

Brève description

Les journaux de flux Amazon VPC vous permettent de collecter les informations relatives au trafic IP entrant et sortant dans les interfaces réseau de votre VPC. Les journaux peuvent être utilisés pour étudier les schémas du trafic réseau et identifier les menaces et les risques sur votre réseau Amazon VPC.

Résolution

Analyser les journaux VPC à l'aide d'Athena

Pour analyser les journaux d'accès à l'aide d'Amazon Athena, procédez comme suit :

1.    Dans l'onglet Amazon Athena console query editor (Éditeur de requête de la console Amazon Athena), créez une base de données test_db_vpclogs en exécutant une commande comme celle-ci :
Important : il est recommandé de créer la base de données dans la même région AWS que celle du compartiment Amazon S3 où vous souhaitez enregistrer les journaux de flux.

CREATE DATABASE test_db_vpclogs;

Remarque : veillez à remplacer test_db_vpclogs par le nom de la base de données que vous souhaitez créer.

2.    Dans la base de données que vous avez créée, créez une table pour les journaux de flux VPC en exécutant une commande comme celle-ci. Avec cette commande, vous pouvez créer une table, la partitionner et remplir les partitions automatiquement en fonction de votre cas d'utilisation à l'aide de la projection de partitions.

CREATE EXTERNAL TABLE IF NOT EXISTS test_table_vpclogs (
version int,
account string,
interfaceid string,
sourceaddress string,
destinationaddress string,
sourceport int,
destinationport int,
protocol int,
numpackets int,
numbytes bigint,
starttime int,
endtime int,
action string,
logstatus string,
vpcid string,
subnetid string,
instanceid string,
tcpflags int,
type string,
pktsrcaddr string,
pktdstaddr string,
region string,
azid string,
sublocationtype string,
sublocationid string,
pktsrcawsservice string,
pktdstawsservice string,
flowdirection string,
trafficpath string
)
PARTITIONED BY (region string, day string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
LOCATION 's3://awsexamplebucket/awsexampleprefix/awsexamplelogs/1111222233334444/vpcflowlogs/test_region_code/'
TBLPROPERTIES
(
"skip.header.line.count"="1",
"projection.enabled" = "true",
"projection.region.type" = "enum",
"projection.region.values" = "us-east-1,us-west-2,ap-south-1,eu-west-1",
"projection.day.type" = "date",
"projection.day.range" = "2021/01/01,NOW",
"projection.day.format" = "yyyy/MM/dd",
"storage.location.template" = "s3://awsexamplebucket/awsexampleprefix/awsexamplelogs/1111222233334444/vpcflowlogs/test_region_code/${region}/${day}"
)

Veillez à effectuer les opérations suivantes :

  • Remplacez test_table_vpclogs dans la requête par le nom de votre table.
  • Modifiez le paramètre LOCATION (EMPLACEMENT) de la requête pour qu'il pointe vers le compartiment Amazon S3 contenant vos données de journal.

Remarque : Athena projette la partition même en l'absence de partition projetée dans Amazon S3. Il est recommandé d'utiliser des attributs partitionnés dans vos requêtes.

3.    Utilisez l'éditeur de requête sur la console pour exécuter des instructions SQL sur la table. Vous pouvez enregistrer les requêtes, afficher les requêtes précédentes ou télécharger les résultats de requêtes au format CSV.

Exemples de requêtes

Remarque : remplacez test_table_vpclogs dans les requêtes par le nom de la table que vous avez créée. Modifiez les valeurs de colonnes et d'autres variables en fonction de votre requête.

1.    Pour afficher les 100 premières entrées du journal d'accès dans l'ordre chronologique pendant une certaine période, exécutez une requête comme celle-ci :

SELECT * FROM test_table_vpclogs WHERE day >= '2021/02/01' AND day < '2021/02/28' ORDER BY time ASC LIMIT 100;

2.    Pour afficher le serveur qui reçoit les dix premiers paquets HTTP pendant une période donnée, exécutez une requête comme celle-ci :

SELECT SUM(numpackets) AS
  packetcount,
  destinationaddress
FROM test_table_vpclogs
WHERE destinationport = 443 AND day >= '2021/03/01' AND day < '2021/03/31'
GROUP BY destinationaddress
ORDER BY packetcount DESC
LIMIT 10;

Cette requête compte le nombre de paquets reçus sur le port HTTPS 443, les regroupe par adresse IP de destination et renvoie les 10 premières entrées de la semaine précédente.

3.    Pour vérifier les journaux créés pendant une période donnée, exécutez une requête comme celle-ci :

SELECT interfaceid, sourceaddress, action, protocol, to_iso8601(from_unixtime(starttime))
AS start_time, to_iso8601(from_unixtime(endtime))
AS end_time
FROM test_table_vpclogs
WHERE day >= '2021/04/01' AND day < '2021/04/30';

Cette requête renvoie les journaux créés entre le 04/12/2020 à 11:28:19.000 et le 04/12/2020 à 11:28:33.000.

4.    Pour afficher les journaux d'accès d'une adresse IP source spécifique entre des périodes données, exécutez une requête comme celle-ci :

SELECT * FROM test_table_vpclogs WHERE sourceaddress= '10.117.1.22' AND day >= '2021/02/01' AND day < '2021/02/28';

5.    Pour répertorier les connexions TCP rejetées, exécutez une requête comme celle-ci :

SELECT day_of_week(date) AS
day,
date,
interfaceid,
sourceaddress,
action,
protocol
FROM test_table_vpclogs
WHERE action = 'REJECT' AND protocol = 6 AND day >= '2021/02/01' AND day < '2021/02/28' LIMIT 10;

6.    Pour afficher les journaux d'accès de la plage d'adresses IP commençant par « 10.117 », exécutez une requête comme celle-ci :

SELECT * FROM test_table_vpclogs WHERE split_part(sourceaddress,’.’, 1)=’10’ AND split_part(sourceaddress,’.’, 2) =‘117’;

7.    Pour afficher les journaux d'accès d'une adresse IP de destination spécifique dans une période donnée, exécutez une requête comme celle-ci :

SELECT * FROM test_table_vpclogs WHERE destinationaddress= '10.0.1.14' AND day >= '2021/01/01' AND day < '2021/01/31';