亚马逊AWS官方博客

使用 AWS Lambda 持久函数构建多步骤应用程序和人工智能工作流程

现代应用程序越来越需要服务之间进行复杂的长期协调,例如多步骤支付处理、人工智能代理编排或等待人工决策的审批流程。传统而言,构建这些服务需要付出大量精力来实施状态管理、处理故障和集成多种基础设施服务。

从今天开始,您可以使用 AWS Lambda 持久函数直接在熟悉的 AWS Lambda 体验中构建可靠的多步骤应用程序。持久函数是常规的 Lambda 函数,它们具有您已经熟悉的相同事件处理程序和集成。您可以用自己的首选编程语言编写顺序代码,持久函数会跟踪进度,在失败时自动重试,并在规定的时间点暂停执行长达一年,无需为等待期间的空闲计算付费。

AWS Lambda 持久函数使用检查点和重播机制(称为“持久执行”)来提供这些功能。启用可持久执行的函数后,您可以将新的开源持久执行开发工具包添加到函数代码中。然后,您可以使用诸如“步骤”之类的开发工具包原语,在业务逻辑中添加自动检查点和重播,以及使用“等待”原语,高效地暂停执行而无需支付计算费用。当执行意外终止时,Lambda 将从最后一个检查点恢复,从头开始重播事件处理程序,同时跳过已完成的操作。

开始使用 AWS Lambda 持久函数
让我来为您介绍如何使用持久函数。

首先,在控制台中创建一个新的 Lambda 函数,然后选择从头开始创作。在持久执行部分中,选择启用。请注意,只能在函数创建期间设置持久函数设置,并且目前无法修改现有 Lambda 函数。

创建 Lambda 持久函数后,就可以开始使用提供的代码了。

Lambda 持久函数引入了两个用于处理状态管理和恢复的核心原语:

  • 步骤context.step() 方法可为业务逻辑添加自动重试和检查点。步骤设置完成后,重播期间将跳过该步骤。
  • 等待context.wait() 方法可在指定的时间内暂停执行、终止函数、暂停和恢复执行,不收取计算费用。

此外,Lambda 持久函数还为更复杂的模式提供其他操作:create_callback() 可创建回调,您可以使用该回调来等待 API 响应或人工审批等外部事件的结果;wait_for_condition() 可暂停执行,直到满足特定条件,例如轮询 REST API 来确认是否完成进程;以及适用于高级并发使用案例的 parallel()map() 操作。

建立生产就绪型订单处理工作流程
现在,我们来扩展默认示例,构建生产就绪型订单处理工作流程。我将演示如何使用回调进行外部审批、正确处理错误以及配置重试策略。为了专注于这些核心概念,我特意使用了简洁明了的代码。在全面实施中,您可以使用 Amazon Bedrock 增强验证步骤,添加人工智能驱动的订单分析。

以下是订单处理工作流程的工作原理:

  • 首先,validate_order() 会检查订单数据以确保所有必填字段都存在。
  • 接下来,send_for_approval() 会发送订单以供外部人工审批并等待回调响应,同时暂停执行,这不会收取计算费用。
  • 然后,process_order() 会完成订单处理。
  • 在整个工作流程中,try-catch 错误处理机制会区分立即停止执行的终端错误和触发自动重试的步骤内可恢复错误。

以下是包含步骤定义和主处理程序的完整订单处理工作流程:

import random
from aws_durable_execution_sdk_python import (
    DurableContext,
    StepContext,
    durable_execution,
    durable_step,
)
from aws_durable_execution_sdk_python.config import (
    Duration,
    StepConfig,
    CallbackConfig,
)
from aws_durable_execution_sdk_python.retries import (
    RetryStrategyConfig,
    create_retry_strategy,
)


@durable_step
def validate_order(step_context: StepContext, order_id: str) -> dict:
    """正在使用人工智能验证订单数据。"""
    step_context.logger.info(f"正在验证订单: {order_id}")
    # 在生产环境中: 调用 Amazon Bedrock 验证订单的完整性和准确性
    return {"order_id": order_id, "status": "validated"}


@durable_step
def send_for_approval(step_context: StepContext, callback_id: str, order_id: str) -> dict:
    """使用提供的回调令牌发送订单以等待审批。"""
    step_context.logger.info(f"正在发送订单 {order_id} 以等待审批,callback_id: {callback_id}")
    
    # 在生产环境中: 将 callback_id 发送至外部审批系统
    # 审批完成后,外部系统将使用此 callback_id 调用
    # Lambda 的 SendDurableExecutionCallbackSuccess 或 SendDurableExecutionCallbackFailure API
    
    return {
        "order_id": order_id,
        "callback_id": callback_id,
        "status": "sent_for_approval"
    }


@durable_step
def process_order(step_context: StepContext, order_id: str) -> dict:
    """处理订单,并包含针对瞬时故障的重试逻辑。"""
    step_context.logger.info(f"正在处理订单: {order_id}")
    # 模拟有时会失败的不稳定 API
    if random.random() > 0.4:
        step_context.logger.info("处理失败,将重试")
        raise Exception("处理失败")
    return {
        "order_id": order_id,
        "status": "processed",
        "timestamp": "2025-11-27T10:00:00Z",
    }


