亚马逊AWS官方博客

使用 Apache Flink 和 Amazon Kinesis Data Analytics for Java 应用程序构建和运行流应用程序

Original URL:https://aws.amazon.com/blogs/big-data/build-and-run-streaming-applications-with-apache-flink-and-amazon-kinesis-data-analytics-for-java-applications/

流处理有助于实时数据的收集、处理和分析,并能够持续生成见解和快速响应新出现的情况。当派生见解的值随时间减少时,此功能非常有用。因此,您对检测到的情况反应越快,反应就越有价值。例如,考虑一个可以在欺诈性信用卡交易发生时对其进行分析和阻止的流应用程序。将该应用程序与传统的面向批处理的方法相比较,该方法在每个工作日结束时识别欺诈性交易,并生成一份供您在次日早上读取的全面报告。

见解的价值会随时间推移逐渐减少,这是很常见的现象。因此,使用流处理可以大幅提高分析应用程序的价值。但是,构建和运行持续接收和处理数据的流应用程序比运行传统的面向批处理的分析应用程序更具挑战性。

在本文中,我们将讨论如何使用 Apache FlinkAmazon Kinesis Data Analytics for Java Applications 来应对这些挑战。我们将探索如何基于托管服务构建可靠、可扩展且高度可用的流式架构,与自我管理环境相比,这些架构可显著降低运营开销。我们会特别关注如何使用 Kinesis Data Analytics for Java Applications 准备和运行Flink 应用程序。为此,我们使用包含源代码和 AWS CloudFormation 模板的示例性场景。您可以使用自己的 AWS账户来跟随此示例,也可以根据您的具体需要调整代码。

运行流应用程序的挑战

当您构建流应用程序时,下游系统(流数据消费系统)会自然地依赖于连续和及时的输出来生成。因此,对流应用程序的可用性要求更高。与传统的基于批处理的方法相比,解决操作问题的时间也少得多。在批处理方案中,如果工作日结束时运行一次的作业失败,通常可以重新启动失败的作业,并在需要结果的次日早上仍然完成计算即可。相反,当流应用程序发生故障时,消耗输出的下游系统可能会在几分钟甚至更短时间内受到影响,预期的输出不再及时生成。

此外,如果发生故障,您不能只是删除所有中间结果和重新启动失败的处理作业,就像是通常在批处理案例中完成一样。下游系统持续消耗流作业的输出。已经消耗的输出无法轻易收回被重新消费。因此,整个处理管道对由失败时重新启动的应用程序引入的重复项更加敏感。此外,流应用程序的计算通常依赖于某种内部状态,当应用程序失败时,该内部状态也可能损坏甚至丢失。

更重要的是,流应用处理程序通常会处理不同数量的吞吐量。因此,根据当前负载来扩展应用程序相当有必要。当负载增加时,支持流应用程序的基础架构必须进行扩容,以防止应用程序过载、落后并产生不再相关的结果。另一方面,当负载减少时,基础设施应该进行缩容,不预置超过需求量的资源,以此保持成本效益。

基于 Flink 和 Kinesis Data Analytics for Java Applications 的可靠且可扩展的流式架构

Apache Flink 是一个针对无界和有界数据集的状态计算定制的开源项目。Flink 通过支持不同的API(包括 Java 和SQL)、丰富的时间语义和状态管理功能来应对分析流数据时许多常见的挑战。它还可以在保持精确一次处理语义的同时从故障中恢复。因此,Flink 非常适合分析低延迟流数据。

在本文中,我们将介绍如何使用 Kinesis Data Analytics for Java Applications 部署、操作和扩展 Flink 应用程序。我们使用一个场景近乎实时地分析纽约市一个出租车车队的遥测数据,以优化车队运营。在此场景中,车队中的每辆出租车都在捕获有关已完成行程的信息。跟踪信息包括接送地点、乘客数量和产生的收入。此信息作为简单的 JSON blob 被提取到 Kinesis 数据流中。然后,数据由 Flink 应用程序处理,该应用程序被部署到 Kinesis Data Analytics for Java Applications。该应用程序可识别当前存在大量打车请求的区域。派生的见解最终会持久保存到 Amazon Elasticsearch Service 中,在那里可使用 Kibana 访问和可视化的查看它们。

