AWS Big Data Blog

How Takeda uses the GraphQL API with AWS AppSync to support data scientists

This is a guest blog post by Michael Song and Rajesh Mikkilineni at Takeda. In their own words, “Takeda is a global, values-based, R&D-driven biopharmaceutical leader committed to discovering and delivering life-transforming treatments, guided by our commitment to patients, our people and the planet. Takeda’s R&D data engineering team aspires to build a robust and flexible data platform for their scientists and researchers to access data and derive value from it.”


The Global Medical Affairs team and other R&D teams at Takeda needed access to their data hub, a data repository shared across different teams, without going through the AWS Management Console. They used JupyterHub deployed on Amazon Elastic Compute Cloud (Amazon EC2) instances to access the data, avoiding the overhead of managing user permissions with AWS Identity and Access Management (IAM) roles. To enable rapid product iterations, they wanted self-service, API-based access that gives data scientists the flexibility to fetch their datasets with minimal interaction with the engineering team. Providing API-based access to data with the flexibility of GraphQL lets researchers access data the way they need it.

This post explains how Takeda set up this architecture for their researchers and scientists. Takeda uses a wide range of AWS services, such as Amazon Simple Storage Service (Amazon S3) for storage, Amazon EMR for processing, and Amazon Athena for data analytics.

Overview of solution

GraphQL is a query language for APIs that enables developers to query and manipulate data from multiple data sources and other APIs easily through a flexible runtime, using an intuitive syntax that describes data requirements and interactions with the backend. GraphQL has an API component to expose and access data, and a runtime component with which you can customize your business logic directly at the API layer.

AWS AppSync is a managed serverless GraphQL service that simplifies application development by letting you create a flexible API to securely access, manipulate, and combine data from one or more data sources with a single network call. With AWS AppSync, you can build scalable applications on a range of data sources, including Amazon DynamoDB NoSQL tables, Amazon Aurora Serverless relational databases, Amazon OpenSearch Service clusters, HTTP APIs, and serverless functions powered by AWS Lambda.

To deploy a GraphQL API on AWS AppSync, you need to define three components:

  • GraphQL schema – This is where the API definition is modeled in a GraphQL schema definition language (SDL)
  • Data source – This is the component that points AWS AppSync to where the data is stored (DynamoDB, Aurora, Amazon OpenSearch Service, Lambda, HTTP/REST APIs, or other AWS services)
  • Resolvers – These provide business logic linking or resolving types or fields defined in the GraphQL schema with the data in the data sources

In this post, we first focus on setting up the GraphQL schema in AWS AppSync, then we configure the data source using Lambda. This setup provides the flexibility to fetch and transform data from multiple data sources in addition to the ones directly supported by AWS AppSync. We then provide an example of how data scientists can use JupyterHub to access data using GraphQL.

We use COVID-19 datasets published by Johns Hopkins University. The dataset includes the number of confirmed, recovered, and death cases related to COVID-19 by country, province, and state.

The following diagram illustrates the high-level architecture, in which data from Amazon S3 is served using Athena and accessed using GraphQL APIs configured in AWS AppSync.

In this post, we walk through the following steps:

  1. Set up an S3 bucket and its access using Athena.
  2. Create a GraphQL API and define the schema in AWS AppSync.
  3. Create a Lambda function that connects with Athena.
  4. Configure the Lambda function as the data source in AWS AppSync.
  5. Configure our GraphQL API settings in AWS AppSync.
  6. Access the data using JupyterHub.

Set up an S3 bucket and its access using Athena

This post assumes familiarity with creating a database in Athena. If you’re new to Athena, refer to the Getting Started guide and create a database before continuing.

The COVID-19 dataset is fetched at regular intervals and loaded as files into an S3 bucket. Set up your S3 bucket, configure an AWS Glue crawler to connect to the bucket, infer the data structures from the files in Amazon S3, and write the tables into the AWS Glue Data Catalog. Then set up Athena to access the data in Amazon S3 using the Data Catalog.
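If you prefer to script this setup, the following is a minimal boto3 sketch that creates and runs an AWS Glue crawler over the S3 files; the crawler name, role ARN, bucket, and prefix are illustrative assumptions, and the database name matches the covid19 database referenced later in the schema comments:

import boto3

glue = boto3.client('glue')

# Create a crawler that scans the COVID-19 files and writes a table
# definition into the covid19 database in the AWS Glue Data Catalog.
glue.create_crawler(
    Name='covid19-crawler',                                   # hypothetical crawler name
    Role='arn:aws:iam::111122223333:role/GlueCrawlerRole',    # assumed IAM role for the crawler
    DatabaseName='covid19',                                   # database later queried through Athena
    Targets={'S3Targets': [{'Path': 's3://my-covid19-bucket/covid/'}]},  # assumed bucket and prefix
)

# Run the crawler; when it finishes, the table is queryable from Athena.
glue.start_crawler(Name='covid19-crawler')

After the crawler finishes, you can verify the table with a simple SELECT in the Athena console before moving on.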

Create the GraphQL API and define the schema in AWS AppSync

To create your GraphQL API and define its schema, complete the following steps:

  1. On the AWS AppSync console, choose APIs.
  2. Choose Create API.
  3. In the Customize your API or import from Amazon DynamoDB section, select Build from scratch.
  4. Choose Start.
  5. For API name, enter a name.
  6. Choose Create.
  7. Under Define the schema, choose Edit Schema.
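If you prefer to script these console steps, a rough boto3 equivalent looks like the following; the API name and the local schema file are assumptions, and API_KEY authorization matches the strategy we configure later in this post:

import boto3

appsync = boto3.client('appsync')

# Create the GraphQL API with API key authorization.
api = appsync.create_graphql_api(
    name='covid19-api',              # hypothetical API name
    authenticationType='API_KEY',
)['graphqlApi']

# Upload the SDL schema (the schema shown below, saved locally as schema.graphql).
with open('schema.graphql', 'rb') as f:
    appsync.start_schema_creation(apiId=api['apiId'], definition=f.read())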

A GraphQL service is created by defining types and fields on those types, then providing functions for each field on each type. For example, if we want to get COVID-19 information by country, we can write the query like the following:

query MyQuery {
  getCovidInfos {
    nextToken
  }
  getCovidInfosByCountry(country: "US", maxResults: 10, nextToken: "") {
    nextToken
    covidInfo {
      confirmed
      country
      deaths
    }
  }
}

The following code is the GraphQL schema for the COVID-19 dataset at Takeda:

#Covid information for the current date
type CovidInfo {
	#State or province of the country
	provincestate: String
	#Country name
	country: String!
	#Region of the country, such as EU
	region: String
	#Date the data was last refreshed
	lastupdate: AWSDate
	#Number of confirmed patients
	confirmed: Int!
	#Number of deceased patients
	deaths: Int!
	#Number of recovered patients
	recovered: Int!
	#Latitude of the province/state if provided, else the country
	latitude: Float
	#Longitude of the province/state if provided, else the country
	longitude: Float
}
type Query {
	# Get today's COVID data
	##@ SELECT provincestate, country, region,
	###@        cast(lastupdate as Date) as lastupdate, confirmed, deaths,
	###@        recovered, latitude, longitude
	###@ FROM covid19.covid ORDER BY region, country, provincestate
	getCovidInfos(
		#Next page token (optional)
		nextToken: String,
		#Maximum results to include
		maxResults: Int
	): coviddata

	# Get today's COVID data for the specified country
	##@ SELECT provincestate, country, region,
	###@        cast(lastupdate as Date) as lastupdate, confirmed, deaths,
	###@        recovered, latitude, longitude
	###@ FROM covid19.covid
	###@ WHERE country = :country
	###@ ORDER BY region, country, provincestate
	getCovidInfosByCountry(
		#Specify the country of interest
		country: String!,
		#Next page token (optional)
		nextToken: String,
		#Maximum results to include
		maxResults: Int
	): coviddata
}

#Covid data and pagination information
type coviddata {
	#Array of covid information
	covidInfo: [CovidInfo]
	#Next page token
	nextToken: String
}

schema {
	query: Query
}

Create a Lambda function to connect with Athena

To create the Lambda function, complete the following steps:

  1. On the Lambda console, choose Create function.
  2. Select Author from scratch.
  3. For Function name, enter Athena_AppSync.
  4. For Runtime, choose Python 3.8.
  5. For Choose or create an execution role, select Create new role with basic Lambda permissions.
  6. Choose Create function.
  7. On the Permissions tab, update the IAM role to have a policy that gives access to Athena, AWS Glue, and to the Athena query results location in Amazon S3.

In addition, make sure the role is also associated with a policy that grants Amazon S3 read access to the bucket where the COVID-19 data files are stored. The role should also have a trust relationship with AWS AppSync, as shown in the following trust policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "appsync.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
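For reference, a rough boto3 sketch of attaching the Athena, AWS Glue, and Amazon S3 permissions to the function's role might look like the following; the actions, bucket names, and role name are illustrative assumptions that you should tighten to your environment:

import json
import boto3

iam = boto3.client('iam')

# Inline policy granting the function access to Athena, the Glue Data Catalog,
# the data bucket, and the Athena query results location (assumed names).
policy = {
    'Version': '2012-10-17',
    'Statement': [
        {'Effect': 'Allow',
         'Action': ['athena:StartQueryExecution', 'athena:GetQueryExecution',
                    'athena:GetQueryResults', 'athena:StopQueryExecution'],
         'Resource': '*'},
        {'Effect': 'Allow',
         'Action': ['glue:GetDatabase', 'glue:GetTable', 'glue:GetPartitions'],
         'Resource': '*'},
        {'Effect': 'Allow',
         'Action': ['s3:GetObject', 's3:ListBucket', 's3:PutObject'],
         'Resource': ['arn:aws:s3:::my-covid19-bucket',
                      'arn:aws:s3:::my-covid19-bucket/*',
                      'arn:aws:s3:::my-athena-results-bucket/*']},
    ],
}

iam.put_role_policy(
    RoleName='Athena_AppSync-role',        # the execution role created with the function (name assumed)
    PolicyName='athena-appsync-access',
    PolicyDocument=json.dumps(policy),
)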
  8. On the Configuration tab, replace the existing function code with the following, which uses Lambda to connect Athena and AWS AppSync:
    from athena_type_converter import convert_result_set, TYPE_CONVERTERS
    from backoff import on_predicate, fibo
    from base64 import b64encode
    from boto3 import client
    from concurrent.futures import ThreadPoolExecutor, wait
    from json import dumps as jsondumps
    from logging import getLogger, INFO
    from os import environ
    import datetime
    
    # helper module (from the adapted resolver code) that substitutes the named
    # parameters into the SQL query string
    import sql_query
    
    
    def json_handler(o):
        if hasattr(o, 'isoformat'):
            return o.isoformat()
        else:
            return str(o)
    
    # setting parameters for connecting to Athena
    getLogger().setLevel(INFO)
    __ATHENA = client('athena')
    __DATABASE = environ.get('DATABASE', 'default')
    __MAX_CONCURRENT_QUERIES = int(environ.get('MAX_CONCURRENT_QUERIES', 5))
    __WORKGROUP = environ.get('WORKGROUP', 'primary')
    __timestamp = TYPE_CONVERTERS['timestamp']
    TYPE_CONVERTERS['timestamp'] = lambda x: __timestamp(x).isoformat()
    __date = TYPE_CONVERTERS['date']
    TYPE_CONVERTERS['date'] = lambda x: __date(x).isoformat()
    __time = TYPE_CONVERTERS['time']
    TYPE_CONVERTERS['time'] = lambda x: __time(x).isoformat()
    __varbinary = TYPE_CONVERTERS['varbinary']
    TYPE_CONVERTERS['varbinary'] = lambda x: b64encode(__varbinary(x))
    TYPE_CONVERTERS['decimal'] = lambda x: float(x) if x else None
    
    # handling user permission, execution request and concurrency
    def handler(event, context):
        getLogger().info('Processing event {}'.format(jsondumps(event)))
        result = {}
        # checking user permission
        if type(event) is dict:
            getLogger().info('Processing Invoke operation')
    
            user_sub = None
            if event.get('identity'):
                user_sub = event['identity']['sub']
    
            if event['auth']['allow']:
                if user_sub not in event['auth']['allow']:
                    raise Exception(f'Access not granted to {repr(user_sub)}')
    
            if user_sub in event['auth']['deny']:
                raise Exception(f'Access denied to {repr(user_sub)}')
    
            params = event.get('params', {})
            for date_param in event.get('special_types', {}).get('date', []):
                # cast these parameters to datetime.date
                if date_param in params:
                    params[date_param] = datetime.date.fromisoformat(
                        params[date_param])
     
            # initiating execution request
            execution_request = {
                'QueryString': sql_query.Query(event['query']).sub(params),
                'QueryExecutionContext': {
                    'Database': event.get('database', __DATABASE)
                },
                'WorkGroup': event.get('workgroup', __WORKGROUP)
            }
            if event.get('nextToken'):
                query_execution_id, next_token = event['nextToken'].split('.', 1)
                result = __get_results(query_execution_id,
                                       event.get('maxResults', 100),
                                       event.get('listName'),
                                       next_token)
            else:
                result = __execute_query(execution_request,
                                         event.get('maxResults', 100),
                                         event.get('listName'))
        # handling concurrent queries (BatchInvoke: event is a list of requests)
        else:
            getLogger().debug(
                f'Processing BatchInvoke operation with a batch of {len(event)}'
            )
            with ThreadPoolExecutor(max_workers=__MAX_CONCURRENT_QUERIES) as executor:
                # submit each request to the thread pool so the queries run concurrently
                future_query_results = []
                for batch_event in event:
                    future_query_results.append(
                        executor.submit(handler, batch_event, context))
                wait(future_query_results)
                result = []
                for future in future_query_results:
                    result.append(future.result())
        return result
    
    # getting query execution status from Athena
    @on_predicate(fibo,
                  lambda x: x not in ('SUCCEEDED', 'FAILED', 'CANCELLED'),
                  max_time=30)
    def __poll_query_status(query_execution_id):
        response = __ATHENA.get_query_execution(
            QueryExecutionId=query_execution_id
        )
        print(jsondumps(response, default=json_handler))
        return response['QueryExecution']['Status']['State']
    
    # getting Athena query results
    def __get_results(query_execution_id, max_results, list_name=None,
                      next_token=None):
        params = {
            'QueryExecutionId': query_execution_id,
            'MaxResults': max_results
        }
        if next_token:
            params['NextToken'] = next_token
        response = __ATHENA.get_query_results(**params)
        print(jsondumps(response, default=json_handler))
        results = convert_result_set(response['ResultSet'])
        if list_name is not None:
            results = {list_name: results, 'nextToken': None}
            if response.get('NextToken'):
                results['nextToken'] = query_execution_id + '.' + response['NextToken']
        print(jsondumps(results, default=json_handler))
        return results
    
    # use lambda function to execute query
    def __execute_query(execution_request, max_results, list_name=None):
        query_execution_id = __ATHENA.start_query_execution(
            **execution_request)['QueryExecutionId']
        query_status = __poll_query_status(query_execution_id)
        if query_status != 'SUCCEEDED':
            if query_status in ('FAILED', 'CANCELLED'):
                raise Exception('Query execution failed with status {}'.format(query_status))
            else:
                # stop query
                __ATHENA.stop_query_execution(QueryExecutionId=query_execution_id)
                raise Exception('Query timed out with status {}'.format(query_status))
        results = __get_results(query_execution_id, (max_results or 1) + 2,
                                list_name)
        if max_results == 0 and not list_name:
            if len(results):
                return results[0]
            else:
                return {}
        else:
            return results
    

(Code adapted from: https://github.com/QuiNovas/appsync-athena-resolver/blob/master/src/lambda_function/function.py)

Configure the Lambda function as a data source and resolver in AWS AppSync

Use the Lambda function created in the previous step to connect AWS AppSync with Athena.

  1. On the AWS AppSync console, under My AppSync App, choose Data Sources.
  2. Choose Create data source.
  3. Enter a name for your data source.
  4. For Data source type, choose AWS Lambda Function.

AWS AppSync can identify a DynamoDB table, Amazon OpenSearch Service domain, Lambda function, relational database, or HTTP endpoint as the data source. For this post, we have datasets stored in Amazon S3 and registered in Athena, so we create a Lambda function to connect AWS AppSync to Athena.

Now that we’ve registered our Lambda function as the data source and have a valid GraphQL schema, we can connect our GraphQL fields to the data source using resolvers.

  5. Attach a resolver to the Lambda function for the following fields:
    getCovidInfos( nextToken: String, maxResults: Int): coviddata
    getCovidInfosByCountry(country: String!, nextToken: String, maxResults: Int): coviddata
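The resolver's request mapping template builds the Invoke payload for the Lambda function. Based on the fields the handler reads from its event, a payload for getCovidInfosByCountry might look roughly like the following sketch, which you can also use to test the function locally; the key names mirror the handler code, and the values are assumptions:

# Illustrative event for the Athena_AppSync handler (values are assumptions).
event = {
    'query': ('SELECT provincestate, country, region, '
              'cast(lastupdate as Date) as lastupdate, confirmed, deaths, '
              'recovered, latitude, longitude '
              'FROM covid19.covid WHERE country = :country '
              'ORDER BY region, country, provincestate'),
    'params': {'country': 'US'},        # substituted into the :country placeholder
    'auth': {'allow': [], 'deny': []},  # empty lists mean no per-user restriction
    'listName': 'covidInfo',            # wraps the rows under coviddata.covidInfo
    'maxResults': 100,
    'nextToken': None,
    'identity': None,
}

# result = handler(event, None)   # returns {'covidInfo': [...], 'nextToken': ...}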

Configure your API settings and authorization strategy

You can define any authorization strategy for your API access. In this case, we use API key-based authorization.

  1. On the AWS AppSync console, in the navigation pane, choose Settings.
  2. Note the API URL and API ID, which you use to access the AWS AppSync API.
  3. Note the API key, which serves as the authorization parameter.

These details are used when accessing the API from outside the AWS AppSync console.
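As a quick check outside of JupyterHub, you can call the GraphQL endpoint directly over HTTPS. The following sketch uses the requests library; the URL and key are placeholders for the values noted on the Settings page:

import requests

API_URL = 'https://<api-id>.appsync-api.<region>.amazonaws.com/graphql'  # API URL from Settings
API_KEY = '<API key from Settings>'

query = '''
query MyQuery {
  getCovidInfosByCountry(country: "US", maxResults: 5, nextToken: "") {
    covidInfo { country provincestate confirmed deaths recovered }
    nextToken
  }
}
'''

# AppSync API key authorization is passed in the x-api-key header.
response = requests.post(API_URL, json={'query': query}, headers={'x-api-key': API_KEY})
print(response.json())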

Use JupyterHub and Python to access data through GraphQL

We can now use JupyterHub and Python to access the data through GraphQL.

  1. Open a new Jupyter notebook.
  2. Import the necessary packages:
    # pip install graphqlclient
    from graphqlclient import GraphQLClient
    import json
    import os
    import pandas as pd
    import matplotlib.pyplot as plt
    
    client = GraphQLClient('<API_URL copied from AWS AppSync>')
    client.inject_token('<API_KEY copied from AWS AppSync>', 'x-api-key')  # API key goes in the x-api-key header
  3. Use the following code to load the dataset we want from the data source:
    def loadFullDataSet(query,operationName,schemaType,pageToken,variables) :
        ERRORKEY = "errors"
        DATAKEY = "data"
        ERRORMSGKEY = "message" 
        dataList = []
        nextToken = ""
        page=1
        try:
            while nextToken is not None :
                variables[pageToken] = nextToken  
                #Executing the query
                data = client.execute(query=query, variables=variables)
                jsonDict = json.loads(data)
                errors = None
                #Check for errors
                if ERRORKEY in jsonDict :
                    errors = jsonDict[ERRORKEY]
                if errors is None :
                    currentArr = jsonDict[DATAKEY][operationName][schemaType]
                    nextToken = jsonDict[DATAKEY][operationName][pageToken]
                    if currentArr is not None and len(currentArr) >0 :
                        dataList.extend(currentArr) 
                    print("page {} total size {}".format(page,len(dataList)))
                    page = page +1
                else:
                    errorMsg = errors[0][ERRORMSGKEY]
                    raise Exception(errorMsg)
        except Exception as e:
                print('Handling error:', e)
                raise e
        return dataList
  4. Now we can run a query to fetch the data:
    # Create the query string and variables required for the request.
    # Note: with a small page size, fetching the full dataset takes a long time.
    query = """
        query covidQuery($nextToken: String){
          getCovidInfosByCountry(maxResults: 990, country: "US", nextToken: $nextToken) {
            covidInfo
            {
              country
              provincestate
              region
              lastupdate
              confirmed
              deaths
              recovered
              latitude
              longitude
            }
            nextToken
          }
        }
    """
    operationName = "getCovidInfosByCountry"
    schemaType = "covidInfo"
    pageToken = "nextToken"
    variables = {"country": "US", pageToken: ""}
    totalData = loadFullDataSet(query, operationName, schemaType, pageToken, variables)

The following output shows GraphQL fetching the data page by page:

page 1 total size 991
page 2 total size 1980
page 3 total size 2969
page 4 total size 3958
page 5 total size 4947
page 6 total size 5936
page 7 total size 6925
page 8 total size 7914
page 9 total size 8903
page 10 total size 9892
  5. We now load the data into a Pandas DataFrame:
    df = pd.DataFrame(totalData)
    df

The following screenshot shows our results.
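Because matplotlib was imported earlier, a small sketch of visualizing the results might look like the following; the column names assume the schema fields requested in the query above:

# Plot the ten provinces/states with the highest confirmed counts.
top = (df.groupby('provincestate')['confirmed']
         .max()
         .sort_values(ascending=False)
         .head(10))
top.plot(kind='bar', title='Top 10 US provinces/states by confirmed cases')
plt.ylabel('Confirmed cases')
plt.tight_layout()
plt.show()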

Summary

In this post, we walked through the process of deploying an AWS AppSync API and using JupyterHub to access data via GraphQL. We explored how we use Lambda to connect AWS AppSync to Athena. We then used JupyterHub to create a simple query to fetch the dataset using the GraphQL API we deployed in AWS AppSync.


About the Authors

Michael Song is a data engineer at Takeda Pharmaceuticals in Cambridge. Michael joined Takeda in 2018 and has worked on many projects within the firm’s data engineering and data science initiatives. Most recently, Michael has been working on R&D IT’s API strategy, focusing on AWS AppSync and GraphQL.

Rajesh Mikkilineni is a lead data engineer at Takeda Pharmaceuticals in Cambridge. Raj is an experienced software and data pipeline developer who has worked on multiple projects to derive actionable insights from data. He has implemented cloud-based data platforms at multiple companies.

Karl Gutwin is the Director for Software Engineering Services at BioTeam, based in the Boston, MA area. Since 2017, Karl has worked with numerous clients on software development, cloud architecture and implementation, data management, and more. As part of Takeda’s long-standing relationship with BioTeam, Karl has been a key part of building their data infrastructure in AWS.

Anusha Dharmalingam is a Solutions Architect at Amazon Web Services, with a passion for Application Development and Big Data solutions. Anusha works with enterprise customers to help them architect, build, and scale applications to achieve their business goals.