O blog da AWS

Análise de dados em tempo real com a AWS

Por Horacio Ferro, Arquiteto de Soluções, AWS México.

 

Em um mundo dominado por dispositivos digitais, a quantidade e a velocidade com que nossos clientes produzem e recebem dados aumentaram substancialmente. Além de coletar e armazenar para análise posterior, trabalhar com dados em tempo real é uma vantagem competitiva muito importante.

A tomada de decisão estratégica requer a identificação de mudanças nos padrões: financeiros (ações, câmbio, derivados, commodities, etc.); meteorológicos (temperatura, precipitação, vento, pressão, etc.); operacional (tráfego, produção, suprimentos, pessoal, etc.); entre outros, em tempo real permite aumentar eficiência e minimizar o impacto nos negócios.

Neste post vamos criar uma aplicação para cobrir esse objetivo usando os seguintes serviços e kits de desenvolvimento:

  • AWS Cloud Development Kit (CDK) — Com este kit de desenvolvimento, criaremos infraestrutura como código, permitindo sua criação e manutenção de forma simples, além de acelerar a implantação de nossos microserviços e website estático.
  • Amazon Simple Storage Service – Nós usaremos esse serviço como um repositório de dados, todas as informações recebidas serão armazenadas aqui e também o usaremos como host de nosso aplicativo web.
  • Amazon Kinesis – Aqui vamos criar dois fluxos de dados, usando Kinesis Data Stream, teremos acesso a informações com latência extremamente baixa e, usando o Kinesis Firehose, podemos armazenar essas informações em um bucket do S3.
  • AWS Lambda — Com este serviço, implantaremos nossos microserviços de ingest de dados e consulta de informações.
  • Amazon DynamoDB – Criaremos uma tabela no DynamoDB para armazenar as informações e poder acessá-las por meio de um microserviços. Usamos o DynamoDB ao fornecer latências em milissegundos, incluindo microssegundos se habilitarmos o DynamoDB Accelerator.
  • Amazon CloudFront – Usaremos a rede de distribuição de conteúdo para distribuir nosso aplicativo web e fornecer ao usuário final uma melhor experiência de navegação.
  • Amazon CloudWatch – Usaremos este serviço para coletar logs do nosso aplicativos e ser capaz de visualizá-los.
  • Amazon Cognito – Aqui vamos controlar o acesso ao nosso aplicativo web, usando benefícios adicionais, como auto registro e recuperação de senha.
  • AWS Identity and Access Management — Aqui definimos as funções que nossos micro-serviços e serviços, como o Kinesis, podem usar para estabelecer controle de acesso controlado aos nossos recursos.
  • Amazon API Gateway – Nós usaremos o API Gateway como porta de acesso do nosso microsserviço de consulta, usando um Authorizer conectado ao Cognito para fornecer segurança ao nosso microserviço.
  • AWS Amplify — Este kit de desenvolvimento será usado para integrar o Cognito ao API Gateway.

A seguinte arquitetura será usada para criar nosso aplicativo:

 

 

Esta arquitetura é 100% serverless, o que nos permitirá evitar o gerenciamento de infraestrutura, reduzir a responsabilidade de segurança, permitindo ter um baixo custo sem capacidade ociosa.

O código completo da aplicação pode ser baixado do repositório.

No exemplo, usaremos o TypeScript with CDK para definir a infraestrutura, como pré-requisitos, precisaremos ter a AWS Command Line (AWS CLI) e ter o AWS Cloud Development Kit instalados. NOTA: Não se esqueça de configurar a AWS CLI e executar o bootstrap CDK.

Depois de executar o comando:

cdk init app --language=typescript

 

Devemos ter um projeto estruturado da seguinte forma:

- real-time-analytics-poc  - bin    real-time-analytics-poc.ts  - lib    real-time-analytics-poc-stack.ts  - test  .gitignore  .npmignore  cdk.json  jest.config.js  package.json  README.md  tsconfig.json

 

A primeira coisa que faremos instalar as dependências listadas no package.json, como mostrado abaixo:

"dependencies": {  "@aws-cdk/core": "1.73.0",  "@aws-cdk/aws-apigateway": "1.73.0",  "@aws-cdk/aws-cloudfront": "1.73.0",  "@aws-cdk/aws-cognito": "1.73.0",  "@aws-cdk/aws-dynamodb": "1.73.0",  "@aws-cdk/aws-iam": "1.73.0",  "@aws-cdk/aws-kinesis": "1.73.0",  "@aws-cdk/aws-kinesisfirehose": "1.73.0",  "@aws-cdk/aws-lambda": "1.73.0",  "@aws-cdk/aws-lambda-event-sources": "1.73.0",  "@aws-cdk/aws-logs": "1.73.0",  "@aws-cdk/aws-s3": "1.73.0",  "@aws-cdk/aws-s3-deployment": "1.73.0",  "source-map-support": "^0.5.16"}

 

Na aplicação, precisaremos de duas stacks de CDK, uma para a infraestrutura de ingest de dados e consultas, e outra para o site.

Começaremos adicionando a nova stack criando um arquivo chamado real-time-analytics-web-stack.ts dentro do diretório lib. E modificando o arquivo real-time-analytics-poc.ts no diretório bin.

Arquivo real-time-Analytics-Web-Stack.ts import * from cdk '@aws-cdk/core'; export class RealTimeAnalyticsWebStack extends cdk.Stack {  constructor(scope: cdk.Construct, id: String, props?: cdk.StackProps) {    super(scope, id, props);   }} Arquivo real-time-analytics-poc.ts #!/usr/bin/env nodeimport 'source-map-support/register';import * as cdk from '@aws-cdk/core';import { RealTimeAnalyticsPocStack } from '../lib/real-time-analytics-poc-stack';import { RealTimeAnalyticsWebStack } from "../lib/real-time-analytics-web-stack";const app = new cdk.App();new RealTimeAnalyticsPocStack(app, 'RealTimeAnalyticsPocStack');new RealTimeAnalyticsWebStack(app, 'RealTimeAnalyticsWebStack');

 

Uma vez que essas modificações foram feitas, nosso aplicativo tem duas stacks.

Em seguida, definiremos a infraestrutura necessária para a ingestão e consulta de dados, bem como a segurança e os serviços adicionais necessários dentro da stack  RealTimeAnalyticsPocStack localizada no arquivo realTimeAnalytics-poc-stack.ts, isso será adicionado ao corpo do objeto:

export class RealTimeAnalyticsPocStack extends cdk.Stack {

  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {

    super(scope, id, props);
    

    // Definimos infraestrutura aqui.

  }

}

 

Primeiro vamos definir um LogGroup para enviar nossos logs, definindo a politica de retenção (5 dias) e rotação dos logs:

    const logGroup = new logs.LogGroup(this, 'poc-log-group', {      logGroupName: '/realtime/analytics/',      removalPolicy: cdk.RemovalPolicy.DESTROY,      retention: logs.RetentionDays.FIVE_DAYS    });

