O blog da AWS

Executando código após retornar uma resposta de uma função do AWS Lambda

Este post foi escrito por Uri Segev, principal especialista em Serverless Specialist SA.

Ao invocar uma função do AWS Lambda de forma síncrona, você espera que a função retorne uma resposta. Por exemplo, esse é o caso quando um cliente invoca uma função do Lambda por meio do Amazon API Gateway ou do AWS Step Functions. Como o cliente está aguardando a resposta, você deve devolvê-la o mais rápido possível.

No entanto, pode haver casos em que você precise realizar um trabalho adicional que não afete a resposta e possa fazê-lo de forma assíncrona, depois de enviar a resposta. Por exemplo, você pode armazenar dados em um banco de dados ou enviar informações para um sistema de logs.

Depois de enviar a resposta da função, o serviço Lambda congela o ambiente de execução e a função não pode executar código adicional. Mesmo que você crie um encadeamento para executar uma tarefa em segundo plano, o serviço Lambda congela o ambiente de execução quando o manipulador (handler) retorna, fazendo com que o encadeamento congele até a próxima invocação. Embora você possa adiar o retorno da resposta ao cliente até que todo o trabalho seja concluído, essa abordagem pode impactar negativamente a experiência do usuário.

Este blog explora maneiras de executar uma tarefa que pode começar antes do retorno da função, mas continua em execução depois que a função retorna a resposta ao cliente.

Invocando uma função Lambda assíncrona

A primeira opção é dividir o código em duas funções. A primeira função executa o código síncrono; a segunda função executa o código assíncrono. Antes que a função síncrona retorne, ela invoca a segunda função de forma assíncrona, diretamente, usando a API Invoke, ou indiretamente, por exemplo, enviando uma mensagem ao Amazon SQS para acionar a segunda função.

Esse código em Python demonstra como implementar isso:

import json
import time
import os
import boto3
from aws_lambda_powertools import Logger

logger = Logger()
client = boto3.client('lambda')

def calc_response(event):
    logger.info(f"[Function] Calculating response")
    time.sleep(1) # Simulate sync work
    return {
        "message": "hello from async"
    }

def submit_async_task(response):
    # Invoke async function to continue
    logger.info(f"[Function] Invoking async task in async function")
    client.invoke_async(FunctionName=os.getenv('ASYNC_FUNCTION'), InvokeArgs=json.dumps(response))

def handler(event, context):
    logger.info(f"[Function] Received event: {json.dumps(event)}")

    response = calc_response(event)
    
    # Done calculating response, submit async task
    submit_async_task(response)

    # Return response to client
    logger.info(f"[Function] Returning response to client")
    return {
        "statusCode": 200,
        "body": json.dumps(response)
    }
Python

Use o streaming de resposta do Lambda

O streaming de resposta permite que os desenvolvedores comecem a transmitir a resposta assim que tiverem o primeiro byte da resposta, sem esperar pela resposta inteira. Normalmente, você usa streaming de resposta quando precisa minimizar o tempo até o primeiro byte (TTFB) ou quando precisa enviar uma resposta maior que 6 MB (o limite de tamanho da carga útil da resposta do Lambda).

Usando esse método, a função pode enviar a resposta usando o mecanismo de streaming de resposta e pode continuar executando o código mesmo depois de enviar o último byte da resposta. Dessa forma, o cliente recebe a resposta e a função Lambda pode continuar em execução.

Esse código Node.js demonstra como implementar isso:

import json
import time
from aws_lambda_powertools import Logger

logger = Logger()

def handler(event, context):
    logger.info(f"[Async task] Starting async task: {json.dumps(event)}")
    time.sleep(3)  # Simulate async work
    logger.info(f"[Async task] Done")
Python

Use extensões Lambda

As extensões do Lambda podem aumentar as funções do Lambda para integrá-las às suas ferramentas preferidas de monitoramento, observabilidade, segurança e governança. Você também pode usar uma extensão para executar seu próprio código em segundo plano para que ele continue em execução depois que sua função retornar a resposta ao cliente.

