Amazon Web Services ブログ

Fluent チュートリアル – アプリケーションのログを複数のストリームに分割する

この記事は Splitting an application’s logs into multiple streams: a Fluent tutorial (記事公開日: 2019 年 11 月 20 日) を翻訳したものです。

ログの重要性はすべてのログで同じではありません。リアルタイム分析が必要なログもあれば、必要に応じて分析できるように長期保存する必要があるログもあります。このチュートリアルでは、単一のアプリケーションのログストリームを、個別に分析、フィルタリング、および送信できる複数のストリームに「フォーク」できる 3 つの異なる方法を紹介します。その過程で、FluentdFluent Bitfluent-logger-golang、およびこのユースケース用に私が作成した新しいプロジェクトなど、いくつかのオープンソースプロジェクトについて学習します。

モダンなアプリケーションは、デバッグ情報、エラーメッセージ、スタックトレースなど、複数の種類の情報を含むログを生成します。一般に、アプリケーションはログを単一のストリームに書き込むように構成され、そのストリームからログは単一の宛先に送信されて保存および分析に使用されます。ログタイプごとにフォーマットが異なる可能性があるため、これは最適ではありません。さらに重要なことに、単一の分析プラットフォームがすべてのログの種類に最適であるとも限りません。エンジニアは、強力な検索機能と、エラーやスタックトレースを含むログに対するリアルタイムのアラート機能を求めています。これらの検索やアラートや機能にはコストがかかりますが、一部のログのデバッグ情報については、よりシンプルな機能セットで十分なこともあります。必要な分析の種類に応じて、それぞれのログタイプを異なる宛先に送ることができると理想的です。

さらに、すべてのログが単一の宛先に送信される場合でも、ログの種類ごとに個別に分析、フィルタリング、および処理を行うことが有効な場合があります。このチュートリアルでは、これを行うことができる複数の方法について説明します。

この記事では、単一のアプリケーションのログ出力を複数のストリームに分割し、別々に処理するために使用できる 3 つの異なる方法について説明します。

  1. Fluent Bit のストリームプロセッシング
  2. Rewrite Tag フィルター
  3. Fluent Logger ライブラリ

それぞれの方法は、特定の要件が満たされている場合にのみ機能します。お客様のユースケースでその方法が機能するかどうかを判断できるように、これらの要件は明確にリストします。

このチュートリアルでは、Fluentd と Fluent Bit へのログの取り込みについては説明しません。デプロイの方法に依存しないためです。AWS で Fluentd とFluent Bit を使用するための基礎知識については、以下をお勧めします。

(訳注: 本記事の著者による以下の記事もお勧めします)

シンプルなユースケース: エラー、情報、デバッグのログを別々の CloudWatch ログストリームに送信する

それぞれの方法の例を具体的にするため、非常にシンプルなユースケースを解決してみましょう。すべてのログを単一の CloudWatch ロググループに送りたいが、ログレベルに基づいてログを異なるログストリームに分けたいとします。これによって、ログレベルごとに個別に検索できるようになります。

この例は、お客様からいただいた実際のユースケースに基づいています。ログのストリームをソースでフォークする能力は強力で、この手法でできることはたくさんあります。例えば、すべてのログを 1 つのシステムに送信して長期保存しておき、エラーログはフォークしてコピーし、アラートシステムだけに送ることもできます。あるいは、すべてのログを 1 つの宛先に送りたいが、特定の表現にマッチするデバッグメッセージをフィルタリングしたい場合もあります。このチュートリアルは、多くの高度なユースケースを解決するための出発点です。

ここでは、私が作成した簡単な Go 言語のプログラムをアプリケーションとして使用します。人気のある Logrus ロガーを使用しており、ログメッセージを以下のような JSON として出力します。

{"level":"info","msg":"Got a request","path":"/","requestID":"45234523","time":"2019-11-08T20:36:11Z"}
{"level":"warning","msg":"Access denied","path":"/tardis","requestID":"546745643","time":"2019-11-08T20:36:11Z","user":"TheMaster"}
{"level":"debug","msg":"Admin access","path":"/tardis","requestID":"546745643","time":"2019-11-08T20:36:11Z","user":"TheDoctor"}

ご覧のとおり、これらのログは、ログレベルを JSON の特定のフィールドに記録しています。これにより、ログをログレベルごとに簡単に分割できます。このようなロギングのアプローチは構造化ロギングと呼ばれます。ログメッセージは、ソフトウェアで読み取りやすいように設計されており、簡単にクエリや処理ができます。

最後に、これらのログに付与されたタグについて、1 つの仮定を置きます。Fluentd と Fluent Bit は、タグに基づいてログにルールを適用します。各方法の例では、アプリケーションからのログのタグは、"app" が前に付いていると仮定します。Amazon ECS で FireLens を使用している場合、コンテナ名を "app" とすると、このような動作になります。

Fluent Bit のストリームプロセッシング

前提条件:

  • ログパイプラインで Fluent Bit を使用します。
  • ログは JSON (あるいは Fluent Bit によって JSON にパースできるフォーマット) でフォーマットされ、簡単にクエリできるフィールドを持っています。

Fluent Bit の最もクールな機能の 1 つは、ログを処理する際にログに対して SQL クエリを実行できることです。今回のユースケースの解決など、ストリームプロセッシングでは多くのことを実現できます。

Fluent Bit 内部のログ処理パイプライン。インプットが最初にあり、続いてパーサーステージ、フィルターステージ、バッファステージがあります。最後に、ログはアウトプットにルーティングされます。ストリームプロセッサーは、ログがアウトプットに到達する直前にログをフォークし、クエリの結果をインプットステージに送り返すことができます。

Fluent Bit 内部のログ処理パイプライン

上の図は、Fluent Bit 内部のログ処理アーキテクチャーを示したものです。ご覧のとおり、ログはストリームプロセッサーに到達する前に、インプット、パース、フィルタリングを行うことができます。その後、ログに対してストリームクエリを実行し、クエリの結果をログパイプラインに再びインプットすることができます。

ログに対してストリームクエリを実行する前に、ログレベルフィールドにアクセスできるように、ログを JSON としてパースする必要があります。Amazon EKS や Amazon ECS を使用してアプリケーションをデプロイしている場合、Fluent Bit に取り込まれるログは、最初は以下のような形になります。

{
    "log": "{\"level\":\"info\",\"msg\":\"Got a request\",\"path\":\"/\",\"requestID\":\"45234523\",\"time\":\"2019-11-08T20:36:11Z\"}"
}

アプリケーションから出力された JSON ログメッセージはエスケープされています。ログレベルでストリームを分割するためには、このエスケープされた JSON をパースする必要があります。以下の設定スニペットを fluent-bit.conf 設定ファイルに追加してください。

[SERVICE]
    Parsers_File /parser.conf

[FILTER]
    Name parser
    Match *
    Key_Name log
    Parser json
    Reserve_Data True

amazon/aws-for-fluent-bit イメージと fluent/fluent-bit イメージには、JSON パーサーを備えたビルトインの parsers.conf が含まれています。しかし、私のログが使用している日時のフォーマットは、この設定と互換性がなかったため、設定を自分で書きました。この例のカスタム Fluent Bit イメージをビルドするために必要なすべてのファイルは、この GitHub リポジトリで確認できます。

パース後、Fluent Bit の内部ログパイプラインのログは、いい感じの JSON オブジェクトとしてフォーマットされます (技術的には、Fluent Bit は内部的に JSON に似たデータのシリアライズ形式である msgpack を使用しています) 。

{
  "level": "info",
  "msg": "Got a request",
  "path": "/",
  "requestID": "45234523",
  "time": "2019-11-08T20:36:11Z"
}

