Blog de Amazon Web Services (AWS)

Gobernanza de modelos de ML con Amazon SageMaker lineage tracking

Por Alejandro Martinez, Partner TAM en AWS

 

Introducción

La creación e implementación de modelos de machine learning (ML) involucra tareas repetitivas. En la mayoría de los escenarios, los científicos de datos y/o ingenieros de ML empiezan esos trabajos desde cero. Por otro lado, es necesario contar con la capacidad de rastrear los resultados obtenidos durante las fases de desarrollo de modelos de ML para poder identificar problemas o simplemente por requerimientos de cumplimiento que pueden involucrar procesos de auditoría que requieren que tengamos la capacidad de reproducir cada uno de los pasos que se implementaron para obtener un modelo de ML. Con estos datos, se puede establecer un esquema de gobernanza de modelos de ML para cumplir con los estándares requeridos por los auditores.

Amazon SageMaker ML Lineage Tracking crea y almacena información acerca de cada uno de los pasos y de los componentes que forman parte de un flujo de trabajo de ML desde la preparación de los datos hasta la implementación del modelo. Con la información que se está rastreando, se pueden reproducir cada uno de los pasos del flujo de trabajo, rastrear la procedencia de los datos, y establecer modelos de gobernanza para cargas de trabajo de ML.

En el diagrama que se muestra a continuación, se puede ver un ejemplo de una gráfica de linaje que Amazon SageMaker crea de forma automática cubriendo todo el flujo de trabajo de implementación de un modelo de ML incluyendo etapas como el entrenamiento y la implementación:

En Amazon SageMaker podemos rastrear los procesos de un modelo de ML mediante el linaje. Existen componentes y procesos que se deben implementar los cuales se describen a continuación:

Entidades de seguimiento de Linaje
Las entidades de seguimiento mantienen una representación de todos los elementos que componen un flujo de trabajo de ML. Se puede utilizar esta representación para establecer una gobernanza de los modelos, reproducir el flujo de trabajo y mantener un registro de todo el historial del trabajo.

Amazon SageMaker crea de forma automática entidades de rastreo para componentes relacionados con pruebas y las pruebas y experimentos cuando se crean trabajos en SageMaker que pueden ser de procesamiento, entrenamiento o de transformación en lotes, este proceso es conocido como rastreo automatizado (auto tracking). También podemos optar por crear de forma manual entidades de rastreo para procesos personalizados en el flujo de trabajo de ML.

Amazon SageMaker reutiliza cualquier entidad existente en lugar de crear nuevas. Por ejemplo, puede haber solo un artefacto con un SourceURI único.

Para poder entender los elementos que forman parte del linaje en un flujo de trabajo de ML, es necesario comprender los siguientes conceptos:

  • Lineage (Linaje) – Metadatos que rastrean las relaciones entre varias entidades en el flujo de trabajo de ML.
  • QueryLineage (Consulta de linaje)- La acción en la que se inspecciona el linaje y se descubren relaciones entre entidades.
  • Lineage Entities (Entidades de linaje) – Los elementos de metadatos mediante los cuales está compuesto un linaje.
  • Cross-account-lineage (linaje con componentes en multiples cuentas) – Un modelo de ML puede utilizar varias cuentas de AWS para ser implementado. Con la funcionalidad cross-account-lineage, se pueden configurar múltiples cuentas de AWS para crear asociaciones de linaje de forma automatizada entre recursos compartidos en entidades. Después, una consulta al linaje puede regresar entidades que se encuentren en otras cuentas que formen parte del flujo de trabajo de ML.
  • Artifacts (artefactos) – Representa un objeto o datos que pueden ser ubicados utilizando una dirección URI. Los artefactos son generalmente las entradas o salidas de datos requeridas/producidas para realizar/ejecutar una acción.
  • Actions (acciones) – Representa alguna acción ejecutada como un cálculo, una transformación o un job de entrenamiento.
  • Lineage Graph (gráfica de linaje) – Es una gráfica que traza un flujo de trabajo de ML de principio a fin utilizando los componentes de linaje y sus asociaciones para su formación.
  • Contexts (contextos) – Proveen un método para agrupar de forma lógica otras entidades. Podemos crear un objeto contexto y asociarlo a diferentes elementos como acciones y artefactos.
  • Asociaciones – Una relación entre dos componentes (acciones, contextos, artefactos).
  • Lineage Traversal (recorrido de linaje)- Recorrido del linaje desde un componente arbitrario de linaje para descubrir y analizar asociaciones entre los diferentes pasos en el flujo de trabajo de ML
  • Experiments (experimentos) – conjunto de pruebas (trial) que tienen como objetivo resolver un caso de uso específico
  • Trial component (componente de prueba) – Componentes que forman parte de una prueba (trial). Pueden ser acciones como jobs de procesamiento, jobs de entrenamiento y jobs de procesamiento tipo batch.
  • Trial (prueba) – Es un conjunto de componentes de prueba (trial components) que dan como resultado un modelo entrenado.
  • Experimentos – Cada prueba que se ejecuta con el objetivo de resolver un caso de uso se denomina experimento. Cada una de estas pruebas dan como resultado un modelo de ML. El conjunto de pruebas dan como resultado un conjunto de modelos de ML que se denominan experimentos. Utilizando SageMaker experiments podemos determinar cual modelo de ML, derivado de una prueba, es el que esta arrojando las predicciones con la mejor precisión de acuerdo con nuestro caso de uso.

