Practical use cases for AWS AppSync Pipeline Resolvers – Part 2: Data Aggregation

 

This article was written by Salman Moghal, Application Architect, AWS, and Abdul Kitana, Security Architect, AWS

Overview

AWS AppSync is a fully managed service that allows you to deploy serverless GraphQL backends in the AWS cloud. It provides features that developers can use to create modern data-driven applications that query multiple databases, microservices, and APIs from a single GraphQL endpoint. Customers can leverage AppSync real-time capabilities, offline data synchronization, built-in server-side caching, fine-grained access control, security, support for business logic in the API layer using GraphQL resolvers, and more. In this article, we focus on how to implement and orchestrate backend logic directly in the AppSync GraphQL API layer using pipeline resolvers.

Resolvers are built-in functions in GraphQL that “resolve” types or fields defined in the GraphQL schema with the data in the data sources. Resolvers in AppSync can be of two types: unit resolvers or pipeline resolvers. Pipeline resolvers offer the ability to serially execute operations against multiple data sources in a single API call, triggered by queries, mutations, or subscriptions.

AppSync Pipeline Resolvers significantly simplify client-side application complexity and help enforce server-side business logic controls by orchestrating requests to multiple data sources. They can be used to enforce certain aspects of the application’s logic at the API layer.

This is the second article in a three-part series where we use a sample application to highlight how pipeline resolvers can be used to solve common problems, such as:

  1. Quota management (Part 1)
  2. Data aggregation (this article)
  3. Using Direct Lambda Resolvers in a pipeline to work with post content stored in an S3 bucket (Part 3)

The first article provides a CloudFormation template that can be used to deploy all the configured services in your AWS account, as well as the steps necessary to create users to test the different use cases.

Our sample application is a simple web app where users sign in and publish posts to their profile. Users have a quota or subscription that determines how many posts they can publish, and they can also add other users as friends. Finally, the content of the posts can be formatted in HTML and can be up to a few megabytes in size. We provide full implementation details of the use cases listed above using an AppSync GraphQL API.

 

Use case #2 – Data aggregation

Web applications often need to aggregate data across multiple database tables. A web application retrieves data from one table, and the results of that API call are then used to perform a second or third operation against another table. The client application ends up invoking multiple API calls across different data sources to consolidate and present data to end users, potentially degrading the application performance and the user experience. What if you could easily retrieve data from multiple data sources with a single call? AppSync pipeline resolvers provide an elegant server-side solution to this challenge using pipeline functions.

In our application, let’s assume that every user has a list of friends. The friends attribute in the Users table contains a list of the userIds that represent the user’s friends. Additionally, let’s also assume the Posts table contains posts from all users. Our goal is to expose a GraphQL operation, getFriendsPosts, that retrieves a list of posts from the Posts table for the current user’s friend list stored in the Users table. The getFriendsPosts operation needs the userId to retrieve the user data; the resolver takes it from the authenticated user details in the $context.identity variable rather than from a query argument.

We can implement the getFriendsPosts query operation using a pipeline resolver with two functions. The Posts table has postId as a hash key with no sort key. The userId attribute is the hash key of a global secondary index (GSI). The first pipeline function retrieves a list of friends from the Users table. The second pipeline function performs a DynamoDB scan operation against the Posts table and uses a filter expression to narrow down the results to the items whose userId attribute matches one of the friends’ userIds from the Users table. The scan operation runs against the GSI. Keep in mind that DynamoDB applies a filter expression after the items are read, so the scan still reads every item in the index.

 

 

We use a pipeline resolver and pipeline functions to implement the getFriendsPosts query operation. Now, let’s go over the implementation details step by step. The getFriendsPosts query operation is defined in the GraphQL schema as follows:

type Query {
    getFriendsPosts: PostConnection
}
type PostConnection {
    items: [Post]
    nextToken: String
}
type Post {
    userId: String!
    postId: ID!
    title: String!
    content: String!
}
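
For reference, a client could call this operation with a query document like the one in the minimal Python sketch below. The selection set and the `build_request_body` helper are illustrative assumptions, not part of the sample application. The sketch only builds the JSON body of the HTTP POST request; it does not send it or handle authentication.

```python
import json

# Query document for the getFriendsPosts field defined in the schema above.
# The selected fields are an illustrative choice; any subset of Post fields works.
GET_FRIENDS_POSTS = """
query GetFriendsPosts {
  getFriendsPosts {
    items {
      postId
      userId
      title
      content
    }
    nextToken
  }
}
"""


def build_request_body(query: str) -> str:
    """Serialize a GraphQL query into the JSON body of an HTTP POST request."""
    return json.dumps({"query": query, "variables": {}})


body = json.loads(build_request_body(GET_FRIENDS_POSTS))
print(sorted(body.keys()))  # ['query', 'variables']
```

Note that the query passes no userId: the resolver derives it from the caller's identity.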

The get friends posts pipeline resolver, bound to the getFriendsPosts query field, is depicted in the screenshot below.  Notice it has two functions, get_list_of_friends and get_friends_posts, as also highlighted in our pipeline architecture diagram above.  Additionally, we take a deeper look at the Before and After templates in the pipeline.

 

 

A pipeline resolver contains a series of Pipeline functions that allow developers to operate on supported AppSync data source types. In addition to that, pipeline resolvers also contain a Before mapping template and an After mapping template. These mapping templates execute before and after the pipeline functions execute, respectively.

To get started, the Before mapping template of our pipeline resolver retrieves the user’s identity from $context and stores it in the context stash. The stash is a map that is available for the duration of a single resolver execution, so it is a way to store and access variables across the Before and After mapping templates and all the pipeline functions.

$util.qr($context.stash.put("userId", $context.identity.username))
{}
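
The execution order and the role of the stash can be pictured with a small Python model. This is only a conceptual sketch, not how AppSync is implemented: `run_pipeline` and the stand-in functions are hypothetical names, and the context is reduced to the three keys used in this article (`identity`, `stash`, `prev`).

```python
from typing import Any, Callable, Dict, List

Context = Dict[str, Any]


def run_pipeline(
    before: Callable[[Context], None],
    functions: List[Callable[[Context], Any]],
    after: Callable[[Context], Any],
    identity: Dict[str, str],
) -> Any:
    """Run Before, then each function in order, then After, sharing one context."""
    ctx: Context = {"identity": identity, "stash": {}, "prev": {"result": None}}
    before(ctx)
    for fn in functions:
        # Each function sees the previous function's result in ctx["prev"]["result"].
        ctx["prev"]["result"] = fn(ctx)
    return after(ctx)


def before_template(ctx: Context) -> None:
    # Mirrors $context.stash.put("userId", $context.identity.username)
    ctx["stash"]["userId"] = ctx["identity"]["username"]


def first_function(ctx: Context) -> List[str]:
    # Stand-in for get_list_of_friends: reads the stashed userId.
    return ["friend-of-" + ctx["stash"]["userId"]]


def second_function(ctx: Context) -> Dict[str, Any]:
    # Stand-in for get_friends_posts: consumes the previous function's result.
    return {"items": ctx["prev"]["result"], "nextToken": None}


def after_template(ctx: Context) -> Any:
    # Mirrors $util.toJson($context.prev.result)
    return ctx["prev"]["result"]


result = run_pipeline(
    before_template,
    [first_function, second_function],
    after_template,
    {"username": "user1"},
)
print(result)  # {'items': ['friend-of-user1'], 'nextToken': None}
```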

Next, we create a pipeline function get_list_of_friends that retrieves the friends list from the Users table. Note that in the request mapping template, we use the identity stored in $context.stash.userId as the key value.

{
  "operation": "GetItem",
  "key": {
    "userId": { "S" : "${context.stash.userId}" }
  }
}

The response mapping template of the get_list_of_friends pipeline function simply returns the contents of the friends field (mapped to the friends list in the Users table). The data retrieved is either an array, an empty array, or undefined depending on how the value is set in the Users table. We have to catch these results in our next function to ensure our pipeline resolver does not fail.

$util.toJson($context.result.friends)

Now, we define a new pipeline function get_friends_posts that uses VTL to perform pre-processing checks on the friends array. VTL provides powerful constructs that we can leverage in Request and Response mapping templates. These constructs allow us to perform iterations, conditional checks, validation and more.

Our goal is to create a filter expression for the DynamoDB Scan operation against the Posts table that narrows down the results based on the friends’ userIds we retrieve from the previous pipeline function.

First we define a variable $expressionArr that is used to generate the final scan filter block.  This array contains a list of conditions that narrow down the scan operation results.  For each friend we receive in the friends array, we use an expression in $expressionArr that resembles userId = :placeholder.  A :placeholder is DynamoDB’s way of providing a runtime substitution for an actual expression attribute value.   Therefore, the $expressionValueMap map contains key-value pairs for every expression attribute value we have used in $expressionArr.  The following request mapping code shows how we build $expressionArr and $expressionValueMap.

#set($expressionArr = [])
#set($expressionValueMap = {})

#foreach($friend in ${context.prev.result})
  ## build a template or placeholder
  #set($expressionTemplate = ":t" + $foreach.count)
  ## now build expression array
  #set($partialExpressionStr = "userId = ${expressionTemplate}")
  $util.qr($expressionArr.add($partialExpressionStr))
  ## also build expression value map
  $util.qr($expressionValueMap.put($expressionTemplate, $util.dynamodb.toString($friend)))
#end

Next, we flatten out our $expressionArr and create a string representation of the filter expression. For a user who has three friends, the flattened filter expression string looks like userId = :t1 OR userId = :t2 OR userId = :t3. Note that the expression still contains DynamoDB placeholders for expression attribute values.

## lets now build the final expression with OR conditions
#set($expressionStr = "")
#foreach($expr in ${expressionArr})
  #if($foreach.count == $expressionArr.size())
    #set($expressionStr = "${expressionStr}${expr}")
  #else
    #set($expressionStr = "${expressionStr}${expr} OR ")
  #end
#end
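
To make the two VTL loops easier to follow, here is an equivalent sketch in Python. It is an illustration only and does not run inside AppSync; `build_filter` is a hypothetical helper name, and the friend IDs in the usage lines are made-up sample values.

```python
from typing import Dict, List, Optional, Tuple


def build_filter(friends: Optional[List[str]]) -> Tuple[str, Dict[str, Dict[str, str]]]:
    """Return (filter expression, expression attribute values) for a friends list."""
    clauses: List[str] = []
    values: Dict[str, Dict[str, str]] = {}
    # $foreach.count is 1-based, so enumerate starts at 1 to match it.
    for count, friend in enumerate(friends or [], start=1):
        placeholder = f":t{count}"
        clauses.append(f"userId = {placeholder}")
        # Same shape as $util.dynamodb.toString($friend): a DynamoDB string value.
        values[placeholder] = {"S": friend}
    # str.join places " OR " only between clauses, which replaces the
    # "is this the last element?" check in the VTL loop.
    return " OR ".join(clauses), values


expression, expression_values = build_filter(["user2", "user3", "user4"])
print(expression)         # userId = :t1 OR userId = :t2 OR userId = :t3
print(expression_values)  # {':t1': {'S': 'user2'}, ':t2': {'S': 'user3'}, ':t3': {'S': 'user4'}}
```

A missing or empty friends list produces an empty expression string and an empty value map, which is the case the request template has to handle separately.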

Our request template uses a scan operation with a filter expression to narrow down the results for a given userId and list of friends.  We use VTL conditions to check for empty $expressionArr arrays.  It allows us to handle two edge cases:

  1. friends attribute in the Users table containing an empty array, i.e. [],
  2. friends attribute is not defined for a given user entry in the Users table.

If either condition is met, the else branch of the VTL condition executes. Here we set the filter expression to the attribute_not_exists(postId) DynamoDB condition function, which matches only items that do not contain the hash key attribute postId. Because postId is the hash key, every item in the Posts table has it, so the filter matches nothing and the scan returns an empty result. This is a simple way to return an empty result from the resolver. Note that the scan runs against the userId-index global secondary index rather than the base table.

{
  "operation": "Scan",
  "index": "userId-index",
  "filter": {
    #if(!$expressionArr.isEmpty())
      "expression": $util.toJson($expressionStr),
      "expressionValues" : $util.toJson($expressionValueMap)
    #else
      #set($expressionStr = "attribute_not_exists(postId)")
      "expression": $util.toJson($expressionStr)
    #end
  },
  "limit": $util.defaultIfNull($ctx.args.limit, 86400),
  "nextToken": $util.toJson($util.defaultIfNullOrEmpty($ctx.args.nextToken, null))
}
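
It can help to look at the document each branch of the template should evaluate to. The Python sketch below mirrors the branching logic under the same assumptions as the template (index name userId-index, default limit 86400); `build_scan_request` is a hypothetical helper, not an AppSync API. Serializing the result with `json.dumps` also confirms that the output is a well-formed JSON document.

```python
import json
from typing import Any, Dict, List, Optional


def build_scan_request(
    friends: Optional[List[str]],
    limit: Optional[int] = None,
    next_token: Optional[str] = None,
) -> Dict[str, Any]:
    """Build the Scan request, falling back to a match-nothing filter."""
    placeholders = {
        f":t{count}": {"S": friend}
        for count, friend in enumerate(friends or [], start=1)
    }
    if placeholders:
        scan_filter: Dict[str, Any] = {
            "expression": " OR ".join(f"userId = {p}" for p in placeholders),
            "expressionValues": placeholders,
        }
    else:
        # No friends (empty list or missing attribute): every item has postId,
        # so this filter matches nothing and the scan returns no items.
        scan_filter = {"expression": "attribute_not_exists(postId)"}
    return {
        "operation": "Scan",
        "index": "userId-index",
        "filter": scan_filter,
        "limit": 86400 if limit is None else limit,
        # Mirrors defaultIfNullOrEmpty: an empty token becomes null.
        "nextToken": next_token or None,
    }


print(json.dumps(build_scan_request(["user2"]), indent=2))
print(json.dumps(build_scan_request(None), indent=2))
```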

The complete request template is shown below:

#set($expressionArr = [])
#set($expressionValueMap = {})

#foreach($friend in ${context.prev.result})
  ## build a template or placeholder
  #set($expressionTemplate = ":t" + $foreach.count)
  ## now build expression array
  #set($partialExpressionStr = "userId = ${expressionTemplate}")
  $util.qr($expressionArr.add($partialExpressionStr))
  ## also build expression value map
  $util.qr($expressionValueMap.put($expressionTemplate, $util.dynamodb.toString($friend)))
#end

## lets now build the final expression with OR conditions
#set($expressionStr = "")
#foreach($expr in ${expressionArr})
  #if($foreach.count == $expressionArr.size())
    #set($expressionStr = "${expressionStr}${expr}")
  #else
    #set($expressionStr = "${expressionStr}${expr} OR ")
  #end
#end

{
  "operation": "Scan",
  "index": "userId-index",
  "filter": {
    #if(!$expressionArr.isEmpty())
      "expression": $util.toJson($expressionStr),
      "expressionValues" : $util.toJson($expressionValueMap)
    #else
      #set($expressionStr = "attribute_not_exists(postId)")
      "expression": $util.toJson($expressionStr)
    #end
  },
  "limit": $util.defaultIfNull($ctx.args.limit, 86400),
  "nextToken": $util.toJson($util.defaultIfNullOrEmpty($ctx.args.nextToken, null))
}

The response mapping template of the get_friends_posts pipeline function simply returns the results of the scan operation:

$util.toJson($context.result)

Finally, the After mapping template is a straight-forward pass-through that takes the result from the last function execution.

$util.toJson($context.prev.result)

Let’s test our pipeline resolver.  Sample records in the Users table look like this:

{
  "email": "user1@amazon.com",
  "friends": [
    "user2"
  ],
  "name": "Test User 1",
  "subscription": {
    "subscription_tier": "Premium",
    "maxPosts" : 5
  },
  "userId": "user1"
},
{
  "email": "user2@amazon.com",
  "friends": [],
  "name": "Test User 2",
  "subscription": {
    "subscription_tier": "Premium",
    "maxPosts" : 5
  },
  "userId": "user2"
},
{
  "email": "user3@amazon.com",
  "friends": [
    "user1"
  ],
  "name": "Test User 3",
  "subscription": {
    "subscription_tier": "Premium",
    "maxPosts" : 5
  },
  "userId": "user3"
}

We need entries in the Posts table that would represent posts from friends.  We can manually create these entries as seen in the table below by using the DynamoDB console, or we can use the Queries section in the AppSync console and issue mutations accordingly to create the records:

{
  "content": "first post",
  "postId": "1001",
  "title": "title1",
  "userId": "user1"
},
{
  "content": "second post",
  "postId": "1002",
  "title": "title2",
  "userId": "user1"
},
{
  "content": "first post",
  "postId": "1003",
  "title": "title1",
  "userId": "user2"
},
{
  "content": "second post",
  "postId": "1004",
  "title": "title2",
  "userId": "user2"
},
{
  "content": "third post",
  "postId": "1005",
  "title": "title3",
  "userId": "user2"
},
{
  "content": "first post",
  "postId": "1006",
  "title": "title1",
  "userId": "user3"
},
{
  "content": "second post",
  "postId": "1007",
  "title": "title2",
  "userId": "user3"
},
{
  "content": "third post",
  "postId": "1008",
  "title": "title3",
  "userId": "user3"
}

To create the entries in the Posts table by using GraphQL mutations in the AppSync console, log in as each user by clicking the Login with User Pools button, then execute the createPost mutation for each user with the sample data above. Since the postId attribute is automatically generated by one of the resolver utility helpers, you don’t need to specify it.

We can test by invoking the getFriendsPosts query operation and confirm we get a list of posts for the user’s friends:

 

 

We can test the edge case where the friends attribute is empty or missing.  A sample record in the Users table that has no friends attribute defined is shown below.  You can modify the existing record in DynamoDB and remove the friends attribute to test.

{
  "email": "user1@amazon.com",
  "name": "Test User 1",
  "subscription": {
    "subscription_tier": "Premium",
    "maxPosts" : 5
  },
  "userId": "user1"
}

When we run the same query, we see an empty result coming back, validating that our VTL conditional logic in the get_friends_posts pipeline function request template is working properly.
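
The expected results of both tests can be cross-checked against the sample data with a small in-memory model. The sketch below is an assumption-laden stand-in for the pipeline (plain Python lists instead of DynamoDB tables, and a hypothetical `get_friends_posts` helper); it returns only postIds, and the order of items returned by a real scan is not guaranteed.

```python
from typing import Any, Dict, List

# Sample data from this article, reduced to the attributes the pipeline uses.
USERS: Dict[str, Dict[str, Any]] = {
    "user1": {"friends": ["user2"]},
    "user2": {"friends": []},
    "user3": {"friends": ["user1"]},
}
POSTS: List[Dict[str, str]] = [
    {"postId": "1001", "userId": "user1"},
    {"postId": "1002", "userId": "user1"},
    {"postId": "1003", "userId": "user2"},
    {"postId": "1004", "userId": "user2"},
    {"postId": "1005", "userId": "user2"},
    {"postId": "1006", "userId": "user3"},
    {"postId": "1007", "userId": "user3"},
    {"postId": "1008", "userId": "user3"},
]


def get_friends_posts(
    user_id: str,
    users: Dict[str, Dict[str, Any]],
    posts: List[Dict[str, str]],
) -> List[str]:
    """Return the postIds written by the given user's friends."""
    # Step 1 (get_list_of_friends): a missing attribute behaves like an empty list.
    friends = users.get(user_id, {}).get("friends") or []
    # Step 2 (get_friends_posts): keep posts whose userId is one of the friends.
    return [post["postId"] for post in posts if post["userId"] in friends]


print(get_friends_posts("user1", USERS, POSTS))           # ['1003', '1004', '1005']
print(get_friends_posts("user3", USERS, POSTS))           # ['1001', '1002']
print(get_friends_posts("user2", USERS, POSTS))           # []
print(get_friends_posts("user1", {"user1": {}}, POSTS))   # []
```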

 

 

As an alternative approach, it’s possible to use a Lambda function to implement the entire pipeline resolver logic. In the Lambda function, we could use a DynamoDB query operation against the GSI to be more efficient. DynamoDB queries only require a hash key to return results, even when the table or index has a sort key. The pipeline resolver implementation provided in this section does not use a query operation directly in the get_friends_posts request template because a query targets a single hash key value, and our use case requires us to fetch results for all of a specific user’s friends, not only one friend. Ideally, a DynamoDB BatchGetItem operation would consolidate multiple lookups into one call. However, the BatchGetItem operation requires the full primary key of every item it retrieves, which for the Posts table is the postId. Since we don’t know the postId of every friend’s posts in advance, a BatchGetItem operation doesn’t work in this scenario. Therefore, we have to use either a scan operation with our pipeline resolvers or a Lambda function connecting to DynamoDB to achieve the desired outcome. Either way, you have good serverless options to interact with your data without managing infrastructure, by defining the business logic directly in your API layer with AppSync pipeline resolvers or by leveraging Lambda. We look at adding a Lambda function to our application backend business logic in the next article.
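
As a rough illustration of the Lambda-based alternative, the sketch below issues one paginated query per friend against the userId-index GSI. It is not the implementation used in this series: `query_friends_posts` is a hypothetical helper, `table` is assumed to be a boto3 DynamoDB Table resource (or any object with a compatible `query` method), and `FakeTable` is a stub that exists only so the example runs without AWS access.

```python
from typing import Any, Dict, List


def query_friends_posts(table: Any, friends: List[str]) -> List[Dict[str, Any]]:
    """Collect posts for each friend with one paginated Query per friend."""
    items: List[Dict[str, Any]] = []
    for friend in friends:
        kwargs: Dict[str, Any] = {
            "IndexName": "userId-index",
            "KeyConditionExpression": "userId = :u",
            "ExpressionAttributeValues": {":u": friend},
        }
        while True:
            page = table.query(**kwargs)
            items.extend(page.get("Items", []))
            # DynamoDB returns LastEvaluatedKey while more pages remain.
            last_key = page.get("LastEvaluatedKey")
            if not last_key:
                break
            kwargs["ExclusiveStartKey"] = last_key
    return items


class FakeTable:
    """In-memory stand-in for a DynamoDB table, for local experimentation only."""

    def __init__(self, posts: List[Dict[str, Any]]) -> None:
        self._posts = posts

    def query(self, **kwargs: Any) -> Dict[str, Any]:
        friend = kwargs["ExpressionAttributeValues"][":u"]
        return {"Items": [p for p in self._posts if p["userId"] == friend]}


fake = FakeTable([
    {"postId": "1001", "userId": "user1"},
    {"postId": "1003", "userId": "user2"},
    {"postId": "1004", "userId": "user2"},
])
print(query_friends_posts(fake, ["user2"]))
```

Unlike the scan, each query reads only the items for one friend, at the cost of one request (or more, with pagination) per friend.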

 

Conclusion

With pipeline resolvers, developers can compose operations and orchestrate and execute business logic directly in AppSync, making it easy to interact with multiple data sources in a single GraphQL API call and saving network bandwidth and resources.

In this article, we showcased how flexible pipeline resolvers are when it comes to aggregating data from different DynamoDB tables on the backend, and how to combine multiple tables in a pipeline resolver to retrieve the data we need with a single GraphQL API call. In the previous article, we implemented a user quota management system with pipeline resolvers directly in AppSync without the need to maintain any additional backend infrastructure. In the next article, we show you how to use Direct Lambda Resolvers in a pipeline to interact with an S3 bucket where post content is stored.