AWS Database Blog

ETL Job Orchestration with Matillion, Amazon DynamoDB, and AWS Lambda

Traditional ETL tools often run on a schedule. If you are due to receive a file from a vendor between 1 A.M. and 4 A.M. on the third Wednesday of the month, you likely have a job scheduled to look for the file at 4 A.M. But what if the file arrives late? Or what if the file arrives a day early and is accidentally swept into an archive before the scheduler has a chance to run? These delays or miscommunications could have an adverse impact on critical business metrics.

If you are using Matillion ETL for Redshift for your ETL/ELT processing, there is another way to manage job executions, using native AWS services such as Amazon Simple Queue Service (Amazon SQS), Amazon DynamoDB, and AWS Lambda.

In this post, I show you how to build an orchestration engine that not only runs your job as soon as your file arrives in Amazon S3, but also scales to manage thousands of jobs as needed.

Prerequisites

  1. You should already have the following resources running in your environment:
    a. An Amazon Redshift cluster with an IAM role authorized for S3 COPY and UNLOAD. For step-by-step instructions about launching a cluster, see Getting Started Using Databases in the Amazon Redshift Developer Guide.
    b. An EC2 instance running Matillion ETL for Redshift with IAM role connectivity to the Amazon Redshift cluster in #a. For information about configuring Matillion, see the AMI on the AWS Marketplace and the Matillion Support Center.
    c. An S3 bucket and key that can be used to trigger a job when a file is uploaded to it.
  2. Review the SQS message format required by Matillion ETL for Redshift (a minimal example follows this list).
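
For reference, the launch message Matillion reads from the queue is a JSON document that names the project group, project, version, environment, job, and any job variables. The following is a minimal sketch in Python of the message used by the sample job in this post (the same structure appears later in the DynamoDB item); the variable values shown here are placeholders that the Lambda function fills in at run time.

import json

# Sketch of the launch message Matillion ETL for Redshift reads from the queue.
# The group/project/job names match the sample job used later in this post;
# the variable values are placeholders.
launch_message = {
    "group": "Sample",
    "project": "example_job",
    "version": "default",
    "environment": "test",
    "job": "load_sample_files",
    "variables": {"file_to_load": "yob2016.txt", "s3Bucket": "<Your bucket name>"},
}
print(json.dumps(launch_message))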

For simplicity, I used the public names data from the U.S. Social Security Administration. Download the names data from the following page and upload it to a bucket in your AWS account: https://www.ssa.gov/oact/babynames/limits.html

You can also adapt the instructions in this post to use data you already have.

Create some SQS queues

Amazon SQS is a fully managed message queuing service for reliably communicating among distributed software components and microservices – at any scale. Using SQS, you can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be always available. SQS makes it simple and cost-effective to decouple and coordinate the components of a cloud application.

In the AWS Management Console, from the Messaging section, choose SQS to open the SQS console. From the Region drop-down list, choose the region where your job will run. Choose the Create New Queue button. In the Create New Queue dialog box, type a name for the listening queue.

Accept all the defaults, and then choose Create Queue.

Repeat these steps to create two additional queues. These queues will receive the failure and success messages for your job submissions.
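
If you prefer to script the queues, here is a minimal sketch with boto3. The launch queue name ETL_Launchpad matches the name used in the Lambda function later in this post; the success and failure queue names are assumptions, so substitute the names you chose.

import boto3

sqs = boto3.client('sqs')

# ETL_Launchpad is the listening queue used later in this post; the success
# and failure queue names here are assumptions -- use your own names.
for queue_name in ('ETL_Launchpad', 'ETL_Success', 'ETL_Failed'):
    response = sqs.create_queue(QueueName=queue_name)
    print(queue_name, response['QueueUrl'])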

Create a job in Matillion

You can use your own job or you can import the sample job from this repository.

If you use the sample job, after importing it, replace the information in the environments section (lines 1729 – 1746, shown here) with your Amazon Redshift cluster, database, user name, and password.

  "environments": [
    {
      "index": 0,
      "name": "test",
      "instanceCredentials": true,
      "credentialsName": "",
      "schema": "public",
      "encrypted": true,
      "url": "<REDSHIFT CLUSTER>",
      "port": "5439",
      "database": "<REDSHIFT DATABASE>",
      "driver": "org.postgresql.Driver",
      "user": "<REDSHIFT USER>",
      "passwordName": "<REDSHIFT USER PASSWORD>",
      "variables": {
        "file_to_load": null,
        "s3Bucket": null
      }

Import the job into Matillion

Open Matillion in your web browser and sign in with your Matillion credentials. For information about connecting to Matillion, see Connecting to the Instance on the Matillion site.

On the Join Project page, open or create a project.

From the Project menu, choose Import. Choose the Browse button, and then choose the example_job.json file. Choose all the jobs, environments, and variables, and then choose OK. After the job is imported, in the Matillion Management Console, edit the environment named test.

The imported job is a simple, two-step job that creates the stage table, ssa_names, if it does not exist, and then loads the data from S3 to populate the table.
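
For context, the two job steps roughly correspond to the following Redshift SQL, shown here as Python strings. The column names and types are assumptions based on the layout of the SSA names files (name, gender, count), not the exact SQL that Matillion generates; the placeholders correspond to the job variables s3Bucket and file_to_load.

# Rough equivalent of the two job steps, expressed as Python strings.
CREATE_STAGE_TABLE = """
CREATE TABLE IF NOT EXISTS public.ssa_names (
    name       VARCHAR(64),
    gender     CHAR(1),
    name_count INTEGER
);
"""

COPY_FROM_S3 = """
COPY public.ssa_names
FROM 's3://{s3Bucket}/{file_to_load}'
IAM_ROLE '<your Redshift IAM role ARN>'
DELIMITER ',';
"""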

SQS configuration

To configure Matillion to listen for and receive messages from a queue, choose the Project button in the upper-left corner of the Matillion Management Console. From the Project menu, choose SQS Configuration.

Choose your deployment region and the queue created for your Matillion launch queue. This is the queue that Matillion listens on.

Configure the other two queues that Matillion will use to write success and failure messages. In the success and failure message sections, leave the Compress boxes cleared. Choose OK to save the configuration and start the listening process.

DynamoDB table

Amazon DynamoDB is a fully managed NoSQL database service that provides fast and predictable performance with seamless scalability. DynamoDB can easily store JSON documents. Matillion consumes messages from the queue as JSON documents, which reduces impedance mismatch when storing or sending messages.

The simplest way to create a DynamoDB table is to use the DynamoDB console. In the AWS Management Console, navigate to DynamoDB from the main menu or search bar. Choose the Create table button, and for the table name, type matillion-job-control. For the partition key, type bucket-with-prefix and leave the type set to String.

Alternatively, you can use this AWS CLI command to create your DynamoDB table.

aws dynamodb create-table \
    --table-name matillion-job-control \
    --attribute-definitions \
        AttributeName=bucket-with-prefix,AttributeType=S \
    --key-schema AttributeName=bucket-with-prefix,KeyType=HASH \
    --provisioned-throughput ReadCapacityUnits=1,WriteCapacityUnits=1

Now add a record for your job to the table. The record contains the information to submit to the queue. You can add the following DynamoDB JSON directly through the console by using Tables > matillion-job-control > Items > Create item > Text, or use the JSON in the example-job-dynamodb-record.json file. Replace the values of <Your bucket name> and <your key> with the values of your bucket and key.

DynamoDB JSON

{
  "bucket-value": {"S": "<Your bucket name>"},
  "bucket-with-prefix": {"S": "<Your bucket name>/<your key>"},
  "job-sqs-message": {
    "M": {"environment": {"S": "test"},
          "group": {"S": "Sample"},
          "job": {"S": "load_sample_files"},
          "project": {"S": "example_job"},
          "variables": {"M": {"file_to_load": {"S": "filename_from_event"},
                              "s3Bucket": {"S": "bucketName"}}},
          "version": {"S": "default"}}},
  "key-value": {"S": "example_job"}
}

Alternatively, you can use this AWS CLI command to add the item to your DynamoDB table.

aws dynamodb put-item \
--table-name matillion-job-control  \
--item file://example-job-dynamodb-record.json

AWS Lambda function

Now that the table entry is complete, you create an AWS Lambda function that will be triggered when a file is uploaded to your bucket. The Lambda trigger will respond to an object creation event on your bucket, query DynamoDB for a corresponding entry, and when it finds one, submit a message to the queue monitored by Matillion. The following diagram illustrates that flow.

In the AWS Management Console, choose AWS Lambda to open the Lambda console. Choose Functions, and then choose Create a Lambda function. From the blueprints, choose s3-get-object-python.

Configure the trigger for your bucket and the event type Object Created (All).

Select the Enable trigger check box, and then choose Next. On the Configure function page, type a name for your function. Replace the code entry with the following code from matillion-job-queue-trigger.py.

import collections
import json
import os

import boto3
from boto3.dynamodb.conditions import Key

# configure by environment
region = os.environ['AWS_REGION']
queue_name = 'ETL_Launchpad'

# set up the DynamoDB table that holds the SQS message to submit
dynamodb = boto3.resource('dynamodb', region_name=region)
job_table = dynamodb.Table('matillion-job-control')
job_table_key = 'bucket-with-prefix'

# set up the queue resource to write the message to
sqs = boto3.resource('sqs', region_name=region)
queue = sqs.get_queue_by_name(QueueName=queue_name)

DEBUG = True

# message writer for CloudWatch Logs
def debug(entries):
    if DEBUG:
        for key in entries:
            print("    DEBUG> %s: %s" % (key, entries.get(key)))

# function to consume trigger event data
def get_trigger_event_data(event):
    for record in event['Records']:
        debug({"payload from event": record})
        split_on = '/'
        bucketName = record['s3']['bucket']['name']
        filename_from_event = record['s3']['object']['key']
        # combine the bucket and prefix for the lookup in DynamoDB
        if split_on in filename_from_event:
            bucket_key = bucketName + split_on + filename_from_event.rsplit(split_on, 1)[0]
        else:
            bucket_key = bucketName
        s3trigger = collections.namedtuple('s3trigger', ['bucketName', 'filename_from_event', 'bucket_key'])
        st = s3trigger(bucketName, filename_from_event, bucket_key)
        debug({"values": st})
        return st

# main Lambda handler
def lambda_handler(event, context):
    ted = get_trigger_event_data(event)
    # query DynamoDB for the job control entry that matches the bucket and prefix
    response = job_table.query(
        KeyConditionExpression=Key(job_table_key).eq(ted.bucket_key)
    )
    # loop through the results
    for i in response['Items']:
        debug({"items": i})
        # get the list of variables
        var = i['job-sqs-message']['variables']
        # for each variable, look for a placeholder that should be replaced with
        # a value from the event; currently two placeholders may require replacing
        for key in var.keys():
            if var[key] == "filename_from_event":
                var[key] = ted.filename_from_event
            elif var[key] == "bucketName":
                var[key] = ted.bucketName
        # get just the SQS message part and submit it to the launch queue
        sqs_msg = i['job-sqs-message']
        debug({"submitting to queue": sqs_msg})
        queue.send_message(MessageBody=json.dumps(sqs_msg))
        # only one job control entry is expected per bucket and prefix
        break

Choose an IAM role that has read access to S3 and DynamoDB, permission to send messages to SQS, and the AWSLambdaBasicExecutionRole managed policy for writing logs to CloudWatch. Choose Next, and then choose Create function.
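
If you prefer to script the S3 trigger instead of using the console blueprint, a minimal sketch with boto3 follows. The bucket, prefix, function ARN, and statement ID below are placeholders; also note that put_bucket_notification_configuration replaces any existing notification configuration on the bucket.

import boto3

bucket = '<Your bucket name>'
prefix = '<your key>/'
function_arn = 'arn:aws:lambda:<region>:<account-id>:function:matillion-job-queue-trigger'

lambda_client = boto3.client('lambda')
s3 = boto3.client('s3')

# allow S3 to invoke the function (the console wires this up for you)
lambda_client.add_permission(
    FunctionName=function_arn,
    StatementId='s3-object-created-trigger',
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn='arn:aws:s3:::' + bucket,
)

# subscribe the function to Object Created (All) events under the prefix
s3.put_bucket_notification_configuration(
    Bucket=bucket,
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'Id': 'matillion-job-trigger',
            'LambdaFunctionArn': function_arn,
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {'Key': {'FilterRules': [{'Name': 'prefix', 'Value': prefix}]}},
        }]
    },
)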

Testing your function

To exercise your function, you can upload one or more of the files from the Social Security Administration Names site into the target bucket and key location. You can use the Matillion Management Console to watch the job run. The queue-initiated process will be displayed in the Task pane.
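
A quick way to trigger the flow from a script is to upload one of the SSA files with boto3. The local file name and S3 key shown here are assumptions; match them to your trigger prefix and the DynamoDB entry.

import boto3

s3 = boto3.client('s3')

# The file name and key are assumptions -- use one of the yobYYYY.txt files
# from the SSA download and the prefix you configured for the trigger.
s3.upload_file('yob2016.txt', '<Your bucket name>', '<your key>/yob2016.txt')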

To look for issues in your job, review the CloudWatch logs for your function. The CloudWatch log group will be named for your Lambda function. The log streams are generated in timestamp order.

You can also look in the SQS success and SQS failed queues to see if your SQS message was accepted or rejected by Matillion.
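
You can also poll those queues from a script. Here is a small sketch with boto3, assuming the success and failure queue names used earlier in this post.

import boto3

sqs = boto3.client('sqs')

# Queue names are assumptions -- use the success and failure queues you
# configured for Matillion.
for queue_name in ('ETL_Success', 'ETL_Failed'):
    url = sqs.get_queue_url(QueueName=queue_name)['QueueUrl']
    messages = sqs.receive_message(
        QueueUrl=url, MaxNumberOfMessages=10, WaitTimeSeconds=2
    ).get('Messages', [])
    for message in messages:
        print(queue_name, message['Body'])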

Extending the solution

Using DynamoDB as your job control table allows you to write the code once and reuse it for any Matillion job that can be triggered when a file arrives in S3. It works well for files uploaded to the same bucket and lets you process those jobs as soon as the files arrive.

To add new jobs to the same S3 bucket, add an entry to the DynamoDB table for the bucket and key values with SQS message entries that correspond to the new job.

To add new jobs to a different S3 bucket, add an entry to the DynamoDB table, and add a trigger to your Lambda function for the S3 bucket.
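
For example, a table entry for a hypothetical second job might look like the following sketch; everything other than the table and attribute names is a placeholder.

import boto3

dynamodb = boto3.resource('dynamodb')
job_table = dynamodb.Table('matillion-job-control')

# Hypothetical second job entry; the bucket, key, and job names are placeholders.
job_table.put_item(Item={
    'bucket-with-prefix': '<another bucket>/<another key>',
    'bucket-value': '<another bucket>',
    'key-value': 'another_job',
    'job-sqs-message': {
        'environment': 'test',
        'group': 'Sample',
        'project': '<another project>',
        'version': 'default',
        'job': '<another job>',
        'variables': {'file_to_load': 'filename_from_event',
                      's3Bucket': 'bucketName'},
    },
})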


About the Author

Wendy Neu has worked as a Data Architect with Amazon since January 2015. Prior to joining Amazon, she worked as a consultant in Cincinnati, OH, helping customers integrate and manage data from different, unrelated data sources.