AWS Database Blog

Using the Data API to interact with an Amazon Aurora Serverless MySQL database

Amazon Aurora Serverless is an on-demand, automatically scaling configuration for Amazon Aurora (MySQL-compatible edition). The database automatically starts up, shuts down, and scales capacity based on your application’s needs. It enables you to run your database in the cloud without managing any database instances. It’s a simple, cost-effective option for infrequent, intermittent, or unpredictable workloads.

Recently, AWS announced the general availability of the Data API. This feature enables you to access a MySQL-compatible version of an Amazon Aurora Serverless database with a simple API endpoint and without the hassle of managing persistent database connections in your applications. If you use AWS Lambda, the Data API provides a secure way to access the database without the additional overhead for Lambda functions launched in a VPC. Also, the Data API can use database credentials stored in AWS Secrets Manager, eliminating the need to pass credentials in API calls.

In this post, I discuss how to provision an Aurora Serverless MySQL cluster using infrastructure as code (AWS CloudFormation). I provide code examples that show how the Data API can be used to issue SQL commands against the Aurora Serverless database for various use cases. If you’re looking for an in-depth example, see the full end-to-end sample Serverless application on GitHub.

As illustrated in the following figure, you can use the Data API from various types of AWS cloud-native or on-premises applications, including applications running on EC2 instances (VMs), in containers, and on serverless platforms:

You can use the AWS CLI and the various AWS SDKs to programmatically issue SQL commands against an Amazon Aurora Serverless database via the Data API endpoint. Programmatic commands offer advantages in the context of short-lived event-driven applications (for example, AWS Lambda) that now do not have to deal with the hassle of managing persistent database connections. Also, having a central endpoint for DB connections can reduce the load on the database when compared to a large number of individual Lambda functions managing connections of their own.

Provisioning the Amazon Aurora Serverless cluster

Start by provisioning an Amazon Aurora Serverless database. You use AWS CloudFormation to provision the database cluster. The provided template (see the following code example) includes an Aurora Serverless cluster, a Secrets Manager secret to generate and store database credentials, and a subnet group as resources. For a full reference to the template, see the sample code.

The template takes input parameters that are referenced by template resources, such as the following:

  • Environment type (EnvType)
  • Database cluster name (DBClusterName)
  • Database name (DatabaseName)
  • Database master user name (DBMasterUserName)
  • A list of subnets (DBSubnetList)

I configure Amazon Aurora to run in serverless mode (EngineMode: serverless) and to automatically pause the database (AutoPause: true) if there’s no database activity for 15 consecutive minutes (SecondsUntilAutoPause: 900). I also specify a minimum capacity (MinCapacity) of one ACU (Aurora Capacity Unit) and a maximum capacity (MaxCapacity) of four ACU.

Each ACU is a combination of processing and memory capacity. Based on these settings, Aurora Serverless automatically creates scaling rules, setting thresholds for CPU utilization, connections, and available memory. Database storage automatically scales from 10 GiB to 64 TiB, the same as storage in a standard Aurora DB cluster. For more information, see How Aurora Serverless Works.
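The scaling settings described above accept only certain values, and a small check before deployment can catch mistakes early. The following is a minimal sketch, not part of the sample template. The function name validate_scaling_configuration() is hypothetical. It assumes the MySQL-compatible capacity values (1, 2, 4, 8, 16, 32, 64, 128, and 256 ACUs) and the 300 to 86,400 second range for SecondsUntilAutoPause documented for the CloudFormation ScalingConfiguration property.

```python
# Capacity values accepted for Aurora Serverless (MySQL-compatible), in ACUs.
VALID_MYSQL_ACUS = (1, 2, 4, 8, 16, 32, 64, 128, 256)
# Allowed range for SecondsUntilAutoPause (5 minutes to 24 hours).
AUTO_PAUSE_RANGE = (300, 86400)


def validate_scaling_configuration(min_capacity, max_capacity, seconds_until_auto_pause):
    """Return a list of problems found; an empty list means the settings look valid."""
    problems = []
    for label, value in (('MinCapacity', min_capacity), ('MaxCapacity', max_capacity)):
        if value not in VALID_MYSQL_ACUS:
            problems.append(f'{label}={value} is not one of {VALID_MYSQL_ACUS}')
    if min_capacity > max_capacity:
        problems.append('MinCapacity must not exceed MaxCapacity')
    low, high = AUTO_PAUSE_RANGE
    if not low <= seconds_until_auto_pause <= high:
        problems.append(f'SecondsUntilAutoPause must be between {low} and {high}')
    return problems


# The settings used in this post: 1 to 4 ACUs, pause after 15 idle minutes.
print(validate_scaling_configuration(1, 4, 900))  # → []
```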

Aurora Serverless cluster template – code example #1

Resources:
  RDSCluster:
    Type: AWS::RDS::DBCluster
    Properties:
      DBClusterIdentifier: !Ref DBClusterName
      MasterUsername: !Join ['', ['{{resolve:secretsmanager:', !Ref DBSecret, ':SecretString:username}}' ]]
      MasterUserPassword: !Join ['', ['{{resolve:secretsmanager:', !Ref DBSecret, ':SecretString:password}}' ]]
      DatabaseName: !Ref DatabaseName
      Engine: aurora
      EngineMode: serverless
      EngineVersion: 5.6.10a
      ScalingConfiguration:
        AutoPause: true
        MaxCapacity: 4
        MinCapacity: 1
        SecondsUntilAutoPause: 900 # 15 min
      DBSubnetGroupName:
        Ref: DBSubnetGroup

You can use AWS CloudFormation dynamic references to auto-populate Aurora cluster properties. For example, MasterUsername and MasterUserPassword auto-populate from the Secrets Manager attributes username and password, respectively. For more information, see the previous and following code examples.

The username references a template’s input parameter called DBMasterUserName. The password, however, is generated dynamically and never revealed to anyone. Consequently, you never expose the database password in application or configuration code. Instead, it’s resolved at runtime when the application code retrieves the corresponding secret from Secrets Manager.

Aurora Serverless cluster template – code example #2 (resources section, continued …)

  DBSecret:
    Type: AWS::SecretsManager::Secret
    Properties:
      Name: !Sub "${EnvType}-AuroraUserSecret"
      Description: RDS database auto-generated user password
      GenerateSecretString:
        SecretStringTemplate: !Sub '{"username": "${DBMasterUserName}"}'
        GenerateStringKey: "password"
        PasswordLength: 30
        ExcludeCharacters: '"@/\'
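The secret defined above stores a JSON document with username and password keys. With the Data API you normally pass only the secret's ARN, but if application code ever needs to read the secret itself, it could do so roughly as follows. This is a minimal sketch: the helper load_db_credentials() is hypothetical, the client is passed in as an argument for easier testing, and the secret name dev-AuroraUserSecret assumes an EnvType value of dev.

```python
import json


def load_db_credentials(secrets_client, secret_id):
    """Fetch the secret and parse its JSON payload into a dict."""
    response = secrets_client.get_secret_value(SecretId=secret_id)
    return json.loads(response['SecretString'])


# Typical usage (requires AWS credentials, so it is left commented out):
# import boto3
# creds = load_db_credentials(boto3.client('secretsmanager'), 'dev-AuroraUserSecret')
# creds['username'], creds['password']
```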

Use the provided list of subnets as input to the template to create a subnet group resource (DBSubnetGroup) for the Aurora Serverless cluster. For more information, see the DBSubnetGroupName property in the preceding example #1 and the DBSubnetGroup resource in example #3, which follows.

Aurora Serverless cluster template – code example #3 (resources section, continued …)

  DBSubnetGroup:
    Type: AWS::RDS::DBSubnetGroup
    Properties:
      DBSubnetGroupDescription: CloudFormation managed DB subnet group.
      SubnetIds:
        - !Select [0, !Ref DBSubnetList ]
        - !Select [1, !Ref DBSubnetList ]
        - !Select [2, !Ref DBSubnetList ]

After the template deploys, the stack provisions the Aurora Serverless cluster.
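If you deploy the template from Python rather than from the console or the CLI, the input parameters must be passed in the list format that the CloudFormation create_stack() call expects. The following is a minimal sketch of that conversion. The helper build_stack_parameters() and all the values shown are hypothetical, and the post itself does not prescribe this deployment method.

```python
def build_stack_parameters(values):
    """Convert a dict into the Parameters list that create_stack() expects.

    List values (such as subnet IDs) are joined with commas, which is how
    CloudFormation list-type parameters are passed.
    """
    parameters = []
    for key, value in values.items():
        if isinstance(value, (list, tuple)):
            value = ','.join(value)
        parameters.append({'ParameterKey': key, 'ParameterValue': str(value)})
    return parameters


# Hypothetical values; replace them with your own.
stack_parameters = build_stack_parameters({
    'EnvType': 'dev',
    'DBClusterName': 'dev-aurora-cluster',
    'DatabaseName': 'recordstore',
    'DBMasterUserName': 'admin_user',
    'DBSubnetList': ['subnet-aaa', 'subnet-bbb', 'subnet-ccc'],
})
# cloudformation_client.create_stack(StackName=..., TemplateBody=..., Parameters=stack_parameters)
```

The template selects three entries from DBSubnetList, so the list needs at least three subnet IDs.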

Using the Data API to interact with the Aurora Serverless database

Follow these steps to interact with the Aurora Serverless database.

Enabling the Data API endpoint

You must enable the Data API for the corresponding Aurora Serverless cluster to use it.

In the RDS console, select your cluster and choose Modify. In the Network & Security section, select the Data API check box, and apply the changes to the cluster immediately. For more information, see Modifying an Aurora Serverless DB cluster or the Data API public launch blog post.

You can also enable the Data API using the AWS CLI, with the following command:

aws rds modify-db-cluster --db-cluster-identifier [add-db-cluster-name-here] --enable-http-endpoint --apply-immediately

Make sure that you’re using the latest version of the AWS CLI.
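The same change can be made from Python. The following minimal sketch mirrors the CLI command using the modify_db_cluster() call of the Boto3 RDS client (the 'rds' management client, not 'rds-data'). The helper name enable_data_api() is hypothetical, and the client is passed in as an argument so the function can be exercised without AWS access.

```python
def enable_data_api(rds_management_client, cluster_identifier):
    """Turn on the Data API (HTTP endpoint) for the given cluster."""
    return rds_management_client.modify_db_cluster(
        DBClusterIdentifier=cluster_identifier,
        EnableHttpEndpoint=True,
        ApplyImmediately=True,
    )


# Typical usage (requires AWS credentials, so it is left commented out):
# import boto3
# enable_data_api(boto3.client('rds'), 'add-db-cluster-name-here')
```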

Using the Query Editor for Amazon Aurora Serverless

By enabling the Data API, you can also now use the Query Editor for Amazon Aurora Serverless to issue SQL statements against the Aurora Serverless database, as shown in the following diagram. Make sure that your IAM user and role have permissions to access the Query Editor. For more information, see Using the Query Editor for Aurora Serverless.

Using the AWS SDKs to invoke the Data API

You can use the AWS CLI or the various SDKs to invoke the Data API. I walk you through several examples using the AWS SDK for Python (Boto3).

You can obtain complete versions of the code examples discussed in this post from the aws-aurora-serverless-data-api-sam GitHub repo.

Start by importing the boto3 library and creating an RDSDataService client to interact with the Data API (see the rds_client object in the following code). Access the Data API functionality using the client object.

import boto3
rds_client = boto3.client('rds-data')

Then, specify the database name, database ARN, and the Secrets Manager secret ARN for your Aurora Serverless cluster as module-scope variables. In the following code example, update the values for your own environment:

database_name = "add-database-name-here"
db_cluster_arn = "add-cluster-arn-here"
db_credentials_secrets_store_arn = "add-secrets-store-arn-here"

In practice, you can export these values via the AWS CloudFormation stack that provisioned the Aurora Serverless cluster and import them automatically using scripts such as this one. For instance, look at the output parameters from the sample CloudFormation template.
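As a rough illustration of that import step, the following sketch flattens the Outputs section of a CloudFormation describe_stacks() response into a dictionary. The helper stack_outputs_to_dict() and the output keys (DatabaseName, DatabaseClusterArn, DatabaseSecretArn) are hypothetical. Check the sample template for the names it actually exports.

```python
def stack_outputs_to_dict(describe_stacks_response):
    """Flatten the Outputs of the first stack into {OutputKey: OutputValue}."""
    outputs = describe_stacks_response['Stacks'][0].get('Outputs', [])
    return {output['OutputKey']: output['OutputValue'] for output in outputs}


# Sample shaped like cloudformation_client.describe_stacks(StackName=...).
# The output keys and values below are hypothetical placeholders.
sample = {
    'Stacks': [{
        'Outputs': [
            {'OutputKey': 'DatabaseName', 'OutputValue': 'add-database-name-here'},
            {'OutputKey': 'DatabaseClusterArn', 'OutputValue': 'add-cluster-arn-here'},
            {'OutputKey': 'DatabaseSecretArn', 'OutputValue': 'add-secrets-store-arn-here'},
        ],
    }],
}
outputs = stack_outputs_to_dict(sample)
database_name = outputs['DatabaseName']
db_cluster_arn = outputs['DatabaseClusterArn']
db_credentials_secrets_store_arn = outputs['DatabaseSecretArn']
```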

The RDSDataService client provides a function called execute_statement() that issues SQL statements against your Aurora Serverless database. This function takes the parameters discussed previously. Create a wrapper function so that you don’t have to pass these parameters on every invocation, as in the following wrapper function execute_statement():

def execute_statement(sql):
    response = rds_client.execute_statement(
        secretArn=db_credentials_secrets_store_arn,
        database=database_name,
        resourceArn=db_cluster_arn,
        sql=sql
    )
    return response

This function takes a SQL statement as an input parameter, executes the statement against the Aurora Serverless database, and returns a raw response object. I use this function in several examples that follow.
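Because the cluster in this post is configured to pause when idle, the first call after a pause may fail or time out while the database resumes. The following is a minimal sketch of a retry helper for that situation. The name call_with_retries() and its default values are assumptions, and retrying on every exception is deliberately coarse. Real code would inspect the error before retrying.

```python
import time


def call_with_retries(operation, attempts=5, delay_seconds=5, sleep=time.sleep):
    """Call operation() until it succeeds or the attempts run out."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == attempts:
                raise  # give up and surface the last error
            sleep(delay_seconds)


# Usage with the wrapper defined above:
# response = call_with_retries(lambda: execute_statement('select 1'))
```

The sleep function is a parameter only so that the helper can be tested without waiting.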

Example 1 – Issuing DDL commands to create the database and a table

You can now use the wrapper execute_statement() function to execute DDL commands. For instance, create a database table.

First, create a file with the following content and name it package_table.sql:

CREATE TABLE IF NOT EXISTS package (
    package_name VARCHAR(100) NOT NULL,
    package_version VARCHAR(50) NOT NULL,
    PRIMARY KEY (package_name, package_version)
)

You use this file to create a table called package to store the name and version of software packages (for example, MySQL 5.7.21).

Next, create a MySQL database object and the package table by calling function execute_statement(), as in the following code example:

# create MySQL database object
execute_statement(f'create database if not exists {database_name}')
# create 'package' table
table_ddl_script_file = 'package_table.sql'
print(f'Creating table from DDL file: {table_ddl_script_file}')
with open(table_ddl_script_file, 'r') as ddl_script:
    ddl_script_content = ddl_script.read()
    execute_statement(ddl_script_content)

To create multiple tables, you can iterate through an array of DDL .sql files and call the function execute_statement() for each one. For more information, see the aws-aurora-serverless-data-api-sam GitHub repo.

Example 2 – Creating a simple query

Now use the function execute_statement() to run a simple query and print the raw results to standard output:

response = execute_statement('select * from package')
print(response['records'])

Example 3 – Creating a parameterized query

The RDSDataService client also supports parameterized queries by allowing you to use placeholder parameters in SQL statements. The parameter values are supplied separately from the SQL text and resolved at runtime, which helps prevent SQL injection attacks.

For instance, the following SQL query uses a parameter called package_name. It’s a placeholder for a package name that resolves at runtime when the query is executed using the value from variable package_name.

sql = 'select * from package where package_name=:package_name'
package_name = 'package100'
sql_parameters = [{'name':'package_name', 'value':{'stringValue': package_name}}]
response = execute_statement(sql, sql_parameters)
print(response['records'])

This call passes a second argument, so refactor the function execute_statement() to take an extra parameter (sql_parameters) that supports parameterized SQL statements:

def execute_statement(sql, sql_parameters=None):
    response = rds_client.execute_statement(
        secretArn=db_credentials_secrets_store_arn,
        database=database_name,
        resourceArn=db_cluster_arn,
        sql=sql,
        # fall back to an empty list instead of using a mutable default argument
        parameters=sql_parameters or []
    )
    return response
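Writing the parameter list by hand becomes tedious as the number of parameters grows. The following minimal sketch converts a plain Python dictionary into the Data API parameter format, choosing the field type (stringValue, longValue, doubleValue, booleanValue, or isNull) from the Python type. The helper to_sql_parameters() is hypothetical and covers only these basic types.

```python
def to_sql_parameters(values):
    """Convert {'name': python_value} into the Data API parameter format."""
    parameters = []
    for name, value in values.items():
        if value is None:
            field = {'isNull': True}
        elif isinstance(value, bool):  # check bool before int: bool is a subclass of int
            field = {'booleanValue': value}
        elif isinstance(value, int):
            field = {'longValue': value}
        elif isinstance(value, float):
            field = {'doubleValue': value}
        else:
            field = {'stringValue': str(value)}
        parameters.append({'name': name, 'value': field})
    return parameters


print(to_sql_parameters({'package_name': 'package100'}))
# → [{'name': 'package_name', 'value': {'stringValue': 'package100'}}]
```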

Example 4 – Formatting query results

As you’ve probably noticed already, the function execute_statement() returns raw results that likely need parsing and formatting before use. The following code example uses a couple of small Python functions (formatRecords(), formatRecord(), and formatField()) to format a list of returned records by processing individual records and fields.

# Formatting query returned Field
def formatField(field):
    return list(field.values())[0]

# Formatting query returned Record
def formatRecord(record):
    return [formatField(field) for field in record]

# Formatting query returned Records
def formatRecords(records):
    return [formatRecord(record) for record in records]

sql = 'select package_name, package_version from package'
response = execute_statement(sql)
print(formatRecords(response['records']))
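The functions above return positional lists and pass NULL markers through as they are. As a possible variation, the following sketch pairs each value with its column name and maps NULL fields to None. It assumes the statement was executed with includeResultMetadata=True, so that the response carries a columnMetadata list. The helper records_to_dicts() is hypothetical, and the sample response is hand-written for illustration (including the NULL, which the package table itself would not allow).

```python
def records_to_dicts(response):
    """Pair each field with its column name; NULL fields become None."""
    names = [column['name'] for column in response['columnMetadata']]
    rows = []
    for record in response['records']:
        values = [
            None if field.get('isNull') else next(iter(field.values()))
            for field in record
        ]
        rows.append(dict(zip(names, values)))
    return rows


# Sample shaped like the output of
# rds_client.execute_statement(..., includeResultMetadata=True)
sample_response = {
    'columnMetadata': [{'name': 'package_name'}, {'name': 'package_version'}],
    'records': [
        [{'stringValue': 'package-1'}, {'stringValue': 'version-1'}],
        [{'stringValue': 'package-2'}, {'isNull': True}],
    ],
}
print(records_to_dicts(sample_response))
```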

Example 5 – Creating a parameterized SQL insert

You can also parameterize other SQL statements, including insert statements. For example, the following code example uses the execute_statement() function to insert tuple (“package-2”, “version-1”) into the package table.

sql = 'insert into package (package_name, package_version) values (:package_name, :package_version)'
sql_parameters = [
    {'name':'package_name', 'value':{'stringValue': 'package-2'}},
    {'name':'package_version', 'value':{'stringValue': 'version-1'}}
]
response = execute_statement(sql, sql_parameters)
print(f'Number of records updated: {response["numberOfRecordsUpdated"]}')

Example 6 – Batching SQL inserts

The Data API also supports batching by executing a SQL statement multiple times against a set of specified parameters using a single API call. Batching can lead to significant performance gains because the overall network time needed to process multiple SQL statements (for example, inserting hundreds of rows into a table) is drastically reduced.

The RDSDataService’s batch_execute_statement() function makes these gains possible. This function takes parameters similar to those of the execute_statement() function but uses a two-dimensional array for the list of parameterized values.

Wrap the RDSDataService’s batch_execute_statement() function to simplify its list of parameters and make it easier for callers to invoke the function. The following wrapper batch_execute_statement() function takes a SQL statement and a two-dimensional array (a list of parameter lists) as parameters, invokes the original batch_execute_statement() function, and returns a raw response object:

def batch_execute_statement(sql, sql_parameter_sets):
    response = rds_client.batch_execute_statement(
        secretArn=db_credentials_secrets_store_arn,
        database=database_name,
        resourceArn=db_cluster_arn,
        sql=sql,
        parameterSets=sql_parameter_sets
    )
    return response

You can now invoke your batch_execute_statement() wrapper function to insert multiple rows using a single API call.

For instance, batch insert 10 packages into your package table. First, populate the sql_parameter_sets two-dimensional array with package names and versions. Then call batch_execute_statement(), passing the SQL insert statement and the array as parameters, as shown in the following code example:

sql = 'insert into package (package_name, package_version) values (:package_name, :package_version)'
sql_parameter_sets = []
for i in range(1,11):
    entry = [
        {'name':'package_name', 'value':{'stringValue': f'package{i}'}},
        {'name':'package_version', 'value':{'stringValue': 'version-1'}}
    ]
    sql_parameter_sets.append(entry)
response = batch_execute_statement(sql, sql_parameter_sets)
print(f'Number of records updated: {len(response["updateResults"])}')
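For much larger inserts, a single request may grow too big, so it can help to split the parameter sets into smaller batches. The following is a minimal sketch of that idea. The helpers chunked() and batch_execute_in_chunks() are hypothetical, and the chunk size of 100 is an arbitrary assumption rather than a documented limit.

```python
def chunked(items, size):
    """Yield consecutive slices of items with at most size elements each."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def batch_execute_in_chunks(execute_batch, sql, sql_parameter_sets, chunk_size=100):
    """Call execute_batch(sql, chunk) once per chunk and collect the responses."""
    return [execute_batch(sql, chunk) for chunk in chunked(sql_parameter_sets, chunk_size)]


# Usage with the wrapper defined above:
# responses = batch_execute_in_chunks(batch_execute_statement, sql, sql_parameter_sets)
```

Each chunk is a separate API call, so the chunks commit independently unless they run inside one transaction.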

Example 7 – Handling exceptions

Now, create a specific exception type to raise and catch database-related errors in your code. Start by creating a simple class DataAccessLayerException, as shown in the following code example:

class DataAccessLayerException(Exception):
   pass

Your new exception type can now be used to raise and catch database-related errors.

For instance, the following code example shows how the function add_package() catches any errors that can potentially happen when issuing a SQL statement against the database. It wraps and re-raises the original exception as a DataAccessLayerException exception using Python 3’s “raise from” statement:

def add_package():
    try:
        sql = 'insert into package (package_name, package_version) values (:package_name, :package_version)'
        sql_parameters = [
            {'name':'package_name', 'value':{'stringValue': 'package-2'}},
            {'name':'package_version', 'value':{'stringValue': 'version-1'}}
        ]
        response = execute_statement(sql, sql_parameters)
        print(f'Number of records updated: {response["numberOfRecordsUpdated"]}')
    except Exception as e:
        raise DataAccessLayerException(e) from e

You can now use a try/except block to handle DataAccessLayerException exceptions when calling function add_package(), as shown in the following code example:

try:
    add_package()
except DataAccessLayerException as e:
    print(e)
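If many functions need the same wrap-and-re-raise behavior, a decorator can apply it in one place. The following is a minimal sketch of that variation, not code from the sample repo. The decorator name data_access is hypothetical, and failing_query() is a stand-in for a function that calls execute_statement().

```python
import functools


class DataAccessLayerException(Exception):
    pass


def data_access(func):
    """Re-raise any error from func as DataAccessLayerException."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except DataAccessLayerException:
            raise  # already wrapped; do not wrap twice
        except Exception as e:
            raise DataAccessLayerException(e) from e
    return wrapper


@data_access
def failing_query():
    # Stand-in for a call to execute_statement() that fails
    raise ValueError('simulated database error')


try:
    failing_query()
except DataAccessLayerException as e:
    print(e)  # → simulated database error
```

The original exception remains available through the __cause__ attribute of the wrapped exception.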

Example 8 – Committing or rolling back transactions

The Data API supports transactions. Your code can start a transaction, execute SQL commands within the context of that transaction, and then commit the transaction. If an exception occurs during this process, the transaction can be rolled back entirely.

Now, try updating the functions execute_statement() and batch_execute_statement() to support transactions. These functions now take an optional transaction_id parameter. If the caller doesn’t provide the transaction_id value, the functions execute as normal, and changes resulting from the call commit automatically. Otherwise, an explicit call to RDSDataService’s functions commit_transaction() or rollback_transaction() is required.

Refactored to support transactions, the function execute_statement() looks as follows:

def execute_statement(sql, sql_parameters=None, transaction_id=None):
    parameters = {
        'secretArn': db_credentials_secrets_store_arn,
        'database': database_name,
        'resourceArn': db_cluster_arn,
        'sql': sql,
        # fall back to an empty list instead of using a mutable default argument
        'parameters': sql_parameters or []
    }
    if transaction_id is not None:
        parameters['transactionId'] = transaction_id
    response = rds_client.execute_statement(**parameters)
    return response

Refactored to support transactions, the function batch_execute_statement() looks as follows:

def batch_execute_statement(sql, sql_parameter_sets, transaction_id=None):
    parameters = {
        'secretArn': db_credentials_secrets_store_arn,
        'database': database_name,
        'resourceArn': db_cluster_arn,
        'sql': sql,
        'parameterSets': sql_parameter_sets
    }
    if transaction_id is not None:
        parameters['transactionId'] = transaction_id
    response = rds_client.batch_execute_statement(**parameters)
    return response

Use the updated version of function batch_execute_statement() to batch insert data within a transactional context. If no errors occur, the transaction commits. Otherwise, it rolls back.

In the following code example, the transaction ID returned by the function begin_transaction() is passed to the function batch_execute_statement() as the transaction_id argument.

transaction = rds_client.begin_transaction(
     secretArn=db_credentials_secrets_store_arn,
     resourceArn=db_cluster_arn,
     database=database_name)
try:
    sql = 'insert into package (package_name, package_version) values (:package_name, :package_version)'
    sql_parameter_sets = []
    for i in range(30,40):
        entry = [
                {'name':'package_name', 'value':{'stringValue': f'package{i}'}},
                {'name':'package_version', 'value':{'stringValue': 'version-1'}}
        ]
        sql_parameter_sets.append(entry)
    response = batch_execute_statement(sql, sql_parameter_sets, transaction['transactionId'])
except Exception:
    transaction_response = rds_client.rollback_transaction(
        secretArn=db_credentials_secrets_store_arn,
        resourceArn=db_cluster_arn,
        transactionId=transaction['transactionId'])
else:
    transaction_response = rds_client.commit_transaction(
        secretArn=db_credentials_secrets_store_arn,
        resourceArn=db_cluster_arn,
        transactionId=transaction['transactionId'])
    print(f'Number of records updated: {len(response["updateResults"])}')
print(f'Transaction Status: {transaction_response["transactionStatus"]}')
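The begin, commit, and rollback steps above follow a fixed pattern, so they can be folded into a context manager. The following is a minimal sketch of that idea. The name data_api_transaction() is hypothetical, and the client is passed in as an argument so the helper can be tested with a stand-in object. Unlike the example above, it re-raises the original error after rolling back.

```python
from contextlib import contextmanager


@contextmanager
def data_api_transaction(client, resource_arn, secret_arn, database):
    """Begin a transaction, commit on success, roll back on any exception."""
    transaction = client.begin_transaction(
        resourceArn=resource_arn, secretArn=secret_arn, database=database)
    transaction_id = transaction['transactionId']
    try:
        yield transaction_id
    except Exception:
        client.rollback_transaction(
            resourceArn=resource_arn, secretArn=secret_arn, transactionId=transaction_id)
        raise  # let the caller see the original error
    else:
        client.commit_transaction(
            resourceArn=resource_arn, secretArn=secret_arn, transactionId=transaction_id)


# Usage with the objects defined earlier in this post:
# with data_api_transaction(rds_client, db_cluster_arn,
#                           db_credentials_secrets_store_arn, database_name) as transaction_id:
#     batch_execute_statement(sql, sql_parameter_sets, transaction_id)
```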

Conclusion

In this post, I discussed how to provision an Aurora Serverless MySQL cluster using infrastructure as code (AWS CloudFormation). I provided code examples that showed how the Data API can be used to issue SQL commands against the Aurora Serverless database for various use cases. If you’re looking for in-depth examples, see the full end-to-end sample Serverless application on GitHub.

The Data API is available in the US East (N. Virginia), US East (Ohio), US West (Oregon), Asia Pacific (Tokyo), and Europe (Ireland) Regions. There is no charge for the API, but you pay the usual price for data transfer out of AWS.

 


About the Author

Marcilio Mendonca is a Sr. Global Consultant in the Professional Services Team at Amazon Web Services. He has helped AWS customers design, build, and deploy best-in-class, cloud-native AWS applications using VMs, containers, and serverless architectures. Before joining AWS, Marcilio was a Software Development Engineer at Amazon. Marcilio also holds a Ph.D. in Computer Science. In his spare time, he enjoys playing drums, riding his motorcycle, and spending quality time with family and friends.