AWS Big Data Blog

Extracting and joining data from multiple data sources with Athena Federated Query

With modern day architectures, it’s common to have data sitting in various data sources. We need proper tools and technologies across those sources to create meaningful insights from stored data. Amazon Athena is primarily used as an interactive query service that makes it easy to analyze unstructured, semi-structured, and structured data stored in Amazon Simple Storage Service (Amazon S3) using standard SQL. With the federated query functionality in Athena, you can now run SQL queries across data stored in relational, non-relational, object, and custom data sources and store the results back in Amazon S3 for further analysis.

The goals for this series of posts are to discuss how we can configure different connectors to run federated queries with complex joins across different data sources, how to configure a user-defined function for redacting sensitive information when running Athena queries, and how we can use machine learning (ML) inference to detect anomaly detection in datasets to help developers, big data architects, data engineers, and business analysts in their daily operational routines.

Athena Federated Query

Athena uses data source connectors that run on AWS Lambda to run federated queries. A data source connector is a piece of code that translates between your target data source and Athena. You can think of a connector as an extension of Athena’s query engine. Prebuilt Athena data source connectors exist for data sources like Amazon CloudWatch Logs, Amazon DynamoDB, Amazon DocumentDB, Amazon Elasticsearch Service (Amazon ES), Amazon ElastiCache for Redis, and JDBC-compliant relational data sources such as MySQL, PostgreSQL, and Amazon RedShift under the Apache 2.0 license. You can also use the Athena Query Federation SDK to write custom connectors. After you deploy data source connectors, the connector is associated with a catalog name that you can specify in your SQL queries. You can combine SQL statements from multiple catalogs and span multiple data sources with a single query.

When a query is submitted against a data source, Athena invokes the corresponding connector to identify parts of the tables that need to be read, manages parallelism, and pushes down filter predicates. Based on the user submitting the query, connectors can provide or restrict access to specific data elements. Connectors use Apache Arrow as the format for returning data requested in a query, which enables connectors to be implemented in languages such as C, C++, Java, Python, and Rust. Because connectors run in Lambda, you can use them to access data from any data source on the cloud or on premises that is accessible from Lambda.

The first post of this series discusses how to configure Athena Federated Query connectors and use them to run federated queries for data residing in HBase on Amazon EMR, Amazon Aurora MySQL, DynamoDB, and ElastiCache for Redis databases.

Test data

To demonstrate Athena federation capabilities, we use the TPCH sample dataset. TPCH is a decision support benchmark and has broad industry-wide relevance. This benchmark illustrates decision support systems that examine large volumes of data, run queries with a high degree of complexity, and give answers to critical business questions. For our use case, imagine a hypothetical ecommerce company with the following architecture:

  • Lineitems processing records stored in HBase on Amazon EMR to meet requirements for a write-optimized data store with high transaction rate and long-term durability
  • ElastiCache for Redis stores Nations and ActiveOrders tables so that the processing engine can get fast access to them
  • An Aurora with MySQL engine is used for Orders, Customer, and Suppliers accounts data like email addresses and shipping addresses
  • DynamoDB hosts Part and Partsupp data, because DynamoDB offers high flexibility and high performance

The following diagram shows a schematic view of the TPCH tables and their associated data stores.

Building a test environment using AWS CloudFormation

Before following along with this post, you need to create the required AWS resources in your account. To do this, we have provided you with an AWS CloudFormation template to create a stack that contains the required resources: the sample TPCH database on Amazon Relational Database Service (Amazon RDS), HBase on Amazon EMR, Amazon ElastiCache for Redis, and DynamoDB.

The template also creates the AWS Glue database and tables, S3 bucket, Amazon S3 VPC endpoint, AWS Glue VPC endpoint, Athena named queries, AWS Cloud9 IDE, an Amazon SageMaker notebook instance, and other AWS Identity and Access Management (IAM) resources that we use to implement the federated query, user-defined functions (UDFs), and ML inference functions.

This template is designed only to show how you can use Athena Federated Query, UDFs, and ML inference. This setup isn’t intended for production use without modification. Additionally, the template is created for use in the us-east-1 Region, and doesn’t work in other Regions.

Before launching the stack, you must have the following prerequisites:

  • An AWS account that provides access to AWS services
  • An IAM user with an access key and secret key to configure the AWS Command Line Interface (AWS CLI), and permissions to create an IAM role, IAM policies, and stacks in AWS CloudFormation

To create your resources, complete the following steps:

  1. Choose Launch Stack:
  2. Select I acknowledge that this template may create IAM resources.

This template creates resources that incur costs while they remain in use. Follow the cleanup steps at the end of this post to delete and clean up the resources to avoid any unnecessary charges.

  1. When the CloudFormation template is complete, record the outputs listed on the Outputs tab on the AWS CloudFormation console.

The CloudFormation stack takes approximately 20–30 minutes to complete. Check the AWS CloudFormation console and wait for the status CREATE_COMPLETE.

