亚马逊AWS官方博客

将一个应用程序的日志拆分为多个流:Fluent 教程

并非所有日志都同等重要。一些日志需要实时分析,而一些日志只需长期存储,以便在需要时进行分析。在此教程中,我将显示三种不同的方法,您可以用它们将一个应用程序的日志流“分叉”到不同的流中,以便可以单独进行解析、筛选和发送。在此过程中,我们将了解几个开源项目:FluentdFluent BitFluent Logger Golang,以及我为此使用案例创建的新项目

现代应用程序会产生包含多种信息类型的日志:调试信息、错误消息、堆栈跟踪等等。通常情况下,将应用程序配置为将日志写入一个流中,然后发送到一个目的地进行存储和分析。这是一种次优方法,因为每种日志类型都可能有不同的格式;更为重要的是,一个分析平台不可能对所有类型都是最优的。工程师想要强大的搜索功能和实时警报来处理包含错误或堆栈跟踪的日志。这些功能会产生费用,对于其他日志中的调试信息而言,一组更简单的功能可能已足够。理想情况下,用户可以基于所需要的分析类型将每种日志类型发送到不同的目的地。

此外,即使将所有日志发送到一个目的地,每种日志类型仍可能从个体解析、筛选和处理中获益。本教程将探究可实现这些结果的多种方法。

在此博文中,我们将讨论三种不同的方法,您可以用它们将一个应用程序的日志输出拆分为多个流,以便可以进行单独处理:

  1. Fluent Bit 流处理
  2. Fluentd 的重新写入标签筛选器
  3. Fluent Logger 库

每种方法都只能在满足特定要求时适用;我将清楚地列出这些方法,以便您可以确定该方法是否适用于您的使用案例。

本教程不涵盖将日志提取到 Fluentd 和 Fluent Bit 中的内容;它与您的部署方法无关。要了解结合使用 Fluentd 和 Fluent Bit 与 AWS 的基本内容,我建议阅读以下博文:

一个简单的使用案例:将错误、信息和调试日志发送至单独的 CloudWatch 日志流

为了使每个示例具体化,我们将解决一个非常简单的使用案例。我们想要将所有日志都发送至一个 CloudWatch 日志组,但我们想基于它们的日志级别将这些日志分到不同的日志流中。这样便能单独搜索它们。

此示例基于我从客户那里收到的一个真实使用案例。利用这些技术,您可以实现很多功能——拆分来源处的日志流是非常强大的功能。例如,您可以将所有日志发送到一个系统中进行长期存储,但可以拆分并复制错误日志,然后将它们单独发送到警报系统。或者,您可能希望所有日志都进入一个目的地,但您想要筛选出与特定表达式匹配的调试消息。本教程是解决很多高级使用案例的起点。

我们会将我创建的简单 Golang 程序用作我们的应用程序。它使用常用的 logrus logger,并将日志消息输出为 JSON,类似于以下内容:

{"level":"info","msg":"Got a request","path":"/","requestID":"45234523","time":"2019-11-08T20:36:11Z"}
{"level":"warning","msg":"Access denied","path":"/tardis","requestID":"546745643","time":"2019-11-08T20:36:11Z","user":"TheMaster"}
{"level":"debug","msg":"Admin access","path":"/tardis","requestID":"546745643","time":"2019-11-08T20:36:11Z","user":"TheDoctor"}

正如您所看到的,这些日志以 JSON 格式在清栏中记录其日志级别。这样一来,便可以按其日志级别轻松拆分日志。此日志记录方法称为结构化日志记录;按设计,日志消息可由机器读取,因此,可以轻松查询并处理它们。

最后,我们对指定给这些日志的标签进行一个假设:Fluentd 和 Fluent Bit 根据其标签将规则应用于日志。在每个示例中,我们都假设应用程序日志的标签带有前缀 "app"。如果您使用适用于 Amazon ECS 的 FireLens,将会在您将容器命名为 "app" 时发生此情况。

Fluent Bit 流处理

要求:

  • 在您的日志管道中使用 Fluent Bit。
  • 日志格式为 JSON(或者您可以在 Fluent Bit 中解析为 JSON 的一些格式),包含您可以轻松查询的字段。

Fluent Bit 最酷的一项功能是,您可以在 Fluent Bit 处理日志时对日志运行 SQL 查询。使用流处理,您可以完成很多工作,包括解决我们的示例使用案例。

Fluent Bit 内部日志处理管道。首先是日志输入,其次是解析器阶段、筛选器阶段和缓冲区阶段。最后,日志将被路由至日志输出。流处理器可以在日志到达输出前对它们进行拆分,然后将查询结果发送回输入阶段。

Fluent Bit 内部日志处理管道

