亚马逊AWS官方博客

云端引擎:使用Amazon Lambda和Amazon S3打造智能汽车数据处理平台

在现代汽车行业,数据已成为驱动创新和优化的关键因素。随着联网汽车和自动驾驶技术的快速发展,每辆车都变成了一个移动的数据中心,不断产生海量的传感器数据、行驶数据和用户交互数据。然而,如何高效、经济地收集、处理和分析这些数据,成为了汽车制造商和服务提供商面临的一大挑战。

1.serverless架构在汽车的场景应用

传统的车载数据传输方式通常涉及多个步骤,从数据采集到云端处理。首先,各种电子控制单元(ECUs)和传感器收集车辆状态和环境信息。这些数据通过canbus传输到中央处理单元,然后由TBOX(Telematics Box)进行初步处理。TBOX作为车辆的通信中心,对数据进行简单的过滤、压缩或格式化,随后通过专线或者公网将处理后的数据上传到云端Kafka或者其他消息中间件,进行后续的ETL处理。

图1:传统数据采集架构

然而,这种传统方法存在多个弊端,限制了其在现代汽车数据需求中的应用。主要问题包括

  • 带宽限制导致的数据传输延迟和成本增加,实时性受限影响及时决策,数据完整性风险增加。
  • TBOX处理能力有限,难以进行复杂的实时数据分析,大部分数据全部上传到云端,随着车辆数的持续增加及数据种类的不断丰富,信号数据解码例如Flink等大数据组件的负担和成本难以负荷。
  • 随着车联网规模扩大,中心化的Kafka可能面临扩展性瓶颈,同时不必要的数据上传和处理也降低了成本效益。

在这个背景下,这些局限性促使业界寻求更先进、灵活的解决方案,如边缘计算和智能数据筛选等技术。Amazon Serverless架构,特别是Lambda函数结合S3事件触发的模式,为汽车数据处理提供了一个灵活、高效且具有成本效益的解决方案。

图2:serverless数据采集架构

极氪(ZEEKR)是浙江吉利控股集团有限公司旗下智能电动汽车品牌,2021年3月成立。 极氪区别于传统造车与新势力模式,实现智能纯电的进化,开拓纯电发展第三赛道,极氪作为一个全建制的独立汽车品牌,拥有从制造环节到研发领域再到销售服务的完整体系,详情参考官网https://www.zeekrlife.com/en-sg/

在极氪的这套边缘算法平台中,车云数据闭环是其核心特征和最显著的优势。这个闭环系统巧妙地结合了车载边缘计算的实时性和云端大数据分析的深度,创造了一个不断自我优化的车运协同平台。车辆通过Edge Computer、Time Stream Database和Data Collect等模块收集和初步处理大量数据,包括周期性采集的常规数据、基于特定事件触发的即时数据,以及长期积累的历史数据。这些经过边缘智能筛选和处理的数据随后被上传到云端,实现了从车辆到云平台的数据流动。

云平台接收这些数据后,进行更为全面和深入的分析,生成优化的模型和策略下发到车端,调整数据采集策略、优化车辆性能参数等,实现了真正的车云数据闭环

2.Flink和Lambda在汽车分析场景中的不同应用

在车端数据上传场景中,选择使用Flink或Lambda主要取决于数据的特性和处理需求。一般来说, Flink适合处理高频,高并发的数据流,保证极低的延迟性,同时如果还存在需要实时关联多个数据源进行复杂聚合分析,那选择flink更加符合需求;

针对于离散性的事件触发性处理,每个子任务都可以作为独立事件处理,对于间歇性、不规律的实时数据处理Lambda更经济。在车联网和智能驾驶领域,数据采集和处理的需求日益多样化和复杂化。无论是本文讨论的灵活数据采集场景,还是智能架构中涉及的非结构化数据处理,AWS Lambda 都展现出了显著的优势,特别是在并发处理能力、成本效益和低延迟响应方面。

