亚马逊AWS官方博客

基于亚马逊云科技数据分析服务,构建可靠数据治理模式

随着车联网技术的快速发展,汽车行业正在经历前所未有的数字化转型。大量来自车载传感器、诊断系统和远程监控设备的数据,为汽车制造商提供了全新的洞察和创新机会。为了充分利用这些宝贵的数据资产,构建一个先进的车联网数据分析平台(Vehicle Data Platform,车联网大数据平台)变得至关重要。基于亚马逊云科技(AWS)的数据分析产品组合,打造了一个横跨汽车产品全生命周期的车联网大数据平台。该平台集成了 Amazon Redshift、Amazon EMR Serverless、AWS Glue、Amazon MSK(托管 Kafka)、Amazon MSF(托管 Flink Serverless)和 Amazon MWAA(托管 Airflow)等核心服务,实现了数据的高效采集、存储、处理和分析。

凭借强大的数据处理和分析能力,车联网大数据平台支持多种创新应用场景,包括:

  1. 动力电池管理:通过实时监控和优化,延长电池寿命,提高能源效率。
  2. 智能车辆诊断:基于机器学习模型,实时检测潜在故障,提供预防性维修建议。
  3. 预测性维修:利用历史数据和使用模式,预测未来故障概率,优化维修计划。
  4. 质量和后市场服务:通过数据驱动的见解,持续改进产品设计,优化售后服务流程。

通过将车联网大数据平台与企业现有的 IT 系统和业务流程无缝集成,汽车制造商可以全面提升产品质量、优化运营效率、增强用户体验,并拥抱数字化转型带来的新商业模式。

在本文中,我们将详细探讨车联网大数据平台平台的架构设计、关键技术以及实施过程中的最佳实践,为读者提供构建自己的车联网数据分析平台的实用指导和灵感。

随着中国汽车企业加速全球化布局,在海外市场拓展过程中面临着诸多挑战和痛点:

  1. 基于 AWS 数据分析服务的场景,主要从数据规模,使用频率,实时处理,安全合规,技术栈等方向上,对于 EMR 和 Redshift 的评估,Spark,Flink 以及 streaming ingestion 的流式链路的压测以及性能评估。
  2. 数据合规仍在重中之重,欧盟的 GDPR《通用数据保护条例》数据主体有权不受仅基于自动化处理(用户画像)的决定约束,如果出于直接营销目的的处理个人数据,则数据主体有权随时反对出于此类营销目的处理与其他的个人数据。随着 UNECE WP.29 CSMS 车辆网络安全法规和 EDPB 车联网个人数据保护指南的相续公布,车企的相关部门包括法务合规、研发和 IT(数据)等团队应及时有效的进行产品合规影响分析(以上引用出自《打造合规守法的数字化底座》AWS &德勤),如 EDPR 列举的位置和生物特征数据等,应进行个人信息的盘点和隐私影响风险,识别中高风险问题并提出改进建议。因此车联网大数据平台的使用应符合车企合规部门的要求,做到最小细粒度的数据的访问和分享,并应具备“一键自动化级联处理”的平台能力,而无需多次修改一份数据的在多个场景下的“副本”。
  3. 车端数据类型众多,主要来自车联网和智能座舱相关数据,包括但不限于诸如发送机转速、发动机状态等动力类数据、如油门踏板开度、瞬时油耗等性能类数据、如加速、制动、驻车等操控类数据。如何简化日常阻碍数据团队的手动流程或脆弱的集成点。
  4. 车联网的数据场景仍持续创新中。从车辆故障识别预警联动维修方案及配置清单打造预见性主动服务能力,还是通过驾驶行为模型判断用户用车场景触发个性化关怀,亦或针对电池的工况分析、续航预测等数据场景目前还出于部分落地持续探索中,并不能像零售/电商行业通过建设客户数据平台具备多个参考实践,因此车联网大数据平台平台下针对数据的分析指标和算法模型目前并没有统一建设标准。

