亚马逊AWS官方博客

利用 Amazon DocumentDB 存储分时、K 线行情数据的可行性分析及迁移方案

交易数据是资本市场运行的基础数据,在交易层面它支撑着市场的交易撮合、清算结算等核心功能,在分析层面它通过不同维度(逐笔、分时、K 线等)的数据整合为投资者提供市场趋势判断、交易策略研究、风险控制等决策支持,同时对于监管机构而言,交易数据的分析也是市场监督、违规行为发现、政策制定的重要依据,是确保市场有效运行和健康发展的关键要素。

分析层面,交易数据大致可以分为逐笔行情、分时行情、K 线行情等几个大类。

  • 逐笔行情数据记录了市场上每一笔实际成交的时间、价格、成交量等详细信息,能够更精确地反映价格变动和市场交投情况,但数据量较大。逐笔行情数据主要应用于高频交易策略研究、市场微观结构分析、订单流量分析、大单跟踪、价格预测模型构建以及实时风控监测等场景;
  • 分时行情数据记录每分钟的成交均价、成交量和成交额,以连续曲线形式展示当日实时交易状态,颗粒度更细且一般固定为 1 分钟周期(也有个别的交易所提供粒度小于 1 分钟的分时,如 15 秒的分时数据)。分时行情数据主要应用于日内交易策略制定、盘中实时监控、开盘和收盘价格趋势研判、短线交易机会捕捉、以及量价关系分析等场景,是日内交易者和短线投资者进行决策的重要参考依据;
  • K 线行情数据是将某个时间周期(如 1 分钟、5 分钟、日线等)内的开盘价、最高价、最低价、收盘价汇总的数据,主要反映价格波动的整体走势。K 线行情数据主要应用于技术分析、趋势研判、形态识别、均线系统分析、支撑阻力位判断以及量价配合等中长期投资分析场景,是投资者进行市场研究和交易决策最常用的数据形式。本文称应用在分析场景的交易数据为行情数据。

在券商面向长尾 C 端用户展示分时、K 线行情的场景中,由于这些用户数量庞大,对于延时有较高的容忍度(通常看来可以接受 5 秒左右的延迟),且行情展示对事务性要求并不高,文档型数据库以其灵活的数据类型,优异的性价比,良好的查询性能以及水平扩展能力,成为存储分时、K 线行情数据的理想选择。

本文探讨了以优化数据存储架构为目标,券商面向长尾 C 端用户的分时、K 线行情数据展示场景中的行情数据利用 Amazon DocumentDB 存储的可行性,以及数据存储和 Mysql 数据迁移方案。这种方案将在保证数据库查询性能,数据库可用性 SLA 的前提下,显著降低数据存储成本,降低运维复杂度。

1. 行情数据特点及 Schema 描述

1.1 逐笔行情数据特点及 Schema 描述

逐笔行情数据记录了市场上每一笔实际成交的时间、价格、成交量、买卖方向等详细信息,具有数据量大、实时性强、写入频繁、对存储空间要求高等特点,是反映市场微观结构最精确和完整的数据。

逐笔行情数据对写入速度要求极高:在市场高峰交易期间,系统需要支持每秒数万至数十万笔交易数据的实时写入;同时要考虑多个交易品种的并发写入需求,总体写入吞吐量需要达到每秒数十万次甚至更高;

逐笔行情数据对写入延迟的要求也很高,主要体现在:系统必须在毫秒级别内完成单笔交易数据的写入,以确保实时反映市场交易状态。写入延迟过高可能导致数据丢失或影响实时行情展示、风控监测等关键业务功能。

逐笔行情数据对读取速度的要求主要体现在:需要支持毫秒级的实时查询以满足行情展示、策略分析等需求;同时要能高效处理历史数据的批量查询,如支持快速回溯特定时间段的交易明细;对于多维度的数据分析需求,要能在大数据量下保持较好的查询性能。

