方案概述
Amazon CloudWatch Internet Monitor 可以将与您应用程序有关城市/网络(Location 和 ASN)的互联网测量数据保存到 CloudWatch 日志中。这些测量数据包括性能得分、可用性得分、监控的传输字节数和往返时间等,可以让您了解部署在 Amazon Web Services 上的应用服务和最终用户之间的性能和可用性。您还可以选择将所有监控城市网络的测量结果发布到 Amazon S3,然后使用不同的 Amazon Web Services 分析服务来查看和分析信息。
创建 Internet Monitor 监控器后,您可以直接在 Amazon CloudWatch 中使用 Dashboard 仪表盘,通过过滤和查看前 500 个城市网络的质量情况,快速掌握具体网络的健康状况、性能表现等。如果您更进一步,选择将所有测量日志发布到 S3,即可灵活集成各种分析服务,灵活地构建适合业务场景的全球网络质量监控仪表盘,并通过对测量日志进行深入分析和研究,获取有价值的见解。
在本文中,我们将介绍如何使用 Amazon Athena 对存储在 Amazon S3 上的 Internet Monitor 测量日志数据进行网络质量分析,然后使用 Amazon QuickSight 和 Amazon OpenSearch Service 构建仪表盘,对网络质量数据进行可视化展示。

架构图:CloudWatch Internet Monitor 测量日志保存于 S3,可与多种分析服务集成实现可视化
部署准备
本解决方案假设您之前已经设置了 Amazon CloudWatch Internet Monitor。如果您尚未执行此操作,请按照创建监控器的说明进行操作。要配置互联网测量日志发送保存到 S3 桶,您可以使用 AWS CLI update monitor
命令进行配置,并指定要更新的“Monitor Name”监控器名称和 S3 存储桶名称。
aws internetmonitor update-monitor
--monitor-name $MONITOR-NAME
--internet-measurements-log-delivery S3Config="{BucketName=$BUCKET,LogDeliveryStatus=ENABLED}"
方案 1 – 使用 Amazon Athena 进行测量日志分析
在 Athena 中创建一张外部表(external table)
在 Athena 的查询编辑器(Query Editor)中,输入以下的 DDL,并选择运行查询(Run Query)。
CREATE EXTERNAL TABLE internet_measurements (
version INT,
timestamp INT,
clientlocation STRING,
servicelocation STRING,
percentageoftotaltraffic DOUBLE,
bytesin INT,
bytesout INT,
clientconnectioncount INT,
internethealth STRING,
trafficinsights STRING
)
PARTITIONED BY (year STRING, month STRING, day STRING)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://ExampleBucketName/ExamplePrefix/AWSLogs/YourAWSAccountID/internetmonitor/YourRegion/'
TBLPROPERTIES ('skip.header.line.count' = '1');
该命令创建一个名为 internet_measurements 的事件数据表。
通过 Athena 查询互联网测量日志数据
在上述步骤中,您已经创建了一张名为 internet_measurements 的事件表。您可以在 Athena 中使用标准 SQL 构建自定义查询,以获取与您应用服务有关的互联网测量事件和各种信息。要了解如何在 Athena 中使用 SQL,请参阅查询 Amazon Athena 运行 SQL 查询。我们在这里提供了一些 Athena 查询示例,以展示您可以进行哪些查询,并帮助您快速上手。例如,您可以通过一个查询,返回访问您的应用服务的前 N 个客户端来源(地理位置)和 ASN(运营商),以及它们占您的总应用服务流量的百分比。
要返回此信息,请打开 Athena 查询编辑器,通过点击右方小“+”号添加一个新的查询,如以下屏幕截图所示。

