

在本文中,我们将介绍如何使用 Amazon Kinesis Data Streams 缓冲和聚合实时流数据,并通过 Amazon OpenSearch Ingestion 将数据发送到 Amazon OpenSearch Service 域或 Amazon OpenSearch Serverless 集合。这种方法适用于各种使用场景,包括从实时日志分析到集成应用消息数据用于实时搜索。在本文中,我们将以某企业根据存档和保留日志数据的合规需求,而进行集中日志聚合的使用场景作为示例。
Kinesis Data Streams 是一种全托管的无服务器数据流式处理服务,可以实时摄取和存储各种规模的流数据。在日志分析使用场景中,Kinesis Data Streams 通过解耦生产者和消费者应用,并提供一个弹性、可扩展的缓冲区来捕获和处理日志数据,增强了日志聚合功能。与传统架构相比,这种解耦架构更具优势。Kinesis Data Streams 可以根据日志生产者规模的扩缩,动态调配资源,持久缓冲日志数据。使用 Kinesis Data Streams,可以避免因负载波动对 Amazon OpenSearch Service 域造成的压力,同时提供一个具备高度弹性的日志数据存储解决方案,极大地便利了日志数据的消费。此外,Kinesis Data Streams 提供的实时数据持久存储功能有助于消费者应用消费数据,且支持使用多个消费者实时处理日志数据。这样,日志分析管道就能符合良好架构最佳实践 (Well-Architected) 的弹性策略要求 (REL04-BP02) 和成本优化策略要求 (COST09-BP02)。
OpenSearch Ingestion 是一个无服务器管道服务,提供多种强大的工具,将数据提取、转换和加载到 OpenSearch Service 域。OpenSearch Ingestion 与多个 AWS 服务集成,并提供多种模板蓝图帮助加速管道部署。你可以根据你的数据分析使用场景,选择模板蓝图部署管道,将数据摄取到你的 OpenSearch Service 域。OpenSearch Ingestion 与 Kinesis Data Streams 结合使用时,能够高效执行复杂的实时数据分析任务,从而显著减轻构建实时搜索与分析架构的工作负担。
解决方案概述
在此解决方案中,我们以集中日志聚合这种常见的使用场景为例。出于多种原因,企业可能会考虑采用集中式日志聚合方法。许多企业都有合规和治理要求,这些要求规定了需要记录哪些数据,以及日志数据必须保留多长时间以供调查查询。也有一些企业需要整合应用与安全运营,为其团队提供通用的可观察性工具集和功能。
为了满足此类要求,需要以可扩展、弹性且经济高效的方式从日志源(生产者)收集数据。因应用使用场景和基础设施配置不同,日志源可能不同,如下表所示。
日志生产者 | 示例 | 生产者日志配置示例 |
应用日志 | Amazon Lambda | Amazon CloudWatch Logs |
应用代理程序 | FluentBit | Amazon OpenSearch Ingestion |
AWS 服务日志 | Amazon Web Application Firewall | Amazon S3 |
下图展示了一个示例架构。

