亚马逊AWS官方博客

使用 Apache Flink 与 Amazon Kinesis Data Analytics 实现流式 ETL

Original URL:https://amazonaws-china.com/cn/blogs/big-data/streaming-etl-with-apache-flink-and-amazon-kinesis-data-analytics/

 

时至今日,大多数企业都在持续不断地实时生成新数据。用户进行手机游戏时、负载均衡器记录请求时、客户在网站上购物时、以及物联网传感器检测到温度变化时,都会生成相应的数据。我们可以通过时间敏感型事件对这些数据进行快速分析,进而改善客户体验、提高效率并推动创新。而获得这类洞察的具体速度,则通常取决于将数据加载到数据湖、数据存储以及其他分析工具的处理速度。随着数据总量增加与数据生成速度的加快,我们不仅需要加载新传入的数据,同时还要对数据进行近实时转换与分析,而这种处理能力也逐渐成为企业建立自身竞争优势的关键所在。

本文主要探讨如何以Apache Flink为基础构建起复杂的流式提取-转换-加载(ETL)管道。Apache Flink是一套处理流式数据的框架与分布式处理引擎。AWS通过Amazon Kinesis Data Analytics为Apache Flink提供全托管服务,帮助用户快速、轻松及更具成本效益的方式构建并运行复杂的流式应用程序。

本文讨论了使用Apache Flink与Kinesis Data Analytics建立强大且灵活的流式ETL管道所涉及的各项概念,同时涵盖大量针对不同来源与接收端的示例代码。关于更多详细信息,请参阅GitHub repo。此repo中还包含一套AWS CloudFormation模板,帮助大家在几分钟之内启动并探索示流式ETL管道的具体示例。

使用Apache Flink建立流式ETL架构

Apache Flink是一套框架兼分布式处理引擎,用于对各类有限及无限数据流进行有状态计算。它能够支持多种高度定制化连接器,包括各类面向Apache Kafka、Amazon Kinesis Data Streams、Elasticsearch以及Amazon Simple Storage Service(Amazon S3)的连接器。此外,Apache Flink还提供功能强大的API,不仅能够转换、聚合及丰富事件,同时亦支持精准一次语义。因此,Apache Flink完全能够在流式传输架构中充当良好的基础核心。

要部署及运行流式ETL管道,整体架构将高度依赖于Kinesis Data Analytics 服务。Kinesis Data Analytics使我们能够在全托管环境当中运行Flink应用程序。该服务自动供应和管理所需的基础设施,根据不断变化的流量模式对Flink应用程序进行扩展,并自动从基础设施及应用程序故障中进行恢复。大家可以使用Kinesis Data Analytics部署并运行Flink应用程序,将用于处理流式数据的强大Flink API与托管服务的各自优势结合起来,从而构建起健壮的流式ETL管道,同时显著降低基础设施的配置与运营开销。

本文中提到的架构充分发挥Kinesis Data Analytics运行Apache Flink时所能实现的多种功能优势。这套架构具体支持以下几项功能:

  • 专用网络连接 – 通过VPN连接接入用户数据中心内(或者通过VPCpeering连接另一个region)Amazon Virtual Private Cloud(Amazon VPC)中的资源。
  • 多源与接收端 – 从Kinesis数据流、Apache Kafka集群以及Amazon Managed Streaming for Apache Kafka (Amazon MSK)集群中读取及写入数据。
  • 数据分区 – 根据从事件负载中提取到的信息,确定需要注入至Amazon S3中的数据分区个数。
  • 多Elasticsearch索引与自定义文档ID – 从单一输入流扇出至不同Elasticsearch索引,并显式指定文档ID。
  • 精确一次语义 – 在Apache Kafka、Amazon S3以及Amazon Elasticsearch Service(Amazon ES)之间进行数据摄取与传递时避免重复。

下图为这套架构的整体示意图。