When stack creation is complete, your AWS account has all the required resources to implement this solution.

  1. On the Outputs tab of the Athena-Federation-Workshop stack, capture the following:
    1. S3Bucket
    2. Subnets
    3. WorkshopSecurityGroup
    4. EMRSecurityGroup
    5. HbaseConnectionString
    6. RDSConnectionString

You need all this information when setting up connectors.

  1. When the stacks are complete, check the status of the Amazon EMR steps on the Amazon EMR console.

It can take up to 15 minutes for this step to complete.

Deploying connectors and connecting to data sources

Preparing to create federated queries is a two-part process: deploying a Lambda function data source connector, and connecting the Lambda function to a data source. In the first part, you give the Lambda function a name that you can later choose on the Athena console. In the second part, you give the connector a name that you can reference in your SQL queries.

We want to query different data sources, so in the following sections we set up Lambda connectors for HBase on Amazon EMR, Aurora MySQL, DynamoDB, and Redis before we start creating complex joins across data sources using Athena federated queries. The following diagram shows the architecture of our environment.

Installing the Athena JDBC connector for Aurora MySQL

The Athena JDBC connector supports the following databases:

  • MySQL
  • PostGreSQL
  • Amazon Redshift

To install the Athena JDBC connector for Aurora MySQL, complete the following steps:

  1. In your AWS account, search for serverless application repository.
  2. Choose Available applications.
  3. Make sure that Show apps that create custom IAM roles or resource policies is selected.
  4. Search for athena federation.
  5. Locate and choose AthenaJdbcConnector.
  6. Provide the following values:
    1. Application name – Leave it as default name, AthenaJdbcConnector.
    2. SecretNamePrefix – Enter AthenaJdbcFederation.
    3. SpillBucket – Enter the S3Bucket value from the AWS CloudFormation outputs.
    4. DefaultConnectionString – Enter the RDSConnectionString value from the AWS CloudFormation outputs.
    5. DisableSpillEncryption – Leave it as the default value false.
    6. LambdaFunctionName – Enter mysql.
    7. LambdaMemory – Leave it as the default value 3008.
    8. LambdaTimeout – Leave it as the default value 900.
    9. SecurityGroupIds – Enter the WorkshopSecurityGroup value from the AWS CloudFormation outputs.
    10. SpillPrefix – Change the default value to athena-spill/jdbc.
    11. SubnetIds – Enter the Subnets value from the AWS CloudFormation outputs.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

This deploys the Athena JDBC connector for Aurora MySQL; you can refer to this Lambda function in your queries as lambda:mysql.

For more information about the Athena JDBC connector, see the GitHub repo.

Installing the Athena DynamoDB connector

To install Athena DynamoDB Connector, complete the following steps:

  1. In your AWS account, search for serverless application repository.
  2. Choose Available applications.
  3. Make sure that Show apps that create custom IAM roles or resource policies is selected.
  4. Search for athena federation.
  5. Locate and choose AthenaDynamoDBConnector.
  6. Provide the following values:
    1. Application name – Leave it as default name AthenaDynamoDBConnector.
    2. SpillBucket – Enter the S3Bucket value from the AWS CloudFormation outputs.
    3. AthenaCatalogName – Enter dynamo.
    4. DisableSpillEncryption – Leave it as the default value false.
    5. LambdaMemory – Leave it as the default value 3008.
    6. LambdaTimeout – Leave it as the default value 900.
    7. SpillPrefix – Enter athena-spill-dynamo.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

This deploys Athena DynamoDB connector; you can refer to this Lambda function in your queries as lambda:dynamo.

For more information about the Athena DynamoDB connector, see the GitHub repo.

Installing the Athena HBase connector

To install the Athena HBase connector, complete the following steps:

  1. In your AWS account, search for serverless application repository.
  2. Choose Available applications.
  3. Make sure that Show apps that create custom IAM roles or resource policies is selected.
  4. Search for athena federation.
  5. Locate and choose AthenaHBaseConnector.
  6. Provide the following values:
    1. Application name – Leave it as default name AthenaHBaseConnector
    2. SecretNamePrefix – Enter hbase-*.
    3. SpillBucket – Enter the S3Bucket value from the AWS CloudFormation outputs.
    4. AthenaCatalogName – Enter hbase.
    5. DisableSpillEncryption – Leave it as the default value false.
    6. DefaultConnectionString – Enter the HbaseConnectionString value from the AWS CloudFormation outputs.
    7. LambdaMemory – Leave it as the default value of 3008.
    8. LambdaTimeout – Leave it as the default value of 900.
    9. SecurityGroupIds – Enter the EMRSecurityGroup value from the AWS CloudFormation outputs.
    10. SpillPrefix – Enter athena-spill-hbase.
    11. SubnetIds – Enter the Subnets value from the AWS CloudFormation outputs.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

This deploys the Athena HBase connector; you can refer to this Lambda function in your queries as lambda:hbase.

For more information about the Athena HBase connector, see the GitHub repo.