上图显示了 Fluent Bit 内部日志处理架构。如您所见,可以在日志到达流处理器之前提取、解析和筛选日志。然后,可以在日志上运行流查询,并且可以将查询结果再提取到日志管道中。

在对日志进行流查询之前,我们需要将日志解析为 JSON 格式,以便我们可以访问日志级别字段。如果您使用 Amazon EKS 或 Amazon ECS 部署您的应用程序,提取到 Fluent Bit 中的日志最初看起来类似以下内容:

{
    "log": "{\"level\":\"info\",\"msg\":\"Got a request\",\"path\":\"/\",\"requestID\":\"45234523\",\"time\":\"2019-11-08T20:36:11Z\"}"
}

应用程序发出的 JSON 日志消息被转义。要在日志级别上分割流,我们需要解析此转义 JSON。将以下配置代码段添加到您的 fluent-bit.conf 配置文件中。

[SERVICE]
    Parsers_File /parser.conf

[FILTER]
    Name parser
    Match *
    Key_Name log
    Parser json
    Reserve_Data True

amazon/aws-for-fluent-bit 映像和 fluent/fluent-bit 映像包括带有 JSON 解析器和内置 parsers.conf。然而,我发现,我的日志所用的时间格式不兼容解析器。因此,我自己编写了。您可以在此 GitHub 存储库中查看为此示例构建自定义 Fluent Bit 映像所需的所有文件。

解析后,Fluent Bit 的内部日志管道将被格式化为一个非常好的 JSON 对象(在技术上,Fluent Bit 的内部使用 msgpack,类似于 JSON 的数据序列化格式):

{
  "level": "info",
  "msg": "Got a request",
  "path": "/",
  "requestID": "45234523",
  "time": "2019-11-08T20:36:11Z"
}

现在,我们可以在日志上运行流查询。流查询存储在单独的配置文件中,我们将它称为 stream_processing.conf

[STREAM_TASK]
    Name   debug_logs
    Exec   CREATE STREAM debug WITH (tag='logs.debug') AS SELECT * from TAG:'app*' WHERE level = 'debug';

该查询将创建仅包含调试日志的新日志流。为了了解它的工作原理,我们来深入了解一下查询的每个部分。

CREATE STREAM debug WITH (tag='logs.debug')

我们将使用标签 logs.debug 创建一个新的日志“流”。此日志流将在输入阶段进入 Fluent Bit 内部日志管道,可以进行第二次解析和筛选。然后,我们可以创建匹配新 logs.debug 标签的 Fluent Bit 输出定义,并将这些日志发送至目的地。

AS SELECT * from TAG:'app*'

通过选择标签以 app 开头的日志来创建新流。将 app* 替换为匹配您的应用程序日志的模式。

WHERE level = 'debug';

新日志流包含具有名为 level 的字段且字段值为 debug 的源流中的日志。

要为每个日志级别创建新流,我们需要多个流查询。完整的流配置文件可参见 Github。流文件必须引用到主配置文件中:

[SERVICE]
    Streams_File stream_processing.conf

回想一下,我们的最终目标是将日志发送到一个 CloudWatch 日志组中,为每个日志级别设置单独的 CloudWatch 日志流。使用适用于 CloudWatch 的 Fluent Bit 插件很容易实现这一点;日志流名称可以是前缀加日志标签。

[OUTPUT]
    Name cloudwatch
    Match   logs.*
    region us-east-1
    log_group_name streams-example
    log_stream_prefix log-level-
    auto_create_group true

使用此输出,我们将日志流命名如下:

  • debug 日志为 log-level-logs.debug
  • info 日志为 log-level-logs.info
  • warn 日志为 log-level-logs.warning
  • error 日志为 log-level-logs.error
  • fatal 日志为 log-level-logs.fatal

请注意,输出与标签模式 logs.* 相匹配。这将匹配我们使用流查询处理的所有日志,但不匹配任何原始日志(它们拥有前缀为 app 的标签)。这是有意设置的。流查询不会拆分日志流,它们会对其进行复制。每个流查询都会创建日志子集的副本。如果我们使输出与任何标签 (*) 匹配,则每个日志消息的两个副本都会在我们的目的地中:一个来自具有 app* 标签的原始流,一个来自具有 logs.* 标签的新流。我们可以在 GitHub 中看到此示例的完整的 Fluent Bit 配置文件

