Blog de Amazon Web Services (AWS)

Creación de un flujo de trabajo de etiquetado, entrenamiento e implementación de aprendizaje automático con Amazon SageMaker, PyTorch y Amazon SageMaker Ground Truth

Por Evandro Franco, Arquitecto de Soluciones en Startups en AWS

 

Amazon SageMaker es un servicio de aprendizaje automático (ML) totalmente administrado en el que los científicos y desarrolladores de datos pueden crear y entrenar modelos de aprendizaje automático de forma rápida y sencilla e implementarlos en un ambiente administrado, listo para producción.

Para entrenar un modelo de aprendizaje automático, necesita tener una gran cantidad de datos (conjunto de datos), con una buena definición y etiquetados correctamente. Amazon SageMaker Ground Truth le ayuda con la tarea de crear este conjunto de datos de alta calidad. Le permite utilizar empleados de terceros, de Amazon Mechanical Turk, o la fuerza de trabajo de la propia empresa.

Uno de los framewroks de aprendizaje automático de código abierto más utilizados en el mercado es PyTorch. Le permite acelerar el desarrollo, desde la creación de prototipos hasta la producción. Utilizaremos PyTorch en este ejemplo para mostrar también la flexibilidad y facilidad de integrar este framework con Amazon SageMaker.

 

Resumen de la solución

Inicialmente, para esta demostración, entrenaremos un modelo para clasificar imágenes de objetos. Estas imágenes fueron seleccionadas del conjunto de datos público de Caltech 256, que consta de más de 30.000 imágenes etiquetadas en 256 categorías distintas (clases). A partir de este conjunto de datos vamos a trabajar con una muestra de las siguientes 4 clases:

  • Beer Mug (Taza de cerveza);
  • Coffee Mug (taza de café);
  • Teapot (tetera);
  • Wine Bottle (botella de vino);

Nota : Para demostrar el flujo del desarrollo de modelos deaprendizaje automático, desde el etiquetado en SageMaker Ground Truth, la preparación de datos hasta la implementación del modelo, puede seleccionar 20 imágenes de cada categoría, con un total de 80 imágenes.  En un escenario real, probablemente se necesitaría un conjunto de datos más grande para mejorar el rendimiento del modelo.

 

 

Creación de los recursos necesarios (requisitos previos)

Para empezar, deberá crear un bucket en Amazon S3 (o utilizar uno existente). El tutorial «Crear un bucket» presenta el paso a paso cómo crear un bucket.

A continuación, seleccione veinte imágenes aleatorias del conjunto de datos citado en el paso anterior, teniendo en cuenta solo las cuatro categorías mencionadas (010.beer-mug, 041.coffee-mug, 212.teapot e 246.wine-bottle) y cargue estas imágenes a S3, utilizando un solo directorio, ya que utilizaremos Amazon SageMaker Ground Truth para etiquetar.

 

Creación del trabajo de etiquetado en SageMaker Ground Truth

Ahora que hemos creado un bucket en S3 y hemos cargado las imágenes, utilizaremos Amazon SageMaker Ground Truth para que las imágenes sin etiquetar puedan clasificarse antes del proceso de entrenamiento.

Para empezar, vaya a SageMaker desde la consola de AWS y, en el menú izquierdo, haga clic en Etiquetado de trabajos en la sección Ground Truth.

Establezca los siguientes atributos en el fragmento Visión general del trabajo:

  • Nombre del trabajo: labeling-demo
  • Configuración de datos de entrada: seleccione la opción «Configuración automatizada de datos»
  • Configuración de datos:
    • Ubicación de S3 para datasets de entrada: Navegue a través de S3 hasta el directorio con las imágenes.
    • Ubicación de S3 para datasets de salida: Navegue a través de S3 hasta el directorio vacío o mantenga la misma configuración regional.
  • Tipo de datos: Imagen
  • Rol de IAM: Especifique un rol con permiso de AmazonsageMakerFullAccess.
  • Haga clic en el botón «Completar configuración de datos».  Este paso creará un archivo de manifiesto para las imágenes, que será utilizado por Ground Truth.

 

 