图例:在 Athena 中查询互联网测量日志数据
以下是 3 个常用的查询 SQL 样例,以协助您快速使用 Internet Monitor 的测量数据。
示例 1:前 10 个占总流量百分比最大的客户端位置和 ASN 统计
SELECT json_extract_scalar(clientLocation, '$.city') as city,
json_extract_scalar(clientLocation, '$.networkname') as networkName,
sum(percentageoftotaltraffic) as percentageoftotaltraffic
FROM internet_measurements
GROUP BY json_extract_scalar(clientLocation, '$.city'),
json_extract_scalar(clientLocation, '$.networkname')
ORDER BY percentageoftotaltraffic desc
limit 10
示例 2:前 10 个可用性最好的客户端位置和 ASN 统计
SELECT json_extract_scalar(clientLocation, '$.city') as city,
json_extract_scalar(clientLocation, '$.networkname') as networkName,
sum(
cast(
json_extract_scalar(
internetHealth,
'$.availability.percentageoftotaltrafficimpacted'
)
as double )
) as percentageOfTotalTrafficImpacted
FROM internet_measurements
GROUP BY json_extract_scalar(clientLocation, '$.city'),
json_extract_scalar(clientLocation, '$.networkname')
ORDER BY percentageOfTotalTrafficImpacted desc
limit 10
示例 3:前 100 个流量最大的客户端位置统计
SELECT json_extract_scalar(clientLocation, '$.city') as city,
json_extract_scalar(clientLocation, '$.subdivision') as subdivision,
json_extract_scalar(clientLocation, '$.country') as country,
avg(cast(json_extract_scalar(internetHealth, '$.availability.experiencescore') as double)) as availabilityScore,
avg(cast(json_extract_scalar(internetHealth, '$.performance.experiencescore') as double)) performanceScore,
avg(cast(json_extract_scalar(trafficinsights, '$.timetofirstbyte.currentexperience.value') as double)) as averageTTFB,
sum(bytesIn) as bytesIn,
sum(bytesOut) as bytesOut,
sum(bytesIn + bytesOut) as totalBytes
FROM internet_measurements
GROUP BY
json_extract_scalar(clientLocation, '$.city'),
json_extract_scalar(clientLocation, '$.subdivision'),
json_extract_scalar(clientLocation, '$.country')
ORDER BY totalBytes desc
limit 100
创建查询时,需注意:
- 如果要从嵌套的 JSON 字段中提取数据,请使用 Presto JSON_extract 或 JSON_extextract_scaler 函数。有关更多信息,请参阅从 JSON 中提取数据。
- 请确保 JSON 字段中的所有字符都是小写。
如果您希望以后能够运行相同的查询,可以将查询 SQL 保存在 Athena 中。您还可以将查询结果以 CSV 格式下载并保存在您的本地,以便于使用电子表格程序进行查看或分析。有关更多信息,请参阅使用 Athena 控制台下载查询结果文件。
方案 2 – 使用 QuickSight 对测量日志进行可视化
在 Athena 中设置事件表后,只需点击几下,就可以使用 Amazon QuickLight 把 Athena 表中数据进行可视化。在本节中,我们将逐步了解如何使用 QuickLight 对位于 Athena 表中的数据进行优良可视化。
在将 QuickSight 连接到 Athena 之前,你需要授予 QuickSight 对 Athena 和您帐户中相关 S3 存储桶的访问权限,请参考 QuickSight 访问数据来源文档进行授权。
要设置可视化效果,请执行以下步骤:
- 登录 QuickSight 并选择“Datasets”、“New dataset”。
- 选择 Athena 作为新的数据源。

- 为新数据集命名,然后选择“Validate connection”验证连接。
- 验证连接后,选择“Create data source”,创建数据源。
- 选择“Use custom SQL”使用自定义 SQL,并为您的 SQL 查询命名,如下屏幕截图所示。

例如,您可以输入如下的查询语句:
SELECT json_extract_scalar(clientLocation, '$.city') as city,
json_extract_scalar(clientLocation, '$.country') as country,
json_extract_scalar(clientLocation, '$.subdivision') as state,
json_extract_scalar(clientLocation, '$.networkname') as networkName,
sum(percentageoftotaltraffic) as percentageoftotaltraffic
FROM internet_measurements
GROUP BY json_extract_scalar(clientLocation, '$.city'),
json_extract_scalar(clientLocation, '$.country'),
json_extract_scalar(clientLocation, '$.subdivision') ,
json_extract_scalar(clientLocation, '$.networkname')
ORDER BY percentageoftotaltraffic desc
- 输入查询语句后,选择“Confirm query”确认查询
- 导入 SPICE 以实现更快的分析
现在您已经将数据导入到分析中,可以开始创建自定义可视化图表,您可以自定义并根据具体的需求创建有助于理解和分析的数据图表。
- 在 Amazon QuickSight 中,点击“New analysis”创建新的分析。
- 选择您之前创建的最后一个数据集,然后单击“Use in analysis”在分析中使用。
- 在屏幕的左下角,选择“ Line Chart”折线图。
- 在屏幕左下角,选择“Horizontal bar chart”水平条形图。
- 将“Country”拖放到 Y 轴。
- 将“percentageoftotaltraffic”总流量的百分比拖放到“Value”值中。这将创建类似于下图的可视化效果。

图例:按总流量百分比可视化排名靠前的国家情况
您还可以按客户端位置映射总流量的百分比绘制地图展示。可以在 QuickLight 中通过选择“Points on map”地图上的点可视化类型,并选择“City”城市作为要可视化的地理空间数据点来实现这一点。这将创建类似于下图的可视化效果。

