亚马逊AWS官方博客

AWS 云上 MongoDB/DocumentDB 数据定期归档

数据定期归档业务需求

MongoDB 是一款 NOSQL 数据库,它有高性能、无模式、文档型的特点,广泛应用在电商、视频、游戏、生命科学、医疗等泛互联网行业。DocumentDB 是 AWS 侧兼容 MongoDB 的数据库产品。

如果客户希望能够利用 AWS 的托管服务,减少运维成本,那么 DocumentDB 是一项非常合适的选择。如果希望维持 MongoDB 的全部功能,并且有自主控制操作系统层的能力,那么客户在 AWS EC2 上搭建 MongoDB 也是常见的做法。

随着业务量、用户量的持续增加,MongoDB/DocumentDB 的数据量也在持续增加,这对数据库的成本控制、查询性能以及数据管理都提出了挑战。其中,有部分 record 类数据在创建了一段时间之后,基本上就不会被查询,或者偶尔被查询。从数据库优化角度,此类数据应该被归档到存储成本更低的位置,同时,也要求这些归档数据可以被查询,以满足偶尔的归档数据查询需求。

本文针对此类定期归档 MongoDB/DocumentDB 数据的业务场景,提供了一套基于 AWS 无服务器化的自动归档与查询方案。

定期归档业务拆解:

  1. 在业务侧,以客户 profile 为例,对每天的 profiles 数据单独创建 collection,例如:profiles_20230607、profiles_20230608
  2. 按天将 collection 数据导出到 S3,并以当天日期为 prefix,例如:s3://bucketname/archivedoc/2023-06-07
  3. 以 SQL 方式对归档数据进行查询

样例数据:

db.profiles_20230607.insertMany([
            { "_id" : 1, "name" : "Matt", "status": "active", "level": 12, "score":202},
            { "_id" : 2, "name" : "Frank", "status": "inactive", "level": 2, "score":9},
            { "_id" : 3, "name" : "Karen", "status": "active", "level": 7, "score":87},
            { "_id" : 4, "name" : "Katie", "status": "active", "level": 3, "score":27}
            ])

db.profiles_20230608.insertMany([
            { "_id" : 1, "name" : "Matt", "status": "active", "level": 12, "score":202},
            { "_id" : 2, "name" : "Frank", "status": "inactive", "level": 2, "score":9},
            { "_id" : 3, "name" : "Karen", "status": "active", "level": 7, "score":87},
            { "_id" : 4, "name" : "Katie", "status": "active", "level": 3, "score":27},
            { "_id" : 5, "name" : "jessie", "status": "active", "level": 3, "score":28},
            { "_id" : 6, "name" : "Andy", "status": "active", "level": 3, "score":29},
            { "_id" : 7, "name" : "simon", "status": "active", "level": 3, "score":30}
            ])

实现步骤主要为两步:

  1. Glue 定期抽取 MongoDB/DocumentDB collection,并写入 S3
  2. Athena 按需查询 S3 归档数据

AWS 相关服务简介

Amazon DocumentDB 是 AWS 提供的一项快速、可靠和完全托管的数据库服务,并且与 MongoDB 兼容。使用 Amazon DocumentDB,用户可以在您可以运行相同的应用程序代码,使用与 MongoDB 相同的驱动程序和工具的同时,在云中轻松设置、操作和扩展数据库。

AWS Glue 是一项无服务器数据集成服务,可让您更轻松地发现、准备、移动和集成来自多个来源的数据,以便进行分析、机器学习 (ML) 和应用程序开发。Glue 为处理组织到 Hive 样式分区中的数据集提供了增强的支持;Glue 爬网程序会自动识别 S3 数据中的分区;Glue ETL(提取、转换和加载)库在使用 DynamicFrame 时原生支持分区。

Amazon S3 是一项对象存储服务,可提供行业领先的可扩展性、数据可用性、安全性和性能。借助经济高效的存储类和易于使用的管理功能,您可以优化成本、组织数据并配置微调的访问控制,以满足特定的业务、组织和合规性要求。

Amazon Athena 是一项交互式查询服务,可让您使用标准 SQL 直接从 Amazon S3 分析数据。Athena 是无服务器的,因此无需设置或管理基础设施,您可以立即开始分析数据。您甚至不需要将数据加载到 Athena 中,也不需要复杂的 ETL 流程。 Athena 可以直接处理存储在 S3 中的数据。

Glue 抽取 DocumentDB 或 MongoDB collection,并写入 S3

1. 配置 Glue VPC Endpoint,Glue 通过内网访问 DocumentDB/MongoDB

在 VPC 中选择“终端节点”,点击“创建终端节点”;并在服务中输入“glue”,选择对应的 glue 服务后,配置该 interface 的网络属性,如下图:

确保 VPC、子网和安全组配置,可以对 DocumentDB/MongoDB 进行内网访问。

2. 创建 S3 桶,存放归档数据:s3://bucketname/archivedoc/

Glue 会在该 prefix 下按天创建 partition,并将当天的数据写入对应的 partition。

3. 配置 Glue ETL job

1)在 Glue 控制台,点击“Visual ETL”,选择“Spark script editor”,然后创建 ETL job。

2)编辑 Spark 脚本,并保存。

DocumentDB 样例脚本如下:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import boto3
import time
import logging
import json
import datetime

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# 获取传递给AWS Glue作业的参数
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# 创建SparkContext和GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

def oid_to_str(oid):
    return str(oid)

# 将函数注册为 Spark SQL UDF
oid_to_str_udf = udf(oid_to_str, StringType())


# 配置连接字符串
username = "xxxxx-docdb"
password = "xxxxx-docdb"
host = "testarchive.cluster-c47uzx4ngbfq.us-west-2.docdb.amazonaws.com"
port = "27017"

documentdb_uri = "mongodb://" + host + ":" + port

# 数据表名称为profiles_date,例如profiles_20230608
table = "profiles_" + str(datetime.date.today()).replace('-','')

read_docdb_options = {
    "uri": documentdb_uri,
    "database": "test",
    "collection": table,
    "username": username,
    "password": password,
    "ssl": "true",
    "ssl.domain_match": "false",
    "partitioner": "MongoSamplePartitioner",
    "partitionerOptions.partitionSizeMB": "10",
    "partitionerOptions.partitionKey": "_id"
}

# Create the DynamicFrame using the options

mongo_dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="documentdb",                                      connection_options=read_docdb_options)


# 将 ObjectId 字段转换为字符串字段
df = mongo_dynamic_frame.toDF()
df = df.withColumn("_id", oid_to_str_udf(df["_id"]))

# 将 DataFrame 转换为 Glue DynamicFrame
dynamic_frame = DynamicFrame.fromDF(df, glueContext, "my_dynamic_frame")

# 转换数据,增加ID列,并写入Amazon S3

output_path = "s3://bucketname/archivedoc/" + str(datetime.date.today()) + "/"
output_format = "parquet"
output_mode = "append"

glueContext.write_dynamic_frame.from_options(
    frame=dynamic_frame,
    connection_type="s3",
    connection_options={
        "path": output_path,
        "mode": "overwrite" # 防止重写
    },
    format=output_format,
    transformation_ctx="output"
)

job.commit()

此脚本从 DocumentDB 中读取某个指定的 collection,并将全表数据导出到指定的 S3 prefix。

MongoDB 样例脚本如下:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import boto3
import time
import logging
import json
import datetime

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# 获取传递给AWS Glue作业的参数
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# 创建SparkContext和GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

def oid_to_str(oid):
    return str(oid)

# 将函数注册为 Spark SQL UDF
oid_to_str_udf = udf(oid_to_str, StringType())


# 配置连接字符串
username = "xxxx-mongo"
password = "xxxx-mongo"
host = "ec2-mongo.compute-1.amazonaws.com"
port = "27017"

mongodb_uri = "mongodb://" + host + ":" + port

# 数据表名称为profiles_date,例如profiles_20230608
table = "profiles_" + str(datetime.date.today()).replace('-','')

read_mongodb_options = {
    "uri": mongodb_uri,
    "database": "sample",
    "collection": "test",
    "username": username,
    "password": password,
    "ssl": "false",
    "ssl.domain_match": "false",
    "partitioner": "MongoSamplePartitioner",
    "partitionerOptions.partitionSizeMB": "10",
    "partitionerOptions.partitionKey": "_id"
}

