Front-End Web & Mobile

Integrating alternative data sources with AWS AppSync: Amazon Neptune and Amazon ElastiCache

This article was written by Josh Kahn, Senior Solutions Architect, AWS

September 14, 2021: Amazon Elasticsearch Service has been renamed to Amazon OpenSearch Service. See details.

Since its launch at AWS re:Invent 2017, AWS AppSync has grown the number of and types of natively supported data sources. Today, AppSync supports NoSQL (Amazon DynamoDB), search (Amazon OpenSearch Service (successor to Amazon Elasticsearch Service)), and relational (Amazon Aurora Serverless) data sources among others. AppSync allows customers to quickly build rich, scalable, enterprise-ready backends with multiple security options that can perform complex operations across data sources in a unified GraphQL API.

“Seldom can one database fit the needs of multiple distinct use cases. The days of the one-size-fits-all monolithic database are behind us, and developers are now building highly distributed applications using a multitude of purpose-built databases,” said Werner Vogels, CTO and VP of Amazon.com. For more on this topic, see his post, A one size fits all database doesn’t fit anyone.

In this post, we explore how AWS AppSync can utilize AWS Lambda to integrate with alternative data sources—in other words, those not directly integrated out-of-the-box with AWS AppSync. While we look specifically at Amazon ElastiCache and Amazon Neptune here, you could support other data sources via a similar approach (including forthcoming services such as Amazon QLDB and Amazon Timestream).

To demonstrate the power of AppSync paired with ElastiCache and Neptune, we will build a restaurant finder API (specifically a Hot Dog Restaurant API). We’ll use ElastiCache for Redis to search for nearby restaurants and Neptune to power recommendations, Step Functions will populate the data stores with some sample data.

 

 

The start of our AppSync GraphQL schema is as follows. We will update the schema as we add functionality powered by ElastiCache and Neptune later in the post. For more information about using AWS AppSync, see the AppSync Developer Guide.

type Restaurant {
  id: ID!
  name: String!
  description: String
  address: String!
  city: String!
  state: String!
  zip: Int!
  longitude: Float!
  latitude: Float!
}

type Like {
  user: String!
  restaurant: Restaurant
}

type SearchResult {
  restaurant: Restaurant!
  distance: String
  units: String
}

input GPSInput {
  latitude: Float!
  longitude: Float!
  radius: Float
}

type Query {
  listRestaurants: [Restaurant]
  getRestaurant(id: ID!): Restaurant
}

schema {
  query: Query
}

 

Searching for Nearby Restaurants

We will use Amazon ElastiCache for Redis to search for nearby restaurants. ElastiCache offers super-fast access to data from in-memory data stores. While ElastiCache is often thought of foremost for caching needs, it can be used for numerous other purposes. For example, ElastiCache for Redis can also be used to build leaderboards and session storage.

To perform a geospatial search in Redis, you first need the user’s latitude and longitude. The user’s location is available on most modern devices (with permission) using JavaScript Geolocation API in the browser, CoreLocation on iOS, or Android LocationManager. Building on our earlier GraphQL schema, we can add a new query to search for restaurants by location:

type Query {
  ...
  searchByLocation(location: GPSInput!): [SearchResult]
}

Redis (version 3.2.0 and later) supports geolocation searches using the GEORADIUS command. We can construct a geolocation query in a new Lambda function, passing the user’s latitude and longitude in the payload from the AppSync resolver. After adding a new Lambda data source (we’ll call it ElastiCacheIntegration) to your API, you can configure the resolver for the searchByLocation query with the Invoke operation to call Lambda:

 

Resolver Request Mapping Template:

{
  "version": "2017-02-28",
  "operation": "Invoke",
  "payload": {
    "action": "searchByLocation",
    "arguments":  $utils.toJson($ctx.arguments)
  }
}

Resolver Response Mapping Template:

#if($ctx.result && $ctx.result.error)
  $util.error($ctx.result.error)
#end
$util.toJson($ctx.result)

In this example, the Lambda function performs potentially multiple actions, all related to interacting with ElastiCache. Another valid approach is to create a single function per query.

The integration function using Node.js 10 is as follows:

const Redis = require("ioredis")

const GEO_KEY = process.env.ELASTICACHE_GEO_KEY

let redis = new Redis.Cluster([
  {
    host: process.env.ELASTICACHE_ENDPOINT,
    port: process.env.ELASTICACHE_PORT
  }
])

async function searchByGeo(lat, lon, radius=10, units="mi") {
  try {
    let result = await redis.georadius(
          GEO_KEY,   // key
          lon,       // longitude
          lat,       // longitude
          radius,    // search radius
          units,     // search radius units
          "WITHCOORD",
          "WITHDIST"
        )

    if (!result) { return [] }

    // map from Redis response
    return result.map( (r) => {
      return { id: r[0], dist: r[1], units: units }
    }).sort((a, b) => { return a.dist - b.dist })

  } catch (error) {
    console.error(JSON.stringify(error))
    return { error: error.message }
  }
}

