亚马逊AWS官方博客

基于 AWS S3 Tables 构建高效数据分析平台:架构设计与实施要点

在当今数字化转型加速的商业环境中,企业在处理日活数据分析时面临着四大关键痛点:

  • 首先,传统的 Iceberg存储层虽然易于集成,但缺乏简单高效的表管理功能;
  • 其次,Kubernetes集群全部使用按需实例的方式导致计算资源成本较高,且缺乏真正有效的弹性扩缩能力,造成资源利用率低下;
  • 第三,通过自行维护基础设施进行日活分析作业调度的方式,给运维增加了复杂度;
  • 最后,缺乏直观的可视化分析工具,使得业务数据难以清晰呈现,严重影响决策效率和准确性。如何在控制成本降低运维复杂度的同时,快速从海量日活数据中提取有价值的业务洞察,已成为决定竞争优势的关键因素。

针对这些挑战,我们设计了一套基于 AWS 的现代化数据分析平台。该方案:

  • 通过 Amazon S3 Tables 桶替代复杂的 Iceberg 格式,提供简单高效的数据操作能力;
  • 采用 Amazon EMR on Amazon EKS 结合 Karpenter、Graviton 和 Spot 实例的组合,显著降低计算成本并实现高效的弹性扩缩;
  • 利用 Amazon EventBridge 与 AWS Lambda 构建自动化调度系统,摆脱基础设施维护负担;
  • 整合 Amazon Athena 和 Amazon QuickSight 提供强大的分析可视化能力,让业务数据直观呈现,加速决策过程。

这套解决方案适合需要处理波动性大的日活数据、希望优化数据存储,追求降低总体成本、探索简化运维流程,同时要求高效数据可视化的企业。可以加速从数据采集到业务洞察的全流程,使企业能够更敏捷地响应市场变化,做出数据驱动的决策,实现数据价值的最大化。

下面我们将重点分享这一架构的详细设计思路、环境搭建过程中的关键注意事项。

一、架构介绍

下图展示了我们基于 AWS 构建的数据分析平台架构。该架构整合了多项 AWS 服务,实现了从数据采集、存储、处理到分析可视化的完整数据流程。下面将详细介绍各个组件及其在整体架构中的作用。

核心组件及优势

  • 数据存储层:Amazon S3 Tables。S3 Tables 为数据存储提供简单高效的解决方案,支持增删改查操作,避免了 Iceberg 等格式的复杂维护成本,同时兼容 Athena、Spark 等常见查询引擎。
  • 计算引擎:Amazon EMR on EKS。EMR Spark 提供 100% 兼容开源 Spark 的环境,同时针对 AWS 的环境进行了性能优化,为数据分析任务提供强大且灵活的计算能力。
  • 资源管理:Karpenter 结合 Graviton + Spot 实例。Karpenter 实现 EKS 计算资源的智能自动扩缩,结合 Graviton 处理器和 Spot 实例的使用,在保证性能的同时显著降低计算成本。
  • 作业调度:EventBridge 结合 Lambda。EventBridge 和 Lambda 的组合实现了完全自动化的作业调度,无需手工操作即可触发 Spark 作业,采用 Serverless 方式降低了运维复杂度。
  • 数据分析与可视化:Athena 和 QuickSight。Athena 提供无服务器 SQL 查询能力,QuickSight 则提供强大的 BI 分析和可视化功能,两者无缝集成,让业务数据可视化并提供直观洞察。

二、端到端运行流程

1. 在日活数据分析场景上应用该方案,需要在 S3 Tables 桶中创建两张表格:

其中:

  • user_activity:用于存储用户活动数据
  • daily_active_users_:用户存储分析完的日活数据结果

2. 客户使用 Amazon Data Firehose,通过 batch Direct PUT 的方式,采用 Glue 资源链接连接到 S3 Tables 桶,并将前一天的用户活动数据注入到 S3 Tables 桶中的 user_activity 表中。

3. Lambda 函数会启动用于日活数据分析的 EMR Spark 任务。此处 EMR 部署方式为在 EKS 之上。

4. 创建 EventBridge 规则,定义为每天定时触发 Lambda 函数进行日活数据分析。示例如下:

5. EMR Spark 任务分析 user_activity 中的数据,并将分析的上一日的日活结果写入到 daily_active_users 表中。EventBridge 触发的 EMR Spark 任务可在 EMR 界面查看。

6. 在 Athena 和 QuickSight 中通过查询 S3 Tables 桶中的 daily_active_users 表格来展示日活数据趋势。一个简单的图表结果示例如下:

三、实施要点与注意事项

1. 创建 S3 Tables 桶和表格