Installing the Athena Redis connector

To install Athena Redis Connector, complete the following steps:

  1. In your AWS account, search for serverless application repository.
  2. Choose Available applications.
  3. Make sure that Show apps that create custom IAM roles or resource policies is selected.
  4. Search for athena federation.
  5. Locate and choose AthenaRedisConnector.
  6. Provide the following values:
    1. Application name – Leave it as default name AthenaRedisConnector.
    2. SecretNameOrPrefix – Enter redis-*.
    3. SpillBucket – Enter the S3Bucket value from the AWS CloudFormation outputs.
    4. AthenaCatalogName – Enter redis.
    5. DisableSpillEncryption – Leave it as the default value false.
    6. LambdaMemory – Leave it as the default value 3008.
    7. LambdaTimeout – Leave it as the default value 900.
    8. SecurityGroupIds – Enter the EMRSecurityGroup value from the AWS CloudFormation outputs.
    9. SpillPrefix – Enter athena-spill-redis.
    10. SubnetIds – Enter the Subnets value from the AWS CloudFormation outputs.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

This deploys the Athena Redis connector; you can refer to this Lambda function in your queries as lambda:redis.

For more information about the Athena Redis connector, see the GitHub repo.

Redis database and tables with the AWS Glue Data Catalog

Because Redis doesn’t have a schema of its own, the Redis connector can’t infer the columns or data type from Redis. The Redis connector needs an AWS Glue database and tables to be set up so it can associate the data to the schema. The CloudFormation template creates the necessary Redis database and tables in the Data Catalog. You can confirm this on the AWS Glue console.

Running federated queries

Now that the connectors are deployed, we can run Athena queries that use those connectors.

  1. On the Athena console, choose Get Started.
  2. Make sure you’re in the workgroup AmazonAthenaPreviewFunctionality. If not, choose Workgroups, select AmazonAthenaPreviewFunctionality, and choose Switch Workgroup.

On the Saved Queries tab, you can see a list of pre-populated queries to test.

The Sources saved query tests your Athena connector functionality for each data source, and you can make sure that you can extract data from each data source before running more complex queries involving different data sources.

  1. Highlight the first query up to the semicolon and choose Run query.

After successfully testing connections to each data source, you can proceed with running more complex queries, such as:

  • FetchActiveOrderInfo
  • ProfitBySupplierNationByYr
  • OrdersRevenueDateAndShipPrio
  • ShippedLineitemsPricingReport
  • SuppliersWhoKeptOrdersWaiting

If you see an error on the HBase query like the following, try rerunning it and it should resolve the issue.

GENERIC_USER_ERROR: Encountered an exception[java.lang.RuntimeException] from your LambdaFunction[hbase] executed in context[retrieving meta-data] with message[org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the location for replica 0]

As an example of the advanced queries, the SuppliersWhoKeptOrdersWaiting query identifies suppliers whose product was part of a multi-supplier order (with current status of F) and they didn’t ship the required parts on time. This query uses multiple data sources: Aurora MySQL and HBase on Amazon EMR. As shown in the following screenshot, the query extracts data from the supplier table on Aurora MySQL, the lineitem table on HBase, and the orders tables on Aurora MySQL. The results are returned in 7.13 seconds.

Cleaning up

To clean up the resources created as part of our CloudFormation template, complete the following steps:

  1. On the Amazon S3 console, empty the bucket athena-federation-workshop-<account-id>.
  2. If you’re using the AWS CLI, delete the objects in the athena-federation-workshop-<account-id> bucket with the following code (make sure you’re running this command on the correct bucket):
    aws s3 rm s3://athena-federation-workshop-<account-id> --recursive
  3. On the AWS CloudFormation console, delete all the connectors so they’re no longer attached to the elastic network interface (ENI) of the VPC. Alternatively, go to each connector and deselect the VPC so it’s no longer attached to the VPC created by AWS CloudFormation.
  4. On the Amazon SageMaker console, delete any endpoints you created as part of the ML inference.
  5. On the Athena console, delete the AmazonAthenaPreviewFunctionality workgroup.
  6. On the AWS CloudFormation console or the AWS CLI, delete the stack Athena-Federation-Workshop.

Summary

In this post, we demonstrated the functionality of Athena federated queries by creating multiple different connectors and running federated queries against multiple data sources. In the next post, we show you how you can use the Athena Federation SDK to deploy your UDF and invoke it to redact sensitive information in your Athena queries.


About the Authors

Saurabh Bhutyani is a Senior Big Data Specialist Solutions Architect at Amazon Web Services. He is an early adopter of open-source big data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

 

 

 

Amir Basirat is a Big Data Specialist Solutions Architect at Amazon Web Services, focused on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation, where he helps customers craft distributed analytics applications on the AWS platform. Prior to his AWS Cloud journey, he worked as a big data specialist for different technology companies. He also has a PhD in computer science, where his research primarily focused on large-scale distributed computing and neural networks.