针对以上痛点,就集中式和联邦式治理、车联网数据摄入,Serverless,dbt+Redshift 等多个话题进行了深度探讨和 POC 测试,最终确认了以 Redshift 为核心的轻量化湖仓架构,结合其 Serverless、DataSharing 等产品特性完成一期的项目需求。

总架构图

离线分析架构图

针对挑战 1:

整套平台以 Serverless 架构为核心,减轻了运维负担,提升了开发效率,使数据平台团队能够更加专注在创新上,而不用耗费精力在基础设施管理上,为构建稳定可靠、快速迭代的分析系统提供了支持。Redshift 作为当今时代性价比最优的企业级云上数仓,不仅采用 code generation,vectorized execution 等技术优化其执行引擎,提供卓越的性能表现;同时作为云原生的企业级数仓,Redshift 在稳定性、安全性、弹性扩展、容灾能力等方面也不断创新。Redshift 存算分离架构,数据在 S3 可以显著降低存储成本,并且计算资源不受存储限制,可以灵活弹性扩展。Redshift DataSharing 可以做到跨账号,跨 Region 的数据秒级共享,无需移动数据。在评估了 streaming ingestion/spark/flink 等技术栈之后,最终选择了 Glue spark streaming 的方式作为 ETL 的传输转换。

在此次的案例中,具体解决了以下两大问题:

  • 配置`spark.sql.streaming.streamingQueryListeners`在 Spark Structured Streaming,即使指定了 group.id 的配置,Kafka Source 也不会 commit offset 到 Kafka,而是通过自身的 checkpoint 机制来管理 offset,程序失败后默认会从当前配置的 checkpoint 目录中记录的 offset 消费。由于 offset 不自动提交就会造成基于 offset 的 kafka 的监控工具都不可用,无法监控消费者组的消费的速度。通过自定义的 QueryListener 可以将 offset 提交到 kafka 来实现 offset 的监控。
  • 针对于车联网的这种 IoT 数据类型,转换后的原始数据作为 json 类型会有很多嵌套的子列,Redshift 的 super 数据类型可以表示多层嵌套的 json 数据,而不需要数据扁平化,显著简化了这种半结构化数据处理,极大提高了这种动态列的处理效率。

以下是基于 Glue 读取车端流式数据写入 Redshift 的具体操作步骤:

关于 redshift 集群的创建请参考:

  1. 登陆 redshift 创建目标表和 schema

    新建 schema ads,然后创建表

    CREATE TABLE ads.ads_test_table (
        id character varying(65535) ENCODE lzo,
        vehicle_code character varying(255) ENCODE lzo,
        receive_time bigint ENCODE raw,
        body super ENCODE zstd,
        vehicle_type character varying(255) ENCODE lzo
    ) DISTSTYLE EVEN
    SORTKEY(receive_time, vehicle_code);
    
  1. 创建 redshift 的 secret key
  2. 由于 Redshift,Glue,MSK 都在内网访问,为 secret manager 创建 private endpoint 限制公网流量
  3. 参考 Getting started using Amazon MSK – Amazon Managed Streaming for Apache Kafka 创建 MSK 集群,下载客户端脚本,创建连接,写入 topic
./kafka-topics.sh --create --zookeeper zookeeperConnectionString --replication-factor 2 --partitions 2 --topic vehicle04

注:replication-factor 和 partition 应该是 broker 数目的倍数,client.properties 里的内容取决于 MSK authentication 的方式。

如果是明文传输,参考:

PLAINTEXT:security.protocol=PLAINTEXT

如果是 IAM 交互,参考:

ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE>
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

将 broker-list 的值换成自己集群的字符串,pip install pykafka 安装所需要的包之后,执行下列 python 脚本,将 message 写入 topic,如下:

https://github.com/ziling777/vdp/blob/main/msk.py

import pykafka
import json
from pykafka import KafkaClient
from pytz import timezone
from datetime import timezone
from datetime import timedelta
from datetime import datetime

