亚马逊AWS官方博客

AWS Lambda 为支持的事件源添加 Amazon Simple Queue Service

现在,我们可以使用 Amazon Simple Queue Service (SQS) 触发 AWS Lambda 函数!这是一项重要更新,其中包括我个人从 4 年多以前就开始期待的一些关键功能。我知道,我们的客户会很高兴能试用这些新功能,因此不妨跳过以下历程回顾,直接跳到后面的演练部分。

SQS 是 AWS 在 2004 年发布的第一项服务,距今已有 14 年。如果从其他角度来看,2004 年,最大的商用硬盘只有 60GB 左右、PHP 5 正式发布、Facebook 刚刚成立、电视剧《老友记》已完结、GMail 全新推出,而我还在上高中。回顾过去,我可以看到,造就当今 AWS 的部分原则在 SQS 开发早期就已显现:完全托管、可通过网络访问、按需付费且无需做出最低用量承诺。现在,SQS 是我们最受欢迎的服务之一。成千上万的大批量客户都将 SQS 用作许多应用程序的基本构建块之一。

相比之下,2014 年在 AWS re:Invent 上发布的 AWS Lambda 则相对较新(那天我也在现场!)。Lambda 是一种计算服务,使用它,您无需预置或管理服务器即可运行代码。而且,该服务于 2014 年掀起了一场无服务器革命。从 Web 和移动后端到 IT 策略引擎,再到数据处理管道,Lambda 已迅速广泛应用于各种使用案例。现在,Lambda 支持 Node.js、Java、Go、C# 和 Python 运行时,从而最大限度地减少了客户更改现有代码库的需求,并可以帮助客户灵活构建新的代码库。在过去 4 年间,我们为 Lambda 添加了大量功能和事件源,帮助客户更轻松完成任务。通过在 Lambda 中添加对 SQS 的支持,消除了运行轮询服务或创建 SQS 到 SNS 映射等大量无差别的繁重任务。

下面我们来看看此功能的具体运行原理。

通过 SQS 触发 Lambda

首先,我需要一个现有的 SQS 标准队列,或者新建一个。 我将转到 AWS 管理控制台并打开 SQS 创建一个新的队列。我们来给该队列起一个有趣的名字。目前,Lambda 触发器仅适用于标准队列,不适用于 FIFO 队列。

现在,我已有队列,我想创建一个 Lambda 函数来处理该队列。我可以导航到 Lambda 控制台,然后创建一个简单的新函数,使用如下 Python 代码只打印出消息正文:

def lambda_handler(event, context):
    for record in event['Records']:
        print(record['body'])

接下来我需要在 Lambda 函数中添加触发器,方法是在控制台中选择左侧的 SQS 触发器。我可以选择用于调用 Lambda 的队列,以及单个 Lambda 将处理的最大记录数(根据 SQS ReceiveMessage API,最多 10 个)。

Lambda 将自动向外横向扩展,为队列中的每条新消息调用一个 Lambda 函数。例如,如果我的队列中有 100 条消息且启用了触发器,则 Lambda 将执行 100 次函数调用,以同时处理这些消息。随着队列流量的波动,Lambda 服务将根据传输中的消息相应扩展和缩减轮询操作次数。我在本文末的附加信息部分详细介绍了这种行为。如果我将批处理大小设置为 10,那么我将调用 10 个新的 Lambda 函数。我可以通过增加或减少函数的并发执行限制来控制此行为。

对于每批已处理的消息,如果函数成功返回,则这些消息将从队列中删除。如果函数出错或超时,则在队列中设置的可见性超时后,这些消息将返回队列。快速提示:Lambda 函数的超时必须低于队列可见性超时,这样才能创建 SQS 到 Lambda 的事件映射。

添加触发器后,我可以对函数进行其他任何更改并进行保存。如果我们跳转回 SQS 控制台,可看到触发器已注册。我也可以通过 SQS 控制台配置和编辑该触发器。

我将使用 AWS CLI 将一条简短消息编入队列来测试其功能:

aws sqs send-message --queue-url https://sqs.us-east-2.amazonaws.com/054060359478/SQS_TO_LAMBDA_IS_FINALLY_HERE_YAY --message-body "hello, world"

我的 Lambda 将收到消息并执行代码,将消息负载打印到 Amazon CloudWatch 日志中。

当然,所有这些都可直接用于 AWS SAM

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Example of processing messages on an SQS queue with Lambda
Resources:
  MySQSQueueFunction:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.6
      CodeUri: src/
      Events:
        MySQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt MySqsQueue.Arn
            BatchSize: 10

  MySqsQueue:
    Type: AWS::SQS::Queue

附加信息

首先,此功能不会额外收费。但由于 Lambda 服务会持续对 SQS 队列进行长轮询,因此将按照标准 SQS 定价费率从账户中收取这些 API 调用费。

接下来快速、深入了解一下并发和自动扩展功能 – 只需记住,这些行为可能会变更。Lambda 的自动扩展行为旨在在队列为空时使轮询成本保持在较低水平,同时让我们能在队列利用率高时扩展到高吞吐量。在最初创建和启用 SQS 事件源映射或消息在没有流量的时间段后首次出现时,Lambda 服务将开始使用五个并行长轮询连接轮询 SQS 队列。Lambda 服务会监控传输中的消息的数量,在检测到此数量有上升趋势时,会提高轮询频率(每分钟增加 20 个 ReceiveMessage 请求)和函数并发量(每分钟增加 60 个)。只要队列处于忙碌状态,Lambda 就会继续扩展,直到达到函数并发上限。在传输中的消息数量呈下降趋势时,Lambda 会降低轮询频率(每分钟减少 10 个 ReceiveMessage 请求),并将用于调用函数的并发调用量每分钟减少 30 个。

您可以查看最新文档,了解更多信息。其中还包括 SQS 事件负载示例。您也可以通过 SQS 的文档了解更多详情。

和往常一样,我们非常高兴能在 Twitter 或下方评论中收到大家对于此功能的反馈。

Randall