在 Amazon Bedrock 上使用生成式 AI 来简化 AWS WAF 规则的分析过程

借助生成式 AI,使用自然语言查询来从 AWS WAF 日志中获取洞见。
发布时间:2024 年 5 月 29 日
AWS WAF
Python
可观测性
生成式 AI
Amazon Bedrock
教程
亚马逊云科技
Olawale Olaleye
亚马逊云科技使用经验
100 - 初级
前提条件

海外区域: 注册 / 登录 亚马逊云科技

完成所需时间
10 分钟
上次更新时间
2024 年 5 月 29 日
相关产品

AWS Web 应用程序防火墙 (AWS WAF) 广泛用于保护网站和面向互联网的应用程序免受 Web 漏洞利用和攻击。但在使用时,或多或少会遇到这样的问题:“如何判断当前使用的 AWS WAF 规则是否有效?”以及“如何确认某些请求被阻止的原因?阻止操作是由哪条 AWS WAF 规则触发的?”为了解决这些问题,人们推出了一系列解决方案:例如借助 Amazon CloudWatch Logs Insights,或在 Amazon Athena 中编写 SQL 查询。但时至今日,凭借生成式 AI 的强大功能,您只需使用日常的语言描述需要的细节信息,生成式 AI 基础模型便会根据对 AWS WAF 日志的分析为您提供相关洞见。在此教程中,将向您介绍如何构建一款可以实现上述功能的简易应用程序。

前提条件

  • 准备一个亚马逊云科技账户,并部署相应资源以获得 AWS WAF Web ACL 的保护;如果尚未配置 AWS WAF,请创建一个新账户。
  • 在本地计算机上安装 AWS 命令行界面 (AWS CLI),或直接使用 AWS Cloud9 集成开发环境。
  • 如果本地计算机尚未安装 Python 的 AWS 开发工具 (Boto3),请进行安装。
  • Streamlit 可用于为应用程序构建 Web 用户界面,因此请在本地计算机或 AWS Cloud9 集成开发环境中进行安装。
  • 获取 Amazon Bedrock 基础模型的访问权限。如果尚未获得,请向 AI21 Labs Jurassic 发送访问请求。

构建解决方案

为了保证解决方案的简易性,在本示例中,将查询 AWS WAF 采样请求,当然您也可以启用 AWS WAF 日志来查询完整的 AWS WAF 日志。下面需要做的是:首先,选取某个 AWS WAF Web ACL 并获取过去 60 分钟的采样请求;接着,根据用户使用自然语言输入的希望从日志中获取的详细内容创建提示,并将其传输给正在 Amazon Bedrock 上运行的 AI21 Labs Jurassic-2 Ultra 基础模型,由生成式 AI 来分析日志并给出响应;然后,使用 Streamlit 构建一个简易的 Web 应用程序,并为用户提供基于浏览器的用户界面,以供其输入问题和查看响应;最后,在本地计算机上运行该 Streamlit 应用程序,或将其部署到 Amazon EC2 实例或容器中以通过互联网进行访问。

将下方代码复制到集成开发环境中,并保存为 .py 文件(例如 wafanalysis.py)。

import json
import boto3
import streamlit as st
from datetime import datetime, timedelta

# Initialize AWS clients
session = boto3.Session(profile_name='default')
waf = session.client(service_name='wafv2')
bedrock = session.client(service_name='bedrock-runtime')
bedrock_model_id = 'ai21.j2-ultra-v1'   # Using AI21 Labs Jurassic model. For all model IDs, refer https://docs.aws.amazon.com/bedrock/latest/userguide/model-ids.html