def send_msg(topic,msg):
    client=KafkaClient(hosts='xxxxxx')
    prd = client.topics[topic].get_sync_producer()
    prd._protocol_version=1  #timestamp只有在1版本的kafka数据结构中才引入
    prd.produce(msg.encode('utf-8'),timestamp=datetime.now()+ timedelta(hours=-8))

msg="""
{
    "id":"1489994324783209493261",
    "vehicleCode":"L1dwd67777",
    "receiveTime":11111348320,
    "body":{
        "vehicleType":"081",
        "s1":0,
        "s2":0
    }
}
"""
send_msg('vehicle04',msg)
  1. 创建 Glue 的 spark 的 ETL job,实时将 MSK 的数据接入 Redshift。创建 Glue service role 的配置参考:Step 2: Create an IAM role for AWS Glue – AWS Glue

根据下面的链接下载 jar 包并上传到 S3 的文件夹,拷贝 URL https://dxs9dnjebzm6y.cloudfront.net/tmp/spark3.3-sql-kafka-offset-committer-1.0.jar

创建 network connector,作业会运行在一个私有的 VPC 网络内。connector类型选择 network,以及 vpc,子网和安全组,可以选择 msk 或者 redshift 所在的配置

创建 Glue Job,选择 spark 类型

Spark 脚本内容如下,需要更改脚本中涉及的 secret key 名字,以及各个 S3 桶的 URL,还有 role name:https://github.com/ziling777/vdp/blob/main/glue.py

import sys
import boto3
import json
from botocore.exceptions import ClientError
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkConf
from pyspark.sql import DataFrame, Row
from pyspark.sql import SparkSession
from awsglue import DynamicFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType,ArrayType,MapType,LongType
from pyspark.sql.functions import from_json,col,to_json,json_tuple


def get_secret():

    secret_name = "redshift"
    region_name = "us-east-2"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        raise e

    secret = get_secret_value_response['SecretString']
    return secret

secret_dict = json.loads(get_secret())

params = [
    'JOB_NAME',
    'TempDir',
  	'kafka_broker',
    'topic',
  	'consumer_group',
  	'startingOffsets',
  	'checkpoint_interval',
  	'checkpoint_location',
  	'aws_region'
]
args = getResolvedOptions(sys.argv, params)
conf = SparkConf()

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session
job_name = args['JOB_NAME']
kafka_broker = args['kafka_broker']
topic = args['topic']
consumer_group = args['consumer_group']
startingOffsets = args['startingOffsets']
checkpoint_interval = args['checkpoint_interval']
checkpoint_location = args['checkpoint_location']
aws_region = args['aws_region']

# super or flatten
complex_convert = "super"

maxerror = 0
redshift_host = secret_dict['host']
redshift_port = secret_dict['port']
redshift_username = secret_dict['username']
redshift_password = secret_dict['password']
redshift_database = "dev"
redshift_schema = "ads"
redshift_table = "ads_test_table"
redshift_tmpdir = "s3://aws-glue-assets-us-east-2/temp/"
tempformat = "CSV"
redshift_iam_role = "arn:aws:iam::aws-account:role/redshiftetl"

reader = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", topic) \
    .option("maxOffsetsPerTrigger", "1000000") \
    .option("kafka.consumer.commit.groupid", consumer_group) \
    .option("failOnDataLoss", "false")

if startingOffsets == "earliest" or startingOffsets == "latest":
    reader.option("startingOffsets", startingOffsets)
else:
    reader.option("startingTimestamp", startingOffsets)

kafka_data = reader.load()
df = kafka_data.selectExpr("CAST(value AS STRING)")