请记住以下几点

  • 流查询复制日志。 此部分所示的小例子有效,这是因为所有的日志都有一个 level 字段,且该字段的值是以下五个字符串之一(debuginfowarningerrorfatal)。这样一来,便可以创建匹配所有日志的流查询。如果情况并非如此,我们会遇到问题:不匹配我们的流查询的日志将留在原始的异构日志流,它们将被删除并且不会被发送到目的地。如果您的设置是这种情况,请阅读 Fluentbit 流处理文档,以了解您是否可以编写识别您所有日志的查询。如果不能,请考虑在 fluent/fluent-bit 存储库中提出(或投票表决)问题,以支持流查询 where 子句中的其他条件。
  • 请勿编写循环流查询。 请勿编写将与其自己的结果匹配的流查询。在我们的示例中,流查询提取了标签匹配 app* 的日志并且生成了标签匹配 logs* 的日志。如果我们的 select 语句匹配所有日志 (SELECT * from TAG:'*'),查询将循环进行,以匹配其自己的结果。这将导致 Fluent Bit 在没有错误消息的情况下冻结,并导致日志处理停止
  • 只有当您的日志可以使用容易查询的字段解析时,此方法才有效。 Fluent Bit 支持可在 WHERE 子句中使用的有限数量的条件。要使用此方法,您可能需要为您的日志编写自定义解析器 ,以将它们转变为可以查询的格式。

Fluentd 重新写入标签筛选器

要求:

Fluentd 的重新写入标签筛选器插件与 Fluent Bit 的流查询有部分重叠的功能。我们可以用它来实现示例使用案例。

<match app**>
  @type rewrite_tag_filter
  <rule>
    key log
    pattern /debug/
    tag logs.debug
  </rule>
</match>

此设置完成了与 Fluent Bit 调试日志流查询相同的目标。请注意,我们不需要先将日志解析为 JSON,因为 Fluentd 筛选器可以使用正则表达式匹配日志。如果我们先将日志解析为 JSON,配置将会如下所示:

<match app**>
  @type rewrite_tag_filter
  <rule>
    key level
    pattern /debug/
    tag logs.debug
  </rule>
</match>

对于此使用案例,Fluentd 的重新写入标签筛选器与 Fluent Bit 的流查询相比,有一个主要优势:它将拆分日志而不是复制日志。上面显示的 Fluentd 配置将从我们的原始流中获取所有调试日志并更改其标签。这种方法很方便,因为这意味着,我们不必担心“剩下的”日志会不匹配任何筛选条件。应用筛选条件后,原始日志流将只包含不匹配的日志。

您可以在此查看 完整的 Fluentd 配置;请注意,您不是要使用筛选条件创建其他部分,而是使用所有日志类型的规则创建一个单独部分。

Fluent Logger 库

要求:

  • 希望对您的应用程序进行代码更改。

通过 Fluent Logger 库,您可以将日志直接编写到 Fluentd 或 Fluent Bit 中。有可供很多常用语言使用的库,包括 GoJavaPythonNodeJS

通过与 AWS 客户的对话,我了解到有些客户为自己的应用程序编写自定义日志记录库。如果您的应用程序也是这种情况,或者如果您从头开始编写新的应用程序,此选项可能非常适合。

您可以在下面看到适用于 Golang 的 Fluent Logger 的注释示例使用。

// 实例化可将日志发送到 Fluentd/Fluent Bit 的一个结构
fluentLogger, err := fluent.New(fluent.Config{})
if err != nil {
    log.Fatal(err)
}

// 您可以根据需要标记每个日志
tag := "app.logs"

// 将任意数据作为映射发送
data := map[string]string{
    "foo": "bar",
}

// 发送到 Fluent 实例
err = fluentLogger.Post(tag, data)

如果您使用 FireLens,ECS 将注入环境变量 FLUENT_HOSTFLUENT_PORT,以便您可以连接到日志路由器进行侦听时所在的 TCP 端口。您可以使用这些环境变量配置日志记录器:

port, err := strconv.Atoi(os.Getenv("FLUENT_PORT"))
if err != nil {
	// 过程错误
}
config := fluent.Config{
	FluentPort: port,
	FluentHost: os.Getenv("FLUENT_HOST"),
}
 
       

此示例的自定义库

为了解决我们的实例案例,我创建了一个通用库,其中打包了适用于 Golang 的 Fluent Logger,该通用库可用作 Logrus logger 的输出流:

// 将 logrus 配置以 JSON 格式输出
logrus.SetFormatter(&logrus.JSONFormatter{})
logrus.SetLevel(logrus.DebugLevel)

// create a FluentWriter instance
fluentLogger, err := logger.NewFluentWriter(fluent.Config{}, "app", []string{"level"})
if err != nil {
    fmt.Println(err)
    os.Exit(1)
}

// 设置 logrus 以使用它
logrus.SetOutput(fluentLogger)

// 现在可以正常使用 logrus!
logrus.WithFields(logrus.Fields{
    "path": "/",
    "requestID": "45234523",
}).Info("Got a request")

它的工作原理是什么? Logrus logger 可以写入任何 io.Writer 中;我的库会暴露将日志写入 Fluentd/Fluent Bit 的 io.Writer

