亚马逊AWS官方博客

从智能工厂到车联网:S3 Tables 双模式写入实战指南

引言

在 IoT 数据处理场景中,我们经常面临两种截然不同的数据写入需求:

  • 智能工厂:1000+ 传感器每 5 分钟批量上传数据
  • 车联网:10 万+ 车辆持续发送实时位置和状态

这两种场景带来了不同的技术挑战:

批量写入场景(智能工厂):

  1. 数据量大但频率低,需要高效的批处理能力
  2. 需要保证数据完整性和事务一致性
  3. 要求灵活的 Schema 演化支持设备升级

流式写入场景(车联网):

  1. 海量设备并发写入,峰值 QPS 可达数万
  2. 要求毫秒级延迟和近实时数据可用性
  3. 需要自动化的数据格式转换和分区管理

此外,对于IoT用户而言,还需要构建统一的数据分析平台,实现场景化的业务分析,这就要求:

  1. 两种数据源需要统一的查询接口
  2. 支持时间旅行和历史数据回溯
  3. 兼容 Athena、Spark 等多种分析引擎

如何用统一的存储方案优雅地处理这两种模式?本文将介绍基于 Amazon S3 Tables 的两种数据写入方案,充分利用 S3 Tables 作为 AWS 基于开放表存储服务的优势:

  1. 托管式 Iceberg :自动维护表元数据、快照与统计信息,无需手动运行 Iceberg catalog 或 rewrite manifests
  2. 自动优化:后台持续执行小文件合并、删除文件清理与索引重建,保持高查询性能并降低请求成本
  3. 成本优化:按使用量付费,无需预置资源,显著降低运营成本
  4. 安全保障:集成 AWS IAM,支持细粒度权限控制和数据加密
  5. 开放标准:基于 Apache Iceberg 格式,避免厂商锁定
  6. 企业级特性:天然支持 ACID 事务、Schema 演化和时间旅行
  7. 生态集成:与 Athena、Spark、EMR 等分析服务无缝集成

通过这些特性,实现统一的数据湖存储和查询,简化架构复杂度的同时保证数据一致性。

  • 方案一:Lambda + PyIceberg 批量写入:IoT 设备通过 HTTPS 协议将批量数据发送到 API Gateway,触发 Lambda 函数执行。Lambda 函数使用 PyIceberg 库直接操作 S3 Tables,执行数据验证、转换和写入操作。PyIceberg 提供了完整的 Iceberg 表操作能力,包括 Schema 管理、事务控制和元数据更新。整个流程采用事件驱动架构,按实际调用次数计费,无需预置资源。Lambda 函数可以实现任意复杂的业务逻辑,如数据清洗、格式转换、质量检查等,并通过 Iceberg 的 ACID 事务保证数据一致性。
  • 方案二:Kinesis Firehose 流式写入:IoT 设备通过 MQTT 或 HTTPS 协议连接到 AWS IoT Core,IoT Core 的规则引擎根据配置的规则将消息实时路由到 Kinesis Data Streams。Kinesis Data Streams 提供高吞吐的流式数据传输能力,支持海量设备并发写入。Kinesis Firehose 从 Data Streams 消费数据,自动进行批处理、压缩和格式转换(可选 Lambda 转换),然后以写入 S3 Tables。整个流程完全托管,自动扩展,无需管理服务器。Firehose 的缓冲机制(默认 60 秒或 5MB)在保证近实时性的同时,通过批量写入优化了成本和性能。

方案一:Lambda + PyIceberg 批量写入

核心组件

  • API Gateway:作为 HTTPS 入口,接收 IoT 设备的批量数据请求。提供 REST API 接口,支持请求验证、限流和监控。通过集成 Lambda 函数,将接收到的 JSON 数据以事件形式传递给后端处理逻辑。
  • Lambda 函数:核心数据处理引擎,使用 Python 运行时执行业务逻辑。通过 PyIceberg 库直接操作 Iceberg 表,实现数据验证、清洗、转换和写入。支持完全自定义的处理流程,包括复杂的业务规则、数据质量检查和 Schema 演化管理。Lambda 按调用次数和执行时间计费,无需预置资源。
  • PyIceberg:Python 实现的 Apache Iceberg 客户端库,通过 S3 Tables 提供的 RESTful Iceberg Catalog 接口访问和操作表。PyIceberg 连接到 S3 Tables 的 REST Catalog 端点,获取表的元数据信息,然后直接读写 S3 中的数据文件。这种架构下,S3 Tables 负责管理 Iceberg 元数据(表结构、快照、分区信息等),而 PyIceberg 负责执行数据操作(读取、写入、事务提交)。
  • S3 Tables:全托管的 Apache Iceberg 表,ACID 事务、列式 Parquet、自动合并小文件,秒级 Schema 演进与时间旅行查询,原生集成 Athena/Redshift 等分析引擎。

核心实践

Lambda 函数实现

下面的示例代码,pyiceberg 对接 S3 Tables 的 rest catalog api批量插入数据到 S3 Tables,核心实现包括

  1. 连接到 S3 Tables Catalog
  2. 使用 PyArrow Schema 定义表结构
  3. 支持 Pandas DataFrame 数据插入
import jsonimport pandas as pdimport pyarrow as pafrom pyiceberg.catalog import load_catalog
def lambda_handler(event, context):
    """
    Lambda 函数:批量写入 IoT 数据到 S3 Tables
    
    Event 格式:
    {
        "records": [
            {"id": 1, "sensor_name": "temp_01", "value": 25.5, "timestamp": "2024-01-01T10:00:00Z"},
            {"id": 2, "sensor_name": "temp_02", "value": 26.1, "timestamp": "2024-01-01T10:01:00Z"}
        ]
    }
    """
try:
        # 1. 解析输入数据
        records = event.get('records', [])
        if not records:
            return {
                'statusCode': 400,
                'body': json.dumps({'error': 'No records provided'})
            }
        
        # 2. 连接 S3 Tables Catalog
        catalog = load_catalog("s3tables", **{
            "type": "rest",
            "warehouse": "arn:aws:s3tables:us-east-1:ACCOUNT_ID:bucket/BUCKET_NAME",
            "uri": "https://s3tables.us-east-1.amazonaws.com/iceberg",
            "rest.sigv4-enabled": "true",
            "rest.signing-name": "s3tables",
            "rest.signing-region": "us-east-1",
        })
        
        # 3. 加载表
        table = catalog.load_table("iot_namespace.sensor_data")
        
        # 4. 数据转换
        df = pd.DataFrame(records)
        
        # 确保数据类型正确
        df['id'] = df['id'].astype('int32')
        df['value'] = df['value'].astype('float64')
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        
        # 转换为 PyArrow Table
        arrow_table = pa.Table.from_pandas(df)
        
        # 转换时间戳类型为微秒精度UTC
        if 'timestamp' in arrow_table.column_names:
            timestamp_col = arrow_table.column('timestamp')
            new_timestamp = pa.compute.cast(timestamp_col, pa.timestamp('us', tz='UTC'))
            arrow_table = arrow_table.set_column(
                arrow_table.schema.get_field_index('timestamp'),
                'timestamp',
                new_timestamp
            )
        
        # 5. 写入 S3 Tables
        table.append(arrow_table)
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': f'Successfully wrote {len(records)} records',
                'records_count': len(records)
            })
        }
        
    except Exception as e:
        print(f"Error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

Lambda 配置

requirements.txt

pyiceberg[s3fs,glue]==0.7.0
pandas==2.1.0
pyarrow==14.0.0

Lambda 设置

  • 内存:512 MB – 1024 MB
  • 超时:60 秒
  • 运行时:Python 3.12
  • IAM 权限
{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "s3tables:GetTable",
          "s3tables:UpdateTableMetadata",
          "s3tables:PutTableData"
        ],
        "Resource": "arn:aws:s3tables:*:*:bucket/*/table/*"
      },
      {
        "Effect": "Allow",
        "Action": [
          "s3:PutObject",
          "s3:GetObject"
        ],
        "Resource": "arn:aws:s3:::your-bucket/*"
      }
    ]
  }