def process_batch(data_frame, batchId):
    dfc = data_frame.cache()
    logger.info(job_name + " - my_log - process batch id: " + str(batchId) + " record number: " + str(dfc.count()))
    logger.info(job_name + " - my_log - process batch id: " + str(batchId) + " record : " + str(dfc.show(5, truncate=False)))
    if not data_frame.rdd.isEmpty():
        # json_schema = spark.read.json(dfc.rdd.map(lambda p: str(p["value"]))).schema
        # 定义外层schema
        json_schema = StructType([StructField("id", StringType(), True),
                                  StructField("vehicleCode", StringType(), True),
                                  StructField("receiveTime", StringType(), True),
                                  StructField("body", StringType(), True)
                                  ])

        # 使用定义的schema解析,不存在的字段会设置为空
        sdf = dfc.select(from_json(col("value"), json_schema).alias("kdata")).select("kdata.*")
        logger.info(job_name + " - my_log - process batch id: " + str(batchId) + "sdf record : " + str(sdf.show()))
        # 使用json_tuple函数解析header,不存在的字段会设置为null
        sdf_with_header = sdf.select("id", "vehicleCode", "receiveTime","body",
                        json_tuple(col("body"), "vehicleType").alias('vehicleType')
                        )
        logger.info(job_name + " - my_log - process batch id: " + str(batchId) + "dfc record : " + str(sdf_with_header.show()))
        source_view_name = "kafka_source_table_view"
        sdf_with_header.createOrReplaceGlobalTempView(source_view_name)
        sdf = spark.sql(
            "select id,vehicleCode,receiveTime,body,vehicleType  from {view_name} where receivetime!=''".format(
                view_name="global_temp." + source_view_name))

        logger.info(job_name + " - my_log - source sdf schema: " + sdf._jdf.schema().treeString())
        logger.info(job_name + " - my_log - source sdf: " + sdf._jdf.showString(5, 20, False))
        if complex_convert == "super":
            csdf = to_super_df(spark, sdf)
        elif complex_convert == "flatten":
            csdf = flatten_json_df(sdf)
        else:
            csdf = to_super_df(spark,flatten_json_df(sdf))
        logger.info(job_name + " - my_log - convert csdf schema: " + csdf._jdf.schema().treeString())
        logger.info(job_name + " - my_log - convert csdf: " + csdf._jdf.showString(5, 20, False))

        if not csdf.rdd.isEmpty():
            csdf.write \
            .format("io.github.spark_redshift_community.spark.redshift") \
            .option("url", "jdbc:redshift://{0}:{1}/{2}".format(redshift_host, redshift_port, redshift_database)) \
            .option("dbtable", "{0}.{1}".format(redshift_schema,redshift_table)) \
            .option("user", redshift_username) \
            .option("password", redshift_password) \
            .option("tempdir", redshift_tmpdir) \
            .option("tempformat", tempformat) \
            .option("extracopyoptions", "TRUNCATECOLUMNS region '{0}' maxerror {1} dateformat 'auto' timeformat 'auto'".format(aws_region, maxerror)) \
            .option("aws_iam_role",redshift_iam_role).mode("append").save()
    dfc.unpersist()
    logger.info(job_name + " - my_log - finish batch id: " + str(batchId))


def to_super_df(spark: SparkSession, _df: DataFrame) -> DataFrame:
    col_list = []
    for field in _df.schema.fields:

        if field.dataType.typeName() in ["struct", "array", "map"]:
            col_list.append("to_json({col}) as aws_super_{col}".format(col=field.name))
        else:
            col_list.append(field.name)
    view_name = "aws_source_table"
    _df.createOrReplaceGlobalTempView(view_name)
    df_json_str = spark.sql("select {columns} from {view_name}".format(columns=",".join(col_list),view_name="global_temp."+view_name))

    fields = []
    for field in df_json_str.schema.fields:
        if "aws_super_" in field.name:
            sf = StructField(field.name.replace("aws_super_", ""), field.dataType, field.nullable,
                             metadata={"super": True, "redshift_type": "super"})
        else:
            sf = StructField(field.name, field.dataType, field.nullable)
        fields.append(sf)
    schema_with_super_metadata = StructType(fields)
    df_super = spark.createDataFrame(df_json_str.rdd, schema_with_super_metadata)
    return df_super


