Amazon Web Services ブログ

AWS Data Wranglerを使って、簡単にETL処理を実現する

2019年9月、Github上にAWS Data Wrangler(以下、Data Wrangler)が公開されました。Data Wranglerは、各種AWSサービスからデータを取得して、コーディングをサポートしてくれるPythonのモジュールです。

現在、Pythonを用いて、Amazon Athena(以下、Athena)やAmazon Redshift(以下、Redshift)からデータを取得して、ETL処理を行う際、PyAthenaやboto3、Pandasなどを利用して行うことが多いかと思います。その際、本来実施したいETLのコーディングまでに、接続設定を書いたり、各種コーディングが必要でした。Data Wraglerを利用することで、AthenaやAmazon S3(以下、S3)上のCSVからPandasを利用するのが、数行で実施できたり、PySparkからRedshiftに連携できるなど、お客様側はETLの処理の記述内容に集中することができます。
本モジュールはインスタンスに対してpipでインストールできることに加え、Lambda Layerとしての利用やGlue上でeggファイルをアップロードして利用することができます。

本ブログでは、Amazon SageMaker(以下、SageMaker) Notebookを用いて、Athenaにクエリを実行、前処理をし、結果をS3に配置するチュートリアルをご紹介いたします。

 

チュートリアル

実行するシナリオは下記の通りです。

シナリオ

ユースケースとしては、AWSサービスの構築のご経験が少ない方でもノートブック上から機械学習の前処理などでご利用いただけるかと思います。例えば、SageMaker内のビルトインアルゴリズムであるXG Boostを利用する目的で、データベース内のデータを必要な形に変換したり、またそれ以外にも欠損値を平均値で穴埋めするといったことがあるでしょう。

このチュートリアルでは、Athenaでクエリした結果の中から分析対象外データを削除し、その後、項目を別の値に置き換える一連の手順をご紹介します。なお、環境構築自体は“東京リージョン”(ap-northeast-1)で実施します。

手順

0.データのダウンロード

サンプルデータのダウンロードを行います。

(サンプルデータのURL内にある2019年6月の“Green Taxi Trip Records(CSV)”を利用します。)

1.データセットの準備

1-1.AWSマネジメントコンソールにログインして、サービス一覧から“S3”を選択します。

1-2.[バケットを作成する]ボタンをクリックし、“バケット名”に任意の名前(※世界で一意)を入力、リージョンが“アジアパシフィック(東京)”になっていることを確認し、[作成]ボタンをクリックします。

1-3.バケットが作成されたら、作成したバケットに手順“0”でダウンロードしたCSVファイルをアップロードします。

1-4.サービス一覧から“Athena”を選択します。

1-5.下記クエリを実行し、Athenaのデータベースとテーブルを作成します。

・データベースの作成

CREATE DATABASE [YOUR DATABASE NAME]; 

・テーブルの作成

CREATE EXTERNAL TABLE green_tripdata(
  VendorID string, 
  lpep_pickup_datetime string,
  lpep_dropoff_datetime string,
  store_and_fwd_flag string,
  RatecodeID string,
  PULocationID string,
  DOLocationID string,
  passenger_count int,
  trip_distance double,
  fare_amount double,
  extra double,
  mta_max double,
  tip_amount double,
  tolls_amount double,
  ehail_fee string,
  improvement_surcharge double,
  total_amount double,
  payment_type string,
  trip_type string,
  congestion_surcharge double
  )
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
LOCATION 's3://[YOUR S3 BUCKET NAME]/[PREFIX]';  --CSVデータの配置場所

1-6.Athenaでクエリを実行して、データが取得できることを確認します。

・確認のクエリ

select count(*) from green_tripdata; 

※本バケット構成の場合、毎回ファイル全体をスキャンします。Athenaのクエリ最適化については「Amazon Athena のパフォーマンスチューニング Tips トップ 10」をご確認ください。

2.SageMaker Notebookの起動

※SageMaker Notebookの起動からコード実行までの手順は簡略化したものとなっています。詳細については、下記URLのステップ2およびステップ3をご確認ください。
https://docs.aws.amazon.com/ja_jp/sagemaker/latest/dg/gs-setup-working-env.html

2-1.SageMakerのコンソールを開き、左のメニューバーから“ノートブックインスタンス”を選択し、[Create notebook instance]を選択します。

2-2.ノートブックインスタンス名を入力し、ノートブックを作成します。※ここでは新規にIAMロールを作成します。

2-3.サービス一覧から“IAM”を選択します。

2-4.手順“2-2”で作成したIAMロールに対して、“AmazonS3FullAccess”と“AmazonAthenaFullAccess”を付与します。

2-5.作成したノートブックインスタンスから[Open Jupyter]を選択し、ノートブックを起動します。

3.NotebookからData Wranglerを用いて、Athenaでクエリ実行する

3-1.[New]タグから[conda_python3]を選択し、新規のファイルを作成します。

3-2.下記コマンドを実行し、Data Wranglerをインストールします。

!pip install awswrangler

インストールが成功されていることを確認します。

3-3.Athenaでクエリを実行するコードをSageMaker Notebook上で実行します。

・コード例

import pandas
import awswrangler

session = awswrangler.Session()
df = session.pandas.read_sql_athena(
    sql="select * from green_tripdata",
    database="[YOUR DATABASE NAME]"
)

print(df)

出力された内容が手順“1-6”で実行した内容と同じであることを確認します。

4.ETLの実行

4-1.“trip_distance”が0のデータは分析対象外とみなし、行の削除処理を行います。

・コード例

## trip_distanceが0の値を抽出、件数確認
rows_drop = df.index[df["trip_distance"] == 0.00]

print(df.loc[rows_drop].count())

## trip_distanceが0の値を削除、件数確認
df_drop = df.drop(rows_drop)
print(df_drop)

df_lens = df_drop.count()
print(df_lens)

4-2.処理結果の合計件数が列の分だけ減っていることを確認します。

4-3.不要データを削除したものに対して、データ内のカラムの置き換えを行います。ここでは“payment_type”という項目に対して、データの置き換えを行います。

・コード例

df_replace = df_drop.replace(
    {'payment_type': 
        {
            '1': 'Credit card', 
            '2': 'Cash', 
            '3': 'No charge', 
            '4': 'Dispute', 
            '5': 'Unknown', 
            '6': 'Voided trip'
        }
    }
)

print(df_replace)

payment_typeが定義した内容に置き換わったことを確認します。

4-4.CSVファイルをS3に出力します。

session.pandas.to_csv(
    dataframe=df_replace,
    path="s3://[YOUR S3 BUCKET NAME]/[PREFIX]",  ##出力先バケット名
    sep=",",
    database=None,
    table=None,
    partition_cols=None,
    preserve_index=True, 
    mode='append',
    procs_cpu_bound=None, 
    procs_io_bound=None
)

4-5.S3のコンソール上で、出力したCSVが出力されていることを確認します。

以上で終了です。

まとめ

まだリリースされたばかりのため、できることは限られていますが、数行でクエリからカラムの値の置き換え、出力まで実行できることが実感いただけたのではないかと思います。
ぜひご利用ください。

著者について

倉光 怜(Satoshi Kuramitsu)はAWSのソリューションアーキテクトです。好きなAWSサービスはAWS Glue、Amazon Kinesis、Amazon S3です。