逐笔行情数据的读取延迟要求严格:实时查询场景(如盘口展示、风控监测)需要在毫秒级别内响应,以确保数据时效性;高频交易策略执行需要在微秒级别获取最新交易数据;历史数据查询虽可接受稍高延迟,但对于大数据量的分析场景仍要求在秒级内返回结果。

不同交易所定义的逐笔行情数据的 Schema 不尽相同,相同的交易所对不同标的的行情数据的 Schema 定义也不尽相同。以 NYSE 对股票(Stock)的逐笔行情数据为例,Schema 如下:

Field Name Field Data Type 字段描述
RIC varchar 路透证券代码
Domain varchar 数据领域
DateTime varchar 日期时间
GMTOffset int64 时间偏移量
Type varchar 数据类型
ExCntrbID varchar 交易所贡献者 ID
LOC varchar 交易所地点
Price double 成交价格
Volume int64 成交量
MarketVWAP double 市场成交量加权平均价
BuyerID varchar 买方 ID
BidPrice double 买入价
BidSize int64 买入量
NoBuyers varchar 买方数量
SellerID varchar 卖方 ID
AskPrice double 卖出价
AskSize int64 卖出量
NoSellers varchar 卖方数量
Qualifiers varchar 限定符
SeqNo int64 序列号
ExchTime time 交易所时间
BlockTrd varchar 大宗交易标识
UpLimPrice double 涨停价
LoLimPrice double 跌停价
Date date 日期
BidTic varchar 买入价变动标识
TickDir varchar 价格变动方向
Open double 开盘价
High double 最高价
Low double 最低价
OpenInterest varchar 持仓量
BenchPrice varchar 基准价
AccVolume int64 累计成交量
Turnover varchar 成交额
MidPrice varchar 中间价
OriginalData varchar 原始数据
OriginalPrice varchar 原始价格
OriginalVolume varchar 原始成交量
OriginalSeqNo varchar 原始序列号
OriginalExchTime varchar 原始交易所时间
TradePriceCurrency varchar 交易价格货币
UniqueTradeIdentification int64 唯一交易标识
NetChange double 净价格变动
OriginalUniqueTradeIdentification varchar 原始唯一交易标识
ISIN varchar 国际证券识别码
UniqueQuoteIdentification varchar 唯一报价标识

表 1 NYSE Stock 逐笔行情数据 Schema

1.2 分时行情数据特点及 Schema 描述

分时行情数据以固定一分钟为时间间隔,记录证券交易的价格、成交量、成交额、均价等关键信息,同时包含当日累计成交数据和实时买卖盘口信息,数据量相对适中且写入频率固定,是观察日内市场实时交易状况、分析短期价格走势的重要数据来源。

分时行情数据的写入速度要求相对较低:系统需要每分钟定时处理所有交易品种的分时数据写入,通常是每个交易品种每分钟产生一条记录,即使在大型市场中交易品种数量达到数千只,每分钟的写入量也仅在数千条级别;但需要注意的是这些写入会在每分钟的固定时点集中发生,因此系统要能够承受这种周期性的写入压力峰值,通常使用关系型数据库就能满足这种写入需求。

分时行情数据的写入延迟要求相对适中:由于是每分钟固定时间点进行数据汇总写入,对写入延迟的容忍度可以在秒级别,通常要求在 1-2 秒内完成单个品种的分时数据写入;但在每分钟的写入时点会出现多个交易品种集中写入的情况,系统需要能够承受这种周期性的写入压力,确保数据及时更新且不影响市场行情的实时展示。

分时行情数据的读取速度要求主要体现在:需要支持大量用户同时查看不同品种的分时图表,要求在百毫秒级别内返回单个品种的当日分时数据;针对历史分时数据的查询,需要快速获取指定时间范围的数据用于技术分析;由于分时数据是盘中高频访问的数据,通常需要通过合理的缓存策略和内存数据库来提升查询性能,保证数据的实时性和访问效率。