Há dois tipos de extensões do Lambda: extensões externas e extensões internas. As extensões externas são executadas como processos separados no mesmo ambiente de execução. A função Lambda pode se comunicar com a extensão usando arquivos na pasta /tmp ou usando uma rede local, por exemplo, por meio de solicitações HTTP. Você deve empacotar extensões externas como uma camada Lambda.

As extensões internas são executadas como encadeamentos separados no mesmo processo que executa o manipulador. O manipulador pode se comunicar com a extensão usando qualquer mecanismo em processo, como filas internas. Este exemplo mostra uma extensão interna, que é uma linha de execução dedicada dentro do processo do manipulador.

Quando o serviço Lambda invoca uma função, ele também notifica todas as extensões da invocação. O serviço Lambda só congela o ambiente de execução quando a função Lambda retorna uma resposta e todas as extensões sinalizam ao runtime que estão concluídas. Com essa abordagem, a função faz com que a extensão execute a tarefa independentemente da própria função e a extensão notifica o runtime do Lambda quando termina de processar a tarefa. Dessa forma, o ambiente de execução permanece ativo até que a tarefa seja concluída.

O exemplo de código Python a seguir isola o código da extensão em seu próprio arquivo e o manipulador o importa e o usa para executar a tarefa em segundo plano:

import json
import time
import async_processor as ap
from aws_lambda_powertools import Logger

logger = Logger()

def calc_response(event):
    logger.info(f"[Function] Calculating response")
    time.sleep(1) # Simulate sync work
    return {
        "message": "hello from extension"
    }

# This function is performed after the handler code calls submit_async_task 
# and it can continue running after the function returns
def async_task(response):
    logger.info(f"[Async task] Starting async task: {json.dumps(response)}")
    time.sleep(3)  # Simulate async work
    logger.info(f"[Async task] Done")

def handler(event, context):
    logger.info(f"[Function] Received event: {json.dumps(event)}")

    # Calculate response
    response = calc_response(event)

    # Done calculating response
    # call async processor to continue
    logger.info(f"[Function] Invoking async task in extension")
    ap.start_async_task(async_task, response)

    # Return response to client
    logger.info(f"[Function] Returning response to client")
    return {
        "statusCode": 200,
        "body": json.dumps(response)
    }
Python

O código Python a seguir demonstra como implementar a extensão que executa a tarefa em segundo plano:

import os
import requests
import threading
import queue
from aws_lambda_powertools import Logger

logger = Logger()
LAMBDA_EXTENSION_NAME = "AsyncProcessor"

# An internal queue used by the handler to notify the extension that it can
# start processing the async task.
async_tasks_queue = queue.Queue()

def start_async_processor():
    # Register internal extension
    logger.debug(f"[{LAMBDA_EXTENSION_NAME}] Registering with Lambda service...")
    response = requests.post(
        url=f"http://{os.environ['AWS_LAMBDA_RUNTIME_API']}/2020-01-01/extension/register",
        json={'events': ['INVOKE']},
        headers={'Lambda-Extension-Name': LAMBDA_EXTENSION_NAME}
    )
    ext_id = response.headers['Lambda-Extension-Identifier']
    logger.debug(f"[{LAMBDA_EXTENSION_NAME}] Registered with ID: {ext_id}")

    def process_tasks():
        while True:
            # Call /next to get notified when there is a new invocation and let
            # Lambda know that we are done processing the previous task.

            logger.debug(f"[{LAMBDA_EXTENSION_NAME}] Waiting for invocation...")
            response = requests.get(
                url=f"http://{os.environ['AWS_LAMBDA_RUNTIME_API']}/2020-01-01/extension/event/next",
                headers={'Lambda-Extension-Identifier': ext_id},
                timeout=None
            )

            # Get next task from internal queue
            logger.debug(f"[{LAMBDA_EXTENSION_NAME}] Wok up, waiting for async task from handler")
            async_task, args = async_tasks_queue.get()
            
            if async_task is None:
                # No task to run this invocation
                logger.debug(f"[{LAMBDA_EXTENSION_NAME}] Received null task. Ignoring.")
            else:
                # Invoke task
                logger.debug(f"[{LAMBDA_EXTENSION_NAME}] Received async task from handler. Starting task.")
                async_task(args)
            
            logger.debug(f"[{LAMBDA_EXTENSION_NAME}] Finished processing task")

    # Start processing extension events in a separate thread
    threading.Thread(target=process_tasks, daemon=True, name='AsyncProcessor').start()

