天下武功唯快不破, 在很多应用场景中, 如机器学习, 数据分析,高性能计算等, 应用需要高速加载大量数据后进行本地计算。
试想一下, 您在亚马逊云科技上启动了一台p4d.24xlarge (8 x NVIDIA A100 Tensor Core GPUs)的实例, 您立即拥有了一尊有PetaFLOPS级处理能力的性能怪兽, 为了喂饱这个家伙, 您在Amazon Simple Storage Service (Amazon S3) 上准备了TB级的基础数据, 按200GiB一个对象拆分后存放. 如果按照万兆10G网络的理论速度 1.16GiB/s的峰值性能来下载数据到本地, 200GiB的单个对象需要大约172秒 ≈ 2.8分钟 , 那就意味着下载1TiB数据集总共需要等待14分钟左右, p4d.24xlarge的按需实例报价大约是$32.7/小时, 也就是说,每小时里花了$7.63等待数据到来后, 您才能真正开始计算任务。
改善数据等待的方法有很多, 优化工作流程、调整数据pipeline、边下载, 边计算都可以部分缓解这个问题, 但随之而来的是需要更复杂的工作流程和更智慧的调度算法。
但是, 如果我们找到问题的核心, 让数据下载更快, 那一切都会直接变得简单了。
经常有客户会问到: 把我存储在Amazon Simple Storage Service (Amazon S3) 上的数据下载到Amazon Elastic Compute Cloud (Amazon EC2) 上, 速度能有多快?
答案是非常肯定的: 在全球任何一个亚马逊云科技的区域里, Amazon EC2 到 Amazon S3的默认数据传输带宽都可以最高达到100Gbps。
“100Gbps ?! ”
“在云上可以达到10万兆网络的传输速度? ”
那么接下去的问题是: 需要怎么做才能达到这么高的传输带宽?
本文希望通过梳理在云上进行高性能数据传输的一些背景知识和具体方法, 帮助大家达成进行接近100Gbps线速的数据传输目标。为了验证最高的带宽, 我们给自己设定了一个难题: 将1个200GiB的对象存储在S3上, 使用单进程应用,以最快的速度把这个对象下载到EC2上。
图:整体架构
为了达到我们设定的目标, 我们先要回顾几个重要的关于AWS服务和性能的知识点:
知识点1: Amazon S3
Amazon S3是AWS在2006年发布的第一个云服务, 面向互联网海量用户同时使用的对象存储, 在2021 pi day上披露的数据, S3已经存储了超过 100 万亿个对象, 达到每秒数千万个请求的经常性峰值. S3为存储在上面的数据提供11个9的持久性和99.99%的可用性。
用户与S3的所有交互通过基于HTTP的S3 REST API来实现, 因为存取使用方便, 提供很高的可用性与可靠性, 按需计费, 并且价格也很便宜, 因此S3是客户长期持久化海量数据的首选服务. 如今以S3为中心的数据湖架构也应用得越来越广泛, S3持续发挥着作为互联网基石的作用。
图: Amazon S3
知识点2: AWS 的 VPC网络
S3和EC2在服务的网络上有很大不同, 简而言之, EC2的网络归属于用户的VPC中, 通过IP网络与其他EC2或者AWS的其他服务进行通讯和交互, 而S3是一个独立服务并不存在于任何一个用户的VPC中, EC2通过S3提供的Endpoint与S3进行通讯。
从下面的架构图中我们可以看到, 从EC2到S3的网络流量需要经过 EC2的本机网卡 ENA(1) → 用户VPC →网关(2) → S3 Endpoint(3)。
在这个数据传输链路中, 我们需要注意:
- 每款EC2实例类型都有可达到的最高带宽
- 部分实例有基线带宽和突增带宽, 通过网络I/O的credits机制控制可突增的时长
因此在构建数据传输之前, 您需要了解您使用实例的网络性能上限, 更详细有关EC2实例网络带宽的介绍可以参看文档:
https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-network-bandwidth.html
图: EC2到S3的网络结构
知识点3 – 并发
在现代计算机的性能模型中, 提高并发度是达成高性能的关键技术, 并发的方式有多种多样, 线程级的并发、进程级的并发、节点级的并发, 每一种并发的方式都有自己的并发模型, 并与之配套有相应的同步机制和I/O请求模式。
为了实现更好的整体性能, 高性能应用通常会采用事件循环+非阻塞I/O的方式, 相比传统应用的线程/进程+阻塞I/O的方式, 可以极大提高单节点上的I/O处理能力. 比如,大家熟悉的redis, nginx等都采用了事件循环的架构来实现高性能。
图: Nginx Event Loop介绍
引用自: https://www.nginx.com/blog/thread-pools-boost-performance-9x/
我们的既定目标是: “将1个200GiB的对象存储在S3上, 使用单进程应用,以最快的速度把这个对象下载到EC2上.”, 那么如果只有一个目标对象, 如何能够实现并发访问或下载呢?
知识点4: 并发访问S3
对于S3来说在上传和下载路径都可以对单个对象进行分片操作,在上传路径, S3通过Multipart Upload实现单个大对象的分片上传。
图: S3对象分段上传下载
在下载路径, S3支持两种方式的分片下载.
第一种, 如果对象是以分片的方式进行上传的, 那下载时可以按照原来分片上传的各个部分(part)来进行下载,
第二种, S3 API提供了Bytes Range的方式, 可以自由指定一个对象中的某一段来进行下载。
第一种方式略有局限, 因为上传时的分片设置并不一定完全适合下载时使用, 并且有时候我们可能仅需要一个对象中的某一小部分, 而在Bytes Range的方式下, 一次API请求可以指定对象的某一个连续区间进行下载, 方便用户根据应用的需要进行适当调整。
图: S3对象按字节区间(Byte Range)下载
为了应对全网海量用户的同时访问请求, 在接入层, S3的Endpoint提供了单一域名对应多个IP地址的方式来实现接入层的负载均衡. 在实践中, 通过DNS解析S3的Endpoint域名, 大约每隔几秒钟会得到一个不同的A记录(IPv4地址). 因为S3提供了强一致性, 所以用户可以放心从Endpoint解析出来的多个IP同时访问S3。
为了尽可能增加下载的并发度, 以使传输能够达到最大的传输带宽, 我们采取按Bytes Range分片下载对象, 并尽可能把对每个分片下载请求发送到不同的S3 Endpoint IP地址上的方式来发起请求. 因此, 我们的数据请求模型变成了以下的模式。
图: 使用多个IP地址从S3按字节区间下载对象
知识点5: 关于数据落盘
以100Gbps为例, 如果您要在本机上保留下载数据的副本, 那意味着您需要以 11.64GiB/s 的速度在本地磁盘上保存这些数据. 在实际场景中这是需要注意的. 必须为计算节点配置与下载速率相匹配的本地存储. 当然您如果采用流式计算或者不需要持久化数据到本地, 那您可以忽略这部分内容。
最后, 在动手构建之前,我们还需要回答一个问题:
没有现成的工具可以达到我们的预设目标吗?
对于S3上的数据管理和使用, AWS已经提供了完善的工具集, 包括了Web Console, AWS Command Line Interface (AWS CLI)工具以及各种语言的AWS SDK等, 使用现有的这些工具能否达到最高的带宽呢?
答案是: 可能非常困难, 以AWS CLI为例, 默认情况下, 使用aws s3 cp命令下载S3上的对象能够达到几百MB/s的级别。
很多客户会有疑问, 为什么AWS自己提供的工具不能提供极限的性能?
这里需要特别说明:
AWS CLI提供了对AWS所有服务API调用的命令行工具, 针对S3这个服务, 不但提供了相对底层, 针对每个S3的API的命令行工具(aws s3api <subcommands>), 还进行了高级功能的封装(aws s3 <subcommands>), 在aws s3 命令集中实现了诸如目录同步等高级功能, 因此AWS CLI的重点在于提供简便高效的命令行工具, 帮助客户轻松完成对AWS服务的管理工作, 并且工具自身能够快速迭代, 紧跟AWS新服务新特性的发布, 而从S3上高性能下载数据, 并非AWS CLI的首要目标。
其次, AWS CLI基于python语言实现, python在多线程并发存在一定的局限性, 接下去我们会详述。
当然通过调整一些AWS CLI的配置, 我们还是可以让AWS CLI的下载速度更快. 比如, 调整CLI的并发参数(默认10)
aws configure set default.s3.max_concurrent_requests 100
下载速度的到很大提升, 详细可以参考: https://docs.aws.amazon.com/cli/latest/topic/s3-config.html
同时, 采用多个AWS CLI进程同时运行, 并行下载多个对象, 这也是平时我们常用的方法, 同样能够提高整体的下载速率. 在大部分场景下, 如我们前面所述进程级的并发已经可以满足应用的需要。
再来谈谈程序员们接触最多的AWS SDK, AWS为所有的服务提供了丰富的计算语言SDK的支持, AWS SDK是对AWS服务API的特定计算机语言的接口封装和实现, 并提供了一些高级的功能简化开发人员的使用, 但SDK需要维护自身的兼容性, 并考虑对语言更广泛的支持, 因此SDK通常不提供程序的运行时(runtime)、I/O模型、线程模型等这些对极致性能至关重要的部分, 这部分需要依赖程序员自己来实现。
说到这里, 要实现一个能够逼近线速下载的高性能应用的确不是一件很容易的事情, 对于10万兆网络来说, 普通应用使用的线程模型和I/O模型已经没法来应付了。接下去出场的是真正的主角: AWS Common Runtime (CRT) 为了帮助我们的客户构建高性能的应用, AWS提供的一套完整功能的应用基础库AWS Common Runtime (CRT)
https://docs.aws.amazon.com/sdkref/latest/guide/common-runtime.html C语言实现、模块化设计、Event Loop、线程/同步原语、内存管理等等…
AWS CRT提供了构建一个高性能应用所需的一切轮子,并且基于这些基础的底层库, AWS CRT还重新实现应用经常会用到的模块, 包括 http协议、checksum校验算法、mqtt协议、S3协议等,为了提供更大的使用灵活性, AWS CRT也提供了与其他编程语言的Bindings(语言接口绑定), 目前已经集成的语言包括: C++、Java、python、ruby、nodejs、swift、c#、php、Kotlin,客户可以在AWS CRT之上构建自己的高性能应用客户端或者服务端。
图: AWS CRT模块结构
您可能会问: 为什么没有看到目前最火热两门高性能语言Rust与Go ?
其实FFI for AWS CRT (Rust语言绑定)已经有了, 只是目前完成集成的功能还比较有限,总的来说, Rust和Go在语言特性上已经提供高性能应用所必需的并发处理、同步原语等机制,为这两种语言提供一个高性能应用框架集成的迫切性并没有这么高,接下去的演示中我们也会使用AWS原生的Rust SDK来实现对S3数据的并发下载来做对比。
基于上面一系列的原因, 为了突破现有应用框架带来的瓶颈, 我们选择使用AWS CRT来构建一个高性能的S3下载程序
接下去, 请各位绑好安全带, 开启我们的100Gbps极速高性能之旅吧!
为了达到100Gbps的目标, 我们选择c5n.18xlarge的EC2实例, 在EC2实例家族中带n的实例都具有更高的网络性能, 这款c5n.18xlarge的网络性能就可以达到100Gbps, 它的基本配置为:
|
Model |
vCPU |
Memory (GiB) |
Instance Storage (GB) |
Network Bandwidth (Gbps) |
EBS Bandwidth (Mbps) |
1 |
c5n.18xlarge |
72 |
192 |
EBS-Only |
100 |
19,000 |
图: 完整的验证架构
使用AWS CRT来构建一个高并发S3下载程序的基本逻辑和示例代码可以在AWS CRT的samples里找到
https://github.com/awslabs/aws-c-s3/tree/main/samples/s3
为了达到我们的测试目标, 即获得最高的下载速率, 我们基于AWS CRT构建了一个特殊版本的测试程序, 主要的不同有:
- 取消了数据下载后落盘的动作, 避免数据落盘成为测试的瓶颈.
- 在每个分片下载完成后, 记录下载完成的时间和完成的下载总量, 用于计算下载所花的时间.
- 参数化分片大小和并发数量, 用以调整以获得最高的性能.
此外,
- 我们在EC2实例上安装了AWS CloudWatch Agent, 用于采集操作系统的网络指标.
- 在EC2实例上安装
这里有一个小细节, 我们前面提到, 通过使用多个S3 Endpoint IP地址来分散S3的请求压力, AWS CRT其实也考虑到了这个问题, AWS CRT 实现了一个host resolver模块, 可以用来解析并缓存同一个域名的多个A记录, 供应用在执行中循环使用, 这个动作叫做DNS预热。
进行DNS预热需要花一定的时间, 而我们的验证程序希望能每次快速执行, 快速验证并调整各种参数来比较结果, 因此, 我们的验证代码没有使用AWS CRT的host resolver, 而是在测试实例上通过代码预先获取了一定数量的S3 Endpoint的A记录列表, 并通过dnsmasq在本机提供域名解析服务的方式实现地址解析. 但这并非生产环境中的最佳实践, 作为一个大规模的云服务, S3随时可能调整对外服务的IP地址, 因此, 实际应用当中请尽量从DNS服务器获取最新的A记录, 避免某些IP地址失效影响应用正常执行。
介绍到此完毕, let’s go build !
完整的验证代码如下:
#include <fcntl.h>
#include <unistd.h>
#include <aws/auth/credentials.h>
#include <aws/common/condition_variable.h>
#include <aws/common/mutex.h>
#include <aws/common/zero.h>
#include <aws/io/channel_bootstrap.h>
#include <aws/io/event_loop.h>
#include <aws/io/logging.h>
#include <aws/http/request_response.h>
#include <aws/s3/s3.h>
#include <aws/s3/s3_client.h>
#include <aws/common/private/thread_shared.h>
#include <aws/common/clock.h>
#define TEST_REGION "ap-northeast-1"
#define TEST_S3_EP "testbucket.s3.ap-northeast-1.amazonaws.com"
#define TEST_MAIN_EP "s3.ap-northeast-1.amazonaws.com"
#define TEST_FILE "200G_test.file"
#define TEST_RESULT_FILE "test_stats.result"
#define TEST_RANGE_SIZE 8 * 1024 * 1024
struct perf_item {
uint64_t ns;
uint64_t len;
} __attribute__((packed, aligned(8)));
static const struct aws_byte_cursor g_host_header_name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Host");
struct app_ctx {
struct aws_allocator *allocator;
struct aws_s3_client *client;
struct aws_credentials_provider *credentials_provider;
struct aws_client_bootstrap *client_bootstrap;
struct aws_logger logger;
struct aws_mutex mutex;
struct aws_condition_variable c_var;
bool execution_completed;
struct aws_signing_config_aws signing_config;
const char *region;
enum aws_log_level log_level;
bool help_requested;
void *sub_command_data;
size_t expected_transfers;
size_t completed_transfers;
};
struct transfer_ctx {
struct aws_s3_meta_request *meta_request;
struct app_ctx *app_ctx;
struct aws_atomic_var bytes_done;
struct aws_atomic_var index;
struct perf_item *stats;
};
void s_get_request_finished(
struct aws_s3_meta_request *meta_request,
const struct aws_s3_meta_request_result *meta_request_result,
void *user_data) {
struct transfer_ctx *transfer_ctx = user_data;
struct perf_item *items = transfer_ctx->stats;
int i = 0;
uint32_t x = aws_atomic_load_int(&transfer_ctx->index);
uint64_t total_ns = last - first;
double total_ms = total_ns / 1000.0 / 1000.0;
printf("first: %ld, last: %ld, total ms: %.2lf\n", first, last, total_ms);
ssize_t total_bytes = aws_atomic_load_int(&transfer_ctx->bytes_done);
double gbps = ((total_bytes / total_ms) * 1000) * 8 / 1000 / 1000 / 1000;
printf("total bytes downloaded: %ld, line speed: %.2lf Gbps\n", total_bytes, gbps);
/* write test stats */
int fd;
fd = open(TEST_RESULT_FILE, O_CREAT | O_TRUNC | O_WRONLY);
write(fd, items, sizeof(struct perf_item) * x);
close(fd);
transfer_ctx->app_ctx->completed_transfers++;
aws_condition_variable_notify_one(&transfer_ctx->app_ctx->c_var);
aws_s3_meta_request_release(transfer_ctx->meta_request);
aws_mem_release(transfer_ctx->app_ctx->allocator, transfer_ctx->stats);
aws_mem_release(transfer_ctx->app_ctx->allocator, transfer_ctx);
return;
}
int s_get_body_callback(
struct aws_s3_meta_request *meta_request,
const struct aws_byte_cursor *body,
uint64_t range_start,
void *user_data) {
struct transfer_ctx *transfer_ctx = user_data;
uint64_t now;
size_t index;
aws_high_res_clock_get_ticks(&now);
index = aws_atomic_fetch_add(&transfer_ctx->index, 1);
(transfer_ctx->stats)[index].ns = now;
(transfer_ctx->stats)[index].len = body->len;
aws_atomic_fetch_add(&transfer_ctx->bytes_done, body->len);
return AWS_OP_SUCCESS;
}
bool s_are_all_transfers_done(void *arg) {
struct app_ctx *app_ctx = arg;
return app_ctx->expected_transfers == app_ctx->completed_transfers;
}
int s3_get(struct app_ctx *app_ctx, struct aws_s3_client *client) {
char source_endpoint[1024];
AWS_ZERO_ARRAY(source_endpoint);
sprintf(source_endpoint, TEST_S3_EP);
char main_endpoint[1024];
AWS_ZERO_ARRAY(main_endpoint);
sprintf(main_endpoint, TEST_MAIN_EP);
struct aws_byte_cursor slash_cur = aws_byte_cursor_from_c_str("/");
struct aws_byte_cursor keyname = aws_byte_cursor_from_c_str(TEST_FILE);
struct aws_byte_cursor *key = &keyname;
struct transfer_ctx *xfer_ctx = aws_mem_calloc(app_ctx->allocator, 1, sizeof(struct transfer_ctx));
xfer_ctx->app_ctx = app_ctx;
xfer_ctx->stats = aws_mem_calloc(app_ctx->allocator, 1000000, sizeof(struct perf_item));
aws_atomic_init_int(&xfer_ctx->bytes_done, 0);
aws_atomic_init_int(&xfer_ctx->index, 0);
struct aws_s3_meta_request_options request_options = {
.user_data = xfer_ctx,
.signing_config = &app_ctx->signing_config,
.type = AWS_S3_META_REQUEST_TYPE_GET_OBJECT,
.finish_callback = s_get_request_finished,
.body_callback = s_get_body_callback,
.headers_callback = NULL,
.shutdown_callback = NULL,
.progress_callback = NULL,
};
struct aws_http_header host_header = {
.name = g_host_header_name,
.value = aws_byte_cursor_from_c_str(source_endpoint),
};
struct aws_http_header accept_header = {
.name = aws_byte_cursor_from_c_str("accept"),
.value = aws_byte_cursor_from_c_str("*/*"),
};
struct aws_http_header user_agent_header = {
.name = aws_byte_cursor_from_c_str("user-agent"),
.value = aws_byte_cursor_from_c_str("AWS common runtime command-line client"),
};
request_options.message = aws_http_message_new_request(app_ctx->allocator);
aws_http_message_add_header(request_options.message, host_header);
aws_http_message_add_header(request_options.message, accept_header);
aws_http_message_add_header(request_options.message, user_agent_header);
aws_http_message_set_request_method(request_options.message, aws_http_method_get);
struct aws_byte_buf path_buf;
aws_byte_buf_init(&path_buf, app_ctx->allocator, key->len + 1);
aws_byte_buf_append_dynamic(&path_buf, &slash_cur);
aws_byte_buf_append_dynamic(&path_buf, key);
struct aws_byte_cursor path_cur = aws_byte_cursor_from_buf(&path_buf);
aws_http_message_set_request_path(request_options.message, path_cur);
aws_byte_buf_clean_up(&path_buf);
struct aws_s3_meta_request *meta_request = aws_s3_client_make_meta_request(client, &request_options);
xfer_ctx->meta_request = meta_request;
aws_mutex_lock(&app_ctx->mutex);
app_ctx->expected_transfers++;
aws_mutex_unlock(&app_ctx->mutex);
aws_mutex_lock(&app_ctx->mutex);
aws_condition_variable_wait_pred(&app_ctx->c_var, &app_ctx->mutex, s_are_all_transfers_done, app_ctx);
aws_mutex_unlock(&app_ctx->mutex);
return 0;
}
int main() {
struct aws_allocator *allocator = aws_default_allocator();
aws_s3_library_init(allocator);
struct app_ctx app_ctx;
AWS_ZERO_STRUCT(app_ctx);
app_ctx.allocator = allocator;
app_ctx.c_var = (struct aws_condition_variable)AWS_CONDITION_VARIABLE_INIT;
aws_mutex_init(&app_ctx.mutex);
app_ctx.expected_transfers = 0;
app_ctx.completed_transfers = 0;
app_ctx.log_level = AWS_LOG_LEVEL_NONE;
struct aws_logger_standard_options logger_options = {
.level = app_ctx.log_level,
.file = stderr,
};
aws_logger_init_standard(&app_ctx.logger, app_ctx.allocator, &logger_options);
aws_logger_set(&app_ctx.logger);
/* event loop */
struct aws_event_loop_group *event_loop_group = aws_event_loop_group_new_default(allocator, 0, NULL);
/* resolver */
struct aws_host_resolver_default_options resolver_options = {
.el_group = event_loop_group,
.max_entries = 128,
};
struct aws_host_resolver *resolver = aws_host_resolver_new_default(allocator, &resolver_options);
/* client bootstrap */
struct aws_client_bootstrap_options bootstrap_options = {
.event_loop_group = event_loop_group,
.host_resolver = resolver,
};
app_ctx.client_bootstrap = aws_client_bootstrap_new(allocator, &bootstrap_options);
if (app_ctx.client_bootstrap == NULL) {
printf("ERROR initializing client bootstrap\n");
return -1;
}
/* credentials */
struct aws_credentials_provider_chain_default_options credentials_provider_options;
AWS_ZERO_STRUCT(credentials_provider_options);
credentials_provider_options.bootstrap = app_ctx.client_bootstrap;
app_ctx.credentials_provider = aws_credentials_provider_new_chain_default(allocator, &credentials_provider_options);
/* signing config */
aws_s3_init_default_signing_config(
&app_ctx.signing_config, aws_byte_cursor_from_c_str(TEST_REGION), app_ctx.credentials_provider);
app_ctx.signing_config.flags.use_double_uri_encode = false;
/* s3 client */
struct aws_s3_client_config client_config;
AWS_ZERO_STRUCT(client_config);
client_config.client_bootstrap = app_ctx.client_bootstrap;
client_config.region = aws_byte_cursor_from_c_str(TEST_REGION);
client_config.signing_config = &app_ctx.signing_config;
client_config.max_active_connections_override = 0;
client_config.throughput_target_gbps = 100;
client_config.part_size = TEST_RANGE_SIZE;
struct aws_s3_client *client = aws_s3_client_new(app_ctx.allocator, &client_config);
app_ctx.client = client;
s3_get(&app_ctx, client);
/* release resources */
aws_s3_client_release(app_ctx.client);
aws_credentials_provider_release(app_ctx.credentials_provider);
aws_client_bootstrap_release(app_ctx.client_bootstrap);
aws_host_resolver_release(resolver);
aws_event_loop_group_release(event_loop_group);
aws_mutex_clean_up(&app_ctx.mutex);
aws_s3_library_clean_up();
return 0;
}
经过编译、执行后的最终的测试结果如下,
[ec2-user@ip-172-31-32-214 clang]$ ./s3bench
first: 30633404368026, last: 30653346831918, total ms: 19942.46
total bytes downloaded: 214748364800, line speed: 86.15 Gbps
结果分析:
测试程序从收到第一个分片到最后一个分片一共花了19942.46毫秒, 通过这个时间计算得到的平均下载速度是10.03GiB/s, 折合线速为 86.15 Gbps。
同时通过AWS CloudWatch Agent采集了操作系统的每秒接收字节数指标 (net_bytes_recv), 并发送到AWS CloudWatch做图形化分析. 可以观察到测试过程中, 每秒接收的字节数会有一小段缓慢上升的过程, 这个阶段里由于测试程序刚刚启动,大量工作线程还在创建和初始化过程中, 因此整个应用还未达到峰值的接收能力. 而大约几秒种后, 待工作线程初始化完毕, 整个应用进入了稳定的工作状态, 峰值接收速度稳定在94.73Gbps, 已经非常接近100Gbps的极值了。
图: 10万兆线速测试结果
BONUS #1
AWS CRT提供如此高的I/O性能, AWS的开发人员也在积极向上层的AWS工具集提供这种能力,其实今天在AWS CLI中已经可以使用到AWS CRT, 只是在AWS CLI中使用CRT模式还处于Experimental阶段,在AWS CLI中启用CRT模式有两个步骤:
- 升级到AWS CLI到v2版本
具体的升级步骤请参考文档:
https://docs.aws.amazon.com/cli/latest/userguide/cliv2-migration-instructions.html
- 打开CRT模式
aws configure set default.s3.preferred_transfer_client crt
CRT模式的详细说明请参考文档:
https://awscli.amazonaws.com/v2/documentation/api/latest/topic/s3-config.html
设置完毕后, 运行s3 cp命令启动下载, 发现速度已经有了明显的提升, 在我们的测试实例上稳定在 2.0GiB/s
[ec2-user@ip-172-31-40-21 ~]$ aws s3 cp s3://testbucket/200_test.file /mnt/200_test.file
Completed 29.6 GiB/200.0 GiB (2.0 GiB/s) with 1 file(s) remaining
2GiB/s虽不及原生AWS CRT能够达到的10GiB/s, 但对大部分使用场景来说已经够用, 简单替换便能大幅提升性能, 不失为一个简单的好办法。
在好奇心的驱使下, 我们分析了一下开启CRT模式的AWS CLI遇到的瓶颈。在AWS CLI稳定到下载峰值的时候, 通过gdb dump了执行程序的调用链, 我们看到, 绝大部分的工作现场都处于epoll_wait()等待的状态
(gdb) info thread
Id Target Id Frame
* 1 Thread 0x7f467761e740 (LWP 18862) 0x00007f467645ea46 in do_futex_wait.constprop ()
from /lib64/libpthread.so.0
2 Thread 0x7f45d8dfa700 (LWP 18904) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
3 Thread 0x7f45661fc700 (LWP 18926) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
4 Thread 0x7f45da1fc700 (LWP 18902) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
5 Thread 0x7f46561fc700 (LWP 18873) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
6 Thread 0x7f4657fff700 (LWP 18870) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
7 Thread 0x7f45d97fb700 (LWP 18903) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
8 Thread 0x7f4664e86700 (LWP 18868) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
9 Thread 0x7f46161fc700 (LWP 18890) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
10 Thread 0x7f45975fe700 (LWP 18912) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
11 Thread 0x7f4596bfd700 (LWP 18913) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
12 Thread 0x7f45db5fe700 (LWP 18900) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
13 Thread 0x7f44c21fc700 (LWP 18962) 0x00007f467645ea46 in do_futex_wait.constprop ()
from /lib64/libpthread.so.0
14 Thread 0x7f451d7fb700 (LWP 18945) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
15 Thread 0x7f461ffff700 (LWP 18882) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
16 Thread 0x7f453d7fb700 (LWP 18939) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
17 Thread 0x7f457d7fb700 (LWP 18921) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
18 Thread 0x7f464f5fe700 (LWP 18876) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
19 Thread 0x7f45957fb700 (LWP 18915) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
20 Thread 0x7f4614dfa700 (LWP 18892) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
21 Thread 0x7f453ebfd700 (LWP 18937) 0x00007f4676d3088c in epoll_wait () from /lib64/libc.so.6
而AWS CLI的主线程锁在信号量上, 这是python著名的GIL, 关于python GIL这里不再赘述,AWS CLI V2 仍然是基于python的实现, 因此无法完全规避语言本身带来的多线程性能瓶颈。
(gdb) thread 1
[Switching to thread 1 (Thread 0x7f467761e740 (LWP 18862))]
#0 0x00007f467645ea46 in do_futex_wait.constprop () from /lib64/libpthread.so.0
(gdb) bt
#0 0x00007f467645ea46 in do_futex_wait.constprop () from /lib64/libpthread.so.0
#1 0x00007f467645eb22 in __new_sem_wait_slow.constprop.0 () from /lib64/libpthread.so.0
#2 0x00007f46768812ff in PyThread_acquire_lock_timed (lock=lock@entry=0x561a67fcf390,
microseconds=microseconds@entry=-1000000, intr_flag=intr_flag@entry=1) at Python/thread_pthread.h:483
#3 0x00007f46768dcaf4 in acquire_timed (timeout=-1000000000, lock=0x561a67fcf390)
at ./Modules/_threadmodule.c:63
#4 lock_PyThread_acquire_lock (self=0x7f4664407ea0, args=<optimized out>, kwds=<optimized out>)
at ./Modules/_threadmodule.c:146
#5 0x00007f46767368a2 in method_vectorcall_VARARGS_KEYWORDS (func=0x7f46775c92c0, args=0x561a67ee7f48,
nargsf=<optimized out>, kwnames=<optimized out>) at Objects/descrobject.c:348
#6 0x00007f46766d80a8 in _PyObject_VectorcallTstate (kwnames=<optimized out>, nargsf=<optimized out>,
args=<optimized out>, callable=<optimized out>, tstate=<optimized out>)
at ./Include/cpython/abstract.h:118
#7 PyObject_Vectorcall (kwnames=<optimized out>, nargsf=<optimized out>, args=<optimized out>,
callable=<optimized out>) at ./Include/cpython/abstract.h:127
#8 trace_call_function (kwnames=<optimized out>, nargs=<optimized out>, args=<optimized out>,
func=<optimized out>, tstate=<optimized out>) at Python/ceval.c:5058
#9 call_function (kwnames=0x0, oparg=<optimized out>, pp_stack=<synthetic pointer>, tstate=0x561a674afe40)
at Python/ceval.c:5074
#10 _PyEval_EvalFrameDefault (tstate=<optimized out>, f=<optimized out>, throwflag=<optimized out>)
at Python/ceval.c:3506
#11 0x00007f467682aec1 in _PyEval_EvalFrame (throwflag=0, f=0x561a67ee7db0, tstate=0x561a674afe40)
at ./Include/internal/pycore_ceval.h:40
但是, 还是那句话, 2GiB/s的下载速度对绝大部分使用场景已经够用, 而且轻松配置几下就能获得, 为什么不用呢?
BONUS #2
同样, 在好奇心的驱使下, 我们也验证了, 自带无畏并发特性的Rust语言, 在S3下载场景中的表现. 验证代码如下:
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::{Client, Region, PKG_VERSION};
use aws_sdk_s3::Endpoint;
use http::Uri;
use tokio::runtime::{Builder, Runtime};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::io::AsyncSeekExt;
use tokio::io::SeekFrom;
use tokio::fs::OpenOptions;
use tokio::time::{Duration, Instant};
use std::error::Error;
use indicatif::{ProgressBar, ProgressStyle};
macro_rules! thread_tasks {
($tasks:expr, $manager:expr) => {{
let mut _tasks = Vec::new();
for i in 0..$manager.threads {
let mut thread_tasks = Vec::new();
_tasks.push(thread_tasks);
}
let mut pos = 0;
for task in $tasks {
let i = pos % $manager.threads;
_tasks[i].push(task);
pos += 1;
}
_tasks
}}
}
type Range = (isize, isize);
type GetTask = Range;
#[derive(Clone)]
struct TransferManager {
s3: Client,
threads: usize,
chunks: isize,
bucket: String,
key: String,
filename: String,
objsz: isize,
forcewrite: bool,
bar: ProgressBar,
}
impl TransferManager {
pub async fn new(is_get: bool, forcewrite: bool, filename: String, bucket: String, key: String, thread_cnt: usize, chunks: isize, endpoint: Option<String>) -> Self {
let region_provider = RegionProviderChain::default_provider().or_else("ap-northeast-1");
let shared_config = aws_config::from_env().region(region_provider).load().await;
let client = if endpoint.is_some() {
let s3_config = aws_sdk_s3::config::Builder::from(&shared_config)
.endpoint_resolver(
Endpoint::immutable(Uri::try_from(endpoint.unwrap()).expect("error: bad endpoint")),
)
.build();
Client::from_conf(s3_config)
} else {
Client::new(&shared_config)
};
let mut manager = Self {
s3: client,
threads: thread_cnt,
chunks: chunks,
bucket: bucket.clone(),
key: key.clone(),
filename: filename.clone(),
objsz: 0,
forcewrite: forcewrite,
bar: ProgressBar::new(0),
};
let mut objsz = 0;
if is_get {
objsz = manager.head_object().await;
if forcewrite {
let mut f = File::create(manager.filename.clone()).await.expect("failed to create file");
f.set_len(objsz as u64).await.expect("failed to set local file len");
}
} else {
objsz = manager.get_file_size().await;
}
let bar = ProgressBar::new(objsz as u64);
bar.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta}) ({bytes_per_sec})")
.unwrap()
.progress_chars("#>-"));
manager.objsz = objsz;
manager.bar = bar;
manager
}
pub fn alloc_get_tasks(&self, tasks: &mut Vec<GetTask>) {
let mut remains = self.objsz;
let mut offset = 0;
while remains > 0 {
let mut range: Range = (0, 0);
if remains > self.chunks {
range = (offset, offset + self.chunks - 1);
} else {
range = (offset, offset + remains - 1);
}
let task = range;
tasks.push(task);
offset += self.chunks;
remains -= self.chunks;
}
}
pub async fn do_get_tasks(&self) {
let mut tasks = Vec::new();
self.alloc_get_tasks(&mut tasks);
let mut thr_tasks: Vec<Vec<GetTask>> = thread_tasks!(tasks, self);
let mut joins = Vec::new();
while let Some(mut thr_taskq) = thr_tasks.pop() {
let mut m = self.clone();
let bar = self.bar.clone();
let join = tokio::spawn(async move {
while let Some(task) = thr_taskq.pop() {
let res = m.download_object(task).await;
if res.is_err() {
println!("download error: {:?}", res);
}
bar.inc(res.unwrap() as u64);
}
0
});
joins.push(join);
}
let instant = Instant::now();
self.bar.reset();
for join in joins {
join.await;
}
let diff = instant.elapsed();
let bw = ((self.objsz as u128 / diff.as_millis()) as f64 / (1024.0 * 1024.0 * 1024.0)) * 1000.0;
self.bar.finish();
}
pub async fn download_object(&mut self, range: GetTask) -> Result<isize, Box<dyn Error>> {
let resp = self.s3
.get_object()
.bucket(&self.bucket)
.key(&self.key)
.range(format!("bytes={}-{}", range.0, range.1))
.send()
.await?;
let data = resp.body.collect().await.expect("error reading data");
let mut bytes = data.into_bytes();
if !self.forcewrite {
return Ok(bytes.len() as isize);
}
let mut file = OpenOptions::new()
.write(true)
.open(self.filename.clone())
.await?;
file.seek(SeekFrom::Start(range.0 as u64)).await?;
file.write_all(&bytes).await?;
Ok(bytes.len() as isize)
}
pub async fn head_object(&self) -> isize {
let resp = self.s3
.head_object()
.bucket(&self.bucket)
.key(&self.key)
.send()
.await
.expect("failed to get object size from S3");
resp.content_length() as isize
}
pub async fn get_file_size(&self) -> isize {
let attr = tokio::fs::metadata(self.filename.clone()).await.expect("failed to get file size");
attr.len() as isize
}
}
fn main() -> Result<(), Box<dyn Error>> {
let is_get = true;
let mut region = "ap-northeast-1".to_string();
let mut bucket = "testbucket".to_string();
let mut object = "200G_test.file".to_string();
let mut filename = "200G_test.file".to_string();
let mut forcewrite = false;
let mut threadpool = 30;
let mut parallel_tasks = 300;
let mut chunksize = 8 * 1024 * 1024;
let mut endpoint = None;
let rt = Builder::new_multi_thread()
.worker_threads(threadpool)
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let mut tm = TransferManager::new(is_get, forcewrite, filename, bucket, object, parallel_tasks, chunksize, endpoint).await;
tm.do_get_tasks().await;
});
Ok(())
}
运行结果如下:
[ec2-user@ip-172-31-32-214 src]$./s3perf
[00:00:24] [#######################################################################] 200.00 GiB/200.00 GiB (0s) (8.24 GiB/s)
[ec2-user@ip-172-31-32-214 src]$
200GiB的对象下载花了24秒, 整体下载速度为 8.24GiB/s, 而在更客观的CloudWatch监控数据中我们看到了峰值80.62Gbps
从结果看到, Rust语言的表现也非常优秀。
总结:
我们目标是通过验证的工作, 帮助大家了解在EC2到S3之间实现逼近100Gbps线速数据传输的一切细节,回归问题的本质,对于客户实际应用场景来说, 是否真正需要追求极限性能, 如何利用好现有的资源达到应用最优的效果,是值得思考的问题。
我们的建议是:
- 应该首先分析应用的场景, 确定极限性能的必要性, 对于绝大部分应用场景来说, 目前AWS提供的CLI或者SDK已经可以满足客户的需求, 或者通过一定的参数调整即可满足对性能的要求。
- 现有工具和方法可以满足应用需求的前提下, 不必盲目追求极限的性能, 毕竟使用AWS CRT改造应用需要重构代码及应用, 会带来额外的改造成本和测试集成的成本。
- 我们验证的场景是EC2上单应用进程对S3上单个大对象的下载, 在更具普适性的场景中, 使用多进程同时下载多个对象方式也是有效提高并发度, 进而提高系统整体吞吐率的有效方式。
- 对于确实需要极限性能的场景, 建议您尽早考虑使用AWS CRT来重构核心的模块, 实现最高的性能。
随着AWS CRT的不断发展, 我们相信它的易用性以及与现有语言和应用生态的集成会做的越来越好, 我们也期待有越来越多的客户使用AWS CRT来构建高性能的应用。
本篇作者