你可以将 Kinesis Data Streams 用于各种此类使用场景。可以配置 Amazon CloudWatch 根据订阅过滤器策略将日志数据发送到 Kinesis Data Streams。具体信息,请参阅通过订阅实时处理日志数据。如果在日志分析场景中使用 Kinesis Data Streams 来发送数据,则可以通过 OpenSearch Ingestion 创建可扩展、可伸缩的管道,用于消费流数据和将数据写入 OpenSearch Service 索引。Kinesis Data Streams 提供缓冲区,支持多个消费者、可配置数据保留时间,并支持与各种 AWS 服务内置集成。在其他使用场景下,如数据存储在 Amazon Simple Storage Service (Amazon S3) 或由代理 (Agent) 将数据写入到 FluentBit 等场景,由于 OpenSearch Ingestion 具有内置持久缓冲区和自动扩缩能力,代理可将数据直接写入 OpenSearch Ingestion,无需额外的中间缓冲区。
标准化日志记录可减少企业的开发和运营开销。例如,在支持 CloudWatch 日志的情况下,可以将所有应用日志标准化为 CloudWatch 日志;在不支持 CloudWatch 日志的情况下,借助 Amazon S3 处理日志。这样减少了集中日志团队需要处理的日志聚合用例数量,降低了日志聚合解决方案的复杂性。如果开发团队技术更成熟,当日志数据不需要存储在 CloudWatch 中时,可以使用 FluentBit 代理将数据直接写入 OpenSearch Ingestion,实现日志标准化,以降低成本。
该解决方案主要使用 CloudWatch 日志作为日志聚合的数据源。有关结合使用 Amazon S3 处理日志的解决方案,请参阅将 OpenSearch Ingestion 管道与 Amazon S3 结合使用。有关基于代理处理日志的解决方案,请参阅与 OpenSearch Ingestion 集成的代理服务文档,例如将 OpenSearch Ingestion 管道与 Fluent Bit 结合使用。
前提条件
本解决方案中,通过 OpenSearch Ingestion 将数据摄取到 OpenSearch Service 需要以下关键基础设施:
- 一个 Kinesis 数据流,用于聚合来自 CloudWatch 的日志数据。
- 一个 OpenSearch 域,用于存储日志数据。
在创建 Kinesis 数据流时,我们建议先使用按需模式。在这种模式下,Kinesis Data Streams 可以根据日志吞吐量自动扩缩所需的分片数量。在确定日志聚合工作负载量稳定后,建议根据按需模式下确定的分片数量,转换为预置模式。这样有助于优化高吞吐量使用场景下的长期开销成本。
一般来说,我们建议使用一个 Kinesis 数据流来处理日志聚合工作负载。每个 OpenSearch Ingestion 管道最多支持 96 个 OpenSearch 计算单元 (OCU),每个管道定义文件最多支持 24,000 个字符。更多详细信息,请参阅 OpenSearch Ingestion 配额。因为每个 OCU 处理一个分片,这意味着每个管道支持的Kinesis 数据流最多可以有 96 个分片。使用一个 Kinesis 数据流简化了将日志数据聚合到 OpenSearch Service 的整个过程,并简化了为日志组创建和管理订阅过滤器的过程。
根据日志工作负载的规模以及 OpenSearch Ingestion 管道逻辑的复杂性,可以考虑增加 Kinesis 数据流。例如,针对生产环境中的各类主要日志,可以考虑为每种类型日志分配一个独立的数据流。将不同类型的日志数据分成不同的流有助于降低管理 OpenSearch Ingestion 管道的操作复杂性,还可以在需要时单独扩展和更改单个类型日志的资源部署配置。
有关如何创建 Kinesis 数据流的详细信息,请参阅创建数据流。
有关如何创建 OpenSearch 域的详细信息,请参阅创建和管理 Amazon OpenSearch 域。
配置日志订阅筛选器
可以配置账户级或日志组级的 CloudWatch 日志组订阅筛选器。我们建议创建基于随机分布的订阅筛选器,以确保日志数据在 Kinesis 数据流分片中均匀分布。
账户级订阅筛选器适用于账户中的所有日志组,可用于将所有日志数据发送到单个目标。如果你想通过 Kinesis Data Streams 将所有日志数据发送到 OpenSearch Service 中存储,设置账户级订阅筛选无疑是一个好方法。每个账户最多只能对应一个账户级订阅筛选器。使用 Kinesis Data Streams 作为日志发送目标时,还可以在业务需要情况下使用多个日志消费者来处理账户下的日志数据。有关如何创建账户级订阅筛选器的具体信息,请参阅账户级订阅筛选条件。
日志组级订阅筛选器应用于每个日志组。如果想使用 Kinesis Data Streams 将日志数据的子集存储发送到 OpenSearch Service 中存储,并想使用多个不同的数据流来存储和处理多种日志类型,可以使用日志组级订阅筛选器。每个日志组最多只能对应两个日志组级订阅筛选器。有关如何创建日志组级订阅筛选器的详细信息,请参阅日志组级别订阅筛选条件。
创建订阅筛选器后,测试日志数据是否发送到 Kinesis 数据流。在 Kinesis Data Streams 控制台上,单击目标数据流名称。

选择一个分片,Starting position(起始位置)设置为 Trim horizon(从首条记录开始),然后单击 Get records(获取记录)。

