亚马逊AWS官方博客

Amazon DocumentDB 之中文检索

一.Amazon DocumentDB 的全文检索功能

Amazon DocumentDB 的原生全文搜索功能允许用户使用特殊用途的文本索引对大型文本数据集执行文本搜索,支持以下与 MongoDB API 兼容的功能:

  • 在单个字段上创建文本索引。
  • 创建包含多个文本字段的复合文本索引。
  • 执行单词或多字搜索。
  • 使用权重控制搜索结果。
  • 按分数对搜索结果进行排序。
  • 在聚合管道中使用文本索引。
  • 搜索确切的短语。

同时文本搜索存在以下限制,具体可以参考此链接

  • 仅基于亚马逊 DocumentDB 5.0 实例的集群支持文本搜索。
  • 文本索引仅支持英语。

针对目前全文检索功能仅支持英语的现状,本 Blog 将会演示如何使用 Amazon DocumentDB 结合开源插件「结巴分词」实现中文全文检索。

解决方案架构如下:

存量数据部分:

增量数据部分:

二.方案演示

2.1 创建集群

参考此文档,创建 Amazon DocumentDB,版本需要选择 5.0。

2.2 准备&导入测试数据

笔者准备了一份测试数据,Sample Data 如下:

Book Title Author Publication Date Copyright Open Year Last Modified
尚书 The Shoo King or the Book of Historical Documents 1949 10/9/2022
诗经 The Canon of Poetry unkown 7th century BC 1949 10/9/2022
红楼梦 The Dream of the Red Chamber 曹雪芹、高鶚 18th century 1949 10/9/2022
曾国藩家书 Zeng Guo Fan Jia Shu 曾国藩 19th century 1949 10/9/2022
隋唐演义 Heroes in Sui and Tang Dynasties 褚人穫 1695 1949 10/9/2022
韩非子 Hanfeizi 韩非 3th century BC 1949 10/9/2022
盐铁论 DISCOURSES ON SALT AND IRON 桓寬 1th century BC 1949 10/9/2022
南方草木状 NanFangCaoMuZhuang 嵇含 3th-5th century 1949 10/9/2022
论语 The Analects of Confucius 孔子 5th century BC 1949 10/9/2022
骆驼祥子 TeaHouse & Camel Xiangzi 老舍 1937 2016 10/9/2022

使用 MongoDB 客户端,将这些 Sample Data 导入该 Collection:

# wget https://truststore.pki.rds.amazonaws.com/global/global-bundle.pem
# mongoimport --ssl --db yourdb --collection yourcollection --type csv --headerline --ignoreBlanks --file sample.csv --host yourdocumentdb.regioncode.docdb.amazonaws.com:27017 —sslCAFile global-bundle.pem —username youruser —password yourpassword

创建 text 索引,使用中文检索:

# rs0:PRIMARY> db.yourcollection.createIndex({"Book": "text"});

可以看到当使用完整的中文字符串,例如“骆驼祥子”时,是可以检索出来相关 Document 的。但是换成“骆驼”或者“祥子”时,就无法检索出来。

2.3 测试「结巴分词」

参考链接安装好结巴分词,我们使用 Python 对“骆驼祥子”进行分词效果测试,测试代码如下:

import jieba
strs = "骆驼祥子"
strList = jieba.cut_for_search(strs)
print(" ".join(strList))

运行代码,结果如下,可以看到已经成功按照“骆驼”,“祥子”两个中文词语成功分词了。

2.4 存量数据分词

首先是存量数据,为了不影响原先的数据结构,我们计划新增加一个字段「MyIndex」,并在该字段上创建 Text 类型的索引。然后借助外部程序调用结巴分词对 Collection 中每一条 Document 的「Book」字段做分词处理,将分词后的结果写入「MyIndex」中。因为每个集合上只允许有一个文本索引,所以我们需要先删除之前创建的索引。

import pymongo
import jieba

def parseWord(bookName):
    strList = jieba.cut_for_search(bookName)
    return ' '.join(strList)

myClient = pymongo.MongoClient('mongodb://yourdocumentdb.regioncode.docdb.amazonaws.com:27017/?retryWrites=false',
                               username=youruser,
                               password=yourpassword,
                               tls=True,
                               tlsCAFile='global-bundle.pem')

db = myClient["mydb"]
mybook = db["mybook"]

db.mybook.create_index([('MyIndex', pymongo.TEXT)])

for book in db.mybook.find({}):
    db.mybook.update_one(
        {  '_id': book['_id']  },
        {  '$set': {
              'MyIndex': parseWord(book["Book"])
           }
        }
    )

执行代码后再次检索:

到目前为止,我们已经解决了存量数据的分词及检索,接下来该演示如何解决增量数据的分词及检索了。

2.5 增量数据分词