图例:在地图中按占总流量的百分比可视化排名靠前的城市情况
方案 3 – 使用 Amazon OpenSearch Service 对测量日志进行可视化
Amazon OpenSearch Service 是一项托管服务,可帮助您快速部署、操作和扩展 OpenSearch 集群,而无需担心软件安装、升级、补丁、扩展或跨区域复制。您可以使用 Amazon OpenSearch Service 仪表盘创建和共享各种图表、仪表盘和报告,以直观地显示和浏览您的日志。
步骤 1 创建 Amazon OpenSerach Service 集群
您可以在 Amazon OpenSearch Service Console 页面中通过“Easy create”方法一键创建一个 Amazon OpenSearch Service。

选择您期望创建 Amazon OpenSearch Service Cluster 的方式
如果您有更多自定义的要求,可以选择“Standard create” 。
您也可以通过 CLI 命令行创建 Amazon OpenSearch Service Cluster。
aws opensearch create-domain \
—domain-name mylogs \
—engine-version OpenSearch_2.5 \
—cluster-config InstanceType=r6g.large.search,InstanceCount=2,ZoneAwarenessEnabled=true,ZoneAwarenessConfig={AvailabilityZoneCount=2} \
—ebs-options EBSEnabled=true,VolumeType=gp3,VolumeSize=100 \
—access-policies '{"Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"AWS": "arn:aws:iam::xxxx-Your-Account-ID:root" }, "Action":"es:", "Resource": "arn:aws:es:us-east-1:xxxx-Your-Account-ID:domain/mylogs/" } ] }'
等待 Amazon OpenSearch 集群创建完成后,您可以通过以下命令查看集群的状态:
aws opensearch describe-domain --domain-name Your-OpenSearch-Domain
更多 Amazon OpenSearch Service 的最佳实践,可以查看最佳运行实践文档。
步骤 2 创建一个 Lambda Function 函数
接下来我们需要创建一个 Lambda Function,用于处理 S3 事件通知,并把日志文件内容发送到 Amazon OpenSearch Service。您可以把 Lambda 代码打包成 ZIP 文件通过 CLI 创建 Lambda,并为 Lambda 指定一个执行角色并添加相应权限。您也可以通过 Web Console 的方式来完成这个 Lambda 函数的创建。
- 通过 CLI 命令行创建一个 Lambda Function
aws lambda create-function —function-name my-function \
--zip-file fileb://function.zip —handler index.handler —runtime python3.8 \
--role arn:aws:iam::123456789012:role/lambda-ex
- 请确保 Lambda Function 的 IAM Role 需要具有以下权限
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"es:*"
],
"Effect": "Allow",
"Resource": "arn:aws:Your-OpenSearch-Region:xxxx-Your-AWS-Account-Id:domain/Your-OpenSearch-Domain-Name/*"
}
]
}
当 Amazon S3 收到新的日志时会通过事件触发 Lambda,这个 Lambda 会将日志进行解压缩后,将日志按合适的格式送到 Amazon OpenSearch Service。
import boto3
import json
import requests
import gzip
from requests_aws4auth import AWS4Auth
region = 'Your Region' # e.g. us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
host = 'https://Your-OpenSearch-Endpoint'
# the OpenSearch Service Endpoint, e.g. https://search-mydomain.us-west-1.es.amazonaws.com
index = 'Your-Index-Name'
datatype = '_doc'
url = host + '/' + index + '/' + datatype
headers = { "Content-Type": "application/json" }
s3 = boto3.client('s3')
def lambda_handler(event, context):
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
obj = s3.get_object(Bucket=bucket, Key=key)
body = obj['Body'].read()
if key.endswith('.gz'):
body = gzip.decompress(body)
lines = body.splitlines()
for line in lines:
line = line.decode("utf-8")
message = json.loads(line) # parse JSON object
document = {
"timestamp": message["timestamp"],
"latitude": message["clientLocation"]["latitude"],
"longitude": message["clientLocation"]["longitude"],
"country": message["clientLocation"]["country"],
"subdivision": message["clientLocation"]["subdivision"],
"metro": message["clientLocation"]["metro"],
"city": message["clientLocation"]["city"],
"countryCode": message["clientLocation"]["countryCode"],
"subdivisionCode": message["clientLocation"]["subdivisionCode"],
"asn": message["clientLocation"]["asn"],
"networkName": message["clientLocation"]["networkName"],
"serviceLocation": message["serviceLocation"],
"percentageOfTotalTraffic": message["percentageOfTotalTraffic"],
"bytesIn": message["bytesIn"],
"bytesOut": message["bytesOut"],
"clientConnectionCount": message["clientConnectionCount"],
"availabilityExperienceScore": message["internetHealth"]["availability"]["experienceScore"],
"availabilityPercentageOfTotalTrafficImpacted": message["internetHealth"]["availability"]["percentageOfTotalTrafficImpacted"],
"availabilityPercentageOfClientLocationImpacted": message["internetHealth"]["availability"]["percentageOfClientLocationImpacted"],
"performanceExperienceScore": message["internetHealth"]["performance"]["experienceScore"],
"performancePercentageOfTotalTrafficImpacted": message["internetHealth"]["performance"]["percentageOfTotalTrafficImpacted"],
"performancePercentageOfClientLocationImpacted": message["internetHealth"]["performance"]["percentageOfClientLocationImpacted"],
"p50RoundTripTime": message["internetHealth"]["performance"]["roundTripTime"]["p50"],
"p90RoundTripTime": message["internetHealth"]["performance"]["roundTripTime"]["p90"],
"p95RoundTripTime": message["internetHealth"]["performance"]["roundTripTime"]["p95"]
}
r = requests.post(url, auth=awsauth, json=document, headers=headers)
使用 aws lambda add-permission
命令为 Lambda 函数添加一个权限,允许 S3 调用它。
aws lambda add-permission \
--function-name my-s3-function \
--action lambda:InvokeFunction \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::my-log-bucket \
--statement-id s3-event
步骤 3 S3 存储桶 Event Notification 事件通知设置
使用 aws s3api put-bucket-notification-configuration
命令为 S3 存储桶配置事件通知,指定对象创建或删除事件,并指定 Lambda 函数作为目标。
aws s3api put-bucket-notification-configuration \
--bucket my-log-bucket \
--notification-configuration file://notification.json
其中 notification.json
文件内容如下:
{
"LambdaFunctionConfigurations": [
{
"Id": "s3-lambda",
"LambdaFunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-s3-function",
"Events": ["s3:ObjectCreated:*"]
}
]
}
步骤 4 Amazon OpenSearch Service 设置
- 通过“Add Map User”,授权 Lambda Function 写日志到 OpenSearch 的权限
- 您可以在浏览器中输入您的 Amazon OpenSearch Service 的 endpoint,例如 https://search-mydomain.us-west-1.es.amazonaws.com,并登录您的 OpenSearch console。
- 在 Security—Roles 中,找到 index_management_full_access,将 Lambda Function 的 arn 加到 Backend roles,然后点“Map”,如下图所示:

了解更多关于 OpenSearch 的权限管理,请参照 Users and roles – OpenSearch documentation。
- 在 OpenSearch 中创建 Index Pattern
- 在 OpenSearch Service 控制台的左侧菜单栏中,单击“Stack Management”堆栈管理。
- 选择“Index patterns”索引模式,然后单击“Create Index patterns”创建索引模式。
- 您可以使用星号(*)来匹配 Index 模式中的多个索引,例如“My-Index-2023-”。
- 在左侧导航条中找到“Discover”,并点击进入 Discover 发现页面。
- 然后,在“CHANGE INDEX PATTERN”中,选择刚才您创建的 Index Pattern。
- 选择合适的 time range 时间窗口。
- 您可以在“Discover”发现页面上找到感兴趣的日志和指标,并将它们添加到可视化仪表盘中。

步骤 5 通过创建 OpenSearch Dashboard 仪表盘获得更多洞察
您可以使用 Amazon OpenSearch Service Dashboard 仪表盘中的地图工具,以各种颜色在世界地图上显示不同国家和城市的互联网体验分数。

您也可以使用 Pie Chart 饼图来显示多个国家和城市的排名。

您还能通过条形图来显示 P50 延迟排名的前 100 个城市统计。

步骤 6 导入 Amazon OpenSearch Dashboard 模板(可选)
为了方便开始使用,您可以采用亚马逊云科技企业级支持团队已构建好的 Internet Monitor Dashboard 模板。
打开 GitHub 链接,下载后缀为 ndjson 的模板文件,并将其导入到 OpenSearch 中。
在“Management”中选择“Stack Management”。

接着选择“Saved Objects”,点击“Import”,选择在 GitHub 下载的 ndjson 文件,最后点击“Done”完成导入。

最后,您就能够在 Dashboard 页面以图表形式监控互联网质量,如下图所示。

总结
本文介绍了如何利用 Amazon Athena、Amazon QuickSight 和 Amazon OpenSearch Service 来分析和可视化 Internet Monitor 的互联网测量日志数据的方法。这些服务让您可以更方便快捷地发布、存储、分析和可视化互联网测量日志数据。您可以用 Amazon Athena 的标准 SQL 语句,对存放在 Amazon S3 上的日志进行查询,并用 Amazon QuickSight 制作交互式的仪表盘和图表来探索数据。您还可以用 Amazon OpenSearch Service,通过 RESTful API 或 Dashboard 对日志进行索引和搜索。凭借您的经验和创意,您可以利用存放在 Amazon S3 上的日志,结合亚马逊云科技的各种分析服务,得到比这篇博客更多的洞察。
本篇作者