Networking & Content Delivery

Analyzing and visualizing AWS Global Accelerator flow logs using Amazon Athena and Amazon QuickSight

AWS Global Accelerator simplifies multi-Region cloud deployments while leveraging the vast, highly available, and congestion-free AWS global network. Global Accelerator uses a pair of static anycast IP addresses to direct users to the geographically closest application with healthy endpoints, based on routing policies that you configure.

This feature gives you a consistent user experience without worrying about stale DNS records for your application, suboptimal routing over the public internet, or managing complex IP address configurations.

You can also configure flow logs in Global Accelerator to capture the 5-tuple IP address information (source IP address, destination IP address, source port, destination port, and protocol) and additional fields such as the direction of traffic, the number of packets, and the number of bytes.
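
Each flow log record is a space-delimited line. The following is an illustrative record with hypothetical values, listed in the field order used by the Athena table definition later in this post:

2 123456789012 abcd1234-ab12-4a4a-9b9b-0123456789ab 198.51.100.23 46532 192.0.2.250 443 10.0.1.25 443 tcp IPv4 14 3200 1620000000 1620000060 ACCEPT OK 192.0.2.10 55001 us-east-1 us-west-2 INGRESS vpc-0123456789abcdef0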

Global Accelerator consolidates these flow log records into log files and publishes them to an Amazon S3 bucket at 5-minute intervals. The captured data grows based on the volume of traffic traversing the Global Accelerator.

You can use Amazon Athena (an interactive query tool for S3-based data) and Amazon QuickSight (a cloud-based business intelligence service) to analyze and visualize the flow log data and derive actionable business insights. With this solution you can troubleshoot reachability issues for your application, identify security vulnerabilities, or get an overview of how end users access your application.

Objectives

This post has the following objectives:

  • Analyze the flow log data using Athena by running SQL queries.
  • Visualize the flow log data using Amazon QuickSight by building a dashboard.

Solution overview

This post describes a complete solution for analyzing and visualizing Global Accelerator flow log data. As Global Accelerator generates flow log records, it publishes the log files to a preconfigured S3 bucket, which triggers an AWS Lambda function. The Lambda function makes the new data available to Athena by loading the corresponding partition. To visualize the data, you connect Amazon QuickSight to Athena as the data source.

The following diagram illustrates this workflow.

Figure 1: Workflow to analyze and visualize Global Accelerator flow logs.

Prerequisites

For this solution, complete the following prerequisites:

  • Deploy Global Accelerator.
  • Create an S3 bucket for storing the flow logs.

For more information, see Getting Started with AWS Global Accelerator and Flow Logs in AWS Global Accelerator.

Importing into Athena

Athena is a serverless, interactive query service that makes it easy to analyze data in S3. It’s capable of querying data in various formats including CSV, JSON, Parquet, and ORC, without executing a custom extract, transform, and load (ETL) process.

To use your flow log data, define the schema using a built-in delimited row format. You can query S3 data in Regions other than the Region where you run Athena. However, standard inter-Region data transfer rates for S3 apply, in addition to standard Athena charges. Therefore, run Athena in the same Region as your flow log bucket. For more information about pricing, see AWS Pricing.

Pay attention to the following items during your setup (a filled-in example follows the list):

  • Replace $BUCKET with your S3 flow log bucket.
  • Replace $FLOW_LOG_PREFIX with your flow log prefix.
  • Replace $AWS_ACCOUNT_ID with your AWS account ID.
  • Replace $REGION with the Region where you have the S3 bucket to store the flow logs.
  • Use a built-in delimited row format in your DDL to parse the data from the Flow Log Record Syntax. The columns for the aga_flow_logs table map to Flow Log Record Syntax.
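
For example, with the hypothetical values bucket aga-flow-logs-demo, prefix flowlogs, account ID 123456789012, and Region us-west-2, the LOCATION clause in the DDL below would read:

LOCATION 's3://aga-flow-logs-demo/flowlogs/AWSLogs/123456789012/globalaccelerator/us-west-2/'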

Creating an external table in Athena

In the Athena query editor, enter the following DDL and choose Run Query.

CREATE EXTERNAL TABLE IF NOT EXISTS aga_flow_logs (
    Version INT,
    AwsAccountId STRING,
    AcceleratorId STRING,
    ClientIp STRING,
    ClientPort INT,
    Gip STRING,
    GipPort INT,
    EndpointIp STRING,
    EndpointPort INT,
    Protocol STRING,
    IpAddressType STRING,
    Packets INT,
    Bytes INT,
    StartTime INT,
    EndTime INT,
    Action STRING,
    LogStatus STRING,
    GlobalAcceleratorSourceIp STRING,
    GlobalAcceleratorSourcePort INT,
    EndpointRegion STRING,
    GlobalAcceleratorRegion STRING,
    Direction STRING,
    VPCId STRING
)
PARTITIONED BY (year STRING, month STRING, day STRING)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ' '
    ESCAPED BY '\\'
    LINES TERMINATED BY '\n'
LOCATION 's3://$BUCKET/$FLOW_LOG_PREFIX/AWSLogs/$AWS_ACCOUNT_ID/globalaccelerator/$REGION/'
TBLPROPERTIES ('skip.header.line.count'='1');

This step creates the mappings for querying your flow logs in Athena using an SQL-like query language. However, if you preview your data now, no records return. The data is partitioned and there are no partitions loaded. By partitioning your data, you can restrict the amount of data scanned by each query, thus improving performance and reducing cost.
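
For example, after partitions are loaded, a query that restricts the partition columns scans only the matching S3 prefixes (the date values here are hypothetical):

SELECT clientip, sum(bytes) AS bytes
FROM aga_flow_logs
WHERE year='2024' AND month='01' AND day='15'
GROUP BY clientip;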

Importing partitions

The flow log data has partitions for year, month, and day, but the underlying S3 prefixes aren’t in a format that Athena can import automatically. To import the data, create a Lambda function that triggers on every S3 PUT request, such as when flow records push to the S3 bucket, and use the key of the new object to load any new partitions.
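
For reference, the flow log object keys embed the date after the Region, which is what the Lambda function parses. The account ID, Region, and file name below are hypothetical:

$FLOW_LOG_PREFIX/AWSLogs/123456789012/globalaccelerator/us-west-2/2024/01/15/<log-file>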

Creating IAM permissions

The Lambda function needs IAM permissions to access Amazon CloudWatch Logs, Athena, and S3. Create an IAM policy with these permissions and attach it to the IAM role that the Lambda function uses.

Creating an IAM policy

To create an IAM policy, complete the following steps:

  1. Sign in to the AWS Management Console.
  2. In the IAM console, choose Policies, Create Policy.

You can select the visual editor or enter the JSON policy. The following example JSON policy has the necessary permissions for executing the Lambda function. However, you can update the policy document per your organization’s security posture requirements.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucketMultipartUploads",
                "s3:AbortMultipartUpload",
                "s3:CreateBucket",
                "s3:ListBucket",
                "s3:GetBucketLocation",
                "s3:ListMultipartUploadParts"
            ],
            "Resource": [
                "arn:aws:s3:::aws-athena-query-results-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:*:*:*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "athena:StartQueryExecution",
                "athena:GetQueryExecution",
                "athena:GetTable",
                "athena:GetQueryResults",
                "athena:GetQueryResultsStream"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetPartitions",
                "glue:GetTable",
                "glue:UpdateTable",
                "glue:BatchCreatePartition",
                "glue:CreatePartition"
            ],
            "Resource": "*"
        }
    ]
}

  3. Choose Review.
  4. Enter the new policy name as AthenaLambda.
  5. Optionally, add a description.
  6. Choose Create policy.

Creating an IAM role

You now have an IAM policy to attach to the IAM role. To create your IAM role, complete the following steps:

  1. Open the IAM console, and choose Roles, Create Role.
  2. From the AWS Service menu, choose Lambda, and choose Next: Permissions.
  3. Locate and select the policy you created, AthenaLambda, and choose Next: Tags.
  4. Optionally, you can add tags.
  5. Choose Next: Review.
  6. For Role Name, enter AthenaLambda.
  7. Optionally, add a description.
  8. Choose Create role.

Creating a Lambda function

Your Lambda function must exist in the same Region as your S3 bucket so that S3 can publish event notifications that invoke the function.

  1. Open the Lambda console, and choose Functions, Create function.
  2. For Function name, enter a unique name for your Lambda function. This post uses AGAFlowLogPartitionImporter.
  3. For Runtime, select Python 3.7.
  4. For Execution role, select Use an existing role.
  5. For Existing role, select your new AthenaLambda role.
  6. Choose Create function.

Figure 2: Creating a Lambda function.

Adding an S3 trigger

To add an S3 trigger to your Lambda function, complete the following steps:

  1. On your function's configuration page, choose Add trigger and select S3.
  2. For Bucket, select your flow logs bucket.
  3. For Prefix, enter the following value: $FLOW_LOG_PREFIX/AWSLogs/$AWS_ACCOUNT_ID/globalaccelerator/$REGION.
  4. Choose Add.

Figure 3: Configuring S3 trigger to the Lambda function.

Updating your function

Edit your function code inline by entering the following Python code in the lambda_function.py file:

import json
import boto3
import re
import os
import time

athena_region = os.environ['ATHENA_REGION']
athena_database = os.environ['ATHENA_DATABASE']
athena_table = os.environ['ATHENA_TABLE']

session = boto3.session.Session(region_name = athena_region)

account_id = session.client('sts').get_caller_identity()['Account']

def submit_query(query):
    client = session.client('athena')
    try:
        print("Execute Query: " + query)
        output_location = 's3://aws-athena-query-results-' + account_id + '-' + athena_region
        query_response = client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={'Database': athena_database},
            ResultConfiguration={'OutputLocation': output_location})
        query_id = query_response['QueryExecutionId']
        print("Waiting for Query: " + query_id + " to finish")
        while True:
            query_status_response = client.get_query_execution(QueryExecutionId=query_id)
            query_state = query_status_response['QueryExecution']['Status']['State']
            if query_state == 'FAILED':
                raise Exception("Query Failed: " + query + " due to :" + query_status_response['QueryExecution']['Status']['StateChangeReason'])
            elif query_state == 'CANCELLED':
                raise Exception("Query Cancelled: " + query)
            elif query_state == 'SUCCEEDED':
                query_result_response = client.get_query_results(
                    QueryExecutionId=query_id,
                    MaxResults=1000
                )
                query_results=[]
                for row in query_result_response['ResultSet']['Rows']:
                    for item in row['Data']:
                        query_results.append(item['VarCharValue'])
                return query_results
            else:
                time.sleep(1)
    except Exception as e:
        print("Error submitting query: " + query + ", " + str(e))
        # Re-raise so callers don't continue with a missing result
        raise

def partition_string(year, month, day):
    return "year=" + year + "/month=" + month + "/day=" + day

def partition_exists(year, month, day):
    partitions = submit_query("SHOW PARTITIONS " + athena_table)
    return partition_string(year, month, day) in partitions

def create_partition(year, month, day, s3_location):
    print("create partition: " + partition_string(year, month, day))
    submit_query("ALTER TABLE " + athena_table + " ADD PARTITION (year='" + year + "', month='" + month + "', day='" + day + "') LOCATION '" + s3_location + "'")

def lambda_handler(event, context):
    s3_put_events = [x for x in event.get('Records', []) if x.get('eventName') == 'ObjectCreated:Put']

    for s3_put_event in s3_put_events:
        s3_object = s3_put_event['s3']
        bucket = s3_object['bucket']['name']
        key = s3_object['object']['key']
        print("Bucket: " + bucket)
        print("File Name: " + key)
        # Extract year/month/day from the key: .../globalaccelerator/<region>/<year>/<month>/<day>/...
        matches = re.search(r"globalaccelerator/[\w-]*/(\d*)/(\d*)/(\d*)", key)
        matched_groups = matches.groups()
        year = matched_groups[0]
        month = matched_groups[1]
        day = matched_groups[2]

        if not partition_exists(year, month, day):
            s3_location = "s3://" + bucket + "/" + "/".join(key.split("/")[:-1])
            create_partition(year, month, day, s3_location)
        else:
            print("Partition: " + partition_string(year, month, day) + " Already exists")

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
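
To test the function, you can configure a test event that mimics the S3 notification the trigger sends. The following is a minimal sketch with a hypothetical bucket name and object key; the key must match the prefix pattern that the function parses:

{
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": { "name": "aga-flow-logs-demo" },
                "object": { "key": "flowlogs/AWSLogs/123456789012/globalaccelerator/us-west-2/2024/01/15/example-log.txt.gz" }
            }
        }
    ]
}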

Adding environment variables

Add the following environment variables to the function:

  • ATHENA_DATABASE – The database that your new table exists in. By default, this is sampledb.
  • ATHENA_REGION – The Region in which your Athena table exists.
  • ATHENA_TABLE – The name of your Athena table. By default, this is aga_flow_logs.

Also update the function Timeout to 5 minutes so that the partition queries have time to complete.
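
If you prefer to script this configuration, the following AWS CLI sketch applies the same environment variables and timeout. The function name matches the example used earlier in this post, and the Region value is hypothetical:

aws lambda update-function-configuration \
    --function-name AGAFlowLogPartitionImporter \
    --timeout 300 \
    --environment "Variables={ATHENA_REGION=us-west-2,ATHENA_DATABASE=sampledb,ATHENA_TABLE=aga_flow_logs}"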

Querying the flow log data in Athena

You now have an aga_flow_logs table for querying the flow log data using Athena. You can build custom queries against the flow log data using standard SQL. For more information, see Querying Data in Amazon Athena Tables. This post provides example Athena queries that derive the top talkers accessing your Global Accelerator based on aggregate bytes, direction (ingress or egress), and number of packets.

In the Query Editor in Athena, add a new query.

Figure 4: Example to query flow log data in Athena.

The following are details for your example queries:

  • Top 50 Talkers by Bytes
SELECT clientip, sum(bytes) AS bytes FROM "sampledb"."aga_flow_logs" GROUP BY clientip ORDER BY bytes DESC LIMIT 50;
  • Top 50 Talkers by Ingress Bytes
SELECT clientip, sum(bytes) AS bytes FROM "sampledb"."aga_flow_logs" WHERE direction='INGRESS' GROUP BY clientip ORDER BY bytes DESC LIMIT 50;
  • Top 50 Talkers by Egress Bytes
SELECT clientip, sum(bytes) AS bytes FROM "sampledb"."aga_flow_logs" WHERE direction='EGRESS' GROUP BY clientip ORDER BY bytes DESC LIMIT 50;
  • Top 50 Talkers by Packets
SELECT clientip, sum(packets) AS num_packets FROM "sampledb"."aga_flow_logs" GROUP BY clientip ORDER BY num_packets DESC LIMIT 50;

You can also save your frequently run Athena queries and download the results in CSV format.

Figure 5: Saving frequently used Athena query.

Figure 6: Downloading Athena query results.

Visualizing the flow log data in Amazon QuickSight

In the Amazon QuickSight console, select the same Region that you use for Athena. If this is your first time using Amazon QuickSight, enable it for your AWS account. For more information about troubleshooting the Athena connection, see I Can’t Connect to Amazon Athena.

Create dataset

To create your dataset, complete the following steps:

  1. Choose Manage data, New dataset.
  2. For the new data source, choose Athena.
  3. For the name of your data source, enter aga_flow_logs.
  4. Choose Create data source.
  5. Select your database (for this post, use sampledb).
  6. Select your recently created table (aga_flow_logs).
  7. Choose Select.

Figure 7: Creating new dataset in QuickSight.

  8. To update the fields, choose Edit/Visualize datasource.
  9. The starttime and endtime fields are Unix timestamps; convert them to the Date type to allow for more intuitive aggregations:
    • Choose starttime, Change data type, Date.
    • Choose endtime, Change data type, Date.
  10. Choose Save & visualize.

Figure 8: Updating start time and end time fields to the type Date in QuickSight.
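
Alternatively, you can leave the original fields unchanged and add a QuickSight calculated field instead; assuming the values are epoch seconds, a calculated field such as epochDate(starttime) converts the timestamp into a date that you can aggregate on.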

Creating an analysis

You can create a dashboard that helps you visualize the top talkers based on traffic volume and traffic distribution across multiple Global Accelerator endpoints.

First, visualize the top talkers by traffic volume, grouped by client IP address.

  1. From the list of Visual types at the bottom of the Amazon QuickSight console, select Horizontal bar chart.
  2. In the field list, select clientip for Y axis and bytes for Value.

Figure 9: Example to visualize top talkers by client IP against traffic volume.

To show only the top 10 talkers, complete the following steps:

  1. Choose Filter and create one using clientip.
  2. Select clientip.
  3. For Filter type, select Top and bottom filter.
  4. For Show top, enter 10.
  5. For By, select bytes.
  6. Choose Apply.

Figure 10: Filter 10 top talkers by client IP against traffic volume.

Configure your dashboard to visualize traffic distribution across Global Accelerator endpoints.

  1. From the Amazon QuickSight console, choose Add Visual.
  2. From the list of Visual types, select Line chart.
  3. In the field list, select starttime for X axis, bytes for Value, and endpointip for Color.

Figure 11: Example to visualize traffic volume per Global Accelerator endpoints.

By default, the starttime field aggregates the data by day. To get a more granular view of the traffic distribution, complete the following steps:

  1. For X axis, choose starttime.
  2. For Aggregate, select Hour.

Figure 12: Aggregating timeline per hour.

You can also apply additional filters. For example, you can narrow the view to a specific period by creating a filter on starttime with the filter type Time Range and providing a start date and an end date.

Figure 13: Filtering start time to a specific time range.

Amazon QuickSight offers several options to format the visual. For more information, see Formatting a Visual in Amazon QuickSight.

You now have a dashboard that provides a quick overview of the traffic traversing your Global Accelerator and reaching your application.

Figure 14: Example QuickSight dashboard

Summary

This post described a complete solution for analyzing and visualizing Global Accelerator flow log data. You now know how to use Lambda and Athena to analyze the flow log data and Amazon QuickSight to build a dashboard that visualizes it, deriving critical business insights such as the top talkers by client IP address and the traffic distribution across Global Accelerator endpoints.

This solution uses services in the AWS serverless portfolio, thereby eliminating the administrative overhead to run this solution. Instead of having to focus on the underlying infrastructure to run the complex queries, you can analyze and visualize the network traffic passing through Global Accelerator to develop actionable business decisions.


About the Authors



Aniruddha Agharkar is a Technical Account Manager at AWS. He loves to whiteboard architectures and develop solutions while adhering to AWS Well Architected pillars.

 

Bhavin Desai is a Sr. Solutions Architect at AWS. He enjoys providing technical guidance to customers, and helping them architect and build solutions to adopt the art of the possible on AWS.