Amazon Lambda 现在支持将 Amazon DocumentDB 变更流作为事件源。Amazon DocumentDB(与 MongoDB 兼容)中的变更流功能提供了发生在集群集合中的一系列按时间排序的变更事件。客户现在可以在其基于 Lambda 构建的无服务器应用程序中使用这些事件。具体可以参考此链接。同时 Amazon DocumentDB 现在支持读取器实例上的变更流,通过读取器实例上的变更流,客户现在可以将变更流工作负载隔离到特定的读取器实例,从而减少集群写入器实例的负载。具体可以参考此链接。本章节中我们使用 Amazon DocumentDB 中的变更流功能来实现增量数据的分词。当客户端插入 Document 时,会触发 Amazon DocumentDB 的变更流功能,将该 Document 传入后端的 Amazon Lambda 中。在 Amazon Lambda 中完成对 Document 里「Book」字段的分词处理,将分词结果写入「MyIndex」字段中。最后将处理后的 Document 更新回 Amazon DocumentDB。接下来我们开始构造这个 Lambda,在这个 Lambda 中我们需要使用到「结巴分词」和「PyMongo」。建议将第三方依赖单独打包成 Lambda Layer,可以参考以下手册:

1)首先安装 Python 3.12

2)使用 Python 虚拟环境,安装依赖项,打包 Layer 压缩文件,成功后可以看到「zip」

# mkdir layerTest
# cd layerTest/
# python3 -m venv env
# source env/bin/activate
# pip3 install pymongo
# pip3 install jieba
# mkdir python
# cp -r lib/ python/
# zip -q -r mongoPocLayer.zip python/

3)创建 Lambda Layer

4)创建 Secret Manager

5)创建用于 Amazon Lambda 的 IAM Role

按照以下内容创建 trust-relationship.json 文件并创建 IAM Role

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
    
aws iam create-role \
    --role-name my-poc-docdb5-lambda-role \
   --assume-role-policy-document file://trust-relationship.json
my-poc-docdb5-policy.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": "arn:aws:secretsmanager:regioncode:accountid:secret:poc/my-poc-docdb5-*"
        }
    ]
}
               
aws iam create-policy \
--policy-name my-poc-docdb5-policy \
--policy-document file://my-poc-docdb5-policy.json

附加 IAM Policy 到 IAM Role

aws iam attach-role-policy \
  --policy-arn arn:aws:iam::account-id:policy/my-poc-docdb5-policy \
  --role-name my-poc-docdb5-lambda-role

aws iam attach-role-policy \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole \
  --role-name my-poc-docdb5-lambda-role

6)开启 Change Stream

db.adminCommand({modifyChangeStreams: 1,
    database: "mydb",
    collection: "mybook", 
    enable: true});

7)创建 lambda_function.py,并粘贴以下代码

import json
import jieba
import pymongo
from bson.objectid import ObjectId
import boto3
from botocore.exceptions import ClientError

def lambda_handler(event, context):
    mySecretDic = get_secret()
    myClient = pymongo.MongoClient('mongodb://' + mySecretDic['host'] + ':27017/?retryWrites=false',
        username=mySecretDic['username'],
        password=mySecretDic['password'],
        tls=True,
        tlsCAFile='global-bundle.pem')
        
   db = myClient["mydb"]
    mybook = db["mybook"]
    
   eventList = event['events']
    for tmpEvent in eventList:
        operationType = tmpEvent['event']['operationType']
       if operationType != 'insert':
            continue
        bookId = tmpEvent['event']['fullDocument']['_id']['$oid']
        bookName = tmpEvent['event']['fullDocument']['Book']
        indexValue = { "$set": { "MyIndex": parseWord(bookName) } }
        db.mybook.update_one({"_id" : ObjectId(bookId)}, indexValue)
        
def parseWord(bookName):
    strList = jieba.cut_for_search(bookName)
    return ' '.join(strList)

def get_secret():

   secret_name = "poc/my-poc-docdb5"
    region_name = "ap-northeast-1"

   session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )
    
   try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        raise e

   secret = get_secret_value_response['SecretString']
    return json.loads(secret)

并下载 CA 证书

# wget https://truststore.pki.rds.amazonaws.com/global/global-bundle.pem

将代码与证书文件打包至「lambdaTest.zip」

# zip -q lambdaTest.zip lambda_function.py global-bundle.pem

8)创建 Lambda

选择和 Amazon DocumentDB 相同的 VPC,安全组中允许 Lambda 访问 DocumentDB

创建函数,上传之前打包好的 zip 包

编辑 Lambda Layer

9)配置 Lambda Trigger

10)插入数据进行测试

db.mybook.insertOne(
  {
    "Book" : "本草纲目",
    "Title" : "Herbal Foundation Compendium",
    "Author" : "李时珍",
    "Publication Date" : "1596 AD",
    "Copyright Open Year" : 1949,
    "Last Modified" : "2022/10/9"
  }
);

11)进行检索测试

rs0:PRIMARY> db.mybook.find({$text: {$search: '本草'}});

查看更新后的 Document,可以看到「MyIndex」已经更新了。

rs0:PRIMARY> db.mybook.findOne({"Book" : "本草纲目"});

三.总结

至此,我们就完成了结合 Amazon DocumentDB 和「结巴分词」实现的中文索引的所有演示。

本篇作者

杨探

亚马逊云科技解决方案架构师,负责互联网行业云端架构咨询和设计。

刘冰冰

亚马逊云科技数据库解决方案架构师,负责基于亚马逊云科技的数据库解决方案的咨询与架构设计,同时致力于大数据方面的研究和推广。在加入亚马逊云科技之前曾在 Oracle 工作多年,在数据库云规划、设计运维调优、DR 解决方案、大数据和数仓以及企业应用等方面有丰富的经验。