これで、ログに対してストリームクエリを実行できるようになりました。ストリームクエリは stream_processing.conf という別の設定ファイルに記載されています。

[STREAM_TASK]
    Name   debug_logs
    Exec   CREATE STREAM debug WITH (tag='logs.debug') AS SELECT * from TAG:'app*' WHERE level = 'debug';

このクエリは、デバッグログだけを含むログの新しいストリームを作成します。どのように動作するかを理解するために、クエリの各パートを見てみましょう。

CREATE STREAM debug WITH (tag='logs.debug')

logs.debug というタグでログの新しい「ストリーム」を作成します。新しいストリームは、インプットステージで Fluent Bit の内部ログパイプラインに入り、2 回目のパースとフィルタリングが可能です。そして、新しい logs.debug タグに一致する Fluent Bit のアウトプット定義を作成し、これらのログを宛先に送信できます。

AS SELECT * from TAG:'app*'

新しいストリームは、タグが app で始まるログをセレクトして作成されます。app* は、あなたのアプリケーションのログにマッチするパターンに置き換えてください。

WHERE level = 'debug';

新しいストリームには、level というフィールドを持ち、その値が debug であるソースストリームからのログが含まれます。

ログレベルごとに新しいストリームを作成するためには、複数のストリームクエリが必要になります。ストリームプロセッシングの完全な設定ファイルは GitHub で確認できます。ストリームプロセッシング設定ファイルは、メインの設定ファイルから参照される必要があります。

[SERVICE]
    Streams_File stream_processing.conf

最終的な目標は、ログレベルごとに個別の CloudWatch ログストリームを持つ 1 つの CloudWatch ロググループにログを送信することであったことを思い出してください。これは、CloudWatch 用の Fluent Bit プラグインを使えば簡単です。ログストリーム名は、プレフィックス + タグとなります。(訳注: Fluent Bit v1.5 以降では AWS が提供する cloudwatch プラグインの代わりに、Fluent Bit コア機能に含まれる cloudwatch_logs プラグインを使用することができます。)

[OUTPUT]
    Name cloudwatch
    Match   logs.*
    region us-east-1
    log_group_name streams-example
    log_stream_prefix log-level-
    auto_create_group true

このアウトプット定義により、以下のような名前のログストリームが作成されます。

  • log-level-logs.debug (デバッグログ)
  • log-level-logs.info (情報ログ)
  • log-level-logs.warning (警告ログ)
  • log-level-logs.error (エラーログ)
  • log-level-logs.fatal (致命的ログ)

アウトプットは logs.* というタグパターンにマッチすることに注意してください。これは、ストリームクエリで処理したすべてのログにマッチしますが、オリジナルのログ (app というプレフィックスのタグを持つ) には一切マッチしません。これは意図的なものです。ストリームクエリは、ログストリームをフォークするのではなく、コピーします。ストリームクエリは、ログのサブセットのコピーを作成します。もし、アウトプットが任意のタグ (*) に一致することを許可した場合、各ログメッセージの 2 つのコピーが宛先には存在することになります。1 つは app* タグを持つオリジナルのストリームから、もう 1 つは logs.* タグを持つ新しいストリームからです。この例の完全な Fluent Bit 設定ファイルは、GitHub で確認できます

