亚马逊AWS官方博客

基于 Amazon Bedrock 托管 Claude3 实现 RAG 增强文本检索的 Text to SQL 方案

实现自然语言转译成 SQL 一直以来是数据分析/可视化领域比较热门的方案。目前,基于大语言模型(Large Language Model, LLM)的语义理解和丰富的参数,实现自然语言转译成 SQL 已被证实为新的可行性方向之一,基于此技术可以深入垂直行业进行定制化报表展示而无需依赖专业的数据开发工程师。大语言模型甚至可以更好的帮助您生成效率更高的 SQL,辅助您对业务数据的洞察。

此文将会介绍,如何在 Amazon Elastic Compute Cloud(Amazon EC2)上部署一个 Next.js 服务,使用 Amazon Bedrock 托管的大语言模型(如 Amazon Titan Embedding)实现对自然语言的向量化,进而实现 RAG 增强文本检索,改进 prompt,提高 Claude 模型对问题的理解,读取预置的表结构 Schema,以实现自然语言转译成 SQL,并到数据存储服务(如 Amazon Redshift、Amazon RDS MySQL 等)中进行数据查询、智能分析,最终呈现可视化图表,完整实现一个自然语言转 SQL,并展示数据结果、分析结果、可视化展示的解决方案。

本文假定在一个 HR 招聘场景下,分析笔试、面试、到岗情况数据,所有示例数据均为测试数据。

我们在该方案主要使用到如下服务和组件:

  • Amazon Elastic Compute Cloud(Amazon EC2):是一种 Web 服务,可以在云中提供安全并且可应需调整的计算容量。访问可靠、可扩展、按需支配型基础设施。凭借 99.99% 可用性的 SLA 承诺,在几分钟内扩展容量。为您的应用程序提供安全计算。
  • Amazon Bedrock:是一项完全托管的服务,通过单个 API 提供来自 AI21 Labs、Anthropic、Cohere、Meta、Stability AI 和 Amazon 等领先人工智能公司的高性能基础模型(FM),以及通过安全性、隐私性和负责任的 AI 构建生成式人工智能应用程序所需的一系列广泛功能。
  • Amazon Redshift:Amazon Redshift 使用 SQL 在数据仓库、运营数据库和数据湖间分析结构化和半结构化数据,使用 AWS 设计的硬件和机器学习在任意规模提供最佳性价比。
  • Amazon Relational Database Service (Amazon RDS):是一项适用于 MySQL、PostgreSQL、MariaDB、Oracle BYOL 或 SQL Server 的托管式关系数据库服务。
  • Amazon S3:是一项对象存储服务,提供行业领先的可扩展性、数据可用性、安全性和性能。
  • Amazon Cognito:Customer Identity And Access Management.
  • Cloudscape:由 AWS 贡献的 AWS 风格开源 UI 组件库(Apache 2.0 License),此文中报表部分实现依赖此组件。

项目 UI 层依赖 GitHub 开源项目 chatbot-ui:https://github.com/mckaywrigley/chatbot-ui

事先准备

  • 您需要确保您已有亚马逊云科技 Global 账号,能够访问美东 1 区(us-east-1)、美西 2 区(us-west-2)、东京区(ap-northeast-1)、新加坡区(ap-southeast-1)其中任意一区,且能够在 Amazon Bedrock 控制台申请 Claude3 的权限,请参考:Amazon Bedrock endpoints and quotas – AWS General Reference Amazon Bedrock
  • 您需要准备亚马逊云科技账号的 Access Key、Secret Access Key,且此账号拥有足够 IAM 权限。
  • 您需要准备一台已安装 Nodejs(v>=18)、AWS SDK 环境的电脑/EC2 服务器,将上述步骤的- Access Key、Secret Access Key 通过 aws configure 配置,上述所有服务均启动。

方案综述

示例页面

部署架构图

此项目为一个 Next.js 应用,打包成 Docker 镜像运行在 Amazon ECS 上,通过 Amazon Cognito 进行鉴权。ECS 上的代码发起请求获取 Amazon Bedrock 上托管的 Claude3 的响应,再去数据库(如 Amazon Redshift、Amazon RDS)上查询数据,得到结果返回给浏览器。

数据库表结构

假设所有数据均洗入两张表/视图,recruitment_data 表和 dept_data 表。