API Gateway 配置

REST API 端点

POST /iot/batch-upload

请求示例

curl -X POST https://api-id.execute-api.us-east-1.amazonaws.com/prod/iot/batch-upload \
  -H "Content-Type: application/json" \
  -H "x-api-key: YOUR_API_KEY" \
  -d '{
    "records": [
      {
        "id": 1,
        "sensor_name": "temperature_01",
        "value": 25.5,
        "timestamp": "2024-01-01T10:00:00Z"
      },
      {
        "id": 2,
        "sensor_name": "humidity_01",
        "value": 60.2,
        "timestamp": "2024-01-01T10:01:00Z"
      }
    ]
  }'

方案部署

**使用 AWS CLI 部署**:

# 1. 创建 IAM 角色
aws iam create-role \
    --role-name s3tables-lambda-role \
    --assume-role-policy-document '{
      "Version": "2012-10-17",
      "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole"
      }]
    }'

# 2. 附加权限策略
aws iam attach-role-policy \
    --role-name s3tables-lambda-role \
    --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

aws iam put-role-policy \
    --role-name s3tables-lambda-role \
    --policy-name s3tables-access \
    --policy-document '{
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3tables:*"],
          "Resource": "*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:PutObject", "s3:GetObject"],
          "Resource": "arn:aws:s3:::your-bucket/*"
        }
      ]
    }'

# 3. 打包 Lambda 函数
cd lambda
pip install -t package pyiceberg[s3fs] pandas pyarrow
cd package && zip -r ../function.zip . && cd ..
zip -g function.zip lambda_function.py

# 4. 创建 Lambda 函数
aws lambda create-function \
    --function-name s3tables-iot-writer \
    --runtime python3.12 \
    --handler lambda_function.lambda_handler \
    --role arn:aws:iam::ACCOUNT_ID:role/s3tables-lambda-role \
    --zip-file fileb://function.zip \
    --timeout 60 \
    --memory-size 1024

# 5. 创建 API Gateway
API_ID=$(aws apigatewayv2 create-api \
    --name iot-s3tables-api \
    --protocol-type HTTP \
    --target arn:aws:lambda:us-east-1:ACCOUNT_ID:function:s3tables-iot-writer \
    --query 'ApiId' --output text)

# 6. 授权 API Gateway 调用 Lambda
aws lambda add-permission \
    --function-name s3tables-iot-writer \
    --statement-id apigateway-invoke \
    --action lambda:InvokeFunction \
    --principal apigateway.amazonaws.com \
    --source-arn "arn:aws:execute-api:us-east-1:ACCOUNT_ID:${API_ID}/*/*"

echo "API Endpoint: https://${API_ID}.execute-api.us-east-1.amazonaws.com/iot/batch-upload"

方案二:Kinesis 流式写入 S3 Tables (Streaming Write with Kinesis)