然后,你应该能看到 Partition key(分区键)列的唯一分区标识和 Data(数据)列的二进制日志记录。这是因为 CloudWatch 发送的是 .gzip 格式的压缩日志数据。
配置 OpenSearch Ingestion 管道
现在,我们已经有了 Kinesis 数据流和 CloudWatch 订阅筛选器,可将数据发送到数据流,接下来要配置 OpenSearch Ingestion 管道来处理日志数据。首先,创建一个 AWS Identity and Access Management (IAM) 角色。该角色需要具有 Kinesis 数据流的读权限和 OpenSearch 域的读/写权限。用于创建管道的管理员角色需要拥有 iam:PassRole 权限。
- 创建一个具有以下权限的 IAM 角色,用于从 Kinesis 数据流中读取数据和访问 OpenSearch 域:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "allowReadFromStream",
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:DescribeStreamConsumer",
"kinesis:DescribeStreamSummary",
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:ListShards",
"kinesis:ListStreams",
"kinesis:ListStreamConsumers",
"kinesis:RegisterStreamConsumer",
"kinesis:SubscribeToShard"
],
"Resource": [
"arn:aws:kinesis:{{region}}:{{account-id}}:stream/{{stream-name}}"
]
},
{
"Sid": "allowAccessToOS",
"Effect": "Allow",
"Action": [
"es:DescribeDomain",
"es:ESHttp*"
],
"Resource": [
"arn:aws:es:{region}:{account-id}:domain/{domain-name}",
"arn:aws:es:{region}:{account-id}:domain/{domain-name}/*"
]
}
]
}
2. 为角色设置一个信任策略,允许从 osis-pipelines.amazonaws.com 访问:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": [
"osis-pipelines.amazonaws.com"
]
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"aws:SourceAccount": "{account-id}"
},
"ArnLike": {
"aws:SourceArn": "arn:aws:osis:{region}:{account-id}:pipeline/*"
}
}
}
]
}
通过管道写入数据的目标域必须配置一个允许管道角色访问这个域的域级访问策略。如果你的域使用了精细访问控制,则需要将 IAM 角色映射到 OpenSearch Service 安全插件的后端角色,从而允许创建和写入索引。
3. 创建管道角色后,在 OpenSearch Service 控制台导航栏中的 Ingestion(摄取)下,选择 Pipelines(管道)。
4. 选择 Create pipeline(创建管道)。

5. 在蓝图搜索页面搜索 Kinesis,选择 Kinesis Data Streams 蓝图,然后单击 Select blueprint(选择蓝图)。
.587782c8a71f3f599dc1f49ecaf7d5db473c4f27.png)
6. 在 Pipeline settings(管道设置)下,输入管道名称,然后将管道的 Max capacity(最大容量)值设置为与 Kinesis 数据流中的分片数量相等的值。
如果数据流是按需模式,请将容量值设置为与数据流中当前分片数量相等的值。此示例使用场景不需要持久缓冲区,因为 Kinesis Data Streams 提供了日志数据缓存能力,OpenSearch Ingestion 会根据时间定位日志数据在 Kinesis 数据流中的位置,防止重启后发生数据丢失。