注意すべき点

  • ストリームクエリはログをコピーします。このセクションで示した簡単な例は、すべてのログに level フィールドがあり、その値が 5 つの文字列 (debug, info, warning, error, fatal) のうちの 1 つであるため、機能しました。これにより、すべてのログにマッチするストリームクエリを作成することができました。もしそうでない場合、問題が発生します。ストリームクエリにマッチしないログは、オリジナルの不均質なログストリームに残り、宛先に送られずに「廃棄」されます。もし、あなたのセットアップがそうであるなら、Fluent Bit のストリームプロセッシングのドキュメントを読んで、すべてのログを識別するクエリを書けるかどうか確認してください。もしできない場合は、fluent/fluent-bit リポジトリで、ストリームクエリの WHERE 句で追加の条件をサポートするための Issue をオープンする (または賛成する) ことを検討してください。
  • 循環的なストリームクエリを書かないでください。自分自身の結果にマッチするようなストリームクエリを書かないでください。この例では、ストリームクエリはタグが app* にマッチするログを取り込み、タグが logs* にマッチするログを生成しています。もし、SELECT 文がすべてのログにマッチする場合 (SELECT * from TAG:'*') 、クエリは循環的になり、自身の結果にマッチすることになります。これは、Fluent Bit がエラーメッセージなしでフリーズし、ログの処理を停止する原因となります。
  • この方法は、ログが簡単にクエリ可能なフィールドでパースできる場合にのみ機能します。Fluent Bit は、WHERE 句で使用できる条件の数を制限しています。この方法を使用するには、ログをクエリ可能な形式に変換するためのカスタムパーサーを書く必要があるかもしれません。ストリームプロセッシングのドキュメントをよく読んで、必要な条件がサポートされているかどうかを判断してください。

Rewrite Tag フィルター

前提条件:

Rewrite Tag フィルタープラグインは、Fluent Bit のストリームクエリと部分的に重複する機能を備えています。これを利用して、今回のユースケースを実現できます。このチュートリアルでは、Fluentd を使って Rewrite Tag のデモを行います。Fluent Bit の同等のフィルターには、構文は異なりますが同じ機能があります。

<match app**>
  @type rewrite_tag_filter
  <rule>
    key log
    pattern /debug/
    tag logs.debug
  </rule>
</match>

この設定は、デバッグログに対する Fluent Bit のストリームクエリと同じ目的を達成するものです。Fluentd のフィルターは正規表現でログをマッチさせることができるため、エスケープされた JSON ログを最初にパースする必要はないことに注意してください。もし、最初にログを JSON としてパースした場合は、以下のような設定になります。

<match app**>
  @type rewrite_tag_filter
  <rule>
    key level
    pattern /debug/
    tag logs.debug
  </rule>
</match>

Fluentd の Rewrite Tag フィルターは、Fluent Bit のストリームクエリに比べて、このユースケースでひとつ重要な利点があります。それは、ログをコピーするのではなく、フォークすることです。上に示した Fluentd の設定は、オリジナルのストリームからすべてのデバッグログを取り出し、そのタグを変更します。これは、どのフィルターにもマッチせず「廃棄」されるログが発生することを心配する必要がないことを意味するため、便利です。フィルターが適用された後、オリジナルのログストリームには、マッチしないログのみが含まれます。

ここで Fluentd の完全な設定を確認できます。フィルターを使用して追加のセクションを作るのではなく、すべてのログタイプのためのルールを持つ単一のセクションを作ることに注意してください。

Fluent Logger ライブラリ

前提条件:

  • アプリケーションコードに変更を加えることを厭わないこと。

Fluent Logger ライブラリを使用すると、Fluentd や Fluent Bit に直接ログを書き込むことができます。Go 言語JavaPythonNode.js など、多くの一般的な言語用のライブラリが利用できます。

私は AWS のお客様との会話から、アプリケーション用にカスタムロギングライブラリを作成している人がいることを知りました。もしあなたのアプリケーションがそうであるなら、あるいはゼロから新しいアプリケーションを書くなら、このオプションは理想的かもしれません。

以下では、Go 言語用の Fluent Logger ライブラリ (fluent-logger-golang) の使用例を注釈付きで示します。

// Fluentd/Fluent Bit にログを送ることができる構造体のインスタンスを作成します
fluentLogger, err := fluent.New(fluent.Config{})
if err != nil {
    log.Fatal(err)
}

// 各ログに必要なタグを付けることができます
tag := "app.logs"

