O blog da AWS

Criando um workflow de rotulamento, treinamento e deploy de machine learning utilizando o Amazon SageMaker, PyTorch e Amazon SageMaker Ground Truth

Por Evandro Franco, Arquiteto de Soluções em Startups na AWS

 

O Amazon SageMaker é um serviço totalmente gerenciado de machine learning (ML), no qual cientistas de dados e desenvolvedores podem criar e treinar modelos de machine learning com rapidez e facilidade, e depois implantá-los em um ambiente gerenciado pronto para produção.

Para treinar um modelo de machine learning, é necessário que você possua uma grande quantidade de dados (dataset), com boa definição e que estejam corretamente rotulados. O Amazon SageMaker Ground Truth ajuda na tarefa de construção desse dataset de alta qualidade. Através dele é possível utilizar colaboradores de uma empresa terceira, do Amazon Mechanical Turk, ou de uma força de trabalho da própria empresa.

Um dos frameworks de machine learning open source mais utilizados do mercado é o PyTorch. Ele permite acelerar o desenvolvimento, desde a prototipação até a produção. Iremos utilizar o PyTorch neste exemplo de hoje para mostrar também a flexibilidade e facilidade da integração deste framework com o Amazon SageMaker.

 

Visão Geral da Solução

Inicialmente, para esta demonstração, iremos treinar um modelo para classificar imagens de objetos. Estas imagens foram selecionadas a partir do dataset público da Caltech 256, que é composto por mais de 30 mil imagens rotuladas em 256 categorias (classes) distintas. A partir deste dataset iremos trabalhar com uma amostra das seguintes 4 classes:

  • Beer Mug (caneca de cerveja);
  • Coffee Mug (caneca de café);
  • Teapot (bule de chá);
  • Wine Bottle (garrafa de vinho);

Obs.: Para demonstrar o fluxo de desenvolvimento de modelos de machine learning, desde o rotulamento no SageMaker Ground Truth, preparação dos dados até a implementação do modelo, vocês podem selecionar 20 imagens de cada categoria acima, totalizando 80 imagens. Em um cenário real, provavelmente seria necessário de um dataset maior para melhorar o desempenho do modelo.

 

 

Criando os recursos necessários (pré-requisitos)

Para começar, será necessário criar um bucket no Amazon S3 (ou utilizar um já existente). O tutorial “Create a Bucket” possui o passo a passo de como criar um bucket.

Em seguida, selecione vinte imagens aleatórias do dataset citado na etapa anterior, considerando apenas as quatro categorias mencionadas (010.beer-mug, 041.coffee-mug, 212.teapot e 246.wine-bottle) e faça o upload dessas imagens para o S3, utilizando apenas um único diretório, pois iremos utilizar o Amazon SageMaker Ground Truth para o rotulamento.

 

 

Criando o job de rotulamento no SageMaker Ground Truth

Agora que já criamos um Bucket no S3 e carregamos as imagens, iremos utilizar o Amazon SageMaker Ground Truth para que as imagens não rotuladas possam ser categorizadas antes do processo de treinamento.

Para iniciar, acesse o SageMaker no console da AWS e no menu da esquerda clique em Labeling Jobs, na seção Ground Truth.

Defina os seguintes atributos no trecho Job Overview:

  • Job name: labeling-demo
  • Input data setup: selecione a opção “Automated data setup”
  • Data setup:
    • S3 location for input datasets: Navegue pelo S3 até o diretório com as imagens.
    • S3 location for output datasets: Navegue pelo S3 até o diretório vazio ou mantenha a mesma localidade.
  • Data type: Image
  • IAM Role: Especifique uma role com permissão AmazonSageMakerFullAccess.
  • Clique no botão “Complete data setup”. Esta etapa irá criar um arquivo de manifesto para as imagens, que será utilizado pelo Ground Truth.

 

 

No trecho Task Type, selecione:

  • Image: Pois nosso dataset é composto por imagens.
  • Image Classification (Single Label): pois cada imagem possui apenas um único objeto ou rótulo.

 

 

Clique em Next para a próxima etapa.
Obs.: Será considerado neste exemplo o primeiro acesso ao Ground Truth. Caso esse acesso já tenha sido realizado, será possível visualizar os times que foram criados anteriormente.

 

