亚马逊AWS官方博客

使用 Bedrock 和 RAG 构建 Text2SQL 行业数据查询助手

背景

随着企业数据量的持续增长,如何让非技术人员也能轻松分析数据、获得商业洞察成为了当前的痛点。本文将介绍如何使用亚马逊云科技的大语言模型服务 Amazon Bedrock 以及 RAG(Retrieval Augmented Generation),实现 Text2SQL 功能,以此为基础构建基于大语言模型(LLM)的行业数据查询助手,达到使用自然语言询问直接获取数据分析结果的目的。针对不同行业,我们都能见到类似的数据查询难题:通用的 BI 看板无法对即时、特殊的需求进行相应。具体地:

【场景 A】某家俱品牌出海电商运营辅助场景。1、数据分析 SQL 生成工具:利用 LLM 辅助数据分析代码工具,对收集到的数据进行分析。可以帮助数据工程团队在数据探索和开发阶段提高效率。2、电商服务平台能够收集和整理与销量相关的数据,对亚马逊电商和第三方独立站运营数据,包括订单量、访问量、产品评价、客诉数、站点 CPC 和 CPO 等进行电商运营辅助分析、选品策略辅助分析。

【场景 B】某冷链物流公司,对人(司机)、货(货物信息)、场(仓储)、动作(配送)等场景中进行提问,实现更精细化的监控和管理,从而提升运营效率、降低成本,并提供更优质的物流服务。例如:今日华北地区配送超时的订单比例有多少,华东地区最近一周哪些司机的配送过程中出现了频繁的延误问题等?

【场景 C】某头部广告公司,业务目标是优化广告投放决策,提高广告效果,降低试错成本。希望尝试创新应用 LLM 的语义理解能力,进行广告数据分析、广告词汇扩展等工作,可以快速进行广告拓词、广告归因分析等,辅助决策,替代传统需要大量手工构建模型的方式,提升工作效率。利用 LLM 的语义和学习能力进行知识推理和数据学习,实现更智能的广告投放决策,可以辅助广告公司优化媒介选择、时间安排、创意设计、预算分配等多方面决策。

技术层面,Text2SQL 是一个典型的机器学习任务,在大语言模型(LLM)出现之前,曾有 seq2seq 等模型架构将其视作机器翻译任务,建立数据和自然语言之间的对齐和映射关系,但在 WikiSQL 等数据集上准确率非常低。在 LLM 出现后,通过其推理、理解以及指令遵循能力,Text2SQL 任务的成功率可以得到非常大的提升。本文旨在通过讲解不同的行业场景中落地的难点,来展现如何利用亚马逊云科技的服务加以设计理念来解决它们。

用户体验

第一步:用户通过问答界面提出问题,如“本月销量前十的产品线是什么?”

第二步:系统直接返回 SQL 生成结果,并通过其查询到对应的数据结果

SELECT i.item_id, COUNT(*) AS num_sales FROM interactions WHERE i.event_type = 'purchase'
AND MONTH(i.timestamp) = 12 GROUP BY i.item_id ORDER BY num_sales DESC
LIMIT 10;

方案难点

构建自然语言数据查询助手是许多行业中企业的重点需求,但实现过程中困难重重。其中最核心的问题是 LLM 对于长尾知识、特定规则的不熟悉,以及对过分复杂提问的理解难度较高。利用 LLM 的推理能力和通用知识理解,辅以外部的知识体系(RAG)和执行规则来达到较好的生成效果。

1)LLM 无法识别长尾领域知识

电商领域中,跳出率指的是一个衡量网站用户行为的指标,表示只访问了一个页面就离开网站的访客数量占所有访客的比例。同时,基于用户习惯的不同,跳出率也可能被称为”单页访问率”或”单次访问率”。若有相应问题是用了单页访问率作为问题,那么大模型有可能无法理解他们所说的内容。

因此可以使用增强检索(RAG)来获取正确的范例,作为生成 SQL 时 Prompt 的一部分来提高性能。这类做法通常被称为 Few-Shot(FS),指模型在推理时给予少量样本,但不进行权重更新。在 Text2SQL 上,我们有两种做法:1)使用 Amazon Bedrock 及 Amazon OpenSearch 来对问题进行向量化,并在问题的向量库中匹配类似的问题。2)使用传统 NLP 模型进行命名实体识别(NER),并进而进行标签匹配,最终把对应正确的问题和 SQL 获取。在持续的客户使用和反馈频率上,我们可以长期提升生成准确率。

RAG in Text2SQL 的若干实现方式

2)数据库的元数据过于庞大复杂,影响最终效果

呼叫中心案例中,存在表格可能有一个表专门用于存储用户基本信息(如用户 ID、姓名、联系方式等),另一个表用于存储用户的账单信息(如账单 ID、用户 ID、账单日期、金额等),再有一个表用于存储用户的呼叫记录(如呼叫 ID、用户 ID、呼叫日期、通话时长等)。这样,每个表的宽度(即字段数量)可能都不会很大,但是由于需要处理的数据种类多,所以表的数量可能会非常多,常常出现上百张表。表的复杂不仅提高了 SQL 生成难度也受到输入字符数量的限制。

因此,在原始数据上构建指标体系是最有效的方案,通过实现新的少量宽表来减少 SQL 生成的复杂度。

指标层的提取的可以大大降低生成难度

与此同时,我们可以构建选表流程(Schema Linking)来将表格的选取当作单独的步骤剥离,在选好表之后,再讲对应表的结构加入 Prompt 中。这样可以有效地减少 Token 的输入量及提高生成的准确率。

Schema Linking 可以解决 Input Token 不足的和过高复杂度的问题

3)语法错误

在 Text2SQL 任务中,经常会出现执行错误,例如:1)Group By,Join,Nested 的错误,由数据库在执行时返回;2)列、表的选择错误,甚至无法将表名或列名拼写对。通常我们采用两种方式提升准确率:

  1. 使用思维链(Chain-of-Thoughts)拆解步骤:把一个多步推理问题分解出多个中间步骤,并且让大语言模型更加可解释,并且提升整体生成质量,例如在复杂的 Text2SQL 任务中。
  2. 使用模型自我修复错误:这通常也是思维链的一部分。以下是一个示例:

错误的 SQL 输入:

SELECT *
FROM Orders WHERE Customer_ID = 100;

将错误重新喂入 LLM:

(Previous DDL, hints, and question pasted here)
After we executed SQL (SQL pasted here),  we observed the following error.
```ERROR: column "Customer_ID" does not exist```
Please fix the SQL and give explanation:

输出的回答:

The issue here is that the column name is incorrect. In our Orders table, the column is named CustomerID, not Customer_ID.
Here is the fixed SQL statement:
```sql
SELECT *
FROM Orders WHERE CustomerID = 100;
````

4)提问无法回答

当提问者问出“为什么我的销量下降”,这个问题已经不再是 SQL 语句能够解决的。因此我们需要进行意图识别来进行问题分流,其目的是:

  1. 将最常见通用的问题导向模版,提升整体的准确率
  2. 将不太可能回答的问题识别出来,并告知用户超出的提问范围
  3. 将可以回答,但暂时缺少关键信息的问题识别出来,并告知用户需要补充哪些信息
  4. 将模版之外的问题直接导向 Text2SQL 模块

利用 LLM 实现意图识别和意图分类,再区别化处理不同形态的分类

解决方案指引

我们提供的标准方案构建于亚马逊云科技的服务体系之上,问答页面使用 ECS 容器上部署。用户问题由 API Gateway 路由到 Lambda 函数,在 Lambda 中查询历史问答库、转换 SQL、获取结果。历史问答库使用 OpenSearch 服务。SQL 转换使用 RAG 大语言模型服务。数据库使用 AWS Aurora 储存必要的查询、用户信息。该系统最大的价值是提供标准部署方案,以较高的准确度使用 Amazon Bedrock 自动完成自然语言到 SQL 的转换,无需人工编码规则。

架构图

示例代码

生成 SQL 的模型方面,我们可以根据客户需求,选择 Bedrock 提供的业界翘楚的商业化模型(如:Claude),也可以选择在 Amazon 机器学习基础设施平台 SageMaker 上部署 code generation 的开源模型(如:SQLCoder/CodeLlama)。

如果客户没有太多 LLM 技术栈的积累,及足够的算法工程师团队,那么 Amazon Bedrock 是一个较好的选择,在 Amazon Bedrock 上提供了广泛的商业化成熟的 Functional Model,其中 Athropic 的 Claude 模型经测试在各项 sql 生成中稳定性和功能均表现突出。

Bedrock 通过 SDK 调用 Claude 模型进行 sql 生成及自然语言交互的核心代码示例如下:

def get_bedrock_client(assumed_role: Optional[str] = None,
    region: Optional[str] = None,
    runtime: Optional[bool] = True,
):
  
    if region is None:
        target_region = os.environ.get("AWS_REGION", os.environ.get("AWS_DEFAULT_REGION"))
    else:
        target_region = region

    print(f"Create new client\n  Using region: {target_region}")
    session_kwargs = {"region_name": target_region}
    client_kwargs = {**session_kwargs}

    profile_name = os.environ.get("AWS_PROFILE")
    if profile_name:
        print(f"  Using profile: {profile_name}")
        session_kwargs["profile_name"] = profile_name

    retry_config = Config(
        region_name=target_region,
        retries={
            "max_attempts": 10,
            "mode": "standard",
        },
    )
    session = boto3.Session(**session_kwargs)

    if assumed_role:
        print(f"  Using role: {assumed_role}", end='')
        sts = session.client("sts")
        response = sts.assume_role(
            RoleArn=str(assumed_role),
            RoleSessionName="langchain-llm-1"
        )
        print(" ... successful!")
        client_kwargs["aws_access_key_id"] = response["Credentials"]["AccessKeyId"]
        client_kwargs["aws_secret_access_key"] = response["Credentials"]["SecretAccessKey"]
        client_kwargs["aws_session_token"] = response["Credentials"]["SessionToken"]
        

    if runtime:
        service_name='bedrock-runtime'
    else:
        service_name='bedrock'

    client_kwargs["aws_access_key_id"] = os.environ.get("AWS_ACCESS_KEY_ID","")
    client_kwargs["aws_secret_access_key"] = os.environ.get("AWS_SECRET_ACCESS_KEY","")
    
    bedrock_client = session.client(
        service_name=service_name,
        config=retry_config,
        **client_kwargs
    )

    print("boto3 Bedrock client successfully created!")
    print(bedrock_client._endpoint)
    return bedrock_client

#role based initial client#######
os.environ["AWS_DEFAULT_REGION"] = "us-west-2"  # E.g. "us-east-1"
os.environ["AWS_PROFILE"] = "default"
os.environ["BEDROCK_ASSUME_ROLE"] = "arn:aws:iam::687912291502:role/service-role/AmazonSageMaker-ExecutionRole-20211013T113123"  # E.g. "arn:aws:..."

#新boto3 sdk session方式初始化bedrock
boto3_bedrock = get_bedrock_client(
    assumed_role=os.environ.get("BEDROCK_ASSUME_ROLE", None),
    region=os.environ.get("AWS_DEFAULT_REGION", None)
)

parameters_bedrock = {
    "max_tokens_to_sample": 2048,
    #"temperature": 0.5,
    "temperature": 0,
    #"top_k": 250,
    #"top_p": 1,
    "stop_sequences": ["\n\nHuman"],
}

####langchain bedrock集成示例
bedrock_llm = Bedrock(model_id="anthropic.claude-v2", 
                      client=boto3_bedrock, 
                      model_kwargs=parameters_

db = SQLDatabase.from_uri(
"mysql+pymysql://admin:admin12345678@database-us-west-2-demo.cluster-c1qvx9wzmmcz.us-west-2.rds.amazonaws.com/llm",
include_tables=['dws_truck_portrait_index_sum_da','dws_ots_waybill_info_da','ads_bi_quality_monitor_shipping_detail','dim_pub_truck_info'], # we include only one table to save tokens in the prompt :)
sample_rows_in_table_info=0)
db_chain = CustomerizedSQLDatabaseChain.from_llm(llm=bedrock_llm, db=db, verbose=False, return_sql=True)
db_chain.run("2022年的运输总量是多少吨?")                      

如上代码示例,其中

  • get_bedrock_client:该函数为 amazon boto3 sdk 获取 bedrock 服务客户端的方法,可以支持 AKSK 或者 IAM Role 方式
  • db =SQLDatabase.from_uri 及 CustomerizedSQLDatabaseChain:该部分为通过 bedrock 客户端调用 Claude 模型,并通过 langchain SqlDatabaseChain 进行数据库连接及最终 sql 生成

如果客户希望有更多的模型选择性,且自身算法工程师的研发团队有一定的技术实力,希望对模型做更多的定制,以便更加适应自身业务系统的个性化需求(如 fine tune 模型微调),从而更加嵌入到自身的端到端业务流程,那可以选择业界的开源 sql 生成模型,其中 SQLCoder 模型在基本 sql 生成中稳定性和功能较其他开源模型更加优质。

SQLCoder 模型是一个 SOTA 大型语言模型, 在开发者的开源评估框架 SQLEval 中,SQLCoder 的性能优于其他多款主要的开源模型。其中 SQLCoder 15b 参数的模型,在大量 SQL 复杂查询上进行微调后,在针对单个数据库模式进行推理生成时,它的性能和功能在某些场景甚至优于 OpenAI 的 GPT-3.5,因此比较适合对自身业务数据库定制化需求比较高的 SQL 生成场景。

在 Amazon SageMaker 平台上,可以通过 inference endpoint 终端节点,简单的几行代码即可部署 SQLCoder 的模型,一旦部署完成,即可通过 SageMaker 的 inference sdk 接口进行推理生成,简化工程化落地工作量,提高生产化上线的效率。

使用 SageMaker inference endpoint 部署 SQLcoder 15b 模型及推理调用的代码示例如下:

inference_image_uri = (
f"763104351884.dkr.ecr.{region}.amazonaws.com/djl-inference:0.23.0-deepspeed0.9.5-cu118"
)

engine=Python
option.entryPoint=model.py
option.load_in_4bit=TRUE
option.tensor_parallel_degree=4
option.s3url=s3://sagemaker-us-west-2-687912291502/LLM-RAG/workshop/LLM_sqlcoder_model/

####SageMaker inference endpoint部署sdk
from sagemaker.utils import name_from_base
import boto3
model_name = name_from_base(f"sqlcoder") #Note: Need to specify model_name
print(f"Image going to be used is ---- > {inference_image_uri}")
create_model_response = sm_client.create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer={
        "Image": inference_image_uri,
        "ModelDataUrl": s3_code_artifact
    },
)
model_arn = create_model_response["ModelArn"]
print(f"Created Model: {model_arn}")

endpoint_config_name = f"{model_name}-config"
endpoint_name = f"{model_name}-endpoint"
# Note: ml.g4dn.2xlarge 也可以选择
endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "VariantName": "variant1",
            "ModelName": model_name,
            "InstanceType": "ml.g4dn.12xlarge",
            "InitialInstanceCount": 1
        },
    ],
)

create_endpoint_response = sm_client.create_endpoint(
EndpointName=f"{endpoint_name}", EndpointConfigName=endpoint_config_name
)
print(f"Created Endpoint: {create_endpoint_response['EndpointArn']}")

#### 模型推理测试
import json
import boto3
smr_client = boto3.client("sagemaker-runtime")
parameters = {
    "max_new_tokens": 400,
    "do_sample": False
}

question="我想知道top客户的住址"
sql_prompt= """
### Instructions:
Your task is to convert a question into a SQL query, given a Mysql database schema.
Adhere to these rules:
- **Deliberately go through the question and database schema word by word** to appropriately answer the question
- **Use Table Aliases** to prevent ambiguity. For example, `SELECT table1.col1, table2.col1 FROM table1 JOIN table2 ON table1.id = table2.id`.
- When creating a ratio, always cast the numerator as float
### Input:
Generate a SQL query that answers the question `{question}`.
This query will run on a database whose schema is represented in this string:
...省略
### Response:
Based on your instructions, here is the SQL query I have generated to answer the question `{question}`:
```sql
""".format(question=question)

response_model = smr_client.invoke_endpoint(
            EndpointName=endpoint_name,
            Body=json.dumps(
            {
                "inputs": sql_prompt,
                "parameters": parameters
            }
            ),
            ContentType="application/json",
        )

result_sql=response_model['Body'].read().decode('utf8')
print(result_sql.split("```sql")[-1].split("```")[0].split(";")[0].strip().replace("\\n"," ") + ";")

如上示例代码所示,其中:

  • inference_image_uri:该部分为 SageMaker inference 推理镜像,SageMaker 提供了各种开箱即用的推理容器镜像,如 deepspeed,vllm,tensorRT,此处我们选择 deepspeed 的推理加速镜像
  • engine=Python:该部分及其它配置项为 inference 推理容器的配置文件,可以指定推理的各种优化方式及模型加载位置等
  • entryPoint=model.py:指定模型推理脚本
  • load_in_4bit=TRUE:是否 4bit 量化加载
  • tensor_parallel_degree=4:多卡并行推理
  • s3url=s3://sagemaker-us-west-2-687912291502/LLM-RAG/workshop/LLM_sqlcoder_model/: 模型文件 S3 路径
  • create_endpoint_config:配置部署终端节点,如机型,实例数量
  • sql_prompt:生成 sql 提示词
  • response_model = smr_client.invoke_endpoint:触发推理生成 sql

在元数据召回/意图识别精准调整 sql 生成时,向量化模型也是其中重要的一环,通过讲自然语言文本(包括一个单词、短语甚至大型文档)转换为数字表示形式,然后使用这些向量从向量数据库中准确搜索相关段落,这样您就能充分利用自己的私域专业元数据(如数据库名,表名及明细字段等)与 sql 生成的基础模型(FM)组合,进行基于语义相似度的搜索,提高检索 sql 生成的库表元数据的准确性,及终端用户提问的语义识别的准确性。

关于向量化模型的选择,与 sql 生成模型一样,也可以选择商业化或者开源模型两种部署方式,其中商业化模型可以选择 Amazon Bedrock Titan Embedding 模型,它针对文本检索进行了优化,超过 25 种语言,包括英语、中文和西班牙语。最多可以输入 8192 个 token,因此非常适合处理 sql 生成场景中较大的元数据检索的需求。并且 Bedrock Titan 的输出向量可以支持 1,536 个维度的长度,使其具有更高的准确性。可通过 Amazon Bedrock 的无服务器体验获得,因此您可以使用单个 API 轻松访问它,无需管理任何基础设施。

同样,如果针对向量嵌入模型需要更多的定制,以在语义相似检索时获得更高的召回准确率,那么开源的bge模型是一个不错的选择。BGE 模型是国内智源发布的开源可商用的中英文语义向量模型 BGE(BAAI General Embedding),BGE 保持了同等参数量级模型中的最小向量维度,使用成本更低。在中文语义向量综合表征能力评测 C-MTEB 的实验结果显示(Table 1),BGE 中文模型(BGE-zh)在对接大语言模型最常用到的检索能力上领先优势尤为显著。

在 Amazon SageMaker 的 deploy 部署 bge large zh 版本模型及 embedding 向量化核心代码示例如下:

serving.properties
engine=Python
option.tensor_parallel_degree=1
option.s3url = s3://sagemaker-us-east-1-106839800180/LLM-RAG/workshop/bge-zh-model/

#### SageMaker endpoint创建及部署
model_name = name_from_base("bge-zh-15") 
print(model_name)
print(f"Image going to be used is ---- > {inference_image_uri}")
create_model_response = sm_client.create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer={
        "Image": inference_image_uri,
        "ModelDataUrl": s3_code_artifact
    },
)
model_arn = create_model_response["ModelArn"]
print(f"Created Model: {model_arn}")

endpoint_config_name = f"{model_name}-config"
endpoint_name = f"{model_name}-endpoint"
endpoint_config_response = sm_client.create_endpoint_config(
EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "VariantName": "variant1",
            "ModelName": model_name,
            "InstanceType": "ml.g4dn.xlarge",
            "InitialInstanceCount": 1,
            "ContainerStartupHealthCheckTimeoutInSeconds": 15*60,
        },
    ],
)

create_endpoint_response = sm_client.create_endpoint(
EndpointName=f"{endpoint_name}", EndpointConfigName=endpoint_config_name
)
print(f"Created Endpoint: {create_endpoint_response['EndpointArn']}")


def load_model(properties):
    tensor_parallel = properties["tensor_parallel_degree"]
    model_location = properties['model_dir']
    if "model_id" in properties:
        model_location = properties['model_id']
    logging.info(f"Loading model in {model_location}")

    model =  FlagModel(model_location)
    
    return model

model = None

def handle(inputs: Input):
    global model
    if not model:
        model = load_model(inputs.get_properties())

    if inputs.is_empty():
        return None
    data = inputs.get_as_json()
    
    input_sentences = None
    inputs = data["inputs"]
    if isinstance(inputs, list):
        input_sentences = inputs
    else:
        input_sentences =  [inputs]
        
    is_query = data["is_query"]
    instruction = data["instruction"]
    logging.info(f"inputs: {input_sentences}")
    logging.info(f"is_query: {is_query}")
    logging.info(f"instruction: {instruction}")
    
    if is_query and instruction:
        input_sentences = [ instruction + sent for sent in input_sentences ]
        
    sentence_embeddings =  model.encode(input_sentences)
        
    result = {"sentence_embeddings": sentence_embeddings}
    return Output().add_as_json(result)
    
    
def get_vector_by_sm_endpoint(questions, sm_client, endpoint_name):
    parameters = {
    }

    response_model = sm_client.invoke_endpoint(
        EndpointName=endpoint_name,
        Body=json.dumps(
            {
                "inputs": questions,
                "is_query": True,
                "instruction" :  "Represent this sentence for searching relevant passages:"
            }
        ),
        ContentType="application/json",
    )
    json_str = response_model['Body'].read().decode('utf8')
    json_obj = json.loads(json_str)
    embeddings = json_obj['sentence_embeddings']
    return embeddings

如上示例代码所示,其中:

  • load_model:模型加载的方法,BGE 需要使用 FlagModel 类进行加载
  • handle:使用 bge 模型对输入文本进行向量化的方法,其中 instruction 指令为“为以下中文文本生成向量嵌入”
  • properties:与 SQLCoder 模型部署一样,BGE 向量模型在 Amazon SageMaker 上的部署配置文件
  • get_vector_by_sm_endpoint:调用上面的 SageMaker 的 endpoint 进行文本向量化

总结

不同行业对数据的需求千变万化,将自然语言问题自动转换为 SQL 查询一直是自然语言处理领域的难点。有了 LLM 的进步使这变得可能。我们希望看到在更多企业场景中应用该解决方案,赋能业务人员与数据自由对话。

本篇作者

王舟童

亚马逊云科技资深行业解决方案架构师,负责 AI 在行业侧的解决方案设计和构建,拥有多年计算机视觉在零售行业的方案设计及落地经验。

徐峰

亚马逊云科技资深行业解决方案架构师,负责跨行业用户体验和可持续发展领域的行业解决方案的设计、构建和推广。曾就职于群硕软件、平安陆金所等 IT 公司,有 19 年软件行业实践经验,目前主要专注于云原生数据分析类解决方案的设计和推广。

唐清原

亚马逊云科技高级解决方案架构师,负责 Data Analytic & AIML 产品服务架构设计以及解决方案。10+数据领域研发及架构设计经验,历任 IBM 咨询顾问,Oracle 高级咨询顾问,澳新银行数据部领域架构师职务。在大数据 BI,数据湖,推荐系统,MLOps 等平台项目有丰富实战经验。

张佳隽

亚马逊云科技资深行业解决方案架构师,负责行业领域的数据平台和产品解决方案的咨询和设计工作。在零售快消、新能源、制造等行业拥有多年的数据方案设计和落地经验。在加入 AWS 之前,曾在远景能源和阿迪达斯等公司担任数据平台负责人的职位。

吴楠

AWS 解决方案架构师,负责面向跨国企业客户的云计算方案架构咨询和设计,客户覆盖医疗,零售等行业。

石阳

AWS 零售行业解决方案架构师,负责面向国内零售行业以及国内企业出海的行业解决方案设计,以及基于行业解决方案的行业生态赋能及客户项目支持。