亚马逊AWS官方博客

基于 Amazon CloudFront 的 MPEG-DASH 视频流媒体异常流量检测及自动阻断

引言

MPEG-DASH(Dynamic Adaptive Streaming over HTTP)是一种自适应码流技术,通过动态调整视频质量来适应用户的网络状况。结合 Amazon CloudFront 这一强大的 CDN(内容分发网络)服务,能够有效提升流媒体的加载速度,减少卡顿,提高用户体验。

核心优势

  • 自适应码流+CDN 加速:服务端会准备多种码率的视频文件,MPEG-DASH 根据网络状况动态选择合适的码率进行播放,CloudFront 借助全球节点就近传输适配后的流,确保视频播放流畅,降低卡顿。
  • 智能调度,极致体验:设备端会根据网络情况无缝切换不同码率的视频流,切换过程平滑无卡顿,同时 CloudFront 节点就近分发,双重优化机制让用户获得极致观影体验。
  • 无缝集成,架构简化:MPEG-DASH 可直接从源站分发,无需额外媒体服务器,大幅简化部署架构。此外,CloudFront 的边缘缓存能力可以针对热门节目进行缓存,访问量越大,观看体验越流畅。

这种组合方式在流媒体分发领域得到了广泛应用,尤其适用于需要全球覆盖、高并发支持的视频平台。然而,MPEG-DASH 和 CloudFront 的实现过程中仍然面临一系列挑战。

挑战与风险

  • 安全威胁:视频流在传输过程中可能被不怀好意的第三方调用,存在被非法获取和利用的风险,危及视频内容的安全性,同时也面临 DDoS 攻击的风险,可能导致服务中断。
  • 内容保护:由于流媒体技术的开放性,视频内容可能被非法下载、重新分发,需要采取有效措施保护内容。

因此,我们需要采取有效的解决方案来保证视频播放的安全性和保护内容。

MPEG-DASH 流量特征分析

先观察一个典型的 DASH 流播放的场景:

根据图像中展示的请求模式,可以观察到以下特征,这些特征反映了 DASH 播放过程中的典型请求行为:

  • MPD 文件请求**:播放开始时,首先会请求 mpd 文件,这是 DASH 内容的媒体呈现描述文件,包含了视频的元数据信息和片段文件的位置。
  • 周期性的片段文件请求**:在获取 MPD 文件后,客户端会按顺序周期性地请求 chunk-stream*.m4s 文件,这些是实际的视频和音频数据片段文件。
  • 双轨请求模式**:可以看到有两个轨道的片段文件在交替请求,分别是 chunk-stream0-*.m4s 和 chunk-stream2-*.m4s,这视频和音频两个不同的轨道。
  • 片段文件编号递增**:片段文件名中的数字部分(如 03814、03815 等)在连续请求中逐步递增,这反映了片段文件的播放顺序。
  • 固定请求大小**:每个片段文件的请求大小基本保持一致,约为  50KB 左右。
  • 周期性 MPD 重新获取**:在一定的时间间隔后,客户端会重新请求 MPD 文件,可能是为了检查是否有新的片段可用或者内容发生了变化。

结合以上的请求 dash 的请求特征和 CloudFront 的日志,我们可以通过数据分析及时发现潜在的异常流量访问行为,并自动将其屏蔽,从而最大程度地防止由于恶意访问或非法行为所造成的安全风险和经济损失。

通过通过构建亚马逊云科技数据湖数据分析体系,借助无服务器数据分析服务,快速构建出从数据采集,数据分析,数据 BI 展示的端到端服务(包括但不限于使用  AWS Glue、Spark 作业和 Athena),定时(如 30 分钟)对 Amazon CloudFront 访问日志进行分区和分析,实时获取访问指标和模式。基于访问频率异常高、带宽需求异常低、观看习惯异常等规则,系统可以快速识别出潜在的异常访问行为和威胁源 IP。一旦发现异常 IP,Lambda 函数会立即将其加入 AWS WAF 的 IP 阻止列表中,从而使得 WAF 可以实时屏蔽并阻断这些 IP 对节目的访问。

为了实现高性价比的 WAF(Web 应用程序防火墙)使用方式,我们采取了一种动态关联的策略:

  • 定期检查异常 IP 列表,如果没有新的异常发现,则自动解除 CloudFront 与 WAF 的关联,从而避免持续产生 WAF 的计费开销;
  • 一旦发现新的异常 IP 或攻击行为,则立即重新将 CloudFront 与 WAF 关联,确保及时拦截并防御潜在的安全威胁。

通过这种动态关联机制,我们可以在保证安全防护的前提下,最大限度节省 WAF 的使用成本,实现高性价比的 WAF 应用。这种策略不仅可以降低运营开支,也能够提高资源利用效率,是一种经济高效的 WAF 使用模式。

另外,Athena 与 QuickSight 集成,能基于分析结果生成仪表盘和报告,直观呈现异常访问趋势和细节,供进一步分析和决策参考。

通过上述流程,可以实现对 CloudFront 异常流量访问的持续监测、自动阻断和动态应对,在小时级别就能发现并屏蔽威胁,从而最大限度防止由于恶意行为可能带来的安全风险和经济损失,保护系统和用户的利益。

从 CloudFront 日志中总结规则并发现异常节目和异常访问

尝试从分析结果中提取如下业务维度的指标:

1. 总观看终端访问量趋势

左上角折线图展示了总的终端访问量(visit_count)随时间变化的趋势情况。可以发现在某些时间段,如凌晨或工作日,终端访问量会有明显波动,可能与节目内容及用户观看习惯相关。

2. 热门节目 Top 榜单

右下方表格列出了访问量(visit_count)最高的前 30 个 program_id,即最受欢迎、最火爆的节目。这些数据可用于评估节目人气、调整内容策略等。

3. 高带宽消耗终端

右上角表格按客户 IP(c_ip,代表每个独立观看终端)排列,展示了各终端的访问量(visit_count)、平均请求带宽(avg-request-kb)和产生的总费用(total_cost)等指标,可以发现带宽占用和费用高昂的热门终端。

4. 节目分组汇总

右下角表格按 program_id 分组,列出了每个节目的总访问量(visit_count)、总流量(total_gb)等汇总数据,可以全面评估各节目的整体表现。

5. 按时间和节目维度分析

报表的行数据按时间戳(timestamp)进行切分,列数据则按照节目 ID(program_id)分组,涵盖了两个关键维度,能够深入分析不同时段、不同节目的访问情况。

总的来说,这份报表针对于视频节目的分发和观看情况,全面展示了热门节目、高消耗终端、带宽占用等核心指标,为优化内容策略、控制成本费用、改善用户体验等决策提供了数据支持。

在流媒体传输场景中,对于基于 DASH 协议的视频播放,我们可以分析 CloudFront 访问日志,发现潜在的异常行为和威胁源。以下是几个关键规则:

  • 访问频率异常:如果发现某个 IP 地址或终端在短时间内的访问次数(visit_count)远高于正常用户,很可能是机器人或自动化工具在进行高频访问,存在风险隐患。
  • 带宽需求异常:正常的 DASH 视频播放需要一定的带宽传输视频数据,如果发现某 IP 的平均请求带宽(avg-request-kb)明显偏低,可能是在进行非法下载或其他未经授权的操作。
  • 观看习惯异常:分析一个 IP 或终端在特定时间段内访问的不同节目 ID(program_id)的数量,如果数量异常大,超出了正常用户的观看习惯,也值得警惕。

通过对上述规则的检测和分析,我们能够自动识别出潜在的异常行为,例如同时满足“访问频率高、带宽需求低、观看节目数量多”的 IP,很可能是恶意的威胁源,需要采取阻断或进一步分析等措施,以提高系统安全性,保护合法用户的利益。

自动巡检异常 IP 并阻断

为了拦截某些异常 IP 的访问,同时又需要考虑成本控制,该架构采用了一种自动化的流程,动态地将 AWS WAF 与 CloudFront 进行关联和解除关联。

具体来说:

  1. 通过 EventBridge Scheduler 定时启动 Lambda 函数,在 Lambda 内按照特征查询 Athena,获取异常 IP 列表。
  2. 一旦发现异常 IP,AWS Lambda 函数会自动将这些 IP 加入 WAF 的 IP 阻止列表(IP Set)中。
  3. 同时,Lambda 还会将 WAF 与 CloudFront 关联,让 WAF 开始为 CloudFront 提供 Web 访问控制。
  4. WAF 本身是按请求数收费的,因此长期将其与 CloudFront 关联将产生较高的费用。
  5. 为了节省成本,该架构设计了一个定期检查的流程。
  6. 每隔一段时间(例如 1 小时),系统会重新扫描异常 IP 列表。
  7. 如果在这段时间内没有发现新的异常 IP,就意味着威胁已经解除。
  8. 此时,Lambda 会自动解除 WAF 与 CloudFront 的关联关系,停止 WAF 的使用。
  9. 一旦再次发现新的异常 IP,系统会重新将 WAF 与 CloudFront 关联,周而复始。

通过这种动态关联和解除的自动化流程,您的架构只在发现异常 IP 时短暂启用 WAF 进行拦截,而在没有威胁的时候则自动停止使用 WAF,从而最大限度地减少了 WAF 的使用时长和费用开支。同时,又不影响对异常 IP 的实时拦截能力,平衡了安全性和经济性。

当然,我们可以根据具体需求调整扫描的频率和 WAF 挂载的频次。

架构说明

架构图

根据所提供的架构图和说明,该架构的主要特点和重点如下:

1. 重新分区 CloudFront 标准日志(Standard Logs)

  • 原始的 CloudFront 标准日志直接存储在 S3 中,没有按任何维度进行分区组织,使用 Lambda 重新对日志按照按年/月/日/时/分进行分区存储。
  • 通过 AWS Glue 和 Apache Spark 作业,生成分区分析数据并存储在 S3 中。
  • 分区后的日志数据更利于后续按时间维度(如半小时、小时等)进行统计和分析。

2. 利用 AWS Lambda 和 Amazon Athena 实现异常访问 IP 监控

  • 利用 AWS Lambda 函数定期查询 Athena 中的分区日志数据。
  • 根据特定条件(如“异常访问频率”)识别出异常的 Source IP 列表。
  • 将识别出的异常 IP 列表更新到 AWS WAF 的 IP 集合(IP Set)中。

3. 动态关联/解除 WAF 与 CloudFront 的关系,节省成本

  • 通过调度器(Scheduler)触发 Lambda 函数执行。
  • 如果发现异常 IP,则将 WAF 与 CloudFront 关联,以阻止异常访问。
  • 如果未发现异常 IP,则解除 WAF 与 CloudFront 的关联,节省 WAF 费用。

4. 利用 Amazon QuickSight 进行数据可视化和报表展示

  • QuickSight 直接与 Athena 集成,可基于分区日志数据生成仪表盘和报告。
  • 展示异常访问情况、趋势等,提供数据分析洞见。

5. 其他

  • 使用 AWS Glue Data Catalog 作为元数据管理。
  • 利用 AWS EventBridge 与其他 AWS 服务集成(异常 IP 检查定时器)。
  • 可能使用 AWS Crawler 从外部源获取新的分区信息更新到 Athena。

总的来说,该架构围绕 CloudFront 日志数据的重新分区和异常访问 IP 的动态监控展开,并通过动态关联 WAF 实现成本优化。同时利用多种 AWS 服务(Athena、QuickSight、Lambda 等)实现数据分析、可视化和自动化运维,构建了一个端到端的数据处理和安全监控解决方案。

方案要点

S3 日志重新分区

在 Lambda 中,在 S3 上传新对象时触发,将上传的对象按照年/月/日/时/分钟的方式进行分区,并将对象复制到相应的分区路径下。

具体来说,它从 Lambda 事件中获取 S3 存储桶和对象键,计算下一个 5 分钟间隔,构建新的分区路径,然后使用 S3 copy_object API 将原始对象复制到新路径。

import boto3
import datetime
from dateutil import tz

def lambda_handler(event, context):
    print(event)
    print("-----<>------")
    # 从事件中获取 S3 存储桶和键
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']

    # 获取事件发生时间
    event_time = event['Records'][0]['eventTime']
    event_datetime = datetime.datetime.fromisoformat(event_time.replace('Z', '+00:00'))
    print(event_datetime)
    print("\r\n")

    # 创建 S3 客户端
    s3 = boto3.client('s3')

    # 从源键中提取文件名
    source_file_name = key.split('/')[-1]

    # 计算下一个 5 分钟间隔
    minutes = (event_datetime.minute // 5 + 1) * 5
    new_minute = str(minutes % 60).zfill(2)

    # 构建新的目标路径
    new_key = f"cf_logs/part/{event_datetime.year}/{event_datetime.month:02d}/{event_datetime.day:02d}/{event_datetime.hour:02d}/{new_minute}/{source_file_name}"

    # 将对象复制到新的目标路径
    s3.copy_object(
        Bucket=bucket,
        CopySource={'Bucket': bucket, 'Key': key},
        Key=new_key
    )

    print(f"S3 对象已从 {key} 复制到 {new_key}")

    return {
        'statusCode': 200,
        'body': f'S3 对象已从 {key} 复制到 {new_key}'
    }
PowerShell

Glue Spark Job 分析逻辑

Job 源码:https://github.com/nimysan/awsgists/blob/main/cloudfront_log/spark.py

1. 处理输入参数和构建 S3 路径

  • 获取事件时间、S3 存储桶和处理时间间隔等参数(可以根据分区最低设置为 5 分钟
  • 根据时间参数计算出需要处理的时间范围
  • 生成对应时间范围内 CloudFront 日志文件的 S 3路径列表

2. 读取并处理日志数据

  • 定义 CloudFront 日志数据的 Schema
  • 读取 S3 路径列表中的日志文件,并解析数据
  • 过滤空行,从 URI 中提取节目标识(program_id)

3. 聚合并保存结果

  • 按 program_id 和 IP 地址分组,统计访问次数和总流量
  • 计算数据传输成本、请求成本和总成本
  • 将结果按时间维度分区,以 Parquet 格式写入 S3
  • 额外统计每个边缘位置和节目的请求数量、流量和首字节时间,也写入 S3,Athena 查询特征

然后通过 Glue 自身的 Schedule 来调度

EventBridge 定时触发 Lambda 查找异常 IP 列表并关联 WAF

这个 Lambda 函数的主要功能是:

  • 根据 Athena 查询结果获取最近 2 小时内的高成本异常 IP 列表,并将这些 IP 添加到 WAF 的 IP 阻止集合中。
  • 根据当前时间是否符合预设的时间规则,决定是否将 WAF Web ACL 与指定的 CloudFront 分发关联或解除关联。
  • 如果当前时间符合规则且存在异常 IP,则将 WAF 与 CloudFront 关联以阻止异常流量;反之则解除关联,从而根据实际情况动态管理 WAF 的启用状态,在保证安全性的同时也节省了不必要的 WAF 使用成本。

完整代码代码:https://github.com/nimysan/awsgists/blob/main/cloudfront_log/schedule/lambda_waf.py

def lambda_handler(event, context):
    """
    Lambda 入口函数
    """
    wcl_id = "xxxx"  # 替换为您的 WAF ACL ID
    cloudfront_ids = ["xxx","xxxx"] # 替换为您的 CloudFront 分发 ID

    # 查询ip列表并更新到Ip set
    res = get_top_20_high_cost_ip_addresses()
    # print(res)
    ip_list = res['ip_list']
    cidr_list = [f"{ip}/32" for ip in ip_list]
    combined_ip_addresses = add_ips_to_waf_ip_set("test", cidr_list)

    # 如果ip_set不为空
    if len(combined_ip_addresses) > 0:
        current_time = get_current_time()
        current_hour = current_time.hour
        current_minute = current_time.minute
        print(f"{current_time} - {current_hour} - {current_minute}")
        print(f"Current time: {current_hour}:{current_minute}")
        if check_time_rules(current_time):
            # 在时间段内,关联 WAF ACL 到 CloudFront 分发
            manage_waf_cloudfront_associations(wcl_id, cloudfront_ids, associate=True)
            print(f"Unique IPs in the last 2 hours: {combined_ip_addresses}")
        else:
            # 不在时间段内,取消关联 WAF ACL 和 CloudFront 分发
            manage_waf_cloudfront_associations(wcl_id, cloudfront_ids, associate=False)
            print("Current time does not meet the rules.")
    else:
        # 如果ip_set为空, 把WAF取消
        manage_waf_cloudfront_associations(wcl_id, cloudfront_ids, associate=False)
        print("no ip in waf ip set,  disassociate waf rule")
PowerShell

这个 Lambda 通过 EventBridge 的 Schduler 去触发,定期执行。

总结

该架构将 MPEG-DASH 视频技术与 Amazon CloudFront CDN 服务相结合,实现了高效、流畅的视频分发。通过多码率视频和就近分发,确保了优质的观看体验。

为保障视频传输安全,架构基于 AWS 服务构建了分析和防御系统,利用 Glue、Athena 等服务对 CloudFront 日志进行分区分析,识别异常高频访问 IP,通过 Lambda 将这些 IP 动态添加到 WAF 的阻止列表中,同时采用动态关联策略,根据异常 IP 存在与否决定是否将 WAF 与 CloudFront 关联,从而最大限度节省 WAF 使用成本。

该架构还融入了 QuickSight 可视化服务,基于分析数据生成报告,展示异常访问趋势和统计指标,支持决策分析。不仅解决了视频分发和安全需求,更重要的是建立了数据分析决策体系。通过深挖用户行为和内容趋势洞见,可优化内容策略、运营决策等。后续可以与 GenAI 相结合,可赋能营收增长,提供个性化推荐、内容优化等智能化服务,发掘数据价值。

参考资料

本篇作者

叶小微

亚马逊云科技解决方案架构师,曾就职于 IBM,后从事电商相关和企业数字化转型工作,拥有多年架构设计、研发、项目管理经验。在工作流、微服务、系统集成等方向有丰富的解决实际问题的经验。

王跃

亚马逊云科技解决方案架构师,负责基于亚马逊云科技的解决方案咨询和设计,在系统架构、大数据、网络、应用研发领域有丰富的研发和实践经验。

王成

亚马逊云技术客户经理,主要负责为企业级大客户提供运维与架构优化、成本管理、及技术咨询等服务。在加入亚马逊云科技之前,他曾就职于德勤,拥有丰富的数据建模和商业智能领域的经验。