Blog de Amazon Web Services (AWS)

Implementando el algoritmo LightGBM en Amazon SageMaker

Por María Gaska, AI/ML Specialist SA AWS Argentina

 

En Amazon SageMaker existen tres modalidades de entrenamiento: algoritmos totalmente manejados, frameworks o algoritmos que soportan script mode y la posibilidad de traer un container propio.

Este último será el caso cuando queramos implementar un modelo de LightGBM utilizando el servicio de entrenamiento de SageMaker.

En este artículo vamos a trabajar con el conocido conjunto de datos de Titanic, donde el objetivo es predecir las probabilidades de un pasajero de sobrevivir o no dadas sus características. Para resolver este problema, crearemos un pipeline de inferencia que permita completar datos faltantes, hacer el one hot encoding de las variables categóricas y por último entrenar un modelo predictivo utilizando LightGBM. El código completo de este ejemplo se puede encontrar aquí.

 

Paso 1: Seleccionar el kernel e instalar las librerías

Este proyecto se puede desarrollar utilizando una instancia de notebook de Amazon SageMaker de tipo ml.m5.xlarge. Seleccionando el kernel CondaPython3 se encontrarán instaladas las librerías más comunes de machine learning. Se pueden consultar las versiones de los paquetes utilizando el siguiente código.

!pip freeze

 

Otro punto importante es asegurarse de estar usando la última versión del SDK de SageMaker (2.x.x.) y para eso es necesario hacer una actualización de la misma.

! pip install --upgrade sagemaker

 

Utilizando el SDK se SageMaker se pueden configurar los parámetros inciales.

sess = sage.Session()

role = get_execution_role()

prefix = 'lgb-model'

 

Paso 2: Escribir el Dockerfile

Para poder aprovechar el servicio de entrenamiento de SageMaker necesitamos construir un container con los métodos necesarios para entrenar un modelo cuando se invoca al container con el parámetro train y para crear un micro servicio cuando se invoca el container con el parámetro serve que implemente los métodos Ping e Invocations de tipo Get y Post respectivamente.  Estos métodos se pueden crear en containers separados, pero en este caso vamos a desarrollar toda la funcionalidad en uno solo.

# Build an image that can do training and inference in SageMaker

# This is a Python 2 image that uses the nginx, gunicorn, flask stack

# for serving inferences in a stable way.




FROM ubuntu:18.04




MAINTAINER Amazon AI <sage-learner@amazon.com>




