通过 Project Lakechain 创建文件处理管道

了解如何通过 Project Lakechain 在亚马逊云科技创建 AI 驱动的云原生文件处理管道

Amazon Comprehend
AWS Rekognition
教程
亚马逊云科技
Olawale Olaleye
难度
100 - 初级
时间
20 分钟
前提条件

海外区域: 注册 / 登录 亚马逊云科技

开始前,请确认已安装以下工具并正确配置:

上次更新时间
2024 年 6 月 26 日

概述

在本文中,我们将探索 Project Lakechain--亚马逊云科技实验室推出的一个全新酷炫项目,可用于创建现代化的、AI 驱动的文件处理管道。

Project Lakechain 基于 AWS Cloud Development Kit (AWS CDK),提供 60 多种用于处理处理图像、文本、音频、视频的即用型组件,还集成了 Amazon Bedrock 等生成式 AI 服务和 Amazon OpenSearchLanceDB 等向量数据库。

Project Lakechain 的亮点在于,可以轻松地将这些组件组合成复杂的处理管道,实现数以百万计的文档的大规模处理,而文档质量是紧随其后的另一个亮点……一切只需通过基础设施即代码 (IaC) 即可实现。

先决条件

开始前,请确认已安装以下工具并正确配置:

更多详情,请查阅 Lakechain 文档的先决条件章节。

初始设置

只需克隆仓库,即可启动 Lakechain

git clone https://github.com/awslabs/project-lakechain
cd project-lakechain

安装项目依赖项

npm install

快速了解

此时可以试用其中一个简单的管道,也可以大胆尝试一下端对端的用例最好先从快速入门章节开始。它将引导您使用 AWS Rekognition 部署面部识别管道

文本审核

我们来尝试从零开始搭建一个管道,上面的快速了解部分适用于图片审核,让我们试着创建一个类似的文本管道。

💡本文并不会讲解 AWS CDK 的基础知识。如有需要,请查阅 AWS CDK 开发指南

如果使用标准的 Mermaid 语言风格,我们将建立如下所示的管道:

flowchart LR
  Input([Input Bucket]) -.-> S3[S3 Trigger]
  S3 -. Document .-> Comprehend[NLP Text Processor]
  Comprehend --> PiiCondition{Contains PII Information?}
  PiiCondition -- Yes --> Moderated[Moderated S3 Bucket]
  PiiCondition -- No --> SentimentCondition{Non-Negative Sentiment?}
  SentimentCondition -- No --> Moderated
  SentimentCondition -- Yes --> Safe[Safe S3 Bucket]

其工作原理如下:

每次上传文件至 Input Bucket 时,都会触发该管道。

接着该管道将调用 Amazon Comprehend 驱动的自然语言处理器 (NLP),以确定主要语种、进行情绪分析、检测个人识别信息 (PII)

如果不包含 PII 数据且为非负面情绪,文本将被发送至 Safe Bucket。

否则,它将存放在 Moderated Bucket 中等待人工审核。如果您想查看代码,可以查看下面完整的文本审核堆栈:

// stack.ts
import * as cdk from 'aws-cdk-lib';
import * as s3 from 'aws-cdk-lib/aws-s3';

import { Construct } from 'constructs';
import { CacheStorage } from '@project-lakechain/core';
import { Condition, CloudEvent } from '@project-lakechain/condition';
import { S3EventTrigger } from '@project-lakechain/s3-event-trigger';
import { NlpTextProcessor, dsl as l } from '@project-lakechain/nlp-text-processor';
import { S3StorageConnector } from '@project-lakechain/s3-storage-connector';
import { TextMetadata } from '@project-lakechain/sdk/models/document/metadata';

/**
 * Example stack for moderating documents in a pipeline.
 * The pipeline looks as follows:
 *
 * ┌──────────────┐   ┌─────────────────┐   ┌──────┐
 * │   S3 Input   ├──►│  NLP Processor  ├──►|  S3  │
 * └──────────────┘   └─────────────────┘   └──────┘
 *
 */
export class TextModerationPipeline extends cdk.Stack {