exports.handler = async(event) => {
  switch(event.action) {
    case "searchByLocation":
      let location = event.arguments.location
      let radius = event.arguments.radius
      let result = searchByGeo(location.latitude, location.longitude, radius)
      return result
    default:
      throw("No such method")
  }
}

Before returning the query result to the client, make sure that the shape of the response matches that defined in the schema (for searchByLocation, an array of SearchResults). For this example, we can use an AWS AppSync pipeline resolver to do the following:

  1. Query ElastiCache for distances.
  2. Retrieve restaurant data from DynamoDB using BatchGetItem.
  3. Merge the results to match the SearchResult schema.

Pipeline resolvers make it easy to define a reusable set of functions that can query multiple data sources with a single API call. Performing both queries and manipulating data in a Lambda function is also a valid approach. However, for this example, we preferred the composability of the pipeline resolver, as you use the DynamoDB BatchGetItem function again later.

Now, run a query against our GraphQL API to search for nearby restaurants:

query SearchByLocation {
  searchByLocation(location: {
    latitude: 41.8781,
    longitude: -87.6298
  }) {
    restaurant {
      name
    }
    distance
    units
  }
}

The result looks something like this:

{
  "data": {
    "searchByLocation": [
      {
        "restaurant": {
          "name": "Portillo’s"
        },
        "distance": "1.0694",
        "units": "mi"
      },
      {
        "restaurant": {
          "name": "Fatso’s Last Stand"
        },
        "distance": "3.0622",
        "units": "mi"
      },
      ...   
    ]
  }
}

In this project, we are augmenting our primary data store (DynamoDB) with a purpose-built database (ElastiCache) to take advantage of the latter’s capabilities (super-fast geolocation queries). Because we want data in ElastiCache to stay synchronized with the primary store, enable DynamoDB Streams to push applicable data to ElastiCache.

As shown in the following diagram, we can create a second Lambda function that is invoked by a DynamoDB stream when restaurant data changes. This function will modify our Redis sorted set by adding or updating restaurant data, specifically latitude and longitude.

Generating Recommendations

For our restaurant finder, we also want to recommend restaurants based on likes of others. Amazon Neptune is a fully-managed graph database built to model highly-connected datasets. Neptune is fast, reliable, and easy to work with. By traversing the rich collection of vertices and edges in a graph database, we can realize relationships indicated only by foreign keys in a relational database. For example, imagine a graph database that models a network of friends. While each friend’s name and birthdate are important, so too are the details of their relationship with others (e.g. date we became friends, type of relationship). An Amazon Neptune graph database allows us to model attributes of both the people (“vertices”) and the relationships (“edges”) between them.

In order to find restaurants, the graph will have vertices for both restaurants and people. The graph will also contain one types of edge, “likes”, that models a user liking a particular restaurant. For the purpose of our example, we’ll use a toy graph with fictitious data that looks like the following diagram:

We can now extend our GraphQL API to support the recommendation capability provided by Neptune. Here, pass the name of the user for whom we want to generate recommendations. In a real-world scenario; however, you would likely use a unique identifier for the logged in user instead.

type Query {
  ...
  getRecommendationsFor(user:String!): [Restaurant]
}

The resolver mapping templates for your Neptune integration look quite similar to the earlier template. Again, you could elect to perform more data manipulation in the template than your function.

 

Resolver Request Mapping Template:

{
  "version": "2017-02-28",
  "operation": "Invoke",
  "payload": {
    "action": "getRecommendations",
    "arguments":  $utils.toJson($ctx.arguments)
  }
}

Resolver Response Mapping Template:

#if($ctx.result && $ctx.result.error)
  $util.error($ctx.result.error)
#end
$util.toJson($ctx.result)

Following the same approach for Neptune as with ElastiCache, we’ll build a single function that can make several types of queries against Neptune. Amazon Neptune currently supports two query languages to access data in the graph: Gremlin and SPARQL. Either language could be used, though we’ll use Gremlin. For more on Gremlin, check out the excellent Practical Gremlin.

const gremlin = require('gremlin')
const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection

const Graph = gremlin.structure.Graph
const P = gremlin.process.P
const Order = gremlin.process.order
const Scope = gremlin.process.scope
const Column = gremlin.process.column

const dc = new DriverRemoteConnection(
  `wss://${process.env.NEPTUNE_ENDPOINT}:${process.env.NEPTUNE_PORT}/gremlin`
)
const graph = new Graph()
const g = graph.traversal().withRemote(dc)

