AWS Big Data Blog

Data Lake Ingestion: Automatically Partition Hive External Tables with AWS

Songzhi Liu is a Professional Services Consultant with AWS

The data lake concept has become more and more popular among enterprise customers because it collects data from different sources and stores it where it can be easily combined, governed, and accessed.

On the AWS cloud, Amazon S3 is a good candidate for a data lake implementation, with large-scale data storage. Amazon EMR provides transparent scalability and seamless compatibility with many big data applications on Hadoop. However, no matter what kind of storage or processing is used, data must be defined.

In this post, I introduce a simple data ingestion and preparation framework based on AWS Lambda, Amazon DynamoDB, and Apache Hive on EMR for data from different sources landing in S3. This solution lets Hive pick up new partitions as data is loaded into S3 because Hive by itself cannot detect new partitions as data lands.

Apache Hive

Hive is a great choice as it is a general data interfacing language thanks to its well-designed Metastore and other related projects like HCatalog. Many other Hadoop applications like Pig, Spark, and Presto, etc. can leverage the schemas defined in Hive.

Moreover, external tables make Hive a great data definition language to define the data coming from different sources on S3, such as streaming data from Amazon Kinesis, log files from Amazon CloudWatch and AWS CloudTrail, or data ingested using other Hadoop applications like Sqoop or Flume.

To maximize the efficiency of data organization in Hive, you should leverage external tables and partitioning. By properly partitioning the data, you can largely reduce the amount of data needs to be retrieved and improve the efficiency during ETL or other types of analysis.

Solving the problem with AWS services

For many of the aforementioned services or applications, data is loaded periodically, as in one batch every 15 minutes. Because Hive external tables don’t pick up new partitions automatically, you need to update and add new partitions manually; this is difficult to manage at scale. A framework based on Lambda, DynamoDB, and S3 can assist with this challenge.

Architectural diagram

As data is ingested from different sources to S3, new partitions are added by this framework and become available in the predefined Hive external tables.

Key components

S3 bucket
In this framework, S3 is the start point and the place where data is landed and stored. You will configure the S3 bucket notifications as the event source that triggers the Lambda function. When a new object is stored/copied/uploaded in the specified S3 bucket, S3 sends out a notification to the Lambda function with the key information.

Lambda function
Lambda is a serverless technology that lets you run code without a server. The Lambda function is triggered by S3 as new data lands and then adds new partitions to Hive tables. It parses the S3 object key using the configuration settings in the DynamoDB tables.

DynamoDB table
DynamoDB is a NoSQL database (key-value store) service. It’s designed for use cases requiring low latency responses, as it provides double-digit millisecond level response at scale. DynamoDB is also a great place for metadata storage, given its schemaless design and low cost when high throughput is not required. In this framework, DynamoDB stores the schema configuration, table configuration, and failed actions for reruns.

EMR cluster
EMR is the managed Hadoop cluster service. In the framework, you use Hive installed on an EMR cluster.

Walkthrough

In this walkthrough, you’ll find all the information you need to create the simple data ingestion and preparation framework.

Prerequisites

You must have the following before you can create and deploy this framework:

  • An AWS account
  • The AWS CLI installed and configured
  • An IAM user with permissions to create AWS resources (like creating the EMR cluster, Lambda function, DynamoDB tables, IAM policies and roles, etc.)

Step 1. Create the execution role for the Lambda function

  1. Open the IAM console and choose Policies, Create Policy.
  2. Choose Create Your Own Policy.
  3. For Policy Name, enter “LambdaExecutionPolicy”.
  4. For Policy Document, paste the policy text provided below and choose Create.
{
    "Version": "2012-10-17",
    "Statement": [
        {
           "Action": [
                "dynamodb:DeleteItem",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:ListTables",
                "dynamodb:PutItem",
                "dynamodb:Query",
                "dynamodb:Scan",
                "dynamodb:UpdateItem",
                "dynamodb:UpdateTable"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:dynamodb:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:*"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject"
            ],
            "Resource": "arn:aws:s3:::*"
        }
    ]
}
  1. Choose Roles, Create New Role.
  2. For Role Name, enter “LambdaExe”.
  3. For Role Type, choose AWS Lambda.
  4. Attach the “LambdaExecutionPolicy” policy that you just created.

