亚马逊AWS官方博客

基于 RDS 和 Confluence 数据源构建端到端的检索增强生成(RAG)应用

在各行各业中,企业级知识库的需求普遍存在,而大语言模型的应用为企业级知识库带来了全新的可能性,通过 RAG 的方式,企业能高性价比地构建自己的企业知识库问答机器人,而 RAG 的核心就是用 Embedding 模型将企业知识的预料构建成向量数据库。

在和企业级用户进行交流时,我们观察到了用户在实施 RAG 应用时的主要需求,一方面是对企业级数据源的支持,通常使用的数据源包括 Confluence 和 RDS(关系型数据库);另一方面,构建数据 Pipeline 也是一个重要的需求。基于这两个需求,我们利用亚马逊云科技的服务,提供了一个端到端的 RAG Demo 应用,用于给用户展示并拓展到不同的用户 RAG 应用中。在我们的 Demo 环境中,RAG Chatbot Application 后台部分沿用了《下一代智能搜索和知识库解决方案指南》,数据 Pipeline 部分的开发部分参考了《基于大语言模型知识问答应用落地实践》,整体架构如下图,我们也会从以下几个方面展开方案的实现细节。

  • 数据获取:分别支持读取 Confluence 中的页面内容、元数据和存储在  Amazon RDS for MySQL 中的数据。相关 token 信息存储至 Amazon Secret Manager 中。
  • 数据存储:数据源的内容获取与同步由 Glue 实现,最终将内容存储至 S3 中,Confluence 相关元数据存储至 DynamoDB 中。
  • 数据同步 Pipeline:数据写入到 S3 中后,整个 Pipeline 会自动触发,通过 Glue 实现数据向量化后写入到 OpenSearch 中。
  • LLM 应用:结合下一代智能搜索和知识库解决方案指南的后台部分,利用 OpenSearch 和 LLM 模型的功能为用户提供问答服务。

数据源的支持

Confluence

Confluence 作为企业级用户的内容管理工具,存储了用户的各种内部信息,例如 IT 运维过程中的 FAQ,企业新员工入职 FAQ 等等。在利用 Confluence 这部分内容构造自己的 RAG 应用时,是需要一些开发工作量的,包括 Confluence 中内容的第一次获取,Confluence 中内容变更后的处理例如新添加内容和现有内容变更等。为了帮助用户解决这类问题并且更快速的基于这部分数据构建 RAG,我们对于 Confluence 的内容获取进行了实现。

方便理解后续细节,我们首先要了解 Confluence 中的几个概念:Space,Page 和 Label。Space 是用于组织和管理 Page 的容器,每个 Space 会有一个 Space Key,这个 Key 是 Space 的标识;Page 是页面的基本单元,包含特定的内容和属性包括创建时间、更新时间等,每个 Page 会有自己的 Page ID 作为标识;Label,我们为 Confluence Page 设置的自定义标签,例如 Page 的标签为 RAG 应用数据。

在 Confluence 实现的时候,我们从下面三个部分从细节展开:

  1. 获取 Confluence 的账号信息,存储到亚马逊云科技 Secrets Manager 中。
  2. 第一次获取 Confluence 的 RAG 数据后,写入到 S3 中,同时在 DynamoDB 中记录 Page ID 和更新时间。
  3. 根据用户的需求定时获取 Confluence 新增页面和现有页面中更新的数据,重新写入到 S3 中。

Confluence 账号设置

我们利用 Confluence 的 API token 用于读取内容,可以参考下面的方式设置 API token 信息。

  1. 登录到 Confluence 后,打开 https://id.atlassian.com/manage-profile/security/api-tokens 链接。
  2. 点击”Create API Token” 按钮生成新的令牌。
  3. 生成令牌后,将会显示在屏幕上。请确保复制并安全存储令牌在一个安全的位置,因为它将不再可见。

在代码实现中,我们使用 AWS Glue 的 Python Shell 的方式读取 Confluence 中的数据。主要用到 Atlassian Python API(pyAtlassian)库,用于与 Atlassian(包括 Confluence、Jira、Bitbucket 等)进行交互,它提供了功能丰富的方法和类。但考虑到这个库在读取 Page 内容的时候,只能通过 PDF 的形式导出,我们也会通过 Confluence REST API 端点读取 Confluence page 的内容。