# Create the DynamicFrame using the options

mongo_dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb",connection_options=read_mongodb_options)


# 将 ObjectId 字段转换为字符串字段
df = mongo_dynamic_frame.toDF()
df = df.withColumn("_id", oid_to_str_udf(df["_id"]))

# 将 DataFrame 转换为 Glue DynamicFrame
dynamic_frame = DynamicFrame.fromDF(df, glueContext, "my_dynamic_frame")

# 转换数据,增加ID列,并写入Amazon S3

output_path = "s3://alex-mongodb-archive/archivedoc/" + str(datetime.date.today()) + "/"
output_format = "parquet"
output_mode = "append"

glueContext.write_dynamic_frame.from_options(
    frame=dynamic_frame,
    connection_type="s3",
    connection_options={
        "path": output_path,
        "mode": "overwrite" # 防止重写
    },
    format=output_format,
    transformation_ctx="output"
)

job.commit()

此脚本从 MongoDB 中读取某个指定的 collection,并将全表数据导出到指定的 S3 prefix。

3)配置 Glue ETL job

确保该 role 对 Documentdb 有读数据权限。

4)保存配置后,运行并测试该 job;并在“Schedules”一栏,配置该 job 运行周期。

Job 运行结束后,还可以在 S3 中查看归档数据是否生成。

Athena 查询 S3 归档数据

1. 配置 Glue Crawlers 爬网程序,对归档数据建立元数据表

1)在 Data Catalog 中,选择“Crawlers”,并点击“Create Crawlers”,添加 S3 归档数据为数据源

2)运行该 Crawlers

3)运行结束后,在“Data catalog tables”中,Crawlers 会创建名为 archivedoc 的表。检查表结构与 partition 信息:

4)验证元数据正确后,则在 Crawlers 中配置 schedule,定期运行。

该 schedule 需要设置为在 Glue ETL job 执行完毕之后,这样可以确保 Glue 先往 S3 导出数据,再由 Crawlers 读取新 partition 的信息,以更新到 archivedoc 表中。

2. 配置 Athena 查询结果位置

3. 查询 S3 归档数据

全表扫描:select * from default.archivedoc;

按分区查询:select * from default.archivedoc where partition_0=’2023-06-07′;

成本对比

以新加坡 region 为例,DocumentDB 的存储成本为 $0.11 每 GB-Month。

S3 的标准存储成本为:$0.025 每 GB-Month。

通过归档到 S3,单 GB 的存储成本可以降低 65% 以上。但还需考虑到额外成本,包括 Glue 数据处理的费用和 Athena 数据查询的费用,这两个费用和数据量正相关。实际可以节省多少费用,还需要结合业务需求进行测试后,统一进行计算与对比。但总体来说,有大量数据的情况下,对比 DocumentDB/MongoDB,S3 的存储成本有较大优势,而且还可以结合 S3 的智能分层与生命周期管理等功能,进一步优化 S3 的存储成本。

总结

随着业务量、用户量的持续增加,DocumentDB/MongoDB 上的数据量也在持续增加,这对数据库的成本控制、查询性能以及数据管理都提出了挑战。本文提供了一套基于 AWS 无服务器化的自动归档与查询方案,大幅降低数据存储成本的同时,也保证了一定的数据查询能力;同时,整套方案基于 AWS 无服务器服务,客户侧无任何运维工作,降低客户运维管理负担。

本篇作者

付小飞

AWS 资深解决方案架构师,负责基于 AWS 的云计算方案的咨询与架构设计。专注于游戏行业,帮助客户利用 AWS 全球基础设施与强大的技术能力打造爆款游戏,降低游戏运行成本。

肖冰

亚马逊云科技解决方案架构师,负责基于 AWS 的云计算方案的咨询与架构设计。在应用现代化改造,Serverless,云迁移,大数据等方向具有丰富的实践经验。在加入 AWS 之前,曾就职于 EMC、微软、西云数据、腾讯等科技公司,拥有丰富的公有云领域的架构优化和技术支持经验。