亚马逊AWS官方博客

分布式 Lambda 从海外到中国自动同步S3文件

现在,越来越多的中国公司,在AWS海外区域部署业务。利用 S3 作为数据湖,存储海量的数据,包括图片、视频、日志、备份等等。很多场景下,需要把海外的 S3 数据复制到中国,在中国进行进一步分析处理。AWS  S3 在海外提供跨区域自动复制功能 (Cross region replication, CRR) 。但是,由于中国和海外区域隔离,不能使用CRR功能。

一个简单的命令行,调用不同的 profile ,利用管道,把第一个命令的输出流,作为第二个命令的输入流,可以复制 S3 文件:

$ aws s3 cp s3 ://globalbucket/testfile - --profile global | aws  s3  cp -  s3 ://chinabucket/testfile --profile china

 

但是,命令行需要有服务器来运行。更重要的问题是,没有机制来获取 S3 对象的变化信息,很难实现类似 S3  CRR的自动复制。aws  s3  sync同步命令,可以比较源和目标的变化,但是sync同步需要先list列出。在源 S3 有大量文件的时候,性能会受到影响。

 

下面介绍一些解决方案,利用 Lambda 来实现海外和中国之间的 S3 自动同步。

先了解一下方案1:

 

S3 有通知功能 (Notification) ,每当有对象创建或者删除时,发送通知触发 Lambda ,复制 S3 对象到其他区域。具体实现上, Lambda 下载 S3 对象到本地 /tmp目录,然后上传到中国。

此方案比较简单,但是, Lambda 容器 /tmp目录只有512MB,更大的 S3 文件不能下载。如果采用 Stream 流的方式,大文件很容易把 Lambda 内存用尽。并且,海外到中国的网络如果不稳定,传输中断,文件复制失败,没有任务监控和重新传输的机制。

 

 

再来看看方案2:

此方案使用 S3  Get Object 方法,对于大文件,分别按照 range 下载成多个小块 (5MB) ,每个小块上传到同区域的其他 S3  bucket。每个小块上传之后,触发多个 Lambda ,复制到中国 S3 。

相对方案1,此方案可以传输更大的文件,上传到本地 S3 之后触发的 Lambda ,可以并行异步完成多个块的传输任务,效率更高。但是,分成多个 range 下载到 Lambda  /tmp目录,再从 Lambda 上传到 S3 ,此过程是串行的,仍然需要时间。即使 Lambda 和 S3 都位于 AWS 网络,传输速度很快,5MB的分块,从主 Lambda 触发到上传同区域 S3 完成,也需要1-2秒左右。 Lambda 最长执行时间是15分钟(900秒)。按照最乐观的速度1秒来计算,最大支持的文件=900*5=4500(MB) 。

虽然 Lambda 支持多线程,但是 Lambda 需要配置大内存,才能真正体现出来多线程的效率提升。并且每个分块的线程执行情况,除了 Lambda 之外并没有地方可以保存任务结果。一旦 Lambda 超时,整个任务的情况无从知晓。

 

推荐方案3:

此方案把任务分解成多个 Lambda 函数,并行处理。任务状态和处理结果,存储在 DynamodDB 数据库。有专门的 Lambda 函数,定期检查状态。发现有失败的任务,会自动再次调用,直到整个任务完成。

 

具体逻辑实现,首先 S3 通知触发 Lambda 函数 ”Main”,调用 S3  Head Object 获得 S3 对象大小,每个5MB分块触发一个 Lambda  “MPU” (Multi Part Upload) 函数,并且生成一个中国区 S3  Upload Id 。每个 MPU 函数使用 Get Object 下载自己对应的 S3  range 部分,并调用 Upload Part 上传到中国区 S3 ,完成之后把任务和结果信息写入 DynamodDB 。除了完成 Upload Part 任务,还会统计完成任务的数量。如果已完成任务数量等于 S3 分段数量,调用中国 S3  Complete Upload  ,结束任务。

