背景
随着业务不断发展,对实时监控指标的需求日益增长。我们越来越依赖于亚马逊云科技服务所产生的实时日志,以获取关键信息。其中,处理 CloudFront 实时日志是一个常见的场景。通常,我们会采用 Kinesis Stream 与 Kinesis Firehose 的组合,用于日志的采集、清洗和发送到目的地。Kinesis Firehose 是亚马逊云科技提供的托管服务,可以方便地将数据传输到多种外部目的地,如 OpenSearch、S3 和 Redshift。与 Firehose 不同,AWS Lambda 提供了另一种实时日志处理方案。如果我们对目标服务有深入了解,可以利用 Lambda 函数的定制能力,在日志量较大的场景下优化成本。Lambda 函数允许我们编写自定义代码,根据具体需求对日志数据进行处理和转换。通过精心设计的 Lambda 函数,我们可以实现更加灵活和高效的日志处理流程,同时降低运营成本。不过,与 Firehose 相比,使用 Lambda 需要更多的开发和维护工作。因此,在选择实时日志处理方案时,我们需要权衡成本、灵活性和维护复杂度。对于大多数场景,Kinesis Firehose 提供了一种简单、可靠且经济高效的解决方案。而对于有特殊需求或对成本有更高要求的场景,使用 Lambda 函数可以带来更大的优化空间。
方案说明
1. 架构说明
2. 具体代码
Lambda 的代码逻辑如下:
1. 摄取 Kinesis Data Stream 里存放的 Cloudfront Realtime log。考虑到 KDS 存放的格式是 Base64,摄取后需要先进行解码。
2. 结合用户存放日志的实际格式和数据类型,预先定义字典,对 Realtime log 做数据类型的转换。
3. 解析并转换好数据格式后,对数据进行压缩(压缩不但可以降低数据体积加快传输效率,如果处理 log 的服务在外网,同时可以降低数据传输的成本)。
4. 调用 HTTP 接口传输日志数据到目的地。
import base64
import json
import gzip
import requests
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
print('Loading function')
# 定义 HTTP Endpoint URL
HTTP_ENDPOINT_URL = 'https://cdn-log.myexample.com/amazon/cdnlog'
# 重试策略配置
RETRIES = 5
BACKOFF_FACTOR = 0.3
STATUS_FORCELIST = (500, 502, 504)
def lambda_handler(event, context):
output = []
# 定义字段字典
realtimelog_fields_dict = {
"timestamp": "float",
"cip": "str",
"timeToFirstByte": "float",
"scStatus": "int",
"scBytes": "int",
"csMethod": "str",
"csProto": "str",
"csHost": "str",
"csUrl": "str",
"csBytes": "int",
"xEdgeLocation": "str",
"xEdgeRequestId": "str",
"xHostHeader": "str",
"timeTaken": "float",
"csVersion": "str",
"cipVersion": "str",
"csUserAgent": "str",
"csRefer": "str",
"csCookie": "str",
"csUriQuery": "str",
"xEdgeResponseResultType": "str",
"xForwardedFor": "str",
"sslProtocol": "str",
"sslCipher": "str",
"xEdgeResultType": "str",
"fleEncryptedFields": "str",
"fleStatus": "str",
"scContentType": "str",
"scContentLen": "int",
"scRangeStart": "int",
"scRangeEnd": "int",
"cPort": "int",
"xEdgeDetailedResultType": "str",
"cipCountry": "str",
"csAcceptEncoding": "str",
"csAccept": "str",
"cacheBehaviorPathPattern": "str",
"csHeaders": "str",
"csHeaderNames": "str",
"csHeadersCount": "int"
}
try:
for record in event['Records']:
# 解码 base64 数据
payload_in_bytes = base64.b64decode(record['kinesis']['data'])
payload = "".join(map(chr, payload_in_bytes))
# 解析日志记录
payload_dict = parse_log_record(payload, realtimelog_fields_dict)
output_record = {
'recordId': record['kinesis']['sequenceNumber'],
'result': 'Ok',
'data': payload_dict
}
output.append(output_record)
print('Successfully processed {} records.'.format(len(event['Records'])))
# 将输出记录列表压缩
compressed_output = gzip.compress(json.dumps(output).encode('utf-8'))
# 发送压缩数据到 HTTP Endpoint
send_to_http_endpoint(compressed_output)
except Exception as e:
print(f'Error processing records: {e}')
return {'records': output}
def parse_log_record(payload, fields_dict):
payload_dict = {}
counter = 0
payload_list = payload.strip().split('\t')
for field, field_type in fields_dict.items():
if payload_list[counter].strip() == '-':
field_type = "str"
if field_type == "int":
payload_dict[field] = int(payload_list[counter].strip())
elif field_type == "float":
payload_dict[field] = float(payload_list[counter].strip())
else:
payload_dict[field] = payload_list[counter].strip()
counter += 1
return payload_dict
def send_to_http_endpoint(data):
try:
headers = {'Content-Type': 'application/json', 'Content-Encoding': 'gzip'}
response = requests.post(HTTP_ENDPOINT_URL, data=data, headers=headers)
response.raise_for_status()
print(f'Data sent to {HTTP_ENDPOINT_URL}')
except requests.exceptions.RequestException as e:
print(f'Error sending data to HTTP endpoint: {e}')
3. 配置触发器,让 Lambda 消费 Kinesis Data Stream
批处理大小:指定每次调用 Lambda 函数时要读取的最大记录数。默认值为 100,最大值为 10,000。批量的摄取方式可以优化性能和降低成本(实测环境下:128M 的 Lambda 每 3s 可以处理 1000 条日志,假设预置 100 个并发,那么平均每秒可以处理 3 万条记录)。
批处理时间窗口:设置 Lambda 函数在调用之前可以等待的最长时间(以秒为单位),以收集更多记录。最大值为 300 秒(注意:1. Lambda 的 timeout 需要大于此时间;2. 此时间直接影响日志处理服务的报警时机,如果设定为 60s,即日志服务的日志延迟最多在 60s 内)。
定义 Lambda 函数从数据流中读取记录的起始位置。可选值包括:
TRIM_HORIZON
:从数据流的开始读取( 从系统中分区的最后一个未删除记录开始读取,该记录是分区中最老的数据记录)
LATEST
:仅处理到达时的新记录(分区中最新的记录开始读取,这样您就可以始终读取分区中最新的数据)
AT_TIMESTAMP
:从指定的时间戳开始读取(需提供时间戳)
结论
成本:Firehose 按照流量进行收费;Lambda 按照请求次数+计算资源运行时间收费。如果吞吐量大,使用 Lambda 会更省。
稳定性:Lambda 是 region 级别的服务。在测试情况下每月 1500 亿条(按照 1000 条批量摄入)的记录,Lambda 的平均耗时在 3 秒,最大耗时在 5~8 秒,concurrent 在 150 左右。
功能:实时日志场景下,Firehose 主要有批量摄入、gzip 等 2 个功能;Lambda 可以通过自带的能力+代码能力完全实现。
本篇作者