# Used by the function to indicate that there is work that needs to be 
# performed by the async task processor
def start_async_task(async_task=None, args=None):
    async_tasks_queue.put((async_task, args))

# Starts the async task processor
start_async_processor()
Python

Use um runtime personalizado

O Lambda oferece suporte a vários runtimes prontos para uso: Python, Node.js, Java, Dotnet e Ruby. O Lambda também oferece suporte a runtimes personalizados, o que permite desenvolver funções do Lambda em qualquer outra linguagem de programação necessária.

Quando você invoca uma função do Lambda que usa um runtime personalizado, o serviço Lambda invoca um processo chamado ‘bootstrap’ que contém seu código personalizado. O código personalizado precisa interagir com a API Lambda Runtime. Ele chama o endpoint /next para obter informações sobre a próxima invocação. Essa chamada de API está bloqueando e espera até que uma solicitação chegue. Quando a função terminar de processar a solicitação, ela deve chamar o endpoint /response para enviar a resposta de volta ao cliente e, em seguida, deve chamar o endpoint /next novamente para aguardar a próxima invocação. O Lambda congela o ambiente de execução depois que você chama /next, até que uma solicitação chegue.

Usando essa abordagem, você pode executar a tarefa assíncrona depois de chamar /response e enviar a resposta de volta ao cliente e antes de chamar /next, indicando que o processamento foi concluído.

O exemplo de código Python a seguir isola o código de runtime personalizado em seu próprio arquivo e a função o importa e o usa para interagir com a API de runtime:

import time
import json
import runtime_interface as rt
from aws_lambda_powertools import Logger

logger = Logger()

def calc_response(event):
    logger.info(f"[Function] Calculating response")
    time.sleep(1) # Simulate sync work
    return {
        "message": "hello from custom"
    }

def async_task(response):
    logger.info(f"[Async task] Starting async task: {json.dumps(response)}")
    time.sleep(3)  # Simulate async work
    logger.info(f"[Async task] Done")

def main():
    # You can add initialization code here

    # The following loop runs forever waiting for the next invocation
    # and sending the response back to the client
    while True:
        # Call /next to wait for next request (and indicate 
        # that we are done processing the previous request)

        requestId, event = rt.get_next()

        # The code from here to send_response() is the code
        # that usually goes inside the Lambda handler()

        logger.info(f"[Function] Received event: {json.dumps(event)}")

        # Calculate response
        response = calc_response(event)

        # Done calculating response, send response to client
        logger.info(f"[Function] Returning response to client")
        rt.send_response(requestId, {
            "statusCode": 200,
            "body": json.dumps(response)
        })

        logger.info(f"[Function] Invoking async task")
        async_task(response)

main()
Python

Esse código Python demonstra como interagir com a API de runtime:

import requests
import os
from aws_lambda_powertools import Logger

logger = Logger()
run_time_endpoint = os.environ['AWS_LAMBDA_RUNTIME_API']

def get_next():
    logger.debug("[Custom runtime] Waiting for invocation...")
    request = requests.get(
        url=f"http://{run_time_endpoint}/2018-06-01/runtime/invocation/next",
        timeout=None
    )
    event = request.json()
    requestId = request.headers["Lambda-Runtime-Aws-Request-Id"]
    return requestId, event

def send_response(requestId, response):
    logger.debug("[Custom runtime] Sending response")
    requests.post(
        url=f"http://{run_time_endpoint}/2018-06-01/runtime/invocation/{requestId}/response",
        json = response,
        timeout=None
    )
