AWS Database Blog

Capturing Data Changes in Amazon Aurora Using AWS Lambda

February 9, 2024: Amazon Kinesis Data Firehose has been renamed to Amazon Data Firehose. Read the AWS What’s New post to learn more.

Re Alvarez-Parmar is a solutions architect at Amazon Web Services. He helps enterprises achieve success through technical guidance and thought leadership. In his spare time, he enjoys spending time with his two kids and exploring outdoors.

At Amazon, we are constantly improving our services to help customers solve their operational problems. One of the features we announced last year enabled integration between Amazon Aurora and other AWS services through AWS Lambda functions and Amazon S3. In this post, we explore how this integration feature helps extend the functionality of Amazon Aurora.

This post shows how to do the following:

  • Use Lambda functions with Amazon Aurora to capture data changes in a table.
  • Export this data to Amazon S3 and query it using Amazon Athena.
  • Visualize the data using Amazon QuickSight.

In this post, we build a serverless architecture that uses AWS Lambda, Amazon Kinesis Firehose, Amazon S3, Amazon Athena, and Amazon QuickSight to collect, store, query, and visualize data changes in Amazon Aurora.

Serverless architecture for capturing and analyzing Aurora data changes
Consider a scenario in which an ecommerce web application uses Amazon Aurora for its database layer. The company has a sales table that captures every sale, along with a few corresponding data items. This information is stored as immutable data in a table. Business users want to monitor the sales data and then analyze and visualize it. In this example, you take the data changes in an Aurora database table and send them to Amazon QuickSight for real-time dashboard visualization.

By the end of this post, you will understand how to capture data events in an Aurora table and push them out to other AWS services using Lambda.

The following diagram shows the flow of data as it occurs in this tutorial:

The starting point in this architecture is a database insert operation in Amazon Aurora. When the insert statement is executed, a custom trigger calls a Lambda function and forwards the inserted data. Lambda writes the data that it received from Amazon Aurora to a Kinesis Firehose stream. Kinesis Firehose writes the data to an Amazon S3 bucket. Once the data is in an Amazon S3 bucket, it is queried in place using Amazon Athena. Finally, Amazon QuickSight uses Athena as a data source and provides a visualization of the data in Amazon S3.

Creating an Aurora database
First, create a database for the lab by following these steps in the Amazon RDS console (a boto3 sketch that scripts the same setup follows the list):

  1. Choose Launch DB Instance.
  2. For Engine, choose Amazon Aurora.
  3. Choose a DB Instance Class. This example uses a small instance class, since this is not a production database.
  4. In Multi-AZ Deployment, choose No.
  5. Configure DB Instance Identifier, Master Username, and Master Password.
  6. Launch the DB Instance.

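If you prefer to script the database creation instead of clicking through the console, the following is a minimal boto3 sketch of the same setup. The cluster identifier, instance class, engine name, and credentials are placeholder values; adjust them to match your environment and the Aurora MySQL version available in your Region.

import boto3

rds = boto3.client('rds')

# Create the Aurora (MySQL-compatible) cluster. All identifiers and
# credentials below are placeholders.
rds.create_db_cluster(
    DBClusterIdentifier='aurora-cdc-demo',
    Engine='aurora-mysql',
    MasterUsername='DBUSER',
    MasterUserPassword='DBPASSWORD',
    DatabaseName='DB'
)

# Add a single small instance to the cluster; this is not a production setup.
rds.create_db_instance(
    DBInstanceIdentifier='aurora-cdc-demo-instance',
    DBInstanceClass='db.t3.small',
    Engine='aurora-mysql',
    DBClusterIdentifier='aurora-cdc-demo'
)
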
After you create the database, use MySQL Workbench to connect to it using the cluster endpoint (CNAME) shown in the AWS Management Console. For information about connecting to an Aurora database, see Connecting to an Amazon Aurora DB Cluster.

The following screenshot shows the MySQL Workbench configuration:

Next, create a table in the database by running the following SQL statement:

Create Table
CREATE TABLE Sales (
  InvoiceID int NOT NULL AUTO_INCREMENT,
  ItemID int NOT NULL,
  Category varchar(255),
  Price double(10,2),
  Quantity int NOT NULL,
  OrderDate timestamp,
  DestinationState varchar(2),
  ShippingType varchar(255),
  Referral varchar(255),
  PRIMARY KEY (InvoiceID)
);

You can now populate the table with some sample data. To generate sample data in your table, copy and run the following script. Replace AURORA_CNAME, DBUSER, DBPASSWORD, and DB with the values for your environment.

#!/usr/bin/python
import MySQLdb
import random
import datetime

db = MySQLdb.connect(host="AURORA_CNAME",
                     user="DBUSER",
                     passwd="DBPASSWORD",
                     db="DB")

states = ("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI","ID","IL","IN",
"IA","KS","KY","LA","ME","MD","MA","MI","MN","MS","MO","MT","NE","NV","NH","NJ",
"NM","NY","NC","ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT","VT","VA",
"WA","WV","WI","WY")

shipping_types = ("Free", "3-Day", "2-Day")

product_categories = ("Garden", "Kitchen", "Office", "Household")
referrals = ("Other", "Friend/Colleague", "Repeat Customer", "Online Ad")

for i in range(0,10):
    item_id = random.randint(1,100)
    state = states[random.randint(0,len(states)-1)]
    shipping_type = shipping_types[random.randint(0,len(shipping_types)-1)]
    product_category = product_categories[random.randint(0,len(product_categories)-1)]
    quantity = random.randint(1,4)
    referral = referrals[random.randint(0,len(referrals)-1)]
    price = random.randint(1,100)
    # Limit the day to 28 so the randomly generated date is valid in every month
    order_date = datetime.date(2016, random.randint(1,12), random.randint(1,28)).isoformat()

    data_order = (item_id, product_category, price, quantity, order_date, state,
    shipping_type, referral)

    add_order = ("INSERT INTO Sales "
                   "(ItemID, Category, Price, Quantity, OrderDate, DestinationState, \
                   ShippingType, Referral) "
                   "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)")

    cursor = db.cursor()
    cursor.execute(add_order, data_order)

    db.commit()

cursor.close()
db.close()

The following shows how the table appears with the sample data:

Sending data from Amazon Aurora to Amazon S3
There are two methods available to send data from Amazon Aurora to Amazon S3:

  • Using a Lambda function
  • Using SELECT INTO OUTFILE S3

To demonstrate the ease of setting up integration between multiple AWS services, we use a Lambda function to send data to Amazon S3 using Kinesis Firehose.

Alternatively, you can use a SELECT INTO OUTFILE S3 statement to query data from an Amazon Aurora DB cluster and save it directly into text files that are stored in an Amazon S3 bucket.
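For reference, here is a minimal sketch of the SELECT INTO OUTFILE S3 option, executed through the same MySQLdb connection used by the sample data script. It assumes that an IAM role allowing writes to the target bucket is already associated with the cluster, and the S3 URI (Region, bucket, and prefix) is a placeholder.

import MySQLdb

db = MySQLdb.connect(host="AURORA_CNAME", user="DBUSER",
                     passwd="DBPASSWORD", db="DB")
cursor = db.cursor()

# Export the Sales table directly to S3 as comma-separated text files.
# The s3-<region>://bucket/prefix URI is a placeholder.
cursor.execute("""
    SELECT * FROM Sales
    INTO OUTFILE S3 's3-us-east-1://YOUR_BUCKET/sales-export/sales'
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\\n'
""")

cursor.close()
db.close()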

Creating a Kinesis Firehose delivery stream
The next step is to create a Kinesis Firehose delivery stream since it’s a dependency of the Lambda function.

To create a delivery stream (a boto3 sketch that scripts these steps follows the list):

  1. Choose Create Delivery Stream.
  2. In Destination, choose Amazon S3.
  3. In Delivery Stream Name, type AuroraChangesToS3.
  4. In the S3 Bucket dropdown list, choose an existing bucket or create a new one.
  5. Enter a prefix if needed, and choose Next.
  6. In Data Compression, select GZIP.
  7. In IAM role, select either an existing role that has access to write to Amazon S3, or choose to automatically generate one. Choose Next.
  8. Review all the details on the screen, and choose Create Delivery Stream when you’re finished.
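
If you prefer to script this step, the following boto3 sketch creates an equivalent delivery stream. The IAM role ARN and bucket ARN are placeholders, and the role must already allow Kinesis Firehose to write to the bucket.

import boto3

firehose = boto3.client('firehose')

# Create a delivery stream that writes GZIP-compressed objects to S3.
# The role ARN and bucket ARN below are placeholders.
firehose.create_delivery_stream(
    DeliveryStreamName='AuroraChangesToS3',
    S3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
        'BucketARN': 'arn:aws:s3:::YOUR_BUCKET',
        'Prefix': 'CDC/',
        'CompressionFormat': 'GZIP'
    }
)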

Creating a Lambda function
Now you can create a Lambda function that is called every time there is a change that needs to be tracked in the database table. This Lambda function passes the data to the Kinesis Firehose Delivery Stream that you created earlier.

To create the Lambda function:

  1. Sign in to the AWS Management Console, and navigate to AWS Lambda.
  2. Ensure that you are in the Region where your Amazon Aurora database is located.
  3. If you have no Lambda functions yet, choose Get Started Now. Otherwise, choose Create a Lambda Function.
  4. Choose Blank Function.
  5. Choose Next on the trigger selection screen.
  6. Paste the following code in the code window. Change the stream_name variable to the Kinesis Firehose stream that you created in the previous step.
    import boto3
    
    firehose = boto3.client('firehose')
    stream_name = 'AuroraChangesToS3'
    
    
    def Kinesis_publish_message(event, context):
        # Build a comma-separated record from the fields passed in by the Aurora trigger
        firehose_data = ("%s,%s,%s,%s,%s,%s,%s,%s\n" % (event['ItemID'],
            event['Category'], event['Price'], event['Quantity'],
            event['OrderDate'], event['DestinationState'], event['ShippingType'],
            event['Referral']))
        
        firehose_data = {'Data': str(firehose_data)}
        print(firehose_data)
        
        # Write the record to the Kinesis Firehose delivery stream
        firehose.put_record(DeliveryStreamName=stream_name,
                            Record=firehose_data)

Note the Amazon Resource Name (ARN) of this Lambda function.
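
Before wiring the function to Aurora, you can confirm that it writes to the delivery stream by invoking it with a sample event. The following sketch assumes the function is named CDCFromAuroraToKinesis, the name referenced later in the stored procedure.

import json
import boto3

lambda_client = boto3.client('lambda')

# A sample event shaped like the payload the Aurora trigger will send
test_event = {
    'ItemID': 42, 'Category': 'Garden', 'Price': 19.99, 'Quantity': 2,
    'OrderDate': '2016-05-17 00:00:00', 'DestinationState': 'WA',
    'ShippingType': '2-Day', 'Referral': 'Online Ad'
}

response = lambda_client.invoke(
    FunctionName='CDCFromAuroraToKinesis',  # assumed function name
    InvocationType='RequestResponse',
    Payload=json.dumps(test_event)
)
print(response['StatusCode'])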

Giving Aurora permissions to invoke a Lambda function
To give Amazon Aurora permissions to invoke a Lambda function, you must attach an IAM role with appropriate permissions to the cluster. For more information, see Invoking a Lambda Function from an Amazon Aurora DB Cluster.

Once you are finished, the Amazon Aurora database has access to invoke a Lambda function.
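
As a rough, scripted sketch of that association, the snippet below attaches an IAM role to the cluster and records it in the cluster parameter group. AuroraInvokeLambdaRole is a placeholder role name (its policy must allow lambda:InvokeFunction), the other identifiers are placeholders as well, and the exact parameter to set depends on your Aurora version, so confirm it against the linked documentation.

import boto3

rds = boto3.client('rds')

role_arn = 'arn:aws:iam::123456789012:role/AuroraInvokeLambdaRole'  # placeholder ARN

# Associate the IAM role with the Aurora cluster so the cluster can call Lambda.
rds.add_role_to_db_cluster(
    DBClusterIdentifier='aurora-cdc-demo',  # placeholder cluster identifier
    RoleArn=role_arn
)

# Record the role in the cluster's parameter group (placeholder group name).
rds.modify_db_cluster_parameter_group(
    DBClusterParameterGroupName='aurora-cdc-demo-params',
    Parameters=[{
        'ParameterName': 'aws_default_lambda_role',
        'ParameterValue': role_arn,
        'ApplyMethod': 'pending-reboot'
    }]
)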

Creating a stored procedure and a trigger in Amazon Aurora
Now go back to MySQL Workbench and run the following command to create a new stored procedure. When this stored procedure is called, it invokes the Lambda function you created. Change the ARN in the following code to your Lambda function’s ARN.

DROP PROCEDURE IF EXISTS CDC_TO_FIREHOSE;
DELIMITER ;;
CREATE PROCEDURE CDC_TO_FIREHOSE (IN ItemID VARCHAR(255),
                                  IN Category VARCHAR(255),
                                  IN Price DOUBLE(10,2),
                                  IN Quantity INT(11),
                                  IN OrderDate TIMESTAMP,
                                  IN DestinationState VARCHAR(2),
                                  IN ShippingType VARCHAR(255),
                                  IN Referral VARCHAR(255)) LANGUAGE SQL 
BEGIN
  CALL mysql.lambda_async('arn:aws:lambda:us-east-1:XXXXXXXXXXXXX:function:CDCFromAuroraToKinesis', 
     CONCAT('{ "ItemID" : "', ItemID, 
            '", "Category" : "', Category,
            '", "Price" : "', Price,
            '", "Quantity" : "', Quantity, 
            '", "OrderDate" : "', OrderDate, 
            '", "DestinationState" : "', DestinationState, 
            '", "ShippingType" : "', ShippingType, 
            '", "Referral" : "', Referral, '"}')
     );
END
;;
DELIMITER ;

Create a trigger TR_Sales_CDC on the Sales table. When a new record is inserted, this trigger calls the CDC_TO_FIREHOSE stored procedure.

DROP TRIGGER IF EXISTS TR_Sales_CDC;
 
DELIMITER ;;
CREATE TRIGGER TR_Sales_CDC
  AFTER INSERT ON Sales
  FOR EACH ROW
BEGIN
  SELECT NEW.ItemID, NEW.Category, NEW.Price, NEW.Quantity, NEW.OrderDate,
    NEW.DestinationState, NEW.ShippingType, NEW.Referral
  INTO @ItemID, @Category, @Price, @Quantity, @OrderDate,
    @DestinationState, @ShippingType, @Referral;
  CALL CDC_TO_FIREHOSE(@ItemID, @Category, @Price, @Quantity, @OrderDate,
    @DestinationState, @ShippingType, @Referral);
END
;;
DELIMITER ;

If a new row is inserted in the Sales table, the Lambda function that is mentioned in the stored procedure is invoked.

Verify that data is being sent from the Lambda function to Kinesis Firehose to Amazon S3 successfully. You may have to insert a few records, depending on the size of your data, before new records appear in Amazon S3. This is due to Kinesis Firehose buffering. To learn more about Kinesis Firehose buffering, see the “Amazon S3” section in Amazon Kinesis Firehose Data Delivery.
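
A quick way to perform this check is to list the objects under the delivery stream's S3 prefix after inserting a few rows. The bucket name and prefix in the following sketch are placeholders.

import boto3

s3 = boto3.client('s3')

# List objects written by the delivery stream. Firehose stores objects under
# the configured prefix followed by YYYY/MM/DD/HH folders.
response = s3.list_objects_v2(Bucket='YOUR_BUCKET', Prefix='CDC/')
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])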

Querying data in Amazon Athena
In this section, you consume the data produced by Amazon Aurora and stored in Amazon S3, using Amazon Athena. Athena is a serverless query service that makes it easy to analyze large amounts of data stored in Amazon S3 using standard SQL.

First, create the Athena table:

CREATE EXTERNAL TABLE IF NOT EXISTS ecommerce_sales(
  ItemID int,
  Category string,
  Price double,
  Quantity int,
  OrderDate DATE,
  DestinationState string,
  ShippingType string,
  Referral string)
ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      ESCAPED BY '\\'
      LINES TERMINATED BY '\n'
LOCATION 's3://{BUCKET_NAME}/CDC/'

Scan the table to verify that data is being loaded properly. You can run this check from the Athena console, or script it as shown below.
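
The following is a minimal boto3 sketch of that check. The query results location is a placeholder bucket that you own, and sampledb is the Athena database used later in this post.

import time
import boto3

athena = boto3.client('athena')

# Start the query; the output location is a placeholder bucket you own.
query = athena.start_query_execution(
    QueryString='SELECT * FROM ecommerce_sales LIMIT 10',
    QueryExecutionContext={'Database': 'sampledb'},
    ResultConfiguration={'OutputLocation': 's3://YOUR_BUCKET/athena-results/'}
)
query_id = query['QueryExecutionId']

# Wait for the query to finish, then print the returned rows.
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(1)

if state == 'SUCCEEDED':
    results = athena.get_query_results(QueryExecutionId=query_id)
    for row in results['ResultSet']['Rows']:
        print([col.get('VarCharValue') for col in row['Data']])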

Every time a new record is inserted in the sales table, the stored procedure invokes the Lambda function, and the new data eventually arrives in Amazon S3.

According to AWS best practices, the data in Amazon S3 should be partitioned as explained in Analyzing Data in S3 using Amazon Athena. Partitioning is not covered in this post.

Visualizing data in Amazon QuickSight
The final step is to query the Athena table from Amazon QuickSight.

  1. Go to the Amazon QuickSight console and choose New Analysis.
  2. Choose New dataset.
  3. Choose Athena as the source.
  4. Provide a name for your data source—this example uses RDS_SALES_DATA. Then choose Create data source.
  5. Choose sampledb. This is where the ecommerce_sales table is located in the Athena database. Then choose the table in the next step.
  6. Choose to query the data directly, because the dashboard needs to reflect new data dynamically.
  7. Next, create a visual with price (sum) as the x-axis and orderdate (MONTH) as the y-axis. The results should appear as follows:

As data is added to the Sales table in Amazon Aurora, the data in Amazon QuickSight is updated automatically.

Final notes
Amazon QuickSight can also read data in Amazon S3 directly. However, with the method demonstrated in this post, you have the option to manipulate, filter, and combine data from multiple sources before visualizing it in Amazon QuickSight.

In this example, we dealt with data being inserted, but you can create triggers that activate in response to INSERT, UPDATE, or DELETE statements.

Keep the following in mind:

  • Be careful when invoking a Lambda function from triggers on tables that experience high write traffic, because this results in a large number of calls to your Lambda function. Although the Lambda function itself executes asynchronously, the invocation call made by the trigger is synchronous. Invoking and starting a Lambda function generally takes tens of milliseconds, and during that time the trigger, and therefore the INSERT statement that fired it, is blocked. Take these factors into account when using this method to capture changes.
  • Similarly, account for Kinesis Firehose limits. By default, Kinesis Firehose is limited to a maximum of 5,000 records/second. For more information, see Monitoring Amazon Kinesis Firehose.

In certain cases, it may be optimal to use AWS Database Migration Service (AWS DMS) to capture data changes in Aurora and use Amazon S3 as a target. For example, AWS DMS might be a good option if you don't need to transform data from Amazon Aurora. AWS DMS also captures all changes made in the cluster, not just changes to the tables on which you placed triggers. The method used in this post gives you the flexibility to transform data from Aurora using Lambda before sending it to Amazon S3. Additionally, the architecture in this post is fully serverless, whereas AWS DMS requires a replication instance running on Amazon EC2.