Blog de Amazon Web Services (AWS)

Usando Python shell y Pandas en AWS Glue para procesar conjuntos de datos pequeños y medianos

Angelo Carvalho es Arquitecto de Big Data Solutions Architect para Amazon Web Services

AWS Glue es un servicio de ETL totalmente administrado. Entre muchos recursos, este ofrece un ambiente de ejecución sin servidor para ejecutar sus trabajos de ETL. Muchos clientes de AWS están usando el ambiente Spark de AWS Glue para ejecutar tales tareas, pero otra opción es la utilización de empleos Python Shell. Este último tipo de empleo puede ser una opción más económica para el procesamiento de conjuntos de datos pequeños o medianos. Usted puede ejecutar tareas de shell de Python usando 1 DPU (unidad de procesamiento de datos) o 0,0625 DPU (1/16 de una DPU). Una única DPU provee una capacidad de procesamiento compuesta por 4 vCPUs de computación y 16 GB de memoria.

Los trabajos Python shell son compatibles con las versiones 2 y 3 de Python y el ambiente de ejecución ya viene preconfigurado con las bibliotecas más populares usadas por científicos de datos, como NumPy, SciPy, pandas entre otras. En Glue, adicionalmente las bibliotecas preinstaladas, usted también puede instalar otras bibliotecas adicionales.

 

Pyspark vs pandas

Clientes usando Spark Jobs se benefician de una poderosa API para procesamiento de DataFrames. DataFrames son conjuntos de datos organizados en columnas. Son conceptualmente equivalentes a una tabla en un banco de datos relacional y ofrecen operaciones típicas para ETL, como joins, agregaciones y filtros. En cuanto Spark es una estructura distribuida que escala horizontalmente y ofrece poder para procesar millones o billones de registros rápidamente, existen opciones menos escalables, pero igualmente versátiles para la ejecución de este tipo de empleo. Pandas (Python Data Analysis Library) es una de las bibliotecas más utilizadas por la comunidad de administradores y científicos de datos, y es perfecto para el procesamiento de conjuntos de datos pequeños y medianos. En este artículo, escribiremos un script para ser ejecutado en el ambiente de ejecución de Glue, usando pandas para procesar un conjunto de datos con un poco más de un millón de líneas (25MB de datos) en aproximadamente 30 segundos. El conjunto de datos escogido para el ejemplo fue el popular MovieLens.

En nuestro ejemplo, procesaremos 1 millón de valores, agrupados por película, cruzando datos entre dos tablas (películas y valores), y finalmente identificando la nota media de cada película. Para ser justos, consideraremos solamente películas con 1000 o más votos. El objetivo es identificar las 5 películas mejor votadas y crear una nueva tabla con estas informaciones.

 

Creando un Bucket en S3

Antes de que comencemos a trabajar en nuestros scripts, vamos primero a crear un bucket en S3, que usaremos para almacenar nuestros scripts, bibliotecas y también el resultado de la ejecución de un empleo. Para eso, será necesaria la utilización de AWS CLI. Se usted todavía no lo tiene instalado, siga las instrucciones aquí. Se usted ya tiene AWS CLI instalado, cerciórese de que está usando la versión más actualizada. Este script fue probado con la versión 1.16.302.

Una vez que usted tenga AWS CLI instalado y funcionando, baje el comando para crear un bucket en el S3. Acuérdese de colocar un nombre exclusivo para su bucket, sustituyendo el nombre ‘<<nombre_de_su_bucket>>’ por un nombre de bucket válido.

aws s3 mb s3://<<nombre_de_su_bucket>>

Creando su primer python shell job

Además de pandas, utilizaremos en este ejemplo dos bibliotecas adicionales: el s3fs para permitir al pandas acceder el S3, y el pyarrow para permitir al pandas generar archivos Parquet. El formato Parquet es uno de los más indicados para data lakes, ya que es por columnas y ofrece compresión, entregando buen desempeño para búsquedas analíticas y disminuyendo los costos con almacenamiento de datos.

El primer paso entonces es generar un paquete Python Wheels contenido en las dos bibliotecas de arriba. Abra la terminal y cree una carpeta llamada ‘glue_python_shell_sample’. Dentro de esta carpeta, cree un archivo llamado ‘setup.py’ con el siguiente contenido:

from setuptools import setup

setup(
    name="glue_python_shell_sample_module",
    version="0.1",
    install_requires=[
        "pyarrow~=0.15.1",
        "s3fs~=0.4.0"
    ]
) 

Vea que las dos bibliotecas mencionadas anteriormente (s3fs y pyarrow) son declaradas dependientes en el tramo del código de arriba. Eso será debidamente sumado al archivo whells (.whl) que será generado en el próximo paso.

Todavía en la terminal, entre el folder ‘glue_python_shell_sample’ y corra el siguiente comando:

cd glue_python_shell_sample
python3 setup.py bdist_wheel

Este comando generara un folder ‘dist’ y un archivo ‘glue_python_shell_sample_module-0.1-py3-none-any.whl’ dentro de la misma. Si prefiere, haga la descarga del archivo setup.py aquí.

Dentro del folder ‘dist’, vamos ahora a crear nuestro script ETL. Creen un archivo llamado etl_with_pandas.py, , conteniendo las líneas de código abajo. Acuérdese de modificar en el script abajo el valor de la variable que contiene el nombre del bucket, para el nombre de bucket escogido por usted en los pasos anteriores:

import urllib.request
from zipfile import ZipFile
import pandas as pd

#replace “just_another_bucket_name” by any valid bucket name…
bucket = "just_another_bucket_name"

# download MovieLens 1M Dataset
print("downloading file from movielens website...")
urllib.request.urlretrieve(
        'http://files.grouplens.org/datasets/movielens/ml-1m.zip',
        '/tmp/ml-1m.zip')

# extract the zip file
print("extracting dataset into tmp folder...")
with ZipFile('/tmp/ml-1m.zip', 'r') as zipObj:
   zipObj.extractall('/tmp/')

# read the csv
print("reading csv files...")
movies_df = pd.read_csv("/tmp/ml-1m/movies.dat", "::", 
                        engine='python', 
                        header=None, 
                        names=['movieid', 'title', 'genres']) 
print("movies_df has %s lines" % movies_df.shape[0])
ratings_df = pd.read_csv("/tmp/ml-1m/ratings.dat", "::", 
                         engine='python', 
                         header=None, 
                         names=['userid', 'movieid', 'rating', 'timestamp']) 
print("ratings_df has %s lines" % ratings_df.shape[0])

# join both dataframes
print("merging dataframes...")
merged_df = pd.merge(movies_df, ratings_df, on='movieid')

# aggregate data from dataframes, counting votes...
print("aggregating data...")
aggregation_df = merged_df.groupby('title').agg({'rating': ['count', 'mean']})
aggregation_df.columns = aggregation_df.columns.droplevel(level=0)
aggregation_df = aggregation_df.rename(columns={
    "count": "rating_count", "mean": "rating_mean"
})

# sorting data and filtering only movies with more than 1000 votes...
print("sorting data...")
aggregation_df = aggregation_df.sort_values(
        'rating_mean', 
        ascending=False).loc[aggregation_df['rating_count'] > 1000].head()

# writing data...
print("writing file to s3...")
aggregation_df.to_parquet(
        "s3://" + 
        bucket + 
        "/data/processed/best_movies/best_movies.parquet.snappy")

# reading data...
print("reading file from s3 and printing result...")
result_df = pd.read_parquet(
        "s3://" + 
        bucket + 
        "/data/processed/best_movies/best_movies.parquet.snappy")
print("result_df has %s lines" % result_df.size)

print("Best rated movie is: ")
print(result_df[0:1])

Si prefiere, simplemente haga la descarga del archivo etl_with_pandas.py aquí. Se tiene tiempo, explore el repositorio en github. Ahí usted encontrara archivos adicionales, como un cuaderno jupyter conteniendo el script.

Ya tenemos todo lo que necesitamos para iniciar el despliegue, entonces ahora vamos a copiar nuestros scripts para el bucket que creamos algunos pasos atrás. Acuérdese de substituir el nombre correcto de su bucket en los comandos de abajo, antes de ejecutarlos en su terminal, sustituyendo ‘<<nombre_de_su_bucket>>’ por el nombre real de su bucket.

aws s3 cp glue_python_shell_sample_module-0.1-py3-none-any.whl \
s3://<<nombre_de_su_bucket>>/lib/
aws s3 cp etl_with_pandas.py s3://<<nombre_de_su_bucket>>/scripts/

Listo. ya podemos crear nuestro trabajo usando los recursos copiados para el S3. Pero antes, usted va a necesitar de un rol IAM para AWS Glue. Si usted ya utiliza Glue con frecuencia, posiblemente ya tiene un rol y puede reaprovecharlo, solo certificándose de que ella tiene acceso para escribir y leer en el bucket que usted creo en el paso de arriba. Si usted todavía no ha creado un Rol IAM, o no sabe cómo proceder para agregar los permisos, siga las instrucciones de este link.

Ejecute el comando abajo en su terminal para crear su primero trabajo. Acuérdese de substituir el nombre del rol (<<nombre_de_su_rol_>>) por el nombre que usted utilizó en el paso de arriba y también substituir el nombre del bucket (<<nombre_de_su_bucket>>) para el bucket creado anteriormente:

aws glue create-job --name etl_with_pandas \
    --role <<nome_do_seu_iam_role>> \
    --command '{"Name" :  "pythonshell", "PythonVersion" : "3", "ScriptLocation" : "s3://<<nome_do_seu_bucket>>/scripts/etl_with_pandas.py"}' \
    --default-arguments '{"--extra-py-files" : "s3://<<nome_do_seu_bucket>>/lib/glue_python_shell_sample_module-0.1-py3-none-any.whl"}'

Se todo corrió bien, usted tiene ahora un job phython shell creado en AWS Glue. Localice el mismo en la consola (AWS Glue / ETL / Jobs). Seleccione el trabajo denominado ‘etl_with_pandas’ y de click en Action / Run trabajo.

Aguarde hasta que termine la ejecución y verifique el contenido de su bucket, en el folder /data/processed/best_movies/. Usted deberá encontrar un archivo llamado ‘best_movies.parquet.snappy’, que contiene el resultado del ETL: la lista de las películas mejor votadas.

Usted también podrá consultar en la pestaña “History” los registros de ejecución, la capacidad asignada, que en este caso fue solo 1/16 de una DPU, o sea, 1GB de memoria RAM y 25% de una vCPU. Por ser un ambiente bien pero ligero, los ambientes Python shell pueden ser ejecutados con muchos menos recursos computacionales asignados.

Vea abajo un ejemplo de la pantalla después de la ejecución del trabajo:

Ahora ejecute el trabajo varias veces. Usted va a notar que, en promedio será ejecutado en aproximadamente 30 segundos. Nada mal para un trabajo que procesa más de 1 millón de registros.

 

Conclusión

AWS Glue es la forma más rápida de comenzar con ETL en AWS. Además de la escalabilidad de Spark para procesamiento de conjuntos de datos gigantes, los clientes pueden también explorar la simplicidad de Python shell, utilizando estructuras como pandas para el procesamiento de conjuntos de datos pequeños o medianos.