Background
As social media applications place ever more emphasis on user experience, CDN services that accelerate content delivery have become some of the most widely used offerings, especially for video-on-demand and live streaming. If playback stalls or fails too often during content delivery, engagement with the application suffers badly. Evaluating the acceleration quality of a CDN, and monitoring that quality in real time, is therefore a tricky problem. This post collects network performance metrics from both the client side and the server side, and uses Amazon Timestream, the AWS time series database, together with Amazon Managed Grafana to analyze these metrics in real time, helping you understand the effectiveness of CDN acceleration and providing the evidence needed to locate problems quickly.
Architecture Overview
Amazon CloudFront is a content delivery network (CDN) service that peers with thousands of Tier 1/2/3 telecom carriers globally, is well connected with all major access networks for optimal performance, and has hundreds of terabits of deployed capacity. In this architecture, server-side (Amazon CloudFront) logs and custom client-side logs are collected and stored in Amazon S3. When a log file lands in S3, it triggers a Lambda function that transforms the client-side or server-side log into the required format and pushes the key metrics into Amazon Timestream. Amazon Timestream holds several tables; for fast queries, the most recent 3 days of data stay in the memory store, and to save cost, data older than 2 weeks is deleted automatically by the database. Amazon Managed Grafana is used for the performance dashboards.
Solution Deployment
- Create an S3 bucket with separate prefixes (directories) for client-side logs and server-side logs, as sketched below.
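A minimal boto3 sketch of this step; the Region, bucket name, and prefix names are assumptions for illustration and should be adjusted to your environment:

import boto3

region = 'ap-southeast-1'                  # assumption: deployment Region
bucket = 'cdn-performance-logs-example'    # assumption: bucket name

s3 = boto3.client('s3', region_name=region)

# Create the bucket (outside us-east-1 a LocationConstraint is required).
s3.create_bucket(Bucket=bucket,
                 CreateBucketConfiguration={'LocationConstraint': region})

# "Directories" in S3 are just key prefixes; zero-byte placeholder objects
# make them visible in the console.
s3.put_object(Bucket=bucket, Key='client-logs/')
s3.put_object(Bucket=bucket, Key='cloudfront-logs/')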
- Enable CloudFront standard logging for your distribution and point the log destination to the S3 prefix you just created; this can be done in the console or scripted as shown below.
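If you prefer to script this step, a rough boto3 sketch follows; the distribution ID and log bucket/prefix are assumptions, and enabling standard logging in the CloudFront console works just as well:

import boto3

cloudfront = boto3.client('cloudfront')
distribution_id = 'E1234EXAMPLE'   # assumption: your distribution ID

# Fetch the current configuration together with its ETag (required for the update call).
resp = cloudfront.get_distribution_config(Id=distribution_id)
config = resp['DistributionConfig']

# Point standard (access) logging at the server-side log prefix created above.
config['Logging'] = {
    'Enabled': True,
    'IncludeCookies': False,
    'Bucket': 'cdn-performance-logs-example.s3.amazonaws.com',
    'Prefix': 'cloudfront-logs/'
}

cloudfront.update_distribution(Id=distribution_id,
                               DistributionConfig=config,
                               IfMatch=resp['ETag'])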
- In this post, client-side logs are collected and aggregated by a log collection service deployed on EC2, which periodically uploads compressed .gz log files to the designated S3 prefix. The client log schema is as follows (an example raw log line is shown after the table):
| Field | Example value |
| --- | --- |
| ClientTime | 1642520043060 |
| System | ANDROID_PHONE |
| Host | xinranl.com |
| URL | http://xinranl.com/test/video.mp4 |
| Bytes | 208683 |
| ServerIP | 143.204.110.58 |
| ClientIP | 240.199.105 |
| Country | Singapore |
| Protocol | http/1.1 |
| ASN | 5388 |
| ErrorCode | 0 |
| NetCost | 121 |
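For reference, each line of the uploaded .gz file is expected to carry these twelve fields separated by whitespace, because the Lambda function below simply calls text.split(). An illustrative line built from the example values above (the exact delimiter and field order are assumptions that must match your own collector):

1642520043060 ANDROID_PHONE xinranl.com http://xinranl.com/test/video.mp4 208683 143.204.110.58 240.199.105 Singapore http/1.1 5388 0 121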
- Create the Timestream table ClientMetric for the metrics uploaded by clients, with a memory store retention of 3 days; data older than 2 weeks is deleted.
- Create a table named CloudFrontMetric in the same way to store the server-side (CloudFront) log metrics. A scripted sketch of both steps follows.
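Both tables can be created in the Timestream console or, as a minimal sketch, with boto3. The database name CDN-Performance and the table names match those used by the Lambda functions below; 72 hours and 14 days correspond to the retention described above:

import boto3

ts_write = boto3.client('timestream-write')

retention = {
    'MemoryStoreRetentionPeriodInHours': 72,   # keep 3 days in the memory store for fast queries
    'MagneticStoreRetentionPeriodInDays': 14   # data is deleted automatically after 2 weeks
}

ts_write.create_database(DatabaseName='CDN-Performance')
ts_write.create_table(DatabaseName='CDN-Performance',
                      TableName='ClientMetric',
                      RetentionProperties=retention)
ts_write.create_table(DatabaseName='CDN-Performance',
                      TableName='CloudFrontMetric',
                      RetentionProperties=retention)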
- Configure S3 to trigger Lambda functions that transform the server-side and client-side logs and ingest the data into Amazon Timestream through the AWS SDK. The code of the Lambda function TransformClientLogtoTimestream is shown below:
import os
import gzip
import time
import boto3
import json
import urllib.parse
from botocore.config import Config

s3 = boto3.client('s3')


def load_data_from_object(fileobj):
    # Parse a gzip-compressed client log and build one Timestream record per line.
    records = []
    try:
        with gzip.open(fileobj, 'rt') as f:
            for text in f:
                temp = text.split()
                data_row = {
                    'ClientTime': temp[0],
                    'System': temp[1],
                    'Host': temp[2],
                    'URL': temp[3],
                    'Bytes': temp[4],
                    'ServerIP': temp[5],
                    'ClientIP': temp[6],
                    'Country': temp[7],
                    'Protocol': temp[8],
                    'ASN': temp[9],
                    'ErrorCode': temp[10],
                    'NetCost': temp[11]
                }
                dimensions = [
                    {'Name': 'ServerIP', 'Value': data_row['ServerIP']},
                    {'Name': 'ClientIP', 'Value': data_row['ClientIP']},
                    {'Name': 'protocol', 'Value': data_row['Protocol']},
                    {'Name': 'ASN', 'Value': data_row['ASN']},
                    {'Name': 'Country', 'Value': data_row['Country']},
                    {'Name': 'ErrorCode', 'Value': data_row['ErrorCode']}
                ]
                # The client-reported network time is stored as the 'netcost' measure;
                # ClientTime is already an epoch timestamp in milliseconds, which is
                # Timestream's default TimeUnit.
                record_netcost = {
                    'Dimensions': dimensions,
                    'MeasureName': 'netcost',
                    'MeasureValue': data_row['NetCost'],
                    'MeasureValueType': 'BIGINT',
                    'Time': data_row['ClientTime']
                }
                records.append(record_netcost)
    except Exception as e:
        print('load_data_from_log_file error:', str(e))
        return None
    return records

def write_records(data):
    if not data:
        return
    session = boto3.Session()
    write_client = session.client('timestream-write',
                                  config=Config(read_timeout=20,
                                                max_pool_connections=5000,
                                                retries={'max_attempts': 10}))
    # Timestream accepts at most 100 records per WriteRecords call, so write in batches of 100.
    for start in range(0, len(data), 100):
        batch = data[start:start + 100]
        try:
            write_client.write_records(DatabaseName='CDN-Performance',
                                       TableName='ClientMetric',
                                       Records=batch,
                                       CommonAttributes={})
            print("Have WriteRecords num:", start + len(batch))
        except write_client.exceptions.RejectedRecordsException as err:
            print("RejectedRecords: ", err)
            for rr in err.response["RejectedRecords"]:
                print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
        except Exception as err:
            print("Error:", err)

def lambda_handler(event, context):
    # Triggered by S3; read the uploaded log object, parse it, and write it to Timestream.
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        data = load_data_from_object(response['Body'])
        write_records(data)
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}.'.format(key, bucket))
        raise e
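The Lambda functions in this post are invoked through S3 event notifications on the corresponding log prefixes. A minimal sketch of wiring this up with boto3 (the function ARN, bucket, and prefix are assumptions; the same thing can be configured in the S3 console under Event notifications):

import boto3

s3 = boto3.client('s3')
lambda_client = boto3.client('lambda')

bucket = 'cdn-performance-logs-example'   # assumption: log bucket created earlier
client_fn_arn = 'arn:aws:lambda:ap-southeast-1:111122223333:function:TransformClientLogtoTimestream'  # assumption

# Allow S3 to invoke the function.
lambda_client.add_permission(FunctionName=client_fn_arn,
                             StatementId='s3-invoke-client-log',
                             Action='lambda:InvokeFunction',
                             Principal='s3.amazonaws.com',
                             SourceArn='arn:aws:s3:::' + bucket)

# Trigger the function whenever a .gz file lands under client-logs/.
# Note: this call replaces any existing notification configuration on the bucket;
# a second entry pointing TransformCFLogToTimestream at cloudfront-logs/ is added the same way.
s3.put_bucket_notification_configuration(
    Bucket=bucket,
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': client_fn_arn,
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {'Key': {'FilterRules': [
                {'Name': 'prefix', 'Value': 'client-logs/'},
                {'Name': 'suffix', 'Value': '.gz'}
            ]}}
        }]
    })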
- Configure the function TransformCFLogToTimestream that processes the CloudFront logs in the same way. Its source code is as follows:
import os
import gzip
import time
import boto3
import json
import urllib.parse
from datetime import datetime, timezone
from botocore.config import Config

s3 = boto3.client('s3')


def load_data_from_object(fileobj):
    # Parse a gzip-compressed CloudFront standard log and build one Timestream record per line.
    print("load_data_from_log_file....")
    records = []
    try:
        with gzip.open(fileobj, 'rt') as f:
            for text in f:
                temp = text.split()
                # Skip the '#Version' and '#Fields' header lines.
                if temp[0].startswith('#'):
                    continue
                data_row = {
                    'date': temp[0],
                    'time': temp[1],
                    'x-edge-location': temp[2],
                    'sc-bytes': temp[3],
                    'c-ip': temp[4],
                    'cs-method': temp[5],
                    'Host': temp[6],
                    'cs-uri-stem': temp[7],
                    'sc-status': temp[8],
                    'Referer': temp[9],
                    'User-Agent': temp[10],
                    'cs-uri-query': temp[11],
                    'Cookie': temp[12],
                    'x-edge-result-type': temp[13],
                    'x-edge-request-id': temp[14],
                    'x-host-header': temp[15],
                    'cs-protocol': temp[16],
                    'cs-bytes': temp[17],
                    'time-taken': temp[18],
                    'x-forwarded-for': temp[19],
                    'ssl-protocol': temp[20],
                    'ssl-cipher': temp[21],
                    'x-edge-response-result-type': temp[22],
                    'cs-protocol-version': temp[23],
                    'fle-status': temp[24],
                    'fle-encrypted-fields': temp[25],
                    'c-port': temp[26],
                    'time-to-first-byte': temp[27],
                    'x-edge-detailed-result-type': temp[28],
                    'sc-content-type': temp[29],
                    'sc-content-len': temp[30],
                    'sc-range-start': temp[31],
                    'sc-range-end': temp[32]
                }
                dimensions = [
                    {'Name': 'ClientIP', 'Value': data_row['c-ip']},
                    {'Name': 'location', 'Value': data_row['x-edge-location']},
                    {'Name': 'ResultType', 'Value': data_row['x-edge-result-type']}
                ]
                # CloudFront logs date and time in UTC; convert them to epoch milliseconds,
                # which is Timestream's default TimeUnit. time-to-first-byte is reported in
                # seconds, so it is also converted to milliseconds.
                event_time = datetime.strptime(data_row['date'] + ' ' + data_row['time'],
                                               "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
                record_first_byte = {
                    'Dimensions': dimensions,
                    'MeasureName': 'FirstByteCost',
                    'MeasureValue': str(int(float(data_row['time-to-first-byte']) * 1000)),
                    'MeasureValueType': 'BIGINT',
                    'Time': str(int(event_time.timestamp() * 1000))
                }
                records.append(record_first_byte)
    except Exception as e:
        print('load_data_from_log_file error:', str(e))
        return None
    return records

def write_records(data):
    if not data:
        return
    session = boto3.Session()
    write_client = session.client('timestream-write',
                                  config=Config(read_timeout=20,
                                                max_pool_connections=5000,
                                                retries={'max_attempts': 10}))
    # Timestream accepts at most 100 records per WriteRecords call, so write in batches of 100.
    for start in range(0, len(data), 100):
        batch = data[start:start + 100]
        try:
            write_client.write_records(DatabaseName='CDN-Performance',
                                       TableName='CloudFrontMetric',
                                       Records=batch,
                                       CommonAttributes={})
            print("Have WriteRecords num:", start + len(batch))
        except write_client.exceptions.RejectedRecordsException as err:
            print("RejectedRecords: ", err)
            for rr in err.response["RejectedRecords"]:
                print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
        except Exception as err:
            print("Error:", err)

def lambda_handler(event, context):
    # Triggered by S3; read the uploaded CloudFront log object, parse it, and write it to Timestream.
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        data = load_data_from_object(response['Body'])
        write_records(data)
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e
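Before configuring Grafana, you can optionally check that records are arriving in Timestream. A small sketch using the timestream-query client (the query string is illustrative):

import boto3

ts_query = boto3.client('timestream-query')

# Count the client metrics ingested during the last hour.
result = ts_query.query(
    QueryString='SELECT COUNT(*) FROM "CDN-Performance"."ClientMetric" WHERE time > ago(1h)')
print(result['Rows'])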
- Configure the data source that Grafana reads from; this post uses the Amazon Timestream database created in the previous steps.
- Register a user with an email address for login and set that user as an administrator; otherwise the user has no permission to configure anything in the Grafana UI.
- Log in to the Grafana UI through the URL provided by the Amazon Managed Grafana workspace, select Amazon Timestream and its Region when configuring the data source, and provide an access key / secret key (AK/SK) that can access Amazon Timestream.
Configuring the Grafana Dashboard
The monitoring dashboard in this example provides the following panels, each driven by one of the Timestream queries below:
Number of distinct client IPs in the selected time range:

SELECT COUNT(DISTINCT ClientIP) AS IP_NUM
FROM "CDN-Performance"."ClientMetric"
WHERE $__timeFilter
  AND measure_name = 'netcost'
Average netcost (client-reported network time) across all requests:

SELECT ROUND(AVG(measure_value::bigint), 2) AS avg_netcost
FROM "CDN-Performance"."ClientMetric"
WHERE measure_name = 'netcost'
  AND $__timeFilter
Average and P50/P90/P95/P99 netcost broken down by protocol:

SELECT protocol,
       ROUND(AVG(measure_value::bigint), 1) AS avg_netcost,
       ROUND(APPROX_PERCENTILE(measure_value::bigint, 0.5), 2) AS p50_netcost,
       ROUND(APPROX_PERCENTILE(measure_value::bigint, 0.9), 2) AS p90_netcost,
       ROUND(APPROX_PERCENTILE(measure_value::bigint, 0.95), 2) AS p95_netcost,
       ROUND(APPROX_PERCENTILE(measure_value::bigint, 0.99), 2) AS p99_netcost
FROM "CDN-Performance"."ClientMetric"
WHERE measure_name = 'netcost'
  AND $__timeFilter
GROUP BY protocol
Request count per CloudFront server IP:

SELECT ServerIP, COUNT(*) AS num
FROM "CDN-Performance"."ClientMetric"
WHERE $__timeFilter
GROUP BY ServerIP
ORDER BY num DESC
Request count per client ASN:

SELECT ASN AS asn, COUNT(*) AS num
FROM "CDN-Performance"."ClientMetric"
WHERE $__timeFilter
GROUP BY ASN
ORDER BY num DESC
Request count per protocol:

SELECT protocol, COUNT(*) AS num
FROM "CDN-Performance"."ClientMetric"
WHERE $__timeFilter
GROUP BY protocol
ORDER BY num DESC
Request count per country:

SELECT country, COUNT(*) AS num
FROM "CDN-Performance"."ClientMetric"
WHERE $__timeFilter
GROUP BY country
ORDER BY num DESC
Top 20 client IPs by P90 netcost:

SELECT ClientIP, ROUND(APPROX_PERCENTILE(measure_value::bigint, 0.90), 0) AS netcost
FROM "CDN-Performance"."ClientMetric"
WHERE $__timeFilter
GROUP BY ClientIP
ORDER BY netcost DESC LIMIT 20
Netcost time series for the 20 client IPs with the highest P95 netcost:

SELECT a.ClientIP, CREATE_TIME_SERIES(time, measure_value::bigint) AS netcost
FROM "CDN-Performance"."ClientMetric" a
INNER JOIN (SELECT ClientIP, ROUND(APPROX_PERCENTILE(measure_value::bigint, 0.95), 0) AS netcost, COUNT(*) AS nums
            FROM "CDN-Performance"."ClientMetric"
            WHERE $__timeFilter
            GROUP BY ClientIP
            ORDER BY netcost DESC LIMIT 20) b
ON a.ClientIP = b.ClientIP
WHERE $__timeFilter
  AND measure_name = 'netcost'
GROUP BY a.ClientIP
The final monitoring dashboard looks like the following:
About the Author