亚马逊AWS官方博客

使用 FluentBit 收集 ECS Fargate 日志至 OpenSearch

背景

在日常的运营工作中,日志系统越来越成为不可或缺的一部分。一些企业上云时比较喜欢选择 ECS Fargate 服务承载自己的业务,而如何灵活地收集日志到自己的日志系统,是客户需要考虑的一个问题。在日志收集组件选型的过程中,客户比较喜欢使用一些开源的组件来收集日志,比如 Fluent Bit、Fluentd 等,日志的分析工具上也多种多样,OpenSearch、Grafana 等都是客户经常使用的工具。ECS Fargate 作为亚马逊云科技提供的一项 Serverless 的容器托管服务,提供了多种类型的日志收集方案给到客户,比如默认的我们可以直接使用 CloudWatch 来收集应用日志,同时 ECS 也提供了 AWS Firelens 的日志拓展方案使用开源组件来收集日志,本文即会介绍如何使用 FirelensLogDriver 结合 Fluent Bit 来收集应用系统的日志至 OpenSearch。

前置准备工作

  1. 测试环境需要安装 CDK
  2. 测试环境需要安装 Git
  3. 下载 CDK 安装代码:Repo 地址

整体架构介绍

目前 ECS 已经提供通过 awslogs log driver 来将日志输出到指定的 CloudWatch 的 Log group 中去,同时 ECS 也提供了通过 customer routing 的方式,使用 Fluent Bit 和 Fluentd 来收集日志,相较于使用 CloudWatch 来收集日志,很多用户更加愿意使用 custom log routing 的方式来收集日志,这样的方式可以为用户提供更多的灵活性和便利性,而对于 Fluent Bit 和 Fluentd 这两种中间件我们使用哪一个,我们可以通过以下对比来分析。

Fluentd Fluent Bit
适用范围 容器/服务器 嵌入式 Linux/容器/服务器
开发语言 Ruby & C C
内存消耗 大约 40MB 大约 650KB
性能 高性能 高性能
依赖 基于 Ruby Gem 构建,依赖一些 gem(Ruby 模块) 无依赖,除了一些特殊的插件
插件 超过 1000 个可用插件 大约 70 个可用插件
协议 Apache License v2.0 Apache License v2.0

从上面的比较中可以看到 Fluent Bit 运行使用的资源更少,对于我们本身使用 sidecar 的方式来收集日志,如果日志收集容器就使用了很多的资源,从资源成本上也会是一个负担,同时 AWS 也针对 Fluent Bit 进行了一些加强,可以非常有机地跟 AWS 的其他服务结合到一起,所以在本文中我们会使用 Fluent Bit 来作为首选,下面是我们这次要部署的环境架构:

本文中我们会创建一个 VPC,同时在 VPC 内我们会创建一个 ECS cluster,其中任务定义中包含 Nginx 和 Fluent Bit 容器,我们会通过 Fluent Bit 来采集 Nginx 的访问日志,同时将 Fluent Bit 容器的日志发送至 CloudWatch,Fluent Bit 采集 Nginx 的访问日志会发送至 S3 中存储,保存到 S3 中时,我们会通过 S3 的 event 来触发 Lambda 将信息保存在 OpenSearch 中。

环境部署

本示例中我们使用 CDK 来部署实验环境,整体部署时间在 20 分钟左右,部署过程请参考以下命令。

修改公共变量 /lib/Constants.ts

public static readonly VPC_CIDR = '10.10.0.0/16'; # 设置 VPC CIDR
public static readonly LOG_BUCKET = 'fargate-logs-s3-xxxxx' # 设置唯一 Bucket 名称

然后执行以下命令部署环境

npm run build
cdk bootstrap
cdk synth
cdk deploy

部署完毕后,可以看到以下输出内容

ECS Fargate 环境确认

ECS Cluster 配置信息,Service 信息

查看 ALB 的信息

访问 ALB 的 DNS

如下所示,则 Nginx 部署没有问题

Fluent Bit 环境确认

可以看到 Fluent Bit 正常运行中,下一步我们确认一下 Fluent Bit 的日志信息,确认日志收集是否正常

可以看到日志正常收集,并且成功上传至指定 S3 中。

Open Search 配置

应用环境确认完毕后,我们也需要去确认一下日志的信息是否被成功消费并且推送至 OpenSearch,在以上的 CDK 部署中,我们已经创建好了一个 OpenSearch Endpoint 的 Proxy,我们需要访问 Open Search Dashboard 来进行确认和配置。

配置 Index Pattern

点击创建完毕后,在 Discover 菜单查看日志信息。

至此我们整个环境就完全好了,如果我们有其他的应用日志需求,直接修改相应的配置来进行适配就可以了。下面我们来说明一下关于 Fluentbit 镜像相关的配置和 Lambda 函数的相关配置实践。

Fluent Bit 镜像注意事项

在本文中我们使用 ECS Capacity Provider 为 Fargate,无法将 Fluent Bit 的配置文件通过 S3 加载,所以我们需要将 Fluent Bit 配置文件加载到镜像内,如示例代码中/lib/fluentbit 中的配置。

Docker File

FROM --platform=linux/amd64 public.ecr.aws/aws-observability/aws-for-fluent-bit:stable

COPY ./fluent-bit-custom.conf /fluent-bit/etc/fluent-bit-custom.conf

CDK 中上传至 ECR

const dockerImage = new assets.DockerImageAsset(this, 'fluent-bit-image',
{
    directory: './lib/fluentbit', // 指向包含 Dockerfile 和应用程序代码的目录
});