Python

Conclusão

Este blog mostra quatro maneiras de combinar tarefas síncronas e assíncronas em uma função Lambda, permitindo que você execute tarefas que continuam em execução após a função retornar uma resposta ao cliente. A tabela a seguir resume os prós e os contras de cada solução:

URLs de função, não podem ser usados com o API Gateway, sempre públicos

Invocação assíncrona Streaming de resposta Extensões Lambda Runtime personalizado
Complexidade Mais fácil de implementar Mais fácil de implementar A solução mais complexa de implementar, pois requer interação com a API de extensões e um thread dedicado Médio, pois interage com a API de runtime
Implantação Precisa de dois artefatos: a função síncrona e a função assíncrona Um único artefato de implantação que contém todo o código Um único artefato de implantação que contém todo o código Um único artefato de implantação requer o empacotamento de todos os arquivos de runtime necessários
Custo Mais caro, pois incorre em um custo adicional de invocação, e a duração geral de ambas as funções é maior do que tê-la em uma. Menos caro Menos caro Menos caro
Iniciando a tarefa assíncrona Antes de retornar do manipulador A qualquer momento durante a invocação do manipulador A qualquer momento durante a invocação do manipulador Depois de retornar a resposta ao cliente, a menos que você use um tópico dedicado
Limitações A carga enviada para a função assíncrona não pode exceder 256 KB Compatível apenas com Node.js e runtimes personalizados. Requer URLs da função Lambda, não pode ser usado com o API Gateway, sempre público
Benefícios adicionais Melhor dissociação entre código síncrono e assíncrono Capacidade de enviar respostas em etapas. Suporta cargas maiores que 6 MB (com custo adicional) A tarefa assíncrona é executada em seu próprio thread, o que pode reduzir a duração e o custo gerais
Tenta novamente em caso de falha no código assíncrono Gerenciado pelo serviço Lambda Responsabilidade do desenvolvedor Responsabilidade do desenvolvedor Responsabilidade do desenvolvedor

A escolha da abordagem correta depende do seu caso de uso. Se você escrever sua função em Node.js e invocá-la usando URLs de funções Lambda, use streaming de resposta. Essa é a maneira mais fácil de implementar e a mais econômica.

Se houver uma chance de falha na tarefa assíncrona (por exemplo, um banco de dados não estiver acessível) e você precisar garantir que a tarefa seja concluída, use o método de invocação assíncrona do Lambda. O serviço Lambda tenta novamente sua função assíncrona até que ela seja bem-sucedida. Eventualmente, se todas as novas tentativas falharem, ele invocará um destino do Lambda para que você possa agir.

Se você precisar de um runtime personalizado porque precisa usar uma linguagem de programação à qual o Lambda não oferece suporte nativo, use a opção de runtime personalizado. Caso contrário, use a opção de extensões Lambda. É mais complexo de implementar, mas é econômico. Isso permite empacotar o código em um único artefato e começar a processar a tarefa assíncrona antes de enviar a resposta ao cliente.

Para obter mais recursos de aprendizado sem servidor, visite Serverless Land.

Este blog é uma tradução do conteúdo original em inglês (link aqui).


Biografia do Autor

Uri Segev é um especialista principal em Serverless Specialist SA.

Biografia do tradutor

Daniel Abib é arquiteto de soluções sênior na AWS, com mais de 25 anos trabalhando com gerenciamento de projetos, arquiteturas de soluções escaláveis, desenvolvimento de sistemas e CI/CD, microsserviços, arquitetura Serverless & Containers e segurança. Ele trabalha apoiando clientes corporativos, ajudando-os em sua jornada para a nuvem.

https://www.linkedin.com/in/danielabib/

Biografia do Revisor

Rodrigo Peres é arquiteto de soluções na AWS, com mais de 20 anos de experiência trabalhando com arquitetura de soluções, desenvolvimento de sistemas e modernização de sistemas legados.