Category: Amazon Simple Storage Services (S3)*


Amazon S3 深度实践系列之二:如何实现 S3 数据跨区域高效可靠传输

背景

Amazon S3 深度实践系列之一:S3 CLI深度解析及性能测试一文中,我们深度剖析了AWS CLI S3相关命令的实际工作原理及单机下载S3数据的基本性能测试情况。在实际工作场景中,很多客户会在AWS多个区域的S3桶里面存储大量数据,而且会遇到将数据批量从一个区域一次转移到另外一个区域的情形;因此,在本篇中,作者和大家一起来探讨下出现这样的需求我们如何进行架构设计及高效实现。

架构设计

存储在S3中的对象随着时间的推移,对象数量逐渐增加,而且总体的数据量也不断膨胀,如果碰到需要将数据从某一个区域的S3存储桶完全复制到另外一个S3存储桶里面,我们会遇到哪些挑战呢?

  • 网络传输带宽的限制
  • 存储桶里面所有对象的分析和列表
  • 源存储桶和目标存储桶权限的设定
  • 传输失败识别和重试的挑战
  • 如何利用并发来加速传输及降低成本
  • 如何判断目标存储桶中的对象和源存储桶中的对象差异及完整性

在通用架构设计环节,我们将复杂的问题分解成一系列的子问题进行分析,并讨论在不同场景下的具体实现时要考虑的因素。如下图所示,我们将该任务分解成独立的五个环节,从图上我们也可以看出来,如何实现大规模数据或任务的并发执行是每个环节能否高效完成的一个很关键的技术要求;而且,只有在步骤三执行数据传输任务时,才会涉及到具体场景中的技术限制,因此我们在执行数据传输任务章节来讨论,同区域不同存储桶之间,AWS海外不同区域存储桶之间,以及AWS海外和国内不同存储桶之间的具体技术考量点。

S3对象“清单”

了解源和目标存储桶里的S3 对象是非常重要的准备工作,该章节我们讨论,如何获得S3存储桶的所有对象列表,包含对象的基本的信息,比如最新版本的对象大小,ETag等等。

Amazon S3本身提供了存储桶管理功能之清单生成功能,该功能是一个异步的AWS后台定期执行,可以实现每天生成一个存储桶清单保存成Excel格式。

同时我们也看到很多用户提问,如何实现一个自定义的清单功能,满足大家对于对象变化比较频繁的存储桶对象的实时统计场景以及更多高级自定义的业务逻辑。

接下来我们来看看这两种方法的具体实现逻辑。

利用S3 CLI实现高效的清单功能

作者利用AWS S3 CLI实现高效的清单功能基于以下两个事实前提:

  1. s3api 的 list-objects-v2虽然文档中说明最多返回1000个对象,但实测可以获得所有对象列表
  2. 同样利用s3api 的 list-objects-v2的delimiter和prefix参数,我们可以实现类似文件夹目录逐级扫描功能

基于以上两个事实,我们实现桶清单的主要逻辑如下图所示:

  • 输入参数主要是:bucket,region和IAM 配置的profile名字,profile默认为default;另外depth控制扫描的“目录”层级
  • 当depth为零时,我们直接尝试利用list-objects-v2一次性获取存储桶中所有对象列表并生成一个json格式的文件(但当桶里面对象太多时,该操作会超时)
  • 当depth为零即单线程无法直接生成存储桶清单时,我们就尝试如下迭代逻辑:
    • 生成存储桶当前“目录”里面的所有对象和该目录中所有“子目录”列表
    • 遍历上一步的“子目录”列表,迭代生成该目录下的对象列表和“子目录”列表
    • 如果遍历的深度等于输入参数depth=n,或者“子目录”列表为空,那么停止遍历子目录,直接生成该层级“目录”里面所有的对象列表