async function getRecommendationsFor(userName) {
  try {
    // based on Gremlin recommendation recipe:
    // http://tinkerpop.apache.org/docs/current/recipes/#recommendation
    let result = await g.V()
      .has('User', 'name', userName).as('user')
      .out('likes').aggregate('self')
      .in_('likes').where(P.neq('user'))
      .out('likes').where(P.without('self'))
      .values('id')
      .groupCount()
      .order(Scope.local)
        .by(Column.values, Order.decr)
      .select(Column.keys)
      .next()
    
    return result.value.map( (r) => {
      return { id: r }
    })
  } catch (error) {
    console.error(JSON.stringify(error))
    return { error: error.message }
  }
}

exports.handler = async(event) => {
  switch(event.action) {
    case "getRecommendations":
      return await getRecommendationsFor(event.arguments.user)
    default:
      return { error: "No such method" }
  }
}

With these changes, you can query the GraphQL API for a list of restaurant recommendations for a particular user:

query Recommendations {
  getRecommendationsFor(user: "Joe") {
    id
    name
  }
}

The result of this query looks something like this:

{
  "data": {
    "getRecommendationsFor": [
      {
        "id": "F24B37E4-C89B-48AA-871B-46E5DE47118F",
        "name": "Wolfy's"
      },
      {
        "id": "96B7CB80-6DC4-445F-8925-69316B222DCC",
        "name": "Hot 'G' Dog"
      }
    ]
  }
}

This recommendation algorithm uses an approach called collaborative filtering, which uses the opinions of others to inform a recommendation for a different person. For more information about this algorithm, see the Apache TinkerPop project.

As with ElastiCache, we primarily use Neptune to augment our application’s source of truth in this example — it performs a particular function. In the case of Neptune, we may want to store some data that is not in DynamoDB – for example, whether a user likes a particular restaurant.

In that case, we would perform mutations directly against Neptune. To further our example functionality, let’s add a mutation that creates a new “like” in the graph database. Modeling a “like” in Neptune involves an edge, named “like”, and two vertices, a restaurant and a user.

Because AWS AppSync can support real-time subscriptions for any data source, we can also subscribe to new likes created by a user. In this case, the client application may listen for changes to new likes so that it can query for new recommendations based on new data.

First, the updates to our GraphQL schema:

type Mutation {
  addLike(user: String!, restaurantId: String!): Like
}

type Subscription {
  onLike(user: String!): Like
    @aws_subscribe(mutations: ["addLike"])
}

Next, we enhance the Neptune integration function to support adding a new like to the graph. Use Gremlin again to find the restaurant and user in the graph and then add an edge from the user to the restaurant named “likes”.

...

async function addLike(user, restaurantId) {
  try {
    await g.V()
      .has("Restaurant", "id", restaurantId).as("restaurant")
      .V()
      .has("User", "name", user)
      .addE("likes")
      .to("restaurant")
      .next()
      
    return { user: user, restaurantId: restaurantId }
  } catch (error) {
    console.error(JSON.stringify(error))
    return { error: error.message }
  }
}

exports.handler = async(event) => {
  switch(event.action) {
    case "getRecommendations":
      return await getRecommendationsFor(event.arguments.user)
    case "addLike":
      return await addLike(event.arguments.user, event.arguments.restaurantId)
    default:
      return { error: "No such method" }
  }
}

Now, create a new like for Dorothy using the mutation described earlier:

mutation AddLike {
    addLike(
      user: "Dorothy",
      restaurantId: "EB8941AC-C3AD-4263-B97D-B7A29B36FB5F"
    ) {
      user
    }             
}

You can also subscribe to new likes made by Dorothy:

subscription AddLike {
  onLike(user:"Dorothy") {
    user
  }
}

This subscription would result in the following data being returned to the client. While this example is limited, in a full-scale application, adding a like could trigger the client to retrieve new recommendations or perhaps alert the restaurant:

{
  "data": {
    "onLike": {
      "user": "Joe",
      "__typename": "Like"
    }
  }
}

As noted earlier, AWS AppSync supports real-time data for all data sources, even an alternative data source such as Neptune that is integrated with Lambda. This functionality enables a wide range of use cases across almost any data source that you integrate.

You can find a complete, working example of the project described in this post on GitHub.

Conclusion

While AWS AppSync supports a wide range of data sources out-of-the-box, it can also be extended to support many other data sources using Lambda, including ElastiCache and Neptune. This approach allows you to pick the best database for the job while quickly building new capabilities in your applications.

The AWS AppSync team is eager to see how you take advantage of this capability. Please reach out on the AWS AppSync Forum or the AppSync Community GitHub repository with any feedback.