Creating Document Processing Pipelines with Project Lakechain
Learn how to build AI-powered, cloud-native document processing pipelines on Amazon Web Services with Project Lakechain




Overview
In this post, we'll explore Project Lakechain, a cool new project from AWS Labs for building modern, AI-powered document processing pipelines.
Project Lakechain is built on the AWS Cloud Development Kit (AWS CDK) and ships with more than 60 ready-to-use components for processing images, text, audio, and video, along with integrations for generative AI services such as Amazon Bedrock and vector stores such as Amazon OpenSearch or LanceDB.
The real beauty of Project Lakechain is how easily these components can be composed into sophisticated pipelines that process millions of documents at scale, with the quality of its documentation a close second... all expressed through infrastructure as code (IaC).
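To give a sense of the composition model before we dive in, here is a minimal sketch of how two middlewares are chained together. It reuses constructs from the full example later in this post, and is assumed to run inside a cdk.Stack constructor, with bucket being an existing s3.Bucket:
import { CacheStorage } from '@project-lakechain/core';
import { S3EventTrigger } from '@project-lakechain/s3-event-trigger';
import { NlpTextProcessor, dsl as l } from '@project-lakechain/nlp-text-processor';

// A cache shared by the middlewares in the pipeline.
const cache = new CacheStorage(this, 'Cache', {});

// Middlewares are assembled with a fluent Builder; a pipeline
// emerges by pointing each middleware at its source.
const trigger = new S3EventTrigger.Builder()
  .withScope(this)
  .withIdentifier('Trigger')
  .withCacheStorage(cache)
  .withBucket(bucket) // an existing s3.Bucket (assumed)
  .build();

const nlp = new NlpTextProcessor.Builder()
  .withScope(this)
  .withIdentifier('Nlp')
  .withCacheStorage(cache)
  .withSource(trigger) // chained to the trigger's output
  .withIntent(l.nlp().language())
  .build();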

Prerequisites
Before you start, make sure the following tools are installed and properly configured (a quick way to verify them follows the list):
- AWS CLI
- Docker
- Node.js 18+ (nvm recommended)
- Conda (recommended) or Python 3.9+
- TypeScript 5.0+ (optional)
- AWS CDK v2 (optional)
For more details, see the Prerequisites section of the Lakechain documentation.
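From a terminal, you can sanity-check the toolchain like so:
aws --version
docker --version
node --version
npx cdk --version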
Initial setup
To get started with Lakechain, simply clone the repository:
git clone https://github.com/awslabs/project-lakechain
cd project-lakechain
Then install the project dependencies:
npm install
A quick tour
At this point you can try one of the simple pipelines, or dive straight into an end-to-end use case. The best place to start is the Quickstart section of the documentation, which walks you through deploying a face detection pipeline powered by Amazon Rekognition.
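As a taste of what the quickstart covers, a face detection middleware would look roughly like the sketch below. The package name, the RekognitionImageProcessor class, and the intent DSL are assumptions based on Lakechain's naming conventions; the quickstart contains the authoritative code:
import { RekognitionImageProcessor, dsl as r } from '@project-lakechain/rekognition-image-processor';

// Hypothetical sketch: detects faces in images flowing through
// the pipeline (trigger and cache defined as in the example below).
const faceDetection = new RekognitionImageProcessor.Builder()
  .withScope(this)
  .withIdentifier('FaceDetection')
  .withCacheStorage(cache)
  .withSource(trigger)
  .withIntent(r.detect().faces())
  .build();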

Text moderation
Let's build a pipeline from scratch. The quick tour above covers image moderation, so here we'll create a similar pipeline for text.
💡 This post does not cover AWS CDK basics. If you need a refresher, see the AWS CDK Developer Guide.
Expressed in standard Mermaid syntax, the pipeline we are going to build looks like this:
flowchart LR
  Input([Input Bucket]) -.-> S3[S3 Trigger]
  S3 -. Document .-> Comprehend[NLP Text Processor]
  Comprehend --> PiiCondition{Contains PII Information?}
  PiiCondition -- Yes --> Moderated[Moderated S3 Bucket]
  PiiCondition -- No --> SentimentCondition{Non-Negative Sentiment?}
  SentimentCondition -- No --> Moderated
  SentimentCondition -- Yes --> Safe[Safe S3 Bucket]

Here is how it works:
- The pipeline is triggered every time a document is uploaded to the Input Bucket.
- The pipeline then invokes an NLP processor powered by Amazon Comprehend to determine the dominant language, perform sentiment analysis, and detect personally identifiable information (PII).
- If the text contains no PII and the sentiment is non-negative, it is sent to the Safe Bucket.
- Otherwise, it lands in the Moderated Bucket, awaiting human review.
If you want to look at the code, here is the complete text moderation stack:
// stack.ts
import * as cdk from 'aws-cdk-lib';
import * as s3 from 'aws-cdk-lib/aws-s3';
import { Construct } from 'constructs';
import { CacheStorage } from '@project-lakechain/core';
import { Condition, CloudEvent } from '@project-lakechain/condition';
import { S3EventTrigger } from '@project-lakechain/s3-event-trigger';
import { NlpTextProcessor, dsl as l } from '@project-lakechain/nlp-text-processor';
import { S3StorageConnector } from '@project-lakechain/s3-storage-connector';
import { TextMetadata } from '@project-lakechain/sdk/models/document/metadata';

/**
 * Example stack for moderating documents in a pipeline.
 * The pipeline looks as follows:
 *
 * ┌──────────────┐   ┌─────────────────┐   ┌──────┐
 * │   S3 Input   ├──►│  NLP Processor  ├──►|  S3  │
 * └──────────────┘   └─────────────────┘   └──────┘
 *
 */
export class TextModerationPipeline extends cdk.Stack {

  /**
   * Stack constructor.
   */
  constructor(scope: Construct, id: string, env: cdk.StackProps) {
    super(scope, id, {
      description: 'A pipeline demonstrating how to use Amazon Comprehend for text moderation.',
      ...env
    });

    ///////////////////////////////////////////
    ///////         S3 Storage          ///////
    ///////////////////////////////////////////

    // The source bucket.
    const source = new s3.Bucket(this, 'Bucket', {
      encryption: s3.BucketEncryption.S3_MANAGED,
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      autoDeleteObjects: true,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      enforceSSL: true
    });

    // The moderated texts bucket.
    const moderated = new s3.Bucket(this, 'Moderated', {
      encryption: s3.BucketEncryption.S3_MANAGED,
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      autoDeleteObjects: true,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      enforceSSL: true
    });

    // The safe texts bucket.
    const safe = new s3.Bucket(this, 'Safe', {
      encryption: s3.BucketEncryption.S3_MANAGED,
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      autoDeleteObjects: true,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      enforceSSL: true
    });

    // The cache storage.
    const cache = new CacheStorage(this, 'Cache', {});

    ///////////////////////////////////////////
    ///////      Lakechain Pipeline     ///////
    ///////////////////////////////////////////

    // Monitor a bucket for uploaded objects.
    const trigger = new S3EventTrigger.Builder()
      .withScope(this)
      .withIdentifier('Trigger')
      .withCacheStorage(cache)
      .withBucket(source)
      .build();

    // The NLP text processor will identify PII information
    // and perform sentiment analysis.
    const nlpProcessor = new NlpTextProcessor.Builder()
      .withScope(this)
      .withIdentifier('NlpTextProcessor')
      .withCacheStorage(cache)
      .withSource(trigger)
      .withIntent(
        l.nlp()
          .language()
          .sentiment()
          .pii(l.confidence(90))
      )
      .build();

    // The condition evaluates to true when the document
    // contains no PII and its sentiment is non-negative.
    const condition = new Condition.Builder()
      .withScope(this)
      .withIdentifier('Condition')
      .withCacheStorage(cache)
      .withSource(nlpProcessor)
      .withConditional(async (event: CloudEvent) => {
        const metadata = event.data().metadata();
        const attrs = metadata.properties?.attrs as TextMetadata;
        const piis = attrs.stats?.piis;
        const sentiment = attrs.sentiment;
        const hasPii = piis !== 0;
        const nonNegativeSentiment = sentiment === 'positive' || sentiment === 'neutral';
        return (!hasPii && nonNegativeSentiment);
      })
      .build();

    // Writes the results to the moderated bucket when PII
    // was detected in the document metadata or the
    // sentiment is negative.
    condition.onMismatch(
      new S3StorageConnector.Builder()
        .withScope(this)
        .withIdentifier('ModeratedStorage')
        .withCacheStorage(cache)
        .withDestinationBucket(moderated)
        .build()
    );

    // Writes the results to the safe bucket when no PII
    // was detected in the document metadata and the
    // sentiment is non-negative.
    condition.onMatch(
      new S3StorageConnector.Builder()
        .withScope(this)
        .withIdentifier('SafeStorage')
        .withCacheStorage(cache)
        .withDestinationBucket(safe)
        .build()
    );

    // Display the source bucket information in the console.
    new cdk.CfnOutput(this, 'SourceBucketName', {
      description: 'The name of the source bucket.',
      value: source.bucketName
    });

    // Display the moderated bucket information in the console.
    new cdk.CfnOutput(this, 'ModeratedBucketName', {
      description: 'The name of the bucket containing moderated documents.',
      value: moderated.bucketName
    });

    // Display the safe bucket information in the console.
    new cdk.CfnOutput(this, 'SafeBucketName', {
      description: 'The name of the bucket containing safe documents.',
      value: safe.bucketName
    });
  }
}

// Creating the CDK application.
const app = new cdk.App();

// Environment variables.
const account = process.env.CDK_DEFAULT_ACCOUNT ?? process.env.AWS_DEFAULT_ACCOUNT;
const region = process.env.CDK_DEFAULT_REGION ?? process.env.AWS_DEFAULT_REGION;

// Deploy the stack.
new TextModerationPipeline(app, 'TextModerationPipeline', {
  env: {
    account,
    region
  }
});
The example lives in the repository; change into its directory:
cd examples/simple-pipelines/text-moderation-pipeline
Then run the following commands to build the pipeline:
npm install
npm run build-pkg
Once your AWS credentials and target region are configured, you can deploy the example to your account:
npm run deploy
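If the target account and region have never been bootstrapped for the AWS CDK, the deployment will fail with a bootstrap error. In that case, bootstrap once before deploying:
npx cdk bootstrap aws://<account-id>/<region>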

You can head over to the Amazon CloudWatch Logs console and follow, in near real time, the logs in the log groups created for the deployed middlewares:

Let's upload a document (Little Red Riding Hood) to the Input Bucket and see what happens:
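For example, with the AWS CLI (the file name here is arbitrary; the bucket name is the SourceBucketName value printed at deploy time):
aws s3 cp little-red-riding-hood.txt s3://<source-bucket-name>/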

A few seconds later, the document and some metadata are added to the Safe Bucket:
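You can check with the AWS CLI, using the SafeBucketName output from the deployment:
aws s3 ls s3://<safe-bucket-name>/ --recursive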

In the metadata document, we can see that the dominant language is English 💂, the sentiment is neutral 😐, and Amazon Comprehend found no PII 👤.
(...)
"metadata": {
  "custom": {
    "__condition_result": "true"
  },
  "language": "en",
  "properties": {
    "attrs": {
      "pii": "s3://textmoderationpipeline-cachestorageXXXXX-XXXXX/nlp-text-processor/XXXXX",
      "sentiment": "neutral",
      "stats": {
        "piis": 0
      }
    },
    "kind": "text"
  }
}
(...)
Feel free to try it with other documents; just remember to clean up when you're done:
npm run destroy

(Image generated with SDXL 1.0 on Amazon Bedrock)