以下是几个关键点实现的代码说明,首先,生成某个“目录”前缀下所有对象列表的AWS S3 CLI命令参考,如下命令将在操作系统后台执行并生成存储桶jason中“目录”前缀“qwikLabs/”下的所有对象列表(包括所有嵌套“子目录”中的所有对象):

$ nohup aws s3api list-objects-v2 --bucket "jason" --prefix "qwikLabs/" --profile bjs > 0.obj. 2>&1

其次,如下命令将仅仅生成指定“目录”前缀“qwikLabs/”下的对象列表(不包括嵌套“子目录“的对象)和所有下一层“子目录“列表,为了加强”子目录“输出格式,我们增加了query参数:

$ nohup aws s3api list-objects-v2 --bucket "jason" --prefix "qwikLabs/" --delimiter "/" --query "{Keys:Contents[].{Key: Key, Size: Size,ETag:ETag},CommonPrefixes:CommonPrefixes[].Prefix}" --profile bjs > 0.1.obj. 2>&1

另外,为了实现并发我们利用了迭代算法以及操作系统后台异步执行AWS S3 CLI命令的方法,最终程序会生成一系列的json文件结果,存储桶中所有的对象列表分布在这些文件当中。

S3自带的清单功能

在了解了我们通过AWS CLI S3命令行工具实现自定义的清单功能之后,我们再来对比下,Amazon S3自带的清单功能。

在存储桶页面,导航到“管理“标签,Amazon S3目前提供了四项S3管理功能,其中跟本文相关的是”清单“功能。该功能支持我们对某一个存储桶,定义多个清单,用户可以根据需要,定义针对不同S3对象前缀生成各自的清单列表,并可以存储在独立的存储桶中:

同时,自带清单功能还支持定义生成清单的频率及清单中包含的对象字段,检查并确定好清单选项之后,服务会帮助我们在保存清单的目标存储桶中设置好相应的IAM策略:

清单任务保存之后,后台会异步定期执行,每次都会按时间生成manifest.json 文件和一系列的清单文件,manifest.json 里面包含这次生成的所有清单文件列表:

S3对象清单小结

Amazon S3 存储桶已经内置了清单功能,基本可以满足我们的日常需求,我们不需要重复造轮子;本章节所讨论的利用AWS S3 CLI 命令行自定义实现清单功能,更多的是作者好奇的发现,AWS S3 CLI 本身非常好用,也可以帮助我们实现类似文件目录的逐级对象列表功能,提供给有特殊场景需求的用户参考。

对象清单分解成传输任务

有了存储桶中所有的对象清单,接下来,我们就看看如何设计传输任务。设计传输任务的原则如下:

  • 如果网络条件非常良好,比如同区域的不同存储桶之间,按照作者的测试,复制带宽平均可以达到xxMB/s,如此可以直接利用S3 cp命令
  • 尽可能将单进程的复制任务分解成多个子任务并发执行,任务分解后进入到Amazon SQS队列,这样将任务分解和任务执行进行解耦
  • 如果网络条件非常一般,比如平均在10KB/s并且网络抖动大的情况下,对于超过一定大小的文件需要切割成小文件,组成子任务并发执行

传输任务分解算法的设计,涉及到几个关键参数:

  • max_task_size_mb:单个任务的对象总大小上限,比如最大大小限制在100MB,那么单个任务最多有100MB的对象列表,或者该任务就一个对象,该对象本身大小就超过了100MB
  • max_task_objects:单个任务的对象数量上限,比如数量上限为50,那么单个任务中最多有50个对象需要传输
  • multipart_threshold及multipart_chunksize:对象太大时,需要分割成多个小对象传输任务,那么多大的对象需要进行分解?分段的单位大小是多少?比如阈值是10MB,单位大小是2MB,那么大于10MB的对象都需要再分解成2MB的多个子对象并发续传

Amazon SQS任务队列设计与实现