7. 在 Pipeline configuration(管道配置)下,更改管道源设置,设置为你的 Kinesis 数据流名称和管道 IAM 角色的 Amazon 资源名称 (ARN)。
有关具体配置信息,请参阅相关文档。大多数配置都可以使用默认值。默认情况下,管道每 1 秒可以批量写入 100 个文档,并使用增强型扇出功能从 Kinesis 数据流中的最新位置获取数据,每 2 分钟检查一次数据流中的数据位置。你可以根据需要调整这些设置,包括调整消费者检查点的检查频率、检查点在数据流中的开始位置,并使用轮询方式来降低增强扇出的开销。
source:
kinesis-data-streams:
acknowledgments: true
codec:
# JSON codec supports parsing nested CloudWatch events into
# individual log entries that will be written as documents to
# OpenSearch
json:
key_name: "logEvents"
# These keys contain the metadata sent by CloudWatch Subscription Filters
# in addition to the individual log events:
# https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
include_keys: ['owner', 'logGroup', 'logStream' ]
streams:
# Update to use your Kinesis Stream name used in your Subscription Filters:
- stream_name: "KINESIS_STREAM_NAME"
# Can customize initial position if you don't want OSI to consume the entire stream:
initial_position: "EARLIEST"
# Compression will always be gzip for CloudWatch, but will vary for other sources:
compression: "gzip"
aws:
# Provide the Role ARN with access to KDS. This role should have a trust relationship with osis-pipelines.amazonaws.com
# This must be the same role used below in the Sink configuration.
sts_role_arn: "PIPELINE_ROLE_ARN"
# Provide the region of the Data Stream.
region: "REGION"
8. 更改管道接收器 (Sink) 设置,设置为你的 OpenSearch 域端点 URL 和管道 IAM 角色的 ARN。
OpenSearch Service 接收器定义和 Kinesis Data Streams 源定义中的 IAM 角色 ARN 必须相同。你可以通过接收器中的索引定义来控制不同索引分别指向哪些数据。例如,你可以使用 Kinesis 数据流名称的元数据作为索引,实现按数据流进行索引 (${getMetadata("kinesis_stream_name"));也可以使用文档字段根据 CloudWatch 日志组或其他文档数据来索引数据 (${path/to/field/in/document})。在本例中,我们使用三个文档级字段 (data_stream.type、data_stream.dataset 和 data_stream.namespace) 来索引文档,并在下一章节介绍的管道处理器逻辑中创建这些字段:
sink:
- opensearch:
# Provide an AWS OpenSearch Service domain endpoint
hosts: [ "OPENSEARCH_ENDPOINT" ]
# Route log data to different target indexes depending on the log context:
index: "ss4o_${data_stream/type}-${data_stream/dataset}-${data_stream/namespace}"
aws:
# Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
# This role must be the same as the role used above for Kinesis.
sts_role_arn: "PIPELINE_ROLE_ARN"
# Provide the region of the domain.
region: "REGION"
# Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
serverless: false
最后,更新管道配置,设置处理器定义。设置为在将文档写入 OpenSearch 域之前转换日志数据。例如,本示例中采用 Simple Schema for Observability (SS4O),并使用 OpenSearch Ingestion 管道为 SS4O 创建所需的 Schema。这包括添加通用字段,将元数据与索引文档相关联,以及解析日志数据,从而增强数据的可搜索性。本示例中还使用日志组名称将不同的日志类型标识为不同的数据集,并根据日志类型将文档写入不同的索引。
9. 重命名 CloudWatch 事件时间戳。将 rename_keys 处理器生成日志时的时间戳重命名为 observed_timestamp,并添加当前时间戳作为 OpenSearch Ingestion 使用日期处理器处理日志记录时的时间戳 (processed_timestamp):
# Processor logic is used to change how log data is parsed for OpenSearch.
processor:
- rename_keys:
entries:
# Include CloudWatch timestamp as the observation timestamp - the time the log
# was generated and sent to CloudWatch:
- from_key: "timestamp"
to_key: "observed_timestamp"
- date:
# Include the current timestamp that OSI processed the log event:
from_time_received: true
destination: "processed_timestamp"
10. 使用 add_entries 处理器添加已处理文档的元数据,包括日志组、日志流、账户 ID、AWS 区域、Kinesis 数据流信息和数据集元数据:
- add_entries:
entries:
# Support SS4O common log fields (https://opensearch.org/docs/latest/observing-your-data/ss4o/)
- key: "cloud/provider"
value: "aws"
- key: "cloud/account/id"
format: "${owner}"
- key: "cloud/region"
value: "us-west-2"
- key: "aws/cloudwatch/log_group"
format: "${logGroup}"
- key: "aws/cloudwatch/log_stream"
format: "${logStream}"
# Include default values for the data_stream:
- key: "data_stream/namespace"
value: "default"
- key: "data_stream/type"
value: "logs"
- key: "data_stream/dataset"
value: "general"
# Include metadata about the source Kinesis message that contained this log event:
- key: "aws/kinesis/stream_name"
value_expression: "getMetadata(\"stream_name\")"
- key: "aws/kinesis/partition_key"
value_expression: "getMetadata(\"partition_key\")"
- key: "aws/kinesis/sequence_number"
value_expression: "getMetadata(\"sequence_number\")"
- key: "aws/kinesis/sub_sequence_number"
value_expression: "getMetadata(\"sub_sequence_number\")"
11. 使用条件表达式语法根据日志源更新 data_stream.dataset 字段,指定文档写入哪个索引,并使用 delete_entries 处理器删除已重命名的原始 CloudWatch 文档字段:
- add_entries:
entries:
# Update the data_stream fields based on the log event context - in this case
# classifying the log events by their source (CloudTrail or Lambda).
# Additional logic could be added to classify the logs by business or application context:
- key: "data_stream/dataset"
value: "cloudtrail"
add_when: "contains(/logGroup, \"cloudtrail\") or contains(/logGroup, \"CloudTrail\")"
overwrite_if_key_exists: true
- key: "data_stream/dataset"
value: "lambda"
add_when: "contains(/logGroup, \"/aws/lambda/\")"
overwrite_if_key_exists: true
- key: "data_stream/dataset"
value: "apache"
add_when: "contains(/logGroup, \"/apache/\")"
overwrite_if_key_exists: true
# Remove the default CloudWatch fields, as we re-mapped them to SS4O fields:
- delete_entries:
with_keys:
- "logGroup"
- "logStream"
- "owner"
12. 使用 grok 和 parse_json解析日志消息字段,有助于在 OpenSearch 索引中更高效地搜索结构化数据和 JSON 数据。
Grok 处理器根据模式匹配来解析结构化文本字段中的数据。有关内置 Grok 模式的示例,请参阅 java-grok 模式和 dataprepper grok 模式。
# Use Grok parser to parse non-JSON apache logs
- grok:
grok_when: "/data_stream/dataset == \"apache\""
match:
message: ['%{COMMONAPACHELOG_DATATYPED}']
target_key: "http"
# Attempt to parse the log data as JSON to support field-level searches in the OpenSearch index:
- parse_json:
# Parse root message object into aws.cloudtrail to match SS4O standard for SS4O logs
source: "message"
destination: "aws/cloudtrail"
parse_when: "/data_stream/dataset == \"cloudtrail\""
tags_on_failure: ["json_parse_fail"]
- parse_json:
# Parse root message object as JSON when possible for Lambda function logs - can also set up Grok support
# for Lambda function logs to capture non-JSON logging function data as searchable fields
source: "message"
destination: "aws/lambda"
parse_when: "/data_stream/dataset == \"lambda\""
tags_on_failure: ["json_parse_fail"]
- parse_json:
# Parse root message object as JSON when possible for general logs
source: "message"
destination: "body"
parse_when: "/data_stream/dataset == \"general\""
tags_on_failure: ["json_parse_fail"]
全部设置完成后,管道配置应该类似以下代码:
version: "2"
kinesis-pipeline:
source:
kinesis-data-streams:
acknowledgments: true
codec:
# JSON codec supports parsing nested CloudWatch events into
# individual log entries that will be written as documents to
# OpenSearch
json:
key_name: "logEvents"
# These keys contain the metadata sent by CloudWatch Subscription Filters
# in addition to the individual log events:
# https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
include_keys: ['owner', 'logGroup', 'logStream' ]
streams:
# Update to use your Kinesis Stream name used in your Subscription Filters:
- stream_name: "KINESIS_STREAM_NAME"
# Can customize initial position if you don't want OSI to consume the entire stream:
initial_position: "EARLIEST"
# Compression will always be gzip for CloudWatch, but will vary for other sources:
compression: "gzip"
aws:
# Provide the Role ARN with access to KDS. This role should have a trust relationship with osis-pipelines.amazonaws.com
# This must be the same role used below in the Sink configuration.
sts_role_arn: "PIPELINE_ROLE_ARN"
# Provide the region of the Data Stream.
region: "REGION"
# Processor logic is used to change how log data is parsed for OpenSearch.
processor:
- rename_keys:
entries:
# Include CloudWatch timestamp as the observation timestamp - the time the log
# was generated and sent to CloudWatch:
- from_key: "timestamp"
to_key: "observed_timestamp"
- date:
# Include the current timestamp that OSI processed the log event:
from_time_received: true
destination: "processed_timestamp"
- add_entries:
entries:
# Support SS4O common log fields (https://opensearch.org/docs/latest/observing-your-data/ss4o/)
- key: "cloud/provider"
value: "aws"
- key: "cloud/account/id"
format: "${owner}"
- key: "cloud/region"
value: "us-west-2"
- key: "aws/cloudwatch/log_group"
format: "${logGroup}"
- key: "aws/cloudwatch/log_stream"
format: "${logStream}"
# Include default values for the data_stream:
- key: "data_stream/namespace"
value: "default"
- key: "data_stream/type"
value: "logs"
- key: "data_stream/dataset"
value: "general"
# Include metadata about the source Kinesis message that contained this log event:
- key: "aws/kinesis/stream_name"
value_expression: "getMetadata(\"stream_name\")"
- key: "aws/kinesis/partition_key"
value_expression: "getMetadata(\"partition_key\")"
- key: "aws/kinesis/sequence_number"
value_expression: "getMetadata(\"sequence_number\")"
- key: "aws/kinesis/sub_sequence_number"
value_expression: "getMetadata(\"sub_sequence_number\")"
- add_entries:
entries:
# Update the data_stream fields based on the log event context - in this case
# classifying the log events by their source (CloudTrail or Lambda).
# Additional logic could be added to classify the logs by business or application context:
- key: "data_stream/dataset"
value: "cloudtrail"
add_when: "contains(/logGroup, \"cloudtrail\") or contains(/logGroup, \"CloudTrail\")"
overwrite_if_key_exists: true
- key: "data_stream/dataset"
value: "lambda"
add_when: "contains(/logGroup, \"/aws/lambda/\")"
overwrite_if_key_exists: true
- key: "data_stream/dataset"
value: "apache"
add_when: "contains(/logGroup, \"/apache/\")"
overwrite_if_key_exists: true
# Remove the default CloudWatch fields, as we re-mapped them to SS4O fields:
- delete_entries:
with_keys:
- "logGroup"
- "logStream"
- "owner"
# Use Grok parser to parse non-JSON apache logs
- grok:
grok_when: "/data_stream/dataset == \"apache\""
match:
message: ['%{COMMONAPACHELOG_DATATYPED}']
target_key: "http"
# Attempt to parse the log data as JSON to support field-level searches in the OpenSearch index:
- parse_json:
# Parse root message object into aws.cloudtrail to match SS4O standard for SS4O logs
source: "message"
destination: "aws/cloudtrail"
parse_when: "/data_stream/dataset == \"cloudtrail\""
tags_on_failure: ["json_parse_fail"]
- parse_json:
# Parse root message object as JSON when possible for Lambda function logs - can also set up Grok support
# for Lambda function logs to capture non-JSON logging function data as searchable fields
source: "message"
destination: "aws/lambda"
parse_when: "/data_stream/dataset == \"lambda\""
tags_on_failure: ["json_parse_fail"]
- parse_json:
# Parse root message object as JSON when possible for general logs
source: "message"
destination: "body"
parse_when: "/data_stream/dataset == \"general\""
tags_on_failure: ["json_parse_fail"]
sink:
- opensearch:
# Provide an AWS OpenSearch Service domain endpoint
hosts: [ "OPENSEARCH_ENDPOINT" ]
# Route log data to different target indexes depending on the log context:
index: "ss4o_${data_stream/type}-${data_stream/dataset}-${data_stream/namespace}"
aws:
# Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
# This role must be the same as the role used above for Kinesis.
sts_role_arn: "PIPELINE_ROLE_ARN"
# Provide the region of the domain.
region: "REGION"
# Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
serverless: false
13. 配置完成后,选择 Validate pipeline(验证管道),检查管道定义语法是否有误。

14. 在 Pipeline role(管道角色)下,可以设置一个后缀来创建一个唯一服务角色,用于启动管道运行。

15. 在 Network(网络)下,选择 VPC access(VPC 访问)。
你无需为 Kinesis Data Streams 源选择 VPC、子网或安全组。OpenSearch Ingestion 只要求位于 VPC 内的 HTTP 数据源需要设置这些属性。对于 Kinesis Data Streams,OpenSearch Ingestion 使用 AWS PrivateLink 从 Kinesis Data Streams 读取数据,然后写入 OpenSearch 域或 OpenSearch Serverless 集合。
.a07a1ad0223faf5f7cefae625b6e9ad6c07c0949.png)
16. 你可以为管道启用 CloudWatch 日志记录功能。
17. 选择 Next (下一步),然后预览信息并创建管道。
如果你在运行 OpenSearch Ingestion 的账户中设置了账户级 CloudWatch 日志订阅筛选器,则应在该账户级订阅筛选器中排除该日志组。这是因为在订阅筛选器的作用下, OpenSearch Ingestion 管道日志可能会导致订阅递归循环,从而产生大量日志数据摄取和开销。
.a44a7433df8229f257ebb1499af87f5d3d30de8b.jpg)
18. 在 Review and create(查看和创建)区域,选择 Create pipeline(创建管道)。
.7b8d52630565f97bbb1c3c17bc500a75f9bf742d.jpg)
当管道进入 Active(活动)状态后,你可以看到日志开始填充到 OpenSearch 域或 OpenSearch Serverless 集合。

