亚马逊AWS官方博客

亚马逊云科技异常流量监控告警

背景

开发者通常利用亚马逊云科技的 Amazon API 网关Amazon VPC 端点实现应用程序对外及内部的接口发布。然而,如何基于云原生的方式来分析和监控出口流量,成为许多人所困扰的难题。本文将介绍一种出口流量监控方案,它融合了亚马逊云科技原生服务和第三方监控工具 Grafana,为您呈现一种全新的流量监控体验。

通过创新性地结合云端资源和强大的可视化工具,该方案不仅能够实时捕捉出口流量的动态变化,更能深入剖析流量模式,为您的应用程序性能优化和安全防护提供坚实的数据支撑。无论您是云计算资深从业者,还是刚步入云端世界,本文必将为您开启流量监控的新视野。

需求分析

一般云上客户主要是用 Amazon API 网关 和 Amazon VPC 端点进行服务发布,对于数据的发布方式主要通过以下两种方式实现:

整个解决方案需要对 Amazon API 网关Amazon VPC 端点 的流量相关日志进行收集和处理。当流量超过阈值时,将发出流量异常警报。

为了更好的了解这两种服务及设计思路,现对其数据格式及实现内容进行说明如下:

服务名称 具备日志服务 可提供日志类型 说明
API 网关 执行日志和访问日志。 此解决方案仅需访问日志记录。 使用包含用于分析的文件、$context.requestTime、$context.domainName、$context.identity.SourceIP、$context.requestTime 和其他字段的访问日志记录。
VPC 端点 VPC 流量日志。此解决方案会复用客户已有的 Central Logging 中的 VPC Flow Logs 使用 VPC Flow Logs 结合 VPC Endpoints 和域名的对应的关系来进行格式化结果记录。

架构设计

该解决方案的整体架构设计考虑了端到端的流程,从数据生成到数据处理、数据消费、报告展示, 以及异常数据的阈值设置和警报管理。架构设计将数据处理过程分为以下几个主要部分:

数据生成

来自两个主要数据源:Amazon API 网关 Access Logs 和 VPC Flow Logs

  • Amazon API 网关 Access Logs 通过启用 Access Logging 直接生成
  • VPC Flow Logs 复用客户现有的 Central Logging 解决方案

数据 ETL

对原始数据进行提取(Extract)、转换(Transform)和加载(Load)

数据消费和报告展示

  • 转换后的数据存储在 Amazon S3 存储桶,通过 Amazon Athena 对数据进行查询分析
  • 通过 Grafana 控制面板结合 Amazon Athena Plugin 对数据进行可视化展示,包括流量趋势图和 Top IP 等

阈值设置和警报管理

  • 在 Grafana 中设置流量阈值,当超过阈值时触发警报
  • 警报通过 Alert-manager 发送到 Amazon SNS,再通知相关利益相关方

架构设计综合利用了亚马逊云科技的多种服务,实现了从数据采集、ETL、存储、可视化到异常监控的一体化解决方案。详细方案设计将在后续章节中详细介绍。

图 1. 异常流量监控告警整体架构图

详细设计

为了更好地说明架构设计中更为代表性的实现和考虑,接下来将从数据获取、提取、转换、加载及数据消费这几个角度,分别说明 VPC Flow Logs 和 Amazon API 网关 Access Logs 的处理方式,同时阐述告警实现的关键设计。

VPC Flow Logs

VPC Flow Logs 是 Amazon Virtual Private Cloud (Amazon VPC) 的一项功能,可帮助捕获有关进出您网络接口的 IP 流量的信息VPC。但由于标准 VPC Flow Logs 中更多的是对于 IP 流量的解读而缺少对于应用的描述,根据需求需要将标准日志中的数据流量进行字段扩充,将应用相关的描述补充到数据流量日志中。

对于 VPC Flow Logs,该架构设计充分考虑了数据实效性、字段可读性和存储格式等实际因素,并采取了一系列创新性的措施来优化数据处理流程。在数据获取环节,复用了客户现有的 Central Logging 解决方案中的 VPC Flow Logs 数据;在数据 ETL 环节,通过 AWS Glue ETL 作业与 DynamoDB 中的 VPC 端点元数据相结合,实现了字段扩充和格式转换,并利用作业书签和 Workflows 等功能实现了增量处理和作业编排;在数据消费环节,则使用 Athena 和 Grafana 对处理后的数据进行查询和可视化展示。

图 2  VPC Flow Logs 处理架构图

接来下将选取数据处理过程中的几个关键点进行分别说明在数据处理过程中如何进行考虑并进行优化相关的配置。

基础数据获取

该解决方案将基于客户原有的 Central Logging 日志解决方案,对已经产生的日志进行使用并进行分析。客户原有的 Central logging 日志存储在 Security Logging 账户中,在对原有日志进行消费前,需要集合整个解决方案对于数据展示的实效性及数据格式和存储方式等维度进行考虑,这些是消费这些数据的前置条件也是整个解决方案中如何进行有效字段扩充和响应延迟的前置。

  • 日志实效性:已有解决方案 VPC Flow logs 每 10 分钟会进行聚合并生成,这意味着该解决方案从数据生成到数据处理将有至少 10 分钟的延迟。
  • 字段格式:VPC Flow logs 的数据格式只包含了基础的网络数据流相关的信息,整体缺乏可读性,需要添加额外的元数据字段来填充相应的数据;
  • 数据存储:数据以 CSV 格式生成。 考虑到后续数据处理的效率,需要在 ETL 过程中将其转换为 Parquet 格式。

元数据表设计和更新原则(元数据设计)

考虑到客户原有数据已经存储在 Amazon S3 中,且格式化后的数据仍存储在 Amazon S3 中,为了操作的便捷性和可扩展性,架构设计中采用 AWS Glue ETL Job 完成数据的抽取和转换工作。通过已有的 Share Services VPC 的 VPC Flow Logs 结合生成的 VPC Endpoints 和域名的对应关系完成字段的扩充。 具体实现逻辑如下:

  • 在 “VPC Endpoints Collection“ 中,通过 Amazon Route53 Event 事件触发 Lambda,从而实现 Amazon Route53 PHZ 中的 DNS 记录和 VPC 端点的信息采集,并将结果作为元数据存储在 Amazon DynamoDB 中,用于后续 Glue Job 处理过程中的字段扩充。默认情况下,AWS Glue Job 将使用 record_delete_time 为 “9999-12-31T 00:00:00” 的记录进行映射。
  • 当 VPC  端点发生变更后,AWS Lambda 会自动调整 Amazon DynamoDB 中元数据的记录并将 VPC 端点的相关信息更新到表中的记录。
  • 如果发生 VPC 端点删除,AWS Lambda 会自动将元数据中 record_delete_time 设置为当前时间,使其不再参与后续的 Glue Job 数据处理。

数据格式和字段如下所示:

{
  "vpc_endpoint_id": {
    "S": "vpce-0593f03c52cc203aa"
  },
  "hostzone_id": {
    "S": "Z0546917VILAMMMMMMMM"
  },
  "record_create_time": {
    "S": "2024-11-04T02:54:13.971082"
  },
  "record_delete_time": {
    "S": "9999-12-31T00:00:00"
  },
  "record_name": {
    "S": "monitor.dev.aws.example.cn."
  },
  "target_ips": {
    "L": [
      {
        "S": "100.65.249.139"
      },
      {
        "S": "100.65.248.112"
      }
    ]
  }
}
JSON

数据增量处理实现(ETL)

AWS Glue 通过保存作业运行的状态信息来跟踪上次运行 ETL 作业期间已处理的数据。此持久状态信息称为作业书签。作业书签可帮助 AWS Glue 维护状态信息,并可防止重新处理旧数据。有了作业书签,您可以在按照计划的时间间隔重新运行时处理新数据。作业书签包含作业的各种元素的状态,如源、转换和目标。例如,您的 ETL 任务可能会读取 Amazon S3 文件中的新分区。AWS Glue 跟踪任务已成功处理哪些分区,以防止任务的目标数据存储中出现重复处理和重复数据。为了达到基于增量的日志处理,在数据处理过程中通过采用 Glue ETL Job 的 Glue Bookmark 功能实现基于增量的 VPC Flow logs的处理,这样可以大幅度减少每次扫描的时间和成本;为了实现 Glue ETL Job 自动的分区处理,通过使用 Catalog 的源数据读取模式进行 S3 数据读取,并将字段存储在 DataFrame 中用于后续的 Amazon S3 数据存储路径和目标 Glue 数据目录的实现,这样可以确保处理后的数据能够根据源分区资源将数据存储到目标路径中,从而保证了数据分区格式的一致性。

AWS Glue ETL Job 部分核心代码设置如下所示:

# 从DynamoDB表中读取数据
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(dynamodb_mapping_table)
response = table.scan(
    FilterExpression=Attr('record_delete_time').eq('9999-12-31T00:00:00')
)
items = response['Items']

# 将DynamoDB中读取的数据转换为Spark DataFrame,并重新分区为1个分区
dynamodb_schema = StructType([
    StructField("vpc_endpoint_id", StringType(), True),
    StructField("record_name", StringType(), True),
    StructField("hostzone_id", StringType(), True),
    StructField("record_create_time", StringType(), True),
    StructField("record_delete_time", StringType(), True),
    StructField("target_ips", ArrayType(StringType()), True)
])
dynamodb_df = spark.createDataFrame(items, schema=dynamodb_schema)
dynamodb_df = dynamodb_df.repartition(1)

# 从AWS Glue Data Catalog中读取源表,创建DynamicFrame
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database= catalog_database,
    table_name= source_catalog_table,
    transformation_ctx="datasource0"
)

# 使用DynamoDB表中的target_ips列过滤VPC Flow Logs数据
filtered_vpc_flow_logs_df = filtered_data_frame.join(
    dynamodb_df.select("target_ips"),
    (array_contains(col("target_ips"), col("srcaddr")) |
     array_contains(col("target_ips"), col("dstaddr"))),
    how="inner"
)

# 将过滤后的数据写回AWS Glue Data Catalog中的目标表,并设置分区键和增量更新策略
catalogsink = glueContext.write_dynamic_frame.from_catalog(
    frame=DynamicFrame.fromDF(joined_df, glueContext, "filtered_applymapping"),
    database= catalog_database,
    table_name= target_catalog_table,
    transformation_ctx="write_catalog",
    additional_options={
        "enableUpdateCatalog": True,
        "updateBehavior": "UPDATE_IN_DATABASE",
        "partitionKeys": ["aws-account-id", "aws-service", "aws-region", "year", "month", "day"]
    }
)
Python

AWS Glue ETL Job Bookmark 激活(ETL)

Enable Job bookmark,使作业在运行后更新状态,以跟踪之前处理的数据。如果作业的源支持作业书签,它将跟踪已处理的数据,当作业运行时,它将处理自上一检查点以来的新数据。

图 3. Glue Bookmark 的启用

AWS Glue Workflows (ETL)

为了规避 AWS Glue Bookmark 功能对 AWS Glue Job 并发性的限制,定义 AWS Glue Work flow 触发器用于 VPC Flow logs 事件的缓冲处理(Batch size = 4 和 Batch windows = 240s),当对应 S3 Event 事件有 4 条时会自动触发 AWS GlueJob,或者 4 分钟后自动执行。可根据实际情况对此参数进行优化,用于避免 AWS Glue Job 高并发而导致数据处理失败。

图 4. Glue Workflow 的配置实现

API 网关 Access Logs

Amazon API 网关 Access Logs记录了谁访问了 API 以及调用方访问 API 的方式。

对于 Amazon API 网关 Access Logs,架构设计更侧重于数据转换的灵活性和高效性。在数据获取环节,直接启用 Amazon API 网关的 Access Logging 功能;在数据 ETL 环节,使用 Amazon Firehose 的 Transform AWS Lambda 对日志数据进行格式转换,包括 IP 白名单识别、字段转换和数据解压缩等;在数据消费环节,与 VPC Flow Logs 类似,也使用 Amazon Athena 和 Grafana 对转换后的数据进行查询和可视化。

图 5. API Access Logs 处理架构图

基础数据获取

为了避免数据无效并导致后续的数据分析,用于为 Amazon CloudWatch Log Group 订阅筛选器定义的过滤器名称和日志格式应包含上述所定义的字段。 如果缺少字段如果日志格式配置,CloudWatch 日志 groups 的Subscription filters 和 Transform Convert AWS Lambda 将在数据转换期间对数据进行常规筛选。在 AWS Lambda 执行过程中会将有问题的日志输出到 AWS Lambda CloudWatch 日志组中。

{ "requestId":"$context.requestId", "extendedRequestId":"$context.extendedRequestId","ip": "$context.identity.sourceIp", "caller":"$context.identity.caller", "user":"$context.identity.user", "requestTime":"$context.requestTime", "httpMethod":"$context.httpMethod", "domainName":"$context.domainName", "domainPrefix":"$context.domainPrefix", "resourcePath":"$context.resourcePath", "status":"$context.status", "protocol":"$context.protocol", "responseLength":"$context.responseLength"}
JSON

数据获取和数据 ETL

该解决方案将对客户原有的 Amazon API 网关 Access Logs 进行启用,并根据展示分析对 Access Logs 中所包含的内容字段进行选择,数据在通过 Firehose 时通过 Transform AWS Lambda 对数据进行转换和存储。Transform AWS Lambda 主要完成以下功能:

  • 快速识别非白名单中的异常数据,在数据处理过程中,通过定义的白名单列表进行确定 API 访问日志的客户端的 IP 地址是否属于白名单范围,并将结果标记为 is_whitelisted 字段。 如果它属于白名单,则为 True,否则为 False。
  • 对 Amazon API 网关 访问日志数据格式进行处理,例如将日期字段转换为 Grafana 处理更为友好的 Unix Timestamp。
  • 处理消息中的字段展平,在处理过程中对消息体外的字段进行处理并将其存储到消息体中,从而实现扁平化字段的目的。
  • 数据的解压缩和无效数据的处理,判断所需字段是否存在,如果字段不存在或者存在无效字段数据,将相关的记录作为异常抛出便于管理员进行问题的跟踪和处理。

部分核心代码如下所示:

# 从Lambda环境变量中获取API跟踪日志的白名单路径
API_TRACELOGS_WHITELISTS = os.environ['API_TRACELOGS_WHITELISTS']

# 缓存已判断过的IP地址,避免重复判断影响执行效率
ip_cache = {}
# 从亚马逊云科技 Systems Manager Parameter Store中获取白名单IP列表
def get_whitelist_ips(whitelist_paths):
    ssm_client = boto3.client('ssm')
    whitelist_ips = []
    whitelist_paths = whitelist_paths.split(',')
    for path in whitelist_paths:
        try:
            response = ssm_client.get_parameter(Name=path, WithDecryption=True)
            whitelist_ips.extend(response['Parameter']['Value'].split('\n'))
        except Exception as e:
            print(f"Error getting whitelist from {path}: {e}")
    return whitelist_ips
# 预处理白名单IP列表,将IP地址转换为网络对象,并去除重叠的网络
def preprocess_whitelist(whitelist_ips):
    whitelist_networks = set()
    for ip_str in whitelist_ips:
        try:
            network = ipaddress.ip_network(ip_str)
            if any(network.subnet_of(existing_network) for existing_network in whitelist_networks):
                continue
            whitelist_networks.add(network)
        except ValueError:
            continue
    return whitelist_networks
# 全局变量,只在Lambda函数加载时初始化一次
whitelist_networks = preprocess_whitelist(get_whitelist_ips(API_TRACELOGS_WHITELISTS))
# 检查IP地址是否在白名单中
def is_ip_whitelisted(ip, whitelist_networks):
    if ip in ip_cache:
        return ip_cache[ip]
      try:
        ip_obj = ipaddress.ip_address(ip)
    except ValueError:
        ip_cache[ip] = False
        return False
        for network in whitelist_networks:
        if ip_obj in network:
            ip_cache[ip] = True
            return True
      ip_cache[ip] = False
    return False
### 使用新的解决方案进行IP地址检查和验证 ###
def transformLogEvent(log_event, owner, logGroup, subscriptionFilters):
    try:
        message = json.loads(log_event['message'])
        # 过滤不符合API网关日志格式的记录
        if not (message.get('requestTime') and message.get('ip') and message.get('domainName') and message.get('responseLength')): 
            print('Some records abandon while calling transformLogEvent, Need to redefine the API网关 log format according to the Abnormal Traffic Guideline.')
            print(f"Debug info: owner={owner}, logGroup={logGroup}, subscriptionFilters={subscriptionFilters}")
            print(f"Original message: {message}")
            return None
        
        message['owner'] = owner
        message['logGroup'] = logGroup
        message['subscriptionFilters'] = subscriptionFilters
        request_time = datetime.strptime(message['requestTime'], "%d/%b/%Y:%H:%M:%S %z")
        message['requestTime'] = int(time.mktime(request_time.timetuple()))
        # 检查IP地址是否在白名单中,并在返回的消息中添加新字段
        ip = message.get('ip')
        if ip:
            try:
                message['is_whitelisted'] = is_ip_whitelisted(ip, whitelist_networks)
            except ValueError:
                message['is_whitelisted'] = False
        return json.dumps(message) + '\n'
    except Exception as e:
        print(f"Error processing log event: {e}")
        return None
Python

其他设计部分和关键实现

跨账户的 Amazon S3 Object Event 的使用

使用 Amazon EventBridge AWS Organizations 中的相关事件进行捕捉并将事件转发给目标网络账户。通过使用 EventBridge 的无服务器事件总线,实现极基于对象事件的接收、过滤、转换、路由和传递。通过对 VPC Flow logs 集中存储桶开启 Amazon EventBridge 功能将 S3 对象创建的事件转发到目标网络账户中,并在目标网络账户中对对所需的对象事件进行过滤并进行转发,将满足筛选的事件转发到分区检测 Lambda 中完成目标分区的检测和分区的创建。

图 6. 跨账户的 S3 Object Event 转发示意图

Auto Partition 的实现(S3 Event Base)

AWS Glue 分区可用于提高查询性能和缩短数据处理时间。 分区是一种根据特定列将表或目录中的数据分成较小的部分来组织表或目录中的数据的方法。 这允许对数据进行并行处理,从而可以显著提高查询性能。 在架构设计中,将基于特定字段(例如年、月和日)创建分区。 分区在 AWS Glue 目录中具有元数据属性,用于记录分区及其分区文件的位置。 当在新的 S3 分区中创建对象时,默认情况下不会自动创建该分区。为了实现基于 S3 对象的自动分区的识别,采用 S3 Object Create Event 进行自动识别和判断,当制定 S3 桶中有对象生成时会自动根据事件进行触发分区的判断和生成逻辑从而实现分区的自动识别和创建。

AWS Glue Catalog 存在多种自动分区的创建方法,我们选取了其中的三种进行对比。从成本和后续扩展性角度进行触发,最终选择了 Amazon Lambda 来实现新的 S3 对象的识别以及分区的检查和创建:

自动分区方式 优点 缺点
Glue Cralwer 易于使用,需少量配置即可使用 成本很高
执行效率较慢
用它来识别分区和创建有点“大炮打蚊子”
MSCK REPAIR TABLE 易于使用,需调用 SQL 执行 仍需要采用其他方式,例如 Lambda 执行 SQL
执行时它会扫描现有分区
Lambda 成本非常低,且效率高
可以基于 S3 对象事件触发
可以根据您的需求进行完全定制
需要编写更多的代码和判断逻辑来识别和创建新的分区

Auto Partition 部分核心代码如下所示:

def create_or_update_partition(DATABASE_NAME, TABLE_NAME, source_key):
    """
    该函数用于在 AWS Glue 数据目录中创建或更新分区。
    参数:
    DATABASE_NAME (str): Glue 数据库名称
    TABLE_NAME (str): Glue 表名称
    source_key (str): 源数据路径,例如 's3://bucket/path/to/data'
    功能:
    1. 根据 source_key 提取 Glue 数据库和表名
    2. 从 source_key 中解析出分区值
    3. 初始化 Glue 客户端
    4. 尝试获取分区信息,如果分区已经存在,则跳过创建
    5. 如果分区不存在,则:
        a. 获取表的详细信息
        b. 创建自定义存储描述符,包含新的分区位置
        c. 在 Glue 数据目录中创建新的分区
    6. 如果创建成功,则打印成功消息
    7. 如果出现其他异常,则打印异常信息
    """
    # 根据 source_key 提取 Glue 数据库和表名
    TABLE_NAME = get_table_name(source_key)
    source_key = unquote(source_key)
    parts = source_key.split('/')[1:-1]
    partitions_values = [part.split('=')[1] for part in parts]
    # 使用 Boto 3 初始化 Glue 客户端
    glue_client = boto3.client('glue')
    try:
        # 检查分区是否已经存在。如果存在,则跳过创建
        get_partition_response = glue_client.get_partition(
            DatabaseName=DATABASE_NAME,
            TableName=TABLE_NAME,
            PartitionValues=partitions_values
        )
        print('Glue partition already exists.')
    except Exception as e:
        # 检查异常是否为 EntityNotFoundException。如果是,则继续创建分区
        if type(e).__name__ == 'EntityNotFoundException':
            print('Retrieve Table Details:')
            get_table_response = glue_client.get_table(
                DatabaseName=DATABASE_NAME,
                Name=TABLE_NAME
            )
            # 提取现有的存储描述符,并创建带有新分区位置的自定义存储描述符
            storage_descriptor = get_table_response['Table']['StorageDescriptor']
            custom_storage_descriptor = copy.deepcopy(storage_descriptor)
            custom_storage_descriptor['Location'] = f"{storage_descriptor['Location']}/{''.join([f'{part}/' for part in parts])}"
            # 在 Glue 数据目录中创建新的 Glue 分区
            create_partition_response = glue_client.create_partition(
                DatabaseName=DATABASE_NAME,
                TableName=TABLE_NAME,
                PartitionInput={
                    'Values': partitions_values,
                    'StorageDescriptor': custom_storage_descriptor
                }
            )
            print('Glue partition created successfully.')
        else:
            # 根据业务需求处理异常
            print(e)
Python

Glue Catalog 的设计和实现

根据设计,格式化后的数据通过 Grafana 的 Amazon Athena 插件实现,通过自定义报表将结果展示在 Grafana 仪表板中。Amazon Athena 数据源插件允许从 Grafana 内部查询和可视化 Amazon Athena 数据指标。为了实现 Amazon Athena 数据访问的控制,该方案采用独立的工作组,并使用单独的数据库和表来使用 Amazon Athena 数据。整个解决方案中共涉及以下三个 Glue Catalog 表。

工作组 数据源 数据库 表名 标记
abnormal-traffic-workgroup AwsDataCatalog dbs_abnormal_traffic api_gateway_flow_logs_extend 扩展表,API Access Log 的格式字段定义和分区定义
abnormal-traffic-workgroup AwsDataCatalog dbs_abnormal_traffic vpc_flow_logs_extend 扩展表,VPC Flow logs 的字段定义和分区定义
abnormal-traffic-workgroup AwsDataCatalog dbs_abnormal_traffic vpc_flow_logs_origin 源表,VPC Flow logs 的原始字段定义
  • xx_origin 表示此表定义是一个源表,用于源数据格式的定义;
  • xx_extend 表示此表是一个扩展表,用于扩展需要的字段或根据源表调整字段格式。

在进行表的设计和定义时要尽可能的考虑数据血缘关系,也就是尽量让原始数据少做聚合,因为聚合会影响后续的数据维度。具体的表设计原则如下:

  • 保留原始数据:尽量保留原始数据格式和字段,避免过多聚合导致数据维度丢失。
  • 分离扩展字段:将扩展的字段与原始数据分离存储在不同表中,保持原始数据的完整性。
  • 支持分区查询:表的设计要支持按分区键(如时间分区)进行高效查询,提高查询性能。
  • 存储格式优化:对于大数据量,采用列式存储(Parquet)以提高查询效率。

通过上述设计原则不仅能够保留数据的完整性和维度信息,也有利于后续的数据分析和查询,并提高整体的查询性能,从而为后续的数据分析奠定基础。

以 VPC flow logs 为例,源表和扩展表的定义如下:

CREATE EXTERNAL TABLE `vpc_flow_logs_origin`(
  `version` int, 
  `account_id` string, 
  `interface_id` string, 
  `srcaddr` string, 
  `dstaddr` string, 
  `srcport` int, 
  `dstport` int, 
  `protocol` bigint, 
  `packets` bigint, 
  `bytes` bigint, 
  `start` bigint, 
  `end` bigint, 
  `action` string, 
  `log_status` string, 
  `vpc_id` string, 
  `subnet_id` string, 
  `instance_id` string, 
  `tcp_flags` int, 
  `type` string, 
  `pkt_srcaddr` string, 
  `pkt_dstaddr` string, 
  `region` string, 
  `az_id` string, 
  `sublocation_type` string, 
  `sublocation_id` string, 
  `pkt_src_aws_service` string, 
  `pkt_dst_aws_service` string, 
  `flow_direction` string, 
  `traffic_path` int)
PARTITIONED BY ( 
  `aws-account-id` string, 
  `aws-service` string, 
  `aws-region` string, 
  `year` string, 
  `month` string, 
  `day` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ' ' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://central-vpc-flow-logs-012345678901-cn-north-1/AWSLogs'
TBLPROPERTIES ('skip.header.line.count'='1')
CREATE EXTERNAL TABLE `vpc_flow_logs_extend`(
  `srcaddr` string, 
  `dstaddr` string, 
  `srcport` int, 
  `dstport` int, 
  `protocol` bigint, 
  `packets` bigint, 
  `bytes` bigint, 
  `start` bigint, 
  `end` bigint, 
  `action` string, 
  `log_status` string, 
  `record_name` string, 
  `vpc_endpoint_id` string)
PARTITIONED BY ( 
  `aws-account-id` string, 
  `aws-service` string, 
  `aws-region` string, 
  `year` string, 
  `month` string, 
  `day` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://flowlogs-abnormal-logs-target-012345678901-cn-north-1/AWSLogs'
TBLPROPERTIES (
  'classification'='parquet', 
  'useGlueParquetWriter'='true')
SQL

Grafana 的展现和阈值告警

除了上述基础数据采集和数据处理之外, 整个解决方案中通过 Grafana 来展示 Amazon VPC Flow Logs 和 Amazon API 网关 Access Log 的分析结果,并将异常监控结果通过 Grafana Alertmanager 告警给 Amazon SNS 服务,对应的干系人可以通过订阅 SNS 的 Topic 来收取告警信息。

流量监控

对于流量监控指标,在 Grafana 中构建了以下的 Dashboard 用于用户观测。

  1. 不同终端用户对于不同 Amazon API 网关域名每分钟的下载总量。
  2. 不同终端用户对于不同端点域名每分钟的下载总量。
  3. 不在白名单内的终端用户对于不同 Amazon API 网关域名每分钟的访问次数。

以“不同终端用户对于不同 Amazon API 网关域名每分钟的下载总量”的监控举例,Grafana 中通过 SQL 语句实现对 Athena 数据的读取。

以下是 Grafana 面板对于数据源的查询语句:

SELECT
  date_trunc('minute', from_unixtime(CAST(requesttime AS DOUBLE))) AS minute_trunc,
  ip,
  sum(CAST(responseLength AS BIGINT)) AS addr
FROM
  api_gateway_flow_logs_extend
WHERE
   $__timeFilter(from_unixtime(CAST(requesttime AS DOUBLE)))
   AND domainname = '${api_name}'
GROUP BY
  date_trunc('minute', from_unixtime(CAST(requesttime AS DOUBLE))),
  ip
ORDER BY
  minute_trunc ASC,
  ip
SQL

异常流量告警

根据需求,定义了以下的异常流量检测规则,当查询的结果达到定义的阈值后会进行告警提示。

  1. 在单位时间内终端用户对于某个 Amazon API 网关域名的下载总量超过阀值。
  2. 在单位时间内终端用户对于某个端点域名的下载总量超过阀值。
  3. 在单位时间内不在白名单内的终端用户对于某个 Amazon API 网关域名的访问次数超过阀值。

以“在单位时间内终端用户对于某个 Amazon API 网关域名的下载总量超过阀值”为例,以下是 Grafana Alert Rule 对于源数据的查询语句:

WITH base_query AS (
SELECT
    COALESCE(SUM(CAST(responseLength AS BIGINT)) / 1024, 0) AS total_volume_size,
    COALESCE(domainname, '') AS api_name,
    COALESCE(ip, '') AS src_ip,
    COALESCE(owner, '') AS awsid
FROM
    api_gateway_flow_logs_extend
WHERE
    $__timeFilter(from_unixtime(CAST(requesttime AS DOUBLE)))
GROUP BY
    domainname, ip, owner
)
SELECT * FROM base_query
UNION ALL
SELECT 0, '', '', ''
WHERE NOT EXISTS (SELECT 1 FROM base_query)
SQL

异常流量告警管理

解决方案中采用标准化的 Grafana Alert 规则进行统一告警管理。其中 Grafana Alertmanager 插件实例部署在托管 Grafana 的 Kubernetes 集群中,用于协调警报的管理和分发。

Alertmanager 在集群内可以访问 Grafana 联系点和相应的通知策略。 通知策略通过标签来做标识。在配置特定的警报规则时,确保相关标签(例如 env、alertname 和 alertgroup)与通知策略保持一致,从而建立完整的警报管道。

在满足警报标准后,Grafana 会通过联系点将警报信息传输给 Grafana Alertmanager,然后通过信息模板呈现告警的详细信息,包括亚马逊云科技账户 ID、源 IP、API 名称或 VPC 端点域、当前值和阈值,再将其传输到 Amazon SNS 服务。 这确保了高度的可读性和及时的通知,使运维工程师能够及时检测和解决潜在的流量异常。

图 7. Grafana Alertmanager 流程图

邮件模版

通过定义合理的邮件模版使得更多的信息能够通过告警事件进行展示,使得客户在收到告警邮件时能够快速甄别邮件告警的内容以及触发的阈值,并在展示过程中增加相关 dashboard 的展现地址从而提升终端用户的访问体验。

以下是 Alertmanager 中为 API 网关定义亚马逊 SNS 的邮件模版:

templates: 
  default_template: |
    {{ define "sns.default.subject" }}{{ .Status | toUpper }}{{ if eq .Status "firing" }}:{{ .Alerts.Firing | len }}{{ end }}{{ end }}
    {{ define "sns.default.api.subject" }}APIGateway AccessLogs Alert {{ .Status | toUpper }}{{ if eq .Status "firing" }}:{{ .Alerts.Firing | len }}{{ end }}{{ end }}

    {{ define "sns.default.api.message" }} 
    APIGateway Download Alert
    {{ range .Alerts }} 
    • Stage: {{ .Labels.env }} 
    • API Name: {{ .Labels.api_name }} 
    • Alert Name: {{ .Labels.alertname }} 
    • AWS Account Id: {{ .Labels.awsid }}
    • Alert Time: {{ .StartsAt }}
    • End user Source IP: {{ .Labels.src_ip }} 
    {{ with .Annotations.__values__ }}{{ if . }}• End user download volume size per minute: {{ reReplaceAll "^([0-9]+\\.[0-9]{2}).*$" "${1}" (reReplaceAll ".*\"A\":\\s*([0-9.]+).*" "${1}" .) }} KB
    • Threshold of download volume size per minute: {{ reReplaceAll ".*\"C\":\\s*([0-9.]+).*" "${1}" . }} KB{{ end }}{{ end }} 
    -------------------------------------
    {{ end }} 
    More Info please visit to grafana url - dashboard Abnormal Traffic - APIGateway AccessLogs : https://grafana.loggin-${CURRENT_TENANT}.tenant.${PLATFORM_STAGE}-${PLATFORM_PREFIX}.cn/dashboards
    Please investigate this API Gateway alert immediately.
    Best regards,
    The Platform Team
    {{ end }}
YAML

实际邮箱收到的测试邮件展示效果:

图 8. 测试邮件内容

Grafana 仪表盘

Grafana 仪表盘里的内容可以根据不同客户的情况而定制化。整个解决方案中包含了 VPC Flow Logs 以及 Amazon API 网关 Access Log 的仪表盘。以 Amazon API 网关 Access Log 为例进行说明仪表盘设计的思路和方法。整个仪表盘中包含 API 访问日志警报总数、所有 API 域名响应/分钟、前 10 名最终用户 IP 汇总、单个域名响应/分钟详细信息和非白名单 IP 等面板。

图 9. API 网关访问日志仪表盘

API 访问日志警报总数:此面板显示从 API 访问日志表扫描的当前数据范围内的所有活动警报信息。 它显示了活跃警报的总数,包括新的和未解决的警报,从而全面了解了正在发生的和最近的异常情况。

所有 API 域名响应/分钟:此面板显示从 API 访问日志表中扫描的默认七天内的汇总流量数据。 时间范围可以在整个仪表板上向前推进,也可以使用 Grafana 的时间过滤器进行调整。 默认情况下,它显示所有 API 名称的流量。 过滤到某个域名后,控制面板会刷新以聚焦特定的 API 名称,并调整 “响应/分钟”、“摘要”、“前 10 名最终用户 IP” 和 “响应/分钟详细信息” 面板中的查询以反映所选的 API 名称。

前 10 名最终用户 IP 汇总:此面板最初显示所有 API 名称中按请求流量排列的前 10 名用户的排名。 在 “响应/分钟” 面板中选择特定时间和 API 名称后,它会动态刷新,显示所选时间范围内的前 10 名用户和特定 API 名称,从而清晰地概述最活跃的用户。

单个域名响应/分钟详细信息:当 API 名称选择 “全部” 时,此面板不会显示详细的源 IP 流量。 但是,在 “API 名称” 选择器或 “响应/分钟” 面板中选择特定 API 名称时,它会显示指定时间范围内的流量详细信息。 在出现警报时,此面板可用于识别负责触发警报的源 IP,从而帮助识别和解决问题。

非白名单 IP 面板:此面板会显示不在白名单内的终端用户对于 Amazon API 网关域名每分钟的访问次数。

自动化配置

解决方案部署在 Amazon EKS 集群中,权限管理通过调用 Grafana 的 Service Account 来完成。实现了包括 Data Source、Dashboard 和 Alerts 等自动化部署。以下以 Grafana Data Source 自动化为例,说明自动部署过程中的方案设计思路。

由于在亚马逊云科技中国区域目前还不支持 Grafana 的托管服务,选择在 Amazon EKS 集群上创建 Grafana 并利用私网网络负载均衡器(AWS NLB)来暴露 Grafana 的 API 并将 API 的基础密钥存储在 AWS Secret Manager 上。

部署工具推荐 AWS CDK,CDK 可以利用高级语言例如 typescript 或 python 对传统的 IaC 代码(例如 AWS Cloudformation)加上上层逻辑。因此,可以在 AWS CDK 代码里配置上 Config 文件,并部署需要的 data source lambda,同时,需要在 AWS Lambda 的部署代码模块中添加一个定制化资源,这样配置后,每当配置文件中发生变化,AWS Lambda 就会被调用从而更新 Grafana 里的配置。

另外,在 AWS Lambda 函数里,利用存储在 AWS Secret Manager 中的 Grafana 基础密钥来生成 Grafana Service Account,并利用 Service Account 来自动化创建 Grafana Data Source。

图 10. Grafana Data Source 自动化架构图 

总结

此解决方案不仅满足了当前对 Amazon API 网关和 VPC 流量的监控需求,更为未来构建统一的云原生数据湖奠定了坚实基础。该架构设计采用了统一的数据存储格式和元数据处理机制,将处理后的数据统一存储在 Amazon S3 上,并通过 Amazon Athena 对数据进行分析查询。在前端展现层面,方案充分利用了 Grafana 强大的可视化能力,通过 Amazon Athena 数据源插件对分析结果进行丰富的图表展示,为用户提供了直观、全面的流量监控视角。凭借这个架构,客户可以高效整合各类数据源,打造统一的数据分析平台,实现真正的数据驱动业务创新。

参考文档

本篇作者

崔新岩

目前就职于亚马逊云科技专业服务部门,有着多年的云上架构和客户服务经验,专注于企业客户入云和云上解决方案的设计和落地实践。

谢丹

亚马逊云科技专业服务团队云架构顾问,拥有多年丰富的云架构和客户服务经验,专注于为企业提供全方位的云上基础设施架构设计、迁移方案设计、最佳实践指导以及落地实施服务。

黄书昊

亚马逊云科技 ProServe DevOps 顾问,致力于解决企业客户 DevOps 咨询和实施问题,在云原生、DevOps、微服务框架、性能优化和加速研发效能领域有深入的研究热情。