No trecho Workers:

  • Worker types: selecione a opção Private
  • Team name: my-private-team
  • Invite private annotators: lista de e-mails, separados por vírgula
  • Organization: demo-org
  • Contact: e-mail de contato para os workers
  • Task timeout: 5 minutes (tempo máximo para uma única tarefa. Uma única tarefa corresponde a uma única imagem).
  • Task expiration time: 10 days (este timeout considera o job como um todo, ou seja, o conjunto de imagens).

 

 

No trecho Image classification (Single Label) labeling tool, será possível definir o modelo que as imagens aparecerão para serem rotuladas:

  • Coloque uma descrição para as imagens, para que o anotador saiba o que preencher, por exemplo:
    • Please classify images into one of four classes:
  • Preencha as opções de rotulamento:
    • 01.beer-mug
    • 02.coffee-mug
    • 03.teapot
    • 04.wine-bottle

 

 

É possível clicar no botão preview para abrir uma página de exemplo de como será exibido para os trabalhadores.

Clique em create para concluir e criar o job.

Após alguns minutos, os usuários cadastrados receberão um e-mail com o acesso para o portal onde poderão iniciar o rotulamento.

 

 

Após o rotulamento ser concluído, alguns metadados ficarão disponíveis na pasta de output do S3. Dentro desta estrutura de pastas, duas são importantes:

  • manifests: contém os arquivos de manifesto de output do job.
  • annotations:
    • worker-response: contém a resposta individual de cada trabalhador.
    • consolidated-annotation: contém as anotações desejadas.

Para saber mais sobre, visualize essa documentação sobre os outputs do Ground Truth.

 

Criando um notebook:

Também utilizaremos uma instância de notebook do SageMaker. O tutorial “Create a notebook instance” demonstra como criar um notebook Jupyter do SageMaker. Como o propósito deste blog é para teste, pode ser utilizada uma instância do tipo ml.t3.medium.

Com a instância de notebook criada, crie um notebook utilizando o kernel conda_pytorch_p36:

 

 

Em seguida, execute o seguinte trecho de código com as bibliotecas, o Bucket s3 e outras referências que serão utilizadas pelo SageMaker:

 

import sagemaker

from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

sagemaker_session = sagemaker.Session()

# Destination Bucket

bucket = '<your-bucket-name>

prefix = '<your-bucket-prefix/subfolders>'

role = sagemaker.get_execution_role()

 

Explorando os dados do Amazon SageMaker Ground Truth

Para explorar os dados gerados pelo Ground Truth, iremos criar um novo Notebook. Neste notebook, importe as dependências:

 

import os, json

import pandas as pd

import fnmatch

import boto3

from botocore.config import Config

import sagemaker


s3 = boto3.resource('s3')

sagemaker_session = sagemaker.Session()


# Destination Bucket

bucket = '<your-bucket-name>'

prefix = '<prefix-with-ground-truth-output>'

new_prefix= '<new-prefix-to-organize-data>'

 

Na variável prefix, utilize o caminho utilizado para o output do job do Ground Truth.

Utilize o exemplo de código abaixo para baixar o output para o notebook, para avaliar o que foi gerado:

sagemaker_session.download_data('<local_path>', bucket , prefix)

O trecho a seguir, irá ler o manifesto gerado com os dados dos trabalhadores que rotularam as imagens:

 

# Reading Manifest Files (Output)
path_to_json = prefix + '<path to output on manifest folder>'

json_files = [pos_json for pos_json in os.listdir(path_to_json) if pos_json.endswith('.manifest')]

files = []
for index, js in enumerate(json_files):
    with open(os.path.join(path_to_json, js)) as json_file:
        files += list(json_file)

manifest_data=[]
for i in files:
    temp = json.loads(i)
    manifest_data.append(temp)

manifest_data

 

Por fim, o seguinte trecho irá reorganizar as imagens rotuladas no S3, já em uma estrutura de pastas que utilizaremos nos próximos passos:

 

import numpy as np