分时行情数据的读取延迟要求体现为面向终端用户的实时行情展示需要在百毫秒级别内返回数据,以确保分时图的流畅更新;当日分时数据作为高频访问数据,通常要求在 200-300 毫秒内完成查询响应;对于历史分时数据的查询可以容忍稍高的延迟,但一般也需要在 1 秒内返回结果。

分时行情数据往往由券商根据逐笔行情数据聚合而成。常见的分时行情数据 Schema 如下:

Field Name Field Data Type 字段描述
SecurityID varchar 证券代码
TradingDate date 交易日期
TimeStamp datetime 时间戳(精确到分钟)
Close double 当前分钟收盘价
Volume int64 当前分钟成交量
Amount double 当前分钟成交额
AvgPrice double 当前分钟成交均价
AccVolume int64 累计成交量
AccAmount double 累计成交额
OpenPrice double 今日开盘价
PreClose double 昨日收盘价
BidPrice double 申买价
AskPrice double 申卖价
BidVolume int64 申买量
AskVolume int64 申卖量

表 2 常见分时行情数据 Schema

1.3 K 线行情数据特点及 Schema 描述

K 线图(也称蜡烛图)是金融市场中最常用的价格展示形式,每根蜡烛代表特定时间周期内的价格变动情况。其核心数据包含四个基本价格点:开盘价(Open)、收盘价(Close)、最高价(High)和最低价(Low),通常以 OHLC 的形式记录和存储。在视觉呈现上,蜡烛由实体和上下影线组成,实体显示开盘价和收盘价之间的区间,影线则反映最高价和最低价的波动范围。当收盘价高于开盘价时,通常显示为红色或白色的阳线;当收盘价低于开盘价时,则显示为绿色或黑色的阴线。这种数据结构不仅能直观地展现价格波动,还能帮助分析师和交易者理解市场趋势、判断市场情绪,是技术分析中最基础和重要的数据形式之一。

K 线行情数据的写入速度要求相对较低:系统需要在不同周期(1 分钟、5 分钟、日 K 等)的时间点对数据进行汇总计算并写入,每个周期点的写入量取决于交易品种数量,即使在大型市场中,单个时间点的写入量也通常在数千条级别;且由于 K 线是周期性汇总数据,写入操作相对分散,不会出现密集写入的压力。

K 线行情数据的写入延迟要求相对宽松:由于 K 线是在各个周期(1 分钟、5 分钟、日 K 等)结束时对数据进行汇总计算并写入,因此通常允许在 1-2 秒内完成数据写入;即使在多个品种同时需要更新 K 线数据的情况下,系统也有足够的处理时间窗口;但需要注意的是,为了保证数据的连续性和准确性,写入操作仍需要在下一个周期开始前完成。

K 线数据的读取速度要求主要体现在:需要支持大量用户同时查看不同品种、不同周期(分钟、小时、日、周、月等)的 K 线图表,要求在 500 毫秒内返回单个品种的常用周期 K 线数据;对于技术分析和策略回测场景,需要快速获取大量历史 K 线数据;由于 K 线数据是市场分析的基础数据,访问频率高且用户量大,通常需要通过缓存机制和合理的数据分区策略来提升查询性能。

K 线数据的读取延迟要求相对灵活:实时 K 线展示需要在 500 毫秒内响应,以保证图表的流畅更新;常用周期(如日 K、周 K)的查询通常要求在 300-500 毫秒内完成;对于大批量历史 K 线数据的查询(如策略回测)可以容忍更长的延迟,但一般需要在 1-2 秒内返回结果。由于不同场景对延迟的要求不同,系统通常采用多级缓存策略,对热点数据优先缓存以提升查询效率。

K 线行情数据往往由券商根据逐笔行情数据聚合而成。常见的蜡烛图数据 Schema 如下:

Field Name Field Type 字段描述
id int64 Record id
symbol varchar stock symbol
endtime datetime 结束时间
symbolid int64 股票 id
openprice double 开盘价
high double 最高价
low double 最低价
close double 收盘价
volume int64 交易量
twap double 时间加权平均价
vwap double 成交量加权平均价
previousclose double 昨日收盘价
instrumentid varchar 股票代码唯一 id

表 3 常见 K 线行情数据 Schema

除了对性能有较高要求之外,由于行情数据作为金融市场的核心数据,对可用性和持久性要求极高:系统需要 7×24 小时稳定运行,尤其在交易时段内不允许出现服务中断,必须具备故障自动切换能力;在数据持久性方面,要求所有交易数据必须可靠存储且不允许出现丢失或损坏,需要通过多副本备份、容灾部署等机制来保障,同时要建立完善的数据备份和恢复方案,确保数据的安全性和完整性。

2. Amazon DocumentDB 存储行情数据

2.1 Amazon DocumentDB 的设计原理及应用场景

Amazon DocumentDB 是一款完全托管的文档数据库服务,其核心设计理念是将存储层和计算层完全分离,采用分布式、云原生架构。在存储层面,Amazon DocumentDB 使用一个分布式、自我修复的存储系统,该系统将数据分散存储在 6 个副本中,跨越 3 个可用区,确保数据的高可用性和持久性。它采用了基于日志的存储架构,所有写入操作首先被追加到分布式日志中,然后再异步应用到存储层,这种设计既保证了写入操作的原子性,也提供了出色的写入性能。在计算层面,Amazon DocumentDB 实现了 MongoDB 3.6/4.0/5.0 版本的 API 兼容,使得应用程序可以无缝迁移而无需修改代码。它的查询处理引擎经过优化,能够自动处理数据分片、索引管理和查询优化,同时支持读取器节点的自动扩展,可以根据负载动态调整资源。

目前,Amazon DocumentDB 提供了副本级(Instance Based Cluster)架构分片(Elastic Cluster)架构两种扩展方案,可以根据业务需求灵活选择。副本级架构采用一主多从的复制模式,支持最多 15 个只读副本,主节点处理写操作而副本节点提供读服务,通过异步复制确保数据最终一致性;该架构特别适合读多写少、需要提升读取性能和高可用性的场景,如内容管理系统和数据分析应用。副本级架构其存储卷可以自动扩展,最大支持 128TB,而无需手动管理存储空间。分片架构则采用无服务器的路由节点设计,通过单字段哈希分区实现数据分片,支持动态扩展分片数量;虽然目前不支持范围分区和组合分片键,但其水平扩展能力使其特别适合需要处理海量数据写入、存储空间需求大幅增长的场景,如物联网数据采集和日志存储系统。在安全性方面,Amazon DocumentDB 提供了静态加密、传输中加密以及基于 AWS Identity and Access Management (IAM)的细粒度访问控制,确保数据的安全性。此外,Amazon DocumentDB 还实现了高效的增量备份机制,支持时间点恢复(PITR),可以将数据恢复到过去 35 天内的任意时间点。

Amazon DocumentDB 的应用场景非常广泛,特别适合处理大规模、半结构化数据的现代应用程序。在内容管理系统(CMS)领域,由于文档数据库能够灵活处理不同类型的内容结构,Amazon DocumentDB 可以有效管理博客文章、用户评论、多媒体资料等多样化内容,同时支持全文搜索和复杂查询。在金融行业,它被广泛用于存储和处理市场行情数据、交易记录和用户投资组合,其强大的聚合管道功能使得复杂的数据分析变得简单高效。在物联网(IoT)应用中,Amazon DocumentDB 能够处理来自数百万设备的传感器数据,其自动扩展特性确保了系统可以应对突发的数据写入高峰。对于社交媒体平台,Amazon DocumentDB 适合存储用户档案、社交关系图谱和用户生成的内容,其灵活的数据模型使得应用程序可以快速适应新的功能需求。在游戏行业,它可以用来存储玩家档案、游戏状态和积分排行榜等数据,支持高并发的读写操作。电子商务平台利用 Amazon DocumentDB 存储产品目录、用户行为数据和订单信息,其强大的查询能力支持复杂的产品搜索和个性化推荐。