-- recruitment_data表 存储招聘岗位/面试流程/offer发放与反馈/入职相关字段
CREATE TABLE recruitment_data (
    application_id integer ENCODE az64, -- 主键 申请Id
    year integer ENCODE az64,
    time_type character varying(256) ENCODE lzo, -- 当前周 如 20240318-20240324
    application_time date ENCODE az64, -- 申请时间
    employment_title character varying(256) ENCODE lzo, -- 岗位名称/Id
    dept_id integer ENCODE az64, -- 招聘所属组织/部门的Id
    dept_name character varying(256) ENCODE lzo, -- 招聘所属组织/部门的名称
    interviewee_id character varying(256) ENCODE lzo, -- 面试者姓名/Id
    written_exercise_time timestamp without time zone ENCODE az64, -- 笔试时间
    written_exercise_is_need integer ENCODE az64,  -- 是否需要笔试
    written_exercise_pass integer ENCODE az64, -- 是否通过笔试
    first_interview_start_time timestamp without time zone ENCODE az64, -- 一面时间
    first_interview_is_need integer ENCODE az64,  -- 是否需要一面
    first_interview_pass integer ENCODE az64,  -- 是否通过一面
    second_interview_start_time date ENCODE az64,   -- 二面时间
    second_interview_is_need integer ENCODE az64,  -- 是否需要二面
    second_interview_pass integer ENCODE az64,  -- 是否通过二面
    hr_interview_start_time date ENCODE az64,  -- hr面时间
    hr_interview_is_need integer ENCODE az64,  -- 是否需要面
    hr_interview_pass integer ENCODE az64,  -- 是否通过hr面
    offer_time timestamp without time zone ENCODE az64, -- offer 发放时间
    interviewee_is_offered integer ENCODE az64, -- 面试者是否有offer
    interviewee_accept_offer integer ENCODE az64, -- 面试者是否接受offer
    interviewee_accept_offer_start_time timestamp without time zone ENCODE az64, -- offer 接受时间
    offer_feedback character varying(256) ENCODE lzo, -- offer 反馈内容
    interviewee_is_onboarded integer ENCODE az64, -- 候选人是否入职
    interviewee_onboarded_start_time timestamp without time zone ENCODE az64 -- 候选人入职时间
) DISTSTYLE AUTO;

-- dept_data表 存储部门名称 dept_data.dept_id与recruitment_data.dept_id 做外键关联
CREATE TABLE dept_data (
    dept_id integer ENCODE az64, -- 主键 部门Id
    dept_name character varying(256) ENCODE lzo, -- 部门名称
    l1_dept_id character varying(256) ENCODE lzo, -- 这个部门所属一集部门Id
    l2_dept_id character varying(256) ENCODE lzo, -- 这个部门所属二集部门Id
    l3_dept_id character varying(256) ENCODE lzo -- 这个部门所属三集部门Id
) DISTSTYLE AUTO;

Prompt 设置

由于上述场景内有 2 张(多张表),prompt 里需要做分表逻辑或关联查询逻辑。流程如下:

所以,这里一个 Query 会进行三次查询,第一次进行选表(将问题与每个表的描述发给大模型,返回需要查询的表);第二次根据选表结果,将表 Schema 与 RAG 检索的样本示例发给大模型;第三次进行将结果发给大模型进行 Insight(第三次可选)。

第一次 Prompt 如下:

你现在是一个AWS Redshift数据库查询助理, 现在有几张数据表需要你的查询,只返回一个你认为最正确的
表的名字, 不需要返回其他任何文本, 如果你认为没有对应的数据, 请返回
\"ERROR: You can only read data.\"。表如下:
'recruitment_data'表: 是招聘信息表, 存储招聘岗位和候选人投递、面试耗时、是否入职的相关信息,
不包含任何简历的信息。
'dept_data'表: 是部门信息表, 存储公司部门信息和组织架构。
你要查询的问题是:{被重复投递最多的是哪个岗位?}

// 一定要说明 你要查询的问题是,否则大语言模型不知道你问什么

第二次 Prompt 如下(如果第一次返回 recruitment_data,只查询招聘场景,需要在 Prompt 中说明角色、表结构、公式等提示词):

// 角色设定
You are an expert in AWS Redshift. Your task is to understand the database tables if
given, and translate human instructions into SQL to query that database. I want you
to reply with a valid SQL in triple quotes. Do not write explanations. All valid
human instructions are given in curly braces {like this}. For any query that contains
delete or update in SQL, please respond: 'ERROR: You can only read data.'.
 涉及到比率等除法运算时,请将数据类型转换为 float4。 涉及到输出时间类型的运算时,
 请将数据转换成数值类型输出(秒数或者天数).
 
 // 表结构提示词
现在有一个aws redshift表, 表名是'recruitment_data', 存储的是一家公司的岗位招聘的应聘流程信息,
每一位应聘者每应聘一个岗位就会产生一条记录, 表内有如下列:'application_id', 数据类型是(integer), 
是主键; // ......其他列的描述,对于主要的字段要说明这一列含有那些数据,如employment_title包含['测试','开发','架构师']等值。
除了上述表结构, 生成SQL的时候, 还要参考以下公式:
offer接受率 = offer接受 / offer数:(coalesce({interviewee_accept_offer} * 1.0 /nullif({interviewee_is_offered}, 0), 0)),
SQL生成时的常见错误如下, 请在生成SQL的时候避免再犯这样的错误:
Redshift不会做隐式的数据类型转换, 因此如果数据类型需要转换, 其类型需要明确在SQL中指定。
有一类错误是对问题本身没有仔细的理解, 比如问题: “被重复投递最多的是哪个岗位?”,
可能会被错误的理解成”不同岗位的记录的重复数量“, 其真正的含义是需要理解到“重复”指的是候选人/应聘者
重复投递多个不同岗位, 或者一个岗位的情况。所以, 在理解问题的时候, 你需要考虑它的深层含义,
可以做适当的扩展和假设。
MUST output JSON, 仅生成SELECT语句。所有非计数的SQL必须限制结果不超过20条,
统计数量的SQL结果不限制,
如果SQL查询的结果适合柱状图, 请返回需要展示的字段和BarChart,
如果SQL查询的结果适合饼图, 请返回需要展示的字段和PieChart。
其中'columnList'属性是需要展示的字段的数组, 'chartType'属性是图表类型(BarChart,PieChart),
'finalSQL'属性是查询的SQL,
下面的例子你可以参考:

// RAG检索结果...
输出json格式示例:{query:"query",finalSQL:"finalSQL",chartType:"PieChart"}
你要查询的问题是:{被重复投递最多的是哪个岗位?}

// 一定要说明 你要查询的问题是,否则大语言模型不知道你问什么

【可选】第三次 Prompt 如下:

这里建议添加受众/读者类型,否则大语言模型会从工程化/数据分析师的角度解读数据。

现在您需要对数据结果进行分析, 并尽可能的对指标的变化作出下钻归因, 数据受众xxxxx,
添加您对HR招聘业务的理解,不要暴露具体的SQL字段。
数据是:
// 序列化后的数据
存储他的数据库表结构是 // 第二步中prompt设置的表schema

把它查询出来的SQL是:// 第二步中返回的SQL
Query: {被重复投递最多的是哪个岗位?请根据数据做智能分析}

// 您也可以在此添加背景信息,方便大语言模型根据背景信息做下钻分析。

注意:出于对注意力机制的考虑,输入的 prompt 前后语义要一致,例如您输入查询 SQL 的 prompt,但是又输入不需要 SQL 的 prompt/语义,可能会造成输出结果与预期偏差较大。

基于 LLM 的 transformer 机制,进行多次生成和自我检查

由于目前大多数大语言模型(包括本文使用的 Claude3)都是基于 transformer 训练,Transformer 会在输出下一个 Token 的时候看到全部之前的输出,因此,我们可以让大语言模型在输出的时候对输出内容重新进行多次生成和自我检查。

第二次 Prompt 稍作修改:

// 第二次Prompt原文
// 增加
1.请注意, 在生成SQL的时候, 这里有几个追加的要求, 具体如下: 先检查问题的句子成分和含义成分是否清晰,
是否有歧义, 如果不清晰的话要进行扩展, 使问题变得清晰。将澄清后的问题输出到'clarify'属性中。
你需要一步一步思考, 并对SQL进行解释, 解释部分放在输出的'reasoning'属性中。
针对一个问题, 由于SQL可能有不同的写法, 你需要先从不同的角度思考,生成最多五个针对当前问题不同的写法
的SQL, 并包含独立的解释。生成的SQL结果放到'referenceSql'属性中。你接下来要检查上面的几个SQL是
否有错误, 检查的时候你要从是否正确理解问题等角度, 重新思考, 并假定SQL就是错误的。
检查的结果需要写到每一个SQL的'check1'和'check2'...'checkN'属性中。如果没有错误就写它全对的概率,
如{'check1': {'no_error': 0.6'}}, 如果有错误则参考这样的输出例子:
 {'check2':{'error': 'VARCHAR需要转换成Float类型。'}}
 最后一步是根据上面的几个'referenceSql'和各自'check'的结果confidence来生成你认为正确的最终SQL
 和分析, 放到'finalSQL', 'reasoningFinal'中。
 2. 结果样式的例子如下(注意顺序): // 一定要加注意顺序 否则不会触发自检查
 {
    'clarify': '问题应该是: XYZ ',
    'reasoning1': 'reason AAA',
    'referenceSql1': 'sql AAA',
    'reasoning2': 'reason BBB',
    'referenceSql2': 'sql BBB',
    'check1': 'For SQL AAA: no error',
    'check2': 'For SQL BBB: VARCHAR需要转换成Float类型',
    'reasoningFinal': 'reason for final SQL',
    'finalSQL': 'The final SQL based on above two reference sql and the error check result.',
    'columnList': ['column1', 'column2']
}

通过上述检查,可以将输出进行自我打分和校准(关于打分标准:一般只检查 SQL 性能和对语义理解准确性)。

RAG 增强检索

为什么需要增强检索:由于大语言模型对于某个专业领域知识需要输入匹配的 Prompt 或者做 Fine-tuning,成本较高,且不易控制。RAG 可以对 LLM 的检索、联想能力的外扩,依赖大语言模型自身的推理能力对示例知识做推理,低成本不需要专业工程师即可提升回答准确度。

Embedding:

  • 将问答对/知识库内容通过 Amazon Titan Text Embedding 进行向量编码。
  • 将上一步的向量编码通过向量检索器生成索引文件,保存在服务端。
  • 查询时,首先将问题再通过 Amazon Titan Text Embedding 进行向量编码,然后通过检索检索索引文件。
  • 得到最短向量,回到问答对/知识库内容中将此向量对应索引查询返回,继续发给大模型做参考。

在第二次请求大模型前,先通过向量检索预置的 RAG 问答对,然后将检索结果作为示例一起发给大模型。

接入 Amazon Bedrock 托管的 Claude3,示例代码

以下代码为示例,您也可以参考以下文档:

关于请求 Amazon Titan Embedding:生成式人工智能的基础模型 – Amazon Titan – AWS
关于请求 Claude3:Amazon Bedrock 上的 Anthropic Claude — AWS

import {
  BedrockRuntimeClient,
  InvokeModelCommand,
} from '@aws-sdk/client-bedrock-runtime';

const bedrockStream = async (
  messages: Message[],
  promptConfig: any,
  requestId: string
) => {
  try {
    const AWS_PARAM = {
        region:"",
        credentials:{}
    }
    const bedrockruntime = new BedrockRuntimeClient(AWS_PARAM);

    const userQuestion = messages[messages.length - 1].content;
    const promptStr = `各类提示词 RAG检索结果${promptConfig} 你要回答的问题是: {${userQuestion}}`;
    
    const bedrockResult = await bedrockClaude3Respone(
      [
        ...(delete messages[messages.length-1]),
        {
          role: "user",
          content: promptStr,
        },
      ],
      bedrockruntime,
      requestId
    );
    const bedrockResultObj = JSON.parse(bedrockResult);
    return bedrockResultObj;

    // 后续再解析SQL查询DB/Redshift ,进行Insight
  } catch (error) {
    console.error(
      "Bedrock request or respone error " + requestId + " >>>",
      error
    );
    throw new Error(`Bedrock request or respone error`);
  }
};

const bedrockClaude3Respone = async (
  bodyData: any[],
  bedrockruntime: {
    send: (
      arg0: InvokeModelWithResponseStreamCommand | InvokeModelCommand,
    ) => any;
  },
  requestId: string,
) => {

  
  const command = new InvokeModelCommand({
    body: JSON.stringify({
      anthropic_version: 'bedrock-2023-05-31',
      messages: bodyData,
      max_tokens: BEDROCK_MAX_TOKEN,
    }),
    modelId: 'anthropic.claude-3-sonnet-20240229-v1:0',
    contentType: 'application/json',
    accept: 'application/json',
  });
  const respone = await bedrockruntime.send(command);
  if (respone.$metadata.httpStatusCode !== 200) {
    console.error('Bedrock respone error >>>', respone);
    throw new Error(`Bedrock returned an error`);
  }
  const resultData = await resolveClaude3Body(respone.body, requestId);
  return resultData;
};

const resolveClaude3Body = async (
  responeBody: any,
  requestId: string | undefined,
) => {
  // 解析内容
  const resultStr = responeBody.transformToString('utf8');
  const resultJson = JSON.parse(resultStr);
  return resultJson.content[0].text;
};

总结

本文介绍了基于 Amazon Bedrock 托管 Claude3 实现 RAG 增强文本检索的 Text to SQL 方案。其中 RAG 通过 Amazon Titan 实现 Embedding,通过向量检索工具进行向量文件保存和向量检索,最后得到有知识库背景的 prompt,连同上下文一起发送给大模型,得到更精准的输出。

详细代码您可联系亚马逊云科技的支持人员获取,感谢您的阅读。

本篇作者

王宇

亚马逊云科技快速原型方案架构师,负责大前端领域的产品研究与交付。针对应用程序中所涉及的移动端、前端、BFF 层原型及交付等均有涉猎,曾主导过金融、零售与广告、企业应用、大数据、AI 等领域多个大型业务系统的交互设计与实现。

郝亮

亚马逊云科技 Analytics 快速原型解决方案架构师,负责根据客户实际的业务场景,利用最新最适用于场景的大数据技术,基于 AWS 服务快速搭建核心系统,解决客户的关键业务诉求,验证方案的可行性。

内容审阅

姬军翔

亚马逊云科技快速原型解决方案架构师,AI/Machine Learning 领域专家,负责项目的开展与落地交付。