传输任务在设计时分成两大类,一种是本身对象就是小文件,我们按照max_task_size_mb 和 max_task_objects 进行分组,即每个任务总数据量大小不会超过max_task_size_mb,而且对象数量也不超过 max_task_objects ;这些任务我们会发送到自动创建的S3Task_NormalQueue开头的SQS队列中,每个队列的消息数量上限本文设为80000条;另外一类是,对象大小超过multipart_threshold 限制的,我们会进一步把该对象分解成 multipart_chunksize大小的独立对象,同样按照 max_task_size_mb 和 max_task_objects 的算法进行分组,但这些任务会保存到自动创建的以S3Task_BigSizeQueue开头的队列中。

另外遵循Amazon SQS操作的最佳实践,我们分别为这两类任务队列设定了同样的死信队列,当消息被读取10次而没被处理成功的会自动转移到S3Task_DeadQueue进行存储和后续处理。

执行数据传输任务

当任务队列产生之后,接下来,就到了如何高效执行如何多的传输任务的阶段,很多网络和客观条件的限制,我们都放到了如何分解传输任务的算法里面进行了实现,在执行数据传输任务环节,逻辑非常简单:

  • 读取一条SQS任务
  • 根据任务中的具体对象,每个对象利用独立线程进行复制或者下载再上传
  • 只有该任务中所有的对象都传输成功,才把该任务消息从SQS队列中删除

同区域S3数据复制

同区域不同S3存储桶之间数据复制,由于网络条件较好,IAM权限简单,可以尽量利用boto3的copy-object方法直接利用S3服务本身能力,进行快速数据复制,该方法数据无需经过命令执行的机器中转。因此在任务分解环节,multipart_threshold 的值需要设置一个比较大而且合理的值,避免大文件被分片之后,需要先下载后上传,这样会消耗更多的流量。

AWS海外不同区域S3数据复制

海外不同区域的S3存储桶之间复制,和同区域的不同存储桶复制场景类似,但由于跨区域传输,网络状况取决于两个区域的位置及它们之间的互联网状况。

AWS海外区域和BJS区域S3数据复制

该情况最复杂,AWS海外和国内是独立的区域,需要不同的账号权限体系,因此,对象需要先进行下载再上传,这样就需要占用执行命令的机器的内存和网络带宽;

  • 在任务分解时,需要尽量把大对象分解成小的片段比如2MB或者1MB的大小以提高单次数据传输的成功率。
  • 在任务执行时,需要尽量并发,以单任务小带宽累积成可以接受的总体平均传输速度,并充分AWS出口带宽的优势

数据跨区域迁移实践

基于我们之前的架构设计,那我们分阶段来具体动手实践一个具体场景, Amazon官网有很多公开的数据集,我们选定Next Generation Weather Radar (NEXRAD) 作为数据源,该数据源在美东(us-east-1)区域;目标存储桶我们选择在BJS区域。该实验仅仅为了验证技术可行性,完整的参考代码见s3deepdive github;代码不作为生产用途仅仅用来学习用途。

准备环境

为了简单地完成技术验证,我们所有的测试环境基于一台r4.2xlarge的Amazon Linux机型展开,系统需要准备好:

  • 在AWS Global 美东区域创建一台EC2实例
  • 关联一个IAM Role,需要有访问S3及SQS相关的管理权限
  • Python2.7.x
  • Boto3
  • 300GB gp2 EBS 磁盘

配置好目标存储桶的IAM Profile及修改默认获取IAM Role的临时Token的默认超时时间和重试次数:

[ec2-user@youserver]$ aws configure --profile bjs
AWS Access Key ID [None]: AKI***************A
AWS Secret Access Key [None]: 7j+R6*****************oDrqU
Default region name [None]: cn-north-1
Default output format [None]:
[ec2-user@youserver]$ vi ~/.aws/configure
[default]
region = us-east-1
metadata_service_timeout = 5
metadata_service_num_attempts = 5
[profile bjs]
region = cn-north-1
metadata_service_timeout = 5
metadata_service_num_attempts = 5