Lambda  MPU函数超时设置为5分钟。对于 5MB 的数据,这个时间一般足够了。但是,海外到中国区网络可能会有稳定, Lambda  MPU 函数如果出现超时,此分段就不能再执行。 Lambda 是无状态的,运行结束,数据不会保留。

为了监控每个分块函数的执行情况,加入了 Lambda  “Monitor” 函数,由 Cloudwatch  Events 每5分钟定期触发。Monitor函数扫描 DynamodDB 的任务执行情况,如果出现未完成的任务,并且超时,就重新执行,直到所有任务执行完成。

这里引入了分布式的设计理念,利用一个主调度函数Main,调用执行函数 MPU 处理每个分块,并有监控函数 Monitor 检查任务执行结果。对于大文件,调用多个 Lambda 并行处理,提高处理效率和传输速度,理论上可以支持到 S3 最大单个文件5TB。每个 Lambda 函数可以只用最低的内存128MB ,减少成本。

S3  分段上传,先创建一个 Upload ID ,然后把文件分成多个分块并行传输,最后当所有分块完成之后,调用 Complete Upload 完成整个上传。在分块传输阶段,每个分块单独传输。只要不执行 complete 或者 abort 操作,任务就不会结束。而已经上传的部分会暂时存储在目标 S3 ,直到任务结束。从这里来看,哪怕是分块传输中断,也只需要传输这部分的数据。

 

总结方案3的优势:

  1. 完全自动化操作。
  2. 监控任务检查上传任务状态,失败则自动调用。
  3. 支持大文件。
  4. 并行处理提高效率。
  5. 节约成本,无需购买服务器。

 

 

理论介绍完,进入实战。

 

首先创建 Lambda  IAM 角色 Lambda-S3 copy ,信任 Lambda 服务。自定义角色策略示例如下,请修改帐号652692642590为自己的帐号,以及区域。

 

{

"Version": "2012-10-17",

"Statement": [

{

"Effect": "Allow",

"Action": [

"logs:CreateLogGroup",

"logs:CreateLogStream",

"logs:PutLogEvents"

],

"Resource": "arn:aws:logs:us-west-1: 652692642590:*"

},

{

"Sid": "AllowService",

"Effect": "Allow",

"Action": [

"S3:*",

" DynamodDB :*",

" Lambda :*"

],

"Resource": "*"

}

]

}

 

Lambda 服务访问海外区域资源时,可以使用角色,而无需指定密钥 (Credential) 。而在访问中国 S3 的时候,AWS海外和中国隔离,不能使用海外 Credential 访问中国,也不能通过 Role, STS, Cognito 方式获取临时 token 访问。可以把包含 S3 权限的 Credential 信息,放在海外 S3  bucket ,设置合适的 S3 权限。

例如,中国区的 credential 信息存放在 S3 ://globalbucket/S3BJScredential.txt ,内容示例如下。第一行为 Access key ,第二行为 Secret Key 。

****************GPLQ

****************qrCc

 

Lambda 访问 S3 的权限已经在 IAM 角色设置。为了安全起见,存放 Credential 的 S3  bucket,设置 S3  bucket 策略,只允许 Lambda 服务访问,其他用户不能下载。

 

{

"Version": "2012-10-17",

"Id": "AllowOnlyLambda",

"Statement": [

{

"Sid": "AllowOnlyLambda",

"Effect": "Deny",

"NotPrincipal": {

"Service": "lambda.amazonaws.com"

},

"Action": "S3:GetObject",

"Resource": "arn:aws:S3:::globalbucket/S3BJScredential.txt"

}

]

}

 

创建 Lambda 主函数,作为 S3 触发的目标。Name:  S3CopyToChina-Main,Runtime: Python 3.6,Role :  Lambda-S3copy。创建之后,Configuration 页面中,超时 Timeout=5 分钟。环境变量设置:

CredBucket = < 海外区域存放 credential  S3  bucket>

CredObject = < 海外区域存放 credential  S3  object>