此场景使用如下的设计架构,该架构分为三个阶段,用于提取、处理和呈现数据。

松耦合的基础架构是该领域中的常用方法,与更紧密耦合的架构相比具有若干优势。

首先,Kinesis 数据流充当解除生产者与消费者的耦合度的缓冲区。出租车可以将它们生成的事件持久存储到数据流中,而不论处理层的状况如何,处理层当前可能正在从节点故障中恢复。同样,即使由于某些操作问题,提取或处理层当前不可用,仍可通过 Kibana 获得派生数据。最后还有重要的一点是,所有组件都可以独立扩展,并且可以使用根据其个性化需求专门定制的基础设施。

该架构还允许您在将来试验和采用新技术。多个独立应用程序可以同时使用存储在 Kinesis 数据流中的数据。然后,您可以使用生产流量的副本测试现有应用程序新版本的执行情况。而且您还可以引入不同的工具和技术堆栈来分析数据,同样不会影响现有的生产应用程序。例如,通常将 Kinesis Data Firehose 传输流作为第二个使用者添加到 Kinesis 数据流,以此将原始事件数据持久保存到 Amazon S3。这有助于长期归档数据,然后您可以用其评估即席查询或分析历史趋势。

总而言之,将架构的不同方面分为提取、处理和呈现,可以很好地松耦合不同的组件,使架构更加的稳健。此外,它还使您能够高度灵活的为不同目的选择不同的工具,可随着时间的推移更改或发展架构。

在本文的其余部分,我们专注于使用 Apache Flink 和 Kinesis Data Analytics for Java Applications 来识别当前存在大量打车请求的区域。我们还得出了纽约市机场的平均行程时间。但是借助此架构,除了此处所述的工具,您还可以选择使用其他工具(例如 Apache Spark Structured Streaming 和 Kinesis Data Firehose)来使用传入事件。

让我们开始吧!

要查看所描述的架构,请在您自己的 AWS 账户中执行以下 AWS CloudFormation 模板。该模板首先构建分析传入的出租车行程的 Flink 应用程序,包括读取来自 Kinesis 数据流的数据所需的 Flink Kinesis 连接器。然后,它创建基础设施并将 Flink 应用程序提交到 Kinesis Data Analytics for Java Applications。

构建应用程序和创建基础设施的整个过程大约需要 20 分钟。创建 AWS CloudFormation 堆栈后,Flink 应用程序已部署为 Kinesis Data Analytics for Java 应用程序。然后它等待数据流中的事件到达。启用检查点,以便应用程序可以从底层基础设施的故障中无缝恢复,而 Kinesis Data Analytics for Java 应用程序代表您管理检查点。此外,还配置了自动扩展,以便 Kinesis Data Analytics for Java 应用程序自动分配或删除资源并扩展应用程序(即调整其并行性)以响应传入流量的变化。

为了填充 Kinesis 数据流,我们使用 Java 应用程序将纽约市历史出租车行程的公共数据集回放到数据流中。Java 应用程序已下载到AWS CloudFormation 配置的 Amazon EC2实例。您只需连接到实例并执行 JAR 文件即可开始将事件提取到流中。

您可以从先前执行的 AWS CloudFormation 模板的输出部分获取以下所有命令及参数。

$ ssh ec2-user@«Replay instance DNS name»

java -jar amazon-kinesis-replay-1.0-SNAPSHOT.jar -streamName «Kinesis data stream name» -streamRegion «AWS region» -speedup 3600

加速参数确定相对于实际发生的历史事件,将数据导入 Kinesis 数据流的速度加快多少。使用给定的参数,Java 应用程序可在一秒内提取一小时的历史数据。这会带来大约 13k 个事件的吞吐量和每秒 6MB 的数据,从而使Kinesis 数据流完全饱和(稍后将详细介绍)。

然后,您可以继续通过已创建的 Kibana 控制面板检查派生数据。或者,您可以创建自己的可视化以探索 Kibana 中的数据。

https://«Elasticsearch endpoint»/_plugin/kibana/app/kibana#/dashboard/nyc-tlc-dashboard

准备的 Kibana 控制面板包括一幅热图和一幅折线图。热图可视化当前叫车位置,并显示叫车需求最高的位置是曼哈顿。此外,地图上还标出了肯尼迪国际机场和拉瓜迪亚机场,与其直接社区相比,这两个机场的叫车需求高很多。折线图可视化这两个机场的平均行程持续时间。下图显示了叫车量在日间如何稳定增加,直到晚间突然下降。

在本博文中,Elasticsearch 集群被配置为接受来自指定为 AWS CloudFormation 模板参数的 IP 地址范围的连接。对于生产工作负载,更需要进一步加强 Elasticsearch 域的安全性,例如,使用Amazon Cognito 进行Kibana 访问控制

扩展架构以提高其吞吐量

在本博文中,Kinesis 数据流被故意预置不足,因此 Java 应用程序完全使数据流饱和。如果仔细检查 Java应用程序的输出,您会注意到“回放延迟”不断增加。这意味着生产者无法根据指定的加速参数尽快提取事件。

您可以通过 Amazon CloudWatch 控制面板访问数据流,从而深入了解数据流的指标。然后,您可以看到,WriteProvisionedThroughputExceeded 指标略有增加:由于相应请求受限,大约 0.4% 的记录不会被接受到流中。另一方面,数据流预置不足,特别是在飞行中发生太多事件时生产者暂停提取新事件的情况下。

要增加数据流的吞吐量,只需在控制台上点击几次和通过 API 调用,就可以将分片数量从 6 更新为 12。对于生产环境,您甚至可能希望自动执行此过程。有关如何自动扩展 Kinesis 数据流的详细信息,请参阅博文使用 AWS Application Auto Scaling 扩展 Amazon Kinesis 数据流

当流分片的扩展操作完成时,您可以观察回放延迟如何减少以及更多事件如何被导入流中。

但是,直接结果是需要处理更多事件。因此,现在 Kinesis Data Analytics for Java 应用程序变得过载,无法再跟上增加的传入事件数量。您可以通过发布到 CloudWatch 的 millisBehindLatest 指标来观察此情况。指标根据提取时间(以毫秒为单位),报告 Kinesis Data Analytics for Java 应用程序当前读取的最旧记录与流中的最新记录之间的时间差。因此,它表明流提示中有多少事件的处理滞后。

正如这些指标所示,扩展操作完成 10 分钟后,流分片中最新事件的处理已滞后 3 分钟以上。更糟糕的是,它不断变得更加延后,并不断扩大此差距。

但是,与 Kinesis Data Streams 相比,Kinesis Data Analytics for Java 应用程序本身支持自动扩展。几分钟后,您可以在指标中看到扩展活动的效果。millisBehindLatest 指标开始减少,直到达到零,此时处理已赶上Kinesis 数据流的提示。

但是,请注意 millisBehindLatest 指标在开始下降之前是如何激增的。这是由今天扩展 Kinesis Data Analytics for Java 应用程序的方式导致的。要扩展正在运行的应用程序,应用程序的内部状态将持久保存为所谓的保存点。此保存点由 Kinesis Data Analytics for Java 应用程序公开为快照。随后,终止应用程序的运行实例,并创建一个具有更多资源和更高并行性的相同应用程序的新实例。然后,应用程序的新实例从快照填充其内部状态,并从当前终止的实例中断位置开始恢复处理。

因此,扩展操作导致处理短暂中断,这解释了指标的激增。但是,此操作对生产者和消费者是透明的。生产者可以继续写入 Kinesis 数据流,因为它们与应用程序是松耦合的。同样,消费者仍然可以使用 Kibana 查看他们的控制面板,不过他们可能看不到最新数据,因为其尚未处理。

让我们退一步,回顾一下您刚刚做了什么:您创建了一个完全托管、高度可用、可扩展的流式架构。您每秒提取和分析多达 25,000 个事件。通过几次单击即可扩展 Kinesis 数据流和 Kinesis Data Analytics for Java 应用程序,从而将架构的吞吐量翻倍。操作完成后,架构仍然完全正常运行,并且不断接收和处理事件,而不会丢失单个事件。您还可以像其他组件一样无缝扩展 Elasticsearch 集群。但是我们会把它留给感兴趣的读者练习。

试着想象一下,从头开始构建类似的架构。

为 Kinesis Data Analytics for Java Applications 准备 Flink 应用程序

现在您已经看到了流应用程序的运行情况,让我们看一下使用 Kinesis Data Analytics for Java 应用程序部署和运行 Flink 应用程序所需的内容。

与其他部署方法类似,首先构建 Flink 应用程序,并将其打包到一个 JAR 中,该 JAR 包含应用程序运行所需的所有依赖项。然后,将生成的JAR 上传到 Amazon S3。接下来,使用 S3 JAR 的位置和一些其他配置参数来创建一个可由 Kinesis Data Analytics for Java 应用程序执行的应用程序。因此,无需登录到集群并直接将作业提交到 Flink 运行,而是将相应的 JAR 上传到 S3。然后,您可以创建 Kinesis Data Analytics for Java 应用程序,分别使用 API 调用、控制台和 AWS CLI 进行交互。

调整 Flink 配置和运行时参数

要获得有效的 Kinesis Data Analytics for Java 应用程序,Flink 应用程序的 JAR 必须包含某些依赖项。当您使用 Apache Maven 构建 Flink 应用程序时,只需将另一个依赖项添加到项目的 .pom 文件中。

<!—pom.xml ->
<project>
    ...
    <dependencies>
        ...
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>1.0.1</version>
        </dependency>
    </dependencies>
    ...
</project>

然后,您可以指定在创建或更新时传递给生成的 Kinesis Data Analytics for Java 应用程序的参数。这些参数基本上是键值对,包含在作为属性组一部分的属性映射中。

"ApplicationConfiguration": {
    "EnvironmentProperties": {
        "PropertyGroups": [
            {
                "PropertyGroupId": "FlinkApplicationProperties",
                "PropertyMap": {
                    "InputStreamName": "...",
                    ...
                }
            }
        ]
    },
    ...
}

然后,您可以从源自 Kinesis Data Analytics for Java 应用程序运行时的应用程序代码中,获取这些参数的值。例如,以下代码段可获取应用程序应从 FlinkApplicationProperties 属性组连接到的 Kinesis 数据流的名称。

Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

Properties flinkProperties = applicationProperties.get("FlinkApplicationProperties");

String kinesisStreamName = flinkProperties.getProperty("InputStreamName");

您可以使用相同的机制为 Kinesis Data Analytics for Java 应用程序配置其他属性(例如,检查点和应用程序的并行性),这些属性通常直接指定为 Flink 运行时的参数或配置选项。

"ApplicationConfiguration": {
    "FlinkApplicationConfiguration": {
        "CheckpointConfiguration": {
            "ConfigurationType": "DEFAULT"
        },
        "MonitoringConfiguration": {
            "ConfigurationType": "CUSTOM",
            "MetricsLevel": "TASK",
            "LogLevel": "INFO"
        },
        "ParallelismConfiguration": {
            "ConfigurationType": "DEFAULT"
        }
    },
    ...
}

使用此配置,检查点和并行度设置可保留默认值。这样可以实现检查点和自动扩展,并将 Kinesis Data Analytics for Java 应用程序的初始并行性设置为一个。此外,日志级别增加到 INFO,并为应用程序的每个子任务收集 CloudWatch 指标。

构建 Flink Kinesis 连接器

在构建 Kinesis 数据流的Flink 应用程序时,您可能会发现,在Maven Central 中无法使用 Flink Kinesis 连接器。因为实际上需要自己来构建它。以下步骤为最近的 Apache Flink 版本构建连接器。由于 Kinesis Data Analytics for Java 应用程序是基于 Flink 1.6.2的,您现在可以使用此特定版本。

$ wget -qO- https://github.com/apache/flink/archive/release-1.6.2.zip | bsdtar -xf-

$ cd flink-release-1.6.2

$ mvn clean package -B -DskipTests -Dfast -Pinclude-kinesis -pl flink-connectors/flink-connector-kinesis

请注意,连接器已经可以通过 AWS CloudFormation 模板构建,并已经存储在 S3 上。从那里即可下载连接器的 JAR 文件,并使用以下 Maven 命令将其放在本地 Maven 存储库中:

$ mvn install:install-file -Dfile=flink-connector-kinesis_2.11-1.6.2.jar -DpomFile flink-connector-kinesis_2.11-1.6.2.pom.xml

将 Flink Elasticsearch 接收器与 Amazon Elasticsearch Service 集成

从 1.6 版本开始,Apache Flink 附带一个 Elasticsearch 连接器,该连接器支持 HTTP 上的 Elasticsearch API。因此,它可以与 Amazon Elasticsearch Service 提供的终端节点进行本地通信。

您只需要决定如何针对 Elasticsearch 集群的公共终端节点对请求进行身份验证。您可以将单个 IP 列入白名单以访问集群。针对 Amazon Elasticsearch Service 终端节点进行身份验证的推荐方法是使用 IAM凭证和签名版本 4 签名过程 AWS 请求添加身份验证信息。

您也可以使用可从 Maven central 获得的open-source aws-signing-request-interceptor。只需在将请求发送到 Amazon Elasticsearch Service 终端节点之前调用的Elasticsearch 接收器中添加拦截器。然后,拦截器可以使用为 Kinesis Data Analytics for Java 应用程序配置的角色权限对请求进行签名。

final List<HttpHost> httpHosts = Arrays.asList(HttpHost.create("https://...")));

ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<T>() {
      ...
    }
);

final Supplier<LocalDateTime> clock = () -> LocalDateTime.now(ZoneOffset.UTC);
final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
final AWSSigner awsSigner = new AWSSigner(credentialsProvider, "eu-west-1", "es", clock);

esSinkBuilder.setRestClientFactory(
    restClientBuilder -> restClientBuilder.setHttpClientConfigCallback(
        callback -> callback.addInterceptorLast(new AWSSigningRequestInterceptor(awsSigner))
    )
);

esSinkBuilder.build();

请注意,GitHub 存储库中的实际代码有点复杂,因为您需要获取可序列化的请求拦截器。但签署请求的基本方法仍然相同。

监控和调试 Flink 应用程序

当我们运行 Kinesis Data Analytics for Java 应用程序时,需直接访问运行 Flink 的集群。这是因为底层基础设施完全由服务托管。您仅需要通过 API 与服务进行交互即可。您也可以分别通过 CloudWatch CloudWatch Logs 获取指标和日志记录信息。

Kinesis Data Analytics for Java 应用程序公开了许多操作指标,从整个应用程序的指标到应用程序运算符单个进程的指标。您可以根据您的目的,控制适合或需要的详细程度。事实上,上一节中使用的指标都是通过 CloudWatch 获得的。

除了操作指标之外,您还可以配置 Kinesis Data Analytics for Java 应用程序以将消息写入 CloudWatch Logs。此功能与常见的日志记录框架无缝集成,例如 Apache Log4jSimple Logging Facade for Java (SLF4J)。因此,它对于调试和识别操作问题的原因很有用。

如需为您的 Kinesis Data Analytics for Java 应用程序启用日志记录,只需在启动应用程序时将现有 CloudWatch 日志流指定为日志记录选项即可,如下所示:

final Logger LOG = LoggerFactory.getLogger(...);

LOG.info("Starting to consume events from stream {}", flinkProperties.getProperty("InputStreamName"));

将日志消息持久保存到 CloudWatch Logs 后,可通过 CloudWatch Logs Insights

小结

在本文中,您不仅构建了基于 Apache Flink 和Kinesis Data Analytics for Java 应用程序的可靠、可扩展且高度可用的流应用程序,还可以扩展不同的组件,同时近乎实时地每秒提取和分析多达 25k 个事件。大部分情况下,通过使用托管服务启用此场景,因此您无需花时间配置和配置底层基础架构。

本文中使用的应用程序源代码和 AWS CloudFormation 模板可从 GitHub 获取,供您参考。您可以深入了解 Flink 应用程序的所有详细信息以及底层服务的配置。当您可以专注于以流式传输方式分析数据而不是花时间管理和运营基础设施时,我很想知道您会构建什么。

 


关于作者

Steffen Hausmann 是 AWS 的专业解决方案架构师