对于读多写少且数据量相对小(128TB 以下)的业务负载,如在线内容展示、产品目录查询等场景,适合使用 Amazon DocumentDB 的副本级架构,通过配置多个只读副本来分担读取压力,提升查询性能和并发处理能力。而对于数据量大、写入压力高、需要大规模水平扩展的业务负载,如物联网数据采集、用户行为日志存储、大规模交易记录等场景,则更适合使用分片架构,通过数据分片来分散写入压力和存储压力。本文探讨的分时、K 线行情数据存储的负载特点是读多写少且数据量相对小,适合使用 Amazon DocumentDB 的副本级架构。因此,本文在之后的探讨中以 Amazon DocumentDB 副本级架构做为讨论的目标。

2.2 Amazon DocumentDB 存储行情数据的可行性分析

  • 性能

Amazon DocumentDB 的写入性能在默认配置下单实例可达 1-2 万次/秒,通过批量写入优化可提升至 5-6 万次/秒,使用集群模式最多支持15个副本且可通过分片水平扩展,整体写入性能可达数十万次/秒;但写入性能会受实例规格、存储容量、网络带宽、文档大小和索引数量等因素影响,需要通过选择合适实例规格、优化索引设计等方式来提升性能,适合一般业务数据存储,但不适用于高频交易数据。

Amazon DocumentDB 的写入延迟在单条写入时通常为 10-50 毫秒,使用批量写入时平均每条可降至 5-20 毫秒;写入延迟会受实例规格、网络延迟、存储系统负载、并发请求数量等因素影响,可通过选择更高规格实例、优化网络配置、使用批量写入及合理设置写入确认级别等方式来优化;这种延迟水平适合一般业务场景,但不适用于毫秒级响应的高频交易场景。

Amazon DocumentDB 的读取性能表现为:单实例查询可达每秒数万次,通过增加只读副本(最多 15 个)可显著提升读取性能。以单实例读取速度 2-3 万次/秒计算,15 个只读副本理论上可以达到 30-45 万次/秒的读取速度。实际性能受多个因素影响会略低。

Amazon DocumentDB 的读取延迟表现为:简单查询通常在 5-20 毫秒,索引查询在 10-30 毫秒,复杂查询可能需要 50-200 毫秒,聚合查询可能达到数百毫秒;读取延迟受实例规格、查询复杂度、数据量大小、索引使用情况、并发访问量和网络状况等因素影响,可通过合理设计索引、选择合适实例规格、增加只读副本、优化查询语句等方式来改善;这种延迟水平对常规业务查询足够,但对要求低延迟的实时行情查询场景可能不够理想。

  • 可用性

此外,Amazon DocumentDB 集群通过多可用区部署和副本集架构保障高可用性:支持最多 15 个只读副本分布在不同可用区,实现数据冗余和读取扩展;具备自动故障转移机制,当主实例故障时能在 30 秒内完成故障切换;系统自动进行数据备份和恢复,支持时间点恢复;提供 99.99%的服务可用性承诺,通过自动化运维、监控告警和故障自愈等机制,确保集群稳定运行和数据安全性。

  • 结论

Amazon DocumentDB 在存储分时和 K 线行情数据方面展现出良好的可行性,这主要体现在以下三个方面:

首先,从数据写入特征来看,分时和 K 线数据具有写入频率固定、压力可控的特点。Amazon DocumentDB 提供的每秒数万次的写入性能以及 10-50 毫秒的写入延迟,完全能够满足这类数据的写入需求。

其次,在查询性能方面,Amazon DocumentDB 通过配置多个只读副本的方式,不仅可以有效支持大量用户的并发访问,其 5-20 毫秒的读取延迟也能很好地满足行情数据的实时展示要求。

