AWS Big Data Blog

Query any data source with Amazon Athena’s new federated query

Organizations today use data stores that are the best fit for the applications they build. For example, for an organization building a social network, a graph database such as Amazon Neptune is likely the best fit when compared to a relational database. Similarly, for workloads that require a flexible schema for fast iteration, Amazon DocumentDB (with MongoDB compatibility) is likely a better fit. As Werner Vogels, CTO and VP of Amazon.com, said: “Seldom can one database fit the needs of multiple distinct use cases.” Developers today build highly distributed applications using a multitude of purpose-built databases. In a sense, developers are doing what they do best: dividing complex applications into smaller pieces, which allows them to choose the right tool for the right job. As the number of data stores and applications increases, running analytics across multiple data sources can become challenging.

Today, we are happy to announce a Public Preview of Amazon Athena support for federated queries.

Federated Query in Amazon Athena

Federated query is a new Amazon Athena feature that enables data analysts, engineers, and data scientists to execute SQL queries across data stored in relational, non-relational, object, and custom data sources. With Athena federated query, customers can submit a single SQL query and analyze data from multiple sources running on-premises or hosted in the cloud. Athena executes federated queries using Data Source Connectors that run on AWS Lambda. AWS has open-sourced Athena Data Source Connectors for Amazon DynamoDB, Apache HBase, Amazon DocumentDB, Amazon Redshift, Amazon CloudWatch Logs, Amazon CloudWatch Metrics, and JDBC-compliant relational data sources such as MySQL and PostgreSQL under the Apache 2.0 license. Customers can use these connectors to run federated SQL queries in Athena across these data sources. Additionally, using the Query Federation SDK, customers can build connectors to any proprietary data source and enable Athena to run SQL queries against it. Because connectors run on Lambda, customers continue to benefit from Athena’s serverless architecture and do not have to manage infrastructure or scale it for peak demand.
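Once a connector is registered, a federated query is submitted like any other Athena query. The Python sketch below shows the shape of such a request; the `redis` catalog name, table, and S3 results bucket are all hypothetical, and in practice the dict would be passed to `boto3.client("athena").start_query_execution()`.

```python
# Sketch only: the catalog name "redis" assumes a Redis connector has
# already been registered; the S3 bucket is a hypothetical results location.
params = {
    "QueryString": (
        "SELECT _key_, status "
        "FROM redis.redis_db.redis_customer_orders "
        "WHERE status = 'PENDING'"
    ),
    "ResultConfiguration": {
        # Athena writes query results to an S3 location you choose.
        "OutputLocation": "s3://my-athena-results/"
    },
}
# boto3.client("athena").start_query_execution(**params)
```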

Running analytics on data spread across applications can be complex and time consuming. Application developers pick a data store based on the application’s primary function. As a result, data required for analytics is often spread across relational, key-value, document, in-memory, search, graph, time-series, and ledger databases. Event and application logs are often stored in object stores such as Amazon S3. To analyze data across these sources, analysts have to learn new programming languages and data access constructs, and build complex pipelines to extract, transform and load into a data warehouse before they can easily query the data. Data pipelines introduce delays and require custom processes to validate data accuracy and consistency across systems. Moreover, when source applications are modified, data pipelines have to be updated and data has to be re-stated for corrections. Federated queries in Athena eliminate this complexity by allowing customers to query data in-place wherever it resides. Analysts can use familiar SQL constructs to JOIN data across multiple data sources for quick analysis or use scheduled SQL queries to extract and store results in Amazon S3 for subsequent analysis.

The Athena Query Federation SDK extends the benefits of federated querying beyond AWS provided connectors. With fewer than 100 lines of code, customers can build connectors to proprietary data sources and share them across the organization. Connectors are deployed as Lambda functions and registered for use in Athena as data sources. Once registered, Athena invokes the connector to retrieve databases, tables, and columns available from the data source. A single Athena query can span multiple data sources. When a query is submitted against a data source, Athena invokes the corresponding connector to identify parts of the tables that need to be read, manages parallelism, and pushes down filter predicates. Based on the user submitting the query, connectors can provide or restrict access to specific data elements. Connectors use Apache Arrow as the format for returning data requested in a query, which enables connectors to be implemented in languages such as C, C++, Java, Python, and Rust. Since connectors are executed in Lambda, they can be used to access data from any data source on the cloud or on-premises that is accessible from Lambda.
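The split between metadata handling and record reading can be sketched in miniature. The toy Python below is conceptual only: real connectors are written in Java against the Query Federation SDK and return Apache Arrow record batches, and the table schema and rows here are invented for illustration.

```python
# Conceptual sketch of a connector's two responsibilities (hypothetical data).
SCHEMAS = {"orders": {"order_id": "varchar", "status": "varchar"}}

def get_table_schema(table_name: str) -> dict:
    """Metadata side: tell the engine which columns a table exposes."""
    return SCHEMAS[table_name]

def read_records(table_name: str, predicate=None):
    """Data side: stream rows back, applying pushed-down filters."""
    rows = [
        {"order_id": "1", "status": "PENDING"},
        {"order_id": "2", "status": "DELIVERED"},
    ]
    for row in rows:
        if predicate is None or predicate(row):
            yield row

# Predicate pushdown means only matching rows are returned to the engine.
delivered = list(read_records("orders", lambda r: r["status"] == "DELIVERED"))
```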

Data Source Connectors

You can run SQL queries against new data sources by registering the data source with Athena. When registering the data source, you associate an Athena Data Connector specific to the data source. You can use AWS-provided open-source connectors, build your own, contribute to existing connectors, or use community or marketplace-built connectors. Depending on the type of data source, a connector manages metadata information; identifies specific parts of the tables that need to be scanned, read or filtered; and manages parallelism. Athena Data Source Connectors run as Lambda functions in your account.

Each data connector is composed of two Lambda functions, each specific to a data source: one for metadata and one for record reading. The connector code is open-source and is deployed as Lambda functions. You can also deploy the Lambda functions from the AWS Serverless Application Repository and use them with Athena. Once deployed, each Lambda function has a unique Amazon Resource Name (ARN). You must register these ARNs with Athena; registering an ARN tells Athena which Lambda function to invoke during query execution. Once both ARNs are registered, you can query the registered data source.
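Registration can also be done programmatically. The sketch below shows the shape of an Athena CreateDataCatalog request that ties both function ARNs to a catalog name; the ARNs, account ID, and catalog name are hypothetical, and the dict would be passed to `boto3.client("athena").create_data_catalog(**catalog)`.

```python
# Sketch: registering a connector's two Lambda ARNs as an Athena data catalog.
# All names and ARNs below are hypothetical placeholders.
catalog = {
    "Name": "cloudwatch",  # the catalog name you reference in queries
    "Type": "LAMBDA",
    "Description": "CloudWatch Logs connector",
    "Parameters": {
        # One Lambda function serves metadata, the other reads records.
        "metadata-function": "arn:aws:lambda:us-east-1:123456789012:function:cw-metadata",
        "record-function": "arn:aws:lambda:us-east-1:123456789012:function:cw-records",
    },
}
# boto3.client("athena").create_data_catalog(**catalog)
```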

When a query runs on a federated data source, Athena fans out Lambda invocations to read metadata and data in parallel. The number of parallel invocations depends on the Lambda concurrency limit enforced in your account. For example, if your account has a limit of 300 concurrent Lambda invocations, Athena can invoke up to 300 Lambda functions in parallel for record reading. Two queries running in parallel consume twice as many concurrent executions.

Diagram 1 shows how Athena federated queries work. When you submit a federated query, Athena invokes the appropriate Lambda-based connector to connect to your data source and fans out Lambda invocations to read metadata and data in parallel.

Diagram 1: Athena Federated Query Architecture

Example

This blog post demonstrates how data analysts can query data in multiple databases for faster analysis in one SQL query. For illustration purposes, consider an imaginary e-commerce company whose architecture leverages the following purpose-built data sources:

  1. Payment transaction records stored in Apache HBase running on Amazon EMR.
  2. Active orders, defined as customer orders not yet delivered, stored in Redis so that the processing engine can retrieve them quickly.
  3. Customer data, such as email addresses and shipping information, stored in Amazon DocumentDB.
  4. The product catalog stored in Amazon Aurora.
  5. The order processor’s log events housed in Amazon CloudWatch Logs.
  6. Historical orders and analytics in Amazon Redshift.
  7. Shipment tracking data in Amazon DynamoDB.
  8. A fleet of drivers performing last-mile delivery while using IoT-enabled tablets.

Customers of this imaginary e-commerce company have complained that their orders are stuck in an inconsistent state: some orders show as pending even though they have actually been delivered, while others show as delivered but have not actually been shipped.

The company’s management has tasked the customer service analysts with determining the true state of all orders.

Using Athena federated queries

Using Athena’s query federation, the analysts can quickly analyze records from different sources. Additionally, they can set up a pipeline that extracts data from these sources, stores it in Amazon S3, and uses Athena to query it.

Diagram 2 shows Athena invoking Lambda-based connectors to connect with data sources that are on premises and in the cloud in the same query. In this diagram, Athena is scanning data from S3 and executing Lambda-based connectors to read data from HBase on Amazon EMR, DynamoDB, MySQL, Amazon Redshift, ElastiCache (Redis), and Amazon Aurora.

Diagram 2: Federated Query Example.

Analysts can register and use the following connectors found in this repository and run a query that:

  1. Grabs all active orders from Redis. (see athena-redis)
  2. Joins against any orders with ‘WARN’ or ‘ERROR’ events in CloudWatch Logs by using regex matching and extraction. (see athena-cloudwatch)
  3. Joins against our EC2 inventory to get the hostname(s) and status of the Order Processor(s) that logged the ‘WARN’ or ‘ERROR’. (see athena-cmdb)
  4. Joins against DocumentDB to obtain customer contact details for the affected orders. (see athena-docdb)
  5. Joins against DynamoDB to get shipping status and tracking details. (see athena-dynamodb)
  6. Joins against HBase to get payment status for the affected orders. (see athena-hbase)

Data Source Connector Registration

Analysts can register a data source connector using the Connect data source Flow in the Athena Query Editor.

  1. Choose Connect data source or Data sources on the Query Editor.
  2. Select the data source to which you want to connect, as shown in the following screenshot. You can also choose to write your own data source connector using the Query Federation SDK.
  3. Follow the rest of the steps in the console to complete the registration. These involve configuring the connector function for your data source (as shown in the following screenshot), choosing a name to use as the catalog name in your queries, and providing a description.

Sample Analyst Query

Once the registration of the data source connectors is complete, the customer service analyst can write the following sample query to identify the affected orders in one SQL query, thus increasing the organization’s business velocity.

WITH logs 
     AS (SELECT log_stream, 
                message                                          AS 
                order_processor_log, 
                Regexp_extract(message, '.*orderId=(\d+) .*', 1) AS orderId, 
                Regexp_extract(message, '(.*):.*', 1)            AS log_level 
         FROM 
     "lambda:cloudwatch"."/var/ecommerce-engine/order-processor".all_log_streams 
         WHERE  Regexp_extract(message, '(.*):.*', 1) IN ('WARN', 'ERROR')), 
     active_orders 
     AS (SELECT * 
         FROM   redis.redis_db.redis_customer_orders), 
     order_processors 
     AS (SELECT instanceid, 
                publicipaddress, 
                state.NAME 
         FROM   awscmdb.ec2.ec2_instances), 
     customer 
     AS (SELECT id, 
                email 
         FROM   docdb.customers.customer_info), 
     addresses 
     AS (SELECT id, 
                is_residential, 
                address.street AS street 
         FROM   docdb.customers.customer_addresses),
     shipments 
     AS ( SELECT order_id, 
                 shipment_id, 
                 from_unixtime(cast(shipped_date as double)) as shipment_time,
                 carrier
        FROM lambda_ddb.default.order_shipments),
     payments
     AS ( SELECT "summary:order_id", 
                 "summary:status", 
                 "summary:cc_id", 
                 "details:network" 
        FROM "hbase".hbase_payments.transactions)
         
SELECT _key_            AS redis_order_id, 
       customer_id, 
       customer.email   AS cust_email, 
       "summary:cc_id"  AS credit_card,
       "details:network" AS CC_type,
       "summary:status" AS payment_status,
       status           AS redis_status, 
       addresses.street AS street_address, 
       shipments.shipment_time as shipment_time,
       shipments.carrier as shipment_carrier,
       publicipaddress  AS ec2_order_processor, 
       NAME             AS ec2_state, 
       log_level, 
       order_processor_log 
FROM   active_orders 
       LEFT JOIN logs 
              ON logs.orderid = active_orders._key_ 
       LEFT JOIN order_processors 
              ON logs.log_stream = order_processors.instanceid 
       LEFT JOIN customer 
              ON customer.id = customer_id 
       LEFT JOIN addresses 
              ON addresses.id = address_id 
       LEFT JOIN shipments
              ON shipments.order_id = active_orders._key_
       LEFT JOIN payments
              ON payments."summary:order_id" = active_orders._key_

Additionally, Athena writes all query results to an S3 bucket that you specify in your query. If your use case requires you to ingest data into S3, you can use Athena’s federated query capabilities to register your data source, ingest the data to S3, and use CTAS or INSERT INTO statements to create partitions and metadata in the AWS Glue Data Catalog, as well as convert the data to a supported format.
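As a sketch of that ingestion pattern, the CTAS statement below (built as a Python string) would materialize federated query results as partitioned Parquet files in S3 and register the table in the Glue Data Catalog. The table, bucket, and column names are hypothetical.

```python
# Sketch: a CTAS statement over a federated source. All names are
# hypothetical; you would submit this string to Athena like any other query.
# Note that in Athena CTAS, partition columns must come last in the SELECT.
ctas = """
CREATE TABLE datalake.active_orders_snapshot
WITH (
  format = 'PARQUET',
  external_location = 's3://my-datalake/active_orders/',
  partitioned_by = ARRAY['order_date']
) AS
SELECT _key_ AS order_id, status, order_date
FROM redis.redis_db.redis_customer_orders
"""
```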

Conclusion

In this blog post, we introduced Athena’s new federated query feature. Using an example, we saw how to register and use Athena data source connectors to write federated queries that connect Athena to any data source accessible by AWS Lambda from your account. Finally, we saw that federated queries not only enable faster analytics, but can also extract, transform, and load data into your data lake in S3.

Athena federated query is available in Preview in the us-east-1 (N. Virginia) region. Begin your Preview now by following these steps in the Athena FAQ.
To learn more about the feature, see the Connect to a Data Source documentation.
To get started with an existing connector, follow the Connect to a Data Source guide.
To learn how to build your own data source connector using the Athena Query Federation SDK, see the Athena example on GitHub.

 


About the Author

Janak Agarwal is a product manager for Athena at AWS.