在后文中,我们将具体讨论如何使用Apache Flink与Kinesis Data Analytics实现流式ETL架构。这套架构能够将流式数据从一个或多个源处持久存储至不同的目的地,并随时根据需求进行扩展。另外需要强调的是,虽然在实际应用当中往往涉及过滤、扩展以及聚合转换等具体操作,但受篇幅所限,本文不会对这些议题做出探讨。

本文将讲解如何使用Kinesis Data Analytics实现Flink应用程序的构建、部署与操作,但不会过多涉及运营层面的内容。唯一相关的是将编译后的Flink应用程序jar文件上传至Amazon S3,并指定其他配置选项以创建Kinesis Data Analytics应用程序。接下来,大家即可在全托管环境中执行该Kinesis Data Analytics应用程序。关于更多详细信息,请参阅使用Apache Flink与面向Java应用的Amazon Kinesis Data Analytics构建及运行流式应用程序,以及Amazon Kinesis Data Analytics开发者指南

在AWS账户中探索流式ETL管道

在考虑关于具体实现及操作的更多细节之前,大家首先应该对流式ETL管道拥有明确的初步印象。要创建所需资源,请部署以下AWS CloudFormation模板:

这套模板将创建一条Kinesis数据流外加一个Amazon Elastic Compute Cloud(Amazon E2)实例,用于将历史数据集重播至该数据流内。本文使用的数据来自纽约市出租车与豪华轿车委员会提供的公共数据集。数据集中的事件描述了纽约市内的出租车乘坐活动,包括出行开始与结束的时间戳、出行开始及结束的地理行政区划信息以及出行费用等相关信息。接下来,Kinesis Data Analytics应用程序将读取事件并将其以Parquet格式保存至Amazon S3,并按事件时间进行分区。

在之前执行的CloudFormation模板的输出部分,通过ConnectToInstance旁边的链接接入实例。接下来,大家可以使用以下代码将一组出租车行程事件重播至数据流当中:

$ java -jar /tmp/amazon-kinesis-replay-*.jar -noWatermark -objectPrefix artifacts/kinesis-analytics-taxi-consumer/taxi-trips-partitioned.json.lz4/dropoff_year=2018/ -speedup 3600 -streamName <Kinesis stream name>

我们可以从AWS CloudFormation模板的输出部分中配合正确参数执行这项命令。输出部分还会为大家提供指向持久事件所处的S3存储桶,以及一套用于监控管道的Amazon CloudWatch仪表板。

关于启用其他来源与接收端组合(例如Apache Kafka与Elasticsearch)的更多详细信息,请参见GitHub repo

使用Apache Flink构建一条流式ETL管道

现在,我们的管道已经处于运行当中,接下来可以深入研究如何使用Apache Flink与Kinesis Data Analytics实现各项具体功能。

对专用资源进行读取与写入

Kinesis Data Analytics应用程序可以访问公共互联网上的资源以及归属于VPC专用子网内的资源。在默认情况下,Kinesis Data Analytics应用程序仅允许访问公共互联网上的资源。这一设定主要适用于提供公共端点的资源(例如Kinesis data streams或Amazon Elasticsearch Service)。

但如果出于技术或安全方面的考量,大家可能会使用专用VPC资源,这时可以为Kinesis Data Analytics应用程序配置VPC连接。假设大家拥有数套私有MSK集群,无法通过公共互联网进行访问。与只能通过VPN接入,保证不暴露于公共互联网上且仅接受VPC访问的Apache Kafka集群类似,大家也可以为VPC专用的其他资源(例如关系数据库或者基于AWS PrivateLink的端点)设置同样的访问机制。

要启用VPC连接,请配置Kinesis Data Analytics应用程序以接入VPC中的专用子网。Kinesis Data Analytics会根据应用程序的并发性,在VPC配置中为应用程序提供一个或者多个子网,借此建立起弹性网络接口。关于更多详细信息,请参阅配置Kinesis Data Analytics for Java application以访问Amazon VPC内的资源

以下截屏所示为具有VPC连接的Kinesis Data Analytics应用程序配置示例:

以此为基础,应用程序可以从已配置的子网中访问各类联网资源。其中也包括并未直接包含在子网内的资源,大家可以通过VPN连接或者VPC peering等访问这些资源。如果您为各个子网都配置有NAT网关,亦可通过这套配置支持访问公开在公共互联网上的各个端点。关于更多详细信息,请参阅对接入VPC的Kinesis Data Analytics for Java application进行互联网与服务访问

配置Kinesis与Kafka数据源

Apache Flink支持多种数据源,其中包括Kinesis Data Streas与Apache Kafka。关于更多详细信息,请参阅Apache Flink网站上的Streaming Connectors 部分。

要接入Kinesis数据流,请首先配置Region与凭证。作为通行的最佳实践,请选择AUTO作为凭证。而后,应用程序将使用来自各Kinesis Data Analytics应用角色的临时凭证以读取指定数据流中的事件。这样将避免把静态凭证添加至应用程序当中。凭借这样的优势,我们也能够接受由此带来的数据流读取操作间隔增加。在将默认的200毫秒增加至1秒时,延迟虽然会稍有增加,但同时也使多个消费方得以从同一数据流内读取数据。详见以下代码:

Properties properties = new Properties();
properties.setProperty(AWSConfigConstants.AWS_REGION, "<Region name>");
properties.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
properties.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

此配置通过流名称与DeserializationSchema被传递至FlinkKinesisConsumer。本文使用TripEventSchema进行反序列化,其用于指定如何将表示Kinesis记录的字节数组反序列化为 TripEvent对象。具体参见以下代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<TripEvent> events = env.addSource(
  new FlinkKinesisConsumer<>("<Kinesis stream name>", new TripEventSchema(), properties)
);

关于更多详细信息,请参阅GitHub上的TripEventSchema.java 与 TripEvent.java。Apache Flink还提供其他通用性更强的反序列化器,可以将数据反序列化为字符串或JSON对象。

Apache Flink的读取范围当然不限于Kinesis数据流。只要正确配置Kinesis Data Analytics应用程序的VPC设置,Apache Flink还能够从Apache Kafka与MSK集群内读取事件。这里需要指定以逗号分隔的代理与端口对列表,用于同集群建立初始连接。该配置将通过主题名称与DeserializationSchema被传递给FlinkKafkaConsumer,进而将Apache Kafka集群中的相应主题创建为读取源,具体参见以下代码:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "<comma separated list of broker and port pairs>");

DataStream<TripEvent> events = env.addSource(
  new FlinkKafkaConsumer<>("<topic name>", new TripEventSchema(), properties)
);

生成的DataStream将包含TripEvent对象,该对象为分别摄取自数据流与Kafka主题、且经过反序列化处理的数据。接下来,大家可以将数据流与接收端配合使用,从而将事件持久化至各自对应的目的地。

通过数据分区将数据持久存储在Amazon S3当中

在将数据持续流传输至Amazon S3的过程中,我们可能需要对数据进行分区。大家可以通过数据分区以显著提高分析工具的查询性能,这是因为一部分对当前查询无用的分区将被修剪掉,从而缩小读取的具体范围。例如,正确的分区策略能够减少查询所需读取的数据量,从而优化Amazon Athena的查询性能降低查询成本。大家应该根据应用程序逻辑与查询模式,使用相同的属性对数据进行分区。此外,在处理流式数据时,将事件时间纳入分区策略也是一种常见的操作。这种方式与使用接收时间或者其他服务端时间戳不同,因为后两种情况无法像事件时间那样准确反映事件发生的具体时间点。

关于使用Athena并按获取时间对数据进行分区,以及按事件时间对数据进行重新分区的更多详细信息,请参阅以规模化方式分析Amazon CloudFront访问日志。此外,大家也可以使用事件的实际负载确定分区,这意味着Apache Flink将根据事件时间直接对传入数据进行分区,从而回避额外的后续处理步骤。这种方式被称为data partitioning,且具体分区方式不限于按时间分区。

大家也可以使用Apache Flink的StreamingFileSink 与 BucketAssigner实现数据分区。关于更多详细信息,请参阅Apache Flink网站上的Streaming File Sink部分。

在指定具体事件后,BucketAssigner会以字符串的形式确定相应的分区前缀。具体请参见以下代码:

public class TripEventBucketAssigner implements BucketAssigner<TripEvent, String> {
  public String getBucketId(TripEvent event, Context context) {
    return String.format("pickup_location=%03d/year=%04d/month=%02d",
        event.getPickupLocationId(),
        event.getPickupDatetime().getYear(),
        event.getPickupDatetime().getMonthOfYear()
    );
  }

  ...
}

接收端会将S3存储桶的参数作为目标路径,并使用一个函数将TripEvent Java对象转换为字符串。详见以下代码:

SinkFunction<TripEvent> sink = StreamingFileSink
  .forRowFormat(
    new Path("s3://<Bucket name>"),
    (Encoder<TripEvent>) (element, outputStream) -> {
      PrintStream out = new PrintStream(outputStream);
      out.println(TripEventSchema.toJson(element));
    }
  )
  .withBucketAssigner(new TripEventBucketAssigner())
  .withRollingPolicy(DefaultRollingPolicy.create().build())
  .build();

events.keyBy(TripEvent::getPickupLocationId).addSink(sink);

大家可以使用滚动策略(rolling policy)进一步定制写入Amazon S3的各对象大小以及对象创建频率。通过配置策略,我们能够将更多事件聚合至更少的对象当中,但代价就是增加延迟——反之亦然。这种作法能够避免Amazon S3上包含过多小对象,进而导致延迟提升。事实上,存在大量对象有可能给从Amazon S3中读取数据的消费方产生严重的负面查询性能影响。关于更多详细信息,请参阅Apache Flink网站上的DefaultRollingPolicy部分。

每一项滚动策略中进入S3存储桶的输出文件的具体数量,还取决于StreamingFileSink的并行度以及我们如何在Flink应用程序各算子之间分配事件。结合之前的示例,Flink的内部DataStream会通过从keyBy算子中提取位置接客位置ID来进行分区。而该位置ID还将在BucketAssigner当中作为待写入Amazon S3的对象前缀的一部分。因此,同一节点将聚合并保留具有相同前缀的所有事件,意味着Amazon S3上的对象体积将保持在较大的水平。

在使用 StreamingFileSink进行Amazon S3写入时,Apache Flink会在后台使用分段上传方法。一旦发生故障,Apache Flink可能无法清除不完整的分段上传数据。为了避免产生不必要的存储成本,请在S3存储桶上配置适当的生命周期规则以自动清除不完整的分段上传残留。关于更多详细信息,请参阅Apache Flink网站上的S3重要注意事项以及示例8:通过生命周期配置丢弃异常分段上传数据

将输出转换为Apache Parquet格式

除了在将数据交付至Amazon S3之前进行数据分区之外,大家可能还希望使用列式存储格式以实现数据压缩。Apache Parquet是一种流行的列格式,并在AWS生态系统中拥有良好的支持。其不仅能够减少存储空间占用,同时也可以显著提高查询性能并降低运营成本。

通过内置的BulkWriter factory,StreamingFileSink可以支持Apache Parquet以及其他批量编码格式。具体请参见以下代码:

SinkFunction<TripEvent> sink = StreamingFileSink
  .forBulkFormat(
    new Path("s3://<bucket name>"),
    ParquetAvroWriters.forSpecificRecord(TripEvent.class)
  )
  .withBucketAssigner(new TripEventBucketAssigner())
  .build();

events.keyBy(TripEvent::getPickupLocationId).addSink(sink);

关于更多详细信息,请参阅Apache Flink网站上的Bulk-encoded Formats部分。

在进行Parquet转换时,持久事件的运作方式有点差异。在启用Parquet转换之后,大家只能使用OnCheckpointRollingPolicy来配置StreamingFileSink,其仅在触发检查点时才会将已完成的分段文件提交至Amazon S3。我们需要在Kinesis Data Analytics应用程序中启用Apache Flink检查点,以实现面向Amazon S3的持久数据保存。但在实际触发检查点之前整个流程对消费方并不可见,因此实际交付延迟将取决于应用程序执行检查点的频率。

此外,之前我们只需要为数据生成字符串表示形式,即可将其写入Amazon S3。但现在,ParquetAvroWriters希望事件尽量使用Apache Avro schema。关于更多详细信息,请参见GitHub repo。该repo中还附带可以直接使用与扩展的示例schema。

一般来讲,如果大家希望想提高查询持久化数据的效率,最好是将数据转换为Parquet。尽管转换过程会带来额外的负担,但与直接存储原始数据相比,转换的收益已经远远大于这些额外的复杂性。

扇出至多个Elasticsearch索引与自定义文档ID

Amazon ES是一项全托管服务,可方便用户部署、保护及运行Elasticsearch集群。目前一种主流用例,是将应用程序与网络日志数据流式传输至Amazon S3。这些日志即Elasticsearch术语中的“文档”,大家可以为每个事件创建一份文档,并将其存储在Elasticsearch索引当中。

Apache Flink提供灵活且可扩展的Elasticsearch接收端,大家可以根据每个事件的实际负载为其指定索引。当流中包含多种不同事件类型,且我们需要将各个文档存储在不同的Elasticsearch索引中时,这项功能将发挥巨大的作用。利用该功能,我们可以通过单一接收端与应用程序面向多个索引执行写入。对于较新的Elasticsearch版本,单一索引无法包含多个类型。具体参见以下代码:

SinkFunction<TripEvent> sink = AmazonElasticsearchSink.buildElasticsearchSink(
  "<Elasticsearch endpoint>",
  "<AWS region>",
  new ElasticsearchSinkFunction<TripEvent>() {
   public IndexRequest createIndexRequest(TripEvent element) {
    String type = element.getType().toString();
    String tripId = Long.toString(element.getTripId());

    return Requests.indexRequest()
      .index(type)
      .type(type)
      .id(tripId)
      .source(TripEventSchema.toJson(element), XContentType.JSON);
   }
);

events.addSink(sink);

大家也可以在将文档发送至Elasticsearch时,为其显式指定文档ID。如果具有相同ID的事件被多次提取至Elasticsearch当中,则原事件将被直接覆盖(而非创建重复项)。这意味着面向Elasticsearch的写入具有幂等性质。如此一来,即使我们的数据流只提供至少一次语义,也可以在整体架构上实现精准一次语义。

以上使用的AmazonElasticsearchSink属于Apache Flink随附的Elasticsearch接收端的一项扩展。该接收端增加了对使用IAM凭证的签名请求的支持,因此大家可以使用该服务提供基于IAM的强大身份验证与授权机制。在这种情况下,接收端能够从应用程序运行所处的Kinesis Data Analytics环境中获取临时凭证。该扩展使用Signature Version 4方法将身份验证信息添加至被发送到Elasticsearch端点的请求当中。

使用精确一次语义

通过将幂等接收端与至少一次语义结合起来,我们即可获得精确一次语义——但在某些情况下,这种方法可能并不可行。例如,如果大家希望将数据从某一Apache Kafka集群复制到另一集群,或者将事务CDC数据从Apache Kafka持久存储至Amazon S3,则可能不希望目标中存在重复项。但由于这两个接收端皆不幂等,因此重复项有可能存在。

Apache Flink原生支持精准一次语义。Kinesis Data Analytics能够为检查点隐式启用精准一次模式。要实现端到端精准一次语义,大家需要在Kinesis Data Analytics应用程序中启用检查点机制,并选择一个支持精准一次语义的连接器,例如StreamingFileSink。关于更多详细信息,请参阅Apache Flink网站上的数据源与接收端容错性保证部分。

但使用精确一次语义也有部分副作用。例如,这可能会导致端到端延迟增加。首次,我们只能在触发检查点时提交输出。由此带来的延迟增加量与启用Parquet转换时相同,默认检查点间隔为1分钟。虽然我们可以减少这一时间间隔,但无论如何也很难将延迟控制在亚秒级水平。

另外,端到端精确一次语义中还包含不少小的细节。尽管Flink应用程序可以只对数据流进行精确一次读取,但数据流本身可能已经包含部分重复项,因此在整体应用程序的角度看其只相当于执行了至少一次语义。在将Apache Kafka分别作为源与接收端时,也将对应不同的注意事项。关于更多详细信息,请参阅Apache Flink网站上的Caveats部分。

在严格依赖精确一次语义之前,请保证您明确理解整体应用程序堆栈的所有细节情况。一般来说,如果您的应用程序可以使用至少一次语义,则最好不要使用其他限定更严格的语义。

使用多个源与接收端

一个Flink应用程序可以从多个源处读取数据,并将数据持久保存至多个目标处。这是一种非常有趣的能力,因为:首先,我们可以将数据或者该数据中的不同子集持久保存至不同的目的地。例如,我们可以使用同一应用程序将本地Apache Kafka集群中的所有事件复制到MSK集群。同时,我们还可以将特定的重要事件传递至Elasticsearch集群。

其次,大家也可以使用多个接收端以提高应用程序的健壮性。例如,包含过滤器与流数据充实机制的应用程序完全可以直接归档原始数据流。这样,即使更为复杂的应用程序逻辑引发错误,Amazon S3中也仍将保留原始数据,供大家使用这些数据回填至接收端。

但其中也需要做了权衡与取舍。当我们在单一应用程序中捆绑过多功能时,故障的影响半径也将随之增大。如果应用程序中的单一组件遭遇故障,则整体应用程序都将崩溃,要求我们从最后一个检查点进行恢复。这不仅会带来停机时间,同时也增加了由应用程序指向所有交付目的地的交付延迟。另外,大型单体式应用程序通常更难以维护与修改。因此,我们应该在添加功能与创建其他独立Kinesis Data Analytics应用程序之间求取平衡。

运营方面

当大家在生产环境中运行这套架构时,必然会开始连续且无限期地执行单一Flink应用程序。这就要求实施监控与适当的警报措施,以保证管道按预期方式运行且处理速度能够跟得上输入数据的增长节奏。在理想情况下,管道应该能够适应不断变化的吞吐量要求,如果无法将数据从源传递到目标处,则应及时发出通知。

从运营的角度来看,其中某些方面特别值得注意。下面,我们将就此提供一些想法与参考意见,讨论如何提高流式ETL管道的健壮性。

数据源监控与扩展

数据流与MSK集群,可以说是整套架构的重要入口点。它们将数据生产方与架构中的其余部分解耦开来。为了避免数据生产方遭到影响,我们需要适当扩展架构的输入流,以确保其能够随时接收消息。

Kinesis Data Streams使用基于分片的吞吐量供应模型。各个分片都将提供一定的读取与写入容量。从置备的分片数量中,我们可以结合事件摄取与发布/每秒数据量得出当前流的最大吞吐量。关于更多详细信息,请参阅 Kinesis Data Streams Quotas

Kinesis Data Streams通过CloudWatch公开发布各项衡量指标,用于报告上述特征并指出当前数据流是否存在过度配置问题。大家也可以使用IncomingBytes 与 IncomingRecords指标对数据流进行主动规模伸缩,或者使用WriteProvisionedThroughputExceeded指标实现被动规模伸缩。当然,数据出口部分也存在类似的指标,同样值得大家加以监控。关于更多详细信息,请参阅 使用Amazon CloudWatch监控Amazon Kinesis Data Streams

下图所示,为示例架构中关于数据流的部分指标。平均而言,Kinesis数据流每分钟接收280万个事件与1.1 GB数据。

大家甚至可以实现Kinesis数据流的自动伸缩功能。关于更多详细信息,请参阅使用UpdateShardCount对您的Amazon Kinesis Stream容量进行规模伸缩

Apache Kafka与Amazon MSK使用一套基于节点的置备模型。Amazon MSK还通过CloudWatch公开发布各项指标,包括指示当前集群能够接纳多少数据与事件的指标。关于更多详细信息,请参阅 可配合CloudWatch使用的Amazon MSK监控指标

此外,大家还可以使用Prometheus对MSK集群进行开放式监控。我们往往很难了解集群的总体容量,而且经常需要通过基准测试才能明确何时应该进行扩展。关于重要监控指标的更多详细信息,请参阅Confluent网站上的Monitoring Kafka部分。

监控及扩展Kinesis Data Analytics应用程序

Flink应用程序是这套架构的核心所在。Kinesis Data Analytics负责在托管环境中执行Flink应用程序,而我们需要保证其能够持续从数据源处读取数据、并持久将数据保存在接收端内(不致出现滞后或卡死)。

当应用程序发生处理滞后时,通常代表其未能进行适当的规模扩展。跟踪应用程序运行状况的两项重要指标,分别是 millisBehindLastest(当应用程序从Kinesis数据流中读取时)以及records-lag-max(当应用程序从Amazon Kafka及Amazon MSK中读取时)。这些指标不仅指示着从数据源处读取到的数据,同时也将体现数据读取速度是否够快。如果这些指标的值持续增长,则应用程序则持续出现滞后,并表明我们可能需要扩展Kinesis Data Analytics应用程序。关于更多详细信息,请参阅Kinesis Data Streams连接器指标 与 应用程序指标

下图所示,为本文中示例应用程序的相关指标。在检查点期间,millisBehindLatest指标的最高值有时会飙升至7秒。但由于报告的指标平均值低于1秒,且应用程序能够立即跟上数据流峰值,因此架构的整体资源基本不需要调整。

应用程序延迟当然是需要监控的核心指标之一,但Apache Flink与Kinesis Data Analytics还提供其他多项相关指标。关于更多详细信息,请参阅Apache Flink上的 Apache Flink应用程序监控基础知识部分。

监控接收端

要验证接收端是否正在接收数据,并保证不同类型的接收端不致耗尽存储资源,我们需要密切对其进行监控。

大家还可以为自己的S3存储桶启用更多详细指标,例如以1分钟间隔跟踪指向存储桶的请求以及上传的数据量。关于更多详细信息,请参阅使用Amazon CloudWatch监控指标。下图所示,为示例架构中S3存储桶的相关指标:

当架构将数据持久化至Kinesis数据流或者Kafka主题当中时,意味着其开始充当生产方,因此也应该采取与数据源相同的监控与扩展实践。关于在生产环境中运营及监控各服务的更多详细信息,请参阅Amazon Elasticsearch Service最佳实践

处理故障

“故障是必然的,随着时间的推移,一切终将出现故障。”因此,我们需要为应用程序的潜在故障做好充分准备。例如,Kinesis Data Analytics管理的基础设施底层节点有可能发生故障,网络也可能因间歇性超时而导致应用程序无法从数据源中及时读取数据/向接收端写入数据。一旦发生这种情况,Kinesis Data Analytics会重新启动应用程序并从最新检查点处恢复。由于原始事件已经被保存在数据流或者Kafka主题当中,因此应用程序可以重新读取最后一个检查点与恢复完成之间已保留的事件,并继续执行标准处理操作。

此类故障虽然比较少见,应用程序也可以在不牺牲处理语义(包括精确一次语义)的情况下正常恢复,但仍有其他一些需要额外关注及解决的故障模式。

当应用程序代码中的任意位置(例如包含事件分析逻辑的组件)发生异常时,整体应用程序都将陷入崩溃。虽然应用程序终将恢复,但如果异常源自代码在处理始终发生的特定事件时的错误,则会导致故障进入无限循环。换言之,在从故障中恢复过来后,应用程序会重新读取之前未得到成功处理的事件,并再次触发崩溃。过程将无限次数、无限期重复,并最终导致应用程序无法进入正常运行状态。

为此,我们必须捕捉并处理应用程序代码中的异常,以避免其再次发生崩溃。如果存在某些无法通过编程方式解决的持久性问题,则可使用旁侧输出将存在问题的原始事件重新定向至辅助数据流,进而将其持久保存在死信队列或者S3存储桶内以待后续调查。关于更多详细信息,请参阅Apache Flink网站上的旁侧输出部分。

当应用程序卡死且无法进一步恢复时,我们至少能够在应用程序延迟指标中看到相应迹象。如果您的流式ETL管道曾对该事件进行过滤或充实,则故障会变得更加复杂,我们也许得在故障发生的很长时间之后才能准确识别出问题根源。例如,由于应用程序中的错误,我们可能会意外丢弃某些重要事件或者以意外方式破坏其负载。Kinesis数据流最多可以存储最近7天内的事件,而Apache Kafka在技术层面讲可以无限期存储所有事件(但一般不会这么「奢侈」)。如果无法快速发现问题,那么原始事件的保留周期到期后相关信息有可能会永远丢失。

为了防止出现这种情况,我们可以将原始事件持久存储至Amazon S3,而后再进行其他转换或处理。我们可以长期保留原始事件,并在必要时重新处理或重播至数据流内。要将这项功能集成至应用程序中,大家需要添加一个新的、仅向Amazon S3写入的辅助接收器。或者,我们也可以使用独立的应用程序,由其单纯负责从流中读取及保留原始事件,但运行额外的应用程序当然会带来对应的运营负担与成本开销。

如何做出正确选择

AWS提供多种可配合流式数据,并能够执行流式ETL操作的服务。其中 Amazon Kinesis Data Firehose能够将数据流摄取、处理并持久保存至一系列的支持目的地。Kinesis Data Firehose与本文中提到的解决方案具有非常明显的功能交集,但具体选择要视实际情况而定。

根据我们的经验,只要与实际需求不冲突,则最好选择使用Kinesis Data Firehose。该服务拥有良好的简单性与易用性。在使用Kinesis Data Firehose时,大家只需要执行服务配置,其他工作全部可以自动完成。大家可以使用Kinesis Data Firehose实现ETL的流式传输,无需任何代码、无需服务器也无需承担日常运营管理。此外,Kinesis Data Firehose还提供多项内置功能,且只要求用户根据实际处理与交付的数据量付费。如果不将数据提取至Kinesis Data Firehose当中,该服务甚至不会产生任何费用。

与之不同,本文提出的解决方案要求大家创建、构建并部署Flink应用程序。另外,大家还需要通过严密的监控建立起一套健壮的架构,保证其不仅能够承受基础设施故障、同时也能够抵抗应用程序层面的故障与错误。虽然更为复杂,但这套用例也给大家探索更多高级功能提供了空间。关于更多详细信息,请参阅使用Apache Flink与Amazon Kinesis Data Analytics for Java Applications构建并运行流式应用程序 与 Amazon Kinesis Data Analytics 开发者指南

后续讨论方向

本文讨论了如何使用Apache Flink与Kinesis Data Analytics构建流式ETL管道。其中着重强调了如何构建可扩展解决方案,在解决流式摄取中部分高级用例的同时,保持较低的运营开销。这套解决方案将帮助大家快速实现流式数据的丰富与转换,并将其加载至数据湖、数据存储或者其他分析工具当中,且无需执行额外的ETL操作步骤。本文还探讨了如何通过监控与故障处理对应用程序加以扩展。

现在,大家应该对如何在AWS上构建流式ETL管道拥有了较为明确的理解。您可以通过流式ETL管道使用时间敏感型事件,帮助消费方快速访问有价值信息;也可以根据实际用例定制信息的格式与样式,且无需像传统的批处理ETL流程那样带来额外的延迟。
 

本篇作者

Steffen Hausmann

Steffen Hausmann 是 AWS 的专业解决方案架构师