O blog da AWS

Otimizando o processamento de arrays JSON aninhados usando AWS Step Functions Distributed Map

Por Biswanath Mukherjee, Sr. Solutions Architect e Rahul Sringeri, Technical Account Manager.

Quando você trabalha com grandes conjuntos de dados, provavelmente já encontrou o desafio de processar estruturas JSON complexas em seus fluxos de trabalho automatizados. Você precisa pré-processar arrays dentro de objetos JSON aninhados antes de poder executar o processamento paralelo neles. A extração de dados costumava exigir código personalizado e etapas de processamento extras, atrasando você na construção da lógica principal da sua aplicação.

Com o AWS Step Functions Distributed Map, você pode processar grandes conjuntos de dados com iterações simultâneas de etapas de fluxo de trabalho em entradas de dados. Usando o recurso aprimorado ItemsPointer do Distributed Maps, você pode extrair dados do array diretamente de objetos JSON armazenados no Amazon S3. Alternativamente, para objetos JSON como entrada de estado, você pode usar Items (JSONata) ou ItemsPath (JSONPath). Com esse aprimoramento, você pode apontar diretamente para arrays aninhados dentro de estruturas JSON, eliminando a necessidade de pré-processamento personalizado de seus dados. Com ItemsPointer, Items e ItemsPath, você pode selecionar os dados de array aninhados e simplificar seus fluxos de trabalho.

Nesta postagem, exploramos como otimizar o processamento de dados de array incorporados em estruturas JSON complexas usando AWS Step Functions Distributed Map. Você aprenderá como usar ItemsPointer para reduzir a complexidade de suas definições de máquina de estado, criar designs de fluxo de trabalho mais flexíveis e simplificar seus pipelines de processamento de dados—tudo sem escrever código de transformação adicional ou funções AWS Lambda.

Este post faz parte de uma série de posts sobre AWS Step Functions Distributed Map:

Caso de uso: enriquecimento de dados de produtos de e-commerce

Neste exemplo de caso de uso de e-commerce, você construirá uma aplicação de exemplo que demonstra o processamento de dados de inventário de produtos para uma aplicação de e-commerce usando AWS Step Functions Distributed Map. A aplicação recebe um arquivo JSON de uma aplicação upstream contendo um array de informações de produtos. O fluxo de trabalho do Step Functions lê o arquivo JSON contendo dados de produtos de um bucket S3 e itera sobre o array para enriquecer cada dado de produto no array.

O diagrama a seguir apresenta a máquina de estado do AWS Step Functions.

JSON array processing workflow

Fluxo de trabalho de processamento de array JSON

O array JSON é processado usando o seguinte fluxo de trabalho:

  1. A máquina de estado lê o arquivo product-updates.json de um bucket S3 de entrada. O arquivo contém um array JSON de produtos.
  2. O estado Distributed Map na máquina de estado seleciona o nó do array JSON usando ItemsPointer e itera sobre o array JSON.
  3. Para cada um dos itens dentro do array, a máquina de estado invoca uma função Lambda para enriquecimento de dados. A função Lambda adiciona informações de estoque e preço do produto aos dados do produto.
  4. A máquina de estado salva os dados atualizados do produto em uma tabela do Amazon DynamoDB.
  5. Finalmente, a máquina de estado carrega os metadados de execução em um bucket S3 de saída. Veja os limites relacionados a execuções de máquina de estado e execuções de tarefas.

MaxConcurrency pode ser configurado para especificar o número de execuções de fluxo de trabalho filho em um Distributed Map que podem ser executadas em paralelo. Se não especificado, o Step Functions não limita a simultaneidade e executa 10.000 execuções de fluxo de trabalho filho em paralelo.

Você pode ler um arquivo JSON de um bucket S3 usando ItemReader e seus subcampos. Se o arquivo JSON, do bucket S3, contiver uma estrutura de objeto aninhada, você pode selecionar o nó específico com seu conjunto de dados com um ItemsPointer. Por exemplo, o seguinte arquivo JSON de entrada:

{
  "version": "2024.1",
  "timestamp": "2025-09-26T10:49:36.646197",
  "productUpdates": {
    "items": [
      {
        "productId": "PROD-001",
        "name": "Wireless Headphones",
        "price": 79.99,
        "stock": 150,
        "category": "Electronics"
      },
      {
        "productId": "PROD-002",
        "name": "Smart Watch",
        "category": "Electronics"
      },
      …
    ]
  }
}

A seguinte configuração de fluxo de trabalho baseada em JSONata extrai uma lista aninhada de produtos de productUpdates/items:

"ItemReader": {
   "Resource": "arn:aws:states:::s3:getObject",
   "ReaderConfig": {
      "InputType": "JSON",
      "ItemsPointer": "/productUpdates/items"
   },
   "Arguments": {
      "Bucket": "amzn-s3-demo-bucket",
      "Key": "updates/product-updates.json"
   }
}

Para fluxo de trabalho baseado em JSONPath, observe que Arguments é substituído por Parameters:

"ItemReader": {
   "Resource": "arn:aws:states:::s3:getObject",
   "ReaderConfig": {
      "InputType": "JSON",
      "ItemsPointer": "/productUpdates/items"
   },
   "Arguments": {
      "Bucket": "amzn-s3-demo-bucket",
      "Key": "updates/product-updates.json"
   }
}