Confluence 数据的首次获取

在构建 RAG 应用时 ,用户第一次会把 Confluence 的内容进行批量导入。在实践场景中,我们建议用户把 RAG 需要的所有 Page 内容均复制到同一 Space 中,然后我们可以通过 Space Key 去获取客户知识库的内容,Space Key 可以在 Confluence 链接 https://*****.atlassian.net/wiki/spaces/~712020ac51b1b736ca4d77a5c49a8545711715/overview 中获取到,中间的~712020ac51b1b736ca4d77a5c49a8545711715 为 Space Key。但如果用户没办法把所有 Page 添加到一个 Space 中,可以通过下面的 get_all_spaces 函数获取所有 Space Key 信息,再根据需求去筛选出需要的 Space Key。

# Get all spaces with provided limit
# additional info, e.g. metadata, icon, description, homepage
confluence.get_all_spaces(start=0, limit=500)

以下是 get_all_spaces 返回的某条 Space 信息的结果,key 字段为我们后续获取 Space 内容需要用到的值,我们可以通过 name 字段去筛选 key

{'id': 98306, 'key': '~712020ac51b1b736ca4d77a5c49a8545711715', 'name': 'chatbotdemo', 'type': 'personal', 'status': 'current', *********

获取我们的 Space Key 后,我们可以通过如下代码再获取 Space 下的 Page 内容。

# Get all the pages under this space
all_pages = confluence.get_all_pages_from_space(space_key)

获取到 Page ID 后,我们会执行两个步骤,第一步是获取 page 的最近更新时间写入到 DynamoDB 中,然后读取 Page 的内容并更新到 S3 中。如果需要过滤掉某些 Page 的内容,从技术上我们总结了两种过滤 Page 的方式:1)如果需要过滤的 Page 比较少的话,可以指定 Page ID 的方式过滤掉;2)如果有很多种 Page 信息需要过滤,我们可以通过前面提到的 Label 方式进行过滤。代码的具体实现可以参考 Method 1 和 Method 2 两部分。

for page_num in range(len(all_pages)):
        
        confluence_page_id = all_pages[page_num]["id"]
        print("confluence_page_id", confluence_page_id)
        
        # Method 1:specify the page_id when you need to filter some pages
        # if confluence_page_id =='98389' or confluence_page_id =='98390':
        #     continue
        
        # Method 2:specify the page_id when you need to filter some pages
        # labels = confluence.get_page_labels(confluence_page_id)
        # need_to_filter = False
        # for label_num in range (len(labels["results"])):
        #     if labels["results"][0]["label"] != '<Your_Label>':
        #         need_to_filter = True
        # 
        # if need_to_filter == True:
        #     continue
        
        # Step 1, put the Page's last update in the DynamoDB's table
        lastUpdate = getConfluenceLastUpdate(confluence_page_id)
        
        current_date_of_data.put_currentDateOfData(confluence_page_id, lastUpdate)

        # Step 2, extract the Page and export to S3
        exportConfluencePageToS3(confluence_base_url, confluence_page_id)

我们通过 Atlassian Python 库获取 Page 的更新时间

# Get Last Update Date
def getConfluenceLastUpdate(confluence_page_id):

    page=confluence.get_page_by_id(confluence_page_id, expand='title,history.lastUpdated', status=None, version=None)

    lastUpdate = page["history"]["lastUpdated"]["when"]

    # print(page["history"]["lastUpdated"]["when"])

    return lastUpdate

通过 REST API 的方式获取 Confluence Page 的内容处理后,写入到 S3 中

    # Confluence API endpoint for retrieving page content
    confluence_api_url = f"{confluence_base_url}/rest/api/content/{page_id}?expand=body.storage"

    # Make a GET request to Confluence API to retrieve the page content
    response = requests.get(confluence_api_url, auth=(confluence_username, confluence_password))

    # Check if the request was successful
    if response.status_code == 200:
        page_content = response.json()['body']['storage']['value']

        # Assuming 'page_content' holds the HTML content of the Confluence page
        soup = BeautifulSoup(page_content, 'html.parser')
        text_content = soup.get_text(separator="\n")

        # Process <br> tags and replace them with line breaks
        text_content = re.sub(r'\s*<br\s*/?>\s*', '\n', text_content)

        # Save the page content to S3
        #s3_client = boto3.client('s3')
        s3_client = boto3.client('s3', aws_access_key_id=s3_access_key, aws_secret_access_key=s3_secret_key, region_name='us-east-1')
        s3_file_key = s3_path + 'page_'+ confluence_page_id + '.txt'
        s3_client.put_object(Body=text_content, Bucket=s3_bucket, Key=s3_file_key)