Step 2. Create a new Lambda function

I’ve created a deployment package for use with this function.

  1. Open the Lambda console and choose Create Function, Configure function.
  2. Fill in the blanks as indicated below. If this is your first time using Lambda, you may not see Create Function;  choose Get Started Now instead.

  1. For S3 Link URL, enter the following URL: https://s3.amazonaws.com/lambda.hive.demo/AddHivePartition.zip

If you are not using a US region, you may not be able to create the Lambda function. In this case, download the AddHivePartion.zip file from the link above and for Code entry type, select Upload a .zip file.

  1. For Handler, enter “lambda_function.handle”.
  2. For Role, select the role that you just created.
  3. Under Advanced settings, for Memory (MB), choose 128 and for Timeout, choose 5 minutes.

  1. For VPC, choose No VPC, which deploys the Lambda function in an AWS-managed VPC.

You could choose to deploy the function in your own VPC. Make sure that the route to the EMR cluster is properly configured. For more information, see Configuring a Lambda Function to Access Resources in an Amazon VPC

This Lambda function actually parses the S3 object key after a new file lands in S3. After that, it parses the key and retrieves the partition values. Then, it uses these values to create new partitions in Hive. During this process, it queries DynamoDB for partition string format configuration in order to understand the right way to parse the S3 object key.

The Lambda function leverages external Python modules (impyla, thrift_sasl, and pure_sasl) to run Hive queries using Python. The provided package has all the dependencies well packaged. For more information about the Lambda function implemented here, download and unzip the package and look at the lambda_function.py program. To customize the function, unzip the package, modify the code in lambda_function.py, and recompress it.

Note: You need to compress all the files in the folder instead of compressing the folder.

Step 3. Configure the S3 buckets where your data will land and link it with the Lambda function

S3 provides configuration options to send out notifications as certain events happen.

  1. Open the S3 console and create a new bucket or use an existing bucket.
  2. Select the icon to the left of the bucket name as shown below to bring up the bucket properties.

  1. On the right side, choose Events, fill the fields as in the following screenshot, and then choose Save.

In the Prefix and Suffix fields, you could further limit the scope that will trigger the notifications by providing a prefix like demo/testtriggerdata/data or suffix like gz. If not specified, all the objects created in the bucket trigger the notification.

Step 4. Launch the EMR cluster with Hive

In this step, you launch the cluster in a public subnet. If you have a cluster running with Hive already, then you just need to note the public DNS name.

Again, you could choose to launch the cluster in a private subnet inside your VPC. If you choose to do so and you chose No VPC in Step 2, you need to configure a NAT instance for the cluster and enable the routes and security groups to allow traffic to the NAT instance. For more information about creating a EMR cluster in a private subnet and configuring a NAT instance, see Setting Up a VPC to Host Clusters.

  1. Open the EMR console and choose Create cluster, Quick Options.

For more information about how to create a new EMR cluster, see Launch the Sample Cluster.

  1. Specify an EC2 key pair because you need to log onto the cluster later.
  2. Note the public DNS name.
  3. Select the master security group and choose Inbound rules.

  1. Add a Custom TCP rule for port 10000 (for HiveServer2)
  2. Add an SSH rule for port 22 and allow to anywhere. In a production environment, you would deploy both Lambda and EMR in a VPC and open the port to the Lambda security group.

For more information, see Adding Rules to a Security Group.

After the EMR cluster status changes to “Waiting”, ssh onto the cluster and type “hive” at the command line to enter the Hive interactive shell. Then, create a new Hive table using the DDL code below:

CREATE EXTERNAL TABLE wiki (
site STRING,
page STRING,
views BIGINT,
total_bytes INT)
PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
LINES TERMINATED BY 'n';