def flatten_json_df(_df: DataFrame) -> DataFrame:
    flattened_col_list = []
    def get_flattened_cols(df: DataFrame, struct_col: str = None) -> None:
        for col in df.columns:
            if df.schema[col].dataType.typeName() != 'struct':
                if struct_col is None:
                    flattened_col_list.append(f"{col} as {col.replace('.', '_')}")
                else:
                    t = struct_col + "." + col
                    flattened_col_list.append(f"{t} as {t.replace('.', '_')}")
            else:
                chained_col = struct_col + "." + col if struct_col is not None else col
                get_flattened_cols(df.select(col + ".*"), chained_col)

    get_flattened_cols(_df)
    return _df.selectExpr(flattened_col_list)


save_to_redshift = df \
    .writeStream \
    .outputMode("append") \
    .trigger(processingTime="{0} seconds".format(checkpoint_interval)) \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", checkpoint_location) \
    .start()

save_to_redshift.awaitTermination()

Job 的参数配置如下,除了 DPU 的配置之外其他参数可保持默认

选择第一步创建的 network connection

填入 jar 包所在的 S3 URL

Spark Job 的参数如下

(如需复制参数:spark.sql.streaming.streamingQueryListeners=net.heartsavior.spark.KafkaOffsetCommitterListener)

针对挑战 2:

采用了联邦式治理模式而非过去电商/零售行业常采用的集中式治理模式,技术上选择了 redshift 和多个 redshift serverless 集群满足多个不同领域的数据团队协作。为了保证大数据平台上的数据合规性和避免数据冗余:

  1. 为生产测试,研发和运维创建独立的 IAM 账户,账号之间实现完全隔离,每个账户内部再细分 IAM 角色,赋予最小权限策略,降低越权风险。为每个环境部署独立的 VPC,实现网络层隔离,才用 TGW 保证南北东西流量隔离。
  2. 引入 Redshift 数据仓库架构,在大数据平台上构建一个中心化的数据湖,作为唯一的原始数据存储层。所有数据收集和计算后只保存至 redshift 中,作为默认的和唯一的数据保存位置。
  3. dbt 可以为每一步数据转换打上标签和添加文档,自动生成数据血缘信息,提高透明度。dbt 可以生成交互式的数据血缘图,清晰展示整个数据转换链路,极大地提升分析数据的质量和可追溯性。同时,dbt 会保存完整的运行日志,可以追溯每次运行的细节,辅助数据问题诊断。
  4. 通过 Redshift datasharing 精细控制数据访问权限,实现将基础表的行列权限控制对隐私数据进行脱敏,返回给调用方的数据严格按需提供,避免存取全量原始数据,在多个 Redshift 集群之间进行实时的数据共享,实现不同组织和部门之间的数据访问。
  5. 先将数据导入 Redshift 集群,作为主数据存储,在 Redshift 集群上配置 Data Sharing,实时共享数据到 Redshift Serverless,应用服务统一连接 Redshift Serverless,这个方案充分利用了 Redshift 的 Data Sharing 功能,交叉使用集群和 Serverless 的优势保证在集群维护窗口期间业务连续性。

具体操作步骤如下:

由于 Serverless 集群默认是打开数据加密的,所以在配置 datashare 之前需要开启集群加密。

  1. 在原始集群通过 query editor 客户端连接
  2. 在 producer redshift 集群创建 datashare 并加入需要共享的 schema(table,schame 或者 database)
    -- 创建 datashare
    CREATE DATASHARE cust_share2 SET PUBLICACCESSIBLE false;
    -- 添加schema到 datashare
    ALTER DATASHARE cust_share2 ADD SCHEMA public;
    --添加table到datashare
    ALTER DATASHARE cust_share2 ADD TABLE public.customer;
    --后续是否还需要继续添加新的表,视图和UDF到schema通过datashare分享
    ALTER DATASHARE cust_share2 SET INCLUDENEW = TRUE FOR SCHEMA public;
    --添加视图到 datashare
    ALTER DATASHARE cust_share2 ADD table public.myview4;
    -- 查看被分享的对象list
    show datashares;
    select * from SVV_DATASHARE_OBJECTS;
    
  3. 登陆 Consumer Redshift Serverless 集群,执行以下命令并记录用户名和 namespace id
    select current_user, current_namespace; 
  4. 在 Producer 集群对 consumer 集群进行赋权
    Grant USAGE ON DATASHARE cust_share2 to NAMESPACE '385d6f62-cfff-4dcf-xxxxx-e9581fe78';
    GRANT ALTER, SHARE ON DATASHARE cust_share2 TO awsuser;
    
  5. 考虑到 gdpr 的审计要求,如果用户要求自己的行为数据不参与任何数据分析,那我们必须对分享的数据做行列权限管控,保证后续的离线分析不会涉及到客户的隐私数据。目前 datashare 可以直接共享表,视图和物化视图(全量刷新),仅支持将 datashare 下某个 database 级别的所有的对象给到 consumer 的某个用户或者用户组,如果要实现单表或者单视图的精细化权限管控,可以在 consumer 集群执行以下操作:
    --查看datashare
    SHOW DATASHARES;
    --将producer的datashare挂在本地的一个数据库dsconsumer下面
    CREATE DATABASE dsconsumer from DATASHARE cust_share2 OF NAMESPACE 'ProducerNamespaceID';
    --创建一个新的schema名为dsconsumer2
    create schema dsconsumer2;
    --基于datashare中schema下的view创建新的view并挂在新的schema下
    SET search_path TO dsconsumer2;
    create view topcustomer as
    select id,name
    from "dsconsumer"."public"."myview4"
    with no schema binding;
    --创建新的用户和用户组
    CREATE GROUP ro_group;
    CREATE USER demo WITH password '20080808Nm';
    CREATE USER demo2 WITH password '20080808Nm';
    -- 将用户加入用户组
    ALTER GROUP ro_group ADD USER ro_user;
    ALTER GROUP ro_group ADD USER demo;
    ALTER GROUP ro_group ADD USER demo2;
    -- 给用户组赋予权限
    revoke USAGE ON DATABASE dsconsumer TO demo;
    grant USAGE ON DATABASE dsconsumer TO demo2;
    grant select on table topcustomer to demo2;
    

 针对挑战 3:

使用云原生技术能够实现从代码提交到服务部署的整个流程的自动化,通过 Gitlab 自动触发 CodeBuild 进行编译和构建,然后利用 Docker 等容器技术生成镜像,再通过 Kubernetes 等容器编排系统进行自动部署。这种持续集成和持续交付的流水线大大简化了软件交付周期,能够快速地进行迭代更新。Airflow 专注调度管理,Astronomer Cosmos 采集数据,dbt 负责转换计算,边界清晰。

  1. 考虑到安全合规的风险,所有的生产代码存在公司内部的源码管理系统,不会泄露源代码。API Gateway 接收到 webhook 请求,调用 Lambda 函数实现业务逻辑,验证请求合法性,读取 GitLab 发送的代码信息,Lambda 调用 CodeBuild 启动构建任务,将代码拉取到构建环境。
  2. 在 Airflow 部署中安装 Astronomer Cosmos 插件,作为后端数据存储。Astronomer Cosmos 会捕获 DAGs 执行的状态、任务实例日志、任务运行时长、重试次数等 Airflow 元数据。Airflow 中可以定义 dbt 任务,在 dbt 项目中配置对应 Astronomer Cosmos 的 adapter。dbt 通过这个 adapter 可以查询 Astronomer Cosmos,获取 Airflow 元数据。
  3. dbt 模型可以根据业务需求,对这些元数据进行转换加工,例如聚合统计、加入维度等。经过 dbt 模型运算后的数据,可以写入到 Redshift、在 Airflow DAG 中配置该 dbt 任务,设置定期调度,完成从元数据到 warehouse 的流水线。