for i in manifest_data:
    src_ref = i["source-ref"]
    filename = src_ref.split("/")[-1]
    label = i["<JOB-NAME>-metadata"]["class-name"]
    if np.random.rand(1) < 0.2:
        new_path =  new_prefix + '/test/' + label + '/' + filename
        print(new_path)
        s3.Object(bucket, new_path).copy_from(CopySource=src_ref.replace("s3://",""))
    else:
        new_path =  new_prefix + '/train/' + label + '/' + filename
        print(new_path)
        s3.Object(bucket, new_path).copy_from(CopySource=src_ref.replace("s3://",""))
 
       

 

Obs.: O valor <JOB-NAME> deve ser alterado pelo nome do job utilizado no Ground Truth. Neste exemplo utilizamos um fator randômico para separar o dataset em treino e test, apenas para o propósito demonstrativo.

 

Treinando o modelo no SageMaker

Iremos utilizar uma técnica de Machine Learning chamada Transfer Learning. Na prática, nem sempre é necessário treinar uma rede neural convolucional do zero, porque é difícil possuir um dataset com tamanho suficiente, além de demandar um tempo de treinamento muito grande. Ao invés disto, é comum utilizar um grande dataset, pré-treinar uma rede com esse dataset e depois utilizar este modelo pré-treinado como ponto de início para desenvolver o novo modelo com o novo dataset para outro domínio específico, acelerando consideravelmente o tempo de treinamento e sem a necessidade de tantas imagens. Neste exemplo, utilizamos a rede ResNet18, pré-treinada com o dataset ImageNet.

A seguir utilizaremos o script transfer_learning.py com o código completo empregando a técnica de transfer learning e o framework PyTorch. Esse script será o ponto de entrada para o treinamento e deploy do nosso modelo:

 
       
# transfer_learning.py
import argparse
import json
import logging
import os
import time
import sys
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.utils.data
import torch.utils.data.distributed
import torchvision
import numpy as np
from torchvision import datasets, transforms, models
import copy
from collections import OrderedDict

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))

def _get_data_loader(batch_size, training_dir, is_distributed, **kwargs):
    logger.info("Get dataset into data_loader")
    
    data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(size=256, scale=(0.8, 1.0)),
        transforms.RandomRotation(degrees=15),
        transforms.RandomHorizontalFlip(),
        transforms.CenterCrop(size=224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    }

    data_dir = training_dir
    image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
                                          data_transforms[x])
                  for x in ['train', 'test']}

    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset) if is_distributed else None
    
    dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size,
                                                shuffle=train_sampler is None, 
                                                sampler=train_sampler, **kwargs)
                  for x in ['train', 'test']}
    
    dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'test']}

    return dataloaders, dataset_sizes

def model_fn(model_dir):
    try:
        logger.info('model_fn')
        device = "cuda" if torch.cuda.is_available() else "cpu"
        with open(os.path.join(model_dir, 'model.pth'), 'rb') as f:
            ckpt = torch.load(f, map_location='cpu')
        optimizer = ckpt['optimizer']
        epoch = ckpt['epoch']
        model = ckpt['model']
        load_dict = OrderedDict()
        for k, v in model.items():
            if k.startswith('module.'):
                k_ = k.replace('module.', '')
                load_dict[k_] = v
            else:
                load_dict[k] = v
        
        model = models.resnet18(pretrained=False)
        num_ftrs = model.fc.in_features
        
        model.fc = nn.Sequential(
            nn.Linear(num_ftrs, 256),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(256, 4), 
            nn.LogSoftmax(dim=1) # For using NLLLoss()
        )
        
        model.load_state_dict(load_dict)
        return model.to(device)
    except Exception as err:
        print(err)
        raise

def save_model(model, optimizer, epoch, model_dir):
    logger.info("Saving the model.")
    path = os.path.join(model_dir, 'model.pth')
    # recommended way from http://pytorch.org/docs/master/notes/serialization.html
    torch.save(
        {
            "model" : model.state_dict(), 
            "optimizer": optimizer.state_dict(),
            "epoch": epoch
        },
        path)

def train_model(dataloaders, dataset_sizes, device, model, criterion, optimizer, 
                scheduler, num_epochs=10):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'test']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # deep copy the model
            if phase == 'test' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