Amazon SageMaker Experiments

Con los conceptos descritos anteriormente, se puede comenzar a plantear la forma en que se organizará el linaje de los modelos de ML:

Para comenzar, se puede crear un experimento al cual serán asociados los elementos del flujo de trabajo de ML:

from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent

experiment_name = f"RentaBicis-{unique_id}"
exp = Experiment.create(experiment_name=experiment_name, sagemaker_boto_client=sm_client)

trial = Trial.create(
    experiment_name=exp.experiment_name,
    trial_name=f"RentaBicisExperimento-{unique_id}",
    sagemaker_boto_client=sm_client,
)

print(exp.experiment_name)
print(trial.trial_name)

 

Con este bloque de código, se crea un experimento que se alojará en la variable exp y se crea una prueba que se almacenará en la variable trial. Estos, se utilizarán para relacionarlos con los elementos de linaje como los contextos, acciones y artefactos.

Las consultas al linaje pueden ser ejecutadas utilizando como filtro un objeto del tipo trial (prueba). No existe aún la posibilidad de hacer una consulta de linaje utilizando como filtro un experimento que, por ejemplo, nos mostrara todas las asociaciones entre el experimento y todas las pruebas que se han hecho para resolver un caso de uso. La clase experiment cuenta con el metodo list_trials() que nos entrega como resultado una lista con todas las pruebas que estan asociadas a un experimento.

Para vincular las diferentes acciones al experimento, es necesario que cuando se creen los trabajos de procesamiento o entrenamiento, se agregue la información relacionada con el experimento/prueba:

 

#ejemplo de código que incluye información de experimento para un trabajo
#de procesamiento
data_prep_parameters = {
    'inputs':[ProcessingInput(input_name='input',
                    source=f's3://{bucket}/{datasets_prefix}',
                    destination='/opt/ml/processing/input'),
              ProcessingInput(input_name='code',
                    source=data_prep_script_path,
                    destination='/opt/ml/processing/input/code')],
    'outputs':[ProcessingOutput(output_name='training',
                    source=f'/opt/ml/processing/output/training',
                    destination=f's3://{bucket}/{processed_data_prefix}/training'),
               ProcessingOutput(output_name='validation',
                    source=f'/opt/ml/processing/output/validation',
                    destination=f's3://{bucket}/{processed_data_prefix}/validation'),
               ProcessingOutput(output_name='test',
                    source=f'/opt/ml/processing/output/test',
                    destination=f's3://{bucket}/{processed_data_prefix}/test')],
    'experiment_config':{
        'ExperimentName': experiment_name,
        'TrialName': trial.trial_name,
        'TrialComponentDisplayName': "trabajoProcesamientoDatasetRentaBicis"},
    'arguments':['--test-size', '0.3',
                 '--data-file', 'train.csv',
                 '--test-file','test.csv',
                 '--training-file', training_file,
                 '--validation-file', validation_file,
                 '--test-output', test_output_file]}
    
processor_arn = processor.run(**data_prep_parameters)

#ejemplo de código que incluye información de experimento para un trabajo
#de entrenamiento
xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=training_data,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=validation_data,
            content_type="text/csv",
        ),
    },
    experiment_config={
        "ExperimentName": experiment_name,
        "TrialName": trial.trial_name,
        "TrialComponentDisplayName": "trabajoEntrenamientoDatasetRentaBicis",
    },
)

 

Esta información forma parte de los parámetros con los cuales se alimentará el objeto processor en el caso de un job de entrenamiento o al objeto estimator en un job de entrenamiento de un modelo.

Una vez ejecutados los trabajos de procesamiento de datos, se obtendrán artefactos de los datos de entrenamiento, validación, prueba y validación de prueba.

También, en el momento en el que sea ejecutado el job de entrenamiento, se obtendrá el artefacto del modelo entrenado, para que posteriormente pueda ser desplegado en un endpoint.

Una vez desplegado el modelo en un endpoint, se puede crear un objeto del tipo contexto (context) que sirve como destino para las asociaciones. La clase context, sirve para definir un contexto al cual podemos ligar nuestro endpoint y que de esta manera los artefactos como el modelo entrenado se asocien como ascendientes. De esta forma podemos generar nuestro linaje llegando hasta el endpoint.

Para trabajar con la característica de lineage tracking de Amazon SageMaker, se requiere de algunas librerías las cuales se importan con las siguientes líneas de código:



from sagemaker.lineage.context import Context, EndpointContext
from sagemaker.lineage.action import Action
from sagemaker.lineage.association import Association
from sagemaker.lineage.artifact import Artifact, ModelArtifact, DatasetArtifact

from sagemaker.lineage.query import (
    LineageQuery,
    LineageFilter,
    LineageSourceEnum,
    LineageEntityEnum,
    LineageQueryDirectionEnum,
)

Búsquedas

Para buscar el endpoint que está asociado con el job de entrenamiento que ejecutó previamente y fué asociado al experimento, es necesario ejecutar el siguiente código:

#Traer el nombre del modelo desplegado al endpoint
endpoint_config = sm_client.describe_endpoint_config(EndpointConfigName=endpoint_name)
model_name = endpoint_config['ProductionVariants'][0]['ModelName']

#Obtener el nombre del modelo
model = sm_client.describe_model(ModelName=model_name)

#Buscar el job de entrenamiento que este asociado a los artefactos de modelos
#en Amazon S3
search_params={
   "MaxResults": 1,
   "Resource": "TrainingJob",
   "SearchExpression": { 
      "Filters": [ 
         { 
            "Name": "TrainingJobName",
            "Operator": "Equals",
            "Value": "trabajoEntrenamientoRentaBicis-2022-11-30-17-01-35-294"
         }]},
}
results = sm_client.search(**search_params)

Amazon SageMaker crea de forma automática un objeto de la clase context y lo asocia al endpoint. Para poder encontrar el objeto de la clase context asociado al endpoint se utilizan las siguientes líneas de código:

#buscar el objeto context asociado al endpoint mediante el arn del endpoint
contexts = Context.list(source_uri=endpoint_arn)
context_name = list(contexts)[0].context_name
endpoint_context = EndpointContext.load(context_name=context_name)

Con el objeto association, se crea una relación (asociación) entre dos objetos de linaje la cual sirve para almacenar los metadatos que describen el flujo de trabajo que existe entre dos objetos. La clase association tiene dos atributos llamados destination_arn y source_arn mediante los cuales se establece la relación entre los objetos. Para consultar el linaje, se puede hacer hacia los ascendientes (ascendants) o descendientes (descendants) lo que significa que los objetos se listarán de source_arn a destination_arn o de destination_arn a source_arn.

SageMaker crea asociaciones de forma automática cuando se ejecuta un proceso de entrenamiento. Las asociaciones entre el endpoint, el artefacto del modelo entrenado y los datos de entrenamiento, validación, pruebas y validación de pruebas se crean de forma automática.

Se puede consultar ahora el linaje asociado al endpoint haciendo una consulta que inicia desde el objeto endpoint_context. A continuación, seguirán algunos ejemplos de consultas que se pueden realizar:

Para encontrar todos los sets de datos asociados con el Endpoint:

# Se definen los filtros para buscar por 'ARTIFACT' y que el tipo de fuente sea 'DATASET'

query_filter = LineageFilter(
    entities=[LineageEntityEnum.ARTIFACT], sources=[LineageSourceEnum.DATASET]
)

# Suministrar los filtros de linaje a la consulta y de esta forma buscar´ 
# todos los datasets asociados al 'endpoint_context'
query_result = LineageQuery(sagemaker_session).query(
    start_arns=[endpoint_context.context_arn],
    query_filter=query_filter,
    direction=LineageQueryDirectionEnum.ASCENDANTS,
    include_edges=False,
)

# Filtrar los resultados de la consulta para visualizar aquellos
# que correspondan a un dataset
dataset_artifacts = []
for vertex in query_result.vertices:
    dataset_artifacts.append(vertex.to_lineage_object().source.source_uri)
    
pp.pprint(dataset_artifacts)

Encontrar los modelos asociados con un Endpoint:

# Definir los filtros para buscar por 'ARTIFACT' y que el tipo de fuente sea 'MODEL'
query_filter = LineageFilter(
    entities=[LineageEntityEnum.ARTIFACT], sources=[LineageSourceEnum.MODEL]
)
# Suministramos los filtros de linaje a nuestra consulta y de esta forma buscara 
# todos los modelos asociados a nuestro 'endpoint_context'
query_result = LineageQuery(sagemaker_session).query(
    start_arns=[endpoint_context.context_arn],
    query_filter=query_filter,
    direction=LineageQueryDirectionEnum.ASCENDANTS,
    include_edges=False,
)
# Filtrar los resultados de la consulta para visualizar aquellos
# que correspondan a un modelo
model_artifacts = []
for vertex in query_result.vertices:
    model_artifacts.append(vertex.to_lineage_object().source.source_uri)