3.Serverless架构在数采中的实践

  • 在本架构中,主要包含车云通道和数据通道两大核心组件,它们共同构建了一个高效、可靠的双向数据交互体系。车云通道主要通过MQTT负责从云端向车辆下发消息和算法, 数据通道负责将车辆产生的海量数据上传至云端,随后,这些原始数据被导入大数据分析平台进行进一步处理和分析,包括离线和实时应用。

数据采集包含1小时/次边缘算法日志和1秒/次车端的signal数据,性能数据,前端埋点信息,流程如下:

  1. 由售后和内部运营人员登陆前端界面,创建数据采集的任务下发到车端,包含任务创建和管理功能,EKS;
  2. 车云指令通道:将http请求转为MQTT协议到车端,包含EMQX的架构部署,NLB+EKS+RDS;
  3. 车端包含MQTT的客户端还有对象存储的客户端,云端通过车云通道传递token方式和车端完成认证,走公网将数据传到云端s3;
  4. 通过事件触发Lambda对上传的数据进行解析,写入三个Kafka( 日志;2,数据文件元数据;3. 数据传入大数据平台的Kafka),元数据以及日志文件使用的是Redshift,Aurora,Opensearch构建;
  5. 前端采集系统读取日志文件以及数据信息,形成闭环。

图3:极氪全链路数采平台

  • 在架构验证阶段,通过一系列的压力测试,我们模拟了不同级别的并发负载,包括正常运营条件下的稳定流量,以及模拟大规模车队同时上传数据的突发高峰情况,保证数据传入S3之后Lambda+SQS以及写入MSK的延迟维持在100毫秒以内,满足近实时处理的严格要求。

配置的步骤如下:

  1. SQS->create queue->Standard type, keep default
  1. 在SQS上配置policy允许s3消息的推送
{
    "Version": "2012-10-17",
    "Id": "example-ID",
    "Statement": [
        {
            "Sid": "example-statement-ID",
            "Effect": "Allow",
            "Principal": {
                "Service": "s3.amazonaws.com"
            },
            "Action": [
                "SQS:SendMessage"
            ],
            "Resource": "SQS-queue-ARN",
            "Condition": {
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:s3:*:*:awsexamplebucket1"
                },
                "StringEquals": {
                    "aws:SourceAccount": "bucket-owner-account-id"
                }
            }
        }
    ]
}
PowerShell

3. Buckets->Property->Event notification->Create event notification,具体步骤参考https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html

4. 创建Lambda,并且保证Lambda的role包含SQS的权限,参考https://docs.aws.amazon.com/Lambda/latest/dg/services-SQS-configure.html

  • 代码示例+tracing 实现,主要是包含SQS触发Lambda以及Lambda去读取s3的链路追踪和可观测看板。
  • SQSTriggerHandler
{
    "Version": "2012-10-17",
    "Id": "example-ID",
    "Statement": [
        {
            "Sid": "example-statement-ID",
            "Effect": "Allow",
            "Principal": {
                "Service": "s3.amazonaws.com"
            },
            "Action": [
                "SQS:SendMessage"
            ],
            "Resource": "SQS-queue-ARN",
            "Condition": {
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:s3:*:*:awsexamplebucket1"
                },
                "StringEquals": {
                    "aws:SourceAccount": "bucket-owner-account-id"
                }
            }
        }
    ]
}

PowerShell
  • S3Handler
public class S3Handler {

    static Logger logger = LogManager.getLogger(S3Handler.class);

    static S3Client s3Client =

            S3Client.builder()

                    .httpClientBuilder(AwsCrtHttpClient.builder())

                    .overrideConfiguration(

                            ClientOverrideConfiguration.builder()

                                    .addExecutionInterceptor(new TracingInterceptor()) // [Tracing] Adding X-Ray tracing interceptor

                                    .build())

                    .build();

    @Tracing // [Tracing] Enabling X-Ray tracing for this method

    public static Map<String, Object> getObject(String bucket, String objectName, Context context, MetricsLogger metricsLogger) {

        Map<String, Object> res = new HashMap<>();

        LambdaLogger log = context.getLogger();

        BufferedInputStream bis = null;

        ByteArrayOutputStream baos = null;

        try {

            GetObjectRequest getObjectRequest = GetObjectRequest.builder()

                    .bucket(bucket)

                    .key(objectName)

                    .build();

            long downloadStartTime = System.currentTimeMillis();

            ResponseInputStream<GetObjectResponse> s3Object = s3Client.getObject(getObjectRequest);

            long downloadEndTime = System.currentTimeMillis();

            GetObjectResponse objectResponse = s3Object.response();

            String requestId = objectResponse.responseMetadata().requestId();

            //ge metadata

            Map<String, String> userMetadata = objectResponse.metadata();

bis = new BufferedInputStream(s3Object);

baos = new ByteArrayOutputStream();

bis.available();

            int size;

            byte[] bytes = new byte[1024];

            while ((size = bis.read(bytes)) != -1) {

baos.write(bytes, 0, size);

            }

            long costTime = downloadEndTime - downloadStartTime;

context.getLogger().log("s3 download file: " + objectName + " costTime: "

                    + costTime

                    + " [ms]" + "s3 requestId: " + requestId);

            // [Custom Metric] Recording S3 download time

metricsLogger.putDimensions(DimensionSet.of(S3Constant.DIMENSIONS, S3Constant.DIMENSIONS_ENVIRONMENT));

metricsLogger.putMetric(S3Constant.S3_DOWNLOAD_COST_METRIC, costTime, Unit.MILLISECONDS);

metricsLogger.flush();

            if(CollectionUtils.isNullOrEmpty(userMetadata)) {

                return res;

            }

res.put("userMetadata", userMetadata);

            return res;

        } catch (Exception e) {

            throw new RuntimeException("s3 download file error", e);

        } finally {

            try {

                if (bis != null) {

bis.close();

                }

            } catch (IOException e) {

log.log("BufferedInputStream close error " + e.getMessage());

            }

            try {

                if (baos != null) {

baos.close();

                }

            } catch (IOException e) {

log.log("ByteArrayOutputStream close error " + e.getMessage());

            }

        }

    }

    @Tracing // [Tracing] Enabling X-Ray tracing for this method