@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
    try:
        order_id = event.get("order_id")
        
        # 步骤 1: 验证订单
        validated = context.step(validate_order(order_id))
        if validated["status"] != "validated":
            raise Exception("验证失败")  # 终端错误 - 停止执行
        context.logger.info(f"订单已验证: {validated}")
        
        # 步骤 2: 创建回调
        callback = context.create_callback(
            name="awaiting-approval",
            config=CallbackConfig(timeout=Duration.from_minutes(3))
        )
        context.logger.info(f"已创建回调,id: {callback.callback_id}")
        
        # 步骤 3: 使用 callback_id 发送审批请求
        approval_request = context.step(send_for_approval(callback.callback_id, order_id))
        context.logger.info(f"审批请求已发送: {approval_request}")
        
        # 步骤 4: 等待回调结果
        # 此处会受阻,直至外部系统调用 SendDurableExecutionCallbackSuccess 或 SendDurableExecutionCallbackFailure
        approval_result = callback.result()
        context.logger.info(f"已收到审批结果: {approval_result}")
        
        # 步骤 5: 使用自定义重试策略处理订单
        retry_config = RetryStrategyConfig(max_attempts=3, backoff_rate=2.0)
        processed = context.step(
            process_order(order_id),
            config=StepConfig(retry_strategy=create_retry_strategy(retry_config)),
        )
        if processed["status"] != "processed":
            raise Exception("处理失败")  # 终端错误
        
        context.logger.info(f"订单已成功处理: {processed}")
        return processed
        
    except Exception as error:
        context.logger.error(f"处理订单时出错: {error}")
        raise error  # 重新抛出以使执行失败

这段代码演示了几个重要的概念:

  • 错误处理:try-catch 模块处理终端错误。在步骤之外抛出未处理的异常(例如验证检查)时,它将立即终止执行。在订单数据无效等没有必要进行重试的情况下,这很有用。
  • 步骤重试:在 process_order 步骤中,异常会根据默认值(步骤 1)或配置的 RetryStrategy(步骤 5)触发自动重试。这可以处理瞬时故障,例如临时 API 不可用。
  • 日志:我在主处理程序中使用 context.logger,在步骤中使用 step_context.logger。上下文记录器会在重播期间抑制重复日志。

现在,我使用 order_id 创建一个测试事件,并异步调用该函数来启动订单工作流程。我导航到测试选项卡并填写可选的持久执行名称来标识此次执行。请注意,持久函数提供内置的幂等性。如果我两次使用相同的执行名称调用函数,则第二次调用将返回现有的执行结果,而不是创建副本。

我可以通过导航到 Lambda 控制台中的持久执行选项卡来监控执行情况:

在这里,我可以看到每个步骤的状态和时间。执行显示 CallbackStarted,然后显示 InvocationCompleted,这表示该函数已终止并暂停执行,以免在等待审批回调时产生空闲费用。

我现在可以通过选择发送成功发送失败直接从控制台完成回调,也可以使用 Lambda API 以编程方式完成回调。

我选择发送成功

回调完成后,执行将继续并处理订单。如果 process_order 步骤由于模拟的不稳定 API 而失败,则系统将根据配置的策略自动重试。所有重试成功后,执行就能成功完成。

使用 Amazon EventBridge 监控执行情况
您还可以使用 Amazon EventBridge 监控持久函数的执行。Lambda 会自动将执行状态更改事件发送到默认事件总线,以便您构建下游工作流程、发送通知或与其他 AWS 服务集成。

要接收这些事件,请使用以下模式在默认事件总线上创建 EventBridge 规则:

{
  "source": ["aws.lambda"],
  "detail-type": ["Durable Execution Status Change"]
}

注意事项
以下是需要注意的要点:

  • 可用性:Lambda 持久函数现已在美国东部(俄亥俄州)AWS 区域推出。要了解最新的区域可用性,请访问按区域列出的 AWS 功能页面。
  • 编程语言支持:在发布时,AWS Lambda 持久函数支持 JavaScript/TypeScript(Node.js 22/24)和 Python(3.13/3.14)。我们建议您使用首选程序包管理器,将持久执行开发工具包与您的函数代码捆绑在一起。这些开发工具包发展迅速,因此当新功能可用时,您可以轻松更新依赖关系。
  • 使用 Lambda 版本:在将持久函数部署到生产环境时,使用 Lambda 版本来确保始终在相同的代码版本上进行重播。如果您在暂停执行时更新函数代码,重播将使用启动执行的版本,从而防止在长期工作流程中因代码更改而出现不一致的情况。
  • 测试持久函数:您可以使用具有 pytest 集成的单独的测试开发工具包,以及适用于更复杂的集成测试的 AWS Serverless Application Model(AWS SAM)命令行界面(CLI),在不使用 AWS 凭证的情况下在本地测试持久函数。
  • 开源开发工具包:持久执行开发工具包针对 JavaScript/TypeScriptPython 开源。您可以查看源代码、贡献改进并随时了解最新功能。
  • 定价:要了解有关 AWS Lambda 持久函数定价的更多信息,请参阅 AWS Lambda 定价页面。

访问 AWS Lambda 控制台,开始使用 AWS Lambda 持久函数。要了解更多信息,请参阅 AWS Lambda 持久函数文档页面。

祝您构建顺利!

Donnie