使用 Apache Kafka 和 Go 将数据摄取到 OpenSearch

您有时可能需要编写自定义集成层,以满足数据管道中的特定需求。了解如何使用 Go 通过 Kafka 和 OpenSearch 来实现此目的。
发布时间:2023 年 7 月 13 日
OpenSearch
Kafka
Golang
分析
教程
亚马逊云科技
Olawale Olaleye
亚马逊云科技使用经验
200 - 中级
完成所需时间
60 分钟
所需费用
前提条件

注册 / 登录 亚马逊云科技账户

已安装 CDK:访问 Amazon CDK 入门可了解更多信息。

示例代码

本教程中使用的示例代码来自 GitHub

上次更新时间
2023 年 7 月 10 日

可扩展的数据摄取是像 OpenSearch 这样的大规模分布式搜索和分析引擎的一个关键环节。构建实时数据摄取管道的方法之一是使用 Apache Kafka。它是一个开源的事件流平台,用于处理海量高速数据,并与包括关系数据库和 NoSQL 数据库在内的各种数据源集成。例如,一个典型的使用场景是在异构系统(源组件)之间实时同步数据,以确保 OpenSearch 索引是最新的,可用于分析或通过控制面板和可视化使用下游应用程序。

这篇博客文章将介绍如何创建数据管道,将写入 Apache Kafka 的数据摄取到 OpenSearch。我们将使用 Amazon OpenSearch ServerlessAmazon Managed Streaming for Apache Kafka (Amazon MSK) ServerlessKafka Connect 非常适合此类需求。它为 OpenSearch 和 ElasticSearch(如果您选择在 Amazon OpenSearch 中使用 ElasticSearch OSS 引擎,则可以使用 ElasticSearch)提供接收器连接器。但有时,由于特定的需求或原因,可能需要使用自定义解决方案。

例如,您可能正在使用 Kafka Connect 不支持的数据源(这种情况很少见,但可能会发生),并且不想从头开始编写。或者,这可能是一次性集成,您在想是否值得花费精力设置和配置 Kafka Connect。也许还有其他问题,例如许可等。

值得庆幸的是,Kafka 和 OpenSearch 提供了多种编程语言的客户端库,使您能够编写自己的集成层。这正是本博客所讨论的内容!我们将利用自定义 Go 应用程序,通过 KafkaOpenSearch 的 Go 客户端来摄取数据。

学习内容:

  • 概述如何设置所需的亚马逊云科技服务 - OpenSearch Serverless、MSK Serverless、Amazon Cloud9 以及 IAM 策略和安全配置。
  • 大致了解应用程序。
  • 启动并运行数据摄取管道。
  • 如何在 OpenSearch 中查询数据。

在深入讨论之前,我们先大概了解一下 OpenSearch Serverless 和 Amazon MSK Serverless。

Amazon OpenSearch Serverless 和 Amazon MSK Serverless 简介

OpenSearch 是一款开源搜索和分析引擎,用于日志分析、实时监控和点击流分析。Amazon OpenSearch Service 是一项托管服务,可简化亚马逊云科技中的 OpenSearch 集群的部署和扩展。

Amazon OpenSearch Service 支持 OpenSearch 和旧版 Elasticsearch OSS(直至 7.10 版本,即该软件的最终开源版本)。创建集群时,您可以选择使用哪个搜索引擎。

您可以创建一个 OpenSearch Service 域(与 OpenSearch 集群同义)来表示一个集群,其中每个 Amazon EC2 实例都充当一个节点。但是,OpenSearch Serverless 通过为 OpenSearch Service 按需提供无服务器配置来消除操作复杂性。它使用索引集合支持特定的工作负载,与传统集群不同,它将索引和搜索组件分离,并使用 Amazon S3 作为索引的主存储。这种架构支持独立扩展搜索和索引功能。

有关详细信息,请参阅比较 OpenSearch Service 和 OpenSearch Serverless

Amazon Managed Streaming for Apache Kafka (MSK) 是一项全托管服务,旨在使用 Apache Kafka 处理流数据。它可以处理包括创建、更新和删除在内的集群管理操作。您可以使用标准的 Apache Kafka 数据操作生成和使用数据,而无需修改应用程序。它支持开源 Kafka 版本,可确保与现有工具、插件和应用程序的兼容性。

MSK Serverless 是 Amazon MSK 中的一种集群类型,无需手动管理和扩展集群容量。它会根据需求自动预配和扩展资源,并负责主题分区管理。采用按量付费的定价模式,您只需为实际使用量付费。MSK Serverless 非常适合需要灵活、自动扩展流容量的应用程序。

下面,我们先讨论应用程序的简要架构,再讨论其注意事项。

应用程序概述和关键架构注意事项

下面是一个简化的应用程序架构,大致展示了各组件及其交互方式。

该应用程序由生产者和消费者组件组成,它们是部署到 EC2 实例的 Go 应用程序:

  • 顾名思义,生产者将数据发送到 MSK Serverless 集群。
  • 消费者应用程序从 MSK Serverless 主题接收数据(movie 信息),并使用 OpenSearch Go 客户端对 movies 集合中的数据建立索引。

注重简易性

需要注意的是,这篇博客文章经过优化,更简单且易于理解,因此该解决方案不适用于运行生产工作负载的场景。本文简化了以下事项:

  • 生产者和消费者应用程序在同一个计算平台(EC2 实例)上运行。
  • 只有一个消费者应用程序实例处理来自 MSK 主题的数据。不过,您可以尝试运行消费者应用程序的多个实例,查看数据是如何跨实例分布的。
  • 没有使用 Kafka CLI 生成数据,而是使用 Go 语言编写了一个自定义生产者应用程序,并使用 REST 端点发送数据。这演示了如何使用 Go 语言编写 Kafka 生产者应用程序并模拟 Kafka CLI。
  • 使用的数据量很小。
  • OpenSearch Serverless 集合具有公共访问类型。

对于生产工作负载,您应该考虑的以下一些事项:

  • 根据数据量和可扩展性要求,为消费者应用程序选择适当的计算平台(下文将详细介绍)。
  • 为 OpenSearch Serverless 集合选择 VPC 访问类型。
  • 考虑使用 Amazon OpenSearch Ingestion 创建数据管道。

如果您仍然需要部署自定义应用程序来构建从 MSK 到 OpenSearch 的数据管道,可以选择以下计算选项:

  • 容器:您可以将消费者应用程序打包为 Docker 容器(Dockerfile 可从 GitHub 存储库中获取),然后将其部署到 Amazon EKS 或 Amazon ECS
  • 如果将应用程序部署到 Amazon EKS,您还可以考虑使用 KEDA,根据 MSK 主题中的消息数量自动扩缩消费者应用程序。
  • 无服务器:您也可以使用 MSK 作为 Amazon Lambda 函数的事件源。可以将消费者应用程序编写为 Lambda 函数,并将其配置为由 MSK 事件触发,或者在 Amazon Fargate 上运行。
  • 由于生产者应用程序是一个 REST API,因此您可以将其部署到 Amazon App Runner。
  • 最后,您可以利用 Amazon EC2 Auto Scaling 组为您的消费者应用程序自动扩缩 EC2 队列。

已有足够的材料介绍如何使用基于 Java 的 Kafka 应用程序通过 IAM 与 MSK Serverless 连接。下面我们转一下话题,先简单了解一下在 Go 中如何实现。

Go 客户端应用程序如何使用 IAM 在 MSK Serverless 中进行身份验证?

MSK Serverless 需要使用 IAM 访问控制处理 MSK 集群的身份验证和授权。这意味着,您的 MSK 客户端应用程序(在本例中为生产者和消费者应用程序)必须使用 IAM 向 MSK 进行身份验证,系统将基于此允许或拒绝这些应用程序执行特定的 Apache Kafka 操作。

幸好 franz-go Kafka 客户端库支持 IAM 身份验证。消费者应用程序的以下代码片段将展示它在实践中的工作原理:

func init() {
//......
 cfg, err = config.LoadDefaultConfig(context.Background(), config.WithRegion("us-east-1"), config.WithCredentialsProvider(ec2rolecreds.New()))
 
 creds, err = cfg.Credentials.Retrieve(context.Background())
//....

func initializeKafkaClient() {

 opts := []kgo.Opt{
 kgo.SeedBrokers(strings.Split(mskBroker, ",")...),
 kgo.SASL(sasl_aws.ManagedStreamingIAM(func(ctx context.Context) (sasl_aws.Auth, error) {

 return sasl_aws.Auth{
 AccessKey: creds.AccessKeyID,
 SecretKey: creds.SecretAccessKey,
 SessionToken: creds.SessionToken,
 UserAgent: "msk-ec2-consumer-app",
 }, nil
 })),
//.....
  • 首先,应用程序使用 ec2rolecreds.New() 凭证提供程序从 EC2 实例元数据服务检索临时 IAM 凭证。EC2 实例应具有适当的 IAM 角色(包含权限),以便对 MSK 集群组件执行所需的操作(下文将详细介绍)。
  • 然后,使用这些凭证通过 sasl_aws 包中的 AWS_MSK_IAM SASL 身份验证实现方案初始化 Kafka 客户端。

注意:由于 Kafka 有多个 Go 客户端(包括 Sarama),请务必查阅相应的客户端文档,确认是否支持 IAM 身份验证。

有了这些背景知识后,我们继续设置运行数据摄取管道所需的服务。

基础设施设置

本部分将帮助您设置以下组件:

  • 所需的 IAM 角色
  • MSK Serverless 集群
  • OpenSearch Serverless 集合
  • 运行应用程序的 Amazon Cloud9 EC2 环境

MSK Serverless 集群

您可以参阅此文档,使用亚马逊云科技管理控制台设置 MSK Serverless 集群。完成后,记下以下集群信息:VPC、子网、安全组(Properties(属性)页签)和集群端点(点击 View client information(查看客户端信息))。

应用程序 IAM 角色

本教程需要不同的 IAM 角色。

首先,创建一个 IAM 角色来完成后续步骤,并按照步骤 1:配置权限(参阅 Amazon OpenSearch 文档)执行操作,配置 OpenSearch Serverless 的使用权限。

为客户端应用程序创建另一个 IAM 角色,这类应用程序将与 MSK Serverless 集群进行交互。然后使用 OpenSearch Go 客户端对 OpenSearch Serverless 集合中的数据建立索引。创建如下内联 IAM 策略,确保将示例值替换为您所需的值。

{
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Action": [
 "kafka-cluster:*"
 ],
 "Resource": [
 "<ARN of the MSK Serverless cluster>",
 "arn:aws:kafka:us-east-1:<AWS_ACCOUNT_ID>:topic/<MSK_CLUSTER_NAME>/*",
 "arn:aws:kafka:us-east-1:AWS_ACCOUNT_ID:group/<MSK_CLUSTER_NAME>/*"
 ]
 },
 {
 "Effect": "Allow",
 "Action": [
 "aoss:APIAccessAll"
 ],
 "Resource": "*"
 }
 ]
}

使用以下信任策略:

{
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Principal": {
 "Service": "ec2.amazonaws.com"
 },
 "Action": "sts:AssumeRole"
 }
 ]
}

最后,您将向相关 IAM 角色附加 OpenSearch Serverless 数据访问策略,下一步将详细介绍。

OpenSearch Serverless 集合

按照此文档创建 OpenSearch Serverless 集合。在执行步骤 2:创建集合中的第 8 点时,请确保配置两个数据策略,即在上一部分的步骤 2 和 3 中创建的 IAM 角色各一个。

请注意,为了完成本教程,我们选择了公共访问类型。对于生产工作负载,建议选择 VPC。

Amazon Cloud9 EC2 环境

按照此文档创建 Amazon Cloud9 EC2 开发环境,确保使用 MSK Serverless 集群所在的同一个 VPC。

完成后,您需要执行以下操作:打开 Cloud9 环境,在 EC2 Instance(EC2 实例)下,点击 Manage EC2 instance(管理 EC2 实例)。在 EC2 实例中,前往 Security(安全),然后记下附加的安全组。