    public static void initInvoke(String bucket, String objectName) {

        try {

            long downloadStartTime = System.currentTimeMillis();

            GetObjectRequest getObjectRequest = GetObjectRequest.builder()
PowerShell
  • 优化步骤

第一批压测:

一次6万个550K文件上传下载,模拟海外1万辆车接入,测试时间10分钟-15分钟 。测试从通过Lambda 模拟车端接入上传文件到S3(500k),给SQS写事件, 触发启动Lambda从S3下载到Lambda内存结束,统计从Lambda被触发从S3下载完成的时间。

环境配置:

在高并发的车联网数据处理场景中,我们对AWS Lambda配置进行了精心优化,以确保在模拟100台车辆同时上传数据到S3的情况下,系统能够保持高效、稳定的性能。

1. Lambda内存配置(1024MB): 选择1024MB内存是在性能和成本之间的平衡。这个配置为Lambda函数提供了足够的计算资源来处理复杂的数据解析和转换任务,同时避免了过度配置导致的成本浪费。

2. Lambda Graviton处理器: 使用ARM架构的Graviton处理器可以显著提高性能并降低成本。

3. Java 8运行时: 业务代码要求,如果单纯考虑到性能,nodejs的性能会高于java。

https://docs.aws.amazon.com/sdkref/latest/guide/common-runtime.html

4. Java SDK v2: 最新版本的AWS SDK for Java提供了更好的性能和更低的内存占用;

https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html

5. CRT HTTP Client: Common Runtime (CRT) HTTP Client相比默认的HTTP客户端提供了更高的性能和更低的延迟。

https://docs.aws.amazon.com/sdkref/latest/guide/common-runtime.html

6. 400预置并发: 设置400的预置并发可以确保系统能够立即响应突发的高并发请求,避免冷启动延迟。

https://docs.aws.amazon.com/Lambda/latest/dg/configuration-concurrency.html

7. batch size 100: 将100个并发请求集中到一个Lambda函数上可以更有效地利用资源,减少管理开销

https://docs.aws.amazon.com/Lambda/latest/dg/with-SQS.html

8. S3 0-4分区预热: 预热S3的0-4分区可以显著提高写入性能。在车联网应用中,这种优化可以确保即使在大量车辆同时上传数据的高峰期,S3也能保持高效的写入速度,避免成为系统瓶颈。

https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html

测试结果:

最长 平均 最短 P99
2s 28-30ms 17ms左右 178.9ms

S3 指标:5XX错误较多,网络抖动大

第二批测试:

Lambda预制并发增加到900,s3 prefix分区增加到10个分区

测试结果: 数据分布平滑,无特别异常点

最长 平均 最短 P99
1s 26-29ms 16-23ms 65~76ms

5XX错误减少,但依旧发生,网络抖动减少。

第三批测试:

  • Lambda 调整size为2GB,JVM Heap size 设置到800M
  • 将客户端重试去掉用于单次排查延迟的日志原因,加入X-ray 追踪

测试结果:

S3 Download:Lambda下载s3中文件的总延迟(P99)75ms左右

S3 Download:Lambda下载s3中文件的总延迟(最大值)700-800ms左右,单次延迟高需要另外排查问题;

LambdaWholeCostTime:包含s3下载以及消息写入MSK(不包含MSK的ACK)的总延迟P99(95-100ms)

第四批测试:

S3下载第一次下载时间长的问题,日志排查之后是dns解析的延迟问题,通过讲s3 endpoint类型改成interface类型之后解决。如果使用gateway类型的话是通过route53进行解析,改为interface之后是通过绑定的VPC内的ENI来做解析,稳定性更好,最终s3下载的时常稳定在70ms左右,全链路s3下载,Kafka推送保障在100ms以内,满足客户需求,如果追求更极致的性能可以考虑使用express zone等其他存储类型。

4.迭代发展

随着车云协同技术的深化和边缘计算能力的提升,灵活数据采集架构将evolve为一个更加动态、智能且高效的生态系统,实现从车载传感器到云端分析的无缝数据流动,为智能驾驶和车联网应用提供更精准、实时的决策支持。

  • 算力下层到车端,减少云成本,同时提高算法的实时性和准确性;
  • 车云同构的开源压缩算法文件格式,支持云端数据湖方案,可以极大减少云端转换以及解压的算力成本和Kafka的集群成本;
  • 车端存算分离的设计思路,支持流批多场景,满足多种用户需求;

图4: 迭代的边缘计算数采平台

本篇作者

陈小宇

极氪技术专家,精通大数据架构设计、云计算部署及车联网技术,擅长分布式系统、流处理框架及高并发架构的实现。在整车厂的大数据平台搭建和车联网技术栈优化方面有深入实践,致力于打造高性能、可扩展的技术解决方案

吴迪

极氪高级软件开发工程师,专注于全球化车云通信,负责设计并实现大规模、高稳定性的车云通道数据上云链路。在工作中追求卓越,致力于通过严谨的态度和专业的技能确保数据传输的安全与稳定,支持全球范围内的海量数据高效上云。

谢紫玲

亚马逊云科技汽车行业解决方案架构师,专注于汽车行业客户的云端解决方案设计。在云原生数据库领域有深厚造诣,尤其擅长车联网大数据方案的设计与实践。曾成功为多家知名汽车企业提供创新的云架构,助力其数字化转型。

王景辉

亚马逊云科技无服务器解决方案架构师,负责基于亚马逊云科技无服务器计算与应用集成领域的方案咨询与架构设计,同时致力于亚马逊云科技云服务知识体系的传播与普及。