def train(args):
    is_distributed = len(args.hosts) > 1 and args.backend is not None
    logger.debug("Distributed training - {}".format(is_distributed))
    use_cuda = args.num_gpus > 0
    logger.debug("Number of gpus available - {}".format(args.num_gpus))
    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
    device = torch.device("cuda" if use_cuda else "cpu")

    if is_distributed:
        # Initialize the distributed environment.
        world_size = len(args.hosts)
        os.environ['WORLD_SIZE'] = str(world_size)
        host_rank = args.hosts.index(args.current_host)
        dist.init_process_group(backend=args.backend, rank=host_rank, world_size=world_size)
        logger.info('Initialized the distributed environment: \'{}\' backend on {} nodes. '.format(
            args.backend, dist.get_world_size()) + 'Current host rank is {}. Number of gpus: {}'.format(
            dist.get_rank(), args.num_gpus))

    # set the seed for generating random numbers
    torch.manual_seed(args.seed)
    if use_cuda:
        torch.cuda.manual_seed(args.seed)

    dataloaders, dataset_sizes = _get_data_loader(args.batch_size, args.data_dir, is_distributed, **kwargs)

    model_ft = models.resnet18(pretrained=True)
    num_ftrs = model_ft.fc.in_features
    
    # Change the final layer of ResNet18 Model for Transfer Learning
    #model_ft.fc = nn.Linear(num_ftrs, 4)
    model_ft.fc = nn.Sequential(
        nn.Linear(num_ftrs, 256),
        nn.ReLU(),
        nn.Dropout(0.4),
        nn.Linear(256, 4), 
        nn.LogSoftmax(dim=1) # For using NLLLoss()
    )

    model_ft = model_ft.to(device)
    if is_distributed and use_cuda:
        # multi-machine multi-gpu case
        model_ft = torch.nn.parallel.DistributedDataParallel(model_ft)
    else:
        # single-machine multi-gpu case or single-machine or multi-machine cpu case
        model_ft = torch.nn.DataParallel(model_ft)

    criterion = nn.NLLLoss()

    # Observe that all parameters are being optimized
    optimizer_ft = optim.SGD(model_ft.parameters(), lr=args.lr, momentum=args.momentum)

    # Decay LR by a factor of 0.1 every 7 epochs
    exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

    # Training
    model_ft = train_model(dataloaders, dataset_sizes, device, model_ft, criterion, 
                            optimizer_ft, exp_lr_scheduler, args.epochs)

    # Save Model
    save_model(model_ft, optimizer_ft, args.epochs, args.model_dir)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    # Data and model checkpoints directories
    parser.add_argument('--batch-size', type=int, default=4, metavar='N',
                        help='input batch size for training (default: 4)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',
                        help='number of epochs to train (default: 10)')
    parser.add_argument('--lr', type=float, default=0.001, metavar='LR',
                        help='learning rate (default: 0.001)')
    parser.add_argument('--momentum', type=float, default=0.9, metavar='M',
                        help='SGD momentum (default: 0.9)')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--log-interval', type=int, default=100, metavar='N',
                        help='how many batches to wait before logging training status')
    parser.add_argument('--backend', type=str, default=None,
                        help='backend for distributed training (tcp, gloo on cpu and gloo, nccl on gpu)')

    # Container environment
    parser.add_argument('--hosts', type=list, default=json.loads(os.environ['SM_HOSTS']))
    parser.add_argument('--current-host', type=str, default=os.environ['SM_CURRENT_HOST'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--data-dir', type=str, default=os.environ['SM_CHANNEL_TRAINING'])
    parser.add_argument('--num-gpus', type=int, default=os.environ['SM_NUM_GPUS'])

    train(parser.parse_args())

 

Neste script, é importante estar atento aos seguintes trechos:

  • if__name__ == '__main__': o ponto de entrada do código, onde os argumentos são tratados e o método de treinamento train() é chamado;
  • def train(args): possui toda a lógica de treinamento do modelo;
  • def model_fn(model_dir): executada apenas para inferência, é responsável por carregar o modelo na inicialização do container de inferência do SageMaker.

Após analisar este script, ele pode ser criado no mesmo diretório onde está o notebook. Voltando ao notebook iremos criar um estimator do SageMaker, que possuirá as instruções necessárias para o processo de treinamento, sendo os principais parâmetros indicados:

  • O script transfer_learning.py criado na etapa anterior como entrada;
  • A role (IAM) com as permissões necessárias;
  • O tipo da instância de treinamento (neste caso, como é um exemplo, pode ser utilizada a instância m5.large);
  • Número de instâncias (neste caso, apenas uma).
 
       
# Criando o Estimator
from sagemaker.pytorch import PyTorch

pytorch_estimator = PyTorch('transfer_learning.py',
                            role=role,
                            instance_type='ml.m5.large',
                            instance_count=1,
                            framework_version='1.5.0',
                            py_version='py3',
                           )

 

Este trecho pega o caminho para as imagens no S3 e passa como parâmetro para o processo de treinamento do SageMaker. Durante este treinamento o SageMaker irá lançar as instâncias necessárias (definidas no passo anterior), executar o treinamento, salvar o modelo treinado no S3 e encerrar as instâncias.

Após isto, para iniciar o processo de treinamento, basta executar o seguinte trecho de código:

bucket_uri = 's3://' + bucket + '/' + prefix
pytorch_estimator.fit({'training': bucket_uri})

Inferindo o resultado no SageMaker

Após a conclusão do processo de treinamento, é possível fazer o deploy do modelo em uma instância de inferência do SageMaker. Esta instância é otimizada e preparada com as dependências necessárias para reduzir a necessidade de gerenciamento de infraestrutura. Esta instância já possui um endpoint HTTPS para que o modelo seja acessado e retorne uma predição. Para conhecer mais, visite a página Implantar um modelo nos serviços de hospedagem do SageMaker.

Através do seguinte comando é possível criar uma instância de inferência para testar o modelo:

predictor = pytorch_estimator.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge')

Assim como no treinamento, aqui também definimos a quantidade e o tipo de instância.

Para testar, utilize o script abaixo, que irá utilizar uma imagem qualquer (pode ser utilizada qualquer imagem do dataset original, de preferência uma que não tenha sido utilizada no processo de treinamento):

 
       
from PIL import Image

loader = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

def image_loader(image_name):
    """load image, returns cuda tensor"""
    image = Image.open(image_name)
    image = loader(image).float()
    image = image.unsqueeze(0) 
    return image

image = image_loader("02_coffee_mug.jpg")

import numpy as np

objects_category = ['01-beer-mug','02-coffee-mug','03-teapot','04-wine-bottle']

response = predictor.predict(image)
output = torch.exp(torch.tensor(response))
index = np.argmax(output)
print("Result --> label: " + objects_category[index] + " | probability: " + str(output[0][index]))

 

O resultado será algo parecido com o output abaixo:

Result --> label: 02-coffee-mug | probability: tensor(0.6101, dtype=torch.float64)

Como o proposito aqui é apenas demonstrativo, o pre-processamento para redimensionar tamanho das imagens e normalizar os canais RGB da imagem antes de enviar ao modelo e também o pós-processamento para a conversão para escala entre 0 e 1 da probabilidade foi feito diretamente no notebook. Contudo, essa lógica poderia ser inclusa em funções input_fn, predict_fn e output_fn, dentro do script python (transfer_learning.py) que seria utilizado tanto no treinamento e como na inferência. Mais detalhes de como customizar a lógica de inferência podem ser encontrados aqui.

 

Limpando os recursos

Para evitar custos não desejados, o endpoint criado para a inferência do modelo pode ser deletado ao término, através do comando:

predictor.delete_endpoint()

 

Conclusão

Nesta publicação, demonstramos como criar um workflow de Machine Learning, desde a etapa de rotulamento das imagens, passando pelo treinamento, utilizando um script PyTorch no SageMaker e finalizando com a criação de uma instância de inferência para realizar inferências em tempo real.

 

 


Sobre o autor

Evandro Franco é um arquiteto de soluções do time de Startups na Amazon Web Services. Em sua função, ele ajuda as Startups a superar os desafios de negócio utilizando a plataforma da AWS. Ele tem mais de 15 anos de experiência na área de tecnologia. Em seu tempo livre, gosta de correr e passar tempo com sua família.