# Function to fetch and analyze AWS WAF logs
def fetch_and_analyze_waf_logs(web_acl_arn, rule_metric_name, user_prompt):
    # Get the current time and the time 60 minutes ago
    current_time = datetime.utcnow()
    start_time = current_time - timedelta(minutes=60)

    # Fetch the AWS WAF logs for last 60 minutes from a specific WebACL 
    response = waf.get_sampled_requests(
        WebAclArn=web_acl_arn,
        RuleMetricName=rule_metric_name,
        Scope='CLOUDFRONT',  # Change this to REGIONAL if WebACL is associated with regional resources such as ALB or API Gateway
        TimeWindow={
            'StartTime': start_time,
            'EndTime': current_time
        },
        MaxItems=100
    )

    # Create a prompt with WAF logs appended to it
    input_data = "Analyze the following AWS WAF logs and provide insights for the following: " + str(user_prompt) + " " + str(response)

    # FOR DEBUGGING: print the input prompt created in command line. Comment out if not required.
    print("Input Prompt = ")
    print(input_data)

    # Provide inference parameters to the Amazon Bedrock model. This format is specific to AI21 model. For parameters for other models, refer https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters.html
    body = json.dumps({
        "prompt": input_data,
        "maxTokens": 1024, 
        "temperature": 0, 
        "topP": 0.5, 
        "stopSequences": [], 
        "countPenalty": {"scale": 0 }, 
        "presencePenalty": {"scale": 0 }, 
        "frequencyPenalty": {"scale": 0 }
    })

    # Invoke Amazon Bedrock with the input data in JSON format
    bedrock_response = bedrock.invoke_model(
        modelId=bedrock_model_id,
        body=body,
        contentType='application/json',
        accept='application/json'
    )

    # Decode the response body
    response_body = json.loads(bedrock_response['body'].read().decode('utf-8'))  

    # Extract the text from the JSON response
    response_text = response_body.get("completions", [])
    if response_text:
        response_text = response_text[0].get("data", {}).get("text", "")

    return response_text

# Streamlit app to capture user input and display results
def main():
    st.title("Analyze AWS WAF Logs with GenAI using Amazon Bedrock")

    # Get user input for WebAclArn and RuleMetricName
    web_acl_arn = st.text_input("Enter WebAclArn", value="")
    rule_metric_name = st.text_input("Enter RuleMetricName", value="")
    user_prompt = st.text_input("Enter your question", value="What rule is causing the 403 block?")

    # Run the analysis when the user clicks the button
    if st.button("Analyze WAF Logs"):
        analysis = fetch_and_analyze_waf_logs(web_acl_arn, rule_metric_name, user_prompt)
        st.write(analysis)

if __name__ == "__main__":
    main()
有关各部分代码的解释,请参考对应注释。在使用本段代码时,您可以原样使用,也可以根据需要进行修改。

大家可以修改传递给基础模型的推理参数。其中,将温度参数设为 0,是希望模型可以根据提供的 AWS WAF 日志给出更忠实而非更具创意性的响应。对于 topP 参数,取了中间值 0.5,但如果担心响应具有误导性,也可以降低取值来减少池的大小,从而限制输出的选择空间。在本段代码中,并没有设置计数惩罚、存在惩罚和频率惩罚,如果希望根据特定术语 (token) 的出现频率来限制响应,更改相应的惩罚参数即可。

运行应用程序

要想运行 Streamlit 应用程序,首先导航到保存 Python 文件的文件夹,然后在本地计算机或 AWS Cloud9 的终端 / 命令行提示符中运行以下命令:。
streamlit run wafanalysis.py --server.port 8080

接着,默认浏览器便会自动打开一个 Web 用户界面。如果没有打开,在浏览器的地址栏中输入 localhost:8080 即可。

然后,在用户界面中输入以下信息:

  • AWS WAF Web ACL 的 ARN。获取 ARN 的步骤如下:首先导航到 AWS 控制台中的 AWS WAF,接着进入 AWS WAF Web ACL 列表并依次点击 AWS WAF Web ACL 名称旁的单选按钮和 “CopyARN” 按钮。得到的 ARN 应如下所示:
arn:aws:wafv2:us-east-1:1234567890:global/webacl/name-of-webacl/46abc41c-04ca-4d90-a317-66f6e5e123bd
  1. 需要进行详细分析的 AWS WAF 规则的名称。要获取该名称,导航到 AWS 控制台中的 AWS WAF Web ACL,接着点击 “Rules” 选项卡即可进行复制。
  2. 以自然语言提问的问题。例如,这个规则阻止了多少请求?这些请求来自哪些 IP?
  3. 点击 “Analyze WAF logs” 按钮并等待几秒。接着,生成式 AI 模型在评估完问题后就会将响应显示在屏幕上。

例如,可以询问为什么某些请求会被设置的 AWS WAF 规则阻拦。

或者,可以询问被阻拦的各种请求之间的共性,以便更好地了解攻击资源的恶意流量来源是否存在模式。

以上只是简单几例。也可以尝试使用其他提示来从 AWS WAF 日志中获取所需洞见。

后续步骤

本教程作为示例,介绍了如何借助生成式 AI 的强大功能来构建一个用于分析 AWS WAF 流量的简易界面。如果想要进一步改进,可以尝试从以下方面着手:使用 Amazon Bedrock 上的其他基础模型;修改推理参数;提供有关应用程序详细信息的私有数据源,借助 Amazon Bedrock 知识库和检索增强生成 (RAG) 技术,来提高分析结果的质量等。