
Practical use cases for AWS AppSync Pipeline Resolvers – Part 3: Heterogeneous Data Sources

 

This article was written by Salman Moghal, Application Architect, AWS, and Abdul Kitana, Security Architect, AWS

Overview

AWS AppSync is a fully managed service that allows you to deploy serverless GraphQL backends in the AWS cloud. It provides features that developers can use to create modern data-driven applications that easily query multiple databases, microservices, and APIs from a single GraphQL endpoint. Customers can leverage AppSync real-time capabilities, offline data synchronization, built-in server-side caching, fine-grained access control, security, support for business logic in the API layer using GraphQL resolvers, and more. In this article, we focus on how to implement and orchestrate backend logic directly in the AppSync GraphQL API layer using pipeline resolvers.

Resolvers are built-in functions in GraphQL that “resolve” types or fields defined in the GraphQL schema with the data in the data sources. Resolvers in AppSync can be of two types: unit resolvers or pipeline resolvers. Pipeline resolvers offer the ability to serially execute operations against multiple data sources in a single API call, triggered by queries, mutations, or subscriptions.

AppSync Pipeline Resolvers significantly simplify client-side application complexity and help enforce server-side business logic controls by orchestrating requests to multiple data sources. They can be used to enforce certain aspects of the application’s logic at the API layer.

This is an article series in three parts where we use a sample application to highlight how pipeline resolvers can be used to solve common problems, such as:

  • Part 1: User quota management
  • Part 2: Data aggregation across DynamoDB tables
  • Part 3: Heterogeneous data sources (this article)

The other articles in the series cover the first two use cases. In the first article you can find a CloudFormation template that can be used to deploy all the configured services in your AWS account, as well as the steps necessary to create users to test the different use cases.

Our sample application is a simple web app where users sign in and publish posts to their profile. Users have a quota or subscription that determines how many posts they can publish, and they can also add other users as friends. Finally, the content of the posts can be formatted in HTML and must support sizes of up to a few megabytes. We provide full implementation details of the use cases listed above using an AppSync GraphQL API.

 

Use case #3 – Heterogeneous data sources

Pipeline resolvers can be used to access different types of data sources in one query or mutation. The data sources can be database services such as Amazon RDS, DynamoDB, or Elasticsearch, or a Lambda data source. With Lambda as a data source, we can interact with backend services that are not directly supported by AppSync, for example, S3 buckets.

In our application, users might want to upload large posts, and storing them in DynamoDB might not be an option because DynamoDB limits each item to 400 KB. To work around this item size limit, we can use Amazon S3 to store and retrieve the contents of users' posts.
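As a rough illustration of the size concern, the sketch below measures the UTF-8 size of a post body and compares it against DynamoDB's 400 KB per-item limit. The helper name `fitsInDynamoDbItem` is hypothetical and not part of the sample application, and the check deliberately ignores the bytes used by other attributes of the item.

```javascript
// DynamoDB caps a single item at 400 KB, counting attribute names and values.
const DYNAMODB_MAX_ITEM_BYTES = 400 * 1024;

// Hypothetical helper: returns true when the content alone would fit in one item.
// It ignores the bytes used by other attributes, so the check is optimistic.
function fitsInDynamoDbItem(content) {
    return Buffer.byteLength(content, 'utf8') <= DYNAMODB_MAX_ITEM_BYTES;
}

const smallPost = '<html><body>short post</body></html>';
const largePost = 'x'.repeat(2 * 1024 * 1024); // roughly 2 MB of content

console.log(fitsInDynamoDbItem(smallPost)); // true
console.log(fitsInDynamoDbItem(largePost)); // false
```

Posts of a few megabytes fail this check, which is why the content is stored in S3 while the post metadata stays in DynamoDB.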

In this example, we use a new feature of the AppSync service called Direct Lambda Resolvers. We demonstrate how a Direct Lambda Resolver can be bound to both Mutation and Query fields, and how we can still use a Pipeline Resolver function to perform additional control logic before invoking a Direct Lambda Resolver. This approach enables us to treat a Direct Lambda Resolver as a microservice that interacts with its own data source. Our Lambda implementation is generic enough to handle multiple operations performed against S3 as a data source, i.e. create contents in the S3 bucket and retrieve contents from the S3 bucket. A user can call a Mutation to create contents in the S3 bucket, and similarly, users can invoke a Query to retrieve the content of their posts from the S3 bucket. The S3 object key for each post's content is constructed from the userId and postId attributes, using the result of a Pipeline Resolver function.

Let’s first review our Pipeline Resolver architecture.  At a high-level, both Create Post Content and Get Post Content look similar.  The only difference is the type of field they are bound to: the Create Post Content resolver is bound to a Mutation type field whereas the Get Post Content resolver is bound to a Query type field.

 

 

As we can see in the GraphQL schema, the createPostContent Mutation field and the getPostContent Query field are linked to the Pipeline resolvers:

type Mutation {
  createPostContent(input: CreatePostContentInput!): String
}
type Query {
  getPostContent(input: GetPostContentInput!): String
}
input CreatePostContentInput {
  postId: ID!
  content: String!
}
input GetPostContentInput {
  postId: ID!
}

Next, we take a closer look at the two pipeline resolvers. Starting with the CreatePostContent pipeline resolver, we notice the get_post_content_s3_key function has a request and response mapping template. However, the create_post_content_in_s3 function does not, as this function uses the S3BlogContent Lambda function as a data source. When we disable the VTL request and response mapping templates, AppSync treats the Lambda data source as a Direct Lambda resolver.

 

 

Next, we take a look at the GetPostContent pipeline resolver shown below. Notice that we are reusing both the get_post_content_s3_key function and the S3BlogContent Lambda data source. As a Direct Lambda Resolver, the get_post_content_from_s3 function does not have request and response mapping templates. The Lambda logic can be coded to handle multiple parent types and fields. We see how this is possible by introspecting the Lambda event object later in this section.

 

 

Both CreatePostContent and GetPostContent pipeline resolvers invoke the shared get_post_content_s3_key pipeline function. The request mapping template of this function executes a DynamoDB Query operation with a hash key and a filter expression. The hash key is set to postId in the query expression. The filter expression narrows the results to posts that belong to the given userId. Since the Query operation targets records in the Posts table by postId, there is no need to use the global secondary index userId-index defined in the table. We created this index specifically to help speed up DynamoDB scan operations in the previous use case.

{
    "version" : "2017-02-28",
    "operation" : "Query",
    "query" : {
      "expression": "postId = :postId",
      "expressionValues" : {
        ":postId" : $util.dynamodb.toDynamoDBJson($context.arguments.input.postId)
      }
    },
    "filter": {
        "expression": "userId = :userId",
        "expressionValues" : {
          ":userId" : $util.dynamodb.toDynamoDBJson($context.identity.username)
        }
    },
    "scanIndexForward": true,
    "limit": $util.defaultIfNull(${context.arguments.limit}, 1000),
    "nextToken": $util.toJson($util.defaultIfNullOrBlank($context.arguments.nextToken, null))
}
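For readers more familiar with the AWS SDK than with VTL, the following sketch builds roughly equivalent parameters for the DocumentClient `query` call in the AWS SDK for JavaScript v2. The table name `Posts` comes from this article; the function name `buildPostQueryParams` is hypothetical, and the code only constructs the parameter object without calling DynamoDB.

```javascript
// Hypothetical helper mirroring the request mapping template above.
// It returns a parameter object that could be passed to DocumentClient.query().
function buildPostQueryParams(postId, username, limit) {
    return {
        TableName: 'Posts',
        // The key condition selects items by the hash key.
        KeyConditionExpression: 'postId = :postId',
        // The filter keeps only items owned by the caller.
        FilterExpression: 'userId = :userId',
        ExpressionAttributeValues: {
            ':postId': postId,
            ':userId': username
        },
        ScanIndexForward: true,
        // Same default as the template when no limit is supplied.
        Limit: limit == null ? 1000 : limit
    };
}

console.log(buildPostQueryParams('1001', 'user1'));
```

Note that DynamoDB applies the filter expression after the items matching the key condition have been read, so the filter narrows what is returned rather than what is read.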

The response mapping template for the get_post_content_s3_key pipeline function inspects the result retrieved from DynamoDB. If the query returns no items, the function throws an error. This blocks the execution of the Direct Lambda resolver. This approach also simplifies our Lambda logic, because the Lambda function does not need to test for empty results. If the query operation retrieves results from DynamoDB and there are no errors, the pipeline function passes the results to the next function in the pipeline, which is linked to a Direct Lambda resolver.

#set ($count = $context.result.items.size())
#if ($count <= 0)
  $util.error("Unknown postId: ${context.arguments.input.postId}, or userId: ${context.identity.username}")
#end

#if($context.error)
    $util.error($context.error.message, $context.error.type)
#end
## Pass back the result from DynamoDB. **
$util.toJson($context.result)  
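To make the control flow of this template easier to follow, here is the same guard expressed as plain JavaScript. This is only an illustrative sketch: AppSync evaluates the VTL template itself, and the function name `checkQueryResult` and the shape of the `context` argument are assumptions modeled on the template variables.

```javascript
// Hypothetical JavaScript rendering of the response mapping template above.
function checkQueryResult(context) {
    const items = (context.result && context.result.items) || [];

    // No matching post for this postId and user: stop the pipeline.
    if (items.length <= 0) {
        throw new Error(
            `Unknown postId: ${context.arguments.input.postId}, or userId: ${context.identity.username}`
        );
    }

    // Propagate any error reported by the data source.
    if (context.error) {
        throw new Error(context.error.message);
    }

    // Pass the DynamoDB result on to the next pipeline function.
    return context.result;
}

const sampleContext = {
    arguments: { input: { postId: '1001' } },
    identity: { username: 'user1' },
    result: { items: [{ postId: '1001', userId: 'user1' }] }
};
console.log(checkQueryResult(sampleContext).items.length); // 1
```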

There are no request or response mapping templates for Direct Lambda Resolvers and no need to use VTL. Therefore, the create_post_content_in_s3 and get_post_content_from_s3 pipeline functions simply invoke the same Lambda data source, i.e. S3BlogContent. We create this data source next, but let us first take a look at the Lambda event object that AppSync sends through to the Direct Lambda Resolver.

Lambda receives an AppSync event object containing several key fields.  A sample AppSync Lambda event is shown below.  In Lambda code, we can introspect the info object to determine the type of logic we want to perform.  This gives us a lot of flexibility to code the business logic the way we want.  In order to create or retrieve content from the S3 bucket, we construct the S3 object key dynamically based on the userId and the postId passed in the Lambda event object. We take a look at how we select the userId and the postId in the Lambda business logic next.

{
    "arguments": {
        "input": {
            "postId": "1001",
            "content": "<html><body>post contents - 1001</body></html>"
        }
    },
    "identity": {
        "claims": {
            "sub": "256475df-cea6-43b6-bb6b-bd9c434a421f",
            "aud": "m46cd9iugaot4pqn797sccvm8",
            "email_verified": true,
            "event_id": "17b0df60-bab4-4ced-85e9-185a55e7da42",
            "token_use": "id",
            "auth_time": 1603003763,
            "iss": "https://cognito-idp.us-east-2.amazonaws.com/us-east-XYZ",
            "cognito:username": "user1",
            "exp": 1603053330,
            "iat": 1603049730,
            "email": "user1@amazon.com"
        },
        "defaultAuthStrategy": "ALLOW",
        "groups": null,
        "issuer": "https://cognito-idp.us-east-2.amazonaws.com/us-east-XYZ",
        "sourceIp": [
            "A.B.C.D"
        ],
        "sub": "256475df-cea6-43b6-bb6b-bd9c434a421f",
        "username": "user1"
    },
    "source": null,
    "request": {
        "headers": {
            // list of headers
        }
    },
    "prev": {
        "result": {
            "items": [
                {
                    "postId": "1001",
                    "title": "title1",
                    "userId": "user1",
                    "content": "first post"
                }
            ],
            "nextToken": null,
            "scannedCount": 1,
            "startedAt": null
        }
    },
    "info": {
        "selectionSetList": [],
        "selectionSetGraphQL": "",
        "parentTypeName": "Mutation",
        "fieldName": "createPostContent",
        "variables": {}
    },
    "stash": {}
}
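Given an event of the shape shown above, a Lambda function only needs a handful of fields to decide what to do. The sketch below pulls them out into a small summary object. The function name `summarizeInvocation` is hypothetical, and the sample event is a trimmed copy of the one above.

```javascript
// Hypothetical helper: extracts the fields this use case relies on.
function summarizeInvocation(event) {
    const items = (event.prev && event.prev.result && event.prev.result.items) || [];
    return {
        // For example "Mutation.createPostContent" or "Query.getPostContent".
        operation: `${event.info.parentTypeName}.${event.info.fieldName}`,
        caller: event.identity.username,
        postId: event.arguments.input.postId,
        // Owner recorded in DynamoDB by the previous pipeline function, if any.
        postOwner: items.length > 0 ? items[0].userId : null
    };
}

const sampleEvent = {
    arguments: { input: { postId: '1001', content: '<html><body>post contents - 1001</body></html>' } },
    identity: { username: 'user1' },
    prev: { result: { items: [{ postId: '1001', title: 'title1', userId: 'user1' }] } },
    info: { parentTypeName: 'Mutation', fieldName: 'createPostContent' }
};

console.log(summarizeInvocation(sampleEvent));
// { operation: 'Mutation.createPostContent', caller: 'user1', postId: '1001', postOwner: 'user1' }
```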

The Lambda function manages data ingestion to an S3 bucket.  If you executed the CloudFormation template, you should already have a Lambda function called appsync-direct-lambda-resolver-function. If you’re creating it manually:

  1. In the Lambda console, under Lambda > Functions, click Create Function
  2. Enter a function name, i.e. appsync-direct-lambda-resolver-function.
  3. Set the Runtime as nodejs12.x
  4. Update the timeout for the function to something larger than the default, for example: 30 seconds
  5. Add an environment variable called CONTENT_BUCKET and set it to the S3 bucket Lambda should connect to.
  6. Paste in the following code contents:
const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const bucketName = process.env.CONTENT_BUCKET;

exports.handler = async (event, context) => {
    
    // we don't need to validate event.prev object or event.prev.result.items length
    // because get_post_content_s3_key pipeline function only passes control over to Direct Lambda
    // Resolver when DynamoDB Query operation returns exactly one valid result.  As per our
    // DynamoDB Post schema, there should never be multiple rows for a given userId and postId

    let resp = {};
    try {
      // create contents in s3 bucket
      if (event.info.fieldName === 'createPostContent') {
          if (!event.arguments.input.postId || !event.arguments.input.content) {
              const errorMessage = 'missing required parameters in createPostContent';
              console.error('Exception occurred: ', errorMessage);
              throw new Error(errorMessage);
          }
          const params = {
              Body: event.arguments.input.content,
              ContentType: 'text/plain',
              Bucket: bucketName,
              Key: event.prev.result.items[0].userId + '/' + event.arguments.input.postId
          };
          console.log('Creating object in bucket: ' + bucketName + ', s3 params: ' + JSON.stringify(params));
          const data = await s3.putObject(params).promise();
          resp = {
              etag: data.ETag 
          };
      }
      // get contents from s3 bucket
      else if(event.info.fieldName === 'getPostContent') {
          if (!event.arguments.input.postId) {
              const errorMessage = 'missing required parameters in getPostContent';
              console.error('Exception occurred: ', errorMessage);
              throw new Error(errorMessage);
          }
          const params = {
            Bucket: bucketName,
            Key: event.prev.result.items[0].userId + '/' + event.arguments.input.postId
          };
          console.log('Retrieving object from bucket: ' + bucketName + ', s3 params: ' + JSON.stringify(params));
          const data = await s3.getObject(params).promise();
          const content = data.Body.toString('utf-8');
          resp = {
              content: content
          };
      }
      else {
          const errorMessage = 'unsupported operation: ' + event.info.fieldName;
          console.error('Exception occurred: ', errorMessage);
          throw new Error(errorMessage);
      }
    }
    catch (ex) {
      console.error('Exception occurred: ', ex.message);
      const promise = new Promise((resolve, reject) => {
        reject(ex.message);
      });
      return promise;
    }

    return resp;
};

The code inspects event.info.fieldName to determine which GraphQL operation invoked the Lambda function and what parameters it should expect. These parameters are passed in through the Mutation or Query operations. The Lambda logic above simply figures out whether to create an S3 object containing the contents or whether to read an existing S3 object.
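As the number of supported fields grows, the if/else chain can be replaced by a lookup table keyed on the parent type and field name. The following is a minimal sketch of that alternative, not the implementation used in this article: the names `handlers` and `resolveHandler` are hypothetical, and the handler bodies are stubs standing in for the S3 calls.

```javascript
// Hypothetical dispatch table; each stub stands in for the real S3 logic.
const handlers = {
    'Mutation.createPostContent': async (event) => ({ action: 'put', postId: event.arguments.input.postId }),
    'Query.getPostContent': async (event) => ({ action: 'get', postId: event.arguments.input.postId })
};

// Looks up the handler for the invoking GraphQL field, or throws if none exists.
function resolveHandler(event) {
    const key = `${event.info.parentTypeName}.${event.info.fieldName}`;
    const handler = handlers[key];
    if (typeof handler !== 'function') {
        throw new Error('unsupported operation: ' + key);
    }
    return handler;
}

// Usage inside a Lambda handler would look like:
// exports.handler = async (event) => resolveHandler(event)(event);
```

Keying on both `parentTypeName` and `fieldName` avoids ambiguity if a Query and a Mutation ever share a field name.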

As indicated earlier, in order to store or retrieve post contents from the S3 bucket, Lambda uses the userId from the event.prev.result.items array and the postId from Lambda’s event.arguments.input object. The S3 object key is therefore created as userId/postId. Note that the items array always contains one element because of the way we have structured the Posts table. When a user calls the createPostContent mutation, the content is sent in the argument as event.arguments.input.content. Lambda takes the content from this attribute and stores it in the S3 bucket using the S3 object key format above. An S3 putObject API call is used to create the object, and it returns the ETag of the created S3 object. Lambda simply passes this ETag back to the AppSync service.
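The key construction described above can be isolated in a small function, which also makes it easy to unit test. This is a sketch under the assumption that the previous pipeline function has already guaranteed one item in `event.prev.result.items`; the name `buildObjectKey` is hypothetical.

```javascript
// Hypothetical helper: builds the S3 object key as userId/postId.
// Assumes the pipeline has already verified that items[0] exists.
function buildObjectKey(event) {
    const userId = event.prev.result.items[0].userId;
    const postId = event.arguments.input.postId;
    return `${userId}/${postId}`;
}

const keyEvent = {
    arguments: { input: { postId: '1001' } },
    prev: { result: { items: [{ postId: '1001', userId: 'user1' }] } }
};

console.log(buildObjectKey(keyEvent)); // user1/1001
```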

Instead of using the userId in the event.prev.result.items array, we could use the value from event.identity.username. However, in order to leverage the DynamoDB data and to ensure the user does have an existing post in the Posts table, we use the userId present in the event.prev.result.items array. Consequently, we cannot execute the createPostContent mutation or the getPostContent query if the user’s post does not exist in the Posts table. We’re leveraging both AppSync’s built-in resolvers in the pipeline and Lambda to perform distinct checks and organize the business logic in the pipeline.

Lambda retrieves the name of the S3 bucket from an environment variable called CONTENT_BUCKET. If you created the Lambda function manually, set this environment variable prior to testing the createPostContent mutation or getPostContent query. If you used the provided CloudFormation template, the environment variable is automatically set to the name of the bucket that the template created. Refer to the stack output in the CloudFormation service console to determine the name of the S3 content bucket.

Lambda must have an IAM role assigned to it. This IAM role must have permissions to read objects from and write objects to the S3 bucket. A sample IAM role definition for Lambda, in CloudFormation syntax, is provided below. Replace <YOUR_S3_BUCKET_NAME> with your bucket name. If you deployed the CloudFormation template, the Lambda execution role is automatically created for you and the S3 bucket name is automatically set.
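Because a missing CONTENT_BUCKET value would otherwise only surface as an S3 error at request time, it can help to fail fast with a clear message. The sketch below shows one way to do that; the function name `resolveBucketName` is hypothetical and is not part of the Lambda code shown earlier.

```javascript
// Hypothetical helper: reads the bucket name from an environment-like object
// and throws a descriptive error when it is missing or blank.
function resolveBucketName(env) {
    const name = env.CONTENT_BUCKET;
    if (typeof name !== 'string' || name.trim() === '') {
        throw new Error('CONTENT_BUCKET environment variable is not set');
    }
    return name.trim();
}

// In a Lambda function this would typically be called once, at module load:
// const bucketName = resolveBucketName(process.env);
console.log(resolveBucketName({ CONTENT_BUCKET: 'my-content-bucket' })); // my-content-bucket
```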

AppsyncResolverLambdaExecutionRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path : "/"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: 'allow_s3'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
            - Effect: Allow
              Action:
              - s3:GetObject
              - s3:PutObject
              Resource: 
                Fn::Join: 
                  - ""
                  - 
                    - "arn:aws:s3:::"
                    - "<YOUR_S3_BUCKET_NAME>"
                    - "/*"
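If you prefer to attach the S3 permissions outside of CloudFormation, the inline policy above corresponds to a plain JSON policy document. The sketch below generates that document for a given bucket name. The function name `buildS3ContentPolicy` and the bucket name `my-content-bucket` are placeholders chosen for illustration.

```javascript
// Hypothetical helper: builds the same allow_s3 policy document as the template above.
function buildS3ContentPolicy(bucketName) {
    return {
        Version: '2012-10-17',
        Statement: [
            {
                Effect: 'Allow',
                Action: ['s3:GetObject', 's3:PutObject'],
                // Object-level permissions apply to keys inside the bucket, hence the /* suffix.
                Resource: `arn:aws:s3:::${bucketName}/*`
            }
        ]
    };
}

console.log(JSON.stringify(buildS3ContentPolicy('my-content-bucket'), null, 2));
```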

Once the Lambda function is created, we can add it as a new data source in the AppSync API.  Again, if you deployed the CloudFormation template in the beginning of this article, you should already have a Lambda data source linked to AppSync and called S3BlogContent.

  1. Go to AppSync > Data Sources.
  2. Click New Data Source.
  3. Specify a data source name, e.g. S3BlogContent, select AWS Lambda Function as the data source type, and then select the region and the Lambda function created in the previous step, i.e. appsync-direct-lambda-resolver-function.

You should now have the Lambda function as a data source in your GraphQL API:

 

 

We can now test the createPostContent mutation and the getPostContent query in the AppSync console.  There are two ways to prepare the Posts table prior to testing.

  • If there is an existing post created by the currently logged in user, note down the postId and follow the steps below to invoke the createPostContent mutation and the getPostContent query operation.
  • Alternatively, create some sample data directly in the DynamoDB console in the Users table for user1:
{
  "email": "user1@amazon.com",
  "friends": [
    "user2"
  ],
  "name": "Test User 1",
  "subscription": {
    "maxPosts": 5,
    "subscription_tier": "Premium"
  },
  "userId": "user1"
}

Let’s assume the Posts table does not contain any user posts.  Create a new post by invoking a createPost mutation.  The createPost mutation operation returns a postId that we use in the next step.  You can skip this step if you already have a valid postId present in the Posts table for user1.

 

 

Now invoke the createPostContent mutation and pass the postId returned above:

 

 

We can confirm that a folder named after the userId was created in the S3 bucket, containing an object named after the postId:

 

 

Finally, let’s retrieve the content of the S3 object that we just created by calling the getPostContent query.  This operation only takes postId as parameter and retrieves the content of the S3 object we just created.

 

 

As you can see, both operations work as expected.
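For reference, the two operations tested above can be written out as GraphQL documents derived from the schema shown earlier. The sketch below wraps them in the JSON body that a GraphQL client would send over HTTP. The operation names, the `buildRequestBody` helper, and the sample postId are assumptions for illustration; sending the request and attaching the authorization header are left out.

```javascript
// Operation documents derived from the schema; both fields return String,
// so no selection set is needed.
const CREATE_POST_CONTENT = `
  mutation CreatePostContent($input: CreatePostContentInput!) {
    createPostContent(input: $input)
  }`;

const GET_POST_CONTENT = `
  query GetPostContent($input: GetPostContentInput!) {
    getPostContent(input: $input)
  }`;

// Hypothetical helper: serializes a GraphQL request body with variables.
function buildRequestBody(query, input) {
    return JSON.stringify({ query, variables: { input } });
}

const createBody = buildRequestBody(CREATE_POST_CONTENT, {
    postId: '1001',
    content: '<html><body>post contents - 1001</body></html>'
});
const getBody = buildRequestBody(GET_POST_CONTENT, { postId: '1001' });

console.log(JSON.parse(createBody).variables.input.postId); // 1001
console.log(JSON.parse(getBody).variables.input.postId);    // 1001
```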

Conclusion

With pipeline resolvers, developers can compose operations and orchestrate and execute business logic directly in AppSync. This makes it easy to interact with multiple data sources in a single GraphQL API call, saving network bandwidth and resources.

In this article, we demonstrated how to take advantage of Direct Lambda Resolvers to build flexible serverless business logic in your programming language of choice on GraphQL with AppSync. We also demonstrated how we can easily mix and match Direct Lambda Resolvers as pipeline functions inside a Pipeline Resolver. In the previous articles, we implemented a user quota management system with pipeline resolvers directly in AppSync without the need to maintain any additional backend infrastructure, and we showed how to perform data aggregation from different DynamoDB tables leveraging pipeline resolvers.