Confluence 内容更新的数据获取

随着 Confluence 的数据发生变化,客户会把新的数据导入到知识库中,分为两种情况,一个是新建的 Page,另外一个是内容有更新的 Page。对于新建的 Page,我们会重复获取新数据的步骤,获取更新时间并将 Page 内容导出到 S3;对于有内容更新的 Page,我们会更新 DynamoDB 中的最新更新时间,再把内容导出到 S3。

        # Add new page
        if current_date_of_data.check_pageIfExist(confluence_page_id) is None:
        
            lastUpdate = getConfluenceLastUpdate(confluence_page_id)
            current_date_of_data.put_currentDateOfData(confluence_page_id, lastUpdate)
            exportConfluencePageToS3(confluence_base_url, confluence_page_id)
            continue
        
        # Get the updated Confluence Data
        current = current_date_of_data.get_currentDateOfData(confluence_page_id)

        lastUpdate = getConfluenceLastUpdate(confluence_page_id)
        current_date_of_data.put_currentDateOfData(confluence_page_id, lastUpdate)

        if current != lastUpdate:
            print("Confluence is updated.")
            current_date_of_data.update_currentDateOfData(confluence_page_id, lastUpdate)
            exportConfluencePageToS3(confluence_base_url, confluence_page_id)
        else:
            print("Confluence is not updated.")

如果我们想定期的去自动更新数据,可以通过 Glue 的定时任务功能进行设置。

Amazon RDS

在企业的数据存在 Amazon RDS(关系型数据库)中的场景中,比如商品评论记录、用户体验记录、医疗记录、故障记录等,我们也可以将 RDS 中的结构化数据进行处理后导入知识库,对其中的内容进行智能问答。我们使用 Amazon Glue Studio 提供基于 UI 的方式构建 RDS 数据同步流水线,读取 RDS 中的原始数据并进行处理后,将结果导入 S3,最后给后面的数据 Pipeline 进行向量化并导入知识库。

建立连接

首先,Glue 需要与上游 RDS 和下游 S3 建立连接。我们在 Glue connection 里面分别如下图创建 RDS 的 JDBC 连接,以及 S3 的网络连接,详细的步骤可以参考 Glue 连接 RDS 文档Glue 连接 S3 文档

连接 S3 要注意的是,为了保障 Glue 是在 VPC 内访问 S3,需要为 Glue 所在的子网创建访问 S3 的 VPC Endpoints,才能对 S3 进行访问。创建好连接后,我们需要创建一个 Glue crawler。Glue Crawler 会通过我们之前创建的连接访问 RDS,获取表结构信息,并在 Glue Data Catalog 中创建一张对应的表。

通过 Glue Studio 处理 RDS 数据

在 Glue Studio 中,可以通过拖拉拽的形式,添加数据源、数据处理逻辑、和数据目标。数据源这里,我们选择之前步骤创建的 Glue Data Catalog 表。

我们可以在 Glue Studio 中处理添加各种处理逻辑,比如更改字段名称,更改字段类型,删除字段。除此之外,还有筛选功能,对字段值进行筛选;数据聚合;填补空白字段;或通过 SQL 对数据进行处理。

数据目标,我们选择 CSV 或其它需要的的格式,并指定目标 S3 路径。

最后,在运行 Glue job 之前,加上之前创建的 S3 和 RDS 连接。在指定这些参数后,我们就可以把 RDS 中的数据加载到 S3 中,用于后续处理。

数据 Pipeline

在前面的数据源支持的章节中,我们介绍了如果针对 Confluence 以及 RDS 这两种数据源做原始数据的处理,并放入 S3 等待进行 Embedding 处理,下面我们就接下来介绍如果针对这些数据进行 Embedding,存入向量数据库,待后续 RAG 应用后续调用进行知识召回。

