O blog da AWS

Usando Python shell e Pandas no AWS Glue para processar datasets pequenos e médios

Angelo Carvalho is a Big Data Solutions Architect for Amazon Web Services

 

O AWS Glue é um serviço de ETL totalmente gerenciado. Entre muitos recursos, ele oferece um ambiente de execução serverless para executar seus trabalhos de ETL. Muitos clientes da AWS estão usando o ambiente Spark do AWS Glue para executar tais tarefas, mas outra opção é a utilização de jobs Python Shell. Este ultimo tipo de job pode ser uma opção mais econômica para o processamento de datasets pequenos ou médios. Você pode executar tarefas de shell do Python usando 1 DPU (unidade de processamento de dados) ou 0,0625 DPU (1/16 de uma DPU). Uma única DPU fornece uma capacidade de processamento composta por 4 vCPUs de computação e 16 GB de memória.

Os jobs Python shell são compatíveis com as versões 2 e 3 do Python e o ambiente de execução já vem pré-configurado com as bibliotecas mais populares usadas por cientistas de dados, como NumPy, SciPy, pandas entre outras. No Glue, adicionalmente às bibliotecas pré-instaladas, você também pode instalar outras bibliotecas adicionais.

 

Pyspark vs pandas

Clientes usando Spark Jobs se beneficiam de uma poderosa API para processamento de DataFrames. DataFrames são data sets organizados em colunas. São conceitualmente equivalentes a uma tabela em um banco de dados relacional e oferecem operações típicas para ETL, como joins, agregações e filtros. Enquanto o Spark é um framework distribuído que escala horizontalmente e oferece poder para processar milhões ou bilhões de registros rapidamente, existem opções menos escaláveis, mas igualmente versáteis para a execução deste tipo de job. O pandas (Python Data Analysis Library) é uma das bibliotecas mais utilizadas pela comunidade de administradores e cientistas de dados, e é perfeito para o processamento de datasets pequenos e médios. Neste artigo, iremos escrever um script para ser executado no ambiente de execução do Glue, usando o pandas para processar um dataset com um pouco mais de um milhão de linhas (25MB de dados) em aproximadamente 30 segundos. O dataset escolhido para o exemplo foi o popular MovieLens.

No nosso exemplo, iremos processar 1 milhão de avaliações, agrupadas por filme, cruzando dados entre duas tabelas (filmes e avaliações), e finalmente identificando a nota média de cada filme. Para sermos justos, iremos considerar somente filmes com 1000 ou mais votos. O objetivo é identificar os 5 filmes mais bem votados e criar uma nova tabela com estas informações.

 

Criando um Bucket no Amazon S3

Antes de começarmos a trabalhar nos nossos scripts, vamos primeiro criar um bucket no Amazon S3, que usaremos para armazenar nosso script, bibliotecas e também o resultado da execução do job. Para isso, será necessária a utilização do AWS CLI. Se você ainda não o tem instalado, siga as instruções aqui. Se você já tem o AWS CLI instalado, certifique-se de que está usando a versão mais atualizada. Este script foi testado com a versão 1.16.302.

Uma vez que você tenha o AWS CLI instalado e funcionando, rode o comando abaixo para criar um bucket no Amazon S3. Lembre-se de colocar um nome exclusivo para o seu bucket, substituindo o nome ‘<<nome_do_seu_bucket>>’ por um nome de bucket válido.

aws s3 mb s3://<<nome_do_seu_bucket>>

 

Criando o seu primeiro python shell job

Além do pandas, iremos utilizar neste exemplo duas bibliotecas adicionais: o s3fs para permitir ao pandas acessar o Amazon S3, e o pyarrow para permitir ao pandas gerar arquivos Parquet. O formato Parquet é um dos mais indicados para data lakes, visto que é colunar e oferece compressão, entregando boa performance para queries analíticas e diminuindo os custos com armazenamento de dados.

O primeiro passo então é gerar um pacote Python Wheels contendo as duas bibliotecas acima. Abra o terminal e crie uma pasta chamada ‘glue_python_shell_sample’. Dentro desta pasta, crie um arquivo chamado ‘setup.py’ com o seguinte conteúdo:

from setuptools import setup

setup(
    name="glue_python_shell_sample_module",
    version="0.1",
    install_requires=[
        "pyarrow~=0.15.1",
        "s3fs~=0.4.0"
    ]
) 

Veja que as duas bibliotecas mencionadas anteriormente (s3fs e pyarrow) são declaradas como dependências no trecho de código acima. Isso será devidamente adicionado ao arquivo whells (.whl) que será gerado no próximo passo.

Ainda no terminal, entre na pasta ‘glue_python_shell_sample’ e rode o seguinte comando:

cd glue_python_shell_sample
python3 setup.py bdist_wheel

Este comando irá gerar uma pasta ‘dist’ e um arquivo ‘glue_python_shell_sample_module-0.1-py3-none-any.whl’ dentro da mesma. Se preferir, faça o download do arquivo setup.py aqui.

Dentro da pasta ‘dist’, vamos agora criar o nosso script ETL. Crie um arquivo chamado etl_with_pandas.py, , contendo as linhas de código abaixo. Lembre-se de alterar no script abaixo o valor da variável que contém o nome do bucket, para o nome de bucket escolhido por você nos passos anteriores:

import urllib.request
from zipfile import ZipFile
import pandas as pd

#replace “just_another_bucket_name” by any valid bucket name…
bucket = "just_another_bucket_name"

# download MovieLens 1M Dataset
print("downloading file from movielens website...")
urllib.request.urlretrieve(
        'http://files.grouplens.org/datasets/movielens/ml-1m.zip',
        '/tmp/ml-1m.zip')

# extract the zip file
print("extracting dataset into tmp folder...")
with ZipFile('/tmp/ml-1m.zip', 'r') as zipObj:
   zipObj.extractall('/tmp/')

# read the csv
print("reading csv files...")
movies_df = pd.read_csv("/tmp/ml-1m/movies.dat", "::", 
                        engine='python', 
                        header=None, 
                        names=['movieid', 'title', 'genres']) 
print("movies_df has %s lines" % movies_df.shape[0])
ratings_df = pd.read_csv("/tmp/ml-1m/ratings.dat", "::", 
                         engine='python', 
                         header=None, 
                         names=['userid', 'movieid', 'rating', 'timestamp']) 
print("ratings_df has %s lines" % ratings_df.shape[0])

# join both dataframes
print("merging dataframes...")
merged_df = pd.merge(movies_df, ratings_df, on='movieid')

# aggregate data from dataframes, counting votes...
print("aggregating data...")
aggregation_df = merged_df.groupby('title').agg({'rating': ['count', 'mean']})
aggregation_df.columns = aggregation_df.columns.droplevel(level=0)
aggregation_df = aggregation_df.rename(columns={
    "count": "rating_count", "mean": "rating_mean"
})

# sorting data and filtering only movies with more than 1000 votes...
print("sorting data...")
aggregation_df = aggregation_df.sort_values(
        'rating_mean', 
        ascending=False).loc[aggregation_df['rating_count'] > 1000].head()

# writing data...
print("writing file to s3...")
aggregation_df.to_parquet(
        "s3://" + 
        bucket + 
        "/data/processed/best_movies/best_movies.parquet.snappy")

# reading data...
print("reading file from s3 and printing result...")
result_df = pd.read_parquet(
        "s3://" + 
        bucket + 
        "/data/processed/best_movies/best_movies.parquet.snappy")
print("result_df has %s lines" % result_df.size)

print("Best rated movie is: ")
print(result_df[0:1])

Se preferir, simplesmente faça o download do arquivo etl_with_pandas.py aqui. Se tiver tempo, explore o repositório no github. Lá você encontrará arquivos adicionais, como um notebook jupyter contendo o script ETL para ser executado de forma iterativa.

Já temos tudo que precisamos para iniciar o deploy, então agora vamos copiar os nossos scripts para o bucket que criamos alguns passos atrás. Lembre-se de substituir o nome correto do seu bucket nos comandos abaixo, antes de executá-los no seu terminal, substituindo ‘<<nome_do_seu_bucket>>’ pelo nome real do seu bucket.

aws s3 cp glue_python_shell_sample_module-0.1-py3-none-any.whl \
s3://<<nome_do_seu_bucket>>/lib/
aws s3 cp etl_with_pandas.py s3://<<nome_do_seu_bucket>>/scripts/

Pronto. Já podemos criar nosso job usando os recursos copiados para o Amazon S3. Mas antes, você vai precisar de uma IAM role para o AWS Glue. Se você já usa o Glue com frequência, possivelmente já tem uma role e pode reaproveitá-la, apenas se certificando de que ela tem acesso para escrever e ler no bucket que você criou no passo acima. Se você ainda não tem uma IAM Role criada, ou não sabe como proceder para adicionar as permissões, siga as instruções deste link.

Execute o comando abaixo no seu terminal para criar o seu primeiro job. Lembre-se de substituir o nome da role (<<nome_do_seu_iam_role>>) pelo nome que você usou no passo acima e também substituir o nome do bucket (<<nome_do_seu_bucket>>) para o bucket criado anteriormente:

aws glue create-job --name etl_with_pandas \
    --role <<nome_do_seu_iam_role>> \
    --command '{"Name" :  "pythonshell", "PythonVersion" : "3", "ScriptLocation" : "s3://<<nome_do_seu_bucket>>/scripts/etl_with_pandas.py"}' \
    --default-arguments '{"--extra-py-files" : "s3://<<nome_do_seu_bucket>>/lib/glue_python_shell_sample_module-0.1-py3-none-any.whl"}'


Se tudo correu bem, você tem agora um job phython shell criado no AWS Glue. Localize o mesmo no console (AWS Glue / ETL / Jobs). Selecione o job denominado ‘etl_with_pandas’ e clique em Action / Run job.

Aguarde até o término da execução e verifique o conteúdo do seu bucket, na pasta /data/processed/best_movies/. Você deverá encontrar um arquivo chamado ‘best_movies.parquet.snappy’, que contém o resultado do ETL: a lista dos filmes mais bem votados.

Você  também poderá consultar na aba “History” os logs de execução, a capacidade alocada, que neste caso foi apenas 1/16 de uma DPU, ou seja, 1GB de memória RAM e 25% de uma vCPU. Por ser um ambiente bem mais leve, ambientes Python shell podem ser executados com bem menos recursos computacionais alocados.

Veja abaixo um exemplo da tela após a execução do job:

Agora execute o job várias vezes. Você vai perceber que, na média, ele será executado em aproximadamente 30 segundos. Nada mal para um job que processa mais de 1 milhão de registros.

 

Conclusão

O AWS Glue é a forma mais rápida de se começar com ETL na AWS. Além da escalabilidade do Spark para processamento de data sets gigantes, os clientes podem também explorar a simplicidade do Python shell, utilizando frameworks como pandas para o processamento de data sets pequenos ou médios.