亚马逊AWS官方博客

活用 CloudWatch 创建监控、告警为业务保驾护航

当谈及业务连续性时,监控与告警功能在云计算环境中扮演着不可或缺的角色。随着企业在云端部署的增加,对于系统和应用程序的稳定性、性能以及安全性的需求也变得日益重要。监控的核心在于实时追踪和评估系统的健康状态和运行情况,这对于识别潜在问题、预测性能瓶颈以及防范可能的故障至关重要。而告警则是监控的补充,它通过设定阈值或规则,一旦系统状态异常或超出预期范围,即时通知相关人员或团队,使其能够及时采取行动以避免或最小化潜在的影响。本篇博客将重点关注基础设施层面的监控、告警,下面我们就结合企业的真实场景谈谈如何利用云原生的工具创建监控、告警来保障业务的连续性。

项目背景

我们在亚马逊云科技环境中启动了几百台的 EC2 实例,以及多种托管服务如 RDS Aurora、ElastiCache Redis、MSK、EMR 等。架构图如下:

我们的大数据平台是基于 Hadoop 自建的,除了 CPU、内存等常规需要关注的指标外,磁盘的使用情况也是我们需要特别关注的。我们的目标是建立完善的监控和告警机制,通过 Dashboard 可以及时了解以下指标(篇幅所限,此处仅列出若干关键指标):

监控名称 监控说明
CPU 使用率 Top10 同区域(Region)EC2 CPU 使用率 Top10,可实时观察本区域内最近 3 小时 CPU 使用率排名前 10 的资源详情。
内存使用空间 Top10 同区域(Region)EC2 内存使用空间 Top10,可实时观察本区域内最近 3 小时内存已使用空间排名前 10 的资源详情。
EBS 使用空间 Top10 同区域(Region)EBS 使用空间 Top10,可实时观察本区域内最近 3 小时 EBS 已使用空间排名前 10 的资源详情。
EBS 突增余额 Top10 同区域(Region)EBS 突增余额 Top10,可实时观察本区域内最近 3 小时 EBS 突增余额已使用排名前 10 的资源详情。
EBS Read OPs Top10 同区域(Region)EBS Read OPs Top10,可实时观察本区域内最近 3 小时 EBS Read OPs 排名前 10 的资源详情。
EBS Write OPs Top10 同区域(Region)EBS Write OPs Top10,可实时观察本区域内最近 3 小时 EBS Write OPs 排名前 10 的资源详情。
网络流量出 Top10 同区域(Region)EC2 网络流量出 Top10,可实时观察本区域内最近 3 小时 EC2 网络流量流出带宽占用排名前 10 的资源详情。
网络流量入 Top10 同区域(Region)EC2 网络流量入 Top10,可实时观察本区域内最近 3 小时 EC2 网络流量流入带宽占用排名前 10 的资源详情。

当 EC2 或者托管服务的指标出现异常时(超过相应的阈值)及时创建告警,并且通过钉钉群消息的形式通知到相关人员。告警在 10 分钟内没有处理掉的需要重复发送告警通知(钉钉群及短信),直到告警被排除掉。

前置准备

明确了需求以后,我们需要做一些前置的准备。利用亚马逊云科技的 CloudWatch 进行指标收集及配置监控、告警,在默认情况下,CloudWatch 不能收集内存、磁盘等指标,需要安装 CloudWatch Agent(具体安装过程请参阅 CloudWatch Agent 安装文档),安装完需要对 CloudWatch Agent 进行相关配置。下面是我们的一个参考配置,对于收集频率可以根据不同的需求进行定制化。为了避免针对每台 EC2 都需要安装 CloudWatch Agent 以及配置,我们可以先制作 AMI,在制作 AMI 的过程中,将 CloudWatch Agent 安装及配置好。后续根据创建好的 AMI 启动 EC2 实例。

1.	{  
2.	        "agent": {  
3.	                "metrics_collection_interval": 60,  
4.	                "run_as_user": "root"  
5.	        },  
6.	        "metrics": {  
7.	                "namespace": "au-prod",  
8.	                "aggregation_dimensions": [  
9.	                        [  
10.	                                "InstanceId"  
11.	                        ]  
12.	                ],  
13.	                "append_dimensions": {  
14.	                        "AutoScalingGroupName": "${aws:AutoScalingGroupName}",  
15.	                        "ImageId": "${aws:ImageId}",  
16.	                        "InstanceId": "${aws:InstanceId}",  
17.	                        "InstanceType": "${aws:InstanceType}"  
18.	                },  
19.	                "metrics_collected": {  
20.	                        "disk": {  
21.	                                "measurement": [  
22.	                                        "used_percent",  
23.	                                        "inodes_used"  
24.	                                ],  
25.	                                "metrics_collection_interval": 60,  
26.	                                "resources": [  
27.	                                        "/",  
28.	                                        "/data"  
29.	                                ]  
30.	                        },  
31.	                        "mem": {  
32.	                                "measurement": [  
33.	                                        "mem_used_percent"  
34.	                                ],  
35.	                                "metrics_collection_interval": 60  
36.	                        },  
37.	                        "netstat": {  
38.	                                "measurement": [  
39.	                                        "tcp_established"  
40.	                                ],  
41.	                                "metrics_collection_interval": 60  
42.	                        }  
43.	                }  
44.	        }  
45.	}