我们需要构建一个 S3 > Lambda > Glue 的 Pipeline,将前文中预处理好的文件进行文本分割并且进行 Embedding 处理,其中 Lambda 起到承上启下的作用,它需要将上传的文件的 Object key 以及 Glue 需要用到的环境信息进行传递并触发 Glue 进行任务,接下来我们会分别介绍处理 Confluence 以及 RDS 与处理后的文档的 Pipeline。

Lambda

创建一个 Runtime 为 Python3.9 的 Lambda,将之前存有预处理后的文件的 S3 桶配置成 Lambde 的 Trigger,进入创建好的 Lambda > Configuration > Trigger > Add Trigger。

创建完成后,回到 Lambda >  Code 进行代码编辑,首先将向量数据库,Embedding 模型,Glue job 名称,和 Region 信息配置在 Lambda 的环境变量中,并用下列代码取回

# Initiate all environment variables 
job_name = os.environ['glue_jobname']
embedding_endpoint = os.environ.get("embedding_endpoint", "")
aos_endpoint = os.environ.get("aos_endpoint", "")
region = os.environ.get("region", "")

通过 Event 获取到 S3 事件里面的 Bucket 和 Object key

def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    object_key = event['Records'][0]['s3']['object']['key']

将环境变量通过 Arguments 的方式,触发 Glue job 对文件开始进行数据处理, 点击 Deploy 保存代码

import boto3

def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    object_key = event['Records'][0]['s3']['object']['key']
    
    glue.start_job_run(JobName=job_name, 
                       Arguments={"--bucket": bucket, 
                                  "--object_key": object_key, 
                                  "--EMB_MODEL_ENDPOINT": embedding_endpoint,
                                  "--AOS_ENDPOINT" : aos_endpoint,
                                  "--REGION" : region,
        
        }
    )

AWS Glue

创建新的 Glue Job,对 Confluence 和 RDS 数据进行 Embedding 处理,处理脚本我们会使用 Python。下面为这次我们要处理的 Confluence 数据的样例,文件名称  confluence.txt。

Question:AWS MSK 如何帮助客户快速配置并部署高度可用的 Apache Kafka 集群?

Answer:AWS MSK 提供了原生多可用区(AZ)的 Apache Kafka 集群部署模式,可以帮助客户快速配置并部署高度可用的 Apache Kafka 集群,且 AZ 之间的流量传输是免费的。

Question:AWS MSK 如何降低运维复杂度?

Answer:AWS MSK 会自动检测底层服务器,并在出现故障时进行替换,也会编排服务器的补丁与升级,同时还确保数据得到长久存储和保护,方便快捷地查看监控指标并设置警报。

Confluence 数据 Embedding 处理

首先从 Lambda 传过来的 Arguments 中获取必要的环境变量

#!/usr/bin/env python
# coding: utf-8

from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ['bucket', 
                                     'object_key',
                                     'EMB_MODEL_ENDPOINT',
                                     'AOS_ENDPOINT',
                                     'REGION'])
bucket = args['bucket']
object_key = args['object_key']
EMB_MODEL_ENDPOINT=args['EMB_MODEL_ENDPOINT']
AOS_ENDPOINT=args['AOS_ENDPOINT']
REGION = args['REGION']

初始化 OpenSearch 的设置,包括 Opensearch 的链接和初始化 Index

from opensearchpy import OpenSearch, RequestsHttpConnection

auth = ("my_username", "my_password")

#initate Opensearch client
client = OpenSearch(
        hosts = [{'host': AOS_ENDPOINT, 'port': 443}],
        http_auth = auth,
        use_ssl = True,
        verify_certs = True,
        connection_class = RequestsHttpConnection
    )
    
 #define index attributes