DstBucket = < 中国目标  S3  bucket>

 

代码地址:

https://github.com/milan9527/S3copy2china/blob/master/S3CopyToChina-Main.py

 

 

创建 Lambda 分段上传的任务函数,Name:  S3CopyToChina-MPU,Runtime: Python 3.6,Role :  Lambda-S3copy。超时Timeout=5分钟。

 

与主函数不同,环境变量DstBucket作为事件参数,由主函数设置,传递到任务函数,无需再次设置。 Access key 和 Secret 是敏感信息,不适合作为参数传递,仍然需要在每个函数中重新构造 S3  Client , MPU 函数中需要设置 CredBucket 和 CredObject 。

代码URL:

https://github.com/milan9527/S3copy2china/blob/master/S3CopyToChina-MPU.py

 

创建 Lambda 单个对象的任务函数,Name:  S3CopyToChina-Single,Runtime: Python 3.6,Role :  Lambda-S3copy。超时 Timeout=5 分钟。环境变量同上。

代码URL:

https://github.com/milan9527/S3copy2china/blob/master/S3CopyToChina-Single.py

 

 

创建 Lambda 监控函数,Name:  S3CopyToChina-Monitor,Runtime: Python 3.6,Role :  Lambda-S3copy,超时 Timeout=5 秒。监控函数每5分钟运行一次,每次获取 DynamodDB 的信息。如果有任务没有完成,并且任务开始时间离当前时间已经超过5分钟,调用 S3CopyToChina-MPU 或者 S3CopyToChina-Single 再次执行。此函数运行时间短,也无需设置环境变量。

代码URL:https://github.com/milan9527/S3copy2china/blob/master/S3CopyToChina-Monitor.py

 

 

创建4个 DynamodDB 表:

S3MPU :记录每个 S3 分段的任务信息,在任务完成后删除数据。

S3MPUResult :记录所有 S3 分段上传任务的信息,一直保存。

S3Single :小于5MB文件不分段,记录单个文件的任务信息,在任务完成后删除数据。

S3SingleResult :所有单个文件任务的信息,一直保存。

 

 

表1

Table name:  S3MPU

Primary partition key: uploadid (String)

Primary sort key: part (Number)

 

表2

Table name:  S3MPUResult

Primary partition key: uploadid (String)

 

表3

Table name:  S3Single

Primary partition key: id (String)

 

表4

Table name:  S3SingleResult

Primary partition key: id (String)

 

设置 S3 通知。选择 S3  bucket,Properties – Events – Add notifiation。

Name:  S3 Copy,Events 选择 ObjectCreate (All) 和 ObjectDelete (All) ,任何创建或者删除操作都会触发。Send to:  Lambda Function, Lambda :  S3CopyToChina-Main

 

 

设置 Cloudwatch  Events ,每5分钟触发一次 Lambda 监控函数 S3CopyToChina-Monitor 。

 

 

所有设置完成,开始测试吧。

在设置过 S3  Notification 的bucket ,上传文件。如果文件大于5MB,在 DynamodDB 的 S3 MPUResult 中看到信息,包括:

upload id: S3 上传任务ID

source_bucket:源 S3  bucket

destination_bucket:目标中国 S3  bucket

key: S3 对象名称

part_qty:总分段数量

part_count:分段完成的数量

complete:总上传是否完成

complete_time:总上传任务完成时间,以Unix时间表示

 

 

S3 MPU记录每个分段任务的信息,可以看到有多条记录。包含信息:

upload id: S3 上传任务ID

source_bucket:源 S3  bucket

destination_bucket:目标中国 S3  bucket

key: S3 对象名称

range:根据范围下载 S3 对象到 Lambda 本地

part:  S3 上传任务的每个分段的顺序

etag:分段上传之后,从 S3 返回的每个分段的 etag ,最终完成任务时需要

part_complete:分段上传任务完成情况。用于监控任务。统计表中此项为Y的已完成分段数量,如果等于所有分段的数量,执行complete upload 最终完成任务。

