AWS Big Data Blog

Processing VPC Flow Logs with Amazon EMR

by Michael Wallman

Michael Wallman is a senior consultant with AWS Professional Services.

It’s easy to understand network patterns in small AWS deployments where software stacks are well defined and managed. But as teams and usage grow, it gets harder to understand which systems communicate with each other, and on which ports. This often results in overly permissive security groups.

In this post, I show you how to gain valuable insight into your network by using Amazon EMR and Amazon VPC Flow Logs. The walkthrough implements a pattern often found in network equipment called ‘Top Talkers’, an ordered list of the heaviest network users, but the model can also be used for many other types of network analysis. Customers have successfully used this process to lock down security groups, analyze traffic patterns, and create network graphs.

VPC Flow Logs

VPC Flow Logs enables the capture of IP traffic information flowing to and from network interfaces within a VPC. Each Flow Log record describes an IP flow identified by the classic 5-tuple (source address, destination address, source port, destination port, and protocol), along with metadata such as packet and byte counts:

version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status
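For downstream processing, it helps to see how one of these space-delimited records maps onto named fields. Here is a minimal Python sketch (illustrative only, not part of the repository code) that parses a single record:

FIELDS = ["version", "account_id", "interface_id", "srcaddr", "dstaddr",
          "srcport", "dstport", "protocol", "packets", "bytes",
          "start", "end", "action", "log_status"]

def parse_record(line):
    # Pair each whitespace-separated value with its field name
    return dict(zip(FIELDS, line.split()))

record = parse_record(
    "2 998287060599 eni-565b501d 10.0.7.184 104.131.53.252 "
    "28159 123 17 9 684 1456287302 1456287888 ACCEPT OK")
print(record["srcaddr"], record["dstaddr"], record["dstport"])
# 10.0.7.184 104.131.53.252 123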

VPC Flow Logs can be enabled on a single network interface, on a subnet, or on an entire VPC. When enabled on a VPC, log collection begins on every network interface within that VPC. In large deployments of tens of thousands of instances, Flow Logs can easily generate terabytes of compressed log data per hour!

To process this data at scale, this post takes you through the steps in the following graphic.

Create the data source

The first challenge is getting structured VPC Flow Log data into Amazon S3. AWS provides a number of tools and services to facilitate this, all of which can be deployed from a single CloudFormation stack. Use the flowlogs-vpc-toptalker.json template in the AWS Big Data Blog GitHub repository, which contains the AWS CloudFormation template, working code, and sample Amazon Kinesis events.

With the template, you can deploy the following workflow in a single command. This design follows a serverless architecture, where no servers need to be managed or maintained.

A VPC Flow Log, the data source, captures traffic from all elastic network interfaces within the VPC. An IAM role allows the vpc-flow-logs service to write data to a defined log group in Amazon CloudWatch Logs. A CloudWatch Logs subscription then streams the log data into Amazon Kinesis, where an AWS Lambda function batches the data, applies some minor filtering and transformation, and compresses and uploads it to S3.
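All of these resources are created by the template, so the following is purely illustrative. If you were wiring up the CloudWatch Logs-to-Kinesis subscription yourself with boto3, it would look roughly like this (every name and ARN below is a placeholder):

import boto3

logs = boto3.client("logs")

# Forward every log event from the Flow Logs log group into Kinesis;
# an empty filterPattern matches all events.
logs.put_subscription_filter(
    logGroupName="toptalkers-FlowLogs",                                         # placeholder
    filterName="flowlogs-to-kinesis",
    filterPattern="",
    destinationArn="arn:aws:kinesis:us-east-1:123456789012:stream/toptalkers",  # placeholder
    roleArn="arn:aws:iam::123456789012:role/CWLtoKinesisRole",                  # placeholder
)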

To get started, simply run the CloudFormation template; once it is deployed, you can use the AWS CLI or the console to enable Flow Logs on your VPC. See the CloudFormation stack’s Outputs for the S3 log location and the command to enable VPC Flow Logs:

$ aws cloudformation create-stack --stack-name toptalkers --template-body file://flowlogs-vpc-toptalker.json --capabilities CAPABILITY_IAM
... Wait until CREATE_COMPLETE ...
$ aws cloudformation describe-stacks --stack-name toptalkers  --query 'Stacks[].Outputs[?OutputKey==`S3LogLocation`].OutputValue' --output text
toptalkers-bucket-xyukipb8r9x9
$ aws cloudformation describe-stacks --stack-name toptalkers  --query 'Stacks[].Outputs[?OutputKey==`AddFlowLogToVpcCmd`].OutputValue' --output text
aws ec2 create-flow-logs --resource-type VPC --traffic-type ACCEPT --resource-ids <vpc_id> --log-group-name toptalkers-FlowLogs-UNUXO8DD9YLV --deliver-logs-permission-arn arn:aws:iam::XXXXXXXXXXXX:role/toptalkers-FlowLogsToCloudWatch-119JQ3CPHNVM

In the above command, replace <vpc_id> accordingly. This post focuses on top talkers, so I am only concerned with IP connections that are actually made; given this, --traffic-type is set to ACCEPT. The created bucket has a seven-day expiration policy.

Pre-process with an AWS Lambda function

As mentioned above, a Lambda function is used for both batching and preprocessing Flow Log data. The following excerpts use the Lambda invocation ID as the destination object name, and only include messages that have a log-status of OK. This simplifies the data set and allows for a simpler mapper function.

// Set the destination key name from the Lambda invocation ID
exports.handler = function(event, context) {
    var key = path + "/" + context.invokeid + ".gz";
    // ...
};

// Apply the message filter: keep only records whose log-status is OK
// (buffer holds the unzipped CloudWatch Logs payload)
var fl = JSON.parse(buffer.toString('ascii'));
data = data.concat(fl.logEvents.map(function(item) {
    return item.message;
})).filter(function(item) {
    return item.indexOf('OK') !== -1;
});
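For reference, the same decode-and-filter logic can be sketched in Python. This assumes the standard payload format that CloudWatch Logs sends through Kinesis, base64-wrapped, gzip-compressed JSON (a sketch, not the deployed function):

import base64
import gzip
import json

def extract_ok_messages(kinesis_record):
    # Unwrap one Kinesis record carrying a CloudWatch Logs payload
    payload = gzip.decompress(base64.b64decode(kinesis_record["kinesis"]["data"]))
    log_data = json.loads(payload)
    # Keep only Flow Log messages whose trailing log-status field is OK
    return [e["message"] for e in log_data.get("logEvents", [])
            if e["message"].rstrip().endswith("OK")]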

Once running, this Lambda function stores compressed Flow Log network traffic in S3. To examine the Flow Log data being written by the Lambda function, follow the steps below: first, list and then download a single object from the S3 Flow Log target. After you unzip the object, the processed Flow Log data appears.

## List bucket 
$ aws s3 ls s3://toptalkers-bucket-xyukipb8r9x9/flowlogs
2016-02-24 04:25:07        139 00166138-965e-4211-9d03-a53a84b8db2f.gz
2016-02-24 04:30:08       1469 05c1cbd9-bf80-4107-a752-42e199c1040c.gz
...

## Download object format
$ aws s3 cp s3://toptalkers-bucket-xyukipb8r9x9/flowlogs/00166138-965e-4211-9d03-a53a84b8db2f.gz .

## View contents of processed Flow Log data
$ gunzip -c 00166138-965e-4211-9d03-a53a84b8db2f.gz | head
2 998287060599 eni-565b501d 10.0.7.184 104.131.53.252 28159 123 17 9 684 1456287302 1456287888 ACCEPT OK
2 998287060599 eni-565b501d 10.0.7.184 108.61.194.85 23781 123 17 9 684 1456287302 1456287888 ACCEPT OK
2 998287060599 eni-565b501d 10.0.12.219 10.0.7.184 123 123 17 9 684 1456287302 1456287888 ACCEPT OK
2 998287060599 eni-565b501d 10.0.13.221 10.0.7.184 123 123 17 9 684 1456287302 1456287888 ACCEPT OK

NOTE: In some cases, it can take some time (10+ minutes) after you’ve created the Flow Log for the log group to be created and for data to be displayed.

Process with MapReduce

MapReduce processing comes in two Hadoop Streaming steps, and uses the Hadoop Aggregate package along with comparator and field-selection classes. First, flowlogs-counter.py collapses each input record into <src_ip>:<dst_ip>:<dst_port> format, and the built-in aggregate reducer combines like entries.

Below is an excerpt of the flowlogs-counter.py file that formats the messages for use with the aggregate reducer:

## flowlogs-counter summary
for line in fileinput.input("-"):
    i = shlex.split(line.rstrip('\r\n'))
    print "LongValueSum:%s:%s:%s\t%s" % (i[3], i[4], i[6], 1)

## Example output of Step 1 (after reduce)
10.0.0.167:59.72.153.2:56980	1
10.0.0.227:192.88.99.255:0	14
10.0.0.227:52.91.67.35:57942	1
10.0.11.186:10.0.0.227:49813	1
10.0.11.186:10.0.0.227:49850	1
10.0.11.186:10.0.0.227:49874	1
10.0.11.186:10.0.0.227:49933	2
...
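Under the hood, Hadoop’s aggregate reducer simply sums the tab-separated value of every identical LongValueSum key. A minimal Python simulation of that behavior, handy for local sanity checks:

from collections import Counter

def aggregate_longvaluesum(lines):
    # Sum the tab-separated counts of identical LongValueSum keys,
    # mimicking the built-in aggregate reducer
    totals = Counter()
    for line in lines:
        key, value = line.rstrip("\n").split("\t")
        totals[key.replace("LongValueSum:", "", 1)] += int(value)
    return totals

mapper_output = [
    "LongValueSum:10.0.11.186:10.0.0.227:49933\t1",
    "LongValueSum:10.0.11.186:10.0.0.227:49933\t1",
    "LongValueSum:10.0.0.227:192.88.99.255:0\t14",
]
for key, total in aggregate_longvaluesum(mapper_output).items():
    print("%s\t%d" % (key, total))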

The second step uses an awk filter to remove connections whose destination port is above 1024, keeping only the well-known (system) port range. It then uses Hadoop’s KeyFieldBasedComparator class to sort on the second field of the input, in reverse order so that the largest connection counts come first. Finally, a single reducer pipes the output into a compressed .gz file on S3.

NOTE: To remove the awk filter, use cat as the mapper instead.

In essence, this step is Hadoop’s way of parallelizing the shell script below.

## toptalker example in shell
$ hdfs dfs -cat /aggregate-dir/* | awk -F "[:\t]" '{if($3<=1024) print $0}' | sort -T /mnt -n -r -k2 | gzip > /mnt/toptalkers.gz
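To spot-check the same logic on a workstation before running it at scale, a small Python sketch of the filter-and-sort step might look like the following (the input file name is a placeholder for a local copy of the step 1 output):

def toptalkers(lines):
    # Keep flows to ports at or below 1024 and sort by connection count,
    # largest first -- the same job the second EMR step does at scale
    rows = []
    for line in lines:
        key, count = line.rstrip("\n").split("\t")
        src, dst, port = key.split(":")
        if int(port) <= 1024:
            rows.append((src, dst, int(port), int(count)))
    return sorted(rows, key=lambda r: r[3], reverse=True)

with open("aggregate-output.txt") as f:  # placeholder local file
    for src, dst, port, count in toptalkers(f):
        print("%s:%s:%d\t%d" % (src, dst, port, count))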

NOTE: With in-memory caching and optimized execution, Apache Spark on Hadoop YARN can be used for increased performance. Spark is supported natively by EMR; VPC Flow Log processing with Spark will be addressed in future AWS blog posts.

Run example

As with most AWS services, the AWS CLI provides a command line interface to the EMR API, and the following CLI commands execute the workflow described above.

Step 1: After a sufficient data set has been collected, disable the VPC Flow Log. Otherwise, the single S3 output location grows without bound. For longer-running Flow Log captures, see Amazon Kinesis Firehose.

$ aws ec2 delete-flow-logs --flow-log-ids <flow_log_id>

Step 2: Create an emr_toptalker_steps.json file.

NOTE: Update all instances of toptalkers-bucket-xyukipb8r9x9 with the bucket created by the CloudFormation stack.

$ cat emr_toptalker_steps.json
[
  {
    "Name": "flowlogs-counter",
    "Type": "STREAMING",
    "ActionOnFailure": "CANCEL_AND_WAIT",
    "Args": [
      "-files", "s3://aws-big-data-blog/vpc-toptalkers/flowlogs-counter.py",
      "-mapper", "flowlogs-counter.py",
      "-reducer", "aggregate",
      "-input", "s3://toptalkers-bucket-xyukipb8r9x9/flowlogs",
      "-output","s3://toptalkers-bucket-xyukipb8r9x9/aggregate"
    ]
  },
  {
    "Name": "flowlogs-sort",
    "Type": "STREAMING",
    "ActionOnFailure": "CONTINUE",
    "Args": [
      "-D", "mapreduce.job.reduces=1",
      "-D", "mapreduce.output.fileoutputformat.compress=true",
      "-D", "mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec",
      "-D", "mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator",
      "-D", "stream.num.map.output.key.fields=2",
      "-D", "mapreduce.partition.keycomparator.options=-k2,2nr",
      "-files", "s3://aws-big-data-blog/vpc-toptalkers/portfilter.awk",
      "-mapper", "portfilter.awk",
      "-reducer", "cat",
      "-input", " s3://toptalkers-bucket-xyukipb8r9x9/aggregate",
      "-output", "s3://toptalkers-bucket-xyukipb8r9x9/toptalker"
    ]
  }
]

Step 3: Create a cluster and run the steps (update <keyname>).

For high-traffic VPCs, 10 task nodes per 1,000 instances in the VPC is a good place to start.

$ aws emr create-cluster --release-label emr-5.0.0 --name TopTalker --tags Name=TopTalker --ec2-attributes KeyName=<keyname> --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=r3.4xlarge InstanceGroupType=CORE,InstanceCount=10,InstanceType=m3.2xlarge --use-default-roles --steps file://emr_toptalker_steps.json --auto-terminate

Step 4: See the results in <src_ip>:<dst_ip>:<dst_port>\t<count> format.

$ aws s3 cp s3://toptalkers-bucket-xyukipb8r9x9/toptalker/part-00000.gz toptalker.gz
$ gunzip -c toptalker.gz
10.0.1.16:10.0.10.253:80	197326
10.0.10.253:10.0.12.140:3306	122012
10.0.2.191:10.0.10.253:80	100947
10.0.2.133:52.35.46.46:80	56462
52.37.13.115:10.0.1.16:80	43416
10.0.2.133:52.36.86.115:80	41085
52.37.13.115:10.0.2.191:80	31257
10.0.4.83:10.0.12.219:80	158

Toptalker transformations

Now that you have a toptalker file in IP format, you can process it outside of Hadoop.

Below are a number of transformations that customers have applied, which in many cases collapse the file further. For example, you might choose to treat all instances within an Auto Scaling group as a single entry, which can be achieved by converting each IP address to its security group or instance Name tag. Or you might want to map ports to their service names, e.g., 80 to httpd.

The command below is a transformation in its simplest form. Using the CLI, you can quickly create an IP-address-to-security-group-name mapping.

$ aws ec2 describe-network-interfaces --query 'NetworkInterfaces[].[PrivateIpAddress,Groups[0].GroupName]' --output text
10.0.8.126	WebServerSecurityGroup
10.0.0.167	launch-wizard-3
10.0.5.143	ELBSecurityGroup
10.0.2.133	BastionSecurityGroup
10.0.0.58	ELBSecurityGroup
10.0.12.140	RDSSecurityGroup
...
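Taking that one step further, a short boto3 sketch (one possible approach, not code from the repository) can apply the mapping directly to the toptalker file:

import gzip

import boto3

ec2 = boto3.client("ec2")

# Build the IP-address-to-security-group-name mapping shown above
# (pagination omitted for brevity)
ip_to_group = {}
for eni in ec2.describe_network_interfaces()["NetworkInterfaces"]:
    if eni.get("Groups") and "PrivateIpAddress" in eni:
        ip_to_group[eni["PrivateIpAddress"]] = eni["Groups"][0]["GroupName"]

# Rewrite each toptalker entry, substituting group names where known
with gzip.open("toptalker.gz", "rt") as f:  # file name as used in Step 4
    for line in f:
        key, count = line.rstrip("\n").split("\t")
        src, dst, port = key.split(":")
        print("%s:%s:%s\t%s" % (ip_to_group.get(src, src),
                                ip_to_group.get(dst, dst), port, count))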

Conclusion

Looking forward, we would like to start collecting scripts and programs that turn top talkers into richer, more useful network maps. So clone the repo, and get “top talking”!

For more on this subject, see the Security Group Visualization blog post. If you have questions or suggestions, please leave a comment below.