logger.NewFluentWriter(fluent.Config{}, "app", []string{"level"})

请参阅项目 README 了解其功能的完整说明。上面所示为对我的库的构建器调用。第一个参数是 Fluent Logger Golang config 对象,显示在本部分的开头。第二个参数是为此编写器发出的日志指定的标签的前缀。第三个参数是值为标签后缀的日志消息中的键列表。此库依赖于这样一个事实:Logrus 生成的日志将为 JSON 格式。因此,它将在每个日志消息中查找 level 键并将其附加到标签前缀中以构建最终的标签。在实际操作中,将会为日志发出如下标签:

  • debug 日志为 app.debug
  • info 日志为 app.info
  • warn 日志为app.warning
  • error 日志为 app.error
  • fatal 日志为 app.fatal

将为没有 level 字段或无法解析为 JSON 的日志简单地指定标签 “app”。可以使用前面的部分中所示的 Fluentd 或 Fluent Bit 将这些标签发送到一个日志组内的不同 CloudWatch 日志流中。

您可以在项目存储库中查看此示例的完整应用程序代码。

请记住以下几点

  • 我认为此方法是实验性的。 我只用小例子对它进行了测试。如果您选择此方法,我建议将我创建的库用作起点。基于您对自己的日志和使用案例的了解,您可能可以编写一些适合您的情况的内容。虽然此示例实现了与前述示例相同的目的,但 Fluent Logger 库的功能或许更强大。由于您的代码中的日志标记已完成,您拥有完全控制权,且您可以解决无法通过其他方法解决的使用案例。
  • 您还必须为您的应用程序处理标准的输出和错误流如果 Fluent Logger 库无法发送您的日志,您需要回退。我已经编写好库,因此,当它无法发送到 Fluentd/Fluent Bit 时,会将日志打印到标准输出中。

小结

哪种方法最佳?

在此博文中,您了解了三种方法,通过它们,您可以“拆分”一个应用程序的日志:

  1. Fluent Bit 流处理
  2. Fluentd 的重新写入标签筛选器
  3. Fluent Logger 库

您的选择将取决于您自己的使用案例详细信息;每种方法所产生的资源利用率是一个重要的考虑因素。为帮助您做决定,我使用 FireLens 在 Amazon ECS 上运行了三个任务。每个任务都使用了介绍中的示例应用程序代码与每种方法的配置。由于 Fluent Logger 示例处理应用程序代码内的日志,当前两个示例将其卸载到日志路由器时,我测量了总内存和任务的 CPU 使用率(应用程序 + 日志路由器)。在第三个示例中,我将 Fluent Bit 用作日志路由器,因为它的效率通常更高。

三个示例的 CPU 利用率图。重新写入标签筛选器示例使用的 CPU 最多,其次是流处理示例,再然后是 Fluent Logger 库示例。

三个示例的内存利用率图。重新写入标签筛选器示例使用的内存最多,Fluent Logger 库和流处理示例与日志内存使用量相关。

  • fluent-logger 是 Fluent Logger Golang 示例
  • stream-logger 是 Fluent Bit 流示例
  • rewrite-tag is 是 Fluentd 重新写入标签筛选器示例

这些测试在 c5.9xlarge Amazon EC2 实例上运行;每个任务都指定了一个虚拟 CPU 和 2 GB 内存。显然,结果在某种程度上特定于我的示例。然而,有趣的是,fluent-logger 的资源使用量最低,这意味着,在您的应用程序代码内执行此处理是最有效的选项。除此之外,主要的结论是,Fluentd 的资源使用量明显高于 Fluent Bit 的,对于使用过这两种软件的人来说,这并不奇怪。

解决您自己的使用案例

正如我在介绍中所述,本教程的目的在于了解拆分单日志流的方法,以便可以独立处理每个子流。基于日志级别发送到不同的 CloudWatch 日志流仅仅是通过一个示例来演示每种方法。

深入探究 FluentdFluent Bit 文档,以了解各种可能性并找出解决您自己使用案例的方法。对于 Fluentd,其路由示例复制插件可能会很有用。

您有什么想法?

您在本博文中了解到的方法允许您解决更高级、更小众的使用案例。我很希望了解您的使用案例的详细信息。如果您认为此教程有用,并且有用一个适用其方法的使用案例,请在下面进行评论。

我是否错过了什么? 如果您是 Fluentd 或 Fluent Bit 高级用户,您觉得错过了一些内容,也请进行评论。

最后,如果您觉得我创建的开源 Golang 项目有用,请对其存储库中的使用问题进行评论。欢迎提出改进想法,不胜感激。此外,如果您喜欢此方法但不使用 Golang,请考虑使您编写的任何代码开源,以便其他人可以受益。可以为其他语言创建类似的库。