核心组件

  • IoT Core:专为 IoT 设备设计的托管服务,负责管理海量设备的 MQTT 长连接(支持 10 万+ 设备并发)。提供设备认证、消息发布/订阅和规则引擎功能。规则引擎可以根据 SQL 语句过滤、转换消息,并将符合条件的数据实时路由到 Kinesis Data Streams,实现设备层与数据处理层的解耦。
  • Kinesis Data Streams (KDS):高吞吐的实时数据流服务,常用作缓冲层接收 IoT Core 等源的数据。通过 Shard 机制水平扩展,每 Shard 提供 1 MB/s 写入和 2 MB/s 读取。支持“按需模式”(自动扩缩)与“预置模式”(手动指定 Shard)。数据默认免费保留 24 小时,开启“延长保留”后可保存 7 天,再启用“长期保留”最多可达 365 天(额外按月/GB 计费),为下游应用提供有序且可重放的数据源。
  • Kinesis Data Firehose:全托管、无服务器的流式 ETL 服务,可从 Kinesis Data Streams 或其他源消费数据,自动投递到 S3 Tables、Redshift、OpenSearch 等目标。内置智能缓冲(默认 1 MB 或 60 秒触发,可配置范围 1–128 MB / 60–900 秒)、批量写入、自动重试与错误日志。支持 SNAPPY/GZIP 压缩、JSON 转 Parquet/ORC 格式,并可调用 Lambda 进行自定义转换,无需管理任何底层基础设施。
  • S3 Tables:全托管的 Apache Iceberg 表,ACID 事务、列式 Parquet、自动合并小文件,秒级 Schema 演进与时间旅行查询,原生集成 Athena/Redshift 等分析引擎。

核心实践

1.创建表存储桶,命名空间和表

在控制台中导航到 Amazon S3。选择”表存储桶”,然后如果尚未启用与 AWS 分析服务的集成,请选择”启用集成”,如下图所示。此集成允许用户在 AWS Glue 数据目录和 AWS Lake Formation 中发现在此 AWS 区域和此账户中创建的所有表,并通过 AWS 服务(如 Firehose、Athena、Amazon Redshift 和 Amazon EMR)访问它们。

输入表存储桶名称

点击表存储桶名,进入存储桶

使用Athena创建表

输入命名空间名称 test_namespace

点击创建命名空间

点击 使用Athena创建表, 如下,在Athena中点击运行创建示例表 daily_sales

2.创建给Data Firehose用的IAM角色

创建一个 IAM 角色,授予 Firehose 权限以对默认 AWS Glue 数据目录中的表执行操作、在通用 S3 存储桶中备份流式传输期间失败的记录,以及与 Kinesis Data Stream 交互。此外,根据您的 Firehose 配置,您可以选择授予 Amazon CloudWatch 日志记录和 AWS Lambda 函数操作的额外权限。
首先IAM服务中首先创建一个策略,以JSON格式输入以下内容,其中ap-southeast-1(新加坡)改成你自己的区域,123456789012改成你自己的12位AWS 账号ID(部分Resource暂时用*,等资源创建完成后可以修改策略内容以缩小权限范围),给策略取名,例如 FirehoseS3TablePolicy

{
     "Version": "2012-10-17",
     "Statement": [
         {
             "Sid": "S3TableAccessViaGlueFederation",
             "Effect": "Allow",
             "Action": [
                 "glue:GetTable",
                 "glue:GetDatabase",
                 "glue:UpdateTable"
             ],
             "Resource": [
                "arn:aws:glue:ap-southeast-1:123456789012:catalog/s3tablescatalog/*",

                "arn:aws:glue:ap-southeast-1:123456789012:catalog/s3tablescatalog",

                "arn:aws:glue:ap-southeast-1:123456789012:catalog",

                "arn:aws:glue:ap-southeast-1:123456789012:database/*",
                "arn:aws:glue:ap-southeast-1:123456789012:table/*/*"
             ]
         },
         {
             "Sid": "S3DeliveryErrorBucketPermission",
             "Effect": "Allow",
             "Action": [
                 "s3:AbortMultipartUpload",
                 "s3:GetBucketLocation",
                 "s3:GetObject",
                 "s3:ListBucket",
                 "s3:ListBucketMultipartUploads",
                 "s3:PutObject"
             ],
             "Resource": "*"

        },

        {

            "Sid": "RequiredWhenUsingKinesisDataStreamsAsSource",

            "Effect": "Allow",

            "Action": [

                "kinesis:DescribeStream",

                "kinesis:GetShardIterator",

                "kinesis:GetRecords",

                "kinesis:ListShards"

            ],

            "Resource": "*"
         },
         {
             "Sid": "RequiredWhenDoingMetadataReadsANDDataAndMetadataWriteViaLakeformation",
             "Effect": "Allow",
             "Action": [
                 "lakeformation:GetDataAccess"
             ],
             "Resource": "*"
         },
         {
             "Sid": "RequiredWhenUsingKMSEncryptionForS3ErrorBucketDelivery",
             "Effect": "Allow",
             "Action": [
                 "kms:Decrypt",
                 "kms:GenerateDataKey"
             ],
             "Resource": [
                 "*"

            ],

            "Condition": {

                "StringEquals": {

                    "kms:ViaService": "s3.ap-southeast-1.amazonaws.com"

                },

                "StringLike": {

                    "kms:EncryptionContext:aws:s3:arn": "*"
                 }
             }
         },
         {
             "Sid": "LoggingInCloudWatch",
             "Effect": "Allow",
             "Action": [
                 "logs:PutLogEvents"
             ],
             "Resource": "*"

        },

        {

            "Sid": "RequiredWhenAttachingLambdaToFirehose",

            "Effect": "Allow",

            "Action": [

                "lambda:InvokeFunction",

                "lambda:GetFunctionConfiguration"

            ],

            "Resource": "*"
         }
     ]
 }