监控
为了维护日志摄取管道的运行状况,需要监控几个关键项:
- Kinesis Data Streams 监控项:应监控以下指标:
- FailedRecords:表示根据 CloudWatch 订阅筛选条件写入数据到 Kinesis 数据流时出现问题。如果此指标长期保持非零值,请联系 AWS 客户支持。
- ThrottledRecords:表示 Kinesis 数据流需要更多分片来容纳来自 CloudWatch 的日志数据。
- ReadProvisionedThroughputExceeded:表示 Kinesis 数据流的消费读取吞吐量超过了分片限制的容量,可能需要将消费者策略改为增强型扇出策略。
- WriteProvisionedThroughputExceeded:表示 Kinesis 数据流需要更多分片来容纳 CloudWatch 的日志数据,或者日志数据在分片上分布不均。确保将订阅筛选器的分发策略设置为 random,并且可以考虑在数据流上启用增强型分片级监控,以识别热分片。
- RateExceeded:表示消费者配置中的流设置不正确,并且 OpenSearch Ingestion 管道可能存在问题,导致数据发送过于频繁。检查 Kinesis 数据流的消费者策略。
- MillisBehindLatest:表示增强型扇出消费者无法满足数据流中的负载处理需求。检查 OpenSearch Ingestion 管道的 OCU 配置,确保有足够的 OCU 来处理 Kinesis 数据流分片。
- IteratorAgeMilliseconds:表示轮询消费者无法满足数据流中的负载处理需求。检查 OpenSearch Ingestion 管道的 OCU 配置,确保有足够的 OCU 来处理 Kinesis 数据流分片,并检查消费者的轮询策略。
- CloudWatch 订阅筛选器监控项:应监控以下指标:
- DeliveryErrors:表示向 Kinesis 数据流传输数据时 CloudWatch 订阅筛选器发生了错误。检查数据流的监控指标。
- DeliveryThrottling:表示 Kinesis 数据流的容量不足。检查数据流的监控指标。
- OpenSearch Ingestion 监控项:有关 OpenSearch Ingestion 的建议监控项,请参阅推荐 CloudWatch 告警项。
- OpenSearch Service 监控项:有关 OpenSearch Service 的建议监控项,请参阅Amazon OpenSearch Service 的推荐 CloudWatch 告警项。
清理资源
实验完成后,请务必及时清理实验过程中创建的且不再需要的 AWS 资源,以免产生额外费用。按照以下步骤清理你的 AWS 账户下的资源:
- 删除 Kinesis 数据流。
- 删除 OpenSearch Service 域。
- 使用 DeleteAccountPolicy API 删除账户级 CloudWatch 订阅筛选器。
- 删除日志组级别 CloudWatch 订阅筛选器:
- 在 CloudWatch 控制台上,选择目标日志组。
- 在 Actions(操作)菜单上,选择 Subscription Filters(订阅筛选器)和 Delete all subscription filter(s)(删除所有订阅筛选器)。
- 删除 OpenSearch Ingestion 管道。
总结
在本文中,我们介绍了如何创建无服务器数据摄取管道,使用 OpenSearch Ingestion 将 CloudWatch 日志实时传输到 OpenSearch 域或 OpenSearch Serverless 集合。这个方法适用于各种实时数据摄取场景,可以在使用 Kinesis Data Streams 进行实时数据分析的现有工作负载处理解决方案中使用这种方法。
对于 OpenSearch Ingestion 和 Kinesis Data Streams 的其他使用场景,可以实现以下能力:
- 使用 OpenSearch Service 的异常检测功能对日志、事件或其他数据执行实时异常检测。
- 使用 OpenSearch Service 的追踪分析功能分析来自分布式应用的追踪数据。
- 使用 OpenSearch Service 的混合搜索功能,使用 OpenSearch Service 的本地机器学习和向量数据库功能执行自然语言和神经搜索查询。
要进一步完善 OpenSearch 中的日志分析用例,请考虑使用 Integrations in OpenSearch Dashboards(OpenSearch 控制面板集成)中提供的预置的控制面板。
更多教程
快速搭建容量高达 35GB 的免费个人网盘
本教程将介绍如何搭建一个没有使用限制的免费私人网盘。
构建企业专属智能客服机器人
本文将演示如何结合多种服务,打造企业专属的智能客服。
使用生成式 AI 构建多语言问答知识库
使用多种服务,构建可汇总搜索结果的多语言知识库。
免费套餐
AWS 海外区域
拓展海外业务或个人体验
免费使用 100 余种云产品或服务, 长达 12 个月
AWS 中国区域
发展中国业务
免费使用 40 余种核心云服务产品,长达 12 个月