监控配置

等 CloudWatch 收集到相应的指标后,我们就可以根据指标配置我们的监控了,下面为 CloudWatch 的 Dashboard 页面截图。

接下来我们将一一描述这些 Dashboard 的配置过程。

CPU 使用率 Top10

浏览器页面导航到 CloudWatch → 指标 → 全部指标 → 多来源查询 → 编辑器,进入指标查询编辑器,输入查询语句:

SELECT AVG(CPUUtilization) FROM SCHEMA("AWS/EC2", InstanceId) GROUP BY InstanceId ORDER BY AVG() DESC LIMIT 10

点击“运行”按钮,即可获得本区域(Region)内全部 EC2 中 CPU 使用率(百分比)排名前 10 的 EC2 资源信息,如下截图:

点按右上角“操作”按钮,在下拉菜单中选择“添加到控制面板”,打开“添加到控制面板”配置窗口,选择控制面板 “CloudWatch-Default”(可随意新建自己想要的名称),选择小部件类型,这里默认“线形图”,自定义小部件标题“按最高排序的 EC2 实例的 CPU 使用率”,最后点按“添加到控制面板”即可完成“CPU 使用率 Top10”的 Dashboard 的配置,如下截图:

内存使用空间 Top10

浏览器页面导航到 CloudWatch → 指标 → 全部指标 → 多来源查询 → 编辑器,进入指标查询编辑器,输入查询语句(注意:内存指标为自定义指标,所以查询指标时要选择“自定义命名空间”,这里为“”au-prod””):

SELECT AVG(mem_used_percent) FROM SCHEMA("au-prod", InstanceId) GROUP BY InstanceId ORDER BY AVG() DESC LIMIT 10

点击“运行”按钮,即可获得本区域(Region)内全部 EC2 中内存使用空间(百分比)排名前 10 的 EC2 资源信息,如下截图:

点按右上角“操作”按钮,在下拉菜单中选择“添加到控制面板”,打开“添加到控制面板”配置窗口,选择控制面板 “CloudWatch-Default”(可随意新建自己想要的名称),选择小部件类型,这里默认“线形图”,自定义小部件标题“EC2-MEMUsedPercentIn5Min-Top10”,最后点按“添加到控制面板”即可完成“内存使用空间 Top10”的 Dashboard 的配置,如下截图:

EBS 使用空间 Top10

同“内存使用空间 Top10”配置,这里不再赘述,输入查询语句:

SELECT MAX(disk_used_percent) FROM SCHEMA("au-prod", ImageId,InstanceId,InstanceType,device,fstype,path) GROUP BY InstanceId, path ORDER BY MAX() DESC LIMIT 10

点击“运行”按钮,即可获得本区域(Region)内全部 EBS 使用空间(百分比)排名前 10 的 EC2 和卷资源信息,如下截图:

“添加到控制面板”的步骤同前述内容,这里不再赘述。

EBS 突增余额 Top10

因综合成本和效能考虑,Hadoop 集群的存储使用了大量的 st1(HDD)盘,而 st1 有 IO 和吞吐量突增余额(BurstBalance)限制,通过 ebs_BurstBalance_Percent_top10 Cloudwatch metics dashboard 数据,我们可以实时观察到哪些 st1 磁盘已达到其性能瓶颈,快速调整上层应用的读写均衡或替换少部分读写频繁的磁盘为 gp3,规避因此造成的性能下降风险。

同前述内容,输入查询语句:

SELECT MIN(BurstBalance) FROM SCHEMA("AWS/EBS", VolumeId) GROUP BY VolumeId ORDER BY MIN() ASC LIMIT 10

点击“运行”按钮,即可获得本区域(Region)内全部 EBS 突增余额已使用积分(百分比)排名前 10 的 EC2 和卷资源信息,如下截图:

后续步骤同上。

EBS Read OPs Top10

EBS 根据其 Type 类型都有其不同的 IOPS 上限(具体请参考 EBS 相关官方白皮书内容),一旦当某个 EBS 的 IOPS 上限被频繁突破,我们必须立即采取应对措施以免给业务运行造成不可挽回的影响,所以对 EBS 的 IOPS 的监控尤为重要,监控图的配置步骤同前述内容这里不在赘述,仅列出查询语句如下:

SELECT SUM(VolumeReadOps) FROM SCHEMA("AWS/EBS", VolumeId) GROUP BY VolumeId ORDER BY SUM() DESC LIMIT 10

需要着重描述的是上述语句查询得到的结果是每分钟 sum 值,为了方便观察,我们将其换算成每秒值,进入到“绘成图表的指标”页面,“添加数学”表达式 “q1/60”,使用新的 e1 表达式绘图即可得到想要的指标数据 Read OPs。

后续步骤同前述内容。

EBS Write OPs Top10

同上,查询语句如下:

SELECT SUM(VolumeWriteOps) FROM SCHEMA("AWS/EBS", VolumeId) GROUP BY VolumeId ORDER BY SUM() DESC LIMIT 10

同样换算成秒值,得到 Write OPs,如下截图:

网络流出 Top10

