亚马逊AWS官方博客

AppSync调试方法

一. GraphQL简介

GraphQL是一种新的API规范及查询语言,它按照客户的查询需求“不多不少”准确返回查询结果。它通过简明的类型系统描述查询及返回结果。GraphQL 通常通过单入口来提供 HTTP 服务的完整功能,这一实现方式与暴露一组 URL 且每个 URL 只暴露一个资源的 REST API 不同。GraphQL可以通过 GraphQL schema 的持续演进来避免版本控制。

关于GraphQL的规范请参考:https://graphql.org/

二.  AppSync简介

AWS AppSync 是一项完全托管的服务,通过处理与 AWS DynamoDB、Lambda 等数据源之间繁重的安全连接任务来简化 GraphQL API 的开发。添加缓存以提高性能、订阅以支持实时更新以及客户端数据存储以使离线客户端保持同步等操作也一样轻松简单。

通过托管的 GraphQL 订阅,AWS AppSync 可以通过 Websocket 向数百万客户端推送实时数据更新。对于移动和 Web 应用程序,AppSync 还可在设备离线时提供本地数据访问,并在它们重新上线后提供支持自定义冲突解决方案的数据同步功能。

关于AppSync的更详细的介绍请参考:https://aws.amazon.com/cn/appsync/

三. AppSync常见调试工具

AppSync支持以下认证授权方式:

  • API Key
  • IAM
  • OpenID Connect
  • Lambda

不同的授权方式,直接影响调试的复杂度。本文以最为常用的API Key及IAM授权方式进行分析。

tool Query and Mutation Real-time Subscription
API Key IAM API Key IAM
Postman x
websocat x x x
wscat x x x
curl x x x
wget x x x

四. AppSync的WebSocket验证授权方法

AppSync的验证授权遵循Amazon Web Service v4 Signature。但是对于real-time subscription,须遵循https://docs.aws.amazon.com/appsync/latest/devguide/real-time-websocket-client.html,完成websocket协议上的验证授权。


从上述图例,我们可以明确:

  1. Query和Mutation的endpoint协议为HTTPS。
  2. Real-Time subscription的endpoint协议为WebSocket。即Real-Time subscription无法接入协议为 HTTPS的endpoint。

在进行Real-Time subscription时,我们需要指定WebSocket sub protocol:

Sec-WebSocket-Protocol:graphql-ws

通过aws cli我们可以获得AppSync的endpoint 信息。

aws appsync get-graphql-api --api-id b553xzgnijdtjjvli67zc3zmly
{
    "graphqlApi": {
        "name": "demo1",
        "apiId": "b553xzgnijdtjjvli67zc3zmly",
        "authenticationType": "API_KEY",
        "logConfig": {
            "fieldLogLevel": "ERROR",
            "cloudWatchLogsRoleArn": "arn:aws-cn:iam::162611943124:role/service-role/appsync-graphqlapi-logs-cn-north-1",
            "excludeVerboseContent": false
        },
        "arn": "arn:aws-cn:appsync:cn-north-1:162611943124:apis/b553xzgnijdtjjvli67zc3zmly",
        "uris": {
            "REALTIME": "wss://vlvwhsddczatlel4l4b4k4ygri.appsync-realtime-api.cn-north-1.amazonaws.com.cn/graphql",
            "GRAPHQL": "https://vlvwhsddczatlel4l4b4k4ygri.appsync-api.cn-north-1.amazonaws.com.cn/graphql"
        },
        "tags": {},
        "xrayEnabled": false
    }
}

所有的验证授权都是在”GRAPHQL” endpoint上完成,对于Query和Mutation非常易于理解;而实时订阅首先从在”GRAPHQL” endpoint 上获得验证授权信息,然后在”REALTIME” endpoint上进行数据的交互。

客户端在handshake过程中完成与AppSync的验证授权。验证授权过程中必须携带的信息:

  • header

base64编码的字符串化的JSON对象。

  • payload

base64编码的负荷。

API KEY的验证授权

基于API KEY的验证授权,其header JSON对象为:

{

    "host":"example1234567890000.appsync-api.us-east-1.amazonaws.com",

    "x-api-key":"da2-12345678901234567890123456"

}

其payload JSON对象为:

{}

WebSocket连接串为:

wss://host/graphql?header&payload

例如:

wss://vlvwhsddczatlel4l4b4k4ygri.appsync-realtime-api.cn-north-1.amazonaws.com.cn/graphql?header=eyJob3N0IjogInZsdndoc2RkY3phdGxlbDRsNGI0azR5Z3JpLmFwcHN5bmMtYXBpLmNuLW5vcnRoLTEuYW1hem9uYXdzLmNvbS5jbiIsICJ4LWFwaS1rZXkiOiAiZGEyLWQ3YW5lcDdtYXJobnZuenp6NTQzbjY0ZHZ1In0=&payload=e30=

基于IAM的验证授权

基于IAM的验证授权,验证授权http的信息汇总为:

{
  url: "https://example1234567890000.appsync-api.us-east-1.amazonaws.com/graphql/connect",
  data: "{}",
  method: "POST",
  headers: {
    "accept": "application/json, text/javascript",
    "content-encoding": "amz-1.0",
    "content-type": "application/json; charset=UTF-8",
  }
}

基于上述内容进行V4的签名认证,基于规范“/connect”被添加到cannocial uri后,其正确内容应为:

canonical_uri = '/graphql/connect'

基于上述信息完成V4签名后,基于IAM验证授权的header JSON对象为:

{
    "accept": "application/json, text/javascript",
    "content-encoding": "amz-1.0",
    "content-type": "application/json; charset=UTF-8",
    "host": "vlvwhsddczatlel4l4b4k4ygri.appsync-api.cn-north-1.amazonaws.com.cn",
    "x-amz-date": "20210903T065522Z",
    "Authorization": "AWS4-HMAC-SHA256 Credential=AKIASLXDNK3KFKRBJQU3/20210903/cn-north-1/appsync/aws4_request, SignedHeaders=host;x-amz-date, Signature=a50cc5ab866de3381e5c85ae0fefefb46f7522d62e0a79fb95f9340975382a75"
}

其payload JSON对象为:

{}

其wss链接串为:

wss://vlvwhsddczatlel4l4b4k4ygri.appsync-realtime-api.cn-north-1.amazonaws.com.cn/graphql?header=eyJhY2NlcHQiOiAiYXBwbGljYXRpb24vanNvbiwgdGV4dC9qYXZhc2NyaXB0IiwgImNvbnRlbnQtZW5jb2RpbmciOiAiYW16LTEuMCIsICJjb250ZW50LXR5cGUiOiAiYXBwbGljYXRpb24vanNvbjsgY2hhcnNldD1VVEYtOCIsICJob3N0IjogInZsdndoc2RkY3phdGxlbDRsNGI0azR5Z3JpLmFwcHN5bmMtYXBpLmNuLW5vcnRoLTEuYW1hem9uYXdzLmNvbS5jbiIsICJ4LWFtei1kYXRlIjogIjIwMjEwOTAzVDA2NTUyMloiLCAiQXV0aG9yaXphdGlvbiI6ICJBV1M0LUhNQUMtU0hBMjU2IENyZWRlbnRpYWw9QUtJQVNMWEROSzNLRktSQkpRVTMvMjAyMTA5MDMvY24tbm9ydGgtMS9hcHBzeW5jL2F3czRfcmVxdWVzdCwgU2lnbmVkSGVhZGVycz1ob3N0O3gtYW16LWRhdGUsIFNpZ25hdHVyZT1hNTBjYzVhYjg2NmRlMzM4MWU1Yzg1YWUwZmVmZWZiNDZmNzUyMmQ2MmUwYTc5ZmI5NWY5MzQwOTc1MzgyYTc1In0=&payload=e30=

五. Postman调试

(一)基于API Key的Query调试

选择授权类型为:API Key,设置key:x-api-key,及其值。

输入查询内容:

返回查询结果:

(二)基于API Key的Real-Time Subscription

第一步,选择New

第二步,选择WebSocket Request Beta

第三步,在“Params”内依次填入base64编码的:

  • header
  • payload

第四步,在Headers内填入:
第五步,在“Enter server URL”填入wss endpoint,然后点connect。第六步,在“Compose Message”内填入订阅信息:然后点“Send”

当有数据更新时,实时接收到更新

(三)基于IAM的实时订阅

由于涉及到V4签名,基于IAM的实时订阅无法通过Postman等工具进行测试。本文将基于python WebSocket进行测试。

代码可以从https://github.com/picomy/appsync-websocket下载。

import sys, os, hashlib, hmac
from base64 import b64encode, decode
from datetime import datetime
from uuid import uuid4
import websocket
import threading
import json

# ************************** Reference **************************
# https://docs.aws.amazon.com/appsync/latest/devguide/real-time-websocket-client.html
# https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
# https://docs.aws.amazon.com/apigateway/api-reference/signing-requests/
# ************************** Reference **************************

# The client connect to wss_url through websocket protocol 
wss_url = 'wss://vlvwhsddczatlel4l4b4k4ygri.appsync-realtime-api.cn-north-1.amazonaws.com.cn/graphql'

# The client will authenticate itself from http_url through http protocol
# Query and mutation will be ran within http_url endpoint
http_url = 'https://vlvwhsddczatlel4l4b4k4ygri.appsync-api.cn-north-1.amazonaws.com.cn/graphql'

host = http_url.replace('https://','').replace('/graphql','')
# IAM authentication method
method = 'POST'
region = 'cn-north-1'
service = 'appsync'
access_key = 'xxxxxxxxxx'
secret_key = 'xxxxxxxxxx'
canonical_uri = '/graphql/connect' 
canonical_querystring = ''
# payload type has to be string
payload = '{}'
accept = "application/json, text/javascript"
content_encoding = "amz-1.0"
content_type = "application/json; charset=UTF-8"

# ********************************** AWS V4 Signature **********************************
def sign(key, msg):
    return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()

def getSignatureKey(key, dateStamp, regionName, serviceName):
    kDate = sign(('AWS4' + key).encode('utf-8'), dateStamp)
    kRegion = sign(kDate, regionName)
    kService = sign(kRegion, serviceName)
    kSigning = sign(kService, 'aws4_request')
    return kSigning

def gen_auth(servic_name, region_name, ak, sk, canonical_uri, canonical_querystring, payload):
    t = datetime.utcnow()
    amzdate = t.strftime('%Y%m%dT%H%M%SZ')
    datestamp = t.strftime('%Y%m%d') # Date w/o time, used in credential scope
    payload_hash = hashlib.sha256(payload.encode('utf-8')).hexdigest()
    canonical_headers = 'host:' + host + '\n' + 'x-amz-date:' + amzdate + '\n'
    signed_headers = 'host;x-amz-date'
    canonical_request = method + '\n' + canonical_uri + '\n' + canonical_querystring + '\n' + canonical_headers + '\n' + signed_headers + '\n' + payload_hash
    algorithm = 'AWS4-HMAC-SHA256'
    credential_scope = datestamp + '/' + region + '/' + service + '/' + 'aws4_request'
    string_to_sign = algorithm + '\n' +  amzdate + '\n' +  credential_scope + '\n' +  hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
    signing_key = getSignatureKey(secret_key, datestamp, region, service)
    signature = hmac.new(signing_key, (string_to_sign).encode('utf-8'), hashlib.sha256).hexdigest()
    authorization_header = algorithm + ' ' + 'Credential=' + access_key + '/' + credential_scope + ', ' +  'SignedHeaders=' + signed_headers + ', ' + 'Signature=' + signature
    return ({'auth_date':amzdate,'auth_header':authorization_header})

# *************************** AppSync: websocket ***************************
# Generate IAM authentication header
auth = gen_auth(service,region,access_key,secret_key,canonical_uri,canonical_querystring,payload)