# Los resultados de la llamada a la API 'lineageQuery' que se entregan como resultado
# el ARN del modelo desplegado en el endpoint asi como el URI de S3 del archivo
# model.tar.gz asociado al modelo
pp.pprint(model_artifacts)

Encontrar los componentes de prueba asociados con un Endpoint:

# Definir los filtros para buscar por 'TRIAL_COMPONENT' y que el tipo de fuente sea
# 'TRAINING_JOB'.

query_filter = LineageFilter(
    entities=[LineageEntityEnum.TRIAL_COMPONENT],
    sources=[LineageSourceEnum.TRAINING_JOB],
)
# Suministrar los filtros de linaje a la consulta y de esta forma buscará 
# todos los modelos asociados al nuestro 'endpoint_context'
query_result = LineageQuery(sagemaker_session).query(
    start_arns=[endpoint_context.context_arn],
    query_filter=query_filter,
    direction=LineageQueryDirectionEnum.ASCENDANTS,
    include_edges=False,
)
# Recorrer los resultados de la consuta para obtener los ARNs de los trabajos de entrenamiento
# asociados con este Endpoint

trial_components = []
for vertex in query_result.vertices:
    trial_components.append(vertex.arn)
pp.pprint(trial_components)

Dirección en las consultas

Observando el código en los bloques anteriores, se puede identificar un punto importante:

direction=LineageQueryDirectionEnum.ASCENDANTS,

Este atributo define que la consulta se hará en hacia los ascendientes. Generalmente un flujo para despliegue de un modelo de ML consta de los siguientes elementos: Datos → job de entrenamiento → Modelo → Endpoint. Tomando como base este ejemplo, el Endpoint es un Descendiente del modelo, el modelo un descendiente del job de entrenamiento y así sucesivamente. De igual forma el modelo es un ascendiente del Endpoint. Esto puede causar un poco de confusión por el sentido de las flechas, el cual, de forma intuitiva nos haría pensar que los elementos que están en la punta de la flecha son los ascendientes de los elementos que están en el inicio de la flecha. Es importante recordar que los elementos en la punta de la flecha son los descendientes de los elementos que están en el inicio de la flecha. De esta forma, para este ejemplo, si se buscan todos los elementos ascendientes del artefacto dataset no se obtendría ningún resultado debido a que es el primer elemento en el linaje.

Se puede cambiar el sentido de la búsqueda para que sea hacia los descendientes (DESCENDANTS) y de esta forma la búsqueda recorra de forma ascendiente el linaje del flujo de trabajo de ML a partir del componente que se especifique.

En este bloque de código se muestra cómo utilizar la dirección en las búsquedas de linaje:

# En este ejemplo, verá el resultado de utilizar el atributo 
# LineageQueryDirectionEnum.ASCENDANTS o LineageQueryDirectionEnum.DESCENDANTS

query_filter = LineageFilter(
    entities=[LineageEntityEnum.ARTIFACT, LineageEntityEnum.CONTEXT],
    sources=[
        LineageSourceEnum.ENDPOINT,
        LineageSourceEnum.MODEL,
        LineageSourceEnum.DATASET,
        LineageSourceEnum.TRAINING_JOB,
        LineageSourceEnum.PROCESSING_JOB,
    ],
)

query_result = LineageQuery(sagemaker_session).query(
    start_arns=[model_artifact.artifact_arn],
    query_filter=query_filter,
    # Orden ascendente
    direction=LineageQueryDirectionEnum.ASCENDANTS,
    include_edges=False,
)

ascendant_artifacts = []

# Cuando se tiene como resultado un objeto tipo TrialComponent, no se puede convertir
# en objeto de linaje por lo que se tiene que utilizar el metodo 'to_lineage_object()'
# para poder extraer el atributo TrialComponent ARN. Este es el caso del 
# Training Job
for vertex in query_result.vertices:
    try:
        ascendant_artifacts.append(vertex.to_lineage_object().source.source_uri)
    except:
        ascendant_artifacts.append(vertex.arn)

print("Ascendant artifacts:")
pp.pprint(ascendant_artifacts)

query_result = LineageQuery(sagemaker_session).query(
    start_arns=[model_artifact.artifact_arn],
    query_filter=query_filter,
    # descendente
    direction=LineageQueryDirectionEnum.DESCENDANTS,
    include_edges=False,
)

descendant_artifacts = []
for vertex in query_result.vertices:
    try:
        descendant_artifacts.append(vertex.to_lineage_object().source.source_uri)
    except:
        # Handling TrialComponents.
        descendant_artifacts.append(vertex.arn)