同上,查询语句如下:

SELECT AVG(NetworkOut) FROM SCHEMA("AWS/EC2", InstanceId) GROUP BY InstanceId ORDER BY AVG() DESC LIMIT 10

后续步骤同前述内容。

网络流入 Top10

同上,查询语句如下:

SELECT SUM(NetworkIn) FROM SCHEMA("AWS/EC2", InstanceId) GROUP BY InstanceId ORDER BY SUM() DESC LIMIT 10

后续步骤同前述内容。

告警配置

与监控类似,有了 CloudWatch 收集到的指标,就可以根据指标创建告警了。我们的环境中有几百台机器,手动创建告警是一件耗时耗力的操作。CloudWatch 提供了多种编程语言的 SDK,这里我们通过 Lambda 函数调用 Python SDK 进行告警创建。以 disk_used_percent 这个指标为例,下图显示了 CloudWatch 创建出来的 disk_used_percent 指标。

下面代码演示了我们如何根据指标 disk_used_percent 创建告警的过程。通过 list_metrics 这个 API,列出所有的 disk_used_percent 指标,然后调用 put_metric_alarm 这个 API 创建告警。创建告警的方法中有两个参数,一个叫 AlarmActions,这个是当告警状态变成告警中的时候配置触发什么动作,还有一个叫 OKActions,这个是当告警从告警中恢复到正常状态配置触发什么动作。这里我们配置了同一个 SNS topic。消息发送到 SNS topic 后,会触发发送钉钉消息的 Lambda 进行处理。

1.	import json  
2.	import boto3  
3.	  
4.	def lambda_handler(event, context):  
5.	    namespace = 'YOUR_NAMESPACE'  
6.	    metric_name = 'disk_used_percent'  
7.	      
8.	    cw = boto3.client('cloudwatch')  
9.	    all_metrics = []  
10.	  
11.	    next_token = None  
12.	      
13.	    while True:  
14.	        if next_token:  
15.	            response = cw.list_metrics(  
16.	                Namespace=namespace,  
17.	                MetricName=metric_name,  
18.	                NextToken=next_token)  
19.	        else:  
20.	            response = cw.list_metrics(  
21.	                Namespace=namespace,  
22.	                MetricName=metric_name)  
23.	  
24.	        all_metrics.extend(response['Metrics'])  
25.	  
26.	        if 'NextToken' in response:  
27.	            next_token = response['NextToken']  
28.	        else:  
29.	            break  
30.	      
31.	     
32.	    # 打印metrics数量  
33.	    print(f"Total Metrics Count: {len(all_metrics)}")  
34.	  
35.	  
36.	    exceptionnum = 0  
37.	    successnum = 0  
38.	      
39.	    for metric in all_metrics:  
40.	        dimensions = metric['Dimensions']  
41.	        instanceId = ''  
42.	        path = ''  
43.	        alarmName = ''  
44.	          
45.	        #make alarm name  
46.	        for dimension in dimensions:  
47.	            if(dimension['Name'] == 'InstanceId'):  
48.	                instanceId = dimension['Value']  
49.	              
50.	            if(dimension['Name'] == 'path'):  
51.	                path = dimension['Value']  
52.	                  
53.	        print(f"Path is : {path}")  
54.	          
55.	        if path:  
56.	            alarmName = 'Disk-' + instanceId + '-' + path  
57.	            try:  
58.	                alarmresponse = cw.put_metric_alarm(  
59.	                    AlarmName=alarmName,  
60.	                    AlarmDescription='EC2云服务器磁盘告警规则',  
61.	                    ActionsEnabled=True,  
62.	                    AlarmActions = ['arn:aws:sns:ap-southeast-2:accountid:DiskAlarm'],  
63.	                    OKActions = ['arn:aws:sns:ap-southeast-2:accountid:DiskAlarm'],  
64.	                    MetricName=metric['MetricName'],  
65.	                    Namespace=metric['Namespace'],  
66.	                    Statistic='Maximum',  
67.	                    Dimensions=metric['Dimensions'],  
68.	                    Period=60,  
69.	                    EvaluationPeriods=15,  
70.	                    DatapointsToAlarm=10,  
71.	                    Threshold=78,  
72.	                    ComparisonOperator='GreaterThanThreshold',  
73.	                    TreatMissingData='missing'  
74.	                )  
75.	                successnum=successnum+1  
76.	            except Exception:  
77.	                exceptionnum=exceptionnum+1  
78.	                print(metric)  
79.	      
80.	    print(f"Total alarm created successfully: {successnum}")  
81.	    print(f"Total alarm created failed: {exceptionnum}")  
82.	      
83.	    return {  
84.	        'statusCode': 200,  
85.	        'body': json.dumps('Lambda sucessfully Executed!')  
86.	    }  

当消息送到 SNS topic 后,触发发送钉钉消息的 Lambda。示例代码如下,该代码演示了如何通过告警返回的消息构建 markdown 文本,并且调用钉钉的发送 API 往企业钉钉群里发送消息。