// 任意のデータをマップとして送信します
data := map[string]string{
    "foo": "bar",
}

// Fluent インスタンスに送信します
err = fluentLogger.Post(tag, data)

FireLens を使用している場合、ECS は環境変数 FLUENT_HOSTFLUENT_PORT を挿入します。これにより、ログルーターがリッスンしている TCP ポートに接続することができます。これらの環境変数を使用して、ロガーを設定します。

port, err := strconv.Atoi(os.Getenv("FLUENT_PORT"))
if err != nil {
    // エラー処理
}
config := fluent.Config{
    FluentPort: port,
    FluentHost: os.Getenv("FLUENT_HOST"),
}

このユースケースのためのカスタムライブラリ

このユースケース (エラー、情報、デバッグのログを別々の CloudWatch ログストリームに送信する) を解決するために、私は fluent-logger-golang をラップした汎用ライブラリを作成しました。このライブラリは、Logrus ロガーの出力ストリームとして使用することができます。

// JSON として出力するように Logrus を設定します
logrus.SetFormatter(&logrus.JSONFormatter{})
logrus.SetLevel(logrus.DebugLevel)

// FluentWriter のインスタンスを作成します
fluentLogger, err := logger.NewFluentWriter(fluent.Config{}, "app", []string{"level"})
if err != nil {
    fmt.Println(err)
    os.Exit(1)
}

// Logrus が作成したインスタンスを使用するように設定します
logrus.SetOutput(fluentLogger)

// あとは普通に Logrus を使ってください!
logrus.WithFields(logrus.Fields{
    "path": "/",
    "requestID": "45234523",
}).Info("Got a request")

これはどのように機能するのでしょうか?Logrus ロガーは任意の io.Writer に書き込むことができます。私のライブラリは Fluentd/Fluent Bit にログを書き込む io.Writer を公開します。

logger.NewFluentWriter(fluent.Config{}, "app", []string{"level"})

機能についての詳しい説明は、プロジェクトの README をご覧ください。上記では、私のライブラリのコンストラクタを呼び出しています。最初の引数は、このセクションの冒頭で示した fluent-logger-golang の設定オブジェクトです。2 番目の引数は、このライターが出力するログに付与するタグのプレフィックスを指定します。3 番目の引数は、ログメッセージのキーのリストで、その値はタグのサフィックスになります。このライブラリは、Logrus によって生成されるログが JSON フォーマットであることを前提にしています。これによって、各ログメッセージの level キーを見つけ、これをタグのプレフィックスに追加して、最終的なタグを作成します。実際には、ログは次のようなタグで出力されます。

  • app.debug (デバッグログ)
  • app.info (情報ログ)
  • app.warning (警告ログ)
  • app.error (エラーログ)
  • app.fatal (致命的ログ)

level フィールドを持たないログや、JSON としてパースできないログには、単に "app" というタグが付与されます。前のセクションで示したように、これらのタグは、Fluentd や Fluent Bitを使って、1 つのロググループ内の異なる CloudWatch ログストリームに送信することができます。

この例の完全なアプリケーションコードは、プロジェクトのリポジトリで確認できます。

注意すべき点

  • このアプローチは実験的なものです。私はまだ簡単な例でテストしただけです。この方法を選択する場合、私が作成したライブラリを出発点として使用することをお勧めします。あなたのログとユースケースの知識に基づいて、あなたの状況にとって理想的なものを書くことができるかもしれません。この例は、これまでの例と同じことを実現していますが、Fluent Logger ライブラリの能力は、もっと強力なはずです。ログへのタグ付けをコードで行うため、完全な制御が可能であり、他の方法では対応できないようなユースケースを解決することができます。
  • アプリケーションの標準出力とエラーストリームを処理する必要があります。Fluent Logger ライブラリがログの送信に失敗した場合、フォールバックが必要です。私は、Fluentd/Fluent Bit への送信に失敗した場合、標準出力にログを出力するようにライブラリを作成しました。

