亚马逊AWS官方博客

AppSync实现行情数据订阅/发布

在一个交易系统中,一次完整的交易,大致可有三个步骤:接收行情→分析行情→发出买卖指令并成交。可以看出,行情数据是非常基本也是非常重要的部分。特别是对于量化高频的交易者来说,行情数据的精度和实时性就尤其重要。

AWS AppSync是一个完全托管的服务,允许在AWS Cloud中部署无服务器GraphQL后端。它提供了开发人员可以用来创建现代数据驱动应用程序的功能,允许从单个GraphQL端点轻松查询多个数据库、微服务和API。客户可以利用AppSync实时功能、离线数据同步、内置服务器端缓存、细粒度访问控制、安全性、使用GraphQL解析器支持API层中的业务逻辑等。在本文中,我们将重点介绍AWS AppSync内置功能,以实现实时行情发布订阅的用例。

推荐方案

如上图,需要发布的数据,通过flink聚合后,结果被Lambda程序推入到Appsync中;或者数据无需聚合,以单笔的方式直接发布到AppSync中。AppSync通过和客户端的websocket通道,把这些数据推送给订阅者。

在我们接下来的Demo中,我们使用MSK(AWS 托管的Kafka)作为流数据处理平台,使用Lambda作为流数据的消费端程序平台。客户在生产实现中,可以把Lambda程序替换为基于EC2的kafka消费程序,获得更低的时延。

准备

  • Cloud9:程序开发环境
  • MSK(Amazon Managed Streaming for Apache Kafka)
  • Lambda函数(在VPC内)

1.     创建Kafka 集群

参见链接:https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/create-cluster.html

2.在Cloud9中关联kafka集群

参见链接:https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/create-client-machine.html

3.为kafka创建主题

登入cloud9,请确保上面第2步已经完成。为kafka创建一个心的topic。参见链接:https://docs.aws.amazon.com/zh_cn/msk/latest/developerguide/create-topic.html 在这里我们使用multipartition作为主题的名称

4.创建虚拟行情数据生成程序

登入cloud9,创建一个新的python文件,文件请下载

https://github.com/sun-biao/appsync-demo/blob/main/ticker_kafka.py

修改代码中的bootstrap_servers 值,如果topic的名称发生了改变,请一并修改topicName参数。

5.创建一个VPC内的Lambda

我们使用Lambda来模拟行情推送程序,作用是从kafka拉取行情数据推送到发布系统上。这里的Lambda需要能够访问Kafka,请将Lambda配置到kafka所在的vpc内,并确保Lambda可以访问kafka和Internet. 参考链接下面链接。本文使用Python3.9 作为开发语言。 https://docs.aws.amazon.com/lambda/latest/dg/configuration-vpc.html

AppSync 设置

  1. 在console中打开Appsync服务https://us-west-2.console.aws.amazon.com/appsync/
  2. 点击Create API
  3. 选择Build from scratch 并点击右侧的Start
  4. 输入名称TickerDemo并点击Create
  5. 在创建好的AppSync API界面上,点击左侧Schema按钮,并在出现的输入框中粘贴如下内容,之后点击右上角Save Schema
type Mutation {
	tickerlocal(id: ID!, ticker: String!, price: Float!): Tickerdata!
}
type Query {
	allTicker: [ID]
}
type Subscription {
	tickerlocal(ticker: String): Tickerdata
		@aws_subscribe(mutations: ["tickerlocal"])
}
type Tickerdata {
	id: ID!
	ticker: String!
	price: Float!
}
schema {
	query: Query
	mutation: Mutation
	subscription: Subscription
}
  1. 点击界面左侧的Data Sources链接,然后点击右侧Create data source
  2. 在Data source name中输入名称tickerresolover,Data source type里选择 None,点击Create
  3. 回到Schema页面,找到右侧Mutation下的tickerlocal(…): Tickerdata!,点击右侧的Attach按钮。
  4. 在弹出的Data source name中选择刚刚创建的tickerresolover
  5. 在下面弹出的Configure the request mapping template输入框中复制如下语句:
{
    "version": "2017-02-28",
    "payload": {
        "id": "${context.arguments.id}",
        "ticker": "${context.arguments.ticker}",
        "price":  "${context.arguments.price}"
    }
}
  1. 点击右上角Save Resolver
  2. 点击左侧setting链接,记录下API URL和API KEY

配置行情推送程序

将kafka作为数据源与Lambda绑定

  1. 打开之前创建的Lambda程序,在Function overview中,点击Add trigger
  2. 数据源选择MSK,然后选中之前创建的MSK集群。
  3. 确认Enable trigger 被选中,Topic name 设置为之前创建的multipartition
  4. 点击保存

点击Lambda程序的Code标签,替换为如下代码。注意替换代码中的API_KEY和API_URL

import json
import base64
import urllib3

header_obj = {'x-api-key':'API_KEY'}
http = urllib3.PoolManager()
def lambda_handler(event, context):
    for item in event['records'].values():
        for ite in item:
            tmpstr = base64.b64decode(ite['value']).decode('utf-8')
            print('partition is {} and value is {}'.format(ite['partition'],tmpstr))
            val = json.loads(tmpstr)
            data = """mutation local {{ tickerlocal(id:{id} ticker:{format1}  price:{format2}  ) {{  id ticker price  }}}}"""
            _data = data.format(format1='"'+val['ticker']+'"',format2=val['price'],id=val['id'])
            r = http.request(
                method='POST', 
                url=’API_URL',
                headers = header_obj,
                body = json.dumps({'query':_data})
            )
            
    return "done!"
  1. 配置Lambda程序的权限,使Lambda具有Kafka读取权限

模拟AppSync消费端程序

我们使用python程序来订阅AppSync中发布的数据。将文件https://github.com/sun-biao/appsync-demo/blob/main/appsyncrealtimeclient.py 下载到Cloud9,并修改文件中的API_KEY和API_URL。

测试

现在,Cloud9中有两个python程序,ticker_kafka.py 和 appsyncrealtimeclient.py。分别运行两个程序,会看到屏幕上会显示发送的消息和接收的消息,确定我们的订阅成功。

同时,我们也可以测试订阅的过滤功能,我们将 appsyncrealtimeclient.py文件中的

‘query’: ‘subscription mysub { tickerlocal { id ticker price  } }’

修改为

‘query’: ‘subscription mysub { tickerlocal(ticker:”BTC”) { id ticker price  } }’

我们就可以实现只订阅BTC数据的目标。

总结:

在这篇文章中,我们展示了如何利用AWS AppSync实时功能来解决行情系统中的一个重要用例。使用实时行情更新解决方案,您可以根据基于最佳实践设计的参考架构,在自己的AWS帐户中部署可扩展的无服务器架构。最重要的是,您不需要管理服务器或基础设施来实现可扩展和可靠的实时后端。您可以自定义解决方案和代码,现在可以通过访问解决方案页面开始构建它。

参考文档:

https://docs.aws.amazon.com/appsync/latest/devguide/aws-appsync-real-time-data.html

本篇作者

孙标

亚马逊云科技资深解决方案架构师。拥有多年金融,移动互联网研发及数字货币交易所架构经验。