start_time:分段上传任务开始时间。用于监控任务。如果 part_complete=N ,并且 start_time 小于(监控运行时间-5分钟),说明此分段任务一定超时,重新运行此任务函数。

 

 

表 S3 Single和 S3 SingleResult 相对比较简单,不进行分段,只记录任务的信息。监控任务同样定期检查 S3 Single,如果没有完成,并且时间超时,就重新运行。

 

如果需要查看 Lambda 函数的执行情况,也可以在 Cloudwatch  Log 中查看每个函数的信息,包括打印输出,以及超时情况。

 

当 S3 MPUResult 或 S3 SingleResult 中的 complete 显示 Y 时,整个复制任务完成。此时在中国的目标 S3  bucket 看到,文件已经成功从海外区域复制过来。

 

我在美国 us-west-1 区域向 S3 上传了一个6MB的文件,总时间1分30秒。又上传一个5GB的文件,总时间大约12分钟。具体时间和区域以及当时的网络状况相关。

对于5GB文件,调用了1000个 Lambda 任务,并发传输,极大提高效率。这1000个任务完成时间有先后,即使有些任务超时,仍然有监控 Lambda 检查 DynamodDB 中的信息。如果检测到没有成功,再次调用任务,直到最终所有分段任务完成。越是大文件,并发约多,就越能显示此分布式架构的优势。

 

如果向 S3 上传文件很频繁,或者文件很大,要注意 DynamodDB 的读写容量。默认读写都是5。

每个 Lambda 任务都要读写 Lambda 来记录任务的完成情况。大量触发的 Lambda 会频繁读写 DynamodDB ,当超出 DynamodDB 容量限制时,请求受限,任务信息无法读取。 DynamodDB 默认配置了 AutoScaling ,会根据读写容量自动伸缩。但是,如果读写短时间内急剧增加,AutoScaling自动扩容的时间可能跟不上读写速度,仍然有部分请求会被拒绝,直到扩容能满足需求。

可能出现的情况是,分段任务已经完成,但是无法更新 DynamodDB 表 S3 MPU的 part_complete 属性,直至超时。而此任务在监控 Lambda 看来并没有完成,于是重新调用。或者,监控 Lambda 获取不到最新的任务完成情况,不知道是否需要重新调用,直到5分钟之后再次触发监控。这些情况不会影响最终任务的完成,但是会因为数据不准确,而导致不必要的重新调用,延长任务时间,还增加跨区域的流量传输成本。

 

以下是上传5GB文件时, DynamodDB 的一些 Cloudwatch 监控指标:

 

 

可以看到,预配置默认读写容量是5,突发大量的读写请求,很多请求被限制,Autoscaling自动扩容,此时读写请求量已经降下来。

 

建议根据 S3 的数量、大小和频繁程度,适当调整 DynamodDB 读写容量,再结合 Autoscaling ,优化方案。如果再仔细观察,大量的是Query查询请求。对于读请求很多的情况,也可以使用 DynamodDB  DAX缓存提高效率,避免大量的直接查询 DynamodDB 。但是DAX缓存相对成本比较高,需要根据业务量做出权衡。

 

Lambda 默认并发1000。此方案中, Main 或者 Monitor 函数都使用 Event 异步方式调用MPU函数。即使到达1000并发,仍然有重试机制。除非同时上传的文件太大太多,一般都可以通过默认的重试两次来解决高并发问题。注意观察 Lambda   Cloudwatch  Dead Letter Queues 指标,如果此指标大于0,说明有事件被丢弃。

希望这个方案对大家的业务有所帮助。

 

 

本篇作者

章平

亚马逊 AWS 解决方案架构师。2014年加入 AWS 技术支持团队,解决客户疑难杂症,熟悉各种用户场景。对于各类云计算产品和技术,特别是在数据库和大数据方面,拥有丰富的技术实践和行业解决方案经验。