亚马逊AWS官方博客

使用 AWS Step Functions 和 Amazon Athena 实现简易大数据编排

很多公司都在亚马逊云上围绕 Amazon S3 实现了自己的数据湖。数据湖的建设涉及到数据摄入、清洗、转换,以及呈现等多个步骤,还需要对这些步骤进行编排,这对很多人手不足或者初识数据湖的团队形成了挑战。

在本篇文章中,我将介绍一个使用 AWS Step Functions 和 Amazon Athena 的简易大数据编排方案。如果你的团队现在已经有相当部分沉睡数据,想要利用,但是又没有专人或者专门的力量的公司,那么可以参考这个方案,在数天时间内搭建起一套可用的基础版大数据流水线,开始对数据进行一些探索和挖掘。

方案整体都采用无服务器服务,用户无需担心基建费用,完全只为用量付费,实现低成本快速启动。

服务介绍

开始之前,我们简单介绍下方案的两个核心服务。

Amazon Athena 是一个无服务器版的 SQL 大数据查询服务,底层基于 PrestoDB 引擎。用户可以提交 SQL 语句,而这个引擎则根据语句来分布式扫描数据湖中的文件,最后汇总成结果。除了查询之外,Athena 也可以用作简单的 ETL 工具。它按照扫描的文件的大小来收费。

AWS Step Functions 是一个无服务器编排服务。它可以帮助我们设计一个包含多个步骤的流程(有向无环图,Directed Acyclic Graph,简称 DAG),让每个步骤的输出变成下一个步骤的输入,并且支持步骤并发、条件判断以及不同的重试机制等。它和亚马逊云科技的其他服务有着很好的集成,并且也是完全按照步骤执行的次数来收费。

业务介绍

简单介绍一下业务。

假设我们是一家传统的白电公司。虽然我们追随潮流,在我们的很多新电器上搭载了 IoT 功能,并且也收到了很多的 IoT 数据,但这些数据其实并没太好地利用起来。现在,我们希望能做一个数据湖,用最低的成本,快速从这些数据里面挖掘一些价值。

目前最困扰我们的问题是电器品质和维修问题,以冰箱为例,如果商用冰箱出故障,可能会导致食品变质导致食品卫生问题,而如果保存的是药品,则更可能导致严重的问题;而家用冰箱如果出故障,也会严重影响客户体验和对品牌的信任。所以,我们希望能对设备回报的数据进行挖掘,看看冰箱在故障之前,通常出现什么指标异常,不同地区的同款冰箱在指标上是否有区别,以及不同的使用方式是否对冰箱的寿命和维修产生影响。

在这些问题之上,我们可能会形成一套预测性维护的机制,在冰箱出故障之前就做好预判,提前维护保养,避免问题的发生。

整体架构

架构的整体数据流向图上已经展示得很清楚,我们本次重点关注这些服务使用的细节,以及串接这些服务时的一些要点。

数据摄入

本次的数据源格式是 GZip 压缩好的 JSON Lines 文件,每天可能是单个或者数个文件。文件已经存放在某个内网 HTTP 节点,我们需要定期去拉取,并且上传到 S3 桶。

数据格式示范如下。

{"model": "model-1234", "city": "test-city-1", "reading_1": "15.6"}
{"model": "model-4323", "city": "test-city-2", "reading_1": "4.5"}
{"model": "model-3135", "city": "test-city-1", "reading_1": "7.4"}
{"model": "model-4237", "city": "test-city-3", "reading_1": "8.1"}
{"model": "model-9928", "city": "test-city-1", "reading_1": "6.3"}

把文件上传到 S3 桶之后,我们可以直接在 Athena 的查询编辑器中使用如下 SQL 语句创建外部表。

CREATE EXTERNAL TABLE example (
    model STRING,
    city STRING,
    reading_1 STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://{bucket-name}/'

创建成功后,我们就可以立即进行查询。

SELECT * 
FROM example;

这里需要注意的是 Athena 支持的是单个文件压缩,而不是我们常见的 TAR 包压缩。也就是说,每个文件都是通过 gzip filename.json 命令压缩成 filename.json.gz 而不是通过 tar cfz 命令打包并压缩成 .tar.gz,否则 Athena 将无法识别。

当然,通常我们的 IoT 数据都包含大量的字段,这里很可能我们不会用写 SQL 的方式来建表,而是用 Amazon Glue 的爬虫服务进行爬取,自动建表和识别字段类型。爬虫的使用不是本文的重点,如有需要,读者可参考其他关于 Glue 爬虫的文章。

无格式文本文件处理

在 IoT 场景中,有时候我们会遇到特定的原始数据格式。它并不是 JSON 格式,也不是其他认可的形式,而是取决于使用的设备,类似下面这样的格式。

DEV {model=23482, sn='238148234571', reading_1=23.5}
DEV {model=36740, sn='9942716322', reading_1=}

此时,我们可以借用 Athena 的正则表达式匹配编解码器(RegEx SerDe),来把数据读取成字符串,再进行处理。注意:数据仍然需要按行分割。

CREATE EXTERNAL TABLE example_regex (
    model STRING,
    sn STRING,
    reading_1 STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = "^DEV\\s*\\{model=(.*?),\\s*sn='(.*?)',\\s*reading_1=(.*?)\\}"
) LOCATION 's3://{bucket-name}/{prefix}';

使用这种方式,所有的字段必须有固定顺序,正则表达式捕捉到的字符,会被按顺序录入到字段中,方便后续处理。

数据预处理

原始数据传输到 S3 桶后,我们需要对它做一些预处理,方便后续正式使用。

字段格式转换

首先,我们需要对字段格式转换。因为原始的 IoT 数据所有字段几乎都是字符串格式,不便于操作,所以我们需要把这些字段格式转换成正确的格式。我们先创建新的目标表。

CREATE EXTERNAL TABLE example_preprocessed (
    model STRING,
    city STRING,
    reading_1 DOUBLE
)
STORED AS PARQUET
LOCATION 's3://{bucket-name}/{prefix}'
TBLPROPERTIES ("parquet.compression"="SNAPPY");

注意,此时我们不仅转换字段格式,存储格式也换成了更便于统计操作的 Parquet,并使用 Snappy 进行了压缩。

对于 Athena 来说,字段转换非常简单,只需使用 SQL 的 CAST、DATE_PARSE 等类型转换函数。比如,我们可以使用如下方式语句把原始数据转换成正确的格式并插入到新的表。

INSERT INTO example_preprocessed 
SELECT model, city, CAST(reading_1 AS DOUBLE) as reading_1 FROM example;

动态字段映射

在 IoT 场景中,我们还可能会遇到动态字段映射问题。

比如,每个设备都会回传 data_01data_02date_03 这样的字段,但是不同设备、不同型号甚至不同版本的设备,所传回来的字段代表的意思可能不同。在 A 设备上 data_01 可能是温度,而在 B 设备上,data_01 则可能是门开关的角度。

这就需要我们有一个表来保存字段的映射关系,并且能动态地对这些数据进行映射。核心思路如下。

  • 保存一份表、全字段、全映射目标字段的映射关系
  • 遍历这个映射关系,并且使用 INSERT INTO 语句,按顺序列出所有源字段和目标字段
  • 添加反向条件,对未在映射关系列表中的设备进行默认映射

这个思路主要是借助了 INSERT INTO 可以同时列出字段和值并按顺序来插入的功能。下面是一段示意代码。

import time
import boto3

# 只打印 SQL

dry_run = False

# 源表和目标表

source_table = 'source_table'
target_table = 'target_table'
db = 'dbname'

# 映射关系表,从数据库中取出后改成如下格式

mapping = {
  # 型号名字为 Key
  'BCD': {
    'field': 'filed',  # 所有字段都必须列出来,即便是完全对应
    'model': 'model',
    'data01': 'temperature',  # 举例映射 data01 > temperature,data01 > door_status
    'data02': 'door_status'
  },
  'ABC': {
    'field': 'field',  # 所有字段都必须列出来,即便是完全对应
    'model': 'model',
    'data01': 'door_status',  # 举例映射 data01 > door_status,data01 > temperature
    'data02': 'temperature'
  },
  # 未被匹配的型号使用默认映射
  'Other': {
    'field': 'field',
    'model': 'model',
    'data01': 'other', # 映射的目标字段必须存在于目标表,如果有目标表字段没有覆盖,就会变成 NULL
    'data02': 'other2' # 映射的目标字段不能重复
  }
}

# 封装 Athena 请求和 SQL 到函数

client = boto3.client('athena')

def insert_with_mapping(model, mapping):
  source_columns = [f'"{k}"' for k in mapping.keys()]
  target_columns = [f'"{v}"' for v in mapping.values()]

  query = f'INSERT INTO {target_table} ({",".join(target_columns)}) SELECT {",".join(source_columns)} FROM {source_table} WHERE '

  if type(model) == list:
    models = [f"'{m}'" for m in model]
    query += f'model NOT IN ({",".join(models)})'
  else:
    query += f"model = '{model}'"

  print(query)

  if (dry_run):
    return

  query_start = client.start_query_execution(
      QueryString = query,
      QueryExecutionContext = {
          'Database': db
      }, 
      ResultConfiguration = { 'OutputLocation': 's3://my-athena-result-bucket'}
  )

  max_execution = 100 # 设置最长执行时间
  state = 'RUNNING'

  while (max_execution > 0 and state in ['RUNNING', 'QUEUED', 'SUCCEEDED','FAILED']):
    max_execution = max_execution - 1
    response = client.get_query_execution(QueryExecutionId = query_start['QueryExecutionId'])

    if 'QueryExecution' in response and \
            'Status' in response['QueryExecution'] and \
            'State' in response['QueryExecution']['Status']:
      state = response['QueryExecution']['Status']['State']
      if state == 'FAILED':
          print(response)
          raise Exception(f'> {model} INSERTION FAILED.')
          break
      elif state == 'SUCCEEDED':
          results = client.get_query_results(QueryExecutionId=query_start['QueryExecutionId'])
          print(f'> {model} INSERTION SUCCEEDED.')
          break

    print('WAITING...')
    time.sleep(1)

# 遍历每个模型,分别插入

mapping_without_other = { k: v for k, v in mapping.items() if k != 'Other' }
mapping_other = mapping['Other']

for model, column_mapping in mapping_without_other.items():
  insert_with_mapping(model, column_mapping)

insert_with_mapping(list(mapping_without_other.keys()), mapping_other)

分段导入

因为 INSERT INTO ... SELECT 语句会有 100 个分区的限制,如果我们按小时分区,一次导入了超过 100 个小时的数据,或者按照模型分区,一次导入超过 100 个模型,就会导入失败。

这时候,我们需要做分段导入。分段导入的方式很直白,就是用 WHERE 语句把数据分拆。比如每次插入 99 小时数据,或者每次插入 99 个模型。

清除已处理数据

最后,我们还需要删除已经预处理的数据,方便下一天导入新的数据继续处理。由于 S3 本身没有提供通配符删除的功能,所以我们只能使用一个脚本列出所有的数据文件,然后统一删除。

数据统计

业务核心的数据统计反而是整个流程中比较简单的部分,因为所有业务逻辑都使用 SQL 语句来表示。本次文章的重点不是业务梳理,所以对具体的 SQL 查询语句不再做展示,读者可根据自己需要来撰写和调用。

流水线编排

在所有流程都明确下来,并且手动执行完毕后,我们就可以开始设计自动化流水线了。

不管是 Step Functions,还是 Apache Airflow,流水线工具基本都基于「有向无环图」(Directed Acyclic Graph,简称 DAG)的理念。有向,指的是流水线中的步骤都明确指向下一个步骤,直至结束;无环,指的是步骤只往一个方向走,不能折返,形成循环。

之所以要避免循环,是因为调度器需要知道步骤的先后顺序(依赖关系)。如果出现了 A → B → A 这样的循环,那么调度器就会发现 A 需要等 B 执行完,但 B 又需要等 A 执行完,就没办法决定先执行哪一个了。反之,如果所有步骤都朝一个方向推进,又没有循环,就能明确先后顺序,并且也可以知道哪些步骤可能是可以并行执行,提升效率。

在 Step Functions 中,流水线被称作「状态机」(State Machine)。每个状态机分为多个步骤,而每个步骤则是一个亚马逊云 API 的调用。上一个步骤的输出,会作为下一个步骤的输入,直到出错或者运行结束。当然,步骤也可以调用其他状态机,从而把多个状态机串联成一个大的工作流。

数据摄入

我们原来是在 Amazon EC2 实例上直接执行命令来下载数据。现在,我们要把这个命令放到状态机里,有两个选择。

  • 使用 Amazon Lambda 的无服务器函数直接执行这个命令
  • 使用一台 EC2 机器来执行这个命令

这里主要需要考虑的是下载的文件大小。宁夏和北京区域的 Lambda 本地临时存储只有 512MB,海外最高可配置至 10GB,所以,如果下载的文件超过这个上限,就可能需要考虑 EFS 等外部存储方案,或者改用 EC2 来执行。

如果用 EC2 实例来执行命令,就没有执行时长和存储空间的问题。不过,我们还需要一个方便的方式可以调用实例上的命令,并且把执行结返回到步骤中。

要远程执行命令,我们可以使用 Amazon System Manager(下简称 SSM)。如果你使用的是 Amazon Linux,则其客户端已经随系统安装,我们只需要为这个实例添加如下策略即可使用。

  • arn:aws-cn:iam::aws:policy/AmazonSSMManagedInstanceCore,这个托管策略允许 SSM 操控该实例,包括执行命令、从浏览器中登录实例等。

因为下载和上传的时间不确定,所以我们这里需要有一个「等待」的过程。这里,我们需要调用 Step Functions 的 API,告诉它任务执行的结果。这个需要我们的 EC2 实例具备如下权限。

  • states:SendTaskSuccess,发送任务成功信号
  • states:SendTaskFailure,发送任务失败信号
  • states:SendTaskHeartbeat,发送任务心跳信号,确认任务还在执行

这里有一个问题,就是 EC2 上的执行者需要知道现在执行的是哪个任务,这样才能在发送信号的时候附带上任务 ID。Step Functions 提供了一个方式传入元数据,就是在参数键值后面添加 .$,然后在参数中使用 $$ 来引用。

从上图可以看出,我们把原来的 TaskToken 改成了 TaskToken.$,然后就可以直接使用 $$.Task.Token 来取出元数据中包含的「任务令牌」(Task Token)。任务执行完成时,我们只需要使用 SendTaskSuccess 并带上这个令牌,Step Functions 就会认为这个任务已经执行完成。

任意一个字符串参数,都可以用这个方式来替换成元数据中的值。借此,我们可以在任意步骤中获得任务名字、状态机原始参数等元数据。

但这里还有一个问题,那就是 SSM 的 sendCommand API 参数只支持数组,不支持字符串。这就意味着我们没办法用 .$ 后缀的方式把元数据直接传入,只能通过一个 Lambda 函数做一下转发。此时,Lambda 函数需要有调用 ssm:sendCommand 的权限。

这里我写了一个示范的 Lambda 函数。

import json
import boto3

def lambda_handler(event, context):
    print(event)
    
    client = boto3.client('ssm')

    instance_id = 'i-xxxxxxxx' # 示意代码,使用硬编码
    response = client.send_command(
        InstanceIds=[instance_id],
        DocumentName='AWS-RunShellScript',
        Parameters={
            'commands': [
                f'aws stepfunctions send-task-success --region cn-northwest-1 --task-token {event["TaskToken"]} --task-output {{}}'
            ] 
        }
    )
    return {
        'statusCode': 200,
        'body': json.dumps(response, default=str)
    }

这个函数会调用 ssm:sendCommand,在指定实例上运行命令。这里作为演示,只会发送成功信号。如需增加命令,直接在 commands 参数下,发送信号之前,增加所需的命令即可。如果要在在生产环境下使用,可能我们还会加入错误处理之类的,或者把所需要的命令直接写成一个完善的脚本。

数据处理

数据处理可能会用到「并发」(Parallel)和「判断」(Choice)两种流步骤。流步骤指的是不直接调用 API,而是做一些流程上的操作。比如「并发」让我们可以并行多个步骤,而「判断」则可以让我们根据上个步骤的不同输出来选择执行不同的步骤。

在数据处理阶段,我们可能会同时执行多个转换,比如可能按日期、城市来把不同的数据提取到不同的表内。在数据计算阶段,我们也可能会同时执行相互之间没有依赖关系的统计运算。这也是利用了 S3 存储高并发、高吞吐的优势。

此外,我们还可以使用条件判断。比如,在收到超过 10 万条记录时,才启动统计操作。再比如,当发现某个城市的故障率飙升时,发出告警等等。

定时触发

还有一个常见的需求是定时触发。如前面业务简介所言,我们可能会需要每天定时触发某个状态机,或者按周期触发,比如每 6 小时执行一次。此时,我们可以借助 Amazon EventBridge 的定时功能。

打开 Amazon EventBridge 服务,并找到「规则 > 创建规则」,「规则类型」选择「计划」。

接下来,我们就可以输入 cron 表达式,或者输入周期了。

cron 表达式需要填写所有下面的字段,比如在「分钟」框输入 1 就代表每个小时的第 1 分钟,而在「一周中的某天」框输入 2 则代表每周二。注意其中「一个月中的某天」和「一周中的某天」是有冲突的,所以二者只能输入一个,然后把另一个用 ? 代替。如果希望每分钟、每小时等都执行,那么就使用 * 代替。

输入成功时,会在下方列出下次执行的时期。注意:目前此处的 cron 表达式仅使用 UTC 时间,所以在使用时需要把时区也算进去。

接下来,我们可以把我们的状态机设置成「目标」。

保存之后,我们就可以在规则详情页面看到接下来 10 次触发时间。

总结

这篇文章中,我们以一个 IoT 场景为例,展示了如何结合 Step Functions 和 Athena 来实现简易的大数据调用。正确使用这些服务,可以让我们在数天之内就形成一个数据湖,让我们可以开始对数据湖中的数据进行探索。

很多传统公司在开拓新业务时往往会产生大量数据,但这些数据的使用需要大量专业开发和运维,这对很多刚成立的大数据团队造成了很大的压力。使用这些托管服务,用户无需再关心底层服务器,而可以把大量时间用在业务梳理和数据的价值挖掘上,大大降低了大数据的入门门槛。

当然,这篇文章主要还是抛砖引玉,有很多点因为篇幅问题未能涉及。比如:

  • 任务出错时的恢复、告警和重试机制
  • 任务的监控和统计
  • 更实时的数据摄入
  • 数据的增量更新
  • 更高效的分区和数据查询方式
  • 数据的安全性和权限控制

这些都是在使用更加深入后必然会遇到的问题。后续我们会有更多文章为大数据初学者介绍如何使用托管和无服务器服务来实现这些机制。

希望这篇文章对读者有所帮助,快速搭建其自己的数据湖。

本篇作者

张玳

AWS 解决方案架构师。十余年企业软件研发、设计和咨询经验,专注企业业务与 AWS 服务的有机结合。译有《软件之道》《精益创业实战》《精益设计》《互联网思维的企业》,著有《体验设计白书》等书籍。