AWS Database Blog

Using the AWS Database Migration Service, Amazon S3, and AWS Lambda for Database Analytics

Jeff Levine is a solutions architect for Amazon Web Services.

The AWS Database Migration Service (AWS DMS) supports Amazon S3 as a migration target. These services enable you to extract information from any database supported by DMS and write it to Amazon S3 in a format that can be used by almost any application. You can extract the entire database and replicate ongoing changes, including additions, deletions, and updates, using change data capture (CDC) technology. You can even process the changes with AWS Lambda or Amazon Kinesis Firehose. By using Lambda or Firehose, you can extend the capabilities of AWS services to existing database environments, both those within AWS and in other locations.

Overview of the example

Let’s consider an example that shows how to bring these services together. We use AWS DMS to migrate Twitter statistics produced by the City of Seattle, Washington, from Amazon RDS for PostgreSQL to Amazon S3, and we use AWS Lambda for analysis. We see how DMS handles both the migration of the initial database contents and ongoing change data capture. Here’s a diagram of what we plan to do.

We will do the following:

  1. Create two datasets containing Twitter account statistics. One initializes a database table. The other contains subsequent updates to that table, which are processed by the change data capture feature.
  2. Set up security around the database to ensure that it is accessible only by you.
  3. Create an Amazon RDS for PostgreSQL database instance, define a table, and load the first dataset.
  4. Create an Amazon S3 bucket to serve as the target.
  5. Create an AWS Lambda function to analyze S3 objects upon creation.
  6. Configure and invoke the AWS Database Migration Service to migrate the initial dataset that we just loaded into RDS to the S3 bucket. Doing this demonstrates the initial load phase of DMS and shows AWS Lambda processing the dataset.
  7. Copy the second dataset into the database. Doing this demonstrates the change data capture feature of DMS and shows AWS Lambda processing the dataset.
  8. Examine the contents of the Amazon S3 bucket to see the results of both the initial migration and the change data capture.
  9. View the results of the analysis in Amazon CloudWatch.

Prerequisites and assumptions

You will need the following:

  • An AWS account that provides access to the services shown in the diagram.
  • Working knowledge of these services.
  • A utility that connects to a PostgreSQL database, such as psql.

Additionally:

  • We will configure all services in the same VPC and region to simplify networking considerations.
  • The VPC must have an S3 endpoint (a boto3 sketch for creating one follows this list).
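
If your VPC doesn't already have an S3 endpoint, you can create a gateway endpoint with a few lines of boto3. The following is a minimal sketch, not a definitive setup; the region, VPC ID, and route table ID are placeholders that you replace with your own values.

    import boto3

    ec2 = boto3.client('ec2', region_name='us-west-2')  # placeholder region

    # Create a gateway endpoint so that traffic to S3 stays inside the VPC.
    ec2.create_vpc_endpoint(
        VpcEndpointType='Gateway',
        VpcId='vpc-0123456789abcdef0',             # placeholder VPC ID
        ServiceName='com.amazonaws.us-west-2.s3',  # must match the region you use
        RouteTableIds=['rtb-0123456789abcdef0']    # placeholder route table ID
    )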

Step 1: Download the sample datasets

  1. Go to the City of Seattle, Washington Open Data Program website.
  2. Enter Twitter statistics into the search box.
  3. Choose the link City of Seattle official Twitter accounts statistics.
  4. Choose View Data, Filter, and Add a New Filter Condition.
  5. Select month for the column, is for the operator, and 16-Dec for the value. Then use the TAB key to move out of the value field. You should now see only entries that have a month of 16-Dec.
  6. Choose Export, choose CSV, and save the file to your desktop as twitter-16-Dec.csv. This file serves as the initial dataset.
  7. Select 17-Jan for the filter value, and export the file to twitter-17-Jan.csv. This file is the dataset to trigger the change data capture phase of DMS.
  8. Examine the CSV files with a text editor. We recommend a text editor because spreadsheet programs might have difficulty with the month column, which only contains a month and year.
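
If you'd rather check the files programmatically, a quick sanity check with Python's csv module might look like the following sketch; the file name is whichever dataset you exported.

    import csv

    # Print the column names and count the data rows of an exported dataset.
    with open('twitter-16-Dec.csv', newline='') as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = list(reader)

    print('Columns:', header)
    print('Data rows:', len(rows))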

Step 2: Create the PostgreSQL instance and set up the table

  1. Create an RDS PostgreSQL 9.5 parameter group named pg-to-s3 and set the parameter rds.logical_replication to 1. You set this parameter to support ongoing replication with the change data capture technology for AWS DMS. (A boto3 sketch of this step appears after this list.)
  2. Create a PostgreSQL 9.5.4-R1 instance.
  3. If you are using the default VPC, ensure that the RDS instance is not publicly accessible. Create a new security group and give it a name.
  4. You can use a Dev/Test t2.micro instance in a single Availability Zone. For this example, set the master username and master password in accordance with your organization’s security policy. Set the DB instance identifier to a name of your choice. Assign the pg-to-s3 parameter group to the instance.
  5. Make sure that you can connect to the database instance from your workstation.
  6. Go to the Configuration Details page and note the security group and endpoint assigned to the PostgreSQL instance. Later, you modify the security group to allow access by the AWS DMS replication instance. You also need the endpoint to set up the source for DMS. You can see an example of the Configuration Details page following.
  7. Create an Amazon Linux EC2 instance of the t2.micro instance type in the same VPC where the RDS instance resides, with the following settings:
    1. Create a new security group with no inbound rules (not even port 22). Copy the security group ID, because you need it in step 11 below.
    2. Create a new IAM role and give it a name.
    3. Attach the managed policy arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore.
    4. Ensure that the role is assigned to the EC2 instance before you launch it.
    5. Launch the instance.
    6. After the instance has launched, select it and choose Connect.
    7. Select Session Manager and choose Connect.
    8. A new window opens with a browser-based shell session.
    9. At the command prompt, run sudo amazon-linux-extras install postgresql10 vim epel -y.
    10. Run psql --version to verify that the PostgreSQL client is installed.
    11. In the security group for your RDS instance, add an inbound rule that allows the database port (TCP 5432) from the security group ID of this EC2 instance.
  8. Using psql, create the database table and load the first dataset, as shown following.
    CREATE TABLE twitterstats (
    account varchar(50),
    tweets integer,
    following integer,
    followers integer,
    month varchar(6),
    likes integer,
    lists integer,
    moments integer,
    dept varchar(10),
    department varchar(50)
    );
    
    \copy twitterstats FROM 'twitter-16-Dec.csv' WITH csv HEADER;
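
Step 1 of the preceding list can also be scripted. The following is a minimal boto3 sketch, assuming the postgres9.5 parameter group family; rds.logical_replication is a static parameter, so it takes effect at the next reboot.

    import boto3

    rds = boto3.client('rds')

    # Create the parameter group and enable logical replication,
    # which AWS DMS needs for ongoing change data capture.
    rds.create_db_parameter_group(
        DBParameterGroupName='pg-to-s3',
        DBParameterGroupFamily='postgres9.5',
        Description='Parameter group for DMS change data capture'
    )

    rds.modify_db_parameter_group(
        DBParameterGroupName='pg-to-s3',
        Parameters=[{
            'ParameterName': 'rds.logical_replication',
            'ParameterValue': '1',
            'ApplyMethod': 'pending-reboot'  # static parameter; applied at the next reboot
        }]
    )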

Step 3: Create an S3 bucket, an IAM policy, and an IAM role

  1. Create an Amazon S3 bucket named pg-to-s3-bucket.
  2. Ensure that Block all public access is enabled in the S3 bucket configuration. (A boto3 sketch covering steps 1 through 4 of this list appears after the list.)
  3. Create an AWS Identity and Access Management (IAM) policy named pg-to-s3-policy to provide access to the S3 bucket, using the definition following.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::pg-to-s3-bucket/*",
                    "arn:aws:s3:::pg-to-s3-bucket"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::pg-to-s3-bucket"
                ]
            }
        ]
    }
  4. Modify the IAM role named dms-vpc-role and attach the policy pg-to-s3-policy.
  5. View the role and make a note of the Amazon Resource Name (ARN) for the role. The ARN takes the form arn:aws:iam::############:role/dms-vpc-role where ############ is your AWS account number.
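
Steps 1 through 4 of the preceding list can also be scripted. The following is a minimal boto3 sketch; it assumes the region shown and that the policy document above has been saved locally as pg-to-s3-policy.json (a hypothetical file name).

    import boto3

    s3 = boto3.client('s3', region_name='us-west-2')  # placeholder region
    iam = boto3.client('iam')

    # Create the target bucket and block all public access.
    s3.create_bucket(
        Bucket='pg-to-s3-bucket',
        CreateBucketConfiguration={'LocationConstraint': 'us-west-2'}  # omit for us-east-1
    )
    s3.put_public_access_block(
        Bucket='pg-to-s3-bucket',
        PublicAccessBlockConfiguration={
            'BlockPublicAcls': True, 'IgnorePublicAcls': True,
            'BlockPublicPolicy': True, 'RestrictPublicBuckets': True
        }
    )

    # Create the IAM policy from the JSON document above and attach it to dms-vpc-role.
    with open('pg-to-s3-policy.json') as f:
        policy = iam.create_policy(PolicyName='pg-to-s3-policy', PolicyDocument=f.read())

    iam.attach_role_policy(RoleName='dms-vpc-role', PolicyArn=policy['Policy']['Arn'])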

Step 4: Create the AWS Lambda function

  1. Create a Lambda function as follows (when the wizard asks about permissions, create a new execution role; you attach the pg-to-s3-policy to that role after the function is created, as described following the code):
    1. Use the blueprint s3-get-object-python with the Python 3.7 runtime.
    2. Name the function twitterstats.
    3. Create a new execution role for lambda.
    4. Enter pg-to-s3-bucket for the bucket name.
    5. Choose Object Created (All) for the event type.
    6. Select the Enable trigger check box.
    7. Replace all the code in the window with the following Python code:
      import urllib.parse
      import boto3
      import uuid
      import os
      
      s3 = boto3.client('s3')
      
      def lambda_handler(event, context):
          # Identify the bucket and key of the object that triggered this invocation.
          bucket = event['Records'][0]['s3']['bucket']['name']
          key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
      
          # Download the object to the function's temporary storage.
          try:
              download_path = '/tmp/{}'.format(uuid.uuid4())
              s3.download_file(bucket, key, download_path)
          except Exception as e:
              print(e)
              print("Error getting object ", key, "from bucket ", bucket)
              raise e
      
          keyhead, keytail = os.path.split(key)
      
          # Build an awk command that counts the entries, sums the tweets column,
          # and prints the results (captured by CloudWatch Logs).
          awkcmd = \
              "awk -F, " + \
              "-v KT=" + keytail + " " + \
              "'" + \
              "{tweets +=$2} " + \
              "END {print KT, \": Twitter account data entries loaded :\", NR}" + \
              "END {print KT, \": Total tweets across all entries :\", tweets}" + \
              "END {print KT, \": Average tweets/entry :\", int(tweets/NR)}" + \
              "' " + \
              download_path
      
          os.system(awkcmd)
          os.remove(download_path)

The preceding code fetches the S3 object and runs it through an awk program. This program prints the number of entries in the file, the total number of tweets across all entries, and the average number of tweets per entry.
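
If you'd rather not shell out to awk, the same aggregation can be written in pure Python. The following is an illustrative alternative, not part of the original function; a helper like this could be called in place of the os.system line, passing the same download_path and keytail values.

    import csv

    def summarize(download_path, keytail):
        # Count the entries and sum the tweets column (the second field).
        entries = 0
        tweets = 0
        with open(download_path, newline='') as f:
            for row in csv.reader(f):
                entries += 1
                field = row[1]
                # awk treats non-numeric fields as 0; mirror that behavior.
                tweets += int(field) if field.lstrip('-').isdigit() else 0

        print(keytail, ": Twitter account data entries loaded :", entries)
        print(keytail, ": Total tweets across all entries :", tweets)
        print(keytail, ": Average tweets/entry :", tweets // entries)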

  2. Update the Lambda IAM execution role. It should be named <myFunctionName>-role-*.
  3. Attach the pg-to-s3-policy policy that you created in step 3 (a boto3 sketch follows).
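
Attaching the policy with boto3 looks like the following minimal sketch; the role name suffix and the account ID are placeholders for the values in your account.

    import boto3

    iam = boto3.client('iam')

    # Attach the S3 access policy to the execution role that Lambda created.
    iam.attach_role_policy(
        RoleName='twitterstats-role-abc123de',                        # placeholder role name
        PolicyArn='arn:aws:iam::123456789012:policy/pg-to-s3-policy'  # placeholder account ID
    )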

Step 5: Set up the AWS Database Migration Service

  1. Go to the AWS DMS console and choose Replication instances.
  2. Create a replication instance named pg-to-s3, as shown following, and choose Create replication instance. The Publicly accessible box isn’t checked because the DMS replication instance and the RDS instance are in the same VPC. Wait until the instance status is available before proceeding.
  3. Choose Replication instances and look at the IP address assigned to your replication instance. An example of the replication instance dashboard appears following.
  4. Modify the VPC security group that you created with the RDS instance in step 2 to allow PostgreSQL access. Allow access from the IP address for that replication instance on TCP port 5432.
  5. Go back to the DMS dashboard and choose Endpoints. Create a source endpoint for the PostgreSQL database using the instance endpoint, as shown following.

Choose Run test and test the endpoint. If the test succeeds, be sure to save the endpoint. If the test fails, verify the endpoint and security group configurations.

  6. Create a target endpoint for the S3 connection, as shown following.

Choose Run test and test the endpoint. If the test succeeds, be sure to save the endpoint. If the test fails, verify the role configuration and the S3 bucket name.
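
For reference, an S3 target endpoint like this can also be created with boto3. This is a sketch under the assumption that the dms-vpc-role ARN you noted in step 3 serves as the service access role; the endpoint identifier and account ID are placeholders.

    import boto3

    dms = boto3.client('dms')

    # Create an S3 target endpoint that writes CSV output to the bucket.
    dms.create_endpoint(
        EndpointIdentifier='pg-to-s3-target',  # placeholder identifier
        EndpointType='target',
        EngineName='s3',
        S3Settings={
            'ServiceAccessRoleArn': 'arn:aws:iam::123456789012:role/dms-vpc-role',  # placeholder account ID
            'BucketName': 'pg-to-s3-bucket'
        }
    )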

  7. Choose Tasks, and then choose Create task.
  8. For the task name, use pg-to-s3. Use the replication instance and endpoints that you created preceding. Choose Migrate existing data and replicate ongoing changes for the migration type.

  9. Accept the default values in the Task Settings section.
  10. In the Table mappings section, choose public for the schema field and twitterstats for the Table name is like field, as shown following. Choose Add selection rule.

  11. Choose Create task. The migration task then runs, because the Start task on create box is checked. The task status changes from Creating to Starting, then to Running, and finally to Load Complete. At this point, DMS has migrated the initial dataset. After this, DMS monitors the RDS database for changes. (A boto3 sketch of creating an equivalent task appears at the end of this step.)
  12. Load the second dataset into the database using psql, as shown following.
    \copy twitterstats FROM 'twitter-17-Jan.csv' WITH csv HEADER;

    The change data capture feature of DMS detects the update to the database table and migrates the additions to Amazon S3. In S3, the additions are processed in turn by the Lambda function.
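
If you prefer to script the task, the following is a minimal boto3 sketch of an equivalent definition, including the selection rule for the public.twitterstats table; the ARNs are placeholders for the replication instance and endpoints created earlier.

    import json
    import boto3

    dms = boto3.client('dms')

    # Selection rule equivalent to choosing public / twitterstats in the console.
    table_mappings = {
        "rules": [{
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": {"schema-name": "public", "table-name": "twitterstats"},
            "rule-action": "include"
        }]
    }

    dms.create_replication_task(
        ReplicationTaskIdentifier='pg-to-s3',
        SourceEndpointArn='arn:aws:dms:region:123456789012:endpoint:SOURCE',    # placeholder
        TargetEndpointArn='arn:aws:dms:region:123456789012:endpoint:TARGET',    # placeholder
        ReplicationInstanceArn='arn:aws:dms:region:123456789012:rep:INSTANCE',  # placeholder
        MigrationType='full-load-and-cdc',  # migrate existing data and replicate ongoing changes
        TableMappings=json.dumps(table_mappings)
    )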

Step 6: Examine the contents of the S3 bucket

Go to the Amazon S3 console and select the pg-to-s3-bucket bucket. Choose the key (folder) named twitterstats. Then choose the key public, which is the schema name, and choose twitterstats, which is the table name.

You will see two objects. One is named LOAD00000001.csv and contains the twitter-16-Dec.csv dataset that was in place before the invocation of the DMS task. The other object, which has a timestamp name, contains the data migrated through the change data capture technology. In other words, this is the twitter-17-Jan.csv dataset. You can download these files and examine them with a text editor if you want.
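
You can also list the migrated objects with boto3; the following minimal sketch prints every key in the bucket along with its size in bytes.

    import boto3

    s3 = boto3.client('s3')

    # List the objects that DMS wrote to the target bucket.
    response = s3.list_objects_v2(Bucket='pg-to-s3-bucket')
    for obj in response.get('Contents', []):
        print(obj['Key'], obj['Size'])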

Step 7: Display the results from CloudWatch Logs

View the CloudWatch Logs information for the log group of the twitterstats function (/aws/lambda/twitterstats). You should see something similar to what appears following.

Notice the two sets of information. One is for LOAD00000001.csv, which was processed during the initial load. The other is from the file with the timestamp as its name, which was processed by the change data capture feature. Each set shows the number of entries loaded, the total number of tweets across the entries, and the average tweets per entry. This data shows that DMS successfully migrated the initial dataset and the change dataset to Amazon S3, where it was subsequently processed with AWS Lambda.
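
To retrieve the same lines with boto3 instead of the console, a minimal sketch follows; it assumes the standard Lambda log group naming of /aws/lambda/<function name>.

    import boto3

    logs = boto3.client('logs')

    # Print recent log lines emitted by the twitterstats Lambda function.
    response = logs.filter_log_events(logGroupName='/aws/lambda/twitterstats')
    for event in response['events']:
        print(event['message'].rstrip())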

Conclusion

The AWS Database Migration Service can migrate your data to Amazon S3, where it can be further processed by services such as AWS Lambda and Amazon Kinesis. This combination of services enables you to unite your data—even your on-premises data—with the analytical capabilities of AWS.

It’s easy to migrate your database and capture ongoing changes with just a few clicks using AWS Database Migration Service. To get started, follow our Getting Started Guide.

This post has been updated as of February 18, 2020.