Step 5. Create two DynamoDB tables for storing configurations

This is the most important part of the configuration. When a new file lands in the S3 bucket, S3 sends a notification to Lambda with the key of the new object. The configuration entries you set up in this step tell Lambda how to parse the key and get the latest partition values.

Run the following AWS CLI commands to create two tables.

Table: TestHiveSchemaSettings

aws dynamodb create-table 
--attribute-definitions AttributeName=ClusterID,AttributeType=S AttributeName=SchemaName,AttributeType=S 
--table-name TestHiveSchemaSettings 
--key-schema AttributeName=ClusterID,KeyType=HASH AttributeName=SchemaName,KeyType=RANGE 
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

Table: TestHiveTableSettings

aws dynamodb create-table 
--attribute-definitions AttributeName=RecordID,AttributeType=S AttributeName=RecordType,AttributeType=S 
--table-name TestHiveTableSettings 
--key-schema AttributeName=RecordID,KeyType=HASH AttributeName=RecordType,KeyType=RANGE 
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

Step 6. Insert configuration entries to the DynamoDB tables

Choose Items, Create item and then choose Text instead of Tree. Paste the following entries into the TestHiveSchemaSettings table that you just created:

Cluster setting entry:

{
  "ClusterID": "ClusterDemo",
  "SchemaName": "ClusterSetting",
  "Hostname":"<Put the cluster’s public DNS name created in Step 4>"
}

Schema setting entry:

{
  "ClusterID": "ClusterDemo",
  "SchemaName": "default",
  "SearchPaths": [
  "demo/testtriggerdata/data"
  ]
}

Next, insert the following entry to the TestHiveTableSettings table by pasting the following document below:

{
"RecordID": "wikistats",
  "RecordType": "TableConfig",
  "TableName": "wiki",
  "PartitionGroupingRules": [
     [
       3,
       "-"
     ]
   ]
}

To learn more about the configuration of the two DynamoDB tables that enable the AWS Lambda function to parse the object key passed by Amazon S3, see Data Lake Ingestion: Automatic External Table Partitioning with Hive and AWS DynamoDB Table Configuration Details

Step 7. Add a new file to the bucket and verify the new partition

Run the following AWS CLI command to add a new data file to S3:

aws s3 cp s3://lambda.hive.demo/demo/data/wikistats/pagecounts-20091001-000000 s3://<Your bucket name>/demo/testtriggerdata/data/wikistats/2009/10/01/ --source-region us-east-1 --region <Your current Region>     

Run the Hive command on EMR:

SELECT * FROM wiki WHERE dt = '2009-10-01' LIMIT 10;
SELECT * FROM wiki WHERE dt = '2008-10-01' LIMIT 10;
SHOW PARTITIONS wiki;

You should see that the data for 2009 is available, and the partition for 2008 is not.

Run the following command to add another file that belongs to another partition:

aws s3 cp s3://lambda.hive.demo/demo/data/wikistats/pagecounts-20081001-000000 s3://<Your Bucket Name>/demo/testtriggerdata/data/wikistats/2008/10/01/ --source-region us-east-1 --region <Your current Region>

Then run the following Hive query:

SHOW PARTITIONS wiki;

Now, partitions for both 2008 and 2009 should be available.

Summary

When data from different sources needs to be stored, combined, governed, and accessed, you can use AWS services and Apache Hive to automate ingestion. In this framework, Lambda and DynamoDB play important roles for the automation of adding partitions to Hive. DynamoDB, in particular, provides an easy way to store configuration parameters and keep runtime metadata for the Lambda function.

You could extend this framework and enable it to handle more complicated data lake ingestion use cases based on your needs and even add support for on-premises Hadoop clusters; however, remember that more configurations would be needed to invoke the Lambda function.

If you have questions or suggestions, please leave a comment below.

———————–

Related

Using Spark SQL for ETL

Want to learn more about Big Data or Streaming Data? Check out our Big Data and Streaming data educational pages.