Amazon Web Services ブログ

AWS Data Wrangler v1.0がリリースされました

2019年9月、当ブログでもご紹介したAWS Data Wrangler(以下、Data Wrangler)のv1.0がリリースされました。
以前紹介したときと比べて、主にAmazon Redshift(以下、Redshift)・AWS Glueデータカタログとの連携強化、その他Amazon EMR・Amazon CloudWatch Logsとの連携が追加されており、全体的により使いやすくなりました。v1.0になったことに伴い、改めて「AWS Data Wranglerとは?」および「簡単なチュートリアル」についてご紹介します。

(*)ブログ下段に、2019年9月のブログ投稿以降に追加された主要アップデートのサマリーを記載しています。

AWS Data Wranglerとは?

Data WranglerはPandasライブラリの機能をAWSに拡張するオープンソースのPythonライブラリです。DataFrameとAWSデータ関連サービス(Redshift, AWS Glue(以下、Glue), Amazon Athena(以下、Athena), Amazon EMRなど)と接続します。

Pandas、Apache Arrow、Boto3、s3fs、SQLAlchemy、Psycopg2、PyMySQLなどのオープンソースライブラリを用いて、データレイク、データウェアハウス、データベースからのデータのロード/アンロードといった通常のETLタスクに必要となる抽象化された関数を提供します。
(公式ドキュメント:「What is AWS Data Wrangler?」より翻訳・引用)

チュートリアル

公式ドキュメントにいくつかサンプルチュートリアルがあります。本ブログではAmazon SageMaker(以下、SageMaker)ノートブックを用いて、公式チュートリアル“01-Introduction.ipynb”、“06-Amazon Athena.ipynb”を参考に、簡易版をご紹介します。

1.S3の設定

1-1.AWSマネジメントコンソールにログインして、サービス一覧から”S3″を選択します。

1-2.[バケットを作成]ボタンをクリックし、[バケット名]に任意の名前(※世界で一意)を入力、リージョンが「アジアパシフィック(東京)になっていることを確認し、[作成]ボタンをクリックします。

1-3.バケットが作成されたら、[フォルダの作成]をクリックし、フォルダ名に「data」といれてフォルダを作成します。

2.Glueデータカタログのデータベース作成

2-1.サービス一覧から”Glue”のコンソールを開き、左のメニューバーから[データベース]を選択します。

2-2.[データベースの追加]ボタンをクリックし、データベース名に「awswrangler_test」と入力し、[作成]ボタンをクリックします。

3.SageMakerノートブックの起動

※SageMakerノートブックの起動からコード実行までの手順は簡略化したものとなっています。詳細については、下記URLのステップ2およびステップ3をご確認ください。
https://docs.aws.amazon.com/ja_jp/sagemaker/latest/dg/gs-setup-working-env.html

3-1.サービス一覧から”SageMaker”のコンソールを開き、左のメニューバーから[ノートブックインスタンス]を選択し、[ノートブックインスタンスの作成]を選択します。

3-2.ノートブックインスタンス名に任意の名前を入力し、ノートブックを作成します。(※ここでは新規にAWS IAMロールを作成します。)

3-3.サービス一覧から”IAM”を選択します。

3-4.手順“3-2”で作成したAWS IAMロールに対して、「AmazonS3FullAccess」と「AmazonAthenaFullAccess」を付与します。

3-5.作成したノートブックインスタンスから[Jupyterを開く]を選択し、ノートブックを起動します。

4.サンプルチュートリアルの実行

以下チュートリアルのシナリオはAmazon S3(以下、S3)上にあるCSVファイルをParquetファイルに変換、その後、Athenaからクエリを実行する例です。

4-1.[New]タグから[conda_python3]を選択し、新規のファイルを作成します。

4-2.下記コマンドを実行し、Data Wranglerをインストールします。

!pip install awswrangler

4-3.バージョン確認のコードを実行して、バージョンが表示されることを確認します。(※本ブログ投稿時はV1.0.0で実行しています)

import awswrangler as wr
wr.__version__

4-4.初期設定として、S3バケットの設定を行います。実行後、テキスト入力が可能です。そこで、1-2で作成したS3バケット名を指定します。

import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"

4-5.サンプルのCSVファイルを読み込みます。(※データ量が多いため、少し時間がかかります。)

cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
df = wr.s3.read_csv(
    path="s3://noaa-ghcn-pds/csv/189",
    names=cols,
    parse_dates=["dt", "obs_time"])  
    
df

4-6.CSVファイルをParquetファイルに変換します。

res = wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="noaa"
)

[補足]
上記コードの「database」には2-2で作成したGlueのデータベース名が入ります。この例では、CSVからParquetへの変換とともに、Glueデータカタログの「awswrangler_test」データベースに対して、「noaa」テーブルを作成しています。処理完了後に1-3で作成したS3のPrefix以下にParquetファイル作成されていること、Glueデータカタログに「noaa」というテーブルが作成されていることが確認できます。詳細はこちらのAPIリファレンスをご参照ください。

4-6.Glueのデータカタログからテーブルを検索します。

wr.catalog.table(database="awswrangler_test", table="noaa")

4-7.Athenaからクエリを実行します。

wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")

公式ドキュメントではCTASを有効有無によるクエリ速度、メモリに制限がある環境での処理方法について言及しております。興味がある方は公式ドキュメントからお試しください。

4-8.S3のdataフォルダを削除します。

wr.s3.delete_objects(path)

4-9. Glueのデータカタログを削除します。

for table in wr.catalog.get_tables(database="awswrangler_test"):
    wr.catalog.delete_table_if_exists(database="awswrangler_test", table=table["Name"])

以上で終了です。

今回、Athenaからのクエリ実行のご紹介のみでしたが、他にもData Wranglerの基本的な使い方からRedshiftを使ったチュートリアルなどいくつか用意されています。

前回からのアップデートサマリー

2019年9月から多くのアップデートがあったため、主要なものを絞って記述します。アップデートの詳細はGithubページをご確認ください。

  • S3 から直接 Parquetファイルを Pandas DataFrame に読み込む(Link)
  • Parquet出力経由でRedshiftの結果をPandas DataFrameに読み込む(Link)
  • Redshiftの結果をParquetとしてS3に保存(Link)
  • CTASを使って、Athenaの結果をPandas DataFrameに読み込む(Link)
  • PandasからRedshiftへのUpsert(Link)

まとめ

1コマンドで簡単にインストールができ、ETL処理を手軽に実行できることはデータを扱う方々にとって大きなメリットではないでしょうか。今後も機能拡張される予定です。ぜひご利用ください。

著者について

倉光 怜(Satoshi Kuramitsu)はAWSのソリューションアーキテクトです。好きなAWSサービスはAWS Glue、Amazon Kinesis、Amazon S3です。