O campo ItemReader não é necessário quando seu conjunto de dados é dados JSON de uma etapa anterior. ItemsPointer é aplicável apenas quando os objetos JSON de entrada são lidos de um bucket S3. Se você estiver usando JSON como entrada de estado para um Distributed Map, então você pode usar o campo ItemsPath (para JSONPath) ou Items (para JSONata) para especificar um local na entrada que aponta para array ou objeto JSON usado para iterações.

Pré-requisito

Para usar o Step Functions Distributed Map, verifique se você tem:

Configurar e executar o fluxo de trabalho

Execute as seguintes etapas para implantar a máquina de estado do Step Functions.

  1. Clone o repositório GitHub em uma nova pasta e navegue até a pasta do projeto.
    git clone https://github.com/aws-samples/sample-stepfunctions-json-array-processor.git
    cd sample-stepfunctions-json-array-processor
  2. Execute os seguintes comandos para implantar a aplicação.
    sam deploy --guided

    Insira os seguintes detalhes:

    • Stack name: Nome da pilha para CloudFormation (por exemplo, stepfunctions-json-array-processor)
    • AWS Region: Uma região AWS suportada (por exemplo, us-east-1)
    • Aceite todos os outros valores padrão.

    As saídas do sam deploy serão usadas nas etapas subsequentes.

  3. Execute o seguinte comando para gerar o arquivo product-updates.json contendo um array JSON aninhado de produtos de exemplo e carregar o arquivo product-updates.json no bucket S3 de entrada. Substitua InputBucketName pelo valor da saída do sam deploy.
    python3 scripts/generate_sample_data.py <InputBucketName>
  4. Execute o seguinte comando para iniciar a execução do fluxo de trabalho do Step Functions. Substitua o StateMachineArn pelo valor da saída do sam deploy.
    aws stepfunctions start-execution \
      --state-machine-arn <StateMachineArn> \
      --input '{}'

    A máquina de estado lê o arquivo de entrada product-updates.json e invoca uma função Lambda para atualizar o banco de dados para cada produto no array após adicionar informações de preço e estoque. Os metadados de execução também são carregados no bucket de resultados.

Monitorar e verificar resultados

Execute as seguintes etapas para monitorar e verificar os resultados do teste.

  1. Execute o seguinte comando para obter os detalhes da execução. Substitua executionArn pelo ARN da sua máquina de estado.
    aws stepfunctions describe-execution --execution-arn <executionArn>

    Aguarde até que o status mostre SUCCEEDED.

  2. Execute os seguintes comandos para validar a saída processada da tabela DynamoDB ProductCatalogTableName. Substitua o valor ProductCatalogTableName pelo valor da saída do sam deploy.
    aws dynamodb scan --table-name <ProductCatalogTableName>
  3. Verifique se a tabela DynamoDB contém os dados de produto enriquecidos, incluindo atributos de preço e estoque. Exemplo de saída:
    {
        "Items": [
            {
                "ProductId": {
                    "S": "PROD-005"
                },
                "lastUpdated": {
                    "S": "2025-10-07T20:33:34.507Z"
                },
                "stock": {
                    "N": "129"
                },
                "price": {
                    "N": "139.25"
                }
            },
            {
                "ProductId": {
                    "S": "PROD-003"
                },
                "lastUpdated": {
                    "S": "2025-10-07T20:33:34.576Z"
                },
                "stock": {
                    "N": "471"
                },
                "price": {
                    "N": "40.92"
                }
            },
    	      …
        ],
        "Count": 5,
        "ScannedCount": 5,
        "ConsumedCapacity": null
    }

Limpeza

Para evitar custos, remova todos os recursos que você criou ao seguir este postagem.

Execute o seguinte comando após substituir a variável <placeholder> para excluir os recursos que você implantou para a solução deste post:

aws s3 rm s3://<InputBucketName> --recursive
aws s3 rm s3://<ResultBucketName> --recursive
sam delete

Conclusão

Nesta postagem, você aprendeu como usar o Step Functions Distributed Map para extrair dados de array nativamente de objetos JSON armazenados em um bucket S3. Ao remover código personalizado de extração de dados, você pode simplificar o processamento de suas cargas de trabalho paralelas em larga escala. Com ItemsPointer você pode extrair dados de array dentro de arquivos JSON armazenados em um bucket S3, e com Items(JSONata) ou ItemsPath (JSONPath), você pode extrair arrays de entrada de estado JSON complexa, adicionando flexibilidade aos seus designs de fluxo de trabalho.

Novas fontes de entrada para Distributed Map estão disponíveis em todas as regiões AWS comerciais onde o AWS Step Functions está disponível. Para uma lista completa de regiões AWS onde o Step Functions está disponível, consulte a Tabela de Regiões AWS. Para começar, você pode usar o modo Distributed Map hoje no console do AWS Step Functions. Para saber mais, visite o guia do desenvolvedor do Step Functions.

Para mais recursos de aprendizado serverless, visite Serverless Land.

Este conteúdo foi traduzido do post original do blog, que pode ser encontrado aqui.

Autores

Biswanath Mukherjee, Sr. Solutions Architect
Rahul Sringeri, Technical Account Manager

Tradutor

Rodrigo Peres é Arquiteto de Soluções na AWS, com mais de 20 anos de experiência trabalhando com arquitetura de soluções, desenvolvimento de sistemas e modernização de sistemas legados.