new ecrdeploy.ECRDeployment(this, 'fluent-bit-ecr-dockerimage', {
    src: new ecrdeploy.DockerImageName(dockerImage.imageUri),
    dest: new ecrdeploy.DockerImageName(`${cdk.Aws.ACCOUNT_ID}.dkr.ecr.${cdk.Aws.REGION}.amazonaws.com/${this.fluentbitECR.repositoryName}:latest`),
});

ECS 中指定 Fluent Bit 配置文件

const firelensLogRouter = taskDefinition.addFirelensLogRouter('log-router', {
image: ecs.ContainerImage.fromEcrRepository(props.fluentBitRepository),
essential: true,
firelensConfig: {
    type: FirelensLogRouterType.FLUENTBIT,
    options: {
        enableECSLogMetadata: true,
        configFileType: FirelensConfigFileType.FILE,
        // This enables parsing of log messages that are json lines
        configFileValue: '/fluent-bit/etc/fluent-bit-custom.conf' #指定配置文件路径
    }
},
memoryReservationMiB: 50,
logging: fluentBitLog,
healthCheck: {
    command: [ "CMD-SHELL", "curl -f http://127.0.0.1:2020/api/v1/uptime || exit 0" ],
    // the properties below are optional
    interval: Duration.seconds(30),
    retries: 3,
    startPeriod: Duration.seconds(10),
    timeout: Duration.seconds(30),
},
});

日志处理 Lambda

因为我们创建的 OpenSearch 不是公开访问的 domain,所以我们 Lambda 的类型也需要 Attach 到跟 OpenSearch 的 VPC 上去,同时通常来讲应用日志的数据量都比较大,所以为了提高效率,我们在保存日志到 OpenSearch 时使用 bulk API 来批量插入,示例代码如下:

import {Context, S3Event} from 'aws-lambda';
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
import axios from 'axios';

const s3 = new S3Client();
const openSearchHost = process.env.OPENSEARCH_HOST;
export const handler = async (event: S3Event, context: Context): Promise<S3Event> => {
    console.log(`Event: ${JSON.stringify(event, null, 2)}`);
    console.log(`Context: ${JSON.stringify(context, null, 2)}`);
    const bucket = event.Records[0].s3.bucket.name;
    const key = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
    const params = {
        Bucket: bucket,
        Key: key,
    };
    try {
        const response = await s3.send(new GetObjectCommand(params));
        const str = await response.Body.transformToString();
        console.log(str)
        const jsonLines: string[] = str.split('\n');
        const jsonArray = [];
        const now = new Date();
        const year = now.getFullYear();
        const month = (now.getMonth() + 1).toString().padStart(2, '0');
        const day = now.getDate().toString().padStart(2, '0');
        const indexName = `nginx_logs-${year}${month}${day}`;
        for (const jsonLine of jsonLines) {
            // 跳过空行
            if (!jsonLine.trim()) {
                continue;
            }

            // 解析JSON数据
            try {
                jsonArray.push(JSON.stringify({ 'index': { '_index': indexName } }) + '\n');
                jsonArray.push(jsonLine + '\n');
            } catch (e) {
                console.error(`Failed to parse JSON: ${jsonLine}. Error: ${e}`);
            }
        }
        try {
            // 构造 OpenSearch 数据对象
            // 发送 POST 请求到 OpenSearch
            const response = await axios.post('https://' + openSearchHost + '/_bulk',
                jsonArray.join(''),
                {
                    headers: {
                        'Content-Type': 'application/x-ndjson',
                    },
                }
            );
            console.log('OpenSearch Response:', JSON.stringify(response.data));
        } catch (error) {
            console.error('Error saving data to OpenSearch:', error);
        }

    } catch (err) {
        console.log(err);
    }

    return event;
};

环境删除

cdk destroy

总结

Fluent Bit 作为轻量级的日志收集组件被广泛地使用,通过 ECS Firelenslog router 我们可以非常方便的将日志信息经由 Fluent Bit 收集到指定的 S3,Firehose,OpenSearch 中去。本文中使用的架构会将日志在 S3 中缓存,同时通过 S3 来 trigger 后续的 Lambda 调用,最终将日志写入至 OpenSearch,通过 OpenSearch 我们可以非常方便地进行日志分析。整个方案中,我们既利用了 S3 的经济性,又利用了 OpenSearch 对于开发运维同学查询日志的便利性。

问题排查

登录 Fargate 查看方式

  1. 安装 session manager plugin
    brew install --cask session-manager-plugin
  2. 添加 SSM 权限给 ECS Task Role
    {
       "Version": "2012-10-17",
       "Statement": [
           {
           "Effect": "Allow",
           "Action": [
                "ssmmessages:CreateControlChannel",
                "ssmmessages:CreateDataChannel",
                "ssmmessages:OpenControlChannel",
                "ssmmessages:OpenDataChannel"
           ],
          "Resource": "*"
          }
       ]
    }
    
  1. 将 ECS ExecuteCommand 给当前的 IAM 用户
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "User access to ECS ExecuteCommand",
                "Effect": "Allow",
                "Action": "ecs:ExecuteCommand",
                "Resource": "*"
            }
        ]
    }
    
  1. 开启 ECS Service Exec Command已有 Service 开启
    aws ecs update-service --service $SERVICE_NAME \
      --cluster $CLUSTER \
      --enable-execute-command \
      --force-new-deployment
    

    新 Service 开启

    aws ecs create-service \
        --cluster <cluster-name> \
        --task-definition <task-definition-name> \
        --service <service-name> \
        --desired-count 1 \
        --enable-execute-command
    

参考文档

https://docs.fluentbit.io/manual

https://docs.aws.amazon.com/AmazonECS/latest/developerguide/using_firelens.html

本篇作者

郭俊龙

亚马逊云科技解决方案架构师,主要负责游戏行业客户解决方案设计,比较擅长云原生微服务以及大数据方案设计和实践。