Agora vamos definir a configuração Cognito que nos ajudará a controlar o acesso ao nosso site, para isso precisamos de um pool de usuários, um cliente para aplicativos e um conjunto de identidades:

  const userPool = new cognito.UserPool(this, 'analytics-user-pool', {      accountRecovery: cognito.AccountRecovery.PHONE_AND_EMAIL, // Metodos para recuperar senha.      passwordPolicy: { // Política de password        minLength: 8,        requireDigits: true,        requireLowercase: true,        requireUppercase: true,        tempPasswordValidity: cdk.Duration.days(7) // Vida útil da senha gerada pelo Cognito.      },      selfSignUpEnabled: true, // Sinalizar para habilitar o serviço de autorregistro      signInAliases: { // Aqui definimos que você pode usar o usuário para fazer o login.        email: true,        username: true      }    });     const appClient = userPool.addClient('api-client', {      userPoolClientName: 'AnalyticsAPI' // Nome do cliente.    });     const identityPool = new cognito.CfnIdentityPool(this, 'analytics-identity-pool', {      allowUnauthenticatedIdentities: false, // Desabilitamos entidades não autenticadas.      cognitoIdentityProviders: [{        clientId: appClient.userPoolClientId, // O relacionamento com o cliente que iremos utilizar Amplify.        providerName: userPool.userPoolProviderName // Pool de usuarios.      }]    });

Agora vamos criar um bucket do S3 com acesso privado para depositar mensagens do Kinesis Data Streams usando o Kinesis Firehose:

    const rawDataBucket = new s3.Bucket(this, 'raw-data-bucket', {      accessControl: s3.BucketAccessControl.PRIVATE,      removalPolicy: cdk.RemovalPolicy.DESTROY    });

Agora vamos criar uma tabela do DynamoDB para poder consultá-la através de nosso aplicativo web; que configuraremos com leitura e gravação sob demanda, uma chave primária chamada “instrument” tipo numérico e da mesma forma será apagada ao remover o stack:

    const analyticsTable = new ddb.Table(this, 'analytics-table', {      billingMode: ddb.BillingMode.PAY_PER_REQUEST,      partitionKey: {        name: 'instrument',        type: ddb.AttributeType.NUMBER      },      removalPolicy: cdk.RemovalPolicy.DESTROY    });

 

Agora vamos criar o Kinesis Data Stream com uma retenção de 2 dias e 10 shards (esta configuração deve ser ajustada dependendo da carga de dados a ser recebida):

    const dataStream = new kinesis.Stream(this, 'real-time-stream', {      retentionPeriod: cdk.Duration.days(2),      shardCount: 10    });

 

Agora vamos configurar um Kinesis Firehose para enviar todas as informações recebidas, geralmente isso deve ser feito para a camada RAW ou dados brutos de um data lake, essa configuração requer um fluxo de dados para logs, uma função que usará o serviço para acessar recursos e o próprio fluxo:

const firehoseLogStream = new logs.LogStream(this, 'kinesis-firehose-log', {      logGroup: logGroup, // Está associado ao grupo criado inicialmente.      logStreamName: 'firehose-stream',      removalPolicy: cdk.RemovalPolicy.DESTROY // Exclui o fluxo limpando a pilha.    });     const firehoseRole = new iam.Role(this, 'firehose-role', {      assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),      description: 'Role used by Kinesis Firehose to have access to the S3 bucket.',      inlinePolicies: {        's3-policy': new iam.PolicyDocument({ // Criamos uma política S3 online.          statements: [            new iam.PolicyStatement({              actions: [ // Colocamos apenas as permissões mínimas necessárias para usar o S3.                 's3:AbortMultipartUpload',                's3:GetBucketLocation',                's3:GetObject',                's3:ListBucket',                's3:ListBucketMultipartUploads',                's3:PutObject'              ],              effect: iam.Effect.ALLOW,              resources: [ // Referencia ao bucket inicial.                rawDataBucket.bucketArn,                rawDataBucket.bucketArn + '/*'              ]            })          ]        }),        'kinesis-policy': new iam.PolicyDocument({ // Política para usar Kinesis Data Stream.          statements: [            new iam.PolicyStatement({              actions: [                'kinesis:DescribeStream',                'kinesis:GetShardIterator',                'kinesis:GetRecords',                'kinesis:ListShards'              ],              effect: iam.Effect.ALLOW,              resources: [                dataStream.streamArn // Stream previamente criado.              ]            })          ]        }),        'cloudwatch': new iam.PolicyDocument({ // Política para logs, per mínimos.          statements: [            new iam.PolicyStatement({              actions: [                'logs:PutLogEvents'              ],              effect: iam.Effect.ALLOW,              resources: [                logGroup.logGroupArn + ':log-stream:' + firehoseLogStream.logStreamName              ]            })          ]        })      }    });     const firehoseStream = new kfh.CfnDeliveryStream(this, 'analytics-firehose-stream', {      deliveryStreamName: 'analytics-ingestion-fh',      deliveryStreamType: 'KinesisStreamAsSource', // Estabelecemos o tipo de origem.      kinesisStreamSourceConfiguration: {        kinesisStreamArn: dataStream.streamArn, // Stream de kinesis.        roleArn: firehoseRole.roleArn // Role com per mínimos necesarios.      },      s3DestinationConfiguration: { // Bucket de datos raw.        bucketArn: rawDataBucket.bucketArn,        bufferingHints: {          intervalInSeconds: 60,          sizeInMBs: 100        },        cloudWatchLoggingOptions: { // Grupo de fluxo de logs.          logGroupName: logGroup.logGroupName,          logStreamName: firehoseLogStream.logStreamName        },        compressionFormat: 'Snappy', // Compressão.        encryptionConfiguration: {          noEncryptionConfig: 'NoEncryption' // sem encrypt.        },        errorOutputPrefix: 'failed-data/', // Prefijo de errores.        prefix: 'ingested-data/', // Prefixo onde as informações são depositadas.        roleArn: firehoseRole.roleArn // Role com permissões mínimos.      }    });

 

Agora vamos criar o microsserviço de ingest de dados, que iremos implantar em um lambda e precisa de uma permissão para poder acessar os diferentes serviços com os quais interatuaremos:

 const ingestLambdaRole = new iam.Role(this, 'ingest-lambda-role', {      assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),      description: 'Execution role for the data ingestion Lambda.',      inlinePolicies: {        'dynamodb': new iam.PolicyDocument({ // Política para acesso ao DynamoDB.          statements: [            new iam.PolicyStatement({              actions: [                'dynamodb:UpdateItem' // O microsserviço requer apenas a atualização de itens.                            ],              effect: iam.Effect.ALLOW,              resources: [                analyticsTable.tableArn // Tabela criada na stack.              ]            })          ]        }),        'kinesis-policy': new iam.PolicyDocument({ // Política para acceso a Kinesis Data Streams.          statements: [            new iam.PolicyStatement({              actions: [ // Permissões necessárias para consumir logs.                'kinesis:DescribeStream',                'kinesis:GetShardIterator',                'kinesis:GetRecords',                'kinesis:ListShards'              ],              effect: iam.Effect.ALLOW,              resources: [                dataStream.streamArn // Stream criado previamente.              ]            })          ]        })      },      managedPolicies: [ // Política gerenciada com as permissões mais básicas para executar lambdas.        iam.ManagedPolicy.fromManagedPolicyArn(this, 'aws-lambda-basic',          'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole')      ]    });     const ingestLambda = new lambda.Function(this, 'data-ingestion-lambda', {      code: new lambda.AssetCode('functions/ingest_data'), // Definimos que o código lambda está no subdiretório. functions/ingest_data      environment: {        'ANALYTICS_TABLE': analyticsTable.tableName // Expomos o nome da tabela do DynamoDB por meio de variáveis ​​de ambiente.      },      events: [ // Definimos os eventos que disparam o lambda, neste caso novos registros no Kinesis.        new lev.KinesisEventSource(dataStream, {          // Esses parâmetros devem ser configurados de acordo com o número de registros esperados.          batchSize: 100,          bisectBatchOnError: true,          parallelizationFactor: 2,          retryAttempts: 2,          startingPosition: lambda.StartingPosition.LATEST        })      ],      handler: 'app.lambda_handler',      role: ingestLambdaRole, // Role previamente criado.      runtime: lambda.Runtime.PYTHON_3_8    });

 

Vamos configurar a função de consulta:

const queryLambdaRole = new iam.Role(this, 'query-lambda-role', {      assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),      description: 'Execution role for the query lambda.',      inlinePolicies: { // Política para permissões de DynamoDB.        'dynamodb': new iam.PolicyDocument({          statements: [            new iam.PolicyStatement({              actions: [ // Nós só precisamos executar scans.                'dynamodb:Scan'              ],              effect: iam.Effect.ALLOW,              resources: [                analyticsTable.tableArn // Tabela criada.              ]            })          ]        })      },      managedPolicies: [ // Política com permissões mínimas.        iam.ManagedPolicy.fromManagedPolicyArn(this, 'aws-lambda-query-basic',          'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole')      ]    });     const queryLambda = new lambda.Function(this, 'api-lambda', {      code: new lambda.AssetCode('functions/query_data'),      environment: {        'ANALYTICS_TABLE': analyticsTable.tableName      },      handler: 'app.lambda_handler',      role: queryLambdaRole,      runtime: lambda.Runtime.PYTHON_3_8    });

 

Para consumir este microserviço utilizaremos API Gateway para expor como uma API REST.

     const analyticsApi = new api.RestApi(this, 'analytics-api', { // Creamos la API.      description: 'Analytics query API.',      defaultCorsPreflightOptions: { // Agregamos CORS en la API.        allowCredentials: true,        allowHeaders: ['*'],        allowMethods: ['GET'],        allowOrigins: ['*'],        statusCode: 200      }    });     // O recurso onde a API de dados residirá é criado.    const dataResource = analyticsApi.root.addResource('data');     // Definimos o método GET no recurso e a integração com o lambda é feita.    const getDataMethod = dataResource.addMethod('GET',      new api.LambdaIntegration(queryLambda, {        proxy: true      }));     // O autorizador cognito é criado para evitar chamadas não autenticadas para a API.    const apiAuthorizer = new api.CfnAuthorizer(this, 'analytics-authorizer', {      identitySource: 'method.request.header.Authorization',      identityValidationExpression: 'Bearer (.*)',      name: 'AnalyticsAuthorizer-' + cdk.Aws.ACCOUNT_ID,      providerArns: [userPool.userPoolArn],      restApiId: analyticsApi.restApiId,      type: 'COGNITO_USER_POOLS'    });

 

Para facilitar a configuração do aplicativo Web, criamos as seguintes saídas para obter informações sobre o Cognito e APIs:

new cdk.CfnOutput(this, 'identity-pool-id', {      description: 'IdentityPoolId',      value: identityPool.ref    });     new cdk.CfnOutput(this, 'region', {      description: 'Cognito Region',      value: cdk.Aws.REGION    });     new cdk.CfnOutput(this, 'user-pool-id', {      description: 'UserPoolId',      value: userPool.userPoolId    });     new cdk.CfnOutput(this, 'user-pool-web-client-id', {      description: 'UserPoolWebClientId',      value: appClient.userPoolClientId    });     new cdk.CfnOutput(this, 'api-endpoint', {      description: 'API Gateway URL',      value: analyticsApi.url    });

 

Agora vamos construir a stack para o aplicativo web:

// Criamos o bucket S3 para o site e configuramos a página inicial.     const webAppBucket = new s3.Bucket(this, 'web-app-bucket', {      websiteIndexDocument: 'index.html'    });     // Geramos a exibição do site indicando o caminho source.     const deployment = new s3d.BucketDeployment(this, 'web-deployment', {      sources: [s3d.Source.asset('web/dashboard/dist')],      destinationBucket: webAppBucket    });     // Criamos uma identidade de acesso de origem para dar permissões de leitura do CloudFront no bucket S3.    const cloudFrontOia = new cf.OriginAccessIdentity(this, 'web-site-oia', {      comment: 'OIA for real time analytics.'    });     // Criamos o layout do site no CloudFront e atribuímos o OAI.     const webDistribution = new cf.CloudFrontWebDistribution(this, 'web-cf-distribution', {      originConfigs: [{        behaviors: [{          isDefaultBehavior: true        }],        s3OriginSource: {          originAccessIdentity: cloudFrontOia,          s3BucketSource: webAppBucket        }      }]    });     // Criamos uma política de acesso no bucket S3 com o OAI.    const cfPolicy = new iam.PolicyStatement();    cfPolicy.addActions('s3:GetBucket*', 's3:GetObject*', 's3:List*');    cfPolicy.addResources(webAppBucket.bucketArn, webAppBucket.bucketArn + '/*')    cfPolicy.addCanonicalUserPrincipal(cloudFrontOia.cloudFrontOriginAccessIdentityS3CanonicalUserId)    webAppBucket.addToResourcePolicy(cfPolicy)     // URL de CloudFront.    new cdk.CfnOutput(this, 'cloud-front-url', {      description: 'Cloud Front URL',      value: webDistribution.distributionDomainName    });

 

Usando o comando npm run build , podemos verificar se o código está correto, por exemplo:

npm run build> real-time-analytics-poc@0.1.0 build /home/user/projects/serverless-realtime-analytics> tsc

 

Para implantar a infraestrutura, é necessário criar o código-fonte para os micro serviços.

Microsserviço de entrada a ser colocado em funções/ingest_data:

import base64import boto3import osfrom botocore.exceptions import ClientErrorfrom decimal import * TABLE_NAME = os.getenv('ANALYTICS_TABLE')DYNAMODB_CLIENT = boto3.client('dynamodb')  def lambda_handler(event, context):    """    Lambda handler, process the records in a Kinesis Stream and updates the DynamoDB table.     :param event: Received by the lambda.    :param context: Execution context.    :return: Ok string.    """    for record in event['Records']:        data = str(base64.b64decode(record['kinesis']['data'])).replace("'", '').replace('\\n', '').replace('"', '')        print(data)        item = extract_data(data)        key = {'instrument': {'N': str(item['instrument'])}}        del item['instrument']        exp, values = create_expression_values(item)        try:            outcome = DYNAMODB_CLIENT.update_item(                TableName=TABLE_NAME,                Key=key,                UpdateExpression=exp,                ExpressionAttributeValues=values            )            print(outcome)        except ClientError as e:            if e.response['Error']['Code'] == 'ConditionalCheckFailedException':                print("Ignoring message out of sync: ")                print(e.response['Error'])            else:                raise e    return "Ok"  def extract_data(data: str):    """    Extracts the data from the String, splits the fields by | and then gets the name before the = and value.     :param data: The string to parse.    :return: Dictionary with the parsed data.    """    fields = data.split('|')    item = dict()    for field in fields:        if len(field) > 3:            name_value = field.split('=')            name = name_value[0]            value = name_value[1]            if name.startswith('price_'):                item[name] = Decimal(str(int(value) / 100))            elif name == 'instrument' or name == 'reception':                item[name] = int(value)            elif name == 'sequence':                item['seq'] = int(value)            elif name == 'type':                item['t'] = int(value)            elif name == 'level':                item['l'] = int(value)            else:                item[name] = value    return item  def create_expression_values(item: dict):    """    Creates the update expression using the provided dictionary.     :param item: Dictionary of the data to use for the expression.    :return: String with the expression.    """    expression = 'SET '    values = dict()    for name in item:        expression += '{} = :{}, '.format(name, name)        values[':{}'.format(name)] = {'S' if type(item[name]) is str else 'N': str(item[name])}    return expression[0:-2], values

 

Consultar microsserviço a ser colocado em funções/query_data:

import boto3import jsonimport osfrom decimal import Decimal TABLE_NAME = os.getenv('ANALYTICS_TABLE')DYNAMODB_CLIENT = boto3.client('dynamodb')  def lambda_handler(event, context):    """    Simple handler, scans the DynamoDB to retrieve all the items.     :param event: Received from the API Gateway.    :param context: Execution context.    :return: The available items in DynamoDB.    """    response = DYNAMODB_CLIENT.scan(        TableName=TABLE_NAME,        Select='ALL_ATTRIBUTES',        ConsistentRead=True    )    print(response['Items'])    items = list()    for item in response['Items']:        new_item = dict()        for key in item:            if str(key).startswith('price_'):                new_item[key] = float(item[key]['N'])            elif key == 'instrument':                new_item[key] = int(item[key]['N'])            else:                new_item[key] = item[key]        items.append(new_item)    return {        'statusCode': 200,        'headers': {            'Access-Control-Allow-Origin': '*',            'Access-Control-Allow-Credentials': 'true',            'Access-Control-Allow-Methods': 'GET',            'Access-Control-Allow-Headers': '*',            'Content-Type': 'application/json'        },        'body': json.dumps(items)    }

 

Uma vez que os microsserviços são criados em sua pasta respectiva, é necessário instalar a infraestrutura da primeira stack executando o comando “cdk deploy RealTimeAnalyticsPocStack”. No final do deploy, obteremos os valores necessários para configurar a aplicação web:

No arquivo main.js do aplicativo (baixe o código do repositório git), precisamos modificar as configurações do Amplify mostradas abaixo. Nota: Neste exemplo, a região é us-east-1:

Amplify.configure({  Auth: {    identityPoolId: 'us-east-1:XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',    identityPoolRegion: 'us-east-1',    userPoolId: 'us-east-X_XXXXXXXXX',    userPoolWebClientId: 'XXXXXXXXXXXXXXXXXXXXXXXXXX',    region: 'us-east-1'  },  API: {    endpoints: [      {        name: 'AnalyticsAPI',        endpoint: 'https://XXXXXXXXXX.execute-api.us-east-1.amazonaws.com/prod'      }    ]  }})

 

Uma vez que esta configuração foi modificada, podemos prosseguir para implantar nosso aplicativo web com o comando “cdk deploy RealTimeAnalyticsWebStack”.

Ao finalizar a implantação, podemos navegar até a URL de saída e ver a página de login.

Para testar a aplicação, precisaremos usar o Kinesis Data Generator, que pode ser obtido a partir do seguinte link, juntamente com suas instruções de uso: Gerador de dados do Kinesis

Aqui está o modelo para gerar os registros:

session={{date.now('YYYYMMDD')}}|sequence={{date.now('x')}}|reception={{date.now('x')}}|instrument={{random.number(9)}}|l={{random.number(20)}}|price_0={{random.number({"min":10000, "max":30000})}}|price_1={{random.number({"min":10000, "max":30000})}}|price_2={{random.number({"min":10000, "max":30000})}}|price_3={{random.number({"min":10000, "max":30000})}}|price_4={{random.number({"min":10000, "max":30000})}}|price_5={{random.number({"min":10000, "max":30000})}}|price_6={{random.number({"min":10000, "max":30000})}}|price_7={{random.number({"min":10000, "max":30000})}}|price_8={{random.number({"min":10000, "max":30000})}}|

Conclusão

Usando essa arquitetura, é possível realizar análises de informações em tempo real (near real-time) e análise de informações passadas, podemos ainda fazer uma combinação das duas visões para gerar modelos preditivos usando AI/ML e realizar a detecção de anomalias ou até mesmo integrar o Kinesis Data Analytics para o mesmo propósito.

 

Próximos Passos

Este aplicativo de exemplo permite a visualização de dados em tempo semirreal, como próximo passo você pode integrar aos fluxos produtivos permitindo que os usuários do negócio sejam capazes de tomar decisões com essas informações.

Mais informações:

https://aws.amazon.com/cdk/

https://aws.amazon.com/s3/

https://aws.amazon.com/kinesis/

https://aws.amazon.com/lambda/

https://aws.amazon.com/dynamodb/

https://aws.amazon.com/cloudfront/

https://aws.amazon.com/cloudwatch/

https://aws.amazon.com/cognito/

https://aws.amazon.com/iam/

https://aws.amazon.com/apigateway/

https://aws.amazon.com/amplify/

https://aws.amazon.com/cli/


Sobre o autor:

Horacio Ferro é Arquiteto de Soluções na AWS México.

 

 

 

 

Revisores:

Enrique Compañ é Arquiteto de Soluções na AWS México.

 

 

 

 

Enrique Valladares é Sr.Manager Arquiteto de Soluções na AWS México.

 

 

 

 

 

Use seus dados para impulsionar o crescimento do negócio. Inove continuamente usando o data flywheel.