ETL Job Orchestration with Matillion, Amazon DynamoDB, and AWS Lambda
Traditional ETL tools often run on a schedule. If you are due to receive a file from a vendor between 1 A.M. and 4 A.M. on the third Wednesday of the month, you likely have a job scheduled to look for the file at 4 A.M. But what if the file arrives late? Or what if it arrives a day early and is accidentally swept into an archive before the scheduler has a chance to run? Such delays or miscommunications can have an adverse impact on critical business metrics.
If you are using Matillion ETL for Redshift for your ETL/ELT processing, there is another way to manage job executions: native AWS services such as Amazon Simple Queue Service (Amazon SQS), Amazon DynamoDB, and AWS Lambda.
In this post, I show you how to build an orchestration engine that not only executes your job as soon as your file arrives in Amazon S3, but also scales to manage thousands of jobs as needed.
You should already have the following resources running in your environment:
- An Amazon Redshift cluster with an IAM role authorized for S3 COPY and UNLOAD. For step-by-step instructions about launching a cluster, see Getting Started Using Databases in the Amazon Redshift Developer Guide.
- An EC2 instance running Matillion ETL for Redshift with IAM role connectivity to the Amazon Redshift cluster from the previous item. For information about configuring Matillion, see the AMI listing on the AWS Marketplace and the Matillion Support Center.
- An S3 bucket and key that can be used to trigger a job when a file is uploaded to it.
- Review the SQS message format required by Matillion ETL for Redshift.
For simplicity, I used the public names data from the U.S. Social Security Administration. Download the names data from https://www.ssa.gov/oact/babynames/limits.html and upload it to a bucket in your AWS account.
You can also adapt the instructions in this post to use data you already have.
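As a reference point for the message-format prerequisite above, Matillion's listen queue expects a small JSON document naming the project group, project, version, environment, and job. The field names below follow Matillion's SQS integration documentation, but verify them against your Matillion version; the helper itself is just a sketch:

```python
import json


def build_matillion_message(group, project, version, environment, job, variables=None):
    """Build the JSON payload to place on Matillion's listen queue.

    Field names follow Matillion's documented SQS message format;
    confirm them against your Matillion version before relying on this.
    """
    message = {
        "group": group,
        "project": project,
        "version": version,
        "environment": environment,
        "job": job,
    }
    if variables:
        # Optional job variables are passed through as a nested object.
        message["variables"] = variables
    return json.dumps(message)
```

You would place the resulting string on the listen queue (for example, with `sqs.send_message`) to launch the named job.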
Create some SQS queues
Amazon SQS is a fully managed message queuing service for reliably communicating among distributed software components and microservices – at any scale. Using SQS, you can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be always available. SQS makes it simple and cost-effective to decouple and coordinate the components of a cloud application.
In the AWS Management Console, from the Messaging section, choose SQS to open the SQS console. From the Region drop-down list, choose the region where your job will run. Choose the Create New Queue button. In the Create New Queue dialog box, type a name for the listening queue.
Accept all the defaults, and then choose Create Queue.
Repeat these steps to create two additional queues, which Matillion will use for its failure and success messages.
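If you prefer to script the queue setup instead of using the console, a boto3 sketch along these lines creates all three queues. The `prefix` default and the listen/success/failed suffixes are my own naming assumptions, not names the post prescribes:

```python
def create_matillion_queues(sqs_client, prefix="matillion"):
    """Create the listen, success, and failed queues with default settings
    and return their URLs keyed by role. Names are illustrative only."""
    urls = {}
    for suffix in ("listen", "success", "failed"):
        response = sqs_client.create_queue(QueueName=f"{prefix}-{suffix}")
        urls[suffix] = response["QueueUrl"]
    return urls
```

Call it with a real client, for example `create_matillion_queues(boto3.client("sqs", region_name="us-east-1"))`, using the region where your job will run.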
Create a job in Matillion
You can use your own job or you can import the sample job from this repository.
If you use the sample job, after importing it, replace the information in the environments section (lines 1729 – 1746, shown here) with your Amazon Redshift cluster, database, user name, and password.
Matillion import job
Open Matillion in your web browser and sign in with your Matillion credentials. For information about connecting to Matillion, see Connecting to the Instance on the Matillion site.
On the Join Project page, open or create a project.
From the Project menu, choose Import. Choose the Browse button, and then choose the example_job.json file. Choose all the jobs, environments, and variables, and then choose OK. After the job is imported, in the Matillion Management Console, edit the environment named test.
The imported job is a simple, two-step job that creates the stage table, ssa_names, if it does not exist, and then loads the data from S3 to populate the table.
To configure Matillion to listen for and receive messages from a queue, choose the Project button in the upper-left corner of the Matillion Management Console. From the Project menu, choose SQS Configuration.
Choose your deployment region and the queue you created as your Matillion launch queue. This is the queue that Matillion listens on.
Configure the other two queues, which Matillion uses to write success and failure messages. In the success and failure message sections, leave the Compress boxes cleared. Choose OK to save the configuration and start the listening process.
Create a DynamoDB table
Amazon DynamoDB is a fully managed NoSQL database service that provides fast and predictable performance with seamless scalability. DynamoDB can easily store JSON documents. Matillion consumes messages from the queue as JSON documents, which reduces impedance mismatch when storing or sending messages.
The simplest way to create a DynamoDB table is to use the DynamoDB console. In the AWS Management Console, navigate to DynamoDB from the main menu or the search bar. Choose the Create table button, and for the table name, type matillion-job-control. For the partition (hash) key, type bucket-with-prefix.
Alternatively, you can use this AWS CLI command to create your DynamoDB table.
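If you would rather script the table creation from Python, an equivalent boto3 sketch matching the table and key names from the console steps looks like this; the provisioned throughput values are placeholders to adjust for your workload:

```python
def create_job_control_table(dynamodb_client):
    """Create the matillion-job-control table with bucket-with-prefix
    as the partition (hash) key, mirroring the console steps above.
    Throughput values here are illustrative placeholders."""
    return dynamodb_client.create_table(
        TableName="matillion-job-control",
        AttributeDefinitions=[
            {"AttributeName": "bucket-with-prefix", "AttributeType": "S"}
        ],
        KeySchema=[
            {"AttributeName": "bucket-with-prefix", "KeyType": "HASH"}
        ],
        ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
    )
```

Pass in a real client, for example `create_job_control_table(boto3.client("dynamodb"))`.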
Now add a record for your job to the table. The record contains the information to submit to the queue. You can submit the following DynamoDB JSON directly through the console by using Tables > matillion-job-control > Items > Create item > Text. You can use the JSON in the example-job-dynamodb-record.json file. Replace the values of <Your bucket name> and <your key> with the values of your bucket and key.
Alternatively, you can use this AWS CLI command to add the item to your DynamoDB table.
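Whichever route you take, the item boils down to a hash-key value identifying the bucket and key, plus the SQS message to forward to Matillion. A hypothetical builder in DynamoDB's low-level attribute-value format (the `sqs-message` attribute name is my assumption; match whatever the example-job-dynamodb-record.json file actually uses):

```python
def build_job_control_item(bucket_with_prefix, sqs_message_body):
    """Build a matillion-job-control item in DynamoDB's low-level JSON
    format: the hash key identifies the S3 location, and sqs-message
    (an assumed attribute name) holds the message Matillion receives."""
    return {
        "bucket-with-prefix": {"S": bucket_with_prefix},
        "sqs-message": {"S": sqs_message_body},
    }
```

You could then write it with `dynamodb_client.put_item(TableName="matillion-job-control", Item=item)`.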
AWS Lambda function
Now that the table entry is complete, create an AWS Lambda function that is triggered when a file is uploaded to your bucket. The function responds to an object creation event on your bucket, queries DynamoDB for a corresponding entry, and when it finds one, submits a message to the queue that Matillion monitors. The following diagram illustrates that flow.
In the AWS Management Console, choose AWS Lambda to open the Lambda console. Choose Functions, and then choose Create a Lambda function. From the blueprints, choose s3-get-object-python.
Configure the trigger for your bucket and the event type Object Created (All).
Select the Enable trigger check box, and then choose Next. On the Configure function page, type a name for your function. Replace the code entry with the following code from matillion-job-queue-trigger.py.
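The file itself is not reproduced in this post, but the flow described above can be sketched roughly as follows. Here I match on the full bucket/key string and assume an `sqs-message` attribute on the table item; the actual trigger code may match on a prefix instead, so treat this as an outline rather than the function's source:

```python
import json
from urllib.parse import unquote_plus


def handle_s3_event(event, dynamodb_client, sqs_client, queue_url,
                    table_name="matillion-job-control"):
    """Sketch of the trigger logic: for each created object, look up
    bucket/key in the job-control table and, on a hit, forward the
    stored message to the Matillion listen queue."""
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # S3 event keys are URL-encoded; decode before the lookup.
        key = unquote_plus(record["s3"]["object"]["key"])
        response = dynamodb_client.get_item(
            TableName=table_name,
            Key={"bucket-with-prefix": {"S": f"{bucket}/{key}"}},
        )
        item = response.get("Item")
        if item:
            sqs_client.send_message(
                QueueUrl=queue_url,
                MessageBody=item["sqs-message"]["S"],
            )
```

In a real Lambda handler you would create the boto3 clients at module scope and read the queue URL from an environment variable.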
Choose an IAM role that has read access to S3, DynamoDB, and SQS and the AWSLambdaBasicExecutionRole for writing to CloudWatch. Choose Next, and then choose Create function.
Testing your function
To exercise your function, you can upload one or more of the files from the Social Security Administration Names site into the target bucket and key location. You can use the Matillion Management Console to watch the job run. The queue-initiated process will be displayed in the Task pane.
To look for issues in your job, review the CloudWatch logs for your function. The CloudWatch log group will be named for your Lambda function. The log streams are generated in timestamp order.
You can also check the SQS success and failure queues to see whether Matillion accepted or rejected your SQS message.
Extending the solution
Using DynamoDB as your job control table allows you to write the code once and reuse it for any Matillion job that can be triggered when a file arrives in S3. It works well for files uploaded to the same bucket and lets you process those jobs as soon as the files arrive.
To add new jobs to the same S3 bucket, add an entry to the DynamoDB table for the bucket and key values with SQS message entries that correspond to the new job.
To add new jobs to a different S3 bucket, add an entry to the DynamoDB table, and add a trigger to your Lambda function for the S3 bucket.
About the Author
Wendy Neu has worked as a Data Architect with Amazon since January 2015. Prior to joining Amazon, she worked as a consultant in Cincinnati, OH, helping customers integrate and manage their data from different unrelated data sources.