背景
AWS Step Functions 是一项低代码、可视化的工作流服务,让开发人员可通过 AWS 服务轻松构建分布式应用程序、自动化 IT 和业务流程并构建数据和机器学习管道。通过嵌套 Step Functions 工作流程,您可以将大型和复杂的工作流程构建为更小型和简单的多个工作流程。但是,嵌套Step Functions需要您在同一个账号上部署。而在很多实际使用场景中,需要在多个AWS账号下进行工作流的编排。
文章提供了一个通过Amazon API Gateway来实现跨账号工作流服务的解决方案,并且提供AWS 管理控制台和AWS CDK基础架构即代码(IaC)两种部署方式作为参考。
AWS Step Functions
AWS Step Functions可以与其他AWS服务进行集成,在工作流中直接调用其他服务的的API。例如:
- 调用AWS Lambda function
- 插入或者读取Amazon DynamoDB的数据
- 运行一个 Amazon Elastic Container Service (Amazon ECS)任务,并等待它运行结束。
- 在Amazon Simple Queue Service (Amazon SQS)发送消息
- 运行其他的AWS Step Functions工作流
- 向Amazon API Gateway发送请求
更多请参考 Call other AWS services
架构设计
以下架构图展示了上游账号A的工作流通过API Gateway,异步调用下游账号B的工作流,并传递有效业务信息。其通过资源策略来对API和Step Functions工作流进行访问权限控制,防止匿名访问。此架构展示的是两个AWS账号间的跨账号工作流服务,你也可以扩展为跨多个区域、多个账号的工作流服务。
工作流程
- 上游账号A的Step Functions状态机向下游账号B的API Gateway端点发起请求,请求中包含下游账号B的Step Functions状态机ARN及其他业务信息。
- 下游账号B的API Gateway判断工作流流向,异步调用所需下游账号B的Step Functions状态机并传递业务信息。
- 下游账号B的Step Functions状态机运行其工作流。
配置步骤
文章会先展示在AWS管理控制台进行无代码的部署,如果您需要参考CDK代码,请至文章的CDK部署模块。
上游账号A
在Step Functions中建立状态机,用于向托管在不同账号上的API发起请求。
创建Step Functions状态机所需 IAM Role
使用控制台或者CDK创建IAM Role,命名为SenderStateMachineRole,满足以下条件:
- Permissions
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"execute-api:Invoke",
],
"Resource": "arn:aws:execute-api:*:*:*"
},
{
"Effect": "Allow",
"Action": [
"logs:*",
],
"Resource": "*"
}
]
}
- Trust Relationships 选择AWS Step Functions
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "states.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
创建Step Functions状态机
上游账号A运行状态机工作流的任务,向下游账号B的API发送请求。
{
"Comment": "A description of my state machine",
"StartAt": "API Gateway Invoke",
"States": {
"API Gateway Invoke": {
"Type": "Task",
"Resource": "arn:aws:states:::apigateway:invoke",
"Parameters": {
"ApiEndpoint.$": "<下游账号B的API端点>",
"Method": "POST",
"Stage": "dev",
"Path": "/execution",
"Headers": {},
"RequestBody": {
"input.$": "$.body",
"stateMachineArn.$": "$.stateMachineArn"
},
"AuthType": "RESOURCE_POLICY"
},
"End": true
}
}
}
下游账号B
创建DynamoDB Table
创建Table命名为ReceiverTable,Partition Key为message。此Table会储存账号A状态机工作流向账号B发送的信息。
创建Step Functions状态机所需 IAM Role
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:DeleteItem"
],
"Resource": [
"arn:aws:dynamodb:<region>:<账号B>:table/ReceiverTable"
]
},
{
"Effect": "Allow",
"Action": [
"logs:*",
],
"Resource": "*"
},
]
}
创建Step Functions状态机
创建一个简单的状态机ReceiverStateMachine,可以执行DynamoDB的PutItem。该Task可以将API Gateway传来的参数存储进DynamoDB ReceiverTable。
{
"Comment": "A description of my state machine",
"StartAt": "PutItem",
"States": {
"PutItem": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:putItem",
"Parameters": {
"TableName": "ReceiverTable",
"Item": {
"message": {
"S.$": "$.inputValue"
}
}
},
"End": true
}
}
}
创建API Gateway所需 IAM Role
使用控制台或者CDK创建IAM Role,命名为ReceiverApiRole,满足以下条件:
- Permissions
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:*"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"states:*"
],
"Resource": "<Receiver state machine ARN>"
}
]
}
- Trust relationships
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "apigateway.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
创建API Gateway RestAPI
- 创建RestAPI,命名为ReceiverApi。
- 编辑Resource Policy,让API仅接受来自账号A的状态机SenderStateMachine的API请求。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "states.amazonaws.com"
},
"Action": "execute-api:Invoke",
"Resource": "arn:aws:execute-api:<region>:<账号B>:<RestAPI ID>/*/*/*",
"Condition": {
"StringEquals": {
"aws:SourceArn": "arn:aws:states:<region>:<账号A>:stateMachine:SenderStateMachine"
}
}
}
]
}
- 建立Resources为 /execution,再建立method为POST。
- 设置Integration Request,使API在被请求时,向Step Functions发送StartExecution的命令
- 设置Method Request Authorization为AWS IAM
- Deploy API,选择[New Stage] ,输入dev为Stage name。
运行工作流
执行上游账号A的工作流,输入以下参数。
{
"stateMachineArn": "<ReceiverStateMachineArn>",
"body": "{\"inputValue\":\"some message\"}"
}
运行结果
- 上游账号A状态机SenderStateMachine
- 下游账号B状态机ReceiverStateMachine
- DynamoDB插入数据
AWS CDK部署(参考代码)
上游账号A
import { Construct, Stack, StackProps } from '@aws-cdk/core'
import { PolicyDocument, PolicyStatement, Role, ServicePrincipal } from '@aws-cdk/aws-iam'
import { JsonPath, StateMachine, Succeed, TaskInput } from "@aws-cdk/aws-stepfunctions";
import { AuthType, CallApiGatewayRestApiEndpoint, HttpMethod } from "@aws-cdk/aws-stepfunctions-tasks";
import { LogGroup } from "@aws-cdk/aws-logs";
import { RestApi } from '@aws-cdk/aws-apigateway'
export class SenderStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
// sender state machine Cloudwatch log group
const logGroup = new LogGroup(this, "LogGroup", {
logGroupName: "SenderStateMachineLogGroup"
})
// sender state machine IAM role
const senderStateMachineRole = new Role(this, "SenderStateMachineRole", {
roleName: "SenderStateMachineRole",
assumedBy: new ServicePrincipal("states.amazonaws.com"),
inlinePolicies: {
stateMachinePolicy: new PolicyDocument({
statements: [
// api invoke permission
new PolicyStatement({
actions: ["execute-api:Invoke"],
resources: ["arn:aws:execute-api:*:*:*"]
}),
new PolicyStatement({
actions: ["logs:*"],
resources: [logGroup.logGroupArn]
})
]
})
}
})
// sender Step Functions state machine to call api from account B
const senderStateMachine = new StateMachine(this, 'SenderStateMachine', {
stateMachineName: "SenderStateMachine",
definition: new CallApiGatewayRestApiEndpoint(this, "CallApiGatewayRestApiEndpoint", {
api: <ReceiverStateMachine arn>,
stageName: "dev",
method: HttpMethod.POST,
requestBody: TaskInput.fromObject({
input: JsonPath.stringAt('$.body'),
stateMachineArn: JsonPath.stringAt('$.receiverStateMachineArn'),
}),
authType: AuthType.RESOURCE_POLICY
})
.next(new Succeed(this, "TaskCompleted")),
role: receiverStateMachineRole,
logs: {
destination: logGroup
}
});
}
}
下游账号B
import { Stack, Construct, StackProps } from '@aws-cdk/core'
import { Table, AttributeType } from '@aws-cdk/aws-dynamodb'
import { Role, Effect, PolicyDocument, PolicyStatement, ServicePrincipal, Condition } from '@aws-cdk/aws-iam'
import { LogGroup } from '@aws-cdk/aws-logs'
import { StateMachine, JsonPath, Succeed } from '@aws-cdk/aws-stepfunctions'
import { DynamoPutItem, DynamoAttributeValue } from "@aws-cdk/aws-stepfunctions-tasks";
import { RestApi, AwsIntegration, AuthorizationType, Stage, Deployment } from '@aws-cdk/aws-apigateway'
export class ReceiverStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
// DynamoDB table
const receiverTable = new Table(this, "ReceiverTable", {
tableName: "ReceiverTable",
partitionKey: { name: 'message', type: AttributeType.STRING }
});
// receiver state machine Cloudwatch log group
const logGroup = new LogGroup(this, "LogGroup", {
logGroupName: "ReceiverStateMachineLogGroup"
})
// receiver state machine IAM role
const receiverStateMachineRole = new Role(this, "ReceiverStateMachineRole", {
roleName: "ReceiverStateMachineRole",
assumedBy: new ServicePrincipal("states.amazonaws.com"),
inlinePolicies: {
stateMachinePolicy: new PolicyDocument({
statements: [
// DynamoDB permissions
new PolicyStatement({
actions:[
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:DeleteItem"],
resources: [receiverTable.tableArn],
effect: Effect.ALLOW
}),
new PolicyStatement({
actions: [
"logs:*"
],
resources: [logGroup.logGroupArn],
effect: Effect.ALLOW
})
]
})}})
// receiver Step Functions state machine to put item in DynamoDB
const receiverStateMachine = new StateMachine(this, 'ReceiverStateMachine', {
stateMachineName: "ReceiverStateMachine",
definition: new DynamoPutItem(this, "DynamodbPutItemTask", {
item: {
message: DynamoAttributeValue.fromString(JsonPath.stringAt('$.inputValue')),
},
table: receiverTable
}).next(new Succeed(this, "TaskCompleted")),
role: receiverStateMachineRole,
logs: {
destination: logGroup
}
});
// API Gateway IAM role
const apiRole = new Role(this, 'ReceiverApiRole', {
roleName: "ReceiverApiRole",
assumedBy: new ServicePrincipal("apigateway.amazonaws.com"),
inlinePolicies: {
apiPolicy: new PolicyDocument({
statements: [
new PolicyStatement({
actions: ["logs:*"],
resources: ["*"]
}),
// execute state machine permissions
new PolicyStatement({
actions: ["states:*"],
resources: [receiverStateMachine.stateMachineArn]
})
]
})
}
})
// API Gateway RestAPI
const api = new RestApi(this, "ReceiverApi", {
restApiName: "ReceiverApi"
})
// resource policy to restrict access
const resourcePolicy = new PolicyDocument({
statements: [
new PolicyStatement({
actions: ["execute-api:Invoke"],
resources: [api.arnForExecuteApi("POST", "/execution")],
conditions: {
StringEquals: {
<SenderStateMachine arn>
}
}
})
]
})
// API resource to start state machine execution with AWS Integration
api.root.addResource('execution').addMethod('POST',
new AwsIntegration({
service: "states",
action: "StartExecution",
region: this.region,
options: {
credentialsRole: apiRole
}
}),
{
authorizationType: AuthorizationType.IAM
})
// deploy api
const deployment = new Deployment(this, 'Deployment', {api});
const stage = new Stage(this, 'dev', {deployment});
}
}
总结
文章提供了部署基于API Gateway和Step Functions的跨账号工作流的详细步骤,并提供了CDK部署的参考代码。上游账号的Step Functions工作流通过API Gateway作为前端,调用下游账号的工作流。此方案可以进行扩展,通过一个上游工作流集中管理账号,管理多区域和多账号的工作流服务,避免了在不同账号进行重复工作,
更多的了解无服务器架构及Step Functions,请参考 AWS Step Functions
本篇作者