  /**
   * Stack constructor.
   */
  constructor(scope: Construct, id: string, env: cdk.StackProps) {
    super(scope, id, {
      description: 'A pipeline demonstrating how to use Amazon Comprehend for text moderation.',
      ...env
    });

    ///////////////////////////////////////////
    ///////         S3 Storage          ///////
    ///////////////////////////////////////////

    // The source bucket.
    const source = new s3.Bucket(this, 'Bucket', {
      encryption: s3.BucketEncryption.S3_MANAGED,
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      autoDeleteObjects: true,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      enforceSSL: true
    });

    // The moderated texts bucket.
    const moderated = new s3.Bucket(this, 'Moderated', {
      encryption: s3.BucketEncryption.S3_MANAGED,
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      autoDeleteObjects: true,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      enforceSSL: true
    });
  
    // The safe texts bucket.
    const safe = new s3.Bucket(this, 'Safe', {
      encryption: s3.BucketEncryption.S3_MANAGED,
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      autoDeleteObjects: true,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      enforceSSL: true
    });

    // The cache storage.
    const cache = new CacheStorage(this, 'Cache', {});

    ///////////////////////////////////////////
    ///////     Lakechain Pipeline      ///////
    ///////////////////////////////////////////

    // Monitor a bucket for uploaded objects.
    const trigger = new S3EventTrigger.Builder()
      .withScope(this)
      .withIdentifier('Trigger')
      .withCacheStorage(cache)
      .withBucket(source)
      .build();

    // The NLP text process will identify PII information
    // and perform sentiment analysis
    const nlpProcessor = new NlpTextProcessor.Builder()
      .withScope(this)
      .withIdentifier('NlpTextProcessor')
      .withCacheStorage(cache)
      .withSource(trigger)
      .withIntent(
        l.nlp()
          .language()
          .sentiment()
          .pii(l.confidence(90))
      )
      .build()

    const condition = new Condition.Builder()
      .withScope(this)
      .withIdentifier('Condition')
      .withCacheStorage(cache)
      .withSource(nlpProcessor)
      .withConditional(async (event: CloudEvent) => {
        const metadata = event.data().metadata();
        const attrs = metadata.properties?.attrs as TextMetadata;
        const piis = attrs.stats?.piis;
        const sentiment = attrs.sentiment;
        const has_pii = piis != 0;
        const non_negative_sentiment = sentiment == "positive" || sentiment == "neutral";
        return !has_pii && non_negative_sentiment;
      })
      .build();

    // Writes the results to the moderated bucket when
    // PII labels exist in the document metadata and the
    // sentiment is not positive
    condition.onMismatch(
      new S3StorageConnector.Builder()
        .withScope(this)
        .withIdentifier('ModeratedStorage')
        .withCacheStorage(cache)
        .withDestinationBucket(moderated)
        .build()
    );

    // Writes the results to the safe bucket when PII
    // labels do not exist in the document metadata and
    // the sentiment is positive
    condition.onMatch(
      new S3StorageConnector.Builder()
        .withScope(this)
        .withIdentifier('SafeStorage')
        .withCacheStorage(cache)
        .withDestinationBucket(safe)
        .build()
    );

    // Display the source bucket information in the console.
    new cdk.CfnOutput(this, 'SourceBucketName', {
      description: 'The name of the source bucket.',
      value: source.bucketName
    });
  
    // Display the moderated bucket information in the console.
    new cdk.CfnOutput(this, 'ModeratedBucketName', {
      description: 'The name of the bucket containing moderated documents.',
      value: moderated.bucketName
    });

    // Display the safe bucket information in the console.
    new cdk.CfnOutput(this, 'SafeBucketName', {
      description: 'The name of the bucket containing safe documents.',
      value: safe.bucketName
    });
  }
}

// Creating the CDK application.
const app = new cdk.App();

// Environment variables.
const account = process.env.CDK_DEFAULT_ACCOUNT ?? process.env.AWS_DEFAULT_ACCOUNT;
const region = process.env.CDK_DEFAULT_REGION ?? process.env.AWS_DEFAULT_REGION;

// Deploy the stack.
new TextModerationPipeline(app, 'TextModerationPipeline', {
  env: {
    account,
    region
  }
});

好消息!无需创建新的堆栈,因为最新版本的代码中已经包含了文本审核管道的实现。

在 Project Lakechain 仓库中,找到文本审核管道目录

cd examples/simple-pipelines/text-moderation-pipeline

执行以下命令来创建管道

npm install
npm run build-pkg

亚马逊云科技证书及目标区域配置完成后,可将示例部署到您的账户

npm run deploy

您可以进入 AWS CloudWatch 日志控制台,实时查看部署了middlewares(中间件)的日志组中的日志:

我们尝试上传一个文件(小红帽)至 Input Bucket,来看一下效果:

数秒后,文件和一些元数据将被添加至 Safe Bucket

在元数据文档中,我们可以看到主要语种为英语💂,情绪为中性😐,Amazon Comprehend 未找到 PII 信息👤。

(...)
"metadata": {
    "custom": {
        "__condition_result": "true"
    },
    "language": "en",
    "properties": {
        "attrs": {
            "pii": "s3://textmoderationpipeline-cachestorageXXXXX-XXXXX/nlp-text-processor/XXXXX",
            "sentiment": "neutral",
            "stats": {
                "piis": 0
            }
        },
        "kind": "text"
    }
}
(...)

您可以自行尝试其他文档,记得结束后清理即可

npm run destroy

由 Amazon Bedrock SDXL 1.0 生成