print("Descendant artifacts:")
pp.pprint(descendant_artifacts)

Inicio en las consultas

También es posible cambiar el objeto de inicio de la búsqueda. No necesariamente tiene que ser desde los extremos del linaje. Para cambiar el inicio de la consulta basta con elegir uno de los elementos del linaje y utilizar el ARN como parámetro para nuestra consulta:

Se obtienen los datos del artefacto que representa el modelo entrenado:

# Obtener el artefacto que representa el modelo entrenado

model_artifact_summary = list(Artifact.list(source_uri=model_package_arn))[0]
model_artifact = ModelArtifact.load(artifact_arn=model_artifact_summary.artifact_arn)

Se ejecuta la consulta al linaje utilizando como punto de partida el artefacto que representa nuestro modelo entrenado:

query_filter = LineageFilter(
    entities=[LineageEntityEnum.ARTIFACT, LineageEntityEnum.CONTEXT],
    sources=[LineageSourceEnum.ENDPOINT, LineageSourceEnum.DATASET],
)

query_result = LineageQuery(sagemaker_session).query(
    start_arns=[model_artifact.artifact_arn],  # Nuestro modelo marca el inicio de la
    # consulta
    query_filter=query_filter,
    # Encuentra todas las entidades en orden descendiente a partir del modelo.
    direction=LineageQueryDirectionEnum.DESCENDANTS,
    include_edges=False,
)

associations = []
for vertex in query_result.vertices:
    associations.append(vertex.to_lineage_object().source.source_uri)

query_result = LineageQuery(sagemaker_session).query(
    start_arns=[model_artifact.artifact_arn],  # Nuestro modelo marca el inicio de la
    # consulta
    query_filter=query_filter,
    # Encuentra todas las entidades en orden ascendente a partir del modelo.
    direction=LineageQueryDirectionEnum.ASCENDANTS,
    include_edges=False,
)

for vertex in query_result.vertices:
    associations.append(vertex.to_lineage_object().source.source_uri)

pp.pprint(associations)

Dado que existen una variedad de objetos en el linaje, se especifica en el filtro los diferentes elementos que se quieren encontrar. En este caso se buscan objetos tipo CONTEXT y ARTIFACT con fuentes tipo ENDPOINT y DATASET.

También se puede hacer una consulta que dé como resultado los objetos ascendientes y descendientes a partir de un elemento del linaje. Mediante el atributo LineageQueryDirectionEnum.BOTH podemos hacer este tipo de consulta:

query_filter = LineageFilter(
    entities=[LineageEntityEnum.ARTIFACT, LineageEntityEnum.CONTEXT],
    sources=[LineageSourceEnum.ENDPOINT, LineageSourceEnum.DATASET],
)

query_result = LineageQuery(sagemaker_session).query(
    start_arns=[model_artifact.artifact_arn],  # Nuestro modelo marca el inicio de la
    # consulta
    query_filter=query_filter,
    # Encuentra todas las entidades en orden ascendente y descendente
    # a partir del modelo.
    direction=LineageQueryDirectionEnum.BOTH,
    include_edges=False,
)

associations = []
for vertex in query_result.vertices:
    associations.append(vertex.to_lineage_object().source.source_uri)

pp.pprint(associations)

Podemos construir una gráfica del linaje en python utilizando la libreria visualizer.py que se encuentra en el proyecto sigiuente: https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-lineage/sagemaker-lineage-multihop-queries.ipynb

Con el siguiente codigo construimos la gráfica de linaje en nuestro notebook de jupyter:

# Graph APIs
# Utilizamos la API `query_lineage` de boto3 para obtener la informacion que queremos graficar

from visualizer import Visualizer

query_response = sm_client.query_lineage(
    StartArns=[endpoint_context.context_arn], Direction="Ascendants", IncludeEdges=True
)

viz = Visualizer()
viz.render(query_response, "Endpoint")

Obteniendo el siguiente resultado:

En el SDK de Sagemaker existen funciones para ayudar a simplificar el código, implementando búsquedas comunes como podrían ser búsquedas de datasets, training jobs, ejecuciones de pipelines y endpoints. Las clases EndpointContext, ModelArtifact y DatasetArtifact cuentan con estas funciones de ayuda:

Ejemplos con la clase EndpointContext:

# Encontrar todos los datasets asociados con un endpoint

datasets = []
dataset_artifacts = endpoint_context.dataset_artifacts()
for dataset in dataset_artifacts:
    datasets.append(dataset.source.source_uri)
print("Datasets : ", datasets)

# Encontrar los trabajos de entrenamiento asociados con un endpoint
training_job_artifacts = endpoint_context.training_job_arns()
training_jobs = []
for training_job in training_job_artifacts:
    training_jobs.append(training_job)