En el fragmento Tipo de tarea, seleccione:

  • Imagen: Porque nuestro conjunto de datos se compone de imágenes.
  • Clasificación de imagen (etiqueta única): porque cada imagen tiene un solo objeto o etiqueta.

 

 

Haga clic en Siguiente para el siguiente paso.

Nota: En este ejemplo será considerado el primer acceso a Ground Truth. Si este acceso ya se ha realizado, podrá ver los equipos que se crearon previamente.

En el fragmento de los trabajadores:

  • Tipos de trabajador: seleccione la opción Privado
  • Nombre del equipo: my-private-team
  • Invitar a comentadores privados: lista de correo, separada por comas
  • Organización: demo-org
  • Contacto: Correo electrónico de contacto para los trabajadores
  • Tiempo de espera de tarea: 5 minutos (tiempo máximo para una sola tarea. Una sola tarea corresponde a una sola imagen).
  • Tiempo de vencimiento de la tarea: 10 días (este tiempo de espera considera el trabajo como un todo, es decir, el conjunto de imágenes).

 

 

En el fragmento de la Clasificación de imágenes (etiqueta única) herramienta de etiquetado, puede definir la plantilla que parecerá etiquetar las imágenes:

  • Introduzca una descripción para las imágenes para que el anotador sepa como clasificar, por ejemplo:
    • Por favor clasifique las imágenes en una de las cuatro clases:
  • Rellene las opciones de etiquetado:
    • 01.beer-mug
    • 02.coffee-mug
    • 03.teapot
    • 04.wine-bottle

 

 

Puede hacer clic en el botón de vista previa para abrir una página de muestra de cómo se mostrará a los trabajadores.

Haga clic en Crear para completar y crear el trabajo.

Después de unos minutos, los usuarios registrados recibirán un correo electrónico con acceso al portal donde podrán empezar a etiquetar.

 

 

Una vez finalizado el etiquetado, algunos metadatos estarán disponibles en la carpeta de salida de S3. Dentro de esta estructura, dos son importantes:

  • manifiestos: contiene los archivos de manifiesto de salida del trabajo.
  • anotaciones:
    • worker-response: Contiene la respuesta individual de cada trabajador.
    • consolidated-annotation: contiene las anotaciones deseadas.

Para obtener más información, consulte esta documentación sobre los resultados de Ground Truth

 

Creación de un notebook:

También utilizaremos una instancia de notebook de SageMaker. El tutorial «Crear una instancia de Notebooks» muestra cómo crear un notebook de Jupyter en SageMaker Jupyter. Como el propósito de este blog es para probar, se puede usar una instancia de tipo ml.t3.medium.

Con la instancia de notebook creada, cree un notebook utilizando el kernel conda_pytorch_p36:

 

 

A continuación, ejecute el siguiente fragmento de código con las bibliotecas, s3 Bucket y otras referencias que SageMaker utilizará:

 

import sagemaker

from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

sagemaker_session = sagemaker.Session()

# Destination Bucket

bucket = '<your-bucket-name>

prefix = '<your-bucket-prefix/subfolders>'

role = sagemaker.get_execution_role()

 

Exploración de los datos de Amazon SageMaker Ground Truth

Para explorar los datos generados por Ground Truth, crearemos un nuevo Notebook. En este notebook, importe las dependencias:

 

import os, json

import pandas as pd

import fnmatch

import boto3

from botocore.config import Config

import sagemaker


s3 = boto3.resource('s3')

sagemaker_session = sagemaker.Session()


# Destination Bucket

bucket = '<your-bucket-name>'

prefix = '<prefix-with-ground-truth-output>'

new_prefix= '<new-prefix-to-organize-data>'

 

En la variable de prefijo, utilice la ruta utilizada para la salida del trabajo Ground Truth.

Utilice el siguiente ejemplo de código para descargar la salida a su notebook y evaluar lo que se generó:

sagemaker_session.download_data('<local_path>', bucket , prefix)

El siguiente extracto leerá el manifiesto generado con los datos de los trabajadores que etiquetaron las imágenes:

 

# Reading Manifest Files (Output)

path_to_json = prefix + '<path to output on manifest folder>'

json_files = [pos_json for pos_json in os.listdir(path_to_json) if pos_json.endswith('.manifest')]

files = []

for index, js in enumerate(json_files):

with open(os.path.join(path_to_json, js)) as json_file:

files += list(json_file)

manifest_data=[]

for i in files:

temp = json.loads(i)

manifest_data.append(temp)

manifest_data

 

Finalmente, el siguiente fragmento reorganizará las imágenes etiquetadas en S3, ya en una estructura de carpetas que utilizaremos en los siguientes pasos:

 

import numpy as np


for i in manifest_data:

src_ref = i["source-ref"]

filename = src_ref.split("/")[-1]

label = i["<JOB-NAME>-metadata"]["class-name"]

if np.random.rand(1) < 0.2:

new_path =  new_prefix + '/test/' + label + '/' + filename

print(new_path)

s3.Object(bucket, new_path).copy_from(CopySource=src_ref.replace("s3://",""))

else:

new_path =  new_prefix + '/train/' + label + '/' + filename

print(new_path)

s3.Object(bucket, new_path).copy_from(CopySource=src_ref.replace("s3://",""))

 

Nota: El valor <JOB-NAME> debe cambiarse por el nombre del trabajo utilizado en Ground Truth.  En este ejemplo hemos utilizado un factor aleatorio para separar el conjunto de datos en entrenamiento y prueba, solo para el propósito de demostración.

 

Entrnando un modelo en SageMaker

Utilizaremos una técnica de aprendizaje automático llamada Transfer Learning. En la práctica, no siempre es necesario entrenar una red neuronal convolucional desde cero, porque es difícil poseer un conjunto de datos de tamaño suficiente, además de demandar un tiempo de entrenamiento muy grande. En su lugar, es común usar un conjunto de datos grande, preentrenar una red con ese conjunto de datos y luego usar este modelo preentrenado como punto de partida para desarrollar el nuevo modelo con el nuevo conjunto de datos para otro dominio específico, en gran medida acelerando el tiempo de entrenamiento y sin necesidad de tantas imágenes. En este ejemplo, utilizamos la red ResNet18, preformada con el dataset ImageNet.

A continuación utilizaremos el script transfer_learning.py con el código completo utilizando la técnica de aprendizaje de transferencia y el framework PyTorch. Este script será el punto de entrada para el entrenamiento e implementación de nuestro modelo:

 

# transfer_learning.py

import argparse

import json

import logging

import os

import time

import sys

import torch

import torch.distributed as dist

import torch.nn as nn

import torch.nn.functional as F

import torch.optim as optim

from torch.optim import lr_scheduler

import torch.utils.data

import torch.utils.data.distributed

import torchvision

import numpy as np

from torchvision import datasets, transforms, models

import copy

from collections import OrderedDict


logger = logging.getLogger(__name__)

logger.setLevel(logging.DEBUG)

logger.addHandler(logging.StreamHandler(sys.stdout))


def _get_data_loader(batch_size, training_dir, is_distributed, **kwargs):

logger.info("Get dataset into data_loader")


data_transforms = {

'train': transforms.Compose([

transforms.RandomResizedCrop(size=256, scale=(0.8, 1.0)),

transforms.RandomRotation(degrees=15),

transforms.RandomHorizontalFlip(),

transforms.CenterCrop(size=224),

transforms.ToTensor(),

transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

]),

'test': transforms.Compose([

transforms.Resize(256),

transforms.CenterCrop(224),

transforms.ToTensor(),

transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

]),

}


data_dir = training_dir

image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),

data_transforms[x])

for x in ['train', 'test']}


train_sampler = torch.utils.data.distributed.DistributedSampler(dataset) if is_distributed else None


dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size,

shuffle=train_sampler is None,

sampler=train_sampler, **kwargs)

for x in ['train', 'test']}


dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'test']}


return dataloaders, dataset_sizes


def model_fn(model_dir):

try:

logger.info('model_fn')

device = "cuda" if torch.cuda.is_available() else "cpu"

with open(os.path.join(model_dir, 'model.pth'), 'rb') as f:

ckpt = torch.load(f, map_location='cpu')

optimizer = ckpt['optimizer']

epoch = ckpt['epoch']

model = ckpt['model']

load_dict = OrderedDict()

for k, v in model.items():

if k.startswith('module.'):

k_ = k.replace('module.', '')

load_dict[k_] = v

else:

load_dict[k] = v


model = models.resnet18(pretrained=False)

num_ftrs = model.fc.in_features


model.fc = nn.Sequential(

nn.Linear(num_ftrs, 256),

nn.ReLU(),

nn.Dropout(0.4),

nn.Linear(256, 4),

nn.LogSoftmax(dim=1) # For using NLLLoss()

)


model.load_state_dict(load_dict)

return model.to(device)

except Exception as err:

print(err)

raise


def save_model(model, optimizer, epoch, model_dir):

logger.info("Saving the model.")

path = os.path.join(model_dir, 'model.pth')

# recommended way from http://pytorch.org/docs/master/notes/serialization.html

torch.save(

{

"model" : model.state_dict(),

"optimizer": optimizer.state_dict(),

"epoch": epoch

},

path)


def train_model(dataloaders, dataset_sizes, device, model, criterion, optimizer,

scheduler, num_epochs=10):

since = time.time()


best_model_wts = copy.deepcopy(model.state_dict())

best_acc = 0.0


for epoch in range(num_epochs):

print('Epoch {}/{}'.format(epoch, num_epochs - 1))

print('-' * 10)


# Each epoch has a training and validation phase

for phase in ['train', 'test']:

if phase == 'train':

model.train()  # Set model to training mode

else:

model.eval()   # Set model to evaluate mode


running_loss = 0.0

running_corrects = 0


# Iterate over data.

for inputs, labels in dataloaders[phase]:

inputs = inputs.to(device)

labels = labels.to(device)


# zero the parameter gradients

optimizer.zero_grad()


# forward

# track history if only in train

with torch.set_grad_enabled(phase == 'train'):

outputs = model(inputs)

_, preds = torch.max(outputs, 1)

loss = criterion(outputs, labels)


# backward + optimize only if in training phase

if phase == 'train':

loss.backward()

optimizer.step()


# statistics

running_loss += loss.item() * inputs.size(0)

running_corrects += torch.sum(preds == labels.data)

if phase == 'train':

scheduler.step()


epoch_loss = running_loss / dataset_sizes[phase]

epoch_acc = running_corrects.double() / dataset_sizes[phase]


print('{} Loss: {:.4f} Acc: {:.4f}'.format(

phase, epoch_loss, epoch_acc))


# deep copy the model

if phase == 'test' and epoch_acc > best_acc:

best_acc = epoch_acc

best_model_wts = copy.deepcopy(model.state_dict())


print()


time_elapsed = time.time() - since

print('Training complete in {:.0f}m {:.0f}s'.format(

time_elapsed // 60, time_elapsed % 60))

print('Best val Acc: {:4f}'.format(best_acc))


# load best model weights

model.load_state_dict(best_model_wts)

return model


def train(args):

is_distributed = len(args.hosts) > 1 and args.backend is not None

logger.debug("Distributed training - {}".format(is_distributed))

use_cuda = args.num_gpus > 0

logger.debug("Number of gpus available - {}".format(args.num_gpus))

kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}

device = torch.device("cuda" if use_cuda else "cpu")


if is_distributed:

# Initialize the distributed environment.

world_size = len(args.hosts)

os.environ['WORLD_SIZE'] = str(world_size)

host_rank = args.hosts.index(args.current_host)

dist.init_process_group(backend=args.backend, rank=host_rank, world_size=world_size)

logger.info('Initialized the distributed environment: \'{}\' backend on {} nodes. '.format(

args.backend, dist.get_world_size()) + 'Current host rank is {}. Number of gpus: {}'.format(

dist.get_rank(), args.num_gpus))


# set the seed for generating random numbers

torch.manual_seed(args.seed)

if use_cuda:

torch.cuda.manual_seed(args.seed)


dataloaders, dataset_sizes = _get_data_loader(args.batch_size, args.data_dir, is_distributed, **kwargs)


model_ft = models.resnet18(pretrained=True)

num_ftrs = model_ft.fc.in_features


# Change the final layer of ResNet18 Model for Transfer Learning

#model_ft.fc = nn.Linear(num_ftrs, 4)

model_ft.fc = nn.Sequential(

nn.Linear(num_ftrs, 256),

nn.ReLU(),

nn.Dropout(0.4),

nn.Linear(256, 4),

nn.LogSoftmax(dim=1) # For using NLLLoss()

)


model_ft = model_ft.to(device)

if is_distributed and use_cuda:

# multi-machine multi-gpu case

model_ft = torch.nn.parallel.DistributedDataParallel(model_ft)

else:

# single-machine multi-gpu case or single-machine or multi-machine cpu case

model_ft = torch.nn.DataParallel(model_ft)


criterion = nn.NLLLoss()


# Observe that all parameters are being optimized

optimizer_ft = optim.SGD(model_ft.parameters(), lr=args.lr, momentum=args.momentum)


# Decay LR by a factor of 0.1 every 7 epochs

exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)


# Training

model_ft = train_model(dataloaders, dataset_sizes, device, model_ft, criterion,

optimizer_ft, exp_lr_scheduler, args.epochs)


# Save Model

save_model(model_ft, optimizer_ft, args.epochs, args.model_dir)


if __name__ == '__main__':

parser = argparse.ArgumentParser()


# Data and model checkpoints directories

parser.add_argument('--batch-size', type=int, default=4, metavar='N',

help='input batch size for training (default: 4)')

parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',

help='input batch size for testing (default: 1000)')

parser.add_argument('--epochs', type=int, default=10, metavar='N',

help='number of epochs to train (default: 10)')

parser.add_argument('--lr', type=float, default=0.001, metavar='LR',

help='learning rate (default: 0.001)')

parser.add_argument('--momentum', type=float, default=0.9, metavar='M',

help='SGD momentum (default: 0.9)')

parser.add_argument('--seed', type=int, default=1, metavar='S',

help='random seed (default: 1)')

parser.add_argument('--log-interval', type=int, default=100, metavar='N',

help='how many batches to wait before logging training status')

parser.add_argument('--backend', type=str, default=None,

help='backend for distributed training (tcp, gloo on cpu and gloo, nccl on gpu)')


# Container environment

parser.add_argument('--hosts', type=list, default=json.loads(os.environ['SM_HOSTS']))

parser.add_argument('--current-host', type=str, default=os.environ['SM_CURRENT_HOST'])

parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])

parser.add_argument('--data-dir', type=str, default=os.environ['SM_CHANNEL_TRAINING'])

parser.add_argument('--num-gpus', type=int, default=os.environ['SM_NUM_GPUS'])


train(parser.parse_args())

 

En este script, es importante tener en cuenta los siguientes fragmentos

    • if __name__ == ‘__main__‘: el punto de entrada del código, donde se manejan los argumentos y se llama al método de entrenamiento train () ;
    • def train (args): tiene toda la lógica de entrenamiento modelo;
    • def model_fn (model_dir): Ejecutado sólo para inferencia, es responsable de cargar la plantilla al inicio del contenedor de inferencia de SageMaker.

    Después de analizar este script, se puede crear en el mismo directorio que el notebook. Volviendo al notebook crearemos un estimador de SageMaker, que tendrá las instrucciones necesarias para el proceso de entrenamiento, indicando los principales parámetros:

    • El script py creado en el paso anterior como entrada;
    • Un rol (IAM) con los permisos necesarios;
    • El tipo de instancia de entrenamiento (en este caso, como es un ejemplo, se puede usar la instancia m5.large);
    • Número de instancias (en este caso, sólo una).
# Creando el Estimator
from sagemaker.pytorch import PyTorch

pytorch_estimator = PyTorch('transfer_learning.py',
                            role=role,
                            instance_type='ml.m5.large',
                            instance_count=1,
                            framework_version='1.5.0',
                            py_version='py3',
                           )

Después de esto, para iniciar el proceso de entrenamiento, simplemente ejecute el siguiente fragmento de código:

 

bucket_uri = 's3://' + bucket + '/' + prefix

pytorch_estimator.fit({'training': bucket_uri})

 

Este fragmento toma la ruta a las imágenes de S3 y pasa como parámetro al proceso de formación de SageMaker. Durante este entrenamiento, SageMaker lanzará las instancias requeridas (definidas en el paso anterior), ejecutará el entrenamiento, guardará la plantilla formada en S3 y finalizará las instancias.

 

Inferir el resultado en SageMaker

Una vez finalizado el proceso de entrenamiento, puede implementar el modelo en una instancia de inferencia de SageMaker. Esta instancia está optimizada y preparada con las dependencias necesarias para reducir la necesidad de administración de infraestructura. Esta instancia ya tiene un endpoint HTTPS para acceder a la plantilla y devuelve una predicción. Para obtener más información, visite la página Implementar una plantilla en los servicios de alojamiento de SageMaker.

A través del siguiente comando es posible crear una instancia de inferencia para probar el modelo:

predictor = pytorch_estimator.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge')

Como en el entrenamiento, aquí también definimos la cantidad y el tipo de instancia.

Para probar, utilice el siguiente script, que usará cualquier imagen (se puede usar cualquier imagen del conjunto de datos original, preferiblemente una que no se haya utilizado en el proceso de entrenamiento):

 

from PIL import Image

loader = transforms.Compose([

transforms.Resize(256),

transforms.CenterCrop(224),

transforms.ToTensor(),

transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

])

def image_loader(image_name):

"""load image, returns cuda tensor"""

image = Image.open(image_name)

image = loader(image).float()

image = image.unsqueeze(0)

return image

image = image_loader("02_coffee_mug.jpg")

import numpy as np

objects_category = ['01-beer-mug','02-coffee-mug','03-teapot','04-wine-bottle']

response = predictor.predict(image)

output = torch.exp(torch.tensor(response))

index = np.argmax(output)

print("Result --> label: " + objects_category[index] + " | probability: " + str(output[0][index]))

 

El resultado se verá algo así como el resultado a continuación:

Result --> label: 02-coffee-mug | probability: tensor(0.6101, dtype=torch.float64)

Como el propósito aquí es sólo demostrativo, el preprocesamiento para cambiar el tamaño de la imagen y normalizar los canales RGB de la imagen antes de enviarlo al modelo y también post-procesamiento para la conversión a escala entre 0 y 1 de la probabilidad se hizo directamente en el notebook. Sin embargo, esta lógica podría incluirse en las funciones input_fn, predict_fn y output_fn dentro del script python (transfer_learning.py) que se usaría tanto en entrenamiento como en inferencia. Más detalles sobre cómo personalizar la lógica de inferencia se pueden encontrar aquí.

 

Limpiar recursos

Para evitar costos no deseados, el endpoint creado para la inferencia de plantilla se puede eliminar al finalizar, utilizando el comando:

predictor.delete_endpoint()

 

Conclusión

En esta publicación, demostramos cómo crear un flujo de trabajo de Machine Learning, desde el etiquetado de imágenes hasta la formación, el uso de un guión PyTorch en SageMaker, y terminando con la creación de una instancia de inferencia para hacer inferencias en tiempo real.

 

 


Sobre el autor

Evandro Francoes arquitecto de soluciones para el equipo de Startups de Amazon Web Services. En su puesto, ayuda a las startups a superar los desafíos empresariales aprovechando la plataforma de AWS. Tiene más de 15 años de experiencia en el campo de la tecnología. En su tiempo libre, le gusta correr y pasar tiempo con su familia.

 

 

 

Revisor

Luisa Vesga es arquitecta de soluciones para el equipo de Startups de Amazon Web Services. En su puesto, ayuda a las startups a superar los desafíos empresariales aprovechando la plataforma de AWS. Tiene más de 10 años de experiencia en el campo de la tecnología.