AWS Big Data Blog

Analyzing VPC Flow Logs with Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight

Many business and operational processes require you to analyze large volumes of frequently updated data. Log analysis, for example, involves querying and visualizing large volumes of log data to identify behavioral patterns, understand application processing flows, and investigate and diagnose issues.

VPC flow logs capture information about the IP traffic going to and from network interfaces in Amazon VPC. The logs allow you to investigate network traffic patterns and identify threats and risks across your VPC estate. Flow log data is stored in Amazon CloudWatch Logs; after you’ve created a flow log, you can view and retrieve its data there.

Flow logs can help you with a number of tasks. For example, you can use them to troubleshoot why specific traffic is not reaching an instance, which in turn can help you diagnose overly restrictive security group rules. You can also use flow logs as a security tool to monitor the traffic that is reaching your instance.

This blog post shows how to build a serverless architecture by using Amazon Kinesis Firehose, AWS Lambda, Amazon S3, Amazon Athena, and Amazon QuickSight to collect, store, query, and visualize flow logs. In building this solution, you will also learn how to implement Athena best practices with regard to compressing and partitioning data so as to reduce query latencies and drive down query costs.

Summary of the solution

The solution described here is divided into three parts:

  • Send VPC Flow Logs to S3 for Analysis with Athena. This section describes how to use Lambda and Firehose to publish flow log data to S3, and how to create a table in Athena so that you can query this data.
  • Visualize Your Logs in QuickSight. Here you’ll learn how to use QuickSight and its Athena connector to build flow log analysis dashboards that you can share with other users in your organization.
  • Partition Your Data in Athena for Improved Query Performance and Reduced Costs. This section shows how you can use a Lambda function to automatically partition Athena data as it arrives in S3. This function will work with any Firehose stream and any other delivery mechanism that writes data to S3 using a year/month/day/hour prefix.

Partitioning your data is one of three strategies for improving Athena query performance and reducing costs. The other two are compressing your data, and converting it into columnar formats such as Apache Parquet. The solution described here automatically compresses your data, but it doesn’t convert it into a columnar format. Even if you don’t convert your data to a columnar format, as is the case here, it’s always worth compressing and partitioning it. For any large-scale solution, you should also consider converting it to Parquet.

Serverless Architecture for Analyzing VPC Flow Logs

The services in this solution work together as follows.


When you create a flow log for a VPC, the log data is published to a log group in CloudWatch Logs. By using a CloudWatch Logs subscription, you can send a real-time feed of these log events to a Lambda function that uses Firehose to write the log data to S3.

Once the flow log data starts arriving in S3, you can write ad hoc SQL queries against it using Athena. For users who prefer to build dashboards and explore the data visually and interactively, QuickSight makes it easy to build rich visualizations on top of Athena.

Send VPC Flow Logs to S3 for Analysis with Athena

In this section, we’ll describe how to send flow log data to S3 so that you can query it with Athena. The examples here use the us-east-1 region, but you can use any region in which both Athena and Firehose are available.

Create the Firehose delivery stream

Follow the steps described here to create a Firehose delivery stream with a new or existing S3 bucket as the destination. Keep most of the default settings, but select an AWS Identity and Access Management (IAM) role that has write access to your S3 bucket and specify GZIP compression. Name the delivery stream ‘VPCFlowLogsDefaultToS3’.
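
The linked walk-through uses the console, but the same delivery stream can be created programmatically. The boto3 sketch below is only illustrative: the account ID, bucket name, delivery role ARN, and buffering values are placeholders you would replace with your own.

import boto3

firehose = boto3.client('firehose', region_name='us-east-1')

# Placeholders: substitute your own account ID, bucket, and S3-writable IAM role.
firehose.create_delivery_stream(
    DeliveryStreamName='VPCFlowLogsDefaultToS3',
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
        'BucketARN': 'arn:aws:s3:::my-vpc-flow-logs',
        'CompressionFormat': 'GZIP',  # compress objects as they are written to S3
        'BufferingHints': {'SizeInMBs': 5, 'IntervalInSeconds': 300},
    },
)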

Create a VPC flow log

First, follow these steps to turn on VPC flow logs for your default VPC.
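
If you prefer the API to the console, the following boto3 sketch turns on flow logs for a VPC. The VPC ID, log group name, and the IAM role that allows the flow logs service to publish to CloudWatch Logs are placeholders you would supply yourself.

import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')

# Placeholders: your default VPC ID and an IAM role that allows the flow logs
# service to publish log events to CloudWatch Logs.
ec2.create_flow_logs(
    ResourceType='VPC',
    ResourceIds=['vpc-0123456789abcdef0'],
    TrafficType='ALL',
    LogGroupName='VPCFlowLogs',
    DeliverLogsPermissionArn='arn:aws:iam::123456789012:role/vpc_flow_logs_to_cloudwatch',
)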

Create an IAM role for Lambda to write to Firehose

Before you create a Lambda function to deliver logs to Firehose, you need to create an IAM role that allows Lambda to write batches of records to Firehose. Create a role named ‘lambda_kinesis_exec_role’ by following the steps below.

First, embed the following inline access policy.

 

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "firehose:PutRecordBatch"
            ],
            "Resource": [
                "arn:aws:firehose:*:*:deliverystream/VPCFlowLogsDefaultToS3"
            ]
        }
    ]
}

Then, attach the following trust relationship to enable Lambda to assume this role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
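
If you’d rather script the role creation, here is a minimal boto3 sketch that combines the two documents above. The inline policy name is an arbitrary choice, not something the rest of the walk-through depends on.

import json

import boto3

iam = boto3.client('iam')

trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

access_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
            "Resource": "arn:aws:logs:*:*:*",
        },
        {
            "Effect": "Allow",
            "Action": ["firehose:PutRecordBatch"],
            "Resource": ["arn:aws:firehose:*:*:deliverystream/VPCFlowLogsDefaultToS3"],
        },
    ],
}

# Create the role with the trust relationship, then embed the inline access policy.
iam.create_role(
    RoleName='lambda_kinesis_exec_role',
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)
iam.put_role_policy(
    RoleName='lambda_kinesis_exec_role',
    PolicyName='lambda_kinesis_exec_policy',  # arbitrary name for the inline policy
    PolicyDocument=json.dumps(access_policy),
)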

Create a Lambda function to deliver CloudWatch Logs to Firehose

To create a Lambda function for delivering log events from CloudWatch to your ‘VPCFlowLogsDefaultToS3’ Firehose delivery stream, do the following:

  1. From the Lambda console, create a new Lambda function and select Blank Function.
  2. Choose Next when asked to configure a trigger.
  3. On the Configure function page, name the function ‘VPCFlowLogsToFirehose’.
  4. Select the Python run-time, and copy this code from GitHub into the code pane. (A minimal sketch of the handler’s logic appears after these steps.)
  5. Add an environment variable named DELIVERY_STREAM_NAME whose value is the name of the delivery stream created in the first step of this walk-through (‘VPCFlowLogsDefaultToS3’).
  6. Specify the ‘lambda_kinesis_exec_role’ you created in the previous step, and set the timeout to one minute.
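
The GitHub code referenced in step 4 is the function to use; the sketch below only illustrates the shape of that logic, assuming a Python 3 runtime. CloudWatch Logs delivers a gzipped, base64-encoded payload, and the function forwards each log line to Firehose in batches of at most 500 records. Error handling and retries are omitted.

import base64
import gzip
import json
import os

import boto3

firehose = boto3.client('firehose')
DELIVERY_STREAM_NAME = os.environ['DELIVERY_STREAM_NAME']


def lambda_handler(event, context):
    # CloudWatch Logs subscriptions deliver the payload base64-encoded and gzipped.
    payload = gzip.decompress(base64.b64decode(event['awslogs']['data']))
    log_data = json.loads(payload)

    # One Firehose record per flow log line; the trailing newline keeps records
    # separated in the objects that Firehose writes to S3.
    records = [{'Data': (e['message'] + '\n').encode('utf-8')}
               for e in log_data['logEvents']]

    # PutRecordBatch accepts at most 500 records per call.
    for i in range(0, len(records), 500):
        firehose.put_record_batch(
            DeliveryStreamName=DELIVERY_STREAM_NAME,
            Records=records[i:i + 500],
        )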

Create a CloudWatch subscription for your Lambda function

Within CloudWatch Logs, take the following steps:

  1. Choose the log group for your VPC flow logs (you might need to wait a few minutes for the log group to show up if the flow logs were just created).
  2. For Actions, choose Stream to AWS Lambda.
  3. Select the ‘VPCFlowLogsToFirehose’ Lambda function that was created in the previous step.
  4. For the format, choose Amazon VPC Flow Logs. (A boto3 equivalent of these console steps is sketched below.)
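
The subscription can also be created with boto3. In this sketch the account ID and log group name are placeholders; an empty filter pattern forwards every event, whereas the console’s Amazon VPC Flow Logs format applies a space-delimited field pattern instead. CloudWatch Logs also needs permission to invoke the function.

import boto3

REGION = 'us-east-1'
ACCOUNT_ID = '123456789012'             # placeholder
LOG_GROUP = 'VPCFlowLogs'               # the flow log group created earlier
FUNCTION_NAME = 'VPCFlowLogsToFirehose'

lambda_client = boto3.client('lambda', region_name=REGION)
logs = boto3.client('logs', region_name=REGION)

# Allow CloudWatch Logs to invoke the Lambda function for this log group.
lambda_client.add_permission(
    FunctionName=FUNCTION_NAME,
    StatementId='cloudwatch-logs-invoke',
    Action='lambda:InvokeFunction',
    Principal='logs.%s.amazonaws.com' % REGION,
    SourceArn='arn:aws:logs:%s:%s:log-group:%s:*' % (REGION, ACCOUNT_ID, LOG_GROUP),
)

# Stream the log group's events to the function.
logs.put_subscription_filter(
    logGroupName=LOG_GROUP,
    filterName='FlowLogsToLambda',
    filterPattern='',   # forward everything
    destinationArn='arn:aws:lambda:%s:%s:function:%s' % (REGION, ACCOUNT_ID, FUNCTION_NAME),
)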


Create an external table in Athena

Amazon Athena allows you to query data in S3 using standard SQL without having to provision or manage any infrastructure. Athena works with a variety of common data formats, including CSV, JSON, Parquet, and ORC, so there’s no need to transform your data prior to querying it. You simply define your schema, and then run queries using the query editor in the AWS Management Console or programmatically using the Athena JDBC driver.

Athena stores your database and table definitions in a data catalog compatible with the Hive metastore. For this example, you’ll create a single table definition over your flow log files. The DDL for this table is specified later in this section. Before executing this DDL, take note of the following:

  • You will need to replace <bucket_and_prefix> with the name of the Firehose destination for your flow log data (including the prefix).
  • The CREATE TABLE definition includes the EXTERNAL keyword. If you omit this keyword, Athena will return an error. EXTERNAL ensures that the table metadata is stored in the data catalog without impacting the underlying data stored on S3. If you drop an external table, the table metadata is deleted from the catalog, but your data remains in S3.
  • The columns for the vpc_flow_logs table map to the fields in a flow log record. Flow log records comprise space-separated strings. To parse the fields from each record, Athena uses a serializer-deserializer class, or SerDe, which is a custom library that tells Athena how to handle your data.
  • The DDL specified here uses a regular expression SerDe to parse the space-separated flow log records. The regular expression itself is supplied using the “input.regex” SerDe property. (The short Python sketch after this list shows how the same expression splits a sample record into its fields.)
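
To make the regular expression concrete, here is a short Python sketch that applies the same pattern to a sample flow log record. The sample simply follows the documented field layout, and note that the DDL doubles every backslash because the pattern is embedded in a Hive string literal.

import re

# The same pattern as the DDL's "input.regex", written as a normal regex.
FLOW_LOG_PATTERN = re.compile(
    r'^([^ ]+)\s+([0-9]+)\s+([^ ]+)\s+([^ ]+)\s+([^ ]+)\s+([^ ]+)\s+'
    r'([^ ]+)\s+([^ ]+)\s+([^ ]+)\s+([^ ]+)\s+([0-9]+)\s+([0-9]+)\s+'
    r'([^ ]+)\s+([^ ]+)$'
)

# A sample record: version, account, interface, source, destination, source port,
# destination port, protocol, packets, bytes, start, end, action, log status.
record = ('2 123456789010 eni-abc123de 172.31.16.139 172.31.16.21 '
          '20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK')

fields = FLOW_LOG_PATTERN.match(record).groups()
print(fields[3], fields[6], fields[12])   # source address, destination port, action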

In the Athena query editor, enter the DDL below, and choose Run Query.

CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs (
Version INT,
Account STRING,
InterfaceId STRING,
SourceAddress STRING,
DestinationAddress STRING,
SourcePort INT,
DestinationPort INT,
Protocol INT,
Packets INT,
Bytes INT,
StartTime INT,
EndTime INT,
Action STRING,
LogStatus STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
    "input.regex" = "^([^ ]+)\\s+([0-9]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([0-9]+)\\s+([0-9]+)\\s+([^ ]+)\\s+([^ ]+)$") 
LOCATION 's3://<bucket_and_prefix>/';

 

Query your data in Athena

After creating the table, you should be able to select the eye icon next to the table name to see a sample set of rows.


You can easily run various queries to investigate your flow logs. Here is an example that gets the top 25 source IPs for rejected traffic:

 

SELECT sourceaddress, count(*) cnt
FROM vpc_flow_logs
WHERE action = 'REJECT'
GROUP BY sourceaddress
ORDER BY cnt desc
LIMIT 25;


Visualize Your Logs in QuickSight

QuickSight allows you to visualize your Athena tables with a few simple clicks. You can sign up for QuickSight using your AWS account and get 1 user and 1 GB of SPICE capacity for free.

Before connecting QuickSight to Athena, make sure to grant QuickSight access to Athena and the associated S3 buckets in your account as described here. You can then create a new data set in QuickSight based on the Athena table you created.

Log in to QuickSight and choose Manage data, then New data set. Choose Athena as a new data source.


Name your data source “AthenaDataSource”. Select the default schema and the vpc_flow_logs table.


Choose Edit/Preview data. For starttime and endtime, set the data format to a date rather than a number. These two fields represent the start and end times of the capture window for the flow logs and come into the system as Unix timestamps in seconds.


Now select Save and visualize.

Let’s look at the start times for the different capture windows and the number of bytes that were sent. We’ll do this by selecting StartTime and Bytes from the field list. Notice that QuickSight automatically displays a time chart with the amount of traffic. You can easily change the date parameter to set different time granularities.

Here is an example showing a large spike of traffic for one day. This tells us that there was a lot of traffic on this day compared to the other days being plotted.


You can easily build a rich analysis of REJECT and ACCEPT traffic across ports, IP addresses, and other facets of your data. You can then publish this analysis as a dashboard that can be shared with other QuickSight users in your organization.


Partition Your Data in Athena for Improved Query Performance and Reduced Costs

The solution described so far delivers GZIP-compressed flow log files to S3 on a frequent basis. Firehose places these files under a /year/month/day/hour/ key prefix in the bucket you specified when creating the delivery stream. The external table definition you used when creating the vpc_flow_logs table in Athena encompasses all the files located within this time series keyspace.

Athena is priced per query based on the amount of data scanned by the query. With our existing solution, each query will scan all the files that have been delivered to S3. As the number of VPC flow log files increases, the amount of data scanned will also increase, which will affect both query latency and query cost.

You can reduce your query costs and get better performance by compressing your data, partitioning it, and converting it into columnar formats. Firehose has already been configured to compress the data delivered to S3. Now we will look at partitioning. (Converting the data to a columnar format, like Apache Parquet, is out of scope for this article.)

Partitioning your table helps you restrict the amount of data scanned by each query. Many tables benefit from being partitioned by time, particularly when the majority of queries include a time-based range restriction. Athena uses the Hive partitioning format, whereby partitions are separated into folders whose names contain key-value pairs that directly reflect the partitioning scheme (see the Athena documentation for more details).

The folder structure created by Firehose (for example, s3://my-vpc-flow-logs/2017/01/14/09/) is different from the Hive partitioning format (for example, s3://my-vpc-flow-logs/dt=2017-01-14-09-00/). However, using ALTER TABLE ADD PARTITION, you can manually add partitions and map them to portions of the keyspace created by the delivery stream.

The solution presented here uses a Lambda function and the Athena JDBC driver to execute ALTER TABLE ADD PARTITION statements on receipt of new files into S3, thereby automatically creating new partitions for Firehose delivery streams.

Create an IAM role for Lambda to execute Athena queries

Before you create the Lambda function, you will need to create an IAM role that allows Lambda to execute queries in Athena. Create a role named ‘lambda_athena_exec_role’ by following the instructions here.

First, embed the following inline access policy.

 

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "athena:RunQuery",
                "athena:GetQueryExecution",
                "athena:GetQueryExecutions",
                "athena:GetQueryResults"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::aws-athena-query-results-*"
            ]
        }
    ]
}

 

Then, attach the following trust relationship to enable Lambda to assume this role.

 

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

Create a partitioned version of the vpc_flow_logs table

The vpc_flow_logs external table that you previously defined in Athena isn’t partitioned. To create a table partitioned on a column named ‘IngestDateTime’, drop the original, and then recreate it using the following modified DDL.

 

DROP TABLE IF EXISTS vpc_flow_logs;

CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs (
Version INT,
Account STRING,
InterfaceId STRING,
SourceAddress STRING,
DestinationAddress STRING,
SourcePort INT,
DestinationPort INT,
Protocol INT,
Packets INT,
Bytes INT,
StartTime INT,
EndTime INT,
Action STRING,
LogStatus STRING
)
PARTITIONED BY (IngestDateTime STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
    "input.regex" = "^([^ ]+)\\s+([0-9]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([0-9]+)\\s+([0-9]+)\\s+([^ ]+)\\s+([^ ]+)$") 
LOCATION 's3://<bucket_and_prefix>/';

Create a Lambda function to create Athena partitions

To create the Lambda function:

  1. Clone the Lambda Java project from GitHub.
  2. Compile the .jar file according to the instructions in the README file, and copy it to a bucket in S3.
  3. Create a new Lambda function and select Blank Function.
  4. Choose Next when asked to configure a trigger.
  5. On the Configure function page, name the function ‘CreateAthenaPartitions’.
  6. Select the Java run-time.
  7. For Code entry type, choose Upload a file from Amazon S3.
  8. For the S3 link URL, enter the HTTPS-format URL of the .jar file you uploaded to S3.
  9. For the Lambda function, you’ll need to set several environment variables:
  • PARTITION_TYPE: Supply one of the following values: Month, Day, or Hour. This environment variable is optional. If you omit it, the Lambda function will default to creating new partitions every day. For this example, supply ‘Hour’.
  • TABLE_NAME: Use the format <database>.<table_name>—for example, ‘default.vpc_flow_logs’.
  • S3_STAGING_DIR: An Amazon S3 location to which your query output will be written. (Although the Lambda function is only executing DDL statements, Athena still writes an output file to S3. The IAM policy that you created earlier assumes that the query output bucket name begins with ‘aws-athena-query-results-’.)
  • ATHENA_REGION: The region in which Athena is located. For this example, use ‘us-east-1’.


  10. Now specify the handler and role:
  • Handler: com.amazonaws.services.lambda.CreateAthenaPartitionsBasedOnS3Event::handleRequest
  • Role: Select ‘Choose an existing role’
  • Existing role: Select ‘lambda_athena_exec_role’


  11. Finally, set the timeout to one minute.

Configure S3 to send new object notifications to your Lambda function

On the Properties page for the bucket containing your VPC flow log data, expand the Events pane and create a new notification:

  • Name: FlowLogDataReceived
  • Events: ObjectCreated(All)
  • Send To: Lambda function
  • Select the ‘CreateAthenaPartitions’ Lambda function from the dropdown.
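
If you want to script this configuration instead of using the console, the boto3 sketch below sets up an equivalent notification. The bucket name and account ID are placeholders, and S3 must also be granted permission to invoke the function.

import boto3

REGION = 'us-east-1'
ACCOUNT_ID = '123456789012'            # placeholder
BUCKET = 'my-vpc-flow-logs'            # the Firehose destination bucket
FUNCTION_ARN = 'arn:aws:lambda:%s:%s:function:CreateAthenaPartitions' % (REGION, ACCOUNT_ID)

lambda_client = boto3.client('lambda', region_name=REGION)
s3 = boto3.client('s3', region_name=REGION)

# Allow S3 to invoke the partition-creating function for events from this bucket.
lambda_client.add_permission(
    FunctionName='CreateAthenaPartitions',
    StatementId='s3-object-created-invoke',
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn='arn:aws:s3:::%s' % BUCKET,
)

# Send an event to the function whenever any object is created in the bucket.
s3.put_bucket_notification_configuration(
    Bucket=BUCKET,
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'Id': 'FlowLogDataReceived',
            'LambdaFunctionArn': FUNCTION_ARN,
            'Events': ['s3:ObjectCreated:*'],
        }],
    },
)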


Now, whenever new files are delivered to your S3 bucket by Firehose, your ‘CreateAthenaPartitions’ Lambda function will be triggered. The function parses the newly received object’s key. Based upon the year/month/day/hour portion of the key, together with the PARTITION_TYPE you specified when creating the function (Month, Day, or Hour), the function determines which partition the file belongs in. It will then query Athena to determine whether this partition already exists. If the partition doesn’t exist, the function will create the partition, mapping it to the relevant portion of the S3 keyspace.

Let’s examine this logic in a bit more detail. Assume you’ve configured your ‘CreateAthenaPartitions’ Lambda function to create hourly partitions, and that Firehose has just delivered a file containing flow log data to s3://my-vpc-flow-logs/2017/01/14/07/xxxx.gz.

Looking at the S3 key for this new file, the Lambda function will infer that it belongs in an hourly partition whose spec is ‘2017-01-14-07’. On checking Athena, the function discovers that this partition does not exist, so it executes the following DDL statement.

ALTER TABLE default.vpc_flow_logs ADD PARTITION (IngestDateTime='2017-01-14-07') LOCATION 's3://my-vpc-flow-logs/2017/01/14/07/';

If the Lambda function had been configured to create daily partitions, the new partition would be mapped to ‘s3://my-vpc-flow-logs/2017/01/14/’; if monthly, the LOCATION would be ‘s3://my-vpc-flow-logs/2017/01/’.
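
The Lambda function in the GitHub project is written in Java and uses the Athena JDBC driver, but the key-to-partition mapping it performs can be sketched in a few lines of Python. The sketch assumes the objects sit directly under a year/month/day/hour key with no additional prefix, which is an assumption made here for simplicity.

# Sketch of the mapping from a newly delivered S3 key to a partition DDL statement.
def partition_ddl(bucket, key, partition_type='Hour'):
    year, month, day, hour = key.split('/')[:4]
    parts = {'Month': [year, month],
             'Day': [year, month, day],
             'Hour': [year, month, day, hour]}[partition_type]
    spec = '-'.join(parts)                                  # e.g. '2017-01-14-07'
    location = 's3://%s/%s/' % (bucket, '/'.join(parts))    # e.g. 's3://my-vpc-flow-logs/2017/01/14/07/'
    return ("ALTER TABLE default.vpc_flow_logs "
            "ADD PARTITION (IngestDateTime='%s') LOCATION '%s';" % (spec, location))

print(partition_ddl('my-vpc-flow-logs', '2017/01/14/07/xxxx.gz'))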

Note that the partitions represent the date and time at which the logs were ingested into S3, which will be some time after the StartTime and EndTime values for the individual records in each partition.

Query the partitioned data using Athena

Your queries can now take advantage of the partitions by filtering on the ingestdatetime partition column. For example, the following query scans only the partitions ingested after a given hour:

 

SELECT sourceaddress, count(*) cnt
FROM vpc_flow_logs
WHERE ingestdatetime > '2017-01-15-00'
AND action = 'REJECT'
GROUP BY sourceaddress
ORDER BY cnt desc
LIMIT 25;

To query the data ingested over the course of the last three hours, run the following query (assuming you’re using an hourly partitioning scheme).

 

SELECT sourceaddress, count(*) cnt
FROM vpc_flow_logs
WHERE date_parse(ingestdatetime, '%Y-%m-%d-%H') >= 
      date_trunc('hour', current_timestamp - interval '2' hour)
AND action = 'REJECT'
GROUP BY sourceaddress
ORDER BY cnt desc
LIMIT 25;

By using partitions, you can reduce the amount of data scanned per query and, in so doing, reduce both query costs and latencies. Comparing a query that ignores partitions with one that restricts the scan using the partition column in its WHERE clause, the partitioned query runs in half the time and scans less than a tenth of the data scanned by the unpartitioned query.

Conclusion

In the past, to analyze logs you had to extensively prepare data for specific query use cases or provision and operate storage and compute resources. With Amazon Athena and Amazon QuickSight, you can now publish, store, analyze, and visualize log data more flexibly. Instead of focusing on the underlying infrastructure needed to perform the queries and visualize the data, you can focus on investigating the logs.


About the Authors


Ben Snively is a Public Sector Specialist Solutions Architect. He works with government, non-profit and education customers on big data and analytical projects, helping them build solutions using AWS. In his spare time he adds IoT sensors throughout his house and runs analytics on it.

 

Ian Robinson is a Specialist Solutions Architect for Data and Analytics. He works with customers throughout EMEA, helping them to use AWS to create value from the connections in their data. In his spare time he’s currently restoring a reproduction 1960s Dalek.

 

Related

Analyze Security, Compliance, and Operational Activity Using AWS CloudTrail and Amazon Athena