iam_header = {
    'accept': accept,
    'content-encoding': content_encoding,
    'content-type': content_type,
    'host': host,
    'x-amz-date': auth['auth_date'],
    'Authorization': auth['auth_header']
}

# GraphQL subscription Registration object
GQL_SUBSCRIPTION = json.dumps({
        'query': 'subscription MySubscription {subscribeToEventComments(eventId: "0534ad04-fd29-49d6-815a-fa64a80979c1") {commentId content createdAt eventId } }',
        'variables': {}
})


# Set up Timeout Globals
timeout_timer = None
timeout_interval = 10
# Subscription ID (client generated)
SUB_ID = str(uuid4())

# Calculate UTC time in ISO format (AWS Friendly): YYYY-MM-DDTHH:mm:ssZ
def header_time():
    return datetime.utcnow().isoformat(sep='T',timespec='seconds') + 'Z'

# Encode Using Base 64
def header_encode( header_obj ):
    return b64encode(json.dumps(header_obj).encode('utf-8')).decode('utf-8')

# reset the keep alive timeout daemon thread
def reset_timer( ws ):
    global timeout_timer
    global timeout_interval

    if (timeout_timer):
        timeout_timer.cancel()
    timeout_timer = threading.Timer( timeout_interval, lambda: ws.close() )
    timeout_timer.daemon = True
    timeout_timer.start()

# Socket Event Callbacks, used in WebSocketApp Constructor
def on_message(ws, message):
    global timeout_timer
    global timeout_interval

    print('### message ###')
    print('<< ' + message)

    message_object = json.loads(message)
    message_type   = message_object['type']

    if( message_type == 'ka' ):
        reset_timer(ws)

    elif( message_type == 'connection_ack' ):
        timeout_interval = int(json.dumps(message_object['payload']['connectionTimeoutMs']))
        payload = GQL_SUBSCRIPTION
        canonical_uri ='/graphql'
        sub_auth = gen_auth(service,region,access_key,secret_key,canonical_uri,canonical_querystring,payload)
        register = {
            'id': SUB_ID,
            'payload': {
                'data': GQL_SUBSCRIPTION,
                'extensions': {
                    'authorization': {
                        'host': host,
                        'Authorization': sub_auth['auth_header'],
                        'x-amz-date': sub_auth['auth_date']
                    }
                }
            },
            'type': 'start'
        }
        start_sub = json.dumps(register)
        print('>> '+ start_sub )
        ws.send(start_sub)

    elif(message_type == 'data'):
        deregister = {
            'type': 'stop',
            'id': SUB_ID
        }
        end_sub = json.dumps(deregister)
        print('>> ' + end_sub )
        ws.send(end_sub)

    elif(message_object['type'] == 'error'):
        print ('Error from AppSync: ' + message_object['payload'])
    
def on_error(ws, error):
    print('### error ###')
    print(error)

def on_close(ws):
    print('### closed ###')

def on_open(ws):
    print('### opened ###')
    init = {
        'type': 'connection_init'
    }
    init_conn = json.dumps(init)
    print('>> '+ init_conn)
    ws.send(init_conn)

if __name__ == '__main__':
    # Uncomment to see socket bytestreams
    #websocket.enableTrace(True)

    # Set up the connection URL, which includes the Authentication Header
    #   and a payload of '{}'.  All info is base 64 encoded
    connection_url = wss_url + '?header=' + header_encode(iam_header) + '&payload=e30='

    # Create the websocket connection to AppSync's real-time endpoint
    #  also defines callback functions for websocket events
    #  NOTE: The connection requires a subprotocol 'graphql-ws'
    print( 'Connecting to: ' + connection_url )

    ws = websocket.WebSocketApp( connection_url,
                            subprotocols=['graphql-ws'],
                            on_open = on_open,
                            on_message = on_message,
                            on_error = on_error,
                            on_close = on_close,)

    ws.run_forever()

实际运行结果:

本篇作者

杨帅军

资深数据架构师,专注于数据处理。目前主要为车企提供数据治理服务。