まとめ

どのアプローチが最適か?

この記事では、単一のアプリケーションのログを「フォーク」できる 3 つの方法を学びました。

  1. Fluent Bit のストリームプロセッシング
  2. Rewrite Tag フィルター
  3. Fluent Logger ライブラリ

選択は、あなた自身のユースケースの詳細によって異なります。重要な考慮事項の 1 つは、それぞれのアプローチで発生するリソース使用率です。選択を助けるために、私は Amazon ECS で FireLens を使用して 3 つのタスクを実行しました。それぞれ、導入部にあるサンプルアプリケーションのコードと、各方法の設定を使用しました。Fluent Logger の例ではアプリコード内でログを処理していますが、最初の 2 つの例ではログルーターにログの処理をオフロードしています。そのため、タスク (アプリ + ログルーター) の合計メモリと CPU 使用率を測定しました。3 つ目の例では、Fluent Bitをログルーターとして使用しました。一般的に効率が良いためです。

3 つの例の CPU 使用率のグラフです。Rewrite Tag フィルターの例が最も CPU を使用し、次にストリームプロセッシングの例、そして Fluent Logger ライブラリの例となっています。

3 つの例のメモリ使用率のグラフです。Rewrite Tag フィルターの例が最も多くのメモリを使用し、Fluent Logger ライブラリとストリームプロセッシングの例は、どちらも非常にメモリ使用量が少なくなっています。

  • fluent-logger: Go 言語の Fluent Logger ライブラリの例
  • stream-logger: Fluent Bit のストリームプロセッシングの例
  • rewrite-tag: Fluentd の Rewrite Tag フィルターの例

このテストは、c5.9xlarge の Amazon EC2 インスタンスで実行され、各タスクには 1 つの仮想 CPU と2 GB のメモリが割り当てられています。当然ながら、この結果は私の例に固有のものです。しかし、fluent-logger が最もリソース使用量が少なく、アプリケーションのコード内でこの処理を行うことが最も効率的であるということは興味深いことです。それ以外の主な収穫は、Fluentd のリソース使用量が Fluent Bit よりもかなり多いということで、これは両者を使ったことがある人なら驚くことではないでしょう。

独自のユースケースを解決する

冒頭で述べたように、このチュートリアルの目標は、ログの単一のストリームをフォークして、各サブストリームを独立して処理できるようにする方法を学ぶことでした。ログレベルに基づいて異なる CloudWatch ログストリームに送信することは、それぞれのアプローチをデモンストレーションするための 1 つの例に過ぎません。

FluentdFluent Bit のドキュメントに飛び込んで、何が可能かを理解し、自分自身のユースケースを解決する方法を考えてください。Fluentd では、ルーティングの例Copy プラグインが役に立つかもしれません。

Fluent Bit では、@record.contains(key) 関数を使って、レコードにキーが含まれているかどうかを判別できることに注意してください。これにより、2 つの異なるスキーマに従う JSON ログを含むストリームを分割できます。1 つ以上のキーの存在によって、ログがどちらのスキーマに適合するかを決定することができます。

ご意見をお聞かせください

この記事で学んだテクニックを使用すると、より高度でニッチなユースケースを解決することができるようになります。私は皆様のユースケースの具体的な内容に興味があります。

最後に、もし私が作った Go 言語のオープンソースプロジェクトが役に立つと思ったら、リポジトリの使用方法の Issue にコメントしてください。コントリビューションや改善のためのアイデアをお待ちしています。さらに、もしこのアプローチを気に入ったけれども Go 言語を使っていないのであれば、他の人が恩恵を受けられるように、あなたが書いたコードをオープンソースにすることを検討してください。他の言語でも同様のライブラリを作成できます。

翻訳はプロフェッショナルサービスの杉田が担当しました。原文はこちらです。