可以通过如下脚本创建 S3 Tables 桶,命名空间以及表格。

  • 创建 S3 Tables 桶
    aws s3tables create-table-bucket -region us-east-1 --name <your-table-bucket-name>
    PowerShell
  • 创建命名空间
    aws s3tables create-namespace  --table-bucket-arn <tyour-able-bucket-arn> --namespace <your-namespace-name>
    PowerShell
  • 依次创建表 user_activity 和 daily_active_users
    aws s3tables create-table --cli-input-json file://user_activity.json
    aws s3tables create-table --cli-input-json file://daily_active_users.json
    
    PowerShell

user_activity.json 内容如下:

{
    "tableBucketARN": "<your-table-bucket-arn>",
    "namespace": "<your-namespace-name>",
    "name": "user_event",
    "format": "ICEBERG",
    "metadata": {
        "iceberg": {
            "schema": {
                "fields": [
                     {"name": "user_id", "type": "int","required": true},
                     {"name": "event_time", "type": "timestamp"},
                     {"name": "event_type", "type": "string"}
                ]
            }
        }
    }
}
PowerShell

daily_active_users.json 内容如下:

{
    "tableBucketARN": "< your-table-bucket-arn>",
    "namespace": "<your-namespace-name>",
    "name": "daily_active_users",
    "format": "ICEBERG",
    "metadata": {
        "iceberg": {
            "schema": {
                "fields": [
                     {"name": "event_date", "type": "date", "required": true},
                     {"name": "dau", "type": "int"},
                     {"name": "ingestion_time", "type": "timestamp"}
                ]
            }
        }
    }
}
PowerShell

2. Amazon Data Firehose 填充数据

在使用 Amazon Data Firehose 将数据填充到 S3 时,需要注意以下几个关键点:

  • 要访问您的表,某些 AWS 分析服务需要一个指向您表命名空间的资源链接。资源链接是一个 Data Catalog(数据目录)对象,它充当指向其他 Data Catalog 资源(如数据库或表)的别名或指针。示例如下:
    aws glue create-database 
    --region us-east-1  \
    --catalog-id "<replace-to-your-account-id>"  \
    --database-input \
    ‘{
        "Name": "resource_link_to_s3tablesdemo",
        "TargetDatabase": {
            "CatalogId":"<replace-to-your-account-id>:s3tablescatalog/ <replace-to-your-table-bucket-name>", 
            "DatabaseName": "<replace-to-your-namespace>"
        },
        "CreateTableDefaultPermissions": []
    }'
    
    PowerShell
  • 创建 Amazon Data Firehose 时,配置示例如下:

其中:

  • Database expression 填写 resourcelink 的名字
  • Table expression 填写要操作的 S3 Table 桶中表格的名字。

另外,可以参考如下脚本将用户活动数据写入到 S3 Table 桶中的 user_activity 表格。

#!/bin/bash
# 配置参数
DELIVERY_STREAM="<your-datafirehose-name>"
BATCH_SIZE=200  # 每批次发送 500 条
NUM_BATCHES=1  # 需要发送的批次数
NUM_RECORDS=$((NUM_BATCHES * BATCH_SIZE))
EVENT_TYPES=("login" "browse" "purchase")
BATCH=()
# 生成并发送数据
for ((i=1; i<=$NUM_BATCHES; i++)); do
    for ((j=1; j<=$BATCH_SIZE; j++)); do
        USER_ID=$((RANDOM % 900000 + 100000))
        EVENT_TIME=$(date -v -$((1))d '+%Y-%m-%dT%H:%M:%S')  
        EVENT_TYPE=${EVENT_TYPES[$RANDOM % ${#EVENT_TYPES[@]}]}
        JSON_DATA="{\"user_id\": $USER_ID, \"event_time\": \"$EVENT_TIME\", \"event_type\": \"$EVENT_TYPE\"}"
        BASE64_DATA=$(echo -n "$JSON_DATA" | base64)
        BATCH+=("{\"Data\":\"$BASE64_DATA\"}")
    done

    JSON_BATCH="[$(IFS=,; echo "${BATCH[*]}")]"
    echo "aws firehose put-record-batch --delivery-stream-name $DELIVERY_STREAM --records [batch of $BATCH_SIZE]"
    aws firehose put-record-batch --delivery-stream-name "$DELIVERY_STREAM" --records "$JSON_BATCH" > /dev/null
    BATCH=()  # 清空批次
    echo "已发送 $i * $BATCH_SIZE 条数据..."
done
echo "数据生成并发送完毕,共 $NUM_RECORDS 条记录。"
PowerShell

3. EMR Spark Job 设置

在配置和运行 EMR Spark 作业时,应关注以下配置:

{
"name": "demojob", 
"virtualClusterId": "5noewg7p3wb509neqivxxxxxxxx", 
"executionRoleArn": "arn:aws:iam::73927xxxxxxx:role/EMRContainers-JobExecutionRole", 
"releaseLabel": "emr-7.5.0-latest",
"jobDriver": {
      ……
 },
"configurationOverrides": {
    "applicationConfiguration": [
    {
      "classification": "spark-defaults",
      "properties": {
"spark.jars.packages":"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160,software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:0.1.3",
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.s3tablesbucket1": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.s3tablesbucket1.catalog-impl": "software.amazon.s3tables.iceberg.S3TablesCatalog",
"spark.sql.catalog.s3tablesbucket1.warehouse": "arn:aws:s3tables:us-east-1: <replace-to-your-account-id>:bucket/<replace-to-your-bucket-name>"
                     }
   }
   ]
  }
}
PowerShell

其中:

  • jars.packages:定义 Spark 运行时需要的依赖库 Jar 包,用于 Iceberg、AWS SDK 和 S3Tables 插件。
  • sql.extensions:指定 Spark 使用 Iceberg 扩展,以便支持 Iceberg SQL 语法。
  • sql.catalog.s3tablesbucket1:Spark SQL 中创建一个名为 s3tablesbucket1 的 Catalog,支持通过 SQL 访问 S3 Tables 数据。
  • sql.catalog.s3tablesbucket1.catalog-impl:指定 s3tablesbucket1 Catalog 的实现实现类,这里使用 Amazon 提供的 S3TablesCatalog。
  • sql.catalog.s3tablesbucket1.warehouse:指定数据存储路径(S3 Tables 存储桶)。

对用户活动数据分析的代码,示例如下:

import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, countDistinct, current_timestamp
import time
from datetime import datetime, timedelta
if __name__ == "__main__":
    spark = SparkSession.builder\
        .appName("DAU Analysis") \
        .config("spark.driver.memory", "2G") \
        .getOrCreate()
    output_path = None
    if len(sys.argv) > 1:
        output_path = sys.argv[1]
    else:
        print("S3 output location not specified printing top 10 results to output stream")
    # 计算前一天的日期
    yesterday_date = (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d')
    # 读取用户行为数据(Iceberg 表)
    user_activity_df = spark.read.format("iceberg").load("<your-s3-catalog-name>.<your-namespace-name>.user_event") \
          .filter(to_date("event_time") == yesterday_date)  # 只选前一天的数据
    # 计算每日活跃用户(DAU)
    dau_df = user_activity_df.withColumn("event_date", to_date("event_time")) \
        .groupBy("event_date") \
        .agg(countDistinct("user_id").alias("dau")) \
        .withColumn("ingestion_time", current_timestamp())
    # 写入 Iceberg 目标表
    dau_df.write.format("iceberg").mode("append").save("<your-s3-catalog-name>.<your-namespace-name>.daily_active_users")
    # 关闭 SparkSession
    spark.stop()
PowerShell

4. 创建 Lambda 函数并配置 EventBridge 定时触发

Lambda 函数示例如下。其中 Job 的配置注意事项请参照上一节 EMR Spark Job 设置。

import json
import boto3
import os

emr_client = boto3.client('emr-containers')
def lambda_handler(event, context):
    job_config = get_job_config()

    # 解析必要参数
    virtual_cluster_id = job_config["virtualClusterId"]  
    job_name = job_config["name"]  
    job_config_params = {
        "name": job_name,
        "virtualClusterId": virtual_cluster_id,
        "executionRoleArn": job_config["executionRoleArn"],  
        "releaseLabel": job_config["releaseLabel"],  
        "jobDriver": job_config["jobDriver"],  
        "configurationOverrides": job_config.get("configurationOverrides", {}) 
    }
    try:
        # 调用 start-job-run
        response = emr_client.start_job_run(**job_config_params)      
        # 获取 Job ID
        job_run_id = response["id"]
        print(f"Job started successfully. Job ID: {job_run_id}")
        return {
            "statusCode": 200,
            "body": json.dumps({"message": "Job started", "job_run_id": job_run_id})
        }

    except Exception as e:
        print(f"Error starting job: {str(e)}")
        return {
            "statusCode": 500,
            "body": json.dumps({"error": str(e)})
        }

def get_job_config():
    return {
        "name": "dau-ana-trigger-by-lambda", 
        "releaseLabel": "emr-7.5.0-latest",
        ……
    }
PowerShell

5. Karpenter node 支持 IMDS

确保 Karpenter 管理的节点正确支持实例元数据服务(IMDS)。在创建 Karpenter EC2NodeClass 时,请将 metadataOptions.httpPutResponseHopLimit 设置为 2。

注意,此 EC2NodeClass 中使用的的 ami 的区域是 us-east-1。请根据你的所在区域使用相应的 ami。

apiVersion: karpenter.k8s.aws/v1
kind: EC2NodeClass
metadata:
  name: default
spec:
  amiFamily: AL2 # Amazon Linux 2
  role: "KarpenterNodeRole- eksdemo " # replace with your cluster name
  subnetSelectorTerms:
  - tags:
    alpha.eksctl.io/cluster-name: "eksdemo" # replace with your cluster name
  securityGroupSelectorTerms:
  - tags:
    alpha.eksctl.io/cluster-name: " eksdemo " # replace with your cluster name
  amiSelectorTerms:
  - id: "ami-07f0a903b02947a1c"
  - id: "ami-0e4591ba595196441"
  metadataOptions:
    httpPutResponseHopLimit: 2
PowerShell

6. EKS 引入 Graviton 和 Spot 到 Spark job

将 Graviton 和 Spot 实例应用于 Spark 作业需要考虑:

将可中断的 Spark Executor pods 放置在 Spot 实例上,不可中断的 Spark Driver pods 放置在按需实例上。因此我们会使用 Karpenter 配置两个 nodepool,分别管理 Spark Driver pods 和 Spark Executor pods 所需的实例。

针对 Spark Driver pods 的示例如下:

apiVersion: karpenter.sh/v1
kind: NodePool
metadata:
  name: nodepool-od
spec:
  template:
    metadata:
      labels:
        karpenternodepool: nodepool-od
spec:
  requirements:
  - key: kubernetes.io/arch
    operator: In
    values: [”arm64"]
  - key: karpenter.sh/capacity-type
    operator: In
    values: ["on-demand"]
  - key: "eks.amazonaws.com/instance-family"
    operator: In
    values: [c6g, c7g, m6g, r6g]
PowerShell

相应的 pod template 如下。通过设置 nodeSelector 来使用带有 label karpenternodepool 为 nodepool-od 的实例。

apiVersion: v1
kind: Pod
spec:
  volumes:
   - name: source-data-volume
     emptyDir: {}
   - name: metrics-files-volume
     emptyDir: {}
  nodeSelector:
    karpenternodepool: nodepool-od
  containers:
   - name: spark-kubernetes-driver
PowerShell

针对 Spark Executor pods 的示例如下:

apiVersion: karpenter.sh/v1
kind: NodePool
metadata:
  name: nodepool-mix
spec:
  template:
    metadata:
      labels:
        karpenternodepool: nodepool-mix
spec:
  requirements:
  - key: kubernetes.io/arch
    operator: In
    values: ["amd64","arm64"]
  - key: karpenter.sh/capacity-type
    operator: In
    values: [“on-demand”,”spot”]
  - key: "eks.amazonaws.com/instance-family"
    operator: In
    values: [c6g, c7g, m6g, c6i, m6i, r6g, r6i]
PowerShell

相应的 pod template 如下。通过设置 nodeSelector 来使用带有 karpenternodepool 标签值为 nodepool-mix 的实例。

apiVersion: v1
kind: Pod
spec:
  volumes:
    - name: source-data-volume
      emptyDir: {}
    - name: metrics-files-volume  
      EmptyDir: {}
  nodeSelector:
      karpenternodepool: nodepool-mix
  containers:
    - name: spark-kubernetes-executor
PowerShell

7. 授予 Lake Formation 对表资源的权限

使用 AWS Lake Formation 管理数据访问权限时,需要在 Lake Formation 导航窗格中,选择数据权限,然后选择授予。

对于 Athena 或 Firehose,选择 IAM 用户和角色。示例如下:

对于 Amazon QuickSight,选择 SAML 用户和组,然后输入 Amazon QuickSight 管理员用户的 ARN:

然后再继续分别授予表和资源链接的权限。

  • 对表格赋予权限时操作如下:
  • 对资源链接赋予权限时操作如下:

四、总结

本文介绍的架构充分利用了 AWS 的优势,构建了一个高效、低成本且易于维护的数据分析平台。通过 S3 Tables 作为数据存储、EMR on EKS 作为计算引擎、Karpenter 管理计算资源、EventBridge 和 Lambda 实现自动化调度,以及 Athena 和 QuickSight 提供分析可视化能力,我们实现了从数据采集到洞察的完整流程。

本篇作者

关志丽

亚马逊云科技解决方案架构师,专注于智能开发,致力于生成式 AI 赋能的开发工具研究和推广,提升软件开发效率与开发者生产力。

汤市建

亚马逊云科技数据分析解决方案架构师,负责客户大数据解决方案的咨询与架构设计。

杨冬冬

亚马逊云科技资深容器解决方案架构师,在云原生领域深耕多年,拥有丰富的行业经验。

丁洁羚

亚马逊云科技弹性计算解决方案架构师,主要负责亚马逊云科技弹性计算相关产品的技术咨询与方案设计。专注于弹性计算相关的产品和方向。