AWS Big Data Blog

Analyzing Data in S3 using Amazon Athena

Neil Mukerje is a Solution Architect for Amazon Web Services
Abhishek Sinha is a Senior Product Manager on Amazon Athena

Amazon Athena is an interactive query service that makes it easy to analyze data directly from Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to set up or manage and you can start analyzing your data immediately. You don’t even need to load your data into Athena, or have complex ETL processes.  Athena works directly with data stored in S3.

Athena uses Presto, a distributed SQL engine to run queries. It also uses Apache Hive to create, drop, and alter tables and partitions. You can write Hive-compliant DDL statements and ANSI SQL statements in the Athena query editor. You can also use complex joins, window functions and complex datatypes on Athena. Athena uses an approach known as schema-on-read, which allows you to project your schema on to your data at the time you execute a query. This eliminates the need for any data loading or ETL.

Athena charges you by the amount of data scanned per query. You can save on costs and get better performance if you partition the data, compress data, or convert it to columnar formats such as Apache Parquet. For more information, see Athena pricing.

In this post, we demonstrate how to use Athena on logs from Elastic Load Balancers, generated as text files in a pre-defined format. We show you how to create a table, partition the data in a format used by Athena, convert it to Parquet, and compare query performance.

For this example, the raw logs are stored on Amazon S3 in the following format. There is a separate prefix for year, month, and date, with 2570 objects and 1 TB of data.

o_athena_1

Creating tables

If you are familiar with Apache Hive, you may find creating tables on Athena to be familiar. You can create tables by writing the DDL statement on the query editor, or by using the wizard or JDBC driver. Copy and paste the following DDL statement in the Athena query editor to create a table.

CREATE EXTERNAL TABLE IF NOT EXISTS elb_logs_raw_native (
  request_timestamp string, 
  elb_name string, 
  request_ip string, 
  request_port int, 
  backend_ip string, 
  backend_port int, 
  request_processing_time double, 
  backend_processing_time double, 
  client_response_time double, 
  elb_response_code string, 
  backend_response_code string, 
  received_bytes bigint, 
  sent_bytes bigint, 
  request_verb string, 
  url string, 
  protocol string, 
  user_agent string, 
  ssl_cipher string, 
  ssl_protocol string ) 
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
         'serialization.format' = '1','input.regex' = '([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*)[:\-]([0-9]*) ([-.0-9]*) ([-.0-9]*) ([-.0-9]*) (|[-0-9]*) (-|[-0-9]*) ([-0-9]*) ([-0-9]*) \\\"([^ ]*) ([^ ]*) (- |[^ ]*)\\\" (\"[^\"]*\") ([A-Z0-9-]+) ([A-Za-z0-9.-]*)$' ) 
LOCATION 's3://athena-examples/elb/raw/';

Note the regular expression specified in the CREATE TABLE statement.  You can specify any regular expression, which tells Athena how to interpret each row of the text. You can also use Athena to query other data formats, such as JSON. A regular express is not required if you are processing CSV, TSV or JSON formats. After the statement succeeds, the table and the schema appears in the data catalog (left pane).  Athena has an internal data catalog used to store information about the tables, databases, and partitions. It’s highly durable and requires no management. You can interact with the catalog using DDL queries or through the console.

 

o_athena_2

You created a table on the data stored in Amazon S3 and you are now ready to query the data. Note that table elb_logs_raw_native points towards the prefix s3://athena-examples/elb/raw/. Therefore, when you add more data under the prefix, e.g., a new month’s data, the table automatically grows. Run a simple query:

SELECT * FROM elb_logs_raw_native WHERE elb_response_code = '200' LIMIT 100;

o_athena_3_1

You now have the ability to query all the logs, without the need to set up any infrastructure or ETL.

Partitioning data

Customers often store their data in time-series formats and need to query specific items within a day, month, or year. Without a partition, Athena scans the entire table while executing queries. With partitioning, you can restrict Athena to specific partitions, thus reducing the amount of data scanned, lowering costs, and improving performance.

Athena uses Apache Hive–style data partitioning.  You can partition your data across multiple dimensions―e.g., month, week, day, hour, or customer ID―or all of them together.

To use partitions, you first need to change your schema definition to include partitions, then load the partition metadata in Athena. Use the same CREATE TABLE statement but with partitioning enabled.

CREATE EXTERNAL TABLE IF NOT EXISTS elb_logs_raw_native_part (
  request_timestamp string, 
  elb_name string, 
  request_ip string, 
  request_port int, 
  backend_ip string, 
  backend_port int, 
  request_processing_time double, 
  backend_processing_time double, 
  client_response_time double, 
  elb_response_code string, 
  backend_response_code string, 
  received_bytes bigint, 
  sent_bytes bigint, 
  request_verb string, 
  url string, 
  protocol string, 
  user_agent string, 
  ssl_cipher string, 
  ssl_protocol string ) 
PARTITIONED BY(year string, month string, day string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
         'serialization.format' = '1','input.regex' = '([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*)[:\-]([0-9]*) ([-.0-9]*) ([-.0-9]*) ([-.0-9]*) (|[-0-9]*) (-|[-0-9]*) ([-0-9]*) ([-0-9]*) \\\"([^ ]*) ([^ ]*) (- |[^ ]*)\\\" (\"[^\"]*\") ([A-Z0-9-]+) ([A-Za-z0-9.-]*)$' )
LOCATION 's3://athena-examples/elb/raw/';

o_athena_4

Note the PARTITIONED BY clause in the CREATE TABLE statement. The data is partitioned by year, month, and day. In the Results section, Athena reminds you to load partitions for a partitioned table.

The ALTER TABLE ADD PARTITION statement allows you to load the metadata related to a partition. For example to load the data from the s3://athena-examples/elb/raw/2015/01/01/ bucket, you can run the following:

ALTER TABLE elb_logs_raw_native_part ADD PARTITION (year='2015',month='01',day='01') location 's3://athena-examples/elb/raw/2015/01/01/'
show partitions elb_logs_raw_native_part

o_athena_5_1

Now you can restrict each query by specifying the partitions in the WHERE clause. In this case, Athena scans less data and finishes faster. Here is an example:

SELECT distinct(elb_response_code),
         count(url)
FROM elb_logs_raw_native_part
WHERE year='2015'
        AND month= '01'
        AND day='01'
GROUP BY  elb_response_code

o_athena_6_1

If you have a large number of partitions, specifying them manually can be cumbersome. You can automate this process using a JDBC driver. You don’t need to do this if your data is already in Hive-partitioned format.

Converting data to columnar formats

Athena allows you to use open source columnar formats such as Apache Parquet and Apache ORC. Converting your data to columnar formats not only helps you improve query performance, but also save on costs.

There are several ways to convert data into columnar format. In this post, you can take advantage of a PySpark script, about 20 lines long, running on Amazon EMR to convert data into Apache Parquet. The script also partitions data by year, month, and day. At the time of publication, a 2-node r3.x8large cluster in US-east was able to convert 1 TB of log files into 130 GB of compressed Apache Parquet files (87% compression) with a total cost of $5.

Here is the layout of files on Amazon S3 now:

o_athena_7

Note the layout of the files. This format of partitioning, specified in the key=value format, is automatically recognized by Athena as a partition. It allows you to load all partitions automatically by using the command msck repair table <tablename>. This is similar to how Hive understands partitioned data as well. If the data is not the key-value format specified above, load the partitions manually as discussed earlier.

Create a table on the Parquet data set. Note that your schema remains the same and you are compressing files using Snappy.

CREATE EXTERNAL TABLE IF NOT EXISTS elb_logs_pq (
  request_timestamp string,
  elb_name string,
  request_ip string,
  request_port int,
  backend_ip string,
  backend_port int,
  request_processing_time double,
  backend_processing_time double,
  client_response_time double,
  elb_response_code string,
  backend_response_code string,
  received_bytes bigint,
  sent_bytes bigint,
  request_verb string,
  url string,
  protocol string,
  user_agent string,
  ssl_cipher string,
  ssl_protocol string )
PARTITIONED BY(year int, month int, day int) 
STORED AS PARQUET
LOCATION 's3://athena-examples/elb/parquet/'
tblproperties ("parquet.compress"="SNAPPY");

o_athena_8

To allow the catalog to recognize all partitions, run msck repair table elb_logs_pq. After the query is complete, you can list all your partitions.

msck repair table elb_logs_pq
show partitions elb_logs_pq

o_athena_9_1

Comparing performance

You can compare the performance of the same query between text files and Parquet files.

SELECT elb_name,
       uptime,
       downtime,
       cast(downtime as DOUBLE)/cast(uptime as DOUBLE) uptime_downtime_ratio
FROM 
    (SELECT elb_name,
        sum(case elb_response_code
        WHEN '200' THEN
        1
        ELSE 0 end) AS uptime, sum(case elb_response_code
        WHEN '404' THEN
        1
        ELSE 0 end) AS downtime
    FROM elb_logs_pq
    GROUP BY  elb_name)

Query on compressed, partitioned, and columnar data

o_athena_10_1

Query on raw text files

SELECT elb_name,
       uptime,
       downtime,
       cast(downtime as DOUBLE)/cast(uptime as DOUBLE) uptime_downtime_ratio
FROM 
    (SELECT elb_name,
        sum(case elb_response_code
        WHEN '200' THEN
        1
        ELSE 0 end) AS uptime, sum(case elb_response_code
        WHEN '404' THEN
        1
        ELSE 0 end) AS downtime
    FROM elb_logs_raw_native
    GROUP BY  elb_name)

o_athena_11_1

Athena charges you by the amount of data scanned per query. By converting your data to columnar format, compressing and partitioning it, you not only save costs but also get better performance. The following table compares the savings created by converting data into columnar format.

Dataset Size on Amazon S3 Query Run time Data Scanned Cost
Data stored as text files 1 TB 236 seconds 1.15 TB $5.75
Data stored in Apache Parquet format* 130 GB 6.78 seconds 2.51 GB $0.013
Savings / Speedup 87% less with Parquet 34x faster 99% less data scanned 99.7% savings

(*compressed using Snappy compression)

Summary

Amazon Athena allows you to analyze data in S3 using standard SQL, without the need to manage any infrastructure. You can also access Athena via a business intelligence tool, by using the JDBC driver. Athena charges you on the amount of data scanned per query. As was evident from this post, converting your data into open source formats not only allows you to save costs, but also improves performance.

You can try Amazon Athena in the US-East (N. Virginia) and US-West 2 (Oregon) regions. To learn more, see the Amazon Athena product page or the Amazon Athena User Guide.