亚马逊AWS官方博客

最新推出 — AWS Step Functions 现在支持 200 项 AWS 服务,可更轻松地实现工作流程自动化

今天,AWS Step Functions 推出新的 AWS 开发工具包服务集成功能,将受支持的 AWS 服务的数量从 17 个扩展到 200 多个,AWS API 操作从 46 个增加到 9,000 个以上。

当开发人员构建分布式架构时,他们使用的模式之一是基于工作流的编排模式。此模式有助于在服务内部实现工作流自动化,以执行分布式事务。分布式事务的一个例子是处理订单并始终跟踪事务状态所需的所有任务。

Step Functions 是一种用于实现工作流程自动化的低代码可视化工作流服务,可编排服务并帮助您应用此模式。开发人员将 Step Functions 与托管服务结合使用,例如人工智能服务Amazon Simple Storage Service (Amazon S3)Amazon DynamoDB

隆重推出 Step Functions AWS 开发工具包服务集成
迄今为止,当开发人员构建与 AWS 服务集成的工作流时,他们只能从 Step Functions 提供的 46 种受支持的服务集成中进行选择。如果服务集成不可用,他们必须在 AWS Lambda 函数中编码实现集成。这种方式并不理想,因为会增加应用程序的复杂性和成本。

现在,借助 Step Functions AWS 开发工具包服务集成,开发人员可以将状态机直接集成到支持 AWS 开发工具包的 AWS 服务。

您可以通过 Amazon States Language (ASL)AWS Cloud Development Kit (AWS CDK) 或可视化的 AWS Step Function Workflow Studio 创建使用 AWS 开发工具包服务集成的状态机。要开始使用,请创建一个新的任务状态。然后从任务状态资源字段中的 ASL 直接调用 AWS 开发工具包服务。为此,请使用以下语法。

arn:aws:states:::aws-sdk:serviceName:apiAction.[serviceIntegrationPattern]

下面我将通过一个演示向大家展示如何开始使用这项功能。

演示
在本演示中,我们将构建一个应用程序,对存储在 S3 中的给定视频文件进行转录,然后再从英语翻译成西班牙语。

我们使用 Step Functions 来构建这次演示。带有服务集成的状态机可直接集成到 S3Amazon TranscribeAmazon Translate。用于转录的 API 是异步的。要验证转录任务是否已完成,我们需要一个轮询循环,该循环将等待转录任务就绪。

要构建的状态机

创建状态机
要跟随此演示一起操作,您需要具备以下先决条件:

  • 一个用于放置要处理的原始文件的 S3 存储桶
  • 一个存储在该存储桶中的英文视频或音频文件
  • 一个用于在其中执行前述操作的 S3 存储桶

我将展示如何使用 AWS 管理控制台进行此演示。如果您想将此演示部署为基础设施即代码,请为此项目部署 AWS CloudFormation 模板。

要开始此演示,请创建一个新的标准状态机。选择选项 Write your workflow in code(用代码编写工作流)以使用 ASL 构建状态机。为状态机创建名称并创建新角色。

创建状态机

开启转录任务
要开始进行状态机定义,可以编辑状态机。

编辑状态机定义

以下 ASL 代码片段是包含两个任务的状态机,两个任务均使用新的 AWS 开发工具包服务集成功能。第一个任务将文件从一个 S3 存储桶复制到另一个存储桶中,第二个任务直接通过调用 Amazon Transcribe 开启转录任务

要使用 Step Functions 中的这项新功能,状态类型必须为“任务”。您需要使用以下语法指定服务名称和 API 操作:“arn:aws:states:::aws-sdk:serviceName:apiAction.<serviceIntegrationPattern>”。为资源字段中的 apiAction 名称使用 camelCase,例如“copyObject”,为参数字段的参数名称使用 PascalCase,例如“CopySource”。

对于各项参数,可在此服务的 AWS API 文档及 API 操作中查找名称和所需参数。