RUN apt-get -y update && apt-get install -y --no-install-recommends \
         wget \

         python \

         python3.6 \

         nginx \

         ca-certificates \

         libgcc-5-dev \

         build-essential \

         python3-dev \

    && rm -rf /var/lib/apt/lists/*







# Symlink /usr/bin/python to the python version we're building for.

RUN rm /usr/bin/python && ln -s /usr/bin/python3.6 /usr/bin/python




# Here we get all python packages.

# There's substantial overlap between scipy and numpy that we eliminate by

# linking them together. Likewise, pip leaves the install caches populated which uses

# a significant amount of space. These optimizations save a fair amount of space in the

# image, which reduces start up time.

RUN wget https://bootstrap.pypa.io/3.3/get-pip.py && python3.6 get-pip.py

RUN pip install --upgrade pip && \

 pip3 install lightgbm==3.1.0 pandas==1.0.5 scikit-learn==0.23.1 flask  gunicorn && \

 pip3 install gevent --pre && \

 rm -rf /root/.cache




# Set some environment variables. PYTHONUNBUFFERED keeps Python from buffering our standard

# output stream, which means that logs can be delivered to the user quickly. PYTHONDONTWRITEBYTECODE

# keeps Python from writing the .pyc files which are unnecessary in this case. We also update

# PATH so that the train and serve programs are found when the container is invoked.




ENV PYTHONUNBUFFERED=TRUE

ENV PYTHONDONTWRITEBYTECODE=TRUE

ENV PATH="/opt/program:${PATH}"




# Set up the program in the image

COPY lgb /opt/program

WORKDIR /opt/program

 

Paso 3: Crear el archivo con la funcionalidad de train

Noten que la carpeta lgb será la carpeta de trabajo del container. Aquí se debe colocar la funcionalidad necesaria para ejecutar el entrenamiento y la predicción.  Un archivo de Python llamado únicamente “train” será el punto de entrada cuando SageMaker invoque la ejecución del container con el parámetro train. En este script se deben leer el o los archivos de entrenamiento (en caso de que el volumen de datos sea grande es conveniente tener los datos divididos),  se ejecuta el modelo, se imprime un score de cross validation y se guarda el modelo serializado (en este caso con la librería pickle) en la ubicación que SageMaker entrega como parámetro.

Amazon SageMaker internamente va a tomar ese modelo serializado, comprimirlo en formato tar.gz. y colocarlo en una ubicación de S3 asociada al training job que lo produjo.  Todos los logs que se impriman desde este script quedarán almacenados en CloudWatch y asociados al training job. Los mismos se pueden consultar desde la consola de Amazon SageMaker.

prefix = '/opt/ml/'




input_path = prefix + 'input/data'

output_path = os.path.join(prefix, 'output')

model_path = os.path.join(prefix, 'model')

param_path = os.path.join(prefix, 'input/config/hyperparameters.json')




# This algorithm has a single channel of input data called 'training'. Since we run in

# File mode, the input files are copied to the directory specified here.




channel_name='training'

training_path = os.path.join(input_path, channel_name)





# The function to execute the training.

def train():

    print('Starting the training.')

    try:

        # Read in any hyperparameters that the user passed with the training job

#         with open(param_path, 'r') as tc:

#             trainingParams = json.load(tc)




        # Take the set of files and read them all into a single pandas dataframe

        input_files = [ os.path.join(training_path, file) for file in os.listdir(training_path) if 'csv' in file  ]

       

        print(input_files)

        if len(input_files) == 0:

            raise ValueError(('There are no files in {}.\n' +

                              'This usually indicates that the channel ({}) was incorrectly specified,\n' +

                              'the data specification in S3 was incorrectly specified or the role specified\n' +

                              'does not have permission to access the data.').format(training_path, channel_name))

           

        raw_data = [ pd.read_csv(file, error_bad_lines=False) for file in input_files ]

       

        df_train = pd.concat(raw_data)

       

        X = df_train.iloc[:,1:]

        y = df_train.iloc[:,0]





        print("csv parsed")



       

        # Define model

        numeric_features = X.select_dtypes(include=np.number).columns.tolist()

        numeric_transformer = Pipeline(steps=[

            ('imputer', SimpleImputer(strategy='median'))])




        categorical_features = [x for x in X.columns if x not in numeric_features]

        categorical_transformer = Pipeline(steps=[

            ('imputer', SimpleImputer(strategy='most_frequent', fill_value='missing')),

            ('onehot', OneHotEncoder(handle_unknown='ignore'))])




        preprocessor = ColumnTransformer(

            transformers=[

                ('num', numeric_transformer, numeric_features),

                ('cat', categorical_transformer, categorical_features)])




        clf = Pipeline(steps=[('preprocessor', preprocessor),

                              ('classifier', LGBMClassifier(n_jobs=-1))])

       

        print("model defined")




        oof_pred = cross_val_predict(clf,

                             X,

                             y,

                             cv=5,

                             method="predict_proba")

                            

        print("Cross validation AUC {:.4f}".format(roc_auc_score(y, oof_pred[:,1])))

        clf.fit(X,y)

       

        # save the model

        filename = os.path.join(model_path, 'lgb_model.pkl')

        pickle.dump(clf, open(filename, 'wb'))

        print('Training complete.')

       

    except Exception as e:

        # Write out an error file. This will be returned as the failureReason in the

        # DescribeTrainingJob result.

        trc = traceback.format_exc()

        with open(os.path.join(output_path, 'failure'), 'w') as s:

            s.write('Exception during training: ' + str(e) + '\n' + trc)

        # Printing this causes the exception to be in the training job logs, as well.

        print('Exception during training: ' + str(e) + '\n' + trc, file=sys.stderr)

        # A non-zero exit code causes the training job to be marked as Failed.

        sys.exit(255)




if __name__ == '__main__':

    train()




    # A zero-exit code causes the job to be marked a Succeeded.

    sys.exit(0)   


Paso 4: Crear la funcionalidad de predict

A continuación, en el archivo predictor.py se especifica la funcionalidad necesaria para generar los endpoints ‘ping’ e ‘invocations’.

 

prefix = '/opt/ml/'

model_path = os.path.join(prefix, 'model')




# A singleton for holding the model. This simply loads the model and holds it.

# It has a predict function that does a prediction based on the model and the input data.




class ScoringService(object):

    model = None                # Where we keep the model when it's loaded




    @classmethod

    def get_model(cls):

        """Get the model object for this instance, loading it if it's not already loaded."""

        if cls.model == None:

            file_name = 'lgb_model.pkl'

            file_path = os.path.join(model_path,file_name)

            cls.model = pickle.load( open( file_path, "rb" ) )

        return cls.model




    @classmethod

    def predict(cls, input):

        """For the input, do the predictions and return them.




        Args:

            input (a pandas dataframe): The data on which to do the predictions. There will be

                one prediction per row in the dataframe"""

        clf = cls.get_model()

        return clf.predict(input)




# The flask app for serving predictions

app = flask.Flask(__name__)




@app.route('/ping', methods=['GET'])

def ping():

    """Determine if the container is working and healthy. In this sample container, we declare

    it healthy if we can load the model successfully."""

    health = ScoringService.get_model() is not None  # You can insert a health check here




    status = 200 if health else 404

    return flask.Response(response='\n', status=status, mimetype='application/json')




@app.route('/invocations', methods=['POST'])

def transformation():

    """Do an inference on a single batch of data. In this sample server, we take data as CSV, convert

    it to a pandas data frame for internal use and then convert the predictions back to CSV (which really

    just means one prediction per line, since there's a single column.

    """

    data = None




    # Convert from CSV to pandas

    if flask.request.content_type == 'text/csv':

        data = flask.request.data.decode('utf-8')

        s = StringIO(data)

        data = pd.read_csv(s)

        print (data.columns)

    else:

        return flask.Response(response='This predictor only supports CSV data', status=415, mimetype='text/plain')




    print('Invoked with {} records'.format(data.shape[0]))




    # Drop first column, since sample notebook uses training data to show case predictions

    # data.drop(data.columns[[0]],axis=1,inplace=True)




    # Do the prediction

    predictions = ScoringService.predict(data)




    # Convert from numpy back to CSV

    out = StringIO()

    pd.DataFrame({'results':predictions}).to_csv(out, header=False, index=False)

    result = out.getvalue()




    return flask.Response(response=result, status=200, mimetype='text/csv')

Paso 5: Construir la imagen localmente y subirla al Elastic Container Registry

Una vez completado el Dockerfile, vamos a compilarlo localmente y subirlo al ECR asociado a nuestra cuenta:

%%sh




# The name of our algorithm

algorithm_name=lgb-model




cd container




chmod +x lgb/train

chmod +x lgb/serve




account=$(aws sts get-caller-identity --query Account --output text)




# Get the region defined in the current configuration (default to us-west-2 if none defined)

region=$(aws configure get region)

region=${region:-us-west-2}




fullname="${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:latest"




# If the repository doesn't exist in ECR, create it.

aws ecr describe-repositories --repository-names "${algorithm_name}" > /dev/null 2>&1




if [ $? -ne 0 ]

then

    aws ecr create-repository --repository-name "${algorithm_name}" > /dev/null

fi




# Get the login command from ECR and execute it directly

$(aws ecr get-login --region ${region} --no-include-email)




# Build the docker image locally with the image name and then push it to ECR

# with the full name.




docker build  -t ${algorithm_name} .

docker tag ${algorithm_name} ${fullname}




docker push ${fullname}

 

Paso 6: Entrenamiento y deploy utilizando el SDK de Amazon SageMaker

A continuación, utilizaremos el SDK de Amazon SageMaker para entrenar el modelo y desplegarlo en un endpoint capaz de entregar predicciones en tiempo real.

df = pd.read_csv('titanic.csv',sep='|')

df = df.drop(['PassengerId','Cabin','Ticket','Name'],axis=1)




df_train, df_test = train_test_split(df, test_size=0.2)




df_train.to_csv('data/train.csv',index=False)

df_test.to_csv('data/test.csv',index=False)




sess.upload_data('data/train.csv', key_prefix=prefix + '/training')




data_location = f's3://{sess.default_bucket()}/{prefix}/training'




s3_input = {'training': data_location}




account = sess.boto_session.client('sts').get_caller_identity()['Account']

region = sess.boto_session.region_name

image = '{}.dkr.ecr.{}.amazonaws.com/lgb-model:latest'.format(account, region)




lgb = sage.estimator.Estimator(image,

                       role, 1, 'ml.c4.2xlarge',

                       output_path="s3://{}/output".format(sess.default_bucket()),

                       sagemaker_session=sess)




lgb.fit(s3_input)




from sagemaker.predictor import csv_serializer

predictor = lgb.deploy(1, 'ml.m4.xlarge', serializer=csv_serializer)

 

Paso 7: Evaluación del modelo

Una vez que finaliza la creación del endpoint podemos utilizar el mismo para realizar inferencias. Para esto vamos a correr el proceso de inferencia sobre los datos del holdout set que creamos al principio.

Los resultados deberían ser similares al promedio obtenido en la evaluación de cross validation.

test_data = pd.read_csv("data/test.csv")

test_data.iloc[:,1:].to_csv('data/x_test.csv',index=False)

client = boto3.client('sagemaker-runtime')

endpoint_name = predictor.endpoint_name                              

content_type = "text/csv"                                       

response = client.invoke_endpoint(

    EndpointName=endpoint_name,

    ContentType=content_type,

    Body=open('data/x_test.csv', 'rb')

    )

prob_scores = [eval(pred)[1] for pred in list(preds)]

roc_auc_score(test_data.iloc[:,0],prob_scores)

 

Paso 8: Clean up

Es importante eliminar el endpoint cuando ya no se va a utilizarlo ya que la infraestructura que lo soporta tiene un costo por el tiempo que se encuentra disponible.

predictor.delete_endpoint()

 

Conclusión

Amazon SageMaker permite combinar la funcionalidad de un servicio de entrenamiento y de inferencia manejados mientras que otorga la flexibilidad suficiente para utilizar cualquier lenguaje de programación, framework o librería en tanto se respeten las especificaciones necesarias a la hora de desarrollar un container propio.

 

 


Sobre el autor

María es arquitecta de soluciones en AWS desde hace casi dos años. En su rol, ayuda a los clientes tanto a determinar la mejor arquitectura para sus distintas aplicaciones como a encontrar los mejores algoritmos para resolver problemas de Machine Learning e IA. Antes de AWS, trabajó como desarrolladora de modelos de deep learning en un startup enfocado en NLP y chatbots y también como profesora full time en una coding school a cargo de un curso de data science.

 

 

Sobre los revisores

Andres es arquitecto de soluciones especialista en Analytics en AWS. En su rol apoya a los clientes a encontrar la mejor solucion y arquitectura para sus necesidades al igual que aprovechar los servicios de AI/ML para generar innovación y mejorar la productividad. Antes de AWS, trabajó para consultoras Big Four en las áreas de Data y Analytics, tanto como en consultoría estratégica, arquitectura e implementación de soluciones de procesamiento y consumo distribuidas.

 

 

 

Sergio es arquitecto de soluciones especialista en AI/ML en AWS. En su rol apoya a los clientes a encontrar la mejor solución y arquitectura para sus necesidades al igual que aprovechar los servicios de AI/ML para generar innovación y mejorar la productividad.