Practical use cases for AWS AppSync Pipeline Resolvers – Part 1: Quota Management
This article was written by Salman Moghal, Application Architect, AWS, and Abdul Kitana, Security Architect, AWS
Overview
AWS AppSync is a fully managed service for deploying serverless GraphQL backends in the AWS cloud. It provides features that developers can use to build modern, data-driven applications that query multiple databases, microservices, and APIs from a single GraphQL endpoint. Customers can leverage AppSync real-time capabilities, offline data synchronization, built-in server-side caching, fine-grained access control, security, support for business logic in the API layer using GraphQL resolvers, and more. In this article, we focus on how to implement and orchestrate backend logic directly in the AppSync GraphQL API layer using pipeline resolvers.
Resolvers are built-in functions in GraphQL that “resolve” types or fields defined in the GraphQL schema with the data in the data sources. Resolvers in AppSync can be of two types: unit resolvers or pipeline resolvers. Pipeline resolvers offer the ability to serially execute operations against multiple data sources in a single API call, triggered by queries, mutations, or subscriptions.
AppSync Pipeline Resolvers significantly simplify client-side application complexity and help enforce server-side business logic controls by orchestrating requests to multiple data sources. They can be used to enforce certain aspects of the application’s logic at the API layer.
This is an article series in three parts where we use a sample application to highlight how pipeline resolvers can be used to solve common problems, such as:
- Part 1: Implement conditional checks on the server-side before creating records in a backend database. In this example, we implement a user’s quota management feature directly in the GraphQL API.
- Part 2: Perform nested database queries, i.e. given a key that is present in two DynamoDB tables, aggregate data across tables.
- Part 3: Interact with data from heterogeneous data sources that may or may not be directly supported by AWS AppSync in a single call, i.e. DynamoDB using its built-in AppSync data source support and S3 buckets using a Lambda data source.
You can follow the links above to access the other articles in the series. In this article we explain the backend configuration as well as the initial setup using AWS CloudFormation, then move on to the first use case.
Our sample application is a simple web app where users sign in and publish posts to their profile. Users have a quota or subscription that determines how many posts they can publish, and they can also add other users as friends. Finally, the content of the posts can be formatted in HTML and must support sizes of up to a few megabytes. We provide full implementation details of the use cases listed above using an AppSync GraphQL API.
Initial Setup
Use the following CloudFormation template to build the entire solution, including the required Amazon Cognito User Pool, DynamoDB tables, GraphQL schema, resolvers, and data sources including a Lambda function. An S3 bucket is also created where Lambda stores and retrieves blog content. You can quickly deploy the sample AppSync backend in your own account with the following template using the AWS CloudFormation console:
Description: AWSAppSync Patterns Blog Infrastructure
Parameters:
  DdbPostTableGSI1:
    Type: String
    Description: Post Table global secondary index name
    Default: userId-index
Resources:
  # ----------------------------------------------------------
  # create dynamodb tables
  DdbUsersTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: "users"
      ProvisionedThroughput:
        ReadCapacityUnits: 2
        WriteCapacityUnits: 2
      AttributeDefinitions:
        -
          AttributeName: "userId"
          AttributeType: "S"
      KeySchema:
        -
          AttributeName: "userId"
          KeyType: "HASH"
  DdbPostsTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: "posts"
      ProvisionedThroughput:
        ReadCapacityUnits: 2
        WriteCapacityUnits: 2
      AttributeDefinitions:
        -
          AttributeName: "userId"
          AttributeType: "S"
        -
          AttributeName: "postId"
          AttributeType: "S"
      KeySchema:
        -
          AttributeName: "postId"
          KeyType: "HASH"
      GlobalSecondaryIndexes:
        -
          IndexName: !Ref DdbPostTableGSI1
          KeySchema:
            -
              AttributeName: "userId"
              KeyType: "HASH"
          Projection:
            ProjectionType: "ALL"
          ProvisionedThroughput:
            ReadCapacityUnits: 2
            WriteCapacityUnits: 2
  # ----------------------------------------------------------
  # create lambda resources
  AppsyncResolverLambdaPermissionPolicy:
    Type: 'AWS::Lambda::Permission'
    Properties:
      FunctionName: !Ref AppsyncResolverLambda
      Action: 'lambda:InvokeFunction'
      Principal: "appsync.amazonaws.com"
  AppsyncResolverLambdaExecutionRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: 'allow_s3'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:PutObject
                Resource:
                  Fn::Join:
                    - ""
                    -
                      - "arn:aws:s3:::"
                      - !Ref S3BucketBlogPostContent
                      - "/*"
  AppsyncResolverLambda:
    Type: 'AWS::Lambda::Function'
    DependsOn: AppsyncResolverLambdaExecutionRole
    Properties:
      Runtime: nodejs12.x
      Timeout: 30
      Description: 'AppSync post content from S3'
      FunctionName: 'appsync-direct-lambda-resolver-function'
      Handler: 'index.handler'
      Role: !GetAtt AppsyncResolverLambdaExecutionRole.Arn
      Environment:
        Variables:
          CONTENT_BUCKET: !Ref S3BucketBlogPostContent
      Code:
        ZipFile: |
          const AWS = require('aws-sdk');
          const s3 = new AWS.S3();
          const bucketName = process.env.CONTENT_BUCKET;
          exports.handler = async (event, context) => {
            //console.log('Event: ' + JSON.stringify(event));
            //console.log('Context: ' + JSON.stringify(context));
            //console.log('Bucket: ' + bucketName);
            // we don't need to validate event.prev object or event.prev.result.items length
            // because get_post_content_s3_key pipeline function only passes control over to Direct Lambda
            // Resolver when DynamoDB Query operation returns exactly one valid result. As per our
            // DynamoDB Post schema, there should never be multiple rows for a given userId and postId
            let resp = {};
            try {
              // create contents in s3 bucket
              if (event.info.fieldName === 'createPostContent') {
                if (!event.arguments.input.postId || !event.arguments.input.content) {
                  const errorMessage = 'missing required parameters in createPostContent';
                  console.error('Exception occurred: ', errorMessage);
                  throw new Error(errorMessage);
                }
                const params = {
                  Body: event.arguments.input.content,
                  ContentType: 'text/plain',
                  Bucket: bucketName,
                  Key: event.prev.result.items[0].userId + '/' + event.arguments.input.postId
                };
                console.log('Creating object in bucket: ' + bucketName + ', s3 params: ' + JSON.stringify(params));
                const data = await s3.putObject(params).promise();
                resp = {
                  etag: data.ETag
                };
              }
              // get contents from s3 bucket
              else if (event.info.fieldName === 'getPostContent') {
                if (!event.arguments.input.postId) {
                  const errorMessage = 'missing required parameters in getPostContent';
                  console.error('Exception occurred: ', errorMessage);
                  throw new Error(errorMessage);
                }
                const params = {
                  Bucket: bucketName,
                  Key: event.prev.result.items[0].userId + '/' + event.arguments.input.postId
                };
                console.log('Retrieving object from bucket: ' + bucketName + ', s3 params: ' + JSON.stringify(params));
                const data = await s3.getObject(params).promise();
                const content = data.Body.toString('utf-8');
                resp = {
                  content: content
                };
              }
              else {
                const errorMessage = 'unsupported operation: ' + event.info.fieldName;
                console.error('Exception occurred: ', errorMessage);
                throw new Error(errorMessage);
              }
            }
            catch (ex) {
              console.error('Exception occurred: ', ex.message);
              const promise = new Promise((resolve, reject) => {
                reject(ex.message);
              });
              return promise;
            }
            return resp;
          };
  # ----------------------------------------------------------
  # create cognito resources
  SNSRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "cognito-idp.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Policies:
        - PolicyName: "CognitoSNSPolicy"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action: "sns:publish"
                Resource: "*"
              - Effect: "Deny"
                Action: "sns:publish"
                Resource: "arn:aws:sns:*:*:*"
  UserPool:
    Type: "AWS::Cognito::UserPool"
    Properties:
      UserPoolName: appsync-patterns-blog-user-pool
      AutoVerifiedAttributes:
        - email
      MfaConfiguration: "OPTIONAL"
      SmsConfiguration:
        ExternalId: appsync-patterns-blog-external
        SnsCallerArn: !GetAtt SNSRole.Arn
      Schema:
        - Name: email
          AttributeDataType: String
          Mutable: true
          Required: true
  UserPoolClient:
    Type: "AWS::Cognito::UserPoolClient"
    Properties:
      ClientName: appsync-patterns-blog-client
      GenerateSecret: false
      UserPoolId: !Ref UserPool
      AllowedOAuthFlows:
        - code
      AllowedOAuthFlowsUserPoolClient: true
      AllowedOAuthScopes:
        - email
        - phone
        - openid
      CallbackURLs:
        - https://aws.amazon.com/cognito/
      LogoutURLs:
        - https://aws.amazon.com/cognito/
      DefaultRedirectURI: https://aws.amazon.com/cognito/
      ExplicitAuthFlows:
        - ALLOW_USER_PASSWORD_AUTH
        - ALLOW_USER_SRP_AUTH
        - ALLOW_REFRESH_TOKEN_AUTH
      PreventUserExistenceErrors: ENABLED
      SupportedIdentityProviders:
        - COGNITO
  UserPoolDomain:
    Type: AWS::Cognito::UserPoolDomain
    Properties:
      Domain: !Sub 'appsync-patterns-blog-${AWS::AccountId}'
      UserPoolId: !Ref UserPool
  # ----------------------------------------------------------
  # create appsync resources
  AppSyncIamRole:
    Type: 'AWS::IAM::Role'
    Properties:
      RoleName: IamRoleForAppSyncToDynamoDB
      Description: Allow AppSync to access DynamoDB tables
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - appsync.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Policies:
        - PolicyName: AllowAccessForAppsyncResolvers
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - dynamodb:GetItem
                  - dynamodb:PutItem
                  - dynamodb:Query
                  - dynamodb:Scan
                  - dynamodb:UpdateItem
                Resource:
                  - !Join
                    - ''
                    - - !Sub 'arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/'
                      - !Ref DdbUsersTable
                      - '*'
                  - !Join
                    - ''
                    - - !Sub 'arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/'
                      - !Ref DdbPostsTable
                      - '*'
              - Effect: Allow
                Action:
                  - lambda:invokeFunction
                Resource:
                  - !GetAtt AppsyncResolverLambda.Arn
  GraphQLApi:
    Type: AWS::AppSync::GraphQLApi
    Properties:
      Name: appsync-patterns-blog
      AuthenticationType: AMAZON_COGNITO_USER_POOLS
      UserPoolConfig:
        AwsRegion: !Ref AWS::Region
        DefaultAction: ALLOW
        UserPoolId: !Ref UserPool
      AdditionalAuthenticationProviders:
        - AuthenticationType: AWS_IAM
  UsersDynamoDBTableDataSource:
    Type: "AWS::AppSync::DataSource"
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: DdbUsersTable
      Type: AMAZON_DYNAMODB
      ServiceRoleArn: !GetAtt AppSyncIamRole.Arn
      DynamoDBConfig:
        AwsRegion: !Ref AWS::Region
        TableName: !Ref DdbUsersTable
  PostsDynamoDBTableDataSource:
    Type: "AWS::AppSync::DataSource"
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: PostsDynamoDBTable
      Type: AMAZON_DYNAMODB
      ServiceRoleArn: !GetAtt AppSyncIamRole.Arn
      DynamoDBConfig:
        AwsRegion: !Ref AWS::Region
        TableName: !Ref DdbPostsTable
  LambdaDataSource:
    Type: "AWS::AppSync::DataSource"
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: S3BlogContent
      Type: AWS_LAMBDA
      ServiceRoleArn: !GetAtt AppSyncIamRole.Arn
      LambdaConfig:
        LambdaFunctionArn: !GetAtt AppsyncResolverLambda.Arn
  S3BucketBlogPostContent:
    Type: 'AWS::S3::Bucket'
    Properties: {}
  GraphQLSchema:
    Type: "AWS::AppSync::GraphQLSchema"
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Definition: |
        type Mutation {
          createPost(input: CreatePostInput!): Post
          createPostContent(input: CreatePostContentInput!): String
        }
        type Query {
          getFriendsPosts: PostConnection
          getPostContent(input: GetPostContentInput!): String
        }
        input CreatePostInput {
          title: String!
          content: String!
        }
        input CreatePostContentInput {
          postId: ID!
          content: String!
        }
        input GetPostContentInput {
          postId: ID!
        }
        type Post {
          userId: String!
          postId: ID!
          title: String!
          content: String!
        }
        type SubscriptionDetails {
          subscription_tier: String!
          maxPosts: Int!
        }
        type User {
          userId: String!
          email: String!
          name: String!
          subscription: SubscriptionDetails
          friends: [String]
        }
        type PostConnection {
          items: [Post]
          nextToken: String
        }
  # Usecase 1 - AppSync Pipeline Resolver - Mutation - createPost
  CreatePostPipelineResolver:
    Type: "AWS::AppSync::Resolver"
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      TypeName: Mutation
      FieldName: createPost
      Kind: PIPELINE
      PipelineConfig:
        Functions:
          - !GetAtt GetSubscriptionLimitFunction.FunctionId
          - !GetAtt CheckNumberOfPostsAgainstLimitFunction.FunctionId
          - !GetAtt CreatePostFunction.FunctionId
      RequestMappingTemplate: "{}"
      ResponseMappingTemplate: "$util.toJson($context.result)"
    DependsOn: GraphQLSchema
  # AppSync Pipeline Resolver Function - get_subscription_limit
  GetSubscriptionLimitFunction:
    Type: AWS::AppSync::FunctionConfiguration
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: get_subscription_limit
      DataSourceName: !GetAtt UsersDynamoDBTableDataSource.Name
      FunctionVersion: "2018-05-29"
      RequestMappingTemplate: |
        {
          "version" : "2017-02-28",
          "operation" : "GetItem",
          "key" : {
            "userId" : $util.dynamodb.toDynamoDBJson($context.identity.username)
          }
        }
      ResponseMappingTemplate: |
        #set($result = {})
        #set($result.limit = $context.result.subscription.maxPosts)
        #if($context.error)
          $util.error($context.error.message, $context.error.type)
        #end
        $util.toJson($result)
  # AppSync Pipeline Resolver Function - check_number_of_posts_against_limit
  CheckNumberOfPostsAgainstLimitFunction:
    Type: AWS::AppSync::FunctionConfiguration
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: check_number_of_posts_against_limit
      DataSourceName: !GetAtt PostsDynamoDBTableDataSource.Name
      FunctionVersion: "2018-05-29"
      RequestMappingTemplate: |
        {
          "version": "2017-02-28",
          "operation": "Scan",
          "index": "userId-index",
          "filter": {
            "expression": "userId = :userId",
            "expressionValues": {
              ":userId": $util.dynamodb.toDynamoDBJson($context.identity.username)
            }
          }
        }
      ResponseMappingTemplate: |
        #set ($limit = $context.prev.result.limit)
        #set ($count = $context.result.items.size())
        #if ($limit <= $count)
          $util.error("Posts Limit Reached: limit: ${limit}, posts by ${context.identity.username}: ${context.result.items.size()}")
        #end
        #if($context.error)
          $util.error($context.error.message, $context.error.type)
        #end
        $util.toJson($result)
  # AppSync Pipeline Resolver Function - create_post
  CreatePostFunction:
    Type: AWS::AppSync::FunctionConfiguration
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: create_post
      DataSourceName: !GetAtt PostsDynamoDBTableDataSource.Name
      FunctionVersion: "2018-05-29"
      RequestMappingTemplate: |
        {
          "version" : "2017-02-28",
          "operation" : "PutItem",
          "key" : {
            "postId": $util.dynamodb.toDynamoDBJson($utils.autoId())
          },
          "attributeValues" : {
            "userId" : $util.dynamodb.toDynamoDBJson($context.identity.username),
            "title": $util.dynamodb.toDynamoDBJson($context.arguments.input.title),
            "content": $util.dynamodb.toDynamoDBJson($context.arguments.input.content)
          },
          "condition": {
            "expression": "attribute_not_exists(#postId) AND attribute_not_exists(#userId)",
            "expressionNames": {
              "#postId": "postId",
              "#userId": "userId"
            }
          }
        }
      ResponseMappingTemplate: |
        #if($context.error)
          $util.error($context.error.message, $context.error.type)
        #end
        $util.toJson($context.result)
  # Usecase 2 - AppSync Pipeline Resolver - Query - getFriendsPosts
  GetFriendsPostsPipelineResolver:
    Type: AWS::AppSync::Resolver
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      FieldName: getFriendsPosts
      TypeName: Query
      Kind: PIPELINE
      PipelineConfig:
        Functions:
          - !GetAtt GetListOfFriendsFunction.FunctionId
          - !GetAtt GetFriendsPostsFunction.FunctionId
      # following represents BEFORE segment in pipeline resolver
      RequestMappingTemplate: |
        $util.qr($context.stash.put("userId", $context.identity.username))
        {}
      # following represents AFTER segment in pipeline resolver
      ResponseMappingTemplate: |
        $util.toJson($context.prev.result)
    DependsOn: GraphQLSchema
  # AppSync Pipeline Resolver Function - get_list_of_friends
  # Description: Function to get list of friends for current userId
  GetListOfFriendsFunction:
    Type: AWS::AppSync::FunctionConfiguration
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: get_list_of_friends
      DataSourceName: !GetAtt UsersDynamoDBTableDataSource.Name
      FunctionVersion: "2018-05-29"
      RequestMappingTemplate: |
        {
          "operation": "GetItem",
          "key": {
            "userId": $util.dynamodb.toDynamoDBJson($context.stash.userId)
          }
        }
      ResponseMappingTemplate: |
        $util.toJson($context.result.friends)
  # AppSync Pipeline Resolver Function - get_friends_posts
  # Description: Function performs "scan" operation on userId column in posts table
  GetFriendsPostsFunction:
    Type: AWS::AppSync::FunctionConfiguration
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: get_friends_posts
      DataSourceName: !GetAtt PostsDynamoDBTableDataSource.Name
      FunctionVersion: "2018-05-29"
      RequestMappingTemplate: |
        #set($expressionArr = [])
        #set($expressionValueMap = {})
        #foreach($friend in ${context.prev.result})
          ## build a template or placeholder
          #set($expressionTemplate = ":t" + $foreach.count)
          ## now build expression array
          #set($partialExpressionStr = "userId = ${expressionTemplate}")
          $util.qr($expressionArr.add($partialExpressionStr))
          ## also build expression value map
          $util.qr($expressionValueMap.put($expressionTemplate, $util.dynamodb.toString($friend)))
        #end
        ## lets now build the final expression with OR conditions
        #set($expressionStr = "")
        #foreach($expr in ${expressionArr})
          #if($foreach.count == $expressionArr.size())
            #set($expressionStr = "${expressionStr}${expr}")
          #else
            #set($expressionStr = "${expressionStr}${expr} OR ")
          #end
        #end
        {
          "operation": "Scan",
          "index": "userId-index",
          "filter": {
            #if(!$expressionArr.isEmpty())
              "expression": $util.toJson($expressionStr),
              "expressionValues" : $util.toJson($expressionValueMap)
            #else
              #set($expressionStr = "attribute_not_exists(postId)")
              "expression": $util.toJson($expressionStr),
            #end
          },
          "limit": $util.defaultIfNull($context.arguments.limit, 86400),
          "nextToken": $util.toJson($util.defaultIfNullOrEmpty($context.arguments.nextToken, null)),
        }
      ResponseMappingTemplate: |
        $util.toJson($context.result)
  # Usecase 3 - AppSync Resolver - Mutation - createPostContent
  # CreatePostContentResolver:
  #   Type: AWS::AppSync::Resolver
  #   Properties:
  #     ApiId: !GetAtt GraphQLApi.ApiId
  #     DataSourceName: !GetAtt LambdaDataSource.Name
  #     FieldName: createPostContent
  #     TypeName: Mutation
  #   DependsOn: GraphQLSchema
  # Usecase 3 - AppSync Pipeline Resolver - Query - getPostContent
  # GetPostContentResolver:
  #   Type: AWS::AppSync::Resolver
  #   Properties:
  #     ApiId: !GetAtt GraphQLApi.ApiId
  #     DataSourceName: !GetAtt LambdaDataSource.Name
  #     FieldName: getPostContent
  #     TypeName: Query
  #   DependsOn: GraphQLSchema
  # Usecase 3 - AppSync Pipeline Resolver - Mutation - createPostContent
  CreatePostContentPipelineResolver:
    Type: "AWS::AppSync::Resolver"
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      TypeName: Mutation
      FieldName: createPostContent
      Kind: PIPELINE
      PipelineConfig:
        Functions:
          - !GetAtt GetPostContentS3KeyFunction.FunctionId
          - !GetAtt CreatePostContentInS3Function.FunctionId
      RequestMappingTemplate: "{}"
      ResponseMappingTemplate: "$util.toJson($context.result)"
    DependsOn: GraphQLSchema
  # AppSync Pipeline Resolver Function - create_post_content_in_s3
  CreatePostContentInS3Function:
    Type: AWS::AppSync::FunctionConfiguration
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: create_post_content_in_s3
      DataSourceName: !GetAtt LambdaDataSource.Name
      FunctionVersion: "2018-05-29"
  # Usecase 3 - AppSync Pipeline Resolver - Query - getPostContent
  GetPostContentPipelineResolver:
    Type: "AWS::AppSync::Resolver"
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      TypeName: Query
      FieldName: getPostContent
      Kind: PIPELINE
      PipelineConfig:
        Functions:
          - !GetAtt GetPostContentS3KeyFunction.FunctionId
          - !GetAtt GetPostContentFromS3Function.FunctionId
      RequestMappingTemplate: "{}"
      ResponseMappingTemplate: "$util.toJson($context.result)"
    DependsOn: GraphQLSchema
  # AppSync Pipeline Resolver Function - get_post_content_s3_key
  GetPostContentS3KeyFunction:
    Type: AWS::AppSync::FunctionConfiguration
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: get_post_content_s3_key
      DataSourceName: !GetAtt PostsDynamoDBTableDataSource.Name
      FunctionVersion: "2018-05-29"
      RequestMappingTemplate: |
        {
          "version" : "2017-02-28",
          "operation" : "Query",
          "query" : {
            "expression": "postId = :postId",
            "expressionValues" : {
              ":postId" : $util.dynamodb.toDynamoDBJson($context.arguments.input.postId)
            }
          },
          "filter": {
            "expression": "userId = :userId",
            "expressionValues" : {
              ":userId" : $util.dynamodb.toDynamoDBJson($context.identity.username)
            }
          },
          "scanIndexForward": true,
          "limit": $util.defaultIfNull(${context.arguments.limit}, 1000),
          "nextToken": $util.toJson($util.defaultIfNullOrBlank($context.arguments.nextToken, null))
        }
      ResponseMappingTemplate: |
        #set ($count = $context.result.items.size())
        #if ($count <= 0)
          $util.error("Unknown postId: ${context.arguments.input.postId}, or userId: ${context.identity.username}")
        #end
        #if($context.error)
          $util.error($context.error.message, $context.error.type)
        #end
        ## Pass back the result from DynamoDB. **
        $util.toJson($context.result)
  # AppSync Pipeline Resolver Function - get_post_content_from_s3
  GetPostContentFromS3Function:
    Type: AWS::AppSync::FunctionConfiguration
    Properties:
      ApiId: !GetAtt GraphQLApi.ApiId
      Name: get_post_content_from_s3
      DataSourceName: !GetAtt LambdaDataSource.Name
      FunctionVersion: "2018-05-29"
Outputs:
  S3BucketBlogPostContent:
    Value: !Ref S3BucketBlogPostContent
    Description: S3 Bucket Name for Blog Post Content
Once the CloudFormation template above is deployed, it creates an AppSync API with the GraphQL schema defined in the template's GraphQLSchema resource.
There are two predominant types in that schema, User and Post. These types represent records stored in the DynamoDB tables Users and Posts, respectively:
Users Table
Attribute 1 | Attribute 2 | Attribute 3 | Attribute 4 | Attribute 5 |
---|---|---|---|---|
userId | email | name | subscription | friends |
- userId is a unique identifier for each user in the Users table. AppSync exposes the caller's identity from the Cognito JWT token, including the sub attribute, which is a unique identifier for the user. The resolvers in this sample use $context.identity.username as the userId.
- subscription represents the user’s quota based on their subscription tier. This field contains an object with two fields, subscription_tier and maxPosts. maxPosts is a numeric field that indicates how many posts a user is allowed to publish in this tier.
- friends contains a list of user IDs that make up the user’s friends. Note that this attribute can be set to an empty array [] or can be missing entirely from a given user entry.
Posts Table
Attribute 1 | Attribute 2 | Attribute 3 | Attribute 4 |
---|---|---|---|
postId | userId | title | content |
- postId is a unique identifier for each post in the Posts table.
- userId represents who the post belongs to. A global secondary index is defined on this attribute; it is not the sort key.
- content represents the post contents in string format. We demonstrate how to store large articles in an S3 bucket, because DynamoDB limits each record to 400 KB.
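To make the 400 KB constraint concrete, here is a minimal sketch of how an application might decide whether a post's content fits in a DynamoDB record or needs to go to S3. The helper names `estimatePostItemBytes` and `needsS3Storage` are hypothetical, and the size estimate is a simplification that only sums the UTF-8 length of attribute names and string values.

```javascript
// DynamoDB limits each item to 400 KB, and attribute names count toward it.
const DYNAMODB_ITEM_LIMIT_BYTES = 400 * 1024;

// Hypothetical helper: rough item size, summing the UTF-8 byte length of
// every attribute name and its string value.
function estimatePostItemBytes(post) {
  return Object.entries(post).reduce(
    (total, [name, value]) =>
      total + Buffer.byteLength(name, 'utf8') + Buffer.byteLength(String(value), 'utf8'),
    0
  );
}

// Hypothetical helper: true when the post is too large to keep in the table.
function needsS3Storage(post) {
  return estimatePostItemBytes(post) > DYNAMODB_ITEM_LIMIT_BYTES;
}

const smallPost = { postId: 'p1', userId: 'user1', title: 'Hello', content: '<p>Hi</p>' };
const largePost = { ...smallPost, content: 'x'.repeat(2 * 1024 * 1024) };

console.log(needsS3Storage(smallPost)); // false
console.log(needsS3Storage(largePost)); // true
```

A check like this would typically run on the client or in a Lambda function before choosing between the createPost and createPostContent mutations.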
The use cases we discuss in all articles in this series leverage the Users and Posts tables as well as the resources deployed in the CloudFormation template provided earlier.
Before diving into the use cases, we must register a user in the Cognito User Pool deployed by the CloudFormation stack. To do so:
- Navigate to Amazon Cognito in the AWS Management Console and open the appsync-patterns-blog-user-pool user pool.
- Navigate to App Integration > App Client Settings in the left-hand menu.
- Click Launch Hosted UI to open the Cognito hosted UI login screen.
- Click the Sign-up link at the bottom of the dialog box.
- Register a new username, user1, with a valid email address. Wait for Cognito to send a confirmation code to the email address and finish the sign-up process. You may be redirected to the Amazon Cognito product page; feel free to close the browser window.
- Verify that user1 exists in the appsync-patterns-blog-user-pool and that the account status is set to CONFIRMED.
- Optionally, repeat these registration steps to create more users in the user pool.
Use case #1 – Quota management
Our first use case deals with implementing simple business logic that is executed before sending a request to a data source such as a DynamoDB table. We show how to implement a user quota check with pipeline resolvers. In this example, users submit posts that are stored in a Posts table. Users have subscription tiers with assigned quotas that dictate the maximum number of posts they can publish, and each user’s subscription limit is stored in a separate Users table. Using pipeline resolvers, we count the posts each user has stored in the Posts table and check that number against the limit we retrieve from the Users table.
The pipeline consists of three functions that enforce the logic we require:
- get_subscription_limit – retrieves the maximum number of posts a user can publish from the Users table. The table contains a subscription attribute (not to be confused with GraphQL subscription operations) with information about the user’s subscription details. An example value may look like this:
{
  "subscription_tier": "Premium",
  "maxPosts": 5
}
- check_number_of_posts_against_limit – checks the number of posts the user has submitted against the limit retrieved by the previous function. If the number of posts has reached the limit, an error is returned to the user; otherwise we proceed to the last function.
- create_post – adds the post the user submitted to the Posts table (if the user is under their allocated limit).
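The three functions listed above can be modeled in plain JavaScript to show how control and data flow through the pipeline. This is only an illustrative sketch, not how AppSync executes resolvers: the in-memory `usersTable` and `postsTable`, the sample quota of 2, the `post-N` identifiers, and the error raised for an unknown user are all assumptions made for the example.

```javascript
// In-memory stand-ins for the Users and Posts tables (sample data is hypothetical).
const usersTable = new Map([
  ['user1', { subscription: { subscription_tier: 'Basic', maxPosts: 2 } }],
]);
const postsTable = [];

// Step 1 (get_subscription_limit): read the caller's quota from the Users table.
function getSubscriptionLimit(ctx) {
  const user = usersTable.get(ctx.identity.username);
  // Assumption: a missing Users record is treated as an error in this sketch.
  if (!user) throw new Error(`No Users record for ${ctx.identity.username}`);
  return { limit: user.subscription.maxPosts };
}

// Step 2 (check_number_of_posts_against_limit): compare the quota with the post count.
function checkNumberOfPostsAgainstLimit(ctx) {
  const limit = ctx.prev.result.limit;
  const count = postsTable.filter((post) => post.userId === ctx.identity.username).length;
  if (limit <= count) {
    throw new Error(`Posts Limit Reached: limit: ${limit}, posts by ${ctx.identity.username}: ${count}`);
  }
  return null;
}

// Step 3 (create_post): store the new post.
function createPost(ctx) {
  const post = {
    postId: `post-${postsTable.length + 1}`, // stand-in for $util.autoId()
    userId: ctx.identity.username,
    title: ctx.arguments.input.title,
    content: ctx.arguments.input.content,
  };
  postsTable.push(post);
  return post;
}

// Run the functions in order; each one sees the previous result as ctx.prev.result.
function runCreatePostPipeline(username, input) {
  const ctx = { identity: { username }, arguments: { input }, prev: { result: null } };
  for (const step of [getSubscriptionLimit, checkNumberOfPostsAgainstLimit, createPost]) {
    ctx.prev = { result: step(ctx) };
  }
  return ctx.prev.result;
}

console.log(runCreatePostPipeline('user1', { title: 'First', content: 'Hello' }).postId); // post-1
console.log(runCreatePostPipeline('user1', { title: 'Second', content: 'Hi' }).postId); // post-2
try {
  runCreatePostPipeline('user1', { title: 'Third', content: 'Over quota' });
} catch (err) {
  console.log(err.message); // Posts Limit Reached: limit: 2, posts by user1: 2
}
```

An error thrown in any step stops the loop, so create_post never runs once the quota is reached.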
A createPost mutation is defined in the GraphQL schema; it accepts a CreatePostInput (title and content) and returns a Post.
The Create Post pipeline resolver, linked to the createPost mutation field, is defined in the template as CreatePostPipelineResolver. It chains the three pipeline functions described above, in order.
The first pipeline function, get_subscription_limit, reads a record from the Users table and retrieves the subscription data to determine the maximum number of records a user is allowed to create in the Posts table. The request template for this function is straightforward, as it performs a simple DynamoDB GetItem operation. The userId is taken from the username field of the AppSync built-in $context.identity object, which contains identity information about the logged-in user.
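As a rough illustration of what the get_subscription_limit request template evaluates to, the sketch below builds the same GetItem request object in JavaScript. `toDynamoDBString` is a hypothetical stand-in that covers only the string case of `$util.dynamodb.toDynamoDBJson`, which wraps a string in DynamoDB's typed `{ "S": ... }` form.

```javascript
// Hypothetical stand-in for $util.dynamodb.toDynamoDBJson, string values only.
function toDynamoDBString(value) {
  return { S: String(value) };
}

// Builds the request that the get_subscription_limit template produces
// for the signed-in user found in context.identity.username.
function buildGetSubscriptionLimitRequest(context) {
  return {
    version: '2017-02-28',
    operation: 'GetItem',
    key: {
      userId: toDynamoDBString(context.identity.username),
    },
  };
}

const sampleContext = { identity: { username: 'user1' } };
console.log(JSON.stringify(buildGetSubscriptionLimitRequest(sampleContext), null, 2));
```

Because the key comes from the identity object rather than from the mutation arguments, a caller cannot read another user's quota by passing a different userId.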
We use the AppSync built-in $util helpers in our resolver templates. In this specific case, $util.dynamodb makes it easier to read and write data to DynamoDB.
The response template for the get_subscription_limit function is where we receive the user’s quota and assign it to $result.limit. We access this value in the next pipeline function to evaluate the user’s quota. If there is any error executing the GetItem operation, the response template raises an error with contextual information. If there are no errors, $result is passed to the next pipeline function in the sequence.
Next, we build the check_number_of_posts_against_limit function. This pipeline function operates on the Posts table. It executes a DynamoDB Scan operation on the userId-index global secondary index with a filter expression, so that only the records that belong to the currently logged-in user are returned.
{
  "version": "2017-02-28",
  "operation": "Scan",
  "index": "userId-index",
  "filter": {
    "expression": "userId = :userId",
    "expressionValues": {
      ":userId": $util.dynamodb.toDynamoDBJson($context.identity.username)
    }
  }
}
The response template for the check_number_of_posts_against_limit function first assigns the $result.limit value that was set in the previous get_subscription_limit pipeline function to a variable, $limit. The variable $context.prev gives access to data set in the previous pipeline function, so $context.prev.result.limit refers to the $result.limit in the get_subscription_limit pipeline function.
Now we need the total number of records that the check_number_of_posts_against_limit request template retrieved after the filter was applied. These records (or items) are available in $context.result.items as an array. We use the VTL size() function to retrieve the total count. The variable $context.result.scannedCount contains the total number of records the Scan operation accessed in the table before the filter condition was applied, so it is not useful in our use case. Note that $ctx and $context can be used interchangeably. We assign the value of $context.result.items.size() to the $count variable.
Finally, to determine whether the user has reached their maximum posts limit, we compare the variables $limit and $count. If $limit is less than or equal to $count, we throw an error that prevents the user from creating another post in the Posts table. Otherwise, since the user has not reached their maximum posts limit, we proceed to the next pipeline function, where we create a record in the Posts table.
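The difference between the filtered items and scannedCount, and the comparison against the limit, can be sketched as follows. `scanWithFilter` and `assertUnderQuota` are hypothetical helpers that imitate the behavior described above on an in-memory array; they are not AppSync or DynamoDB APIs.

```javascript
// Simulated Scan with a filter: every row read counts toward scannedCount,
// but only rows that pass the filter are returned in items.
function scanWithFilter(table, predicate) {
  return { items: table.filter(predicate), scannedCount: table.length };
}

// Mirrors the response template: raise an error once the limit is reached,
// otherwise return the number of posts the user already has.
function assertUnderQuota(limit, scanResult, username) {
  const count = scanResult.items.length;
  if (limit <= count) {
    throw new Error(`Posts Limit Reached: limit: ${limit}, posts by ${username}: ${count}`);
  }
  return count;
}

const samplePosts = [
  { postId: 'a', userId: 'user1' },
  { postId: 'b', userId: 'user2' },
  { postId: 'c', userId: 'user1' },
];

const scanResult = scanWithFilter(samplePosts, (post) => post.userId === 'user1');
console.log(scanResult.items.length, scanResult.scannedCount); // 2 3
console.log(assertUnderQuota(5, scanResult, 'user1')); // 2
```

With a limit of 2 instead of 5, the same call throws, because the check uses "less than or equal" and therefore blocks the post that would exceed the quota.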
Finally, we add the last pipeline function, create_post. This function simply creates an entry in the Posts table. It automatically generates the postId using the utility helper $util.autoId(). The value for the userId is retrieved from the AppSync $context.identity object as before. The title and content values are then stored in the Posts table.
A condition expression on the PutItem operation prevents an existing entry in the Posts table from being overwritten, so a record with a given postId and userId is only ever created once.
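The effect of the `attribute_not_exists` condition can be illustrated with an in-memory model. DynamoDB evaluates the condition against the existing item with the same primary key (postId here), so the write succeeds only when no such item exists. The `ConditionalCheckFailedError` class and `putPostIfAbsent` function below are hypothetical names for this sketch.

```javascript
// Hypothetical error type standing in for DynamoDB's conditional check failure.
class ConditionalCheckFailedError extends Error {}

// Models PutItem with "attribute_not_exists(postId) AND attribute_not_exists(userId)":
// the condition is checked against the stored item that has the same key, if any.
function putPostIfAbsent(table, post) {
  const existing = table.get(post.postId);
  const conditionHolds = !existing || (!('postId' in existing) && !('userId' in existing));
  if (!conditionHolds) {
    throw new ConditionalCheckFailedError(`Post ${post.postId} already exists`);
  }
  table.set(post.postId, post);
  return post;
}

const postsByKey = new Map();
putPostIfAbsent(postsByKey, { postId: 'p1', userId: 'user1', title: 'Hello', content: 'Hi' });

try {
  putPostIfAbsent(postsByKey, { postId: 'p1', userId: 'user2', title: 'Overwrite', content: 'x' });
} catch (err) {
  console.log(err instanceof ConditionalCheckFailedError); // true
}
console.log(postsByKey.get('p1').userId); // user1
```

Since the postId is freshly generated for every request, the condition mainly acts as a safety net against accidental overwrites.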
The response template for the create_post function is simple. It returns the result of the DynamoDB PutItem operation to the caller. If there is any error executing the PutItem operation, the response template raises an error with contextual information.
#if($context.error)
  $util.error($context.error.message, $context.error.type)
#end
$util.toJson($context.result)
Before testing our pipeline resolver, let’s create a sample record in the DynamoDB Users table. Using the DynamoDB console, create the following record:
{
  "email": "user1@email.com",
  "friends": [
    "user2"
  ],
  "name": "Test User 1",
  "subscription": {
    "subscription_tier": "Premium",
    "maxPosts": 5
  },
  "userId": "user1"
}
Now we can test using the Queries console in AppSync. Before executing the mutation, AppSync prompts you to log in to the user pool. Use the user1 user you created during the setup after deploying the CloudFormation template. The first post is created successfully.
However, once we reach the posts limit (set at 5), we receive an error when trying to create more posts.
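For reference, a createPost request matching the schema in the template could be assembled as shown below. This sketch only builds the JSON body; the function name `buildCreatePostRequestBody` and the sample title and content are illustrative, and sending the body to your API's GraphQL endpoint with the signed-in user's Cognito token in the Authorization header is left out.

```javascript
// GraphQL document for the createPost mutation defined in the schema.
const createPostDocument = `
  mutation CreatePost($input: CreatePostInput!) {
    createPost(input: $input) {
      postId
      userId
      title
      content
    }
  }
`;

// Hypothetical helper: serializes the mutation and its variables into the
// JSON body of a GraphQL HTTP request.
function buildCreatePostRequestBody(title, content) {
  return JSON.stringify({
    query: createPostDocument,
    variables: { input: { title, content } },
  });
}

console.log(buildCreatePostRequestBody('My first post', '<p>Hello world</p>'));
```

In the AppSync console you can paste the mutation document directly and supply the input object in the query variables pane.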
Conclusion
With pipeline resolvers, developers can compose operations and orchestrate and execute business logic directly in AppSync. This makes it easy to interact with multiple data sources in a single GraphQL API call, saving network bandwidth and resources.
In this post, we showed how to combine multiple data sources in a pipeline resolver to create powerful and flexible backend business logic that enforces data integrity in our application. We went over a simple scenario of implementing a user quota system directly in AppSync without the need to maintain any additional backend infrastructure. In the next articles, we show you how to aggregate data across tables using pipeline resolvers and how to use Direct Lambda Resolvers in a pipeline to interact with an S3 bucket where post content is stored.