生成对象清单

Amazon公共数据集没有提供清单列表,因此,我们利用前文的逻辑,并尝试利用AWS S3 CLI命令生成该存储桶的对象清单。该数据集按照年月进行数据分区,我们设定对象的Prefix的迭代深度为3,后台执行以下命令,并观察执行日志:

[ec2-user@youserver]$ cd NEXRAD_Demo/inventory
[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_inventory.py -b noaa-nexrad-level2 -r us-east-1 -d 3 > noaa-nexrad-level2.log 2>&1 & 

由于数据集非常大,该命令执行需要点时间,最终,3层的扫描帮助我们并发生成了2751个对象清单文件,总大小4.5GB:

[ec2-user@youserver]$ cd NEXRAD_Demo/inventory
[ec2-user@youserver]$ ls noaa-nexrad-level2.*obj* | wc –l
2751

设计并提交传输任务

由于该场景下,源存储桶和目标存储桶之间的单次传输的速度非常有限,实测该场景下大概在9KB/s左右,而且网络抖动比较厉害,因此,我们尽量缩小单个任务的总数据量大小,并设定大对象的大小阈值设置为2MB;具体参数需要在Python常量参数中修改:

由于清单文件太多,总数据量太大,因此,我们可以数据清单分成多个目录,分别进行计算,比如如下命令:

  • 我们把大小小于800000 bytes的文件放到目录./1/里面
  • 把大小小于2MB的文件放到./2/里面
  • 把大小小于6MB的文件放到./3/里面

大家可以根据自己的需要,分成不同的对象清单文件夹

[ec2-user@youserver]$ cd NEXRAD_Demo/inventory
[ec2-user@youserver]$ mkdir 1 2
[ec2-user@youserver]$ find ./ -size -800000c -print0 | xargs -0 -I {} mv {} ../1/
[ec2-user@youserver]$ find ./ -size -2M -print0 | xargs -0 -I {} mv {} ../2/
[ec2-user@youserver]$ find ./ -size -6M -print0 | xargs -0 -I {} mv {} ../3/
[ec2-user@youserver]$ cd NEXRAD_Demo/tasksubmit
[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_task_submit.py -d ../inventory/1/ -r us-east-1 > noaa-nexrad-level2.task1.log 2>&1 &
[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_task_submit.py -d ../inventory/2/ -r us-east-1 > noaa-nexrad-level2.task2.log 2>&1 &
[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_task_submit.py -d ../inventory/3/ -r us-east-1 > noaa-nexrad-level2.task3.log 2>&1 &

为了演示,我们没有生成所有对象清单的传输任务,仅仅选取了其中某连个文件夹,生成的传输任务如下图所示,有些队列的消息数为0,表示我们后台还有传输任务消息没有发送到队列中:

我们来看看队列里面的一个任务的结构组成,S3Task_Bigsize*队列中的任务相比于普通队列中的任务多了一组分片的Range范围:

设计并执行传输任务

在并发执行数据传输任务之前,我们先看看单个任务执行情况,任务执行需要指明任务队列,源和目的存储桶以及访问目标存储桶的IAM Profile名:

[ec2-user@youserver]$ cd NEXRAD_Demo/taskexec
[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_task_exec.py -q S3Task_NormalQueue15098144850121 -source_bucket noaa-nexrad-level2 -dest_bucket bjsdest -dest_profile bjs > S3Task_NormalQueue15098144850121.exec1.log 2>&1 &
[ec2-user@youserver]$ nohup python ../../s3deepdive/s3_task_exec.py -q S3Task_BigSizeQueue1 -source_bucket noaa-nexrad-level2 -dest_bucket bjsdest -dest_profile bjs > S3Task_BigSizeQueue1.exec1.log 2>&1 & 

从执行日志可以分析出,对于NormalQueue中的单个任务,由于是小对象,而且数量是10,因此我们的执行代码可以并发执行,总体执行时间是26秒;对比BigsizeQueue中的任务,虽然总体数据大小和NormalQueue差不多,但由于只有2个对象并发复制,该任务的总体执行时间是363秒。

关于并发任务执行,本质上是一个批处理的业务逻辑,假定有1000个任务列表,

  • 每个任务数据量上限20MB,如果传输速度在10KB/s那么一个任务需要大概需要2048秒即34分钟,但我们的的任务执行是多线程并发操作,按每个任务最多10个对象算,在10KB/s的速度下,一个任务最快需要执行3.4分钟左右(10个对象并发上传),最慢34分钟(一个对象的情况下)
  • 如果同时100个并发执行,完成所有任务,需要至少执行10次,总时长在34分钟到340分钟之间
  • 如果并发1000个,完成所有任务需要至少执行1次;总时长3.4分钟到34分钟之间

本实验为了学习的目的,我们在测试机r4.2xlarge的机器上,后台并发执行100个任务,并观察数据传输的实际状况,

[ec2-user@youserver]$ cd NEXRAD_Demo/taskexec
[ec2-user@youserver]$ vi parallel_run.sh
#!/bin/bash

for((i=2; i<52;i++))

do

  nohup python ../../s3deepdive/s3_task_exec.py -q S3Task_BigSizeQueue1 -source_bucket noaa-nexrad-level2 -dest_bucket bjsdest -dest_profile bjs > S3Task_BigSizeQueue1.exec_$i.log 2>&1 &

done

for((i=2; i<52;i++))

do

  nohup python ../../s3deepdive/s3_task_exec.py -q S3Task_NormalQueue15098144850121 -source_bucket noaa-nexrad-level2 -dest_bucket bjsdest -dest_profile bjs > S3Task_NormalQueue15098144850121.exec_$i.log 2>&1 &

done
[ec2-user@youserver]$ chmod +x parallel_run.sh
[ec2-user@youserver]$ ./parallel_run.sh

针对下面这两个队列,每个运行了50个并发任务,因此在SQS界面上,可以看到传输中的消息是50,也就是同时有50个消息任务正在被处理:

可以看到BigsizeQueue队列一次就成功完成的数据传输任务总数为 79949+50-79971=28;NormalQueue队列一次就成功完成的数据传输任务总数为79950+50-79958=42;我们定义任务的成功与否,为该任务中所有的对象都成功传输完成;该实验我们对于分段采取的大小是2MB,在9KB/s左右的互联网传输速度下,还是有点大,容易失败;普通队列上中的任务,对象大小都在几百KB左右,一次传输成功的概率大很多。

数据完整性检查

本文不对单个对象的完整性问题展开探讨,对于用户首先最关心的问题是,源存储桶的对象有没有完全迁移到目前存储桶中;因此,可以定期生成目标存储桶的对象清单,并比对源存储桶的对象清单,在自定义的清单程序中,我们是逐级生产对象清单文件,有一定的规律,如果两个存储桶使用同样的depth参数生成,生成的对象清单文件个数首先一致的;具体到识别出有没有遗漏的迁移对象,可以进一步对比清单中的对象列表。

总结

本文就跨区域S3数据迁移整体架构作了基本探讨,并在架构的基础上,学习和实践了利用AWS S3 CLI以及boto3库如何实现自定义的对象清单,传输任务分解及执行逻辑。现实的生产场景下,还需要更多细节的思考和实践,接下来,我们会继续在大规模批处理,大规模对象集的完整性校验方面和大家继续探讨。

 

作者介绍:

薛军

AWS 解决方案架构师,获得AWS解决方案架构师专业级认证和DevOps工程师专业级认证。负责基于AWS的云计算方案架构的咨询和设计,同时致力于AWS云服务在国内的应用和推广,在互联网金融、保险、企业混合IT、微服务等方面有着丰富的实践经验。在加入AWS之前已有接近10年的软件开发管理、企业IT咨询和实施工作经验。