{
  "Comment": "A State Machine that process a video file",
  "StartAt": "GetSampleVideo",
  "States": {
    "GetSampleVideo": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:s3:copyObject",
      "Parameters": {
        "Bucket.$": "$.S3BucketName",
        "Key.$": "$.SampleDataInputKey",
        "CopySource.$": "States.Format('{}/{}',$.SampleDataBucketName,$.SampleDataInputKey)"
      },
      "ResultPath": null,
      "Next": "StartTranscriptionJob"
    },
    "StartTranscriptionJob": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:transcribe:startTranscriptionJob",
      "Parameters": {
        "Media": {
          "MediaFileUri.$": "States.Format('s3://{}/{}',$.S3BucketName,$.SampleDataInputKey)"
        },
        "TranscriptionJobName.$": "$$.Execution.Name",
        "LanguageCode": "en-US",
        "OutputBucketName.$": "$.S3BucketName",
        "OutputKey": "transcribe.json"
      },
      "ResultPath": "$.transcription",
      "End": true
    }
  }
}

在上一段代码中,我们可以看到 ASL 提供的一个内置函数的有趣使用案例。我们可以使用不同的参数构建一个字符串。将内置函数与 AWS 开发工具包服务集成结合使用,可以在不使用 Lambda 函数的情况下操作数据。例如下行:

"MediaFileUri.$": "States.Format('s3://{}/{}',$.S3BucketName,$.SampleDataInputKey)"

向状态机授予权限
如果现在开始执行状态机,它将失败。因为此状态机无权访问 S3 存储桶或使用 Amazon TranscribeStep Functions 无法为大多数 AWS 开发工具包服务集成自动生成 IAM 策略,因此我们需要手动将这些策略添加到角色中。

将这些权限添加到为此状态机创建的 IAM 角色。您可以在状态机详细信息中找到角色的快速链接。将“AmazonTranscribeFullAccess”和“AmazonS3FullAccess”策略附加到该角色。

IAM 角色的链接

首次运行状态机
现在权限已经就绪,可以开始运行这个状态机了。此状态机的输入包括用于存储已上传的原始视频的 S3 存储桶名称、文件的名称以及要存储此文件并执行全部处理的 S3 存储桶的名称。

此状态机要正常工作,这里的文件必须是视频或音频文件,并且必须使用英语。转录任务完成后,它会将结果保存在输入中指定的存储桶中,名称为 transcribe.json。

 {
  "SampleDataBucketName": "<name of the bucket where the original file is>",
  "SampleDataInputKey": "<name of the original file>",
  "S3BucketName": "<name of the bucket where the processing will happen>"
}

由于 StartTranscriptionJob 是异步调用,我们不会立即看到结果。状态机只会调用 API,然后它就完成了。我们需要等到转录任务准备就绪,方可在输出存储桶中的 transcribe.json 文件中看到结果。

添加轮询循环
因为我们想使用转录结果进行翻译文本,所以状态机需要等待转录任务完成。要在状态机中构建 API 轮询器,可以使用“任务”、“等待”和“选择”状态。

  • 任务状态获取任务的状态。在本例中,它调用 Amazon Transcribe 服务以及 API getTranscriptionJob
  • 等待状态等待 20 秒,转录任务的时长取决于输入文件的大小。
  • 根据任务状态的结果,选择状态会移动到正确的步骤。如果任务完成,它将移动到状态机中的下一步,如果没有完成,它将继续等待。

轮询循环的状态

等待状态
我们要添加的第一个状态是等待状态。下面是一个等待 20 秒的简单状态。

"Wait20Seconds": {
        "Type": "Wait",
        "Seconds": 20,
        "Next": "CheckIfTranscriptionDone"
      },

任务状态
要添加的下一个状态是任务状态,它调用 API getTranscriptionJob。要调用此 API,我们需要传递转录任务名称。此状态返回作为选择状态输入的任务状态。

"CheckIfTranscriptionDone": {
        "Type": "Task",
        "Resource": "arn:aws:states:::aws-sdk:transcribe:getTranscriptionJob",
        "Parameters": {
          "TranscriptionJobName.$": "$.transcription.TranscriptionJob.TranscriptionJobName"
        },
        "ResultPath": "$.transcription",
        "Next": "IsTranscriptionDone?"
      },