IAM服务中创建角色,可信任实体类型选择AWS服务,使用案例选择Firehose

附加刚才创建的策略,给策略取名,如 FirehoseS3TableRole,请记录这个角色,因为稍后需要使用它来授予 AWS Lake Formation 权限。

3.在Lake Formation中创建Resource link

Firehose要将数据流式传输到S3 table bucket中的表,需要在Lake Formation中创建指向表存储桶中命名空间的Resource link

在Shared database中选择上一步表存储桶中创建的name space

4.配置 AWS Lake Formation 权限

AWS Lake Formation 管理对表资源的访问。Lake Formation 使用自己的权限模型,可对数据目录资源进行细粒度访问控制。为了让 Firehose 将数据摄取到表存储桶中,您必须为向上一步中创建的 Resource link授予权限。

点击选择刚创建好的resource link,,然后选择”Grant”。在”授予权限”页面上,在”主体”下,选择”IAM 用户和角色”,然后选择上一步中创建的 IAM 角色。在此示例中,Firehose 角色名为 FirehoseS3TableRole。在”LF 标签或目录资源”下,选择”命名数据目录资源”。对于”目录”,选择您在集成表存储桶时创建的子目录。对于”数据库”,选择您刚创建的 resource link。对于”Resource Link permissions”,选择”Describe”。

授予Describe权限

点击选择刚创建好的resource link,,然后选择”Grant on target”。在”授予权限”页面上,在”主体”下,选择”IAM 用户和角色”,然后选择上一步中创建的 IAM 角色。在此示例中,Firehose 角色名为 FirehoseS3TableRole。在”LF 标签或目录资源”下,选择”命名数据目录资源”。对于”目录”,选择您在集成表存储桶时创建的子目录。对于”数据库”,选择在S3 table bucket创建的namespace选择您在表存储桶中创建的表,对于”表权限”,选择”Super”,选择”授予”,授予目标表权限。

5.创建Kinesis Data Stream

要创建 Kinesis Data Streams流,请在控制台中打开 Amazon Kinesis → Data Streams, 并点击创建数据流。然后,为您的 数据 流 设定一个名称,如 test-stream,遵循界面中显示的命名约定,如下图所示:

点击 创建数据流 ,等待Data Stream创建成功:

6.创建Firehose 流

要创建 Firehose 流,请在控制台中打开 Firehose 并选择”创建 Firehose “。选择 “Amazon Kinesis Data Streams” 作为源,选择 Apache Iceberg Tables 作为目标。然后,为您的 Firehose 流选择一个名称,遵循界面中显示的命名约定,如下图所示:

要为您的表存储桶配置目标设置,您需要配置 Firehose 应写入的数据库和表名称。如果您希望 Firehose 流仅写入一个表,则可以配置”唯一键配置”部分。要配置此部分,请选择您在步骤 2 中创建的resource link(s3tables_resource_link)作为数据库名称,并选择您在步骤 1 中创建的表(daily_sales)作为表名称。如果 Firehose 无法传输到配置的表,它将传输到 S3ErrorOutputPrefix

指定一个 Amazon S3 通用存储桶来存储无法传输到 S3 表存储桶的记录。

在 IAM 角色下,选择您之前为 Firehose 创建的用于访问 S3 表存储桶的角色 FirehoseS3TableRole,然后选择”创建 Firehose 流”来创建您的 Firehose 流,如下图所示

当流创建完成后,监控 Firehose 传输流状态,直到它变为”活动”状态,如下图所示:

7.使用 Kinesis Data Generator 发送流式数据

Kinesis Data Generator 是一个允许您向 Firehose 发送流式数据的应用程序。首先,为您的账户配置 configure Kinesis Data Generator。然后,将区域设置为与您的 Firehose 匹配,并选择在步骤 6 中创建的 Firehose 流。使用以下与步骤 1 中定义的表架构匹配的模板:

{
"id": {{random.number(99999)}},
"name": "{{random.arrayElement( ["CHECKIN","CHECKOUT","PAYMENT_FULL","PAYMENT_PARTIAL"] )}}",
"value": {{random.number(99999)}}
}

8. 使用 Athena 验证和查询数据

现在,您可以使用 Athena 查询 S3 表。要查询并验证从 Firehose 摄取的数据,可以在 Athena 中运行 SELECT 命令,如下图所示。只要数据持续从 Kinesis Data Generator 流式传输,您应该会看到此表中的行数不断增加,这确认了数据摄取成功。

方案选择

上文介绍了两种将 IoT 数据写入 S3 Tables 的方案,它们针对不同的数据特征和业务场景进行了优化:

方案一:Lambda + PyIceberg 批量写入

适用于定时批量采集场景,如智能工厂传感器每隔几分钟批量上传数据。该方案通过 API Gateway 接收 HTTPS 请求,Lambda 函数使用 PyIceberg 库直接操作 Iceberg 表,提供完全自定义的数据转换能力。优势在于实现灵活、成本可控,适合数据量较小(< 1GB/小时)且对延迟不敏感(5-15分钟可接受)的场景。

方案二:IoT Core + Kinesis Firehose 流式写入

专为高频实时数据流设计,如车联网场景中数万辆车持续发送位置数据。IoT Core 负责管理海量设备的 MQTT 连接,并通过规则引擎将消息路由到 Kinesis Data Streams,再由 Firehose 自动批量写入 S3 Tables。该方案完全托管、自动扩展,延迟低至 60 秒以内,适合大数据量(> 1GB/小时)和高并发场景。

对于同时运营多条业务线的企业(如既有工厂设备又有物流车队),可以采用混合架构。批量数据走方案一,流式数据走方案二,最终写入同一个 S3 Tables,实现统一的数据湖和查询层。这种方式在保证各业务线最优性能的同时,简化了下游分析架构。

选型决策建议

选择方案一的场景:

  • 设备数量在 10000 以下
  • 数据采集频率为分钟或小时级
  • 需要复杂的数据转换和业务逻辑
  • 对成本敏感,希望按实际使用量付费
  • 可接受 5-15 分钟的数据延迟

选择方案二的场景:

  • 设备数量超过 10000,甚至数十万
  • 需要秒级或近实时的数据可用性
  • 数据持续高频写入(> 1GB/小时)
  • 希望使用完全托管服务,降低运维负担
  • 需要自动扩展能力应对流量突发

选择混合方案的场景:

  • 企业有多条业务线,数据特征差异大
  • 既有批量历史数据导入,又有实时数据写入
  • 需要统一的数据湖和查询接口
  • 希望为不同业务选择最优技术方案

详细对比表

下表提供了两种方案在各个维度的详细对比,帮助您做出更精准的选择:

A B C D
1 对比维度 方案一:Lambda + PyIceberg 方案二:IoT Core + Kinesis Firehose 混合方案
2 数据频率 低频(分钟/小时级) 高频(秒级/毫秒级) 混合频率
3 数据量 < 1GB/小时 > 1GB/小时 不限
4 批次大小 100-10000条/批 单条或小批量 按来源区分
5 数据延迟 5-15分钟 < 60秒 按业务区分
6 典型场景 智能工厂、定时采集 车联网、实时监控 多业务线并存
7 设备数量 100-10000 10000+ 不限
8 连接方式 HTTPS REST API MQTT/HTTPS 两者都支持
9 数据模式 批量上传 持续流式 批量+流式
10 实现复杂度 中等 中高
11 运维负担 中等 极低 中等
12 扩展性 手动调整 自动扩展 自动扩展
13 数据转换 完全自定义(Python) Lambda转换 两者都支持
14 Schema演化 灵活 需配置 统一管理
15 固定成本 Kinesis Shard费用 Kinesis Shard费用
16 变动成本 API Gateway + Lambda Firehose数据处理 两者叠加
17 存储成本 S3 Tables统一计费 S3 Tables统一计费 S3 Tables统一计费
18 推荐场景1 定时批量采集 实时数据流 多业务线
19 推荐场景2 成本敏感 海量设备 不同数据源
20 推荐场景3 需要复杂转换逻辑 低延迟要求 统一数据湖
21 不推荐场景1 实时性要求高 低频批量数据 单一数据模式
22 不推荐场景2 持续高频写入 复杂业务逻辑 简单场景

小结

本文介绍了两种将 IoT 数据写入 Amazon S3 Tables 的架构方案,它们针对不同的业务场景提供了最优解决方案:

方案一:Lambda + PyIceberg 提供了灵活可控的批量写入能力,适合智能工厂等定时采集场景。其核心优势在于完全自定义的数据处理逻辑和按需付费的成本模型,让您可以精确控制数据转换过程和事务边界。

方案二:IoT Core + Kinesis Firehose 提供了全托管的流式写入能力,专为车联网等实时场景设计。其核心优势在于零运维负担和自动扩展能力,可以轻松应对海量设备的并发写入,实现近实时的数据处理。

两种方案都基于 S3 Tables (Apache Iceberg) 构建统一的数据湖,提供 ACID 事务、Schema 演化和时间旅行等企业级特性。特别值得关注的是,Amazon S3 Tables 最新推出的压缩成本优化功能,可将压缩成本降低达 90%,进一步提升了 IoT 数据存储的经济性。

您可以根据业务特点单独使用某一方案,也可以采用混合架构,让批量数据和流式数据汇聚到同一数据湖,简化下游分析架构。

选择 Lambda + PyIceberg 追求灵活性和成本优化,选择 Kinesis Firehose 追求实时性和零运维,或者组合使用实现最佳性价比。无论哪种选择,S3 Tables 都为您的 IoT 数据平台提供了坚实的存储基础。

*前述特定亚马逊云科技生成式人工智能相关的服务目前在亚马逊云科技海外区域可用。亚马逊云科技中国区域相关云服务由西云数据和光环新网运营,具体信息以中国区域官网为准。

本篇作者

孙亮

AWS资深解决方案架构师,硕士毕业于浙江大学计算机系,拥有多年软件行业开发经验。在AWS主要服务于制造、生命科学和医疗健康相关的行业客户,致力于提供有关HPC、容器、仿真设计等各类云计算解决方案的咨询与架构设计。

朱坦

亚马逊云科技解决方案架构师,负责数据与存储架构设计,目前主要关注在人工智能与机器学习、数据分析、高性能计算,如 EDA、自动驾驶、基因分析等领域的存储设计与优化。