具体配置参考博客基于 Cosmos,dbt 和 Redshift 快速构建无服务器的现代数据管道 | 亚马逊AWS官方博客

车联网大数据平台(Vehicle Data Platform 车辆数据平台)

  • 车辆数据实时摄入

车端通过 TBOX 将 ECU 信号、日志数据实时传到云端。在车云传输数据的过程中,涉及到复杂的数据协议,为了实时解析车端数据并将其转化为结构化数据,使用流式数据处理平台 AmazonMSK、 Flink。利用 Glue 将这些结构化数据实时入仓到 Redshift,以供后续的查询和分析场景使用,同时实时应用场景如智能诊断可以直接消费这些结构化数据。

  • 实时场景-智能诊断应用

基于 MSK、Flink、Spark 以及实时数仓 Redshift,成功实现了一个准实时的智能诊断应用。首先,梳理信号监控规则,对车端的 ECU 信号进行了实时监控。接着,利用自研的决策引擎进行数据建模,实现了车辆故障的智能诊断。最后,将诊断结果与相应的业务部门或业务系统形成了闭环,实现了全方位的智能诊断应用。

  • 分析场景-UBA(User Behavior Analysis)

数据仓库工程师、数据分析师使用 DBT 进行数据建模,简化了数据建模过程,提高团队的协作效率高,以数据驱动产品决策,提升用户产品体验。

  • Datashare-助力业务发展

电池

  • SOC 监测

监测电池的 SOC(State of Charge),即当前电量和续航里程状况。通过监测电池电量的变化,实时评估续航里程,并分析用户的充电习惯,从而给出更智能的使用建议。

  • SOH 评估

评估电池的 SOH(State of Health),即电池的健康度和容量衰减情况。通过评估充放电性能参数,能够衡量电池的健康状态和剩余寿命,预测电池的未来可用性。同时,还需要结合充电行为的数据,评估频繁快充等情况对电池的影响。在电池状态异常时,会及时发送预警和提示。

由于不同用户的使用情况各异,给出个性化的电池保养和使用策略。通过综合利用电池的 SOC、SOH、充电及使用数据,可以进行全面的电池状态分析,从而让系统更准确地预测电池状况,并主动给用户提供建议,帮助延长电池使用寿命,提升用户的电动出行体验。

VSOC

  • 满足 WP29.R155 的要求,提供实时感知量产车攻击信息和安全威胁的能力,满足要进入这些市场的新车型符合该法规的要求。
  • 满足国内监管机构要求,当收到监管单位漏洞信息后,可以第一时间定位涉及的车辆范围,及时处理反馈问题。
  • 通过 VSOC 平台全面评估对应车型的量产车的安全风险,提升业务对安全设计的理念,更好地闭环车端安全风险。

总的来说, ELT 架构先将原始数据直接加载到大数据平台的数据湖或数据仓库中,避免了数据提取和转换的中间环节,大大提高了数据摄入效率,尤其适用于车联网场景下海量车载数据的高速采集需求。借助云计算的弹性扩展能力,数据湖和数据仓库可根据需求动态调整存储和计算资源,满足车联网数据量快速增长的需求,而无需预先估算和配置固定的 ETL 集群资源。所有原始数据统一存储在数据湖或数据仓库中,消除了数据孤岛,有利于跨领域数据的整合和共享,为车联网大数据分析提供了完整、准确、实时的数据基础。

本篇作者

谢紫玲

亚马逊云科技解决方案架构师,主要负责 Auto 行业客户解决方案设计,擅长云原生数据库以及大数据方案设计和实践。

潘超

亚马逊云科技数据分析解决方案架构师。负责客户大数据解决方案的咨询与架构设计,在开源大数据方面拥有丰富的经验。工作之外喜欢爬山。