print("Training Jobs : ", training_jobs)

# Obtenerel ARN del pipeline de ejecución asociado con un endpoint
pipeline_executions = endpoint_context.pipeline_execution_arn()
if pipeline_executions:
    for pipeline in pipelines_executions:
        print(pipeline)

Ejemplos con la clase ModelArtifact:

# Utilizamos la clase ModelArtifact para encontrar todos los datasets y endpoints
# asociados con un modelo en particular

dataset_artifacts = model_artifact.dataset_artifacts()
endpoint_contexts = model_artifact.endpoint_contexts()

datasets = [dataset.source.source_uri for dataset in dataset_artifacts]
endpoints = [endpoint.source.source_uri for endpoint in endpoint_contexts]

print("Datasets associated with this model : ")
pp.pprint(datasets)

print("Endpoints associated with this model : ")
pp.pprint(endpoints)

Ejemplos con la clase DatasetArtifact:

# La clase DatasetArtifact nos ayuda a encontrar todos los endpoints que alojan
# modelos que fueron entrenados con un dataset en particular
# Encontramos el artefacto que fue asociado con el dataset

dataset_artifact_arn = list(Artifact.list(source_uri=training_data))[0].artifact_arn
dataset_artifact = DatasetArtifact.load(artifact_arn=dataset_artifact_arn)

# Encontramos los endpoints que utilizaron los datos de entrenamiento asociados al
# artefacto
endpoint_contexts = dataset_artifact.endpoint_contexts()
endpoints = [endpoint.source.source_uri for endpoint in endpoint_contexts]

print("Endpoints associated with the training dataset {}".format(training_data))
pp.pprint(endpoints)

Pruebas con artefactos únicos

Al ejecutar las diferentes pruebas para generar nuestro modelo de ML que nos ayude a resolver un caso de uso, podemos observar que SageMaker genera el linaje de forma automatica y esto nos puede llevar a tener asociaciones de un modelo con multiples artefactos como datasets aun cuando estos no hayan sido utilizados para entrenar nuestro modelo. SageMaker considera a todos los datasets que se encuentre en el directorio establecido en los atributos inputs de los objetos processor y estimator como una sola versión. En caso de requerir diferenciar un modelo de otro por que se tiene una nueva versión de los datos podemos diferenciarla guardando este archivo en un directorio distinto que tenga un nombre que lo diferencie de los datos utilizados como version anterior. En el siguiente bloque de código buscamos que todos los datos que subimos sean únicos guardándolos en un directorio que siempre tendrá un nombre distinto mediante el uso de la variable id_unico:

directorio_datos = f'{directorio_proyecto}/datasets-{id_unico}'
directorio_codigo = f'{directorio_proyecto}/code-{id_unico}'
directorio_datos_procesados = f'{directorio_proyecto}/processed-{id_unico}'
directorio_contenedores = f'{directorio_proyecto}/images-{id_unico}'