body = {
    "settings" : {
        "index":{
            "number_of_shards" : 1,
            "number_of_replicas" : 0,
            "knn": "true",
            "knn.algo_param.ef_search": 32
        }
    },
    "mappings": {
        "properties": {
            "id" : {
                "type" : "text"
              },
              "paragraph" : {
                "type" : "text"
              },
              "sentence" : {
                "type" : "text"
              },
              "sentence_vector" : {
                "type" : "knn_vector",
                "dimension" : 768,
                "method" : {
                  "engine" : "nmslib",
                  "space_type" : "l2",
                  "name" : "hnsw",
                  "parameters" : {
                    "ef_construction" : 256,
                    "m" : 128
                  }
                }
              },
              "title" : {
                "type" : "text"
              }
            }
        }
    }

def create_index():
    #create unique index name
    now = datetime.datetime.now()
    index_name = "confluence-index-" + now.strftime("%Y-%m-%d-%H-%M-%S-%f")
    #initate index
    client.indices.create(index=index_name, 
                          body=body, 
                          ignore=400)
    return index_name

#run the function
INDEX_NAME = create_index()

定义 Embedding 处理功能,在这里我们会需要用到用 SageMaker 部署好的 Embedding 模型 Endpoint,部署过程可参考《基于智能搜索和大模型打造企业下一代知识库》之《手把手快速部署指南》中的 3.1.2 章节。

def get_st_embedding(smr_client, text_input, endpoint_name=EMB_MODEL_ENDPOINT):
    parameters = {
      "max_new_tokens": 50,
      "temperature": 0,
      "min_length": 10,
      "no_repeat_ngram_size": 2,
    }

    response_model = smr_client.invoke_endpoint(
                EndpointName=endpoint_name,
                Body=json.dumps(
                {
                    "inputs": [text_input],
                    "parameters": parameters
                }
                ),
                ContentType="application/json",
            )
    
    json_str = response_model['Body'].read().decode('utf8')
    json_obj = json.loads(json_str)
    print("print json obj", json_obj)
    embeddings = json_obj[0][0]
    
    return embeddings[0]

将数据进行 embedding 处理

def write_vector_index_to_aos(paragraph_array, 
                              smr_client, 
                              object_key, 
                              index_name):
    """
    write paragraph to AOS for Knn indexing.
    :param paragraph_input : document content 
    :param aos_endpoint : AOS endpoint
    :param index_name : AOS index name
    :return None
    """

    def get_embs():
        for paragraph in paragraph_array:
            print("********** paragraph : " + paragraph)

            documents = []
            if paragraph.lower().find("question:") > -1:
                question, answer = paragraph.split("\n", 1)
                question = question.replace("Question: ", "")
                answer = answer.replace("Answer: ", "")
                #若符合FAQ格式,仅对问题进行embedding处理并放入index
                documents.append({ "title" : object_key, "sentence" : question, "paragraph" : question+" "+answer, "sentence_vector" : get_st_embedding(smr_client, question)})
            else:
                #若不符合,则退而求其次对整个文本进行embedding处理并放入index
                documents.append({ "title" : object_key, "sentence" : paragraph, "paragraph" : "", "sentence_vector" : get_st_embedding(smr_client, paragraph)})

            for document in documents:
                yield {"_index": index_name, "_source": document, "_id": hashlib.md5(str(document).encode('utf-8')).hexdigest()}

    get_embs_func = get_embs()
    
    response = helpers.bulk(client, get_embs_func)

定义文本分割器,用“Question”为单位进行文本分割,然后调用 write_vector_index_to_aos 进行 embedding 处理并写入 index

smr_client = boto3.client("sagemaker-runtime")

def split_by(content):
    sep='Question'
    arr = content.split(sep)
    p_arr = [ f"{sep}{paragraph}" for paragraph in arr ]
    #print("********** p_arr " + p_arr)
    return p_arr[1:]

def process_s3_uploaded_file(bucket, object_key):
    
    print("********** object_key : " + object_key)
    obj = s3.Object(bucket,object_key)
    body = obj.get()['Body'].read().decode('utf-8').strip()
            
    if(len(body) > 0):
        WriteVecIndexToAOS(split_by(body), smr_client, object_key)

RDS 数据 Embedding 处理

针对 RDS 数据,我们基本可以复用上一个章节所用到的代码,但我们需要在 write_vector_index_to_aos,split_by 这两个功能模块进行修改,这次的 RDS 数据我们用了一些电商上面评论的数据,原始数据经过处理已成为 csv 格式,样例如下:

日期 品牌 型号 内容
2023/6/29 10:29 Banana 50Pro 除了客服不作为 其余只能说一般吧。反正客服真心不怎么样,买了3台,说的给礼盒,没有了,也不给补,反正不怎么样。
2023/6/29 08:44 Banana 50Pro 拍照效果很好,运行特别流畅。
2023/6/29 10:29 Banana 50Pro 手机真的很棒,拍照很好,强烈推荐。换了手机壳好酷。

我们会将 CSV 文档转成 Python Dictionary 数据格式,因文本内容不多,出于方便搜索的角度,我们将品牌,型号,内容结合并以病进行 embedding 处理并入库,作为后面搜索匹配使用。

def write_vector_index_to_aos(paragraph_array, 
                              smr_client, 
                              object_key, 
                              index_name):
    """
    write paragraph to AOS for Knn indexing.
    :param paragraph_input : document content 
    :param aos_endpoint : AOS endpoint
    :param index_name : AOS index name
    :return None
    """
    
    def get_embs_rds():
        for row in paragraph_array:
            combine_content = f'{row["品牌"]} {row["型号"]} {row["内容"]}'
            row["title"] =  object_key
            row["paragraph"] = combine_content
            row["sentence"] =  combine_content
            row["sentence_vector"] =  get_st_embedding(smr_client, combine_content)
            print(row)
            client.index(index=INDEX_NAME, body=row)

    get_embs_func = get_embs()

最后运行 process_s3_uploaded_file,从 S3 下载文件,并进行 embedding 处理,入库

import io
import csv

smr_client = boto3.client("sagemaker-runtime")

def split_by(content):
    # Read body as string
    body = obj['Body'].read().decode('utf-8')

    # Parse CSV
    reader = csv.DictReader(io.StringIO(body))
    return reader

def process_s3_uploaded_file(bucket, object_key):
    
    print("********** object_key : " + object_key)
    body = s3.get_object(Bucket=bucket, Key=object_key)

    if(len(body) > 0):
        WriteVecIndexToAOS(split_by(body), smr_client, object_key)

process_s3_uploaded_file(bucket,object_key)

Demo 演示

最后我们会采用 https://github.com/aws-solutions-library-samples/guidance-for-custom-search-of-an-enterprise-knowledge-base-on-aws/tree/v2 方案的 chatbot 功能来围绕我们上面章节中处理好的数据进行问题测试,调整 top_k 去要求知识库机器人引用多少条根据我们的问题召回的知识进行问题回答。

Confluence 数据(MSK FAQ)

我们尝试问一个 MSK FAQ 里有的问题,结果如下:

RDS 数据(电商评论)

在这里我们想知道在基于电商网站评论中,我们虚构的香蕉手机拍照怎么样,结果如下:

总结

本篇博客从企业级用户的实际需求出发,为用户提供了不同数据源的获取,数据处理,数据向量化和自动化的数据 Pipeline,根据不同数据源头,自动化了数据内容同步,和数据源保持时刻同步,减少了数据的运营维护成本。据此,结合 下一代智能搜索和知识库解决方案 RAG 应用解决方案,客户可以利用多种 prompt 生成符合自己业务场景需求的文本答案,实现文本总结,文本分类,智能客服,产品介绍等场景。

本篇作者

冯秋爽

AWS 解决方案架构师,负责跨国企业级客户基于 AWS 的技术架构设计、咨询和设计优化工作。在加入 AWS 之前曾就职于 IBM、甲骨文等 IT 企业,积累了丰富的程序开发和数据库的实践经验。

黄俊杰

AWS 解决方案架构师,负责面向跨国企业客户的云计算方案架构咨询和设计,客户涵盖医疗、零售、咨询等行业。擅长混合云网络设计、DevOps 以及新技术钻研,对 GenAI、Web3 领域有着浓厚兴趣。

原媛

AWS 解决方案架构师,负责基于 AWS 云计算方案的架构咨询和设计实现,具有丰富的解决客户实际问题的经验,同时热衷于时间序列数据分析的相关研究与应用。

张子曼

亚马逊云科技解决方案架构师,负责基于 AWS 云计算方案架构的咨询和设计,在国内推广 AWS 云平台技术和各种解决方案。专注于 Serverless 和数据库等技术方向。