再次,就数据可用性而言,Amazon DocumentDB 采用的高可用架构设计和可靠的数据持久性机制,为行情数据的安全存储提供了有力保障。

特别值得一提的是,Amazon DocumentDB 作为一个现代化的文档数据库,特别适合处理大规模的半结构化数据。其 Schema-free 的灵活文档模型让数据结构的调整变得异常便捷,这一特性有效解决了传统关系型数据库在修改数据结构(如增减字段)时遇到的困难,能够更好地适应业务需求的快速迭代和变化。

2.3 Amazon DocumentDB 存储行情数据的设计要点

在使用 Amazon DocumentDB 副本级架构存储行情数据时,需要重点考虑以下设计要点:

(1)为应对每分钟集中写入的分时数据(数千条级别)以及不同周期 K 线数据的更新,需要在应用层实现批量写入机制,并通过合理的索引设计(如对{symbol, time}建立复合索引)来保证写入和查询性能。

(2)考虑到行情数据高频访问的特点,可以考虑配合使用 Redis 等内存数据库作为缓存层,将当日分时数据和热门品种的 K 线数据缓存在内存中,构建多级缓存架构来提升查询效率。

3. 分时、K 线行情数据从 Mysql 到 Amazon DocumentDB 的迁移设计及实施

3.1 方案设计

图 1 迁移方案架构图

考虑到实施验证的便捷,本文使用托管的 Amazon RDS for Mysql 数据库实例替代了真实场景中的自建的 Mysql 实例。目标库中有多张表,对应不同组别标的的分时、K 线行情数据。迁移过程将这些表格 1 比 1 的迁移到 Amazon DocumentDB 的 collection 中。Data Migration Service Task (以下简称 DMS Task)在迁移的过程中会为每一个记录打上全局唯一的- id。这个 id 可以作为 collections 在 Amazon DocumentDB 集群上的分片键,从而降低了数据存储设计和运维的成本。表迁移完成之后可以考虑将相同 Schema 的表合并成一张大表,以进一步降低管理维护的成本。

3.2 亚马逊云科技 DMS 工作机制

亚马逊云科技 Database Migration Service(以下简称 DMS)是一项云服务,其工作原理基于复制任务和变更数据捕获(CDC)技术,通过在源数据库和目标数据库之间建立实时数据管道来实现数据迁移。在具体实现上,DMS 首先会在 AWS 云环境中创建一个复制实例,这个实例作为迁移的核心组件,负责协调和执行整个迁移过程。复制实例运行在 EC2 实例上,可以根据数据量和性能需求选择不同规格的实例类型。

DMS 旨在简化数据库迁移过程,其工作原理基于一个高度自动化和灵活的架构。DMS 的核心是复制实例,它作为迁移过程的中心枢纽。当启动迁移任务时,DMS 首先在复制实例上创建源数据库的连接,然后建立目标数据库的连接。复制实例负责管理这些连接,并协调整个数据传输过程。对于全量数据迁移,DMS 会读取源数据库的所有现有数据,将其转换为适合目标数据库的格式,然后批量写入目标数据库。与此同时,DMS 还能捕获源数据库上发生的持续变更,这一过程称为变更数据捕获(CDC)。CDC 使用数据库的日志文件(如 MySQL 的二进制日志或 Oracle 的重做日志)来识别和捕获数据变更。这些变更会被暂存在复制实例上,然后按顺序应用到目标数据库,确保数据的一致性。

DMS 支持异构数据库迁移,能够在不同类型的数据库之间进行数据转换,如从 Oracle 到 PostgreSQL,或从 MySQL 到 Amazon Redshift。在这个过程中,DMS 会自动处理数据类型映射、字符集转换等复杂任务。为了保证迁移的可靠性,DMS 实现了多层错误处理和重试机制。如果在迁移过程中遇到网络中断或其他临时故障,DMS 会自动尝试重新连接和继续迁移,从上次成功的检查点恢复。