1.	# _*_coding:utf-8_*_  
2.	# python 3.8  
3.	# Creation time: 2021/11/18  
4.	import time  
5.	import json  
6.	import os  
7.	import requests  
8.	import datetime  
9.	import boto3  
10.	  
11.	  
12.	def lambda_handler(event, context):  
13.	    headers = {'Content-Type': 'application/json;charset=utf-8'}  
14.	    token = ''  
15.	    token1 = ''  
16.	    timestamp = str(round(time.time() * 1000))  
17.	    secret = ''  
18.	  
19.	    # get url   
20.	    url = "https://oapi.dingtalk.com/robot/send?access_token="  
21.	    api_url = url + token  
22.	    api_url_new = url + token1  
23.	  
24.	    # msg setting  
25.	    message = event['Records'][0]['Sns']  
26.	    Timestamp = message['Timestamp']  
27.	    Subject = message['Subject']  
28.	    sns_message = message['Message']  
29.	    sns_message = json.loads(message['Message'])  
30.	    NewStateReason = json.loads(event['Records'][0]['Sns']['Message'])['NewStateReason']  
31.	    current_time = (datetime.datetime.now() + datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')  
32.	      
33.	    #title = '![3.png](https://img.isolarcloud.com/webapp/static/resources/portal/images/login/iSolarCloud-erwm.png)'  
34.	    if "ALARM" in Subject:  
35.	        title = '![1.png](https://github.com/printZZC/alarm/blob/main/%E5%9B%BE%E7%89%872.png?raw=true)'  
36.	    elif "OK" in Subject:  
37.	        title = '![2.png](https://github.com/printZZC/alarm/blob/main/%E6%81%A2%E5%A4%8D.png?raw=true)'  
38.	    else:  
39.	        title = '![3.png](https://github.com/printZZC/alarm/blob/main/%E5%9B%BE%E7%89%872.png?raw=true)'  
40.	      
41.	    _disk = sns_message['Trigger']['Dimensions'][0]['value']  
42.	    length = len(_disk)  
43.	      
44.	    if length >10:  
45.	        _disk1 = '/'  
46.	        _value = sns_message['Trigger']['Dimensions'][0]['value']  
47.	        # 创建 EC2 客户端  
48.	        ec2 = boto3.client('ec2')  
49.	        # 替换 'your_instance_id' 为你要查询的实例的 ID  
50.	        instance_id = sns_message['Trigger']['Dimensions'][0]['value']  
51.	        # 使用 describe_instances 函数获取实例信息  
52.	        response = ec2.describe_instances(InstanceIds=[instance_id])  
53.	        print(response)  
54.	        # 提取实例的名称(Name 标签)  
55.	        for reservation in response['Reservations']:  
56.	            for instance in reservation['Instances']:  
57.	                for tag in instance['Tags']:  
58.	                    if tag['Key'] == 'Name':  
59.	                        instance_name = tag['Value']  
60.	                        print("实例 {instance_id} 的名称为: {instance_name}")  
61.	                          
62.	                          
63.	        for reservation in response['Reservations']:  
64.	            for instance in reservation['Instances']:  
65.	                instance_ip = instance['PrivateIpAddress']  
66.	                print("实例 {instance_ip} 的名称为: {PrivateIpAddress}")  
67.	        ##结束  
68.	        content = "### {title}".format(title=title) + \  
69.	                  "\n> #### **时间**: "  + current_time  + \  
70.	                  "\n> #### **状态**: " + str(sns_message['OldStateValue']) + " => " + str(sns_message['NewStateValue']) + \  
71.	                  "\n> #### **告警名称**: " + str(sns_message['AlarmName']) + \  
72.	                  "\n> #### **AWS区域**: " + str(sns_message['Region']) + \  
73.	                  "\n> #### **描述**: " + str(sns_message['AlarmDescription']) + \  
74.	                  "\n> #### **产品资源**: " + str(sns_message['Trigger']['Namespace']) + \  
75.	                  "\n> #### **实例名称**: " + str(instance_name) + \  
76.	                  "\n> #### **实例ID**: " + str(_value) + \  
77.	                  "\n> #### **实例IP**: " + str(instance_ip) + \  
78.	                  "\n> #### **告警磁盘**: " + str(_disk1) + \  
79.	                  "\n> #### **指标名称**: " + str(sns_message['Trigger']['MetricName']) + \  
80.	                  "\n> #### **报警详情**: " + str(sns_message['NewStateReason'])     
81.	    else:  
82.	        _value = sns_message['Trigger']['Dimensions'][1]['value']  
83.	        # 创建 EC2 客户端  
84.	        ec2 = boto3.client('ec2')  
85.	        # 替换 'your_instance_id' 为你要查询的实例的 ID  
86.	        instance_id = sns_message['Trigger']['Dimensions'][1]['value']  
87.	        # 使用 describe_instances 函数获取实例信息  
88.	        response = ec2.describe_instances(InstanceIds=[instance_id])  
89.	        print(response)  
90.	        # 提取实例的名称(Name 标签)  
91.	        for reservation in response['Reservations']:  
92.	            for instance in reservation['Instances']:  
93.	                for tag in instance['Tags']:  
94.	                    if tag['Key'] == 'Name':  
95.	                        instance_name = tag['Value']  
96.	                        print("实例 {instance_id} 的名称为: {instance_name}")  
97.	                          
98.	                          
99.	        for reservation in response['Reservations']:  
100.	            for instance in reservation['Instances']:  
101.	                instance_ip = instance['PrivateIpAddress']  
102.	                print("实例 {instance_ip} 的名称为: {PrivateIpAddress}")  
103.	        ##结束  
104.	        content = "### {title}".format(title=title) + \  
105.	                  "\n> #### **时间**: "  + current_time  + \  
106.	                  "\n> #### **状态**: " + str(sns_message['OldStateValue']) + " => " + str(sns_message['NewStateValue']) + \  
107.	                  "\n> #### **告警名称**: " + str(sns_message['AlarmName']) + \  
108.	                  "\n> #### **AWS区域**: " + str(sns_message['Region']) + \  
109.	                  "\n> #### **描述**: " + str(sns_message['AlarmDescription']) + \  
110.	                  "\n> #### **产品资源**: " + str(sns_message['Trigger']['Namespace']) + \  
111.	                  "\n> #### **实例名称**: " + str(instance_name) + \  
112.	                  "\n> #### **实例ID**: " + str(_value) + \  
113.	                  "\n> #### **实例IP**: " + str(instance_ip) + \  
114.	                  "\n> #### **告警磁盘**: " + str(_disk) + \  
115.	                  "\n> #### **指标名称**: " + str(sns_message['Trigger']['MetricName']) + \  
116.	                  "\n> #### **报警详情**: " + str(sns_message['NewStateReason'])     
117.	              
118.	                
119.	  
120.	    msg = {  
121.	        "msgtype": "markdown",  
122.	        "markdown": {  
123.	            "title": title,  
124.	            "text": content  
125.	        }  
126.	    }  
127.	  
128.	    # request  
129.	    request = requests.post(url=api_url, data=json.dumps(msg), headers=headers).content.decode("utf8")  
130.	      
131.	    request_new = requests.post(url=api_url_new, data=json.dumps(msg), headers=headers).

钉钉群里看到的消息如下列截图所示,相关人员看到告警信息就会迅速进行处理,这样就避免或最小化了潜在的影响。

持续告警

对于亚马逊云科技的 CloudWatch 告警,告警状态变为告警中只会发送一次消息,如果一直没有处理,告警消息不会再次发送出来。对于一直处于告警中的告警,我们需要定期检查,并持续通知到相关人员,直到告警状态为告警中的被处理。

通过研究,发现可以通过 EventBridge 定期触发 Lambda 进行实现。我们每隔十分钟触发一次扫描所有告警中的告警。然后根据告警的类型构建消息发送到钉钉群以及短信。

扫描所有告警中的告警代码示例如下,通过CloudWatch的describe_alarms API拿到所有告警中的告警,并且根据不同告警类型调用不通方法进行处理。在持续告警的场景中,我们还加入了短信通知的实现,为了让相关人员通过多个渠道收到信息并尽快处理。

1.	import time  
2.	import json  
3.	import os  
4.	import requests  
5.	import datetime  
6.	import boto3  
7.	import hashlib  
8.	import base64  
9.	import hmac  
10.	from urllib.parse import quote  
11.	  
12.	# 创建 CloudWatch 和 SNS 的客户端  
13.	cloudwatch = boto3.client('cloudwatch')  
14.	title = '![1.png](https://github.com/printZZC/alarm/blob/main/%E5%9B%BE%E7%89%872.png?raw=true)'  
15.	ec2 = boto3.client('ec2')  
16.	headers = {'Content-Type': 'application/json;charset=utf-8'}  
17.	current_time = (datetime.datetime.now() + datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')  
18.	token = ''  
19.	token1 = ''  
20.	token2 = ''  
21.	token3 = ''  
22.	url = "https://oapi.dingtalk.com/robot/send?access_token="  
23.	api_url = url + token  
24.	api_url_new = url + token1  
25.	api_url_center = url +token2  
26.	api_test = url+token3  
27.	  
28.	#发送阿里短信变量  
29.	access_key_id = ''  
30.	access_key_secret = ''  
31.	sign_name = ''  
32.	template_code = ''  
33.	phone_number = ''  
34.	  
35.	def percentEncode(s):  
36.	    return quote(s.encode('utf-8'),safe ='~')  
37.	  
38.	def isBasicAlarm(alarmName):  
39.	    if alarmName and ((alarmName.find('Disk')!=-1 or alarmName.find('mem')!=-1 or alarmName.find('cpu')!=-1)) and alarmName.find('kafka')==-1 and alarmName.find('Kafka')==-1:  
40.	        return  True  
41.	    else:  
42.	        return False  
43.	  
44.	def isAuoraAlarm(alarmName):  
45.	    if alarmName and alarmName.find('Aurora')!=-1:  
46.	        return True  
47.	    else:  
48.	        return False  
49.	  
50.	def isRedisAlarm(alarmName):  
51.	    if alarmName and alarmName.find('Redis')!=-1:  
52.	        return True  
53.	    else:  
54.	        return False  
55.	  
56.	def isKafkaAlarm(alarmName):  
57.	    if alarmName and (alarmName.find('Kafka')!=-1 or alarmName.find('kafka')!=-1):  
58.	        return True  
59.	    else:  
60.	        return False  
61.	  
62.	  
63.	  
64.	def processAurora(alarm):  
65.	    content = "### {title}".format(title=title) + \  
66.	              "\n> #### **时间**: "  + current_time  + \  
67.	              "\n> #### **状态**: " + str(alarm['StateValue']) + \  
68.	              "\n> #### **告警名称**: " + str(alarm['AlarmName']) + \  
69.	              "\n> #### **告警描述**: " + str(alarm['AlarmDescription']) + \  
70.	              "\n> #### **告警指标**: " + str(alarm['MetricName']) + \  
71.	              "\n> #### **实例名称**: " + str(alarm['Dimensions'][1]['Value']) + \  
72.	              "\n> #### **告警原因**: " + str(alarm['StateReason'])  
73.	    msg = {  
74.	        "msgtype": "markdown",  
75.	        "markdown": {  
76.	            "title": title,  
77.	            "text": content  
78.	        }  
79.	    }  
80.	  
81.	    content_aurora = "时间: "+ current_time+",状态: " + str(alarm['StateValue'])+",告警名称: " + str(alarm['AlarmName'])+",告警描述: " + str(alarm['AlarmDescription'])+",告警指标: " + str(alarm['MetricName'])+",实例名称: " + str(alarm['Dimensions'][1]['Value']) + ",告警原因:" + str(alarm['StateReason'])  
82.	      
83.	    template_param = {'awsmessage' : content_aurora}  # 根据模板中的参数进行替换  
84.	      
85.	    result = send_sms(access_key_id, access_key_secret, sign_name, template_code, phone_number, template_param)  
86.	    print(result)  
87.	    request_aurora = requests.post(url=api_test, data=json.dumps(msg), headers=headers).content.decode("utf8")  
88.	  
89.	def processRedis(alarm):  
90.	    content = "### {title}".format(title=title) + \  
91.	              "\n> #### **时间**: "  + current_time  + \  
92.	              "\n> #### **状态**: " + str(alarm['StateValue']) + \  
93.	              "\n> #### **告警名称**: " + str(alarm['AlarmName']) + \  
94.	              "\n> #### **告警描述**: " + str(alarm['AlarmDescription']) + \  
95.	              "\n> #### **告警指标**: " + str(alarm['MetricName']) + \  
96.	              "\n> #### **实例名称**: " + str(alarm['Dimensions'][0]['Value']) + \  
97.	              "\n> #### **告警原因**: " + str(alarm['StateReason'])  
98.	    msg = {  
99.	        "msgtype": "markdown",  
100.	        "markdown": {  
101.	            "title": title,  
102.	            "text": content  
103.	        }  
104.	    }  
105.	  
106.	    content_redis = "时间: "+ current_time+",状态: " + str(alarm['StateValue'])+",告警名称: " + str(alarm['AlarmName'])+",告警描述: " + str(alarm['AlarmDescription'])+",告警指标: " + str(alarm['MetricName'])+",实例名称: " + str(alarm['Dimensions'][0]['Value']) + ",告警原因:" + str(alarm['StateReason'])  
107.	      
108.	    template_param = {'awsmessage' : content_redis}  # 根据模板中的参数进行替换  
109.	      
110.	    result = send_sms(access_key_id, access_key_secret, sign_name, template_code, phone_number, template_param)  
111.	    print(result)  
112.	    request_redis = requests.post(url=api_test, data=json.dumps(msg), headers=headers).content.decode("utf8")  
113.	      
114.	      
115.	def processKafka(alarm):  
116.	    content = "### {title}".format(title=title) + \  
117.	              "\n> #### **时间**: "  + current_time  + \  
118.	              "\n> #### **状态**: " + str(alarm['StateValue']) + \  
119.	              "\n> #### **告警名称**: " + str(alarm['AlarmName']) + \  
120.	              "\n> #### **告警描述**: " + str(alarm['AlarmDescription']) + \  
121.	              "\n> #### **告警指标**: " + str(alarm['MetricName']) + \  
122.	              "\n> #### **实例名称**: " + str(alarm['Dimensions'][0]['Value']) + \  
123.	              "\n> #### **告警原因**: " + str(alarm['StateReason'])  
124.	    msg = {  
125.	        "msgtype": "markdown",  
126.	        "markdown": {  
127.	            "title": title,  
128.	            "text": content  
129.	        }  
130.	    }  
131.	  
132.	    content_kafka = "时间: "+ current_time+",状态: " + str(alarm['StateValue'])+",告警名称: " + str(alarm['AlarmName'])+",告警描述: " + str(alarm['AlarmDescription'])+",告警指标: " + str(alarm['MetricName'])+",实例名称: " + str(alarm['Dimensions'][0]['Value']) + ",告警原因:" + str(alarm['StateReason'])  
133.	      
134.	    template_param = {'awsmessage' : content_kafka}  # 根据模板中的参数进行替换  
135.	      
136.	    result = send_sms(access_key_id, access_key_secret, sign_name, template_code, phone_number, template_param)  
137.	    print(result)  
138.	    request_kafka = requests.post(url=api_test, data=json.dumps(msg), headers=headers).content.decode("utf8")  
139.	      
140.	def processBasic(alarm):  
141.	        dimensions = alarm['Dimensions']  
142.	        instanceId = ''  
143.	        path = ''  
144.	        instance_name = ''  
145.	        instance_ip = ''  
146.	        content_ec = ''  
147.	          
148.	        for dimension in dimensions:  
149.	            if(dimension['Name'] == 'InstanceId'):  
150.	                instanceId = dimension['Value']  
151.	              
152.	            if(dimension['Name'] == 'path'):  
153.	                path = dimension['Value']  
154.	                  
155.	        # 使用 describe_instances 函数获取实例信息  
156.	        response = ec2.describe_instances(InstanceIds=[instanceId])  
157.	        print(response)  
158.	        # 提取实例的名称(Name 标签)  
159.	        for reservation in response['Reservations']:  
160.	            for instance in reservation['Instances']:  
161.	                for tag in instance['Tags']:  
162.	                    if tag['Key'] == 'Name':  
163.	                        instance_name = tag['Value']  
164.	                        print("实例 {instanceId} 的名称为: {instance_name}")  
165.	                      
166.	                      
167.	        for reservation in response['Reservations']:  
168.	            for instance in reservation['Instances']:  
169.	                instance_ip = instance['PrivateIpAddress']  
170.	                print("实例 {instance_ip} 的名称为: {PrivateIpAddress}")  
171.	                    
172.	          
173.	        if  alarm['MetricName'] == 'disk_used_percent':  
174.	            content_ec = "时间: "+ current_time+",状态: " + str(alarm['StateValue'])+ ",告警名称:" + str(alarm['AlarmName']) +",告警描述: " + str(alarm['AlarmDescription'])+",告警指标: " + str(alarm['MetricName'])+",实例名称: " + str(instance_name) + ",实例IP: " + str(instance_ip) + ",磁盘目录: " + str(path) + ",告警原因: " + str(alarm['StateReason'])  
175.	            content = "### {title}".format(title=title) + \  
176.	                  "\n> #### **时间**: "  + current_time  + \  
177.	                  "\n> #### **状态**: " + str(alarm['StateValue']) + \  
178.	                  "\n> #### **告警名称**: " + str(alarm['AlarmName']) + \  
179.	                  "\n> #### **告警描述**: " + str(alarm['AlarmDescription']) + \  
180.	                  "\n> #### **告警指标**: " + str(alarm['MetricName']) + \  
181.	                  "\n> #### **实例名称**: " + str(instance_name) + \  
182.	                  "\n> #### **实例ID**: " + str(instanceId) + \  
183.	                  "\n> #### **告警原因**: " + str(alarm['StateReason']) + \  
184.	                  "\n> #### **实例IP**: " + str(instance_ip) + \  
185.	                  "\n> #### **磁盘目录**: " + str(path)  
186.	                    
187.	        else:  
188.	            content_ec = "时间: "+ current_time+",状态: " + str(alarm['StateValue'])+",告警名称: " + str(alarm['AlarmName'])+",告警描述: " + str(alarm['AlarmDescription'])+",告警指标: " + str(alarm['MetricName'])+",实例名称: " + str(instance_name) + ",实例IP:" + str(instance_ip) + ",告警原因: " + str(alarm['StateReason'])  
189.	            content = "### {title}".format(title=title) + \  
190.	                  "\n> #### **时间**: "  + current_time  + \  
191.	                  "\n> #### **状态**: " + str(alarm['StateValue']) + \  
192.	                  "\n> #### **告警名称**: " + str(alarm['AlarmName']) + \  
193.	                  "\n> #### **告警描述**: " + str(alarm['AlarmDescription']) + \  
194.	                  "\n> #### **告警指标**: " + str(alarm['MetricName']) + \  
195.	                  "\n> #### **实例名称**: " + str(instance_name) + \  
196.	                  "\n> #### **实例ID**: " + str(instanceId) + \  
197.	                  "\n> #### **告警原因**: " + str(alarm['StateReason']) + \  
198.	                  "\n> #### **实例IP**: " + str(instance_ip)  
199.	          
200.	        print(content_ec)  
201.	                        
202.	        msg = {  
203.	            "msgtype": "markdown",  
204.	            "markdown": {  
205.	                "title": title,  
206.	                "text": content  
207.	            }  
208.	        }  
209.	      
210.	        template_param = {'awsmessage' : content_ec}  # 根据模板中的参数进行替换  
211.	      
212.	          
213.	        if instance_name.find('中央研究院')!=-1:  
214.	            request_center = requests.post(url=api_url_center, data=json.dumps(msg), headers=headers).content.decode("utf8")  
215.	          
216.	        elif alarm['AlarmName'].find('Disk')!=-1:  
217.	        # request  
218.	            result = send_sms(access_key_id, access_key_secret, sign_name, template_code, phone_number, template_param)  
219.	            print(result)  
220.	            request = requests.post(url=api_url, data=json.dumps(msg), headers=headers).content.decode("utf8")  
221.	            request_new = requests.post(url=api_url_new, data=json.dumps(msg), headers=headers).content.decode("utf8")  
222.	          
223.	        elif alarm['AlarmName'].find('mem')!=-1:  
224.	            result = send_sms(access_key_id, access_key_secret, sign_name, template_code, phone_number, template_param)  
225.	            print(result)  
226.	            request = requests.post(url=api_url, data=json.dumps(msg), headers=headers).content.decode("utf8")  
227.	          
228.	        elif alarm['AlarmName'].find('cpu')!=-1:  
229.	            result = send_sms(access_key_id, access_key_secret, sign_name, template_code, phone_number, template_param)  
230.	            print(result)  
231.	            request = requests.post(url=api_url, data=json.dumps(msg), headers=headers).content.decode("utf8")  
232.	  
233.	  
234.	##调用阿里短信服务推送函数  
235.	def send_sms(access_key_id, access_key_secret, sign_name, template_code, phone_number, template_param):  
236.	    # 阿里云短信接口请求地址  
237.	    url_ali = 'https://dysmsapi.aliyuncs.com/'  
238.	      
239.	    # 构建请求参数  
240.	    params = {  
241.	        'AccessKeyId': access_key_id,  
242.	        'Timestamp': time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),  
243.	        'Format': 'JSON',  
244.	        'SignatureMethod': 'HMAC-SHA1',  
245.	        'SignatureVersion': '1.0',  
246.	        'SignatureNonce': str(int(time.time() * 1000)),  
247.	        'Action': 'SendSms',  
248.	        'Version': '2017-05-25',  
249.	        'RegionId': 'cn-hangzhou',  
250.	        'PhoneNumbers': phone_number,  
251.	        'SignName': sign_name,  
252.	        'TemplateCode': template_code,  
253.	        'TemplateParam': json.dumps(template_param)  
254.	    }  
255.	      
256.	    print(sign_name)  
257.	      
258.	    # 对参数进行排序  
259.	    sorted_params = sorted(params.items(), key=lambda x: x[0])  
260.	      
261.	    # 构建待签名字符串  
262.	    #query_string = '&'.join([f'{p[0]}={requests.utils.quote(p[1])}' for p in sorted_params])  
263.	    #string_to_sign = 'GET&%2F&' + requests.utils.quote(query_string)  
264.	    query_string = '&'.join([f'{p[0]}={percentEncode(p[1])}' for p in sorted_params])  
265.	    string_to_sign = 'GET&%2F&' + percentEncode(query_string)      
266.	      
267.	      
268.	      
269.	    # 计算签名  
270.	    sign = base64.b64encode(hmac.new((access_key_secret + '&').encode('utf-8'), string_to_sign.encode('utf-8'), hashlib.sha1).digest()).decode('utf-8')  
271.	      
272.	    # 添加签名到请求参数中  
273.	    params['Signature'] = sign  
274.	      
275.	    print(params)  
276.	    # 发送 POST 请求  
277.	    response = requests.get(url_ali, params=params)  
278.	      
279.	    return response.json()        
280.	      
281.	      
282.	  
283.	def lambda_handler(event, context):  
284.	    # 获取当前所有的告警  
285.	    alarms = cloudwatch.describe_alarms(  
286.	            StateValue='ALARM'  
287.	        )['MetricAlarms']  
288.	      
289.	    print(len(alarms))  
290.	    print(alarms)  
291.	      
292.	# 构建告警信息  
293.	    for alarm in alarms:  
294.	        if isBasicAlarm(alarm['AlarmName']):  
295.	            processBasic(alarm)  
296.	        if isAuoraAlarm(alarm['AlarmName']):  
297.	            processAurora(alarm)  
298.	        if isKafkaAlarm(alarm['AlarmName']):  
299.	            processKafka(alarm)  
300.	        if isRedisAlarm(alarm['AlarmName']):  
301.	            processRedis(alarm)  
302.	          
303.	          
304.	      
305.	    return {  
306.	        'statusCode': 200,  
307.	        'body': 'Alarm details sent to SNS topic.'  
308.	    }  

短信以及钉钉消息截图如下所示,相关人员接受到短信以及钉钉消息后,就能及时处理这些问题,避免更多潜在的影响。

总结

本文为大家在使用亚马逊云科技的基础实施过程中如何利用云原生的服务 CloudWatch 创建监控、告警提供了一个思路。只有做好完备的监控和告警才能为业务保驾护航。

本篇作者

蔡如海

西云数据解决方案架构师,10+年开发和架构经验,曾就职于知名外企,在媒体、金融等业务领域有丰富的工作经验,擅长云计算、机器学习等技术,并且有丰富的项目管理经验。

张子辰

阳光电源运维工程师,负责大数据组件、云平台维护。

汪仁君

伟仕佳杰-云计算事业部-高级工程师,拥有 20+年的 IT 从业经验。致力于云计算产品解决方案在企业中的推广和应用,云端架构方案制定,方案讲解演示,现场答疑,项目 POC,落地实施,交付和维护。

刘科

西云数据解决方案架构师,15+年 IT 研发、管理经验,曾就职于知名通信设备厂商、头部互联网企业,有丰富、广泛的系统架构与研发管理经验。擅长帮助企业打造完善研发 Devops 体系,构建网络与安全体系,帮助企业业务产品项目落地。

卢达

解决方案架构师,10+年产品研发和解决方案架构咨询经验,曾就职于知名外企,对企业 IT 架构、云计算以及企业级存储等技术领域有丰富的经验。