打开与 MSK Serverless 集群关联的安全组,并添加入站规则,允许 Cloud9 EC2 实例与其连接。选择 Cloud9 EC2 实例的安全组作为源,选择 9098 作为端口,并选择 TCP 协议。

现在可以运行应用程序了!

选择 Cloud9 环境,然后选择 Open in Cloud9(在 Cloud9 中打开)以启动 IDE。打开终端窗口,克隆 GitHub 存储库并将目录更改为该文件夹。

git clone https://github.com/build-on-aws/opensearch-using-kafka-golang

cd opensearch-using-kafka-golang

启动生产者应用程序:

cd msk-producer

export MSK_BROKER=<enter MSK Serverless cluster endpoint>
export MSK_TOPIC=movies

go run main.go

您应当会在终端看到以下日志:

MSK_BROKER <MSK Serverless cluster endpoint>
MSK_TOPIC movies
starting producer app
http server ready

若要将数据发送到 MSK Serverless 集群,请使用 bash 脚本,该脚本会调用您刚刚启动的应用程序公开的 HTTP 端点,并使用 curl 以 JSON 格式提交电影数据(来自 movies.txt 文件):

./send-data.sh

在生产者应用程序终端日志中,您应当会看到类似于以下内容的输出:

producing data to topic
payload {"directors": ["Joseph Gordon-Levitt"], "release_date": "2013-01-18T00:00:00Z", "rating": 7.4, "genres": ["Comedy", "Drama"], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQxNTc3NDM2MF5BMl5BanBnXkFtZTcwNzQ5NTQ3OQ@@._V1_SX400_.jpg", "plot": "A New Jersey guy dedicated to his family, friends, and church, develops unrealistic expectations from watching porn and works to find happiness and intimacy with his potential true love.", "title": "Don Jon", "rank": 1, "running_time_secs": 5400, "actors": ["Joseph Gordon-Levitt", "Scarlett Johansson", "Julianne Moore"], "year": 2013}
record produced successfully to offset 2 in partition 0 of topic movies

producing data to topic
payload {"directors": ["Ron Howard"], "release_date": "2013-09-02T00:00:00Z", "rating": 8.3, "genres": ["Action", "Biography", "Drama", "Sport"], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQyMDE0MTY0OV5BMl5BanBnXkFtZTcwMjI2OTI0OQ@@._V1_SX400_.jpg", "plot": "A re-creation of the merciless 1970s rivalry between Formula One rivals James Hunt and Niki Lauda.", "title": "Rush", "rank": 2, "running_time_secs": 7380, "actors": ["Daniel Br\u00c3\u00bchl", "Chris Hemsworth", "Olivia Wilde"], "year": 2013}
record produced successfully to offset 4 in partition 1 of topic movies

.....

为了完成本教程并使其简单易懂,特意将数据量限制为 1500 条记录,并且脚本在将每条记录发送给生产者后有意休眠 1 秒。您应该能够轻松地跟随学习。

当生产者应用程序忙于向 movies 主题发送数据时,您可以启动消费者应用程序,开始处理来自 MSK Serverless 集群的数据,并在 OpenSearch Serverless 集合中对其建立索引。

cd msk-consumer

export MSK_BROKER=<enter MSK Serverless cluster endpoint>
export MSK_TOPIC=movies
export OPENSEARCH_INDEX_NAME=movies-index
export OPENSEARCH_ENDPOINT_URL=<enter OpenSearch Serverless endpoint>

go run main.go

您应当在终端中看到以下输出,这表明它确实已经开始从 MSK Serverless 集群接收数据,并在 OpenSearch Serverless 集合中对其建立索引。

using default value for AWS_REGION - us-east-1
MSK_BROKER <MSK Serverless cluster endpoint>
MSK_TOPIC movies
OPENSEARCH_INDEX_NAME movies-index
OPENSEARCH_ENDPOINT_URL <OpenSearch Serverless endpoint>
using credentials from: EC2RoleProvider
kafka consumer goroutine started. waiting for records
paritions ASSIGNED for topic movies [0 1 2]

got record from partition 1 key= val={"directors": ["Joseph Gordon-Levitt"], "release_date": "2013-01-18T00:00:00Z", "rating": 7.4, "genres": ["Comedy", "Drama"], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQxNTc3NDM2MF5BMl5BanBnXkFtZTcwNzQ5NTQ3OQ@@._V1_SX400_.jpg", "plot": "A New Jersey guy dedicated to his family, friends, and church, develops unrealistic expectations from watching porn and works to find happiness and intimacy with his potential true love.", "title": "Don Jon", "rank": 1, "running_time_secs": 5400, "actors": ["Joseph Gordon-Levitt", "Scarlett Johansson", "Julianne Moore"], "year": 2013}
movie data indexed
committing offsets
got record from partition 2 key= val={"directors": ["Ron Howard"], "release_date": "2013-09-02T00:00:00Z", "rating": 8.3, "genres": ["Action", "Biography", "Drama", "Sport"], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQyMDE0MTY0OV5BMl5BanBnXkFtZTcwMjI2OTI0OQ@@._V1_SX400_.jpg", "plot": "A re-creation of the merciless 1970s rivalry between Formula One rivals James Hunt and Niki Lauda.", "title": "Rush", "rank": 2, "running_time_secs": 7380, "actors": ["Daniel Br\u00c3\u00bchl", "Chris Hemsworth", "Olivia Wilde"], "year": 2013}
movie data indexed
committing offsets

.....

该进程完成后,您应该在 OpenSearch Serverless 集合中对 1500 部电影建立了索引。不过,你不必等待该进程完成。有了几百条记录后,您就可以继续前往 OpenSearch 控制面板中的开发工具,运行以下查询。

在 OpenSearch 中查询电影数据

运行简单的查询

我们先运行一个简单的查询,列出索引中的所有文档(不使用任何参数或筛选条件)。

GET movies-index/_search

仅获取特定字段的数据

默认情况下,搜索请求会检索对文档建立索引时提供的整个 JSON 对象。可以使用 _source 选项从选定字段中检索源。例如,若要仅检索 title、plot 和 genres 字段,请运行以下查询:

GET movies-index/_search
{
 "_source": {
 "includes": [
 "title",
 "plot",
 "genres"
 ]
 }
}

获取与搜索词精确匹配的数据

您可以使用术语查询来实现此目的。例如,若要搜索 title 字段中包含 christmas 一词的电影,请运行以下查询:

GET movies-index/_search
{
 "query": {
 "term": { 
 "title": {
 "value": "christmas"
 }
 }
 }
}

**结合使用字段选择和术语查询

您可以使用以下查询仅检索某些字段并查找特定词:

GET movies-index/_search
{
 "_source": {
 "includes": [
 "title",
 "actors"
 ]
 },
 "query": {
 "query_string": {
 "default_field": "title",
 "query": "harry"
 }
 }
}

聚合

您可以使用聚合根据特定字段中的值分组来计算汇总值。例如,您可以汇总 ratings、genre 和 year 等字段,以根据这些字段的值搜索结果。通过聚合,我们可以回答这样的问题:“每种类型的电影有多少部?”

GET movies-index/_search
{
 "size":0,
 "aggs": {
 "genres": {
 "terms":{"field": "genres.keyword"}
 }
 }
}

清理资源

完成演示后,请确保删除所有服务,避免产生任何额外费用。您可以按照相应文档中的步骤删除服务。

总结

回顾一下,您部署了一个管道,使用 Kafka 将数据摄取到 OpenSearch Serverless,然后以不同的方式查询了这些数据。在此过程中,您还了解了运行生产工作负载时应注意的架构事项和可使用的计算选项,以及如何使用基于 Go 的 Kafka 应用程序进行 MSK IAM 身份验证。此外,我建议阅读通过 Go 为 Amazon OpenSearch 构建 CRUD 应用程序这篇文章,特别是,若您正在寻找有关如何通过 Go SDK 执行 OpenSearch 操作的教程时,这篇文章会对您有所帮助。