此外,DMS 提供了详细的监控和日志记录功能,使用户能够实时跟踪迁移进度,并在必要时进行故障排除。为了优化性能,DMS 采用了多线程并行处理架构,可以同时处理多个表的数据传输。它还支持数据验证功能,可以比较源和目标数据库中的数据,确保迁移的准确性。

DMS 的另一个重要特性是其与亚马逊云科技其他服务的紧密集成,如与亚马逊云科技 Schema Conversion Tool(SCT)的集成,可以自动转换数据库架构,简化异构数据库迁移的复杂性。总的来说,DMS 通过其全面的功能设计和自动化流程,大大简化了数据库迁移的复杂性,减少了迁移过程中的停机时间,为企业提供了一个高效、可靠的数据库迁移解决方案。

DMS 的产品文档见链接 1

3.3 Amazon RDS for Mysql 实例创建及数据准备

可以按照链接 2 的指导进行 RDS for Mysql 实例的发放。发放之后的实例配置截图如下图所示。

图 2 Amazon RDS for Mysql 实例配置展示 Dashboard

3.4 Amazon DocumentDB 实例创建

可以按照链接 3的指导进行 Amazon DocumentDB Instance Based Cluster 的发放。发放之后的实例配置截图如下图所示。

图 3 Amazon DocumentDB 集群配置展示 Dashboard

3.5 DMS 任务创建

详细实施指南可参考链接 4

按照链接 4 操作,以下 3 个要点需要特别注意:

第一:作为后续任务4.2(将多个蜡烛图数据表合并到一个 DocDB 表)的准备工作,你必须为所有目标 MySQL 表添加一个名为’_id’的 NULL 字段。具体 SQL 命令如下:

ALTER TABLE <target table name> ADD COLUMN _id varchar(20) NULL;
SQL

_id 字段将作为主键。在 DMS 任务执行过程中,该字段将被填充全局唯一值。注意:索引不会被迁移,需要在合并表后重新创建(具体操作请参考”创建合并表索引”部分)。

第二:不要使用链接 4 中提到的’rds-combined-ca-bundle.pem’(如图 4)。请改用在 ec2 connect docdb manually 中出现的’global-bundle.pem’(如图 5)。

图 4 链接 4 文档中的 rds-combined-ca-bundle.pem

图 5 ec2 connect docdb manually 文档中的 globl-bundle.pem

获取 global-bundle.pem 的命令:

wget https://truststore.pki.rds.amazonaws.com/global/global-bundle.pem
PowerShell

第三:当通过 EC2 控制台连接 DocDB 实例时,确保使用 Amazon Linux 2 AMI 来配置 EC2 实例。这是因为 Amazon Linux 2 已预装了 mongo shell 所需的相关软件包/库。

图 6 ec2 发放 console

通过 EC2 控制台连接 DocDB 实例的详细操作指南请参考 ec2 connect docdb manually

在 DMS 任务成功完成后,使用以下命令检查 MySQL 中的原始数据是否已成功迁移到 DocDB:

#从ec2 console登录docdb
mongo --ssl --host <specific docdb cluster endpoint> --sslCAFile global-bundle.pem --username <specific user name> --password <specific password>

#显示docdb实例中的所有Schema
show dbs

#切换到目标Schema
use <target Schema name>

#显示目标Schema下的所有表
db.getCollectionNames()

#显示某个表中的一条记录
db.<specific collection name>.find()

#统计某个表中的记录数量
db.<specific collection name>.count()
PowerShell

3.6 Amazon DocumentDB Collection 合并脚本

将多个 DocDB Collection 合并为一个 DocDB Collection。具体步骤如下:

  • 使用 export.py 将目标 docdb Schema 中的集合(即 mongodb 表)数据导出为 json 文件
  • 使用 import.py 通过第一步生成的导出文件创建新的集合
  • 在合并后的表上创建索引
    db.timetrend_merged.createIndex( { "instrumentId": 1, "EndTime": 1 },{ unique: true } )
    
    db.timetrend_merged.getIndexes()
    
    PowerShell

    需要特别注意 mongoexport/mongoimport 安装命令:

    sudo yum install mongodb-org-tools
    PowerShell

    export.py 脚本代码如下:

    import os
    from pymongo import MongoClient
    
    #MongoDB Connection Info 
    host = "<specific docdb endpoint>"
    username = "xxxxadmin"
    password = "********"
    database_name = "docdb-xxxx"
    
    #Create MongoDB Client
    client = MongoClient(f"mongodb://{username}:{password}@{host}/{database_name}?tls=true&tlsCAFile=global-bundle.pem")
    
    
    #Get Database Object
    db = client[database_name]
    
    #Get Collections List
    collections = db.list_collection_names()
    
    #for each collection execute mongoexport
    for collection in collections:
        output_file = f"{collection}.json"
        command = f"mongoexport --ssl \
        --host=<specific docdb endpoint>:27017 \
        --collection={collection} \
        --db=docdb-xxxx \
        --out={output_file} \
        --username={username} \
        --password={password} \
        --sslCAFile global-bundle.pem"
        os.system(command)
        print(f"Exported collection {collection} to {output_file}")
    
    client.close()
    
    Python

    import.py 脚本代码如下:

    import os
    import subprocess
    
    host = "<specific docdb endpoint>"
    username = "xxxxadmin"
    password = "********"
    database_name = "docdb-xxxx"
    collection_name = "candlestick_merge"
    
    json_dir = "/root/export_file"
    
    for filename in os.listdir(json_dir):
        if filename.endswith(".json"):
            file_path = os.path.join(json_dir, filename)
            command = f"mongoimport --ssl \
            --host={host} \
            --collection={collection_name} \
            --file={file_path} \
            --db={database_name} \
            --username={username} \
            --password={password} \
            --sslCAFile global-bundle.pem"
    
            try:
                subprocess.run(command, shell=True, check=True)
                print(f"Imported data from {filename}")
            except subprocess.CalledProcessError as e:
                print(f"Error importing {filename}: {e}")
    
    Python

4. 总结

本文主要探讨了利用 Amazon DocumentDB 存储证券市场分时和 K 线行情数据的可行性及迁移方案。Amazon DocumentDB 通过其灵活的文档模型、可靠的性能表现和高可用架构,非常适合存储和处理分时、K 线这类读多写少的行情数据。本文详细分析了行情数据的特点,Amazon DocumentDB 的技术架构,并阐述了一个从 MySQL 到 Amazon DocumentDB 的完整迁移方案。该方案可以在保证查询性能和可用性的同时,显著降低数据存储成本和运维复杂度。

资源释放

运行下面命令删除 DMS Replication Instance:

aws dms delete-replication-instance --replication-instance-arn <specific replication instance arn>
PowerShell

运行下面命令删除 RDS 实例:

aws rds delet-db-instance --db-instance-identifier <instance resource identifier>
PowerShell

运行下面命令删除 DocDB 集群:

aws docdb delete-db-instance --db-instance-identifier <specific instance identifier>

aws docdb delete-db-cluster --db-cluster-identifier <specific cluster identifier>
PowerShell

参考资料

本篇作者

魏诗洋

亚马逊云科技资深解决方案架构师。专注金融行业云上系统架构及解决方案设计。关注大数据、机器学习在金融行业的应用,以及金融行业监管合规对云上系统架构设计的影响机制。10年+数据领域研发及架构设计经验。

刘冰冰

亚马逊云科技数据库解决方案架构师,负责基于亚马逊云科技的数据库解决方案的咨询与架构设计,同时致力于大数据方面的研究和推广。在加入亚马逊云科技之前曾在 Oracle 工作多年,在数据库云规划、设计运维调优、DR 解决方案、大数据和数仓以及企业应用等方面有丰富的经验。