亚马逊AWS官方博客

基于检索增强生成(RAG)的 Streaming LLM 应用开发实战

大语言模型已经被证明可以在其参数中存储知识,并生成文本或者回答问题,然而,它们操作知识的能力仍然有限,特别是一个具体的特定领域,他们的回答可能变得不那么“自信”,还会产生幻觉(hallucination)。RAG(Retrieval Augmented Generation)检索增强生成,是一种通用采用检索方式增强生成(RAG)的 AI 框架,即大模型 LLM 在回答问题或生成文本时,会先从大量的文档中检索出相关信息,然后基于这些检索出的信息进行回答或生成文本,从而可以提高回答的质量,而不是任由 LLM 来发挥。

在此博客里,我们将构建一个基于 RAG 的 LLM 应用程序,使用 LLM 和 Embedding 的向量生成模型进行语义搜索与文本相似性,来构建一个带有图形界面的问答式文档 Web 应用程序。我们将模拟一个工业生产线故障诊断的搜索,并根据搜索给出智能分析方案。由于大模型基本上都支持 Streaming 的方式生成内容,这样用户不用等待所有结果输出结束就可以看到持续生成的内容。我们这里使用 AWS 基础设施,来构建一个基于 HTTP Streaming 的 LLM 应用。

我们在该方案主要使用到如下服务和组件:

  • Amazon OpenSearch Service 支持 k-NN 方式的信息检索,它能够根据向量(Vector)在海量数据之上快速搜索到与之相关度最高的向量列表,使之非常适合一些特定的语义搜索应用。
  • Amazon SageMaker 是 ML 训练、构建和部署的企业级的应用。我们可以使用 SageMaker Endpoint 来部署需要的模型。
  • AWS Lambda 是一个无服务(ServerLess)的计算服务,可以托管后端服务的代码和应用而无须预置,且只需按您的使用量付费。
  • AWS Application Load Balancer(ALB) – 支持能够分发 HTTP/2 和 WebSocket 流量到多个后端服务。
  • Auto Scaling group 能够根据您设置的健康监控和弹性伸缩的规则,从而弹性管理一组 EC2 机器。
  • Amazon S3 是一项对象存储服务,提供行业领先的可扩展性、数据可用性、安全性和性能。
  • Amazon CloudFront 是一项快速的内容分发网络(CDN)服务,可以在开发人员友好环境中以低延迟和高传输速度向全球客户安全分发数据、视频、应用程序和 API。
  • AWS Cloud Development Kit(AWS CDK) 是 AWS 的一个开源工具,其允许开发者使用熟悉的编程语言来创建和定义云资源,简化复杂软件配置的管理。在这篇文章中,我们将使用 AWS CDK 来构建我们的方案。

基于以上服务和基础设施,我们采用 ASG+EC2 部署 LLM 用来生成高质量的回答,使用 Amazon OpenSearch 作为向量相似度检索来进行语义搜索,并在 Amazon SageMaker 上部署一个用于生成文本向量的模型。

方案综述

该方案主要包含以下两个阶段:

  1. 知识准备阶段 – 在此阶段我们通过 Embedding 向量模型对一组语料文档生成向量表示,我们主要根据问题生成向量,并将其存入 OpenSearch Service 中的一个 k-NN 索引中。
  2. 知识检索增强阶段 – 这是应用程序的推理阶段。在此阶段,我们将对输入的文本采用准备阶段使用的向量模型生成向量,然后通过 k-NN 索引来搜索相似的问题并返回相似的列表给前端,在前端页面可以选择你需要大模型进行分析的项目列表,传入后台大模型进行总结。

知识准备阶段可以参考知识导入的过程,这里不做赘述,知识检索增强阶段的方案如下图所示:

具体的步骤如下:

  1. 在前端 UI 页面上输入关键字查询相关的问题和答案,UI 层将内容发送到服务端(Lambda 等服务);
  2. 使用 Embedding 向量模型把关键字转成向量表达;
  3. 通过关键的向量在 Amazon OpenSearch 的向量库中进行 k-NN 搜索与其相似的问题;
  4. 返回向量库相关搜索结果给前端页面;
  5. 用户在页面上手动选择相关信息,UI 层发送选中信息至后端 LLM 大模型;
  6. 通过 LLM 生成问题分析和解决方案建议;
  7. 开始 Streaming 逐字输出文本;
  8. 在前端 UI 中逐字渲染 Streaming 的文本。

我们会在接下来的章节主要介绍与之相关的技术方案。

  • 部署文字向量模型:我们采用 SageMaker 部署 Embedding 模型,用于根据文字生成向量,继而可以用于 OpenSearch 服务的 Vector 相似性查询;
  • 创建 LLM 应用:我们将介绍一个 Hugging Face 的大模型,并使用 Flask 构建 Web 应用,用来智能分析;
  • 使用 OpenSearch 进行语义搜索:我们将介绍相关使用语义进行向量搜索;
  • 导入数据:导入我们生成的模拟 QA 的数据;
  • 示例 UI 的实现和部署

使用 CDK 部署生成文字向量模型

使用 SageMaker 能够把适用于自然语言处理(NLP)的 Hugging Face 模型进行训练、微调和运行推理。其将使用 AWS Deep Learning Containers 去训练和推理工作。有关可用的 Deep Learning Containers 图像的列表,请参阅可用的深度学习容器。这些 Deep Learning Containers 镜像经过维护,并定期使用安全补丁进行更新。

这里我们采用 AWS CDK 来部署一个 Hugging Face 的向量模型 shibing624/text2vec-base-chinese。该模型是一个 CoSENT(Cosine Sentence)的预训练模型,如下片段用来生成我们的 SageMaker 推理节点:

model = sagemaker.CfnModel(
    self,
    f"Model",
    execution_role_arn=self._sagemaker_role.role_arn,
    # the properties below are optional
    enable_network_isolation=False,
    containers=[
        sagemaker.CfnModel.ContainerDefinitionProperty(
            container_hostname=f"{self._project_name}ContainerHostname",
            image=image_uri,
            mode="SingleModel",
            environment={
                "HF_TASK": "feature-extraction",
                "HF_MODEL_ID": "shibing624/text2vec-base-chinese",
                "SAGEMAKER_CONTAINER_LOG_LEVEL": 20,
                "SAGEMAKER_REGION": cdk.Aws.REGION,
            },
        )
    ],
)

...

endpoint_config = sagemaker.CfnEndpointConfig(
    self,
    f"EPConfig",
    production_variants=[
        sagemaker.CfnEndpointConfig.ProductionVariantProperty(
            initial_instance_count=1,
            initial_variant_weight=1.0,
            instance_type=instance_type,
            model_name=model.attr_model_name,
            variant_name=variant_name,
        )
    ],
)

endpoint_config.add_dependency(model)
# ==============================
# ===== SAGEMAKER ENDPOINT =====
# ==============================

endpoint = sagemaker.CfnEndpoint(
    self,
    f"{self._project_name}Endpoint",
    endpoint_name=f'{self._project_name}Endpoint',
    endpoint_config_name=endpoint_config.attr_endpoint_config_name,
)
        
...

创建 LLM 应用

我们将使用具有 GPU 功能的 EC2 来运行我们的 LLM 应用。在大模型选择上,我们选择了 ChatGLM2-6B,ChatGLM2-6B 是开源中英双语对话模型 ChatGLM-6B 的第二代版本,它具备多领域知识、代码能力、常识推理及运用能力。其网络架构基于 Prefix Encoder 方式,推理速度较第一代有 40% 的提升,特别是 RoPE 位置编码也放在了 Attention Layer 之外(第一代放置在 Attention Layer 里)。

ChatGLM 模型(ChatGLM 和 ChatGLM2)主要使用了 Encoder 方式的网络结构,请参见论文 GLM: General Language Model Pretraining with Autoregressive Blank Infilling,我们这里简单介绍一下该 Encoder 的工作原理,如下图表示:

该 Encoder 方式在基础模型阶段和微调阶段是如下工作的。

  • 基础模型阶段

输入序列 x 被分成两部分:Part A 是损坏的文本 Xcorrupt,Part B 由 Masked Span 组成,如上图。Part A 中的标记可以相互关注,但不能关注 B 中的任何标记。Part B 中的标记可以关注 Part A 和 B 中先前的标记,但不能关注 B 中任何后续的标记。为了实现自动回归生成,每个 Span 都用特殊标记[START]和[END]进行填充,分别用于输入和输出。采用这种方式,该模型训练了一个双向编码器(Part A)(bidirectional encoder)和单向解码器(unidirectional)(Part B)。

  • 微调阶段

对于文本生成任务,给定的上下文构成输入的 Part A,末尾添加一个 Mask Token,那么模型对 Part B 可以使用自回归方式生成文本。

大模型显存优化

由于大模型参数非常大,为了良好地使用,我们进行了一些优化。这个模型是 6B 的大小,其参数单位为 float16ml.g4dn 系列至少有 16G 的显存,当全部参数 Load 至 GPU 时,大概占用了 12.5G 左右,考虑到推理时需要更多的现存,我们引入了 Accelerate 包。Accelerate 是 Hugging Face 中非常有用的一个工具,可以大幅提高深度学习模型的训练速度和推理处理。它把模型加载至不同的设备上(内存、显存和 Disk)不需要全部加载至显存,这样就不需要太大的显存来存储模型,在运行其增加了一些 Hooks,这样:

  • 在每神经网络层,输入被放置在正确的设备上(因此即使模型跨越几个 GPU,它也可以工作);
  • 对于卸载(Offload)在 CPU 上的模型参数,它们在前向传播之前被放置在 GPU 上,并在之后很快被清理;
  • 对于卸载(Offload)在硬盘上的模型参数,它们被加载到 RAM 中,然后在前向传播之前被放置在 GPU 上,并在之后很快被清理。

以下图示说明了 Hooks 的工作原理:

详情请参阅:https://huggingface.co/docs/accelerate/concept_guides/big_model_inference

以下是我们运行时一个设备快照:

{
 'transformer.embedding': 0,
 'transformer.rotary_pos_emb': 0,
 'transformer.encoder.layers.0': 0,
 'transformer.encoder.layers.1': 0,
 'transformer.encoder.layers.2': 0,
 'transformer.encoder.layers.3': 0,
 'transformer.encoder.layers.4.input_layernorm': 0,
 'transformer.encoder.layers.4.self_attention.query_key_value': 0,
 'transformer.encoder.layers.4.self_attention.core_attention': 0,
 'transformer.encoder.layers.4.self_attention.dense': 'cpu',
 'transformer.encoder.layers.4.post_attention_layernorm': 'cpu',
 'transformer.encoder.layers.4.mlp': 'cpu’,
 …
 'transformer.encoder.layers.27': 'cpu',
 'transformer.encoder.final_layernorm': 'cpu',
 'transformer.output_layer': 'cpu'
 }

我们下载 Hugging Face 的模型文件之后,在包含有 Accelerate lib 的环境中,加载模型的代码片段如下:

tokenizer = AutoTokenizer.from_pretrained("chatglm-6b", trust_remote_code=True)
model = AutoModel.from_pretrained("chatglm-6b", device_map='auto', trust_remote_code=True)

ChatGLM 支持 streaming 的方式生成文本,代码片段如下:

    def summarize_generate():
        history = []
        response = ""
        pre_response = None
        for idx, (response, history) in enumerate(model.stream_chat(tokenizer, stream_input_text, temperature=temperature, history=history, max_length=6000)):            
            if pre_response is not None:
                word = response[len(pre_response):]
                pre_response = response
                yield word.encode('utf-8')
            else:
                pre_response = response
                yield response.encode('utf-8')

    return app.response_class(summarize_generate())

另外,在我们一个关于知识库的案例中,我们发现如下超参数会影响生成文本的质量,各个参数的相关定义如下:

A 说明 ChatGLM 是否支持 默认值
1 Temperature 该参数决定了输出的概率分布。值越高,输出的预测越随机,创造性也越高 0.8
2 Top P Top P 采样过滤低概率的词汇(根据概率) 0.8
3 Top K Top K 采样过滤低概率的词汇(根据数量) x N/A
4 Sample 是否采样增加随机性 TRUE

我们的 Prompt 模版如下:

prompt_template_llm = """As a sophisticated expert in Manufacturing, based on the customer's question ```{question}```, please provide a concise and professional analysis based on the known information and answers provided answers by the following triple backquotes (```) and tell what information the answer is based on. If you cannot give an analysis from it, please say "insufficient information provided". It is not allowed to add hallucinations to the analysis. Please note that the analysis must be provided in English.
Known information and answers:
  ```
  {answers}
  ```
"""

语义搜索

为了在海量数据上进行相似性搜索,我们根据向量模型生成的结果在已存储的 OpenSearch 的向量 Index 上进行搜索,第一步我们得到向量:

...

self._client = boto3_session.client(service_name="sagemaker-runtime")
...

## invoke with boto3
def _invoke(self, json_body):
    try:
        response = self._client.invoke_endpoint(EndpointName=self._endpoint_name,
                                                ContentType='application/json',
                                                Body=json_body,
                                                Accept='application/json')
        return response
    except Exception as e:
        logger.exception(
            f"Invoke sagemaker endpoint {self._endpoint_name} by {json_body} has exception: {json_body}, by endpoint {self._endpoint_name}")

        raise e
        
def generate_vectors(self, keywords: list[str]):
    """
    Send request to sagemaker endpoint to generate labels

    Args:
        :keywords: keywords list to generate embeddings
    """
    json_input = json.dumps({'inputs': keywords, "options": {"wait_for_model": True}})
    logger.debug(f"Generate embedding from sagemaker {self._endpoint_name} by {json_input}")

    response = self._invoke(json_body=json_input)
    try:
        vectors = json.loads(response['Body'].read())
        if not isinstance(vectors, list) or len(vectors) == 0:
            logger.warning(
                f"Generate embedding from sagemaker {self._endpoint_name} by {json_input} has unsuccessful result: {vectors}")
            return []

        if len(vectors) != len(keywords):
            logger.warning(
                f"Generate embedding from sagemaker {self._endpoint_name} by {json_input} has umatched output witu input, vectors len {len(vectors)} != keywords len {len(keywords)}")
            return []

        results = [vector[0][0] for vector in vectors]
        logger.debug(f"Generated embedding from  sagemaker {self._endpoint_name} by {json_input}: {results}")
        return results
    except Exception as e:
        logger.exception(
            f"Generate embedding from sagemaker {self._endpoint_name} by {json_input} has exception: {json_input}")
        raise e
...

第二步,我们进行向量搜索:

## generate query
@staticmethod
def _get_query(vector=[], size_output=5, knn_k=6):
    query = {
        "size": size_output,
        "from": 0,
        "_source": {
            "excludes": ["question_vector"]
        },
        "query": {
            "knn": {
                "question_vector": {
                    "vector": vector,
                    "k": min(knn_k, 256)
                }
            }
        }
    }
    return query

## opensearch client
def _create_opensearch_client(self,
                                boto3_session=None):

   ...

    auth = (username, password)  # For testing only. Don't store credentials in code.
    return OpenSearch(hosts=[{'host': host, 'port': 443}],
                        http_auth=auth,
                        use_ssl=True,
                        verify_certs=True,
                        connection_class=RequestsHttpConnection,
                        ssl_assert_hostname=False,
                        ssl_show_warn=False)
                                
## query by OpenSearch client
def knn_search_by_text_vectors(self,
                                text_vector,
                                knn_k=6,
                                size_output=8):
    """
    Search by vectors

    Args:
        :text_vector: text vector. must not null or empty
        :size_output: max output size
        :knn_k: param k of knn
        :min_confidence: The minimum confidence level for the labels to return
    """
    if text_vector is None or len(text_vector) == 0:
        raise ValueError('Text vectors cannot be null or empty')

    query = OpenSearchClient._get_query(vector=text_vector,
                                        size_output=size_output,
                                        knn_k=knn_k)
    try:
        logger.debug(
            f"Querying answers from index {self._index} by vector with length {len(text_vector)}")

        response = self._client.search(request_timeout=self._request_timeout,
                                        index=self._index,
                                        body=query)

        logger.debug(f"Queried answers from open search index {self._index} by {query}: {response}")
        return self._resolve_result(response)
    except Exception as e:
        logger.exception(
            f"Couldn't query image materials from open search index {self._index} by {query}")

        raise e

导入数据

我们采用大模型生成了一些 sample 数据进行测试,样本数据如下。

我们在 OpenSearch 创建了如下 index:

import boto3, json
import requests

....
awsauth = get_auth()
host = get_host()

# create index
index_name = 'semantic_search_knowledge_index'
v_dimension = 768 # Embbeding vector dimension

headers = { "Content-Type": "application/json" }

payloads = {
    "settings": {
       "index.knn": True,
        "knn.space_type": "l2"
   },
    "mappings": {
        "properties": {
            "question_vector": {
                "type": "knn_vector",
                "dimension": v_dimension,
                "method": {
                    "name": "hnsw",
                    "space_type": "l2",
                    "engine": "nmslib",
                    "parameters": {
                        "ef_construction": 256,
                        "m": 32
                    }
                }
            },
            "question": {
                "type": "text"
            },
            "answers": {
                "type": "text"
            }
        }
    }
}

# Create Index
r = requests.put(host+index_name, auth=awsauth, headers=headers, json=payloads)

以下代码片段是用于导入数据的:

def import_single_row(payload):
    ## invoke SageMaker to generate text embedding
    question_vector = generate_vector(payload['question'])
    payload['question_vector'] = question_vector
    
    first = json.dumps({ "index": { "_index": index_name} }, ensure_ascii=False) + "\n"
    second = json.dumps(payload, ensure_ascii=False) + "\n"
    payloads = first + second
    r = requests.post(url, auth=awsauth, headers=headers, data=payloads.encode()) # requests.get, post, and delete have similar syntax


def import_data(json_array):
    for payload in tqdm(json_array):
        import_single_row(payload)
        sleep(0.01)

json_array=[]
with open('qa_samples.csv', encoding = 'utf-8') as csv_file_handler:
    csv_reader = csv.DictReader(csv_file_handler)
    for row in csv_reader:
        json_array.append(row)
        
import_data(json_array)

示例 UI 实现

UI 部署架构图

UI 层为 React 单页面应用,静态资源部署在 Amazon S3 中,通过 Amazon CloudFront 访问 S3 中的资源。

其中,为了确保 S3 Bucket 的安全性,仅允许指定的 CloudFront 访问。用户的浏览器通过 Amazon CloudFront 访问 Amazon S3 中的静态资源。

UI 页面展示

结果返回(http 请求流式输出)

UI 框架:@cloudscape(Apache License 2.0)Cloudscape was built for and is used by Amazon Web Services(AWS) products and services. 具体请参阅 AppLayout、Table、Alert 组件。

流式输出实现

const [answerMsg, setAnswerMsg] = useState("");

const requestApi = 'solution_backend_url'
const postData = { answers: requestAnswer, question: query };
try {
      const response = await fetch(requestApi, {
        method: "POST",
        headers: HEADERS,
        body: JSON.stringify(postData),
        signal,
      });
      const data = response.body;
      const reader = data.getReader();
      const decoder = new TextDecoder();
      let done = false;
      let text = "";
      // 循环等待结束标识 并将拿到的数据上屏渲染
      while (!done) {
        if (stopConversationRef.current === true) {
          controller.abort();
          done = true;
          break;
        }
        const { value, done: doneReading } = await reader.read();
        done = doneReading;
        const chunkValue = decoder.decode(value);
        text += chunkValue;
        setAnswerMsg(text + "_");
      }
      setAnswerMsg(text);
    } catch (error) {
      console.error("doSummarizeError", error);
    } finally {
    }
    
    
 ......
 
       <div>
        {answerMsg && (
          <Alert
            key="alt-answer-msg"
            statusIconAriaLabel="Info"
            header="Analyze result"
            dismissible
            onDismiss={alertDismiss}
          >
            {answerMsg}
          </Alert>
        )}
      </div>

通过 CDK 部署示例 UI

这里的逻辑为:

  1. 执行本地 npm i & npm run build 命令,安装依赖并对代码进行打包构建。
  2. 创建 Web S3 存储桶,设置 SPA 应用静态页面存储类型。
  3. 将打包产物上传 Bucket。
  4. 创建 CloudFront,设置回源到 S3 存储桶,并设置 S3 存储桶权限允许 CloudFront 回源访问。

代码示例:

        # prepare the static web pages
        self._prepare_static_web_pages(main_api, summarize_api)

        # create website bucket
        website_bucket = self._create_and_upload_asset_to_s3(id)

        # Create cloudfront
        distribution = self._create_cloudfront_distribution(website_bucket)
        self._grant_cloudfront_access(distribution, website_bucket)

        cdk.CfnOutput(
            self,
            "SmartSearchUrl",
            value=f"https://{distribution.distribution_domain_name}",
            description="Smart search url")
......

def _create_cloudfront_distribution(self, website_bucket):
        distribution: cloudfront.CloudFrontWebDistribution = cloudfront.CloudFrontWebDistribution(
            self,
            "StaticWebsiteDistribution",
            origin_configs=......,
            default_root_object="index.html",
        )

        # add access control for frontend
        cfn_origin_access_control = cloudfront.CfnOriginAccessControl(
            self,
            id="StaticWebsiteDistributionControl",
            origin_access_control_config=cloudfront.CfnOriginAccessControl.OriginAccessControlConfigProperty(
                name="StaticWebsiteDistributionControlConfig",
                origin_access_control_origin_type="s3",
                signing_behavior="always",
                signing_protocol="sigv4",
                description="Default Origin Access Control",
            ),
        )
        # add access
        overriden_distribution = typing.cast(
            cloudfront.CfnDistribution,
            distribution.node.default_child,
        )

        overriden_distribution.add_property_override(
            "DistributionConfig.Origins.0.OriginAccessControlId",
            cfn_origin_access_control.get_att("Id"),
        )

        return distribution
    
......
 

Summary

本文,我们演示了如何使用 AWS 的技术和服务构建一个基于 RAG 的知识库。我们将知识数据以向量形式存储在数据库中,通过 K-nn 检索可以过滤相关的知识,然后交给大模型进行分析。其中,

  • 大模型和生成向量的模型你可以根据需要进行选择;
  • 在一张 Index 里,可以存储多个 Vector 向量,组合其这些 Vector 向量字段的权重给出过滤的知识,可以让您的知识更加丰富;
  • 通过 Web UI 页面提供可视化的 K-nn 检索功能,并通过 UI 页面实现使用者选择结果与大模型分析的交互、流式返回。
  • 通过 AWS 开源的 Cloudscape UI 组件提供 AWS 风格的交互页面。

该方案更加详细的部署您可以参考 Workshop:Question Answering via RAG and LLM,代码是开源的,请参考:smartsearch-ai-knowledge-workshop

参考

RAG:https://research.facebook.com/publications/retrieval-augmented-generation-for-knowledge-intensive-nlp-tasks/

ChatGLM:https://github.com/THUDM/ChatGLM-6B/tree/main

ChatGLM2:https://github.com/THUDM/ChatGLM2-6B

Hugging Face Accelerate:https://github.com/huggingface/accelerate

React:https://react.dev/learn

Cloudscape:https://cloudscape.design/components/

本篇作者

刘济华

亚马逊云科技快速原型方案架构师,主要领域包括 AI/ML,微服务,金融和 IoT 等方向,有近 20 年的开发经验。

王宇

亚马逊云科技快速原型方案架构师,负责大前端领域的产品研究与交付。针对应用程序中所涉及的移动端、前端、BFF 层原型及交付等均有涉猎,曾主导过金融、零售与广告、企业应用、大数据、AI 等领域多个大型业务系统的交互设计与实现。

内容审阅

熊俊峰

亚马逊云科技行业解决方案架构师,主要领域包括 AI/ML,制造业和医疗健康。曾就职于腾讯三年,负责国家医学影像云平台的产品架构设计和图像处理算法的开发工作。研究方向包括大语言模型和计算机视觉等,以第一作者发表 SCI 和国际会议论文 13 篇,以第一发明人申请发明专利 11 项。

程红波

亚马逊云科技解决方案架构师,负责企业级客户的架构咨询及设计优化,同时致力于容器和无服务器技术在国内和全球企业客户的应用和推广。