选择状态
选择状态使用一个规则来检查转录任务的状态是否为已完成。如果该规则返回结果为 True,将进入下一个状态。如果为 False,则将进入等待状态。

 "IsTranscriptionDone?": {
        "Type": "Choice",
        "Choices": [
          {
            "Variable": "$.transcription.TranscriptionJob.TranscriptionJobStatus",
            "StringEquals": "COMPLETED",
            "Next": "GetTranscriptionText"
          }
        ],
        "Default": "Wait20Seconds"
      },

获取转录文本
在这一步中,我们将从转录任务返回的输出文件中提取转录文本。我们只需要转录文本,而结果文件中有许多元数据,这使文件过长且难以翻译。

这一步通常使用 Lambda 函数来完成。但可以使用 ASL 直接从状态机上执行此操作。

首先,我们需要使用AWS 开发工具包服务集成创建一个状态,用于从 S3 获取结果文件。然后使用另一个 ASL 内置函数将文件文本从字符串转换为 JSON。

在下一个状态中,您可以将文件作为 JSON 对象进行处理。此状态为“通过”状态,它清理先前状态的输出,仅提取转录的文本。

 "GetTranscriptionText": {
        "Type": "Task",
        "Resource": "arn:aws:states:::aws-sdk:s3:getObject",
        "Parameters": {
          "Bucket.$": "$.S3BucketName",
          "Key": "transcribe.json"
        },
        "ResultSelector": {
          "filecontent.$": "States.StringToJson($.Body)"
        },
        "ResultPath": "$.transcription",
        "Next": "PrepareTranscriptTest"
      },
  
      "PrepareTranscriptTest" : {
        "Type": "Pass",
        "Parameters": {
          "transcript.$": "$.transcription.filecontent.results.transcripts[0].transcript"
        },
        "Next": "TranslateText"
      },

翻译文本
准备好转录的文本后,就可以开始进行翻译了。为此,我们可以直接从状态机使用 Amazon Translate API translateText。这是本状态机的最后一个状态,它将在此状态的输出中返回翻译后的文本。

"TranslateText": {
        "Type": "Task",
        "Resource": "arn:aws:states:::aws-sdk:translate:translateText",
        "Parameters": {
          "SourceLanguageCode": "en",
          "TargetLanguageCode": "es",
          "Text.$": "$.transcript"
         },
         "ResultPath": "$.translate",
        "End": true
      }

通过附加托管策略“TranslateReadOnly”,向状态机添加调用 Translate API 的权限。

现在一切准备就绪,可以开始运行状态机了。状态机结束运行后,您将在最后一个状态的输出中看到翻译后的文本。

结束状态机

重要注意事项
以下是一些对使用 AWS 开发工具包服务集成有帮助的信息:

  • 从任务状态资源字段中的 ASL 直接调用 AWS 开发工具包服务。要执行此操作,请使用以下语法:arn:aws:states:::aws-sdk:serviceName:apiAction.[serviceIntegrationPattern]
  • 为资源字段中的 apiAction 名称使用 camelCase,例如“copyObject”,为参数字段的参数名称使用 PascalCase,例如“CopySource”。
  • Step Functions 无法为大多数 AWS 开发工具包服务集成自动生成 IAM 策略,因此我们需要手动将这些策略添加到状态机的 IAM 角色中。
  • 充分利用 ASL 内置函数,这些函数可以操作数据,避免在简单的转换中使用 Lambda 函数。

立即开始使用!
AWS 开发工具包服务集成在以下区域正式推出:美国东部(弗吉尼亚北部)美国东部(俄亥俄)美国西部(俄勒冈)加拿大(中部)欧洲(爱尔兰)欧洲(米兰)非洲(开普敦)亚太地区(东京)。该功能在未来几天内将在提供 Step Functions 的所有其他商业区域正式推出。

如需了解有关这项新功能的详情,请阅读相关文档

Marcia