data_prep_parameters = {
    'inputs':[ProcessingInput(input_name='input',
                    source=f's3://{bucket}/{directorio_datos}',
                    destination='/opt/ml/processing/input'),
              ProcessingInput(input_name='code',
                    source=ruta_codigo_preparacion_datos,
                    destination='/opt/ml/processing/input/code')],
                    ...
                   
configuracion_datos_entrenamiento = sagemaker.session.TrainingInput(
    s3_data=ruta_datos_entrenamiento_procesados,
    content_type='csv',
    s3_data_type='S3Prefix')

configuracion_datos_validacion = sagemaker.session.TrainingInput(
    s3_data=ruta_datos_validacion_procesados,
    content_type='csv',
    s3_data_type='S3Prefix'
)

Construcción de linaje de forma manual

Amazon SageMaker puede automatizar la creación de los objetos de linaje y sus asociaciones partiendo de un endpoint y llegando hasta los datos utilizados para entrenar el modelo. Sin embargo, para aquellos casos en que están involucrados pasos adicionales en el flujo de trabajo de ML, como por ejemplo un job de entrenamiento, Sagemaker no crearía los objetos de linaje asociados a estas partes del flujo de trabajo.

Para complementar el linaje se pueden crear componentes de forma manual y asociarlos a los elementos existentes o incluso construir de inicio todo el linaje.

A continuación se construirá el linaje de un flujo de trabajo de ML desde cero

Inicialmente, se creará un contexto que representará el flujo de trabajo:

nombre_contexto = f'flujo-de-trabajo-ML-{unique_id}'

contexto_flujo_ml = Context.create(
    context_name=nombre_contexto,
    context_type="Flujo de trabajo ML",
    source_uri=unique_id,
    # el atributo properties sirve para almacenar metadatos de entidades de 
    # linaje adicionalmente a las etiquetas (tags)
    properties={"example": "true"},
)

Validar que se haya creado el contexto con el siguiente código:

#listar los contextos para validar que se creo el solicitado
contextos = Context.list(sort_by="CreationTime", sort_order="Descending")

for ctx in contextos:
    if nombre_contexto in ctx.context_name :
        print(f'FOUND: {ctx.context_name}')

Crear acciones que serán parte del flujo de trabajo de ML:

accion_preparacion_datos = Action.create(
    action_name=f"preparacion-datos-{unique_id}",
    action_type="trabajo-procesamiento-datos",
    source_uri=unique_id,
    properties={"Example": "Metadata"},
)
accion_entrenamiento_modelo = Action.create(
    action_name=f"entrenamiento-modelo-{unique_id}",
    action_type="EntrenarModelo",
    source_uri=unique_id,
    properties={"Example": "Metadata"},
)

Crear los artefactos que representan los datos crudos y los datos procesados:

artefacto_dataset_entrada = Artifact.create(
    artifact_name=f"dataset-renta-bicis",
    artifact_type="dataset",
    source_types=[{"SourceIdType": "Custom", "Value": unique_id}],
    source_uri=dataset_s3_path,
)

artefacto_set_entrenamiento = Artifact.create(
    artifact_name="set-entrenamiento",
    artifact_type="dataset-procesado",
    source_types=[{"SourceIdType": "Custom", "Value": unique_id}],
    source_uri=f's3://{bucket}/{processed_data_prefix}/training/{training_file}',
)

artefacto_set_validacion = Artifact.create(
    artifact_name="set-validacion",
    artifact_type="dataset-procesado",
    source_types=[{"SourceIdType": "Custom", "Value": unique_id}],
    source_uri=f's3://{bucket}/{processed_data_prefix}/validation/{validation_file}',
)

artefacto_set_pruebas = Artifact.create(
    artifact_name="set-pruebas",
    artifact_type="dataset-procesado",
    source_types=[{"SourceIdType": "Custom", "Value": unique_id}],
    source_uri=f's3://{bucket}/{processed_data_prefix}/test/{test_output_file}',
)

Crear el artefacto que representa el modelo entrenado:

artefacto_modelo_entrenado = Artifact.create(
    artifact_name="modelo",
    artifact_type="model",
    source_types=[{"SourceIdType": "Custom", "Value": unique_id}],
    source_uri='s3://sagemaker-us-east-1-318769515028/multihop-example/xgb_model',
)

Hasta este punto se han creado los elementos que forman parte del flujo de trabajo de ML. El paso siguiente es crear las asociaciones entre los elementos de acuerdo al flujo que se tendrá entre los distintos componentes. Con estos elementos la idea sería construir el linaje representado gráficamente de la siguiente forma:

→ Datos entrenamiento ¬
Dataset → job de entrenamiento → Datos validación → Trabajo → Modelo → Endpoint → Flujo trabajo ML
→ Datos pruebas _I Entrenamiento

La primera asociación, será entre el dataset y el job de entrenamiento. Esta asociación será descendiente del dataset al job de entrenamiento:

# asociacion entre dataset y job de entrenamiento
asociacion_dataset_procesamiento = Association.create(
    source_arn=artefacto_dataset_entrada.artifact_arn,
    destination_arn=accion_preparacion_datos.action_arn,
    association_type="ContributedTo",
)

Aquí se puede notar que el tipo de asociación es “ContributedTo” lo que significa que el dataset está contribuyendo con datos al job de entrenamiento.

La siguiente asociación será entre el job de entrenamiento y los datos de entrenamiento, validación y pruebas. En este caso el tipo de asociación será “Produced” dado que el job de entrenamiento esta “produciendo” los datasets con los datos de entrenamiento, validación y pruebas:

asociacion_procesamiento_datos_entrenamiento = Association.create(
    source_arn=accion_preparacion_datos.action_arn,
    destination_arn=artefacto_set_entrenamiento.artifact_arn,
    association_type="Produced",
)

asociacion_procesamiento_datos_validacion = Association.create(
    source_arn=accion_preparacion_datos.action_arn,
    destination_arn=artefacto_set_validacion.artifact_arn,
    association_type="Produced",
)

asociacion_procesamiento_datos_pruebas = Association.create(
    source_arn=accion_preparacion_datos.action_arn,
    destination_arn=artefacto_set_pruebas.artifact_arn,
    association_type="Produced",
)

Las siguientes asociaciones serán entre los datos de entrenamiento, validación y pruebas y el job de entrenamiento. Similar a la asociación inicial, el tipo de asociación en este caso será “ContributedTo”:

asociacion_entrenamiento_datos_entrenamiento = Association.create(
    source_arn=artefacto_set_entrenamiento.artifact_arn,
    destination_arn=accion_entrenamiento_modelo.action_arn,
    association_type="ContributedTo",
)

asociacion_entrenamiento_datos_validacion = Association.create(
    source_arn=artefacto_set_validacion.artifact_arn,
    destination_arn=accion_entrenamiento_modelo.action_arn,
    association_type="ContributedTo",
)

asociacion_entrenamiento_datos_pruebas = Association.create(
    source_arn=artefacto_set_pruebas.artifact_arn,
    destination_arn=accion_entrenamiento_modelo.action_arn,
    association_type="ContributedTo",
)

Una vez ejecutado el job de entrenamiento se producirá un modelo de ML. Este modelo lse vincular a un artefacto y posteriormente se creará s una asociación del tipo “Produced” entre el job de entrenamiento y el modelo creado:

asociacion_entrenamiento_modelo = Association.create(
    source_arn=accion_entrenamiento_modelo.action_arn,
    destination_arn=artefacto_modelo_entrenado.artifact_arn,
    association_type="Produced",
)

Posteriormente se desplegará el modelo en un Endpoint. Para reflejar esta relación se creará una asociación del tipo “ContributedTo” entre el modelo y el endpoint:

asociacion_modelo_endpoint = Association.create(
    source_arn=artefacto_modelo_entrenado.artifact_arn,
    destination_arn=endpoint_context.context_arn,
    association_type="ContributedTo",
)

Finalmente se creará una asociación entre el contexto del Endpoint con el contexto que se creó en un inicio llamado Flujo de trabajo ML:

asociacion_endpoint_flujo = Association.create(
    source_arn=endpoint_context.context_arn,
    destination_arn=contexto_flujo_ml.context_arn,
    association_type="ContributedTo",
)

Con esto, se finaliza la creación del linaje para el flujo de trabajo de ML completo y obtenemos la siguiente gráfica de linaje:

Adicionalmente, cada una de las clases de los elementos de linaje contiene el método list() que muestra todos los objetos de la clase creados en la sesión de SageMaker actual:

# mostrar lista de artefactos. puede arrojar muchos resultados
for artfct in Artifact.list():
    print(artfct)

# mostrar lista de contextos:
# definir la forma del orden de los resultados
contexts = Context.list(sort_by="CreationTime", sort_order="Descending")

for ctx in contexts:
    print(f'name: {ctx.context_name}, arn: {ctx.context_arn}')

# muestra la lista de asociaciones ligadas a un destination_arn:
asociaciones = Association.list(destination_arn=endpoint_context.context_arn)

for asc in asociaciones:
    print(f'FOUND: {asc.source_name} to {asc.destination_name}')
    
# muestra la lista de asociaciones ligadas a un source_arn:
asociaciones = Association.list(source_arn=endpoint_context.context_arn)

for asc in asociaciones:
    print(f'FOUND: {asc.source_name} to {asc.destination_name}')

Otras funciones que resultan útiles son las necesarias para eliminar elementos que se hayan creado con errores y que después se quieran eliminar. Los elementos que más pueden causar problemas son las asociaciones ya que no puede haber dos asociaciones con los mismos valores para los atributos source_arn y destination_arn. Otra situación que se puede dar es que se haya creado algún objeto como un artefacto o asociación y no se haya almacenado el objeto en una variable. Para estas situaciones, se pueden utilizar las siguientes funciones:

# borra una asociacion a partir del sourcearn y source_name
def delete_association_by_src(sourcearn, source_name, sm_session):
    asociaciones = Association.list(source_arn=sourcearn)

    for asc in asociaciones:
        if source_name in asc.source_name:
            print(f'FOUND: {asc.source_name} to {asc.destination_name}')
            assct = Association(
                source_arn=asc.source_arn,
                destination_arn=asc.destination_arn,
                sagemaker_session=sm_session,
            )
            assct.delete()
# borra un artefacto a partir del source_URI
def delete_artifact(source_URI):
    for artfct in Artifact.list():
        if source_URI in artfct.source.source_uri:
            print(f'FOUND: {artfct.source.source_uri}')
            to_delete = artfct
            to_delete.delete()

Conclusión

En esta publicación mostramos los elementos que componen el linaje en un flujo de trabajo de ML en SageMaker y la forma en la que podemos establecer relaciones entre estos elementos para poder rastrear todos los elementos que llevaron al despliegue de un modelo de ML. Esto nos facilita tener una gobernanza de nuestros modelos de ML al permitirnos rastrear desde los datos asociados al modelo como los trabajos de procesamiento y entrenamiento que dieron como resultado el modelo.

 


Acerca del autor

Alejandro Martinez es Partner TAM en AWS