基于 Apache Kafka 构建实时流分析应用程序
![Olawale Olaleye Olawale Olaleye](https://d1.awsstatic.com/xuefezha-jennie/%e7%ba%a7%e5%88%ab_level%201.01c35fea97656a6beac50b0c8ae81e8afc1eedef.png)
![](https://d1.awsstatic.com/xuefezha-jennie/Group%20281.67a8494bd80a4bd979e37efcb490ada486dd72ae.png)
![](https://d1.awsstatic.com/xuefezha-jennie/cost-icon.84b1b7bbeb5d58956ebf11a05ed8152992f762ba.png)
4.00 美元
![](https://d1.awsstatic.com/xuefezha-jennie/code-icon.4d6cc6173fc04e1c6c381f942f75c083b207f0e1.png)
本教程中使用的示例代码来自 GitHub
![](https://d1.awsstatic.com/xuefezha-jennie/Group%20287.a27381901d308706720071b52d42054d154eab4c.png)
![](https://d1.awsstatic.com/guoheng/product.2d7b328b4c088795e2ac7b9c03e03d54eb5ea73f.png)
在当今快节奏的数字世界中,实时流分析变得日益重要,因为组织需要即刻了解客户、应用程序和产品的当前行为,并迅速做出响应。例如,企业希望实时分析数据,持续监控应用程序,以确保较长的服务正常运行时间,并向客户提供个性化的促销优惠和产品推荐。但是,使用 Apache Kafka 生产者和 Kafka 消费者构建这样一个端到端的实时流处理应用程序可能很有挑战性。
本教程介绍如何使用 Amazon Managed Streaming for Apache Kafka (MSK) 设置和实现实时数据管道。具体来说,本指南详细介绍了如何将流数据引入 Kafka 集群、实时处理数据并使其由下游应用程序使用。
学习目标
在本教程中,我们将:
- 启动无服务器 Amazon MSK 集群
- 使用 Kafka 客户端容器向 MSK Serverless 生成流数据
- 使用 Amazon Kinesis Data Analytics 使用和处理流数据
- 在 Amazon OpenSearch Service 中可视化流数据
让我们开始吧!
架构
以下架构概述了我们将用哪些亚马逊云科技资源和服务来将实时点击流数据写入 Kafka 集群并使用数据。我们利用 Amazon Fargate 把生成示例点击流数据的容器应用程序部署到 MSK Serverless 集群。点击流数据由在 Amazon Kinesis Data Analytics 中运行的 Apache Flink 应用程序使用。具体来说,Flink 应用程序通过 windowing(窗口化)来处理点击流,将数据流拆分为有限大小的存储桶。我们依靠这些窗口来应用计算并分析每个窗口中的数据。最后,生成的分析将写入 Amazon OpenSearch Service 进行可视化。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/architecture.6428eba4e4e43c73082ebbdcdd515bc9b6e1ea20.jpeg)
执行步骤
下面提供了在 Apache Kafka 上实现实时流分析应用程序的分步教程。在开始之前,请确保计算机上安装了以下必备组件:
- Java JDK
- Apache Maven
步骤 1:获取存储库
存储库 build-on-aws/real-time-streaming-analytics-application-using-apache-kafka 包含帮助我们入门所需的文件。运行以下命令,将存储库下载到本地机器:
git clone https://github.com/build-on-aws/real-time-streaming-analytics-application-using-apache-kafka.git
步骤 2:构建 Flink 应用程序
成功在计算机上安装 Apache Maven 后,在之前下载的存储库中,使用以下命令导航至 flink-clickstream-consumer 文件夹:
cd flink-clickstream-consumer
然后运行以下命令,在 flink-clickstream-consumer 文件夹内构建 Flink 应用程序:
mvn package
成功构建应用程序后,您应该会在终端中看到以下消息:
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/flink_build.06a1eb1553c73e5f0a01e06e7eb7625b347a8de3.png)
Maven 将项目编译后的源代码以可分发的 JAR 格式打包在 flink-clickstream-consumer/target/ 目录下,打包的文件名为 ClickStreamProcessor-1.0.jar。如果您想更好地了解 Flink 应用程序的内部工作原理,您可以查看 src 目录中的 ClickstreamProcessor.java 文件。这是 main 函数所在的 Java 应用程序的入口点。
接下来,我们必须将 JAR 文件上传至 Amazon S3,为 Amazon Kinesis Data Analytics 提供该文件。
步骤 3:将文件上传至 Amazon S3
- 登录您的亚马逊云科技账户,导航至 Amazon S3 控制台,点击 Create bucket(创建存储桶)。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/create_bucket.f30db6ff3564f3e9349f8776ebe46c5eadcf75ce.png)
2. 提供您选择的唯一存储桶名称,然后选择一个亚马逊云科技区域(例如 us-east-1),再点击页面底部的 Create Bucket(创建存储桶)按钮。请记下存储桶名称。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/create_bucket_2.c85fda6cd464d673ee0f42edb315c9b0447db999.png)
3. 点击新创建的存储桶,再点击 Upload(上传)按钮,将以下文件上传至 S3 存储桶。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/upload_to_bucket.b6de30a0dd0bd3f4269fb5209de5375f3dccdd9d.png)
4. 点击 Add files(添加文件)按钮,选择您刚刚生成的 JAR 文件 ClickStreamProcessor-1.0.jar。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/add_files_to_bucket.c082a01581e673d1a013bbf03017f867a649419d.png)
很好!现在 JAR 文件已经上传至 S3 存储桶,我们可以轻松地在 Kinesis Data Analytics 中运行 Flink 应用程序,而无需管理服务器。请注意,我们可以将包作为 JAR 文件上传,也可以将包压缩并作为 ZIP 文件上传。
步骤 4:使用 Amazon CloudFormation 创建堆栈
接下来,我们将创建一个 CloudFormation 堆栈,并通过上传 CloudFormation 模板来自动部署以下资源:
- Amazon OpenSearch Cluster:我们可以在这里对所用的点击流数据进行可视化。它部署在 VPC 的私有子网中。
- Amazon ECS Cluster + Task definition:生成示例点击流数据的容器应用程序,它作为 Fargate 任务在 ECS 内运行。
- Amazon Kinesis Data Analytics:这是 Flink 应用程序运行的位置,它使用 MSK 集群中的点击流数据,对其进行处理并将其写入 OpenSearch Service。
- Amazon EC2 Instance (Kafka client):此 EC2 实例用作 Kafka 客户端,允许我们通过创建 Kafka 主题等方式与 MSK 集群进行交互。
- Amazon EC2 Instance (Nginx proxy):此 EC2 实例用作 Nginx 代理,允许我们从 VPC 外部(即从互联网)访问 OpenSearch 控制面板。
- Security groups:安全组帮助我们控制允许特定资源的入站和出站流量。
- IAM roles:IAM 角色是附加了特定权限的 IAM 身份,可由 IAM 用户或亚马逊云科技服务担任。例如,IAM 角色可用于向 EC2 实例上运行的、需要访问特定 Amazon S3 存储桶的应用程序授予权限。
我们不手动创建所需的资源,而使用 CloudFormation 模板在亚马逊云科技账户中自动部署资源。
导航至 CloudFormation 控制台,点击 Create Stack(创建堆栈)。
点击 Upload a template file(上传模板文件),再点击 Choose file(选择文件)即可上传 CloudFormation 模板文件 cf_template.yml,该文件位于所下载的存储库的根目录。然后点击 Next(下一步)。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/create_stack.b493b1d1eae4d187e3bbfa2914889b8242a663d1.png)
3. 为堆栈命名 Stack name(堆栈名称),例如 msk-serverless-stack。此外,还必须为参数 AssetsBucketName 提供一个值。输入您之前创建的 S3 存储桶的名称。如果您没有更改之前生成的 JAR 文件的名称,您就可以不改动 KdaAppKey 的默认值 ClickstreamProcessor-1.0.jar。保留 LatestAmiId 和 OpenSearchmasterUserName 的值不变。点击 Next(下一步)。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/specify_stack_details.de7af935bcef45b2f5c0ed81007e6354dfa1cca4.png)
4. 向下滚动 Configure stack options(配置堆栈选项)页面,然后点击 Next(下一步)。
5. 向下滚动 Review <Your_Stack_Name>(检查「您的堆栈名称」)页面。请勾选 I acknowledge that AWS CloudFormation might create IAM resources with custom names(我了解 Amazon CloudFormation 可能会创建具有自定义名称的 IAM 资源)这一复选框。最后,点击 Submit(提交)即可创建 CloudFormation 堆栈。
等待堆栈的状态从 CREATE_IN_PROGRESS 变为 CREATE_COMPLETE。注意:这可能需要一些时间。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/stack_progress.df708809b2abe89003ee19a39a10e27ab0cf6f86.png)
状态更改为 CREATE_COMPLETE 后,CloudFormation 模板中定义的资源就在您的亚马逊云科技账户中创建完成了。但是,我们需要更多的资源和配置,才能最终得到一个端到端的实时流处理应用程序。
接下来,我们将在亚马逊云科技上创建 MSK 集群。亚马逊云科技提供两种类型的集群:MSK Serverless 集群,它通过自动扩缩提供按需容量;MSK Provisioned 集群,它允许您指定集群中的代理数量和每个代理的存储量,从而提供更大的控制权。但是,MSK Provisioned 不会随着应用程序 I/O 需求的变化而自动扩缩。在本教程中,我们将选择 MSK Serverless,因为我们不希望关心底层基础设施,并将管理开销降至最低。
步骤 5:创建 MSK Serverless 集群
导航至 Amazon MSK 控制台,点击 Create cluster(创建集群)。
点击 Custom create(自定义创建),给出您想要的集群名称(如 msk-cluster)。对于 Cluster type(集群类型),请选择 Serverless(无服务器)。然后点击 Next(下一步)。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/msk_cluster_settings.c51b18c19cf413e524dde38d15540fb7bca60e65.png)
3. 在 Networking(网络)视图中,点击名为 MMVPC 的自定义 VPC。然后,点击 Add subnet(添加子网)以添加第三个子网,并为 us-east-1a、us-east-1b 和 us-east-1c 中的三个不同区域点选三个可用的私有子网(PrivateSubnetMSKOne、PrivateSubnetMSKTwo、PrivateSubnetMSKThree)。
4. 选择名为 MSK Security Group 的安全组,而非默认安全组。最后,点击 Next(下一步)。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/msk_vpc_settings.a246bc61d374372f3bb3019102b0f821bd81d835.png)
5. 点击 Next(下一步)。
6. 点击 Next(下一步)。
7. 最后,点击 Create cluster(创建集群),创建 MSK Serverless 集群。
8. 在 MSK Serverless 集群状态更改为 Active(活动)后,点击 View client information(查看客户端信息)。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/msk_cluster_information.8f4cb6b2367febeaeb29cbb58c9f29ca1baec3e1.png)
9. 请记下 MSK Serverless 集群的端点。请注意,我们使用 IAM 访问控制来处理对 MSK 集群的身份验证。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/msk_cluster_endpoint.157773840f50bede0dfa49a1c25d6704fbdf9533.png)
目前,MSK Serverless 仅支持 IAM 身份验证。如果您选择 MSK Provisioned,则可以选择使用 IAM、TLS 或 SASL/SCRAM 对客户端进行身份验证以及允许或拒绝操作。
步骤 6:创建 Kafka 主题
现在 MSK Serverless 集群已准备就绪并可供使用,我们需要创建一个 Kafka 主题来生成和使用数据。我们可以创建如下所示的 Kafka 主题:
- 导航至 Amazon EC2 控制台。在 EC2 主页上,点击 Instances (running)(运行中的实例)。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/ec2_running.a01683fa8c7a736762a32980ac2a83197e8f7591.png)
2. 在 EC2 Instances(实例)页面上,选中 KafkaClientInstance 实例的复选框,然后点击右上角的 Connect(连接)按钮,如下图所示。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/ec2_instances_connect.09aa1bebb097d93626a511c3f676f98f6807d42e.png)
3. 在 Connect to instance(连接实例)页面上,确保选择 Session Manager,并点击 Connect button(连接)按钮。这样即可打开一个带有 EC2 终端窗口的新选项卡。
4. 在终端窗口中,执行以下命令,将用户更改为 ec2-user:
sudo su - ec2-user
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/ec2_sudo.b98cb3b0a88270cff1fbff5eb81216f46412ce6d.png)
5. 执行以下命令,将 MSK 集群端点设置为 shell 变量 BS。将 <Your_Cluster_Endpoint> 替换为您在创建 MSK Serverless 集群后记下的端点。
export BS=<Your_Cluster_Endpoint>
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/ec2_export.2a04eb2ffa0e096ca2c87c5ec460467621a5591e.png)
6. 然后,执行以下命令,创建 Kafka 主题。
bash create-topics.sh
您将看到终端输出的一些警告。您可以忽略警告。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/ec2_topics.e4b8e7e08cacbb95facd3e551cd0c989fa519b1b.png)
7. 您应该会看到一个已创建的 MSK 主题:clickstream。
您可以运行以下命令来查看 bash 脚本,并查看其中执行的 Kafka 命令的详细信息:
cat create-topics.sh
如果您愿意,您可以随时运行其他 Kafka 命令,以更好地了解您的 MSK 集群。例如,执行以下命令,查看已创建主题的详细信息:
/home/ec2-user/kafka/bin/kafka-topics.sh --bootstrap-server $BS --describe --topic clickstream
步骤 7:启动容器应用程序以生成点击流数据
成功创建 MSK 集群后,下一步是设置将数据写入主题 clickstream 的生产者。为此,我们将部署一个无服务器 Amazon ECS Fargate 容器,该容器运行应用程序,向 MSK Serverless 集群生成示例点击流数据。
- 导航至 Amazon ECS 控制台。点击左侧菜单中的 Task Definitions(任务定义),查看所有可用的任务定义。选中可用任务定义的复选框,点击 Deploy(部署)菜单中的 Run task(执行任务)。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/ecs_task_definition.bdc70cc5d25126edd47b2c64d6311bb37c4de9ea.png)
2. 在 Run Task(运行任务)页面上,选择现有集群(msk-serverless-[...]-cluster),并保留默认设置。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/ecs_create.14106dbd21f75315adc16a381812f32591f2707f.png)
3. 展开 Networking(网络)部分。将默认 VPC 更改为 MMVPC。请选择与之前相同的 PrivateSubnetMSKOne、PrivateSubnetMSKTwo 和 PrivateSubnetMSKThree 三个子网。最后,取消选中默认安全组,然后选择包含 -ProducerECSTaskSecurityGroup- 的安全组。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/ecs_vpc.7dfa2c6fa01dccbbb2f0d0b2a1e7acb1719f69eb.png)
4. 展开 Container overrides(容器覆写)部分。对于 BOOTSTRAP_STRING,输入 MSK Serverless 集群端点的值(之前已在 MSK 集群控制台页面中的 View client information(查看客户端信息)部分记下)。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/ecs_container_overrides.8f453a2a157fd5eac05369e3db9d3bfec7d0b753.png)
5. 最后,点击 Create button(创建)按钮。
6. 等待任务状态变为 Running(运行中),如下所示。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/ecs_task_running.ebddf5eb5fbc9b2c03fdde2698b2115e3afe9544.png)
现在,您已成功创建生产者 ECS 任务,该任务将持续向 MSK Serverless 集群生成点击流数据。
具体来说,这个 ECS 任务会生成随机点击事件。生成的事件包含用户 IP、产品类型、事件时间戳等信息。还有一个与每个事件关联的用户 ID,用作键值。事件的分区号是用其键的哈希值确定的。在将数据发送到 MSK 集群之前,Amazon Glue 架构注册表提供的 Avro 序列化程序会对事件数据进行序列化。生成的每个事件都会发送到之前创建的主题 clickstream。
步骤 8:检查 Amazon Glue 架构注册表中的架构
上一步中,我们成功创建了一个 ECS 生产者任务。现在,我们要在 Amazon Glue 架构注册表中创建点击流架构。
导航至 Amazon Glue 控制台。点击左侧菜单中 Data Catalog(数据目录)下的 Stream schema registries(流架构注册表)。您可以看到名为 serverless 的架构注册表。点击它。
您可以查看架构注册表 serverless 的可用架构。点击 clickstream 架构可以查看不同的架构版本。您可在此处看到版本 1。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/schema_properties.0c32c851330411dd9a5191356ce2cd4f399b9c09.png)
3. 点击版本 1 可查看 ECS 任务生成的点击流数据的 Avro 架构。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/schema_version_definition.2ba3ab29a86d042cae911668e3a8748144a76a89.png)
Apache Avro 是一个数据序列化系统,允许对结构化数据进行高效、紧凑的编码,尤其是在大数据或流数据使用场景中。为此,Avro 提供了一种紧凑的二进制格式,用于数据存储和交换。生产者使用 Amazon Glue 架构注册表提供的 Avro 序列化程序,并在 Glue 架构注册表中自动注册架构版本。
步骤 9:使用 Kinesis Data Analytics 使用点击流数据
我们已经完成了对 MSK 无服务器集群的设置,并不断将点击流数据写入集群。现在,我们希望利用 Amazon Kinesis Data Analytics 和 Flink 来使用 MSK Serverless 集群中的点击流数据。Apache Flink 应用程序实时处理点击流数据,并将分析写入 Amazon OpenSearch Service。
OpenSearch Service 已部署在您的亚马逊云科技账户中,并且控制面板也已配置好。现在我们缺少的是 Kinesis Data Analytics 应用程序的正确运行时参数。
导航至 Amaon Kinesis Analytics 控制台,点击打开的流处理应用程序 KDAFlinkCLickstream-msk-serverless-stack。
点击 Configure(配置)按钮,配置并更新应用程序。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/kda_configure.0f8745f98a2ffcdc5c0ff2918dd0d5a085dd7e0b.png)
3. 向下滚动到 Runtime properties(运行时属性)。将 BootstrapServers 更新为您之前记下的 MSK Serverless 集群端点。其他配置保留默认值。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/kda_runtime_properties.7f3b2d3c2436bbe01b9b510cad21565c00eb899d.png)
4. 最后,保存更改。
5. 点击 Run(运行)按钮,运行 Flink 应用程序。点击 Run without snapshot(无快照运行)。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/kda_run_application.e4dce39cd6aa921096ced212494d4442c3a02f6b.png)
6. Kinesis Analytics 应用程序开始运行后,点击 Open Apache Flink dashboard(打开 Apache Flink 控制面板)以打开 Flink 控制面板。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/kda_open_dashboard.dd9aa6b2e7bc06c50bb9f6038b41e4437d1510e0.png)
7. 点击菜单左侧的 Running Jobs(运行中的作业)。点击 Flink Streaming Job(Flink 流处理作业)即可访问运行中的作业的详细信息。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/kda_running_jobs.03f702c07fcac94291d551dde024f8e4e823b7fb.png)
8. 您将看到的界面中包含一个有向无环图 (DAG),表示应用程序的每个 operator(算子)中的数据流。作业工作流中的每个蓝色框代表一系列相链接的算子,在 Apache Flink 中称为 Task。
如前所述,Flink 应用程序通过窗口化处理点击流,即将连续的数据流划分为有限的、离散的块或窗口进行处理。更确切地说,Flink 应用程序使用 EventTimeSessionWindows,通过对指定时间间隔内的事件进行分组,从点击流数据中提取用户会话。然后,应用程序部署 TumblingEventTimeWindows,将点击流划分为固定大小的非重叠窗口,从而计算特定时间段内的特定聚合特征。例如,这可能需要计算过去 10 秒内进行购买的用户会话的聚合计数。
此外,我们可以看到每个任务的状态,以及屏幕底部的 Bytes Received(接收字节数)、Bytes Sent(发送字节数)、Records Received(接收记录数)和 Records Sent(发送记录数)。请注意,Flink 只能测量算子之间发送或接收的字节数。看不到 source 或 sink 算子的指标,原因就在于此:数据来自 Flink 外部。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/kda_dag.dbbebee948e0cfc71eb0ae3ff3644dc53ef9e78f.png)
现在,我们已成功创建了一个 Kinesis Analytics 应用程序,该应用程序从 Kafka 主题读取消息,处理数据,然后将分析写入 Amazon OpenSearch Service。我们在 OpenSearch 控制面板中检查数据吧!
步骤 10:在 Amazon OpenSearch 控制面板中查看点击流数据
在这最后一步中,我们希望看到基于从 Kinesis Analytics 应用程序所提取的数据生成的可视化控制面板。
导航至 CloudFormation 控制台,点击我们先前创建的堆栈。前往堆栈的 Outputs(输出)选项卡。
请记下 OpenSearchMasterUserName 和 OpenSearchMasterPassword。我们在下一步中会用到这两个值。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/stack_outputs.f2bad39cc6c9e6e2f6dc074ffd961a00fd6850dd.png)
3. 点击 OpenSearchDashboardEndpoint,在新选项卡中打开 OpenSearch 控制面板登录页面。由于 OpenSearch 服务部署在 VPC 中,因此我们依靠 Nginx 反向代理来访问 VPC 外部的 OpenSearch 控制面板。请注意,我们使用的是 Nginx 的自签名证书。但是,我们强烈建议对生产部署使用有效的证书。
4. 如果您使用 Google Chrome 访问这个 URL,请点击 Advanced(高级)按钮,点击 Proceed to <Your_EC2_DNS>(继续访问「您的 EC2 DNS 名」)。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/chrome_warning.3bb70313b7e1464f368052c0b55db14099bce6af.png)
5. 使用上一步中的 OpenSearchMasterUserName 和 OpenSearchMasterPassword 登录 OpenSearch 控制面板。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/opensearch_login.06fc6dc6a80b5e7d36535899341677409315bb0c.png)
6. 在弹出的对话框中选择 Global(全局)租户。
7. 点击屏幕左侧的三横菜单,然后点击 Dashboards(控制面板),如下所示。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/opensearch_dashboard.f6bc3f15e9d9443ab712fb671df84a6abc8e93c8.png)
8. 在 Dashboards(控制面板)视图中,选择名为 Clickstream Dashboard 的控制面板以查看绘制的数据:
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/opensearch_graphs.675a3def420676c620a61b511b6dfc436225005f.png)
我们现在已经确认数据流向 OpenSearch 服务,并呈现了可视化效果。但是数据是如何从 Flink 应用程序到 Opensearch 的呢?我们利用了 Apache Flink 的 Elasticsearch 连接器。此连接器提供接收器,可请求对 Elasticsearch 索引执行文档操作。您可以导航至下载的存储库中的文件 AmazonOpenSearchSink.java,查看连接器的具体实现。
清理资源
现在,您已完成基于 Apache Kafka 的实时流分析应用程序的构建,您可以删除所有资源以避免产生意外成本。
- 在 Actions(操作)菜单下,删除 MSK Serverless 集群。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/msk_delete.f6595594f8e5fb35af3d939d9101f885c6940d3c.png)
- 删除 CloudFormation 堆栈。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/cloudformation_delete.bff050513089874b0299dc5bffcd042c13cd2b5f.png)
- 清空并删除您之前创建的 S3 存储桶。
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/cloudformation_delete.bff050513089874b0299dc5bffcd042c13cd2b5f.png)
![](https://d1.awsstatic.com/lili/community%e6%95%99%e7%a8%8b/40/bucket_delete.f05dfba985dd3659b51b29288e1a8bde93e0849c.png)
总结
恭喜您!您已构建了基于 Apache Kafka 的实时流分析应用程序。具体来说,您设置了一个 ECS 任务,以向 MSK Serverless 集群生成示例点击流数据。然后,这些点击流数据由在 Amazon Kinesis Analytics 中运行的 Flink 应用程序使用,得到处理并写入 Amazon OpenSearch。
如果您想了解有关亚马逊云科技上的流处理和 Apache Kafka 的更多信息,可以查看以下博客文章: