This blog post was written by Brice Pellé – Principal Solution Architect – AWS AppSync.
Amplify DataStore is a library that provides a programming model for leveraging shared and distributed data without writing additional code for offline and online scenarios. With the Amplify CLI, you can easily set up a new application that leverages AWS AppSync and provisions Amazon DynamoDB to power your DataStore application. However, there are scenarios where developers want to use DataStore to build offline-first applications while leveraging existing data sources in their AWS account or in their on-prem datacenter. In this article, I will show you how you can use the Amplify CLI to build an AppSync API for your DataStore application that connects to an existing Aurora MySQL database using an AppSync Lambda resolver and an Amazon RDS Proxy.
Starting point
I am going to build a backend that accesses two tables (Posts and Comments) in an existing Aurora MySQL database. These tables are used to power a blog and already contain data. The database is only accessible from a VPC and does not allow public access. There is an RDS Proxy configured with IAM database authentication that a Lambda function can use to connect to the database.
Preparing the database
I start by adding metadata columns to the tables to turn them into versioned data sources. I introduce a _datastore_uuid column that stores the ID provided by DataStore when a new item is created. This column will be mapped back to the id field when data is returned to the client. It is also used to retrieve and lock a row during update operations. The tables’ own id field is not sent back to the client.
You can find more information about the added _version, _deleted, _lastChangedAt, and _ttl columns in the Conflict Detection and Sync documentation. Notice that I update the createdAt and updatedAt columns to automatically set the current time on creation and update.
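The exact DDL depends on your existing schema; a sketch for the Posts table could look like the following (only the column names come from this design — the types and defaults here are assumptions to adapt to your tables):

```sql
-- Hypothetical types; adapt to your existing schema.
ALTER TABLE Posts
  ADD COLUMN _datastore_uuid VARCHAR(36) NULL,
  ADD COLUMN _version INT NOT NULL DEFAULT 1,
  ADD COLUMN _deleted BOOLEAN NOT NULL DEFAULT FALSE,
  ADD COLUMN _lastChangedAt TIMESTAMP(3) NOT NULL
    DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
  ADD COLUMN _ttl TIMESTAMP(3) NULL,
  ADD INDEX (_datastore_uuid),
  MODIFY createdAt TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
  MODIFY updatedAt TIMESTAMP(3) NOT NULL
    DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3);
```

The index on _datastore_uuid supports the lookups and row locks that the mutations perform against that column.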
I introduce two new Delta Sync tables, one for each of our model tables. A Delta Sync table tracks changes to a versioned data source and optimizes incremental updates. When our client comes online, it will query the AppSync API to synchronize any changes that may have occurred while it was offline. AppSync will then call the Lambda resolver to fetch the data. The Lambda function will determine whether to sync from the base tables, or to return the records of change from the Delta Sync table for an efficient and incremental update.
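One way to create the Delta Sync tables is to mirror the base tables, since each change record carries the same columns as the row it tracks; a hedged sketch (table names follow the DeltaSync prefix convention used by the Lambda function below):

```sql
-- Each Delta Sync table mirrors its base table, including the metadata columns.
CREATE TABLE DeltaSyncPosts LIKE Posts;
CREATE TABLE DeltaSyncComments LIKE Comments;
```

If your base tables carry unique constraints beyond the primary key, drop them on the Delta Sync copies, since these tables store multiple versions of the same row over time.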
Finally, I set up an event to remove deleted items on a daily schedule. You can optionally turn this off or adjust the schedule frequency. For your Aurora MySQL database, make sure you set the event_scheduler to ON in your parameter group.
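A purge event along these lines removes rows whose TTL has expired (the event name and schedule are illustrative):

```sql
CREATE EVENT IF NOT EXISTS purge_deleted_posts
ON SCHEDULE EVERY 1 DAY
DO
  DELETE FROM Posts
  WHERE _deleted = TRUE AND _ttl < CURRENT_TIMESTAMP(3);
```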
Configuring the backend with the Amplify CLI
The current version of the Amplify CLI makes it easy to configure a DynamoDB-based Delta Sync data source that works with the Amplify DataStore client. I use this functionality to set up a GraphQL schema that represents my data model and that implements the same GraphQL operations that the DataStore client expects: the sync queries, and the create, update, and delete mutations. I will then connect these fields to my database tables using an AppSync Lambda resolver created with the amplify @function directive.
I start by creating a new Amplify project in my app directory with amplify init, and walk through the prompts. I then set up my functions. I will be using mysql2 to connect to the database. I’ll configure that in a layer that my Lambda function will use.
I then create the Lambda function dataStoreLink, and configure the attached layer.
The function needs to be configured to access the RDS Proxy in my VPC. I update the generated CloudFormation JSON template at ./amplify/backend/function/dataStoreLink/dataStoreLink-cloudformation-template.json to add the proper permissions and environment variables. To the Lambda execution role, I add a managed policy ARN to allow the function to access ENIs (read more about configuring Lambda for VPC access here, and examples with AWS AppSync here):
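In the template, that amounts to attaching the AWSLambdaVPCAccessExecutionRole managed policy to the execution role; roughly as follows (the resource name in your generated template may differ):

```json
"LambdaExecutionRole": {
  "Type": "AWS::IAM::Role",
  "Properties": {
    "ManagedPolicyArns": [
      "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
    ]
  }
}
```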
To the Lambda function definition, I add my VPC configuration and the environment variables my function will use to connect:
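A sketch of those additions follows; the subnet, security group, and endpoint values are placeholders, while the variable names match what the function code reads from process.env:

```json
"LambdaFunction": {
  "Type": "AWS::Lambda::Function",
  "Properties": {
    "VpcConfig": {
      "SubnetIds": ["subnet-0123456789abcdef0"],
      "SecurityGroupIds": ["sg-0123456789abcdef0"]
    },
    "Environment": {
      "Variables": {
        "RDS_PROXY_URL": "my-proxy.proxy-abcdefgh.us-east-1.rds.amazonaws.com",
        "DATABASE": "blog",
        "USERNAME": "lambda_user",
        "REGION": "us-east-1"
      }
    }
  }
}
```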
I add an inline policy resource that allows the function to use my database proxy:
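With IAM database authentication, connecting through the proxy requires the rds-db:connect action on the proxy user; a hedged sketch, where the account ID, region, proxy ID, and database user are placeholders:

```json
"DbProxyAccessPolicy": {
  "Type": "AWS::IAM::Policy",
  "Properties": {
    "PolicyName": "db-proxy-access",
    "Roles": [{ "Ref": "LambdaExecutionRole" }],
    "PolicyDocument": {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "rds-db:connect",
          "Resource": "arn:aws:rds-db:us-east-1:123456789012:dbuser:prx-0123456789abcdef0/lambda_user"
        }
      ]
    }
  }
}
```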
Next, I set up a new AppSync API, and make sure to enable conflict detection with the auto merge resolution strategy.
I model my database tables and relationships in my GraphQL schema using the @model directive. I do this in a file called ./base-schema.graphql. Setting queries to null in the @model directive turns off all query fields except for the sync operations, which are the only ones we need.
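A minimal version of that schema might look like this; the field names other than id and postID are illustrative, and postID matches the foreign-key column the Lambda function expects on Comments:

```graphql
type Post @model(queries: null) {
  id: ID!
  title: String!
  content: String
  comments: [Comment] @connection(keyName: "byPost", fields: ["id"])
}

type Comment @model(queries: null) @key(name: "byPost", fields: ["postID"]) {
  id: ID!
  postID: ID!
  content: String!
  post: Post @connection(fields: ["postID"])
}
```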
I then execute the following script to generate my actual model and final artifacts.
At step 6, I update all the query and mutation fields with the @function directive that specifies the dataStoreLink Lambda function. Compiling the schema again at step 7 generates the required resolvers and AppSync functions, and defines an IAM role that allows AppSync to call the Lambda function. At step 8, I update the resolver response mapping templates. This new custom template checks whether the Lambda function returned errors. If so, the resolver triggers an AppSync error that is sent back to the application. This allows the DataStore client to handle errors such as unhandled conflicts.
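Based on the error shape the Lambda function returns ({ data, errorMessage, errorType }), a response mapping template along these lines would do the check (a sketch, not the exact generated template):

```vtl
#if( !$util.isNull($ctx.result) && !$util.isNull($ctx.result.errorMessage) )
  $util.error($ctx.result.errorMessage, $ctx.result.errorType, $ctx.result.data)
#else
  $util.toJson($ctx.result.data)
#end
```

Passing $ctx.result.data as the third argument to $util.error lets AppSync return the current server version of the item alongside the error, which is what the DataStore client needs to resolve a conflict.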
Note: after pushing your changes to the cloud with amplify push, your models may get overwritten. Simply restore them with this command:
Reviewing the Lambda function code
With this done, I have all the assets needed to deploy my API. Let’s take a closer look at the dataStoreLink function implementation at ./amplify/backend/function/dataStoreLink/src/index.js. Keep in mind that I’m using the database proxy with the AWS signer to connect to my database, but you can use any method that is appropriate for your use case.
const AWS = require('aws-sdk')
const mysql = require('mysql2/promise')

const { RDS_PROXY_URL, DATABASE, USERNAME, REGION } = process.env

const DeltaSyncConfig = {
  DeltaSyncTableTTL: 30,
  BaseTableTTL: 30 * 24 * 60,
}
const MIN_TO_MILLI = 60 * 1_000
const DELTA_SYNC_PREFIX = 'DeltaSync'

const signer = new AWS.RDS.Signer({
  region: REGION,
  port: 3306,
  username: USERNAME,
  hostname: RDS_PROXY_URL,
})

const initConn = () => {
  const connectionConfig = {
    host: RDS_PROXY_URL,
    database: DATABASE,
    user: USERNAME,
    ssl: 'Amazon RDS',
    authPlugins: { mysql_clear_password: () => () => signer.getAuthToken() },
  }
  return mysql.createConnection(connectionConfig)
}

const tableName = (belongsTo) =>
  belongsTo[0].toUpperCase() + belongsTo.slice(1) + 's'
const deltaSyncTable = (baseTable) => DELTA_SYNC_PREFIX + baseTable
const toModel = (row, belongsTo) => {
  const mysql_id = row.id
  let pid, _deleted
  const id = row._datastore_uuid || `datastore-uuid-${row.id}`
  if (belongsTo) {
    pid = row.parentUUID
    _deleted = row.parentDeleted
  }
  return {
    ...row,
    mysql_id,
    id,
    _lastChangedAt: new Date(row._lastChangedAt).getTime(),
    ...(belongsTo && pid && _deleted !== undefined
      ? { [belongsTo]: { id: pid, _deleted } }
      : null),
  }
}

const _runQuery = async (conn, sql, values) => {
  console.log(`execute sql >`)
  console.log(sql.trim().replace(/\s+/g, ' '))
  console.log(`with values >`)
  console.log(JSON.stringify(values, null, 2))
  const [result] = await conn.query(sql, values)
  console.log(`result >`)
  console.log(JSON.stringify(result, null, 2))
  return result
}
const _selectRow = async ({ table, lookupId, belongsTo, connection }) => {
  let sql = null
  if (belongsTo) {
    const parentTable = tableName(belongsTo)
    sql = `
    SELECT ${table}.*, ${parentTable}._datastore_uuid as parentUUID, ${parentTable}._deleted as parentDeleted
    FROM ${table}
    LEFT JOIN ${parentTable} ON ${table}.${belongsTo}ID = ${parentTable}.id
    WHERE ${table}.id = ?`
  } else {
    sql = `SELECT * FROM ${table} WHERE id = ?`
  }
  const values = [lookupId]
  const [row] = await _runQuery(connection, sql, values)
  return row
}

const _writeToDeltaSyncTable = async ({ row, table, connection }) => {
  const ds = Object.assign({}, row)
  delete ds.id
  delete ds._ttl
  delete ds.parentUUID
  delete ds.parentDeleted
  const keys = Object.keys(ds)
  const sql = `INSERT INTO ${deltaSyncTable(table)} (${keys.join(
    ','
  )}, _ttl) VALUES(${keys
    .map((k) => '?')
    .join(',')}, TIMESTAMPADD(MINUTE, ?, CURRENT_TIMESTAMP(3)))`
  const values = keys.map((k) => ds[k])
  values.push(DeltaSyncConfig.DeltaSyncTableTTL)
  return await _runQuery(connection, sql, values)
}

const _doUpdateTransactionWithRowLock = async ({
  sql,
  values,
  uuid,
  table,
  connection,
  belongsTo,
}) => {
  await connection.query(`START TRANSACTION`)
  const locksql = `SELECT id FROM ${table} WHERE _datastore_uuid=? LOCK IN SHARE MODE;`
  const [existing] = await _runQuery(connection, locksql, [uuid])
  const result = await _runQuery(connection, sql, values)
  const row = await _selectRow({
    table,
    lookupId: existing.id,
    belongsTo,
    connection,
  })
  await connection.query('COMMIT;')
  if (result.affectedRows !== 1) {
    console.error('Error: version mismatch on item')
    return {
      data: toModel(row, belongsTo),
      errorMessage: 'Conflict',
      errorType: 'ConflictUnhandled',
    }
  }
  if (row && row.id) {
    await _writeToDeltaSyncTable({ row, table, connection })
  }
  return { data: toModel(row, belongsTo) }
}
const _query = async ({
  args: { limit = 1_000, lastSync, nextToken: inNextToken },
  table,
  connection,
  belongsTo,
}) => {
  const startedAt = Date.now()
  const moment = startedAt - DeltaSyncConfig.DeltaSyncTableTTL * MIN_TO_MILLI
  let sql
  let values = []
  let offset = 0
  if (inNextToken) {
    const tokenInfo = JSON.parse(Buffer.from(inNextToken, 'base64').toString())
    offset = tokenInfo.offset
  }
  if (belongsTo) {
    const parentTable = tableName(belongsTo)
    sql = `
    SELECT ${table}.*, ${parentTable}._datastore_uuid as parentUUID, ${parentTable}._deleted as parentDeleted
    FROM ${table}
    LEFT JOIN ${parentTable} ON ${table}.${belongsTo}ID = ${parentTable}.id`
  } else {
    sql = `SELECT * FROM ${table}`
  }
  if (lastSync === undefined) {
    sql += ` ORDER BY ${table}.id LIMIT ?, ?`
    values = [offset, limit]
  } else if (lastSync < moment) {
    sql += ` WHERE ${table}._lastChangedAt > FROM_UNIXTIME(?/1000) ORDER BY ${table}.id LIMIT ?, ?`
    values = [lastSync, offset, limit]
  } else {
    const dsTable = deltaSyncTable(table)
    if (belongsTo) {
      const parentTable = tableName(belongsTo)
      sql = `
      SELECT ${dsTable}.*, ${parentTable}._datastore_uuid as parentUUID, ${parentTable}._deleted as parentDeleted
      FROM ${dsTable}
      LEFT JOIN ${parentTable} ON ${dsTable}.${belongsTo}ID = ${parentTable}.id`
    } else {
      sql = `SELECT ${dsTable}.* FROM ${dsTable}`
    }
    sql += ` WHERE ${dsTable}._lastChangedAt > FROM_UNIXTIME(?/1000) ORDER BY ${dsTable}.id LIMIT ?, ?`
    values = [lastSync, offset, limit]
  }
  const rows = await _runQuery(connection, sql, values)
  let nextToken = null
  if (rows.length >= limit) {
    nextToken = Buffer.from(
      JSON.stringify({ offset: offset + rows.length })
    ).toString('base64')
  }
  const items = rows.map((row) => toModel(row, belongsTo))
  return { data: { items, startedAt, nextToken } }
}
const _create = async ({ args: { input }, table, connection, belongsTo }) => {
  const { id, ...rest } = input
  const item = { ...rest, _datastore_uuid: id }
  if (belongsTo) {
    const sql = `select id from ${tableName(
      belongsTo
    )} where _datastore_uuid = ?`
    const values = [item[`${belongsTo}ID`]]
    const [row] = await _runQuery(connection, sql, values)
    item[`${belongsTo}ID`] = row.id
  }
  const keys = Object.keys(item)
  const sql = `INSERT INTO ${table} (${keys.join(',')}) VALUES(${keys
    .map((k) => '?')
    .join(',')})`
  const values = keys.map((k) => item[k])
  const result = await _runQuery(connection, sql, values)
  const row = await _selectRow({
    table,
    lookupId: result.insertId,
    belongsTo,
    connection,
  })
  if (row && row.id) {
    await _writeToDeltaSyncTable({ row, table, connection })
  }
  return { data: toModel(row, belongsTo) }
}

const _update = async ({ args: { input }, table, connection, belongsTo }) => {
  const { id: uuid, _version = 0, ...item } = input
  const keys = Object.keys(item)
  const sql = `UPDATE ${table} SET ${keys
    .map((k) => k + ' = ?')
    .join(
      ', '
    )}, _version=_version+1 WHERE _datastore_uuid = ? AND _version = ?`
  const values = keys.map((k) => item[k])
  values.push(uuid)
  values.push(_version)
  return await _doUpdateTransactionWithRowLock({
    sql,
    values,
    uuid,
    table,
    connection,
    belongsTo,
  })
}

const _delete = async ({ args: { input }, table, connection, belongsTo }) => {
  const { id: uuid, _version = 0 } = input
  const sql = `
  UPDATE ${table} SET _deleted=true, _version=_version+1, _ttl = TIMESTAMPADD(MINUTE, ?, CURRENT_TIMESTAMP(3))
  WHERE _datastore_uuid = ? AND _version = ?`
  const values = [DeltaSyncConfig.BaseTableTTL, uuid, _version]
  return await _doUpdateTransactionWithRowLock({
    sql,
    values,
    uuid,
    table,
    connection,
    belongsTo,
  })
}

const operations = {
  syncPosts: { fn: _query, table: 'Posts' },
  createPost: { fn: _create, table: 'Posts' },
  updatePost: { fn: _update, table: 'Posts' },
  deletePost: { fn: _delete, table: 'Posts' },
  syncComments: { fn: _query, table: 'Comments', belongsTo: 'post' },
  createComment: { fn: _create, table: 'Comments', belongsTo: 'post' },
  updateComment: { fn: _update, table: 'Comments', belongsTo: 'post' },
  deleteComment: { fn: _delete, table: 'Comments', belongsTo: 'post' },
}
exports.handler = async (event) => {
  try {
    console.log(`passed event >`, JSON.stringify(event, null, 2))
    const { fieldName: operation, arguments: args } = event
    if (operation in operations) {
      const connection = await initConn()
      const { fn, table, belongsTo } = operations[operation]
      const result = await fn.apply(undefined, [
        { table, args, connection, belongsTo },
      ])
      await connection.end()
      return result
    }
  } catch (error) {
    console.log(`Error: unhandled error >`, JSON.stringify(error, null, 2))
    return {
      data: null,
      errorMessage: error.message || JSON.stringify(error),
      errorType: 'InternalFailure',
    }
  }
}
When the function is triggered, it looks at the fieldName that it is resolving and calls the appropriate function with the specific table for the model. If you take a closer look at the _query function, you see that it applies the steps outlined in Sync Operations to query either the base tables or the Delta Sync tables. The _update and _delete functions try to update a row with the exact same id and _version as specified in the operation. If no matching row is found, a ConflictUnhandled error is returned. Each mutation stores the new version of a row in the corresponding Delta Sync table.
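The dispatch pattern can be sketched standalone. In this simplified sketch the operation functions are stubs (the real ones issue SQL queries through the connection), but the routing from fieldName to the model's table and operation is the same:

```javascript
// Standalone sketch of the handler's routing. The stubs simply report
// which operation and table were selected for the resolved field.
const _query = async ({ table, belongsTo }) => ({ op: 'sync', table, belongsTo })
const _create = async ({ table, belongsTo }) => ({ op: 'create', table, belongsTo })

const operations = {
  syncPosts: { fn: _query, table: 'Posts' },
  createPost: { fn: _create, table: 'Posts' },
  syncComments: { fn: _query, table: 'Comments', belongsTo: 'post' },
  createComment: { fn: _create, table: 'Comments', belongsTo: 'post' },
}

const handler = async (event) => {
  // AppSync Lambda resolvers receive the resolved field name and arguments.
  const { fieldName: operation, arguments: args } = event
  if (operation in operations) {
    const { fn, table, belongsTo } = operations[operation]
    return fn({ table, args, belongsTo })
  }
  return null // unknown field: nothing to resolve
}

handler({ fieldName: 'syncComments', arguments: {} }).then((r) =>
  console.log(JSON.stringify(r))
)
// → {"op":"sync","table":"Comments","belongsTo":"post"}
```

Keeping the field-to-table mapping in a plain object makes it easy to add a new model: one line per GraphQL field, with the shared operation functions doing the work.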
Conclusion
In this article, I showed you how to leverage the Amplify CLI to build an AppSync-powered backend for your DataStore application that connects to an existing relational database. Customizing your DataStore data source is possible by implementing the query and mutation fields that the DataStore client uses, following the design outlined in Conflict Detection and Sync, and using a Lambda function. This allows you to implement offline-first applications that connect to a wide variety of resources in your VPC, in your AWS account, or even in your on-prem datacenter. For example, you could connect to an Amazon Timestream table to power a read-only mobile app that allows users to review their latest logs at any time. Or, you could connect to an on-prem PostgreSQL database to power an offline-first field service application that allows agents to work anywhere and sync data when connectivity is available.