Amazon Web Services ブログ

NEW – Amazon Redshift と Apache Spark の統合

Apache Spark は、ビッグデータのワークロードによく使用されるオープンソースの分散処理システムです。Amazon EMR、Amazon SageMaker、および AWS Glue で作業する Spark アプリケーション開発者は、 Amazon Redshift によるデータの読み取りと書き込みを可能にするサードパーティの Apache Spark コネクタをよく使用します。これらのサードパーティ製コネクタは、さまざまなバージョンの Spark で本番環境で定期的に保守、サポート、テストされているわけではありません。

2022/11/29、 Apache Spark 向けの Amazon Redshift インテグレーションの一般提供についてお知らせします。これにより、Amazon Redshift と Redshift サーバーレスで Spark アプリケーションを簡単に構築および実行できるようになり、お客様はデータウェアハウスをより幅広い AWS 分析および機械学習(ML)ソリューションに開放できるようになります。

Amazon Redshift と Apache Spark との統合により、すぐに使い始めることができ、Java、Scala、Python などのさまざまな言語で Apache Spark アプリケーションを簡単に構築できます。

アプリケーションは、アプリケーションのパフォーマンスやデータのトランザクションの一貫性を損なうことなく、プッシュダウン最適化によるパフォーマンスの向上を犠牲にすることなく、Amazon Redshift データウェアハウスからの読み取りと書き込みを行うことができます。

Apache Spark 向けの Amazon Redshift 統合は、既存のオープンソースコネクタプロジェクトに基づいて構築され、パフォーマンスとセキュリティを強化し、お客様がアプリケーションのパフォーマンスを最大 10 倍高速化できるよう支援します。これを実現するために私たちと協力してくださったプロジェクトの元の貢献者に感謝します。さらなる機能強化を行いながら、オープンソースプロジェクトへの貢献を続けていきます。

Amazon Redshift 用スパークコネクタ入門
まず、AWS アナリティクスと ML サービスにアクセスし、Spark ジョブまたはノートブックでデータフレームまたは Spark SQL コードを使用して Amazon Redshift データウェアハウスに接続し、クエリの実行を数秒で開始できます。

今回のリリースでは、Amazon EMR 6.9、EMR サーバーレス、AWS Glue 4.0 にコネクタと JDBC ドライバーがあらかじめパッケージ化されており、すぐにコードを書くことができます。EMR 6.9 にはサンプルノートブックが用意されており、EMR サーバーレスにはサンプル Spark ジョブも用意されています。

まず、Redshift と Spark の間、 Amazon Simple Storage Service (Amazon S3) と Spark の間、および Redshift と Amazon S3 の間で AWS Identity and Access Management (AWS IAM) 認証を設定する必要があります。次の図は、Amazon S3、Redshift、Spark ドライバー、および Spark エグゼキューター間の認証を示しています。

詳細については、AWS ドキュメントの「Amazon Redshift でのアイデンティティとアクセス管理」を参照してください。

Amazon EMR
Amazon Redshift データウェアハウスと利用可能なデータがすでにある場合は、データベースユーザーを作成して、データベースユーザーに適切なレベルの権限を与えることができます。これを Amazon EMR で使用するには、 spark-redshift コネクタがパッケージ化された Amazon EMR 6.9 の最新バージョンにアップグレードする必要があります。Amazon EC2 で EMR クラスターを作成する場合は、 emr-6.9.0 リリースを選択します。

EMR サーバーレスを使用して emr-6.9.0 リリースを使用して Spark アプリケーションを作成し、ワークロードを実行できます。

EMR Studio には、サンプルデータを活用して Amazon Redshift サーバーレスエンドポイントに接続するように設定された Jupyter Notebook のサンプルも用意されており、すぐに使い始めることができます。

Spark データフレームと Spark SQL の両方を使用してアプリケーションを構築するスカラーの例を次に示します。Redshift への接続には IAM ベースの認証情報を使用し、S3 からのデータのアンロードとロードには IAM ロールを使用します。

//JDBC 接続 URL を作成し、レッドシフトコンテキストを定義します
val jdbcURL =「jdbc: redshift: iam://<RedshiftEndpoint>:<Port>/<Database>?DbUser =<RsUser>」
val rsOptions = Map (
  "url" -> jdbcURL,
  "tempdir" -> tempS3Dir,
  "aws_iam_role" -> roleARN,
  )
//Redshift の売上テーブルを参照する 
val sales_df = spark
  read: 
  .format("io.github.spark_redshift_community.spark.redshift") 
  .options(rsOptions) 
  .option("dbtable", "sales") 
  .load() 
sales_df.createOrReplaceTempView("sales") 
//データフレームを使用して Redshift から日付テーブルを参照する 
sales_df.join (date_df, sales_df (「dateid」) === date_df (「dateid」))
  .where(col("caldate") === "2008-01-05")
  .groupBy().sum("qtysold")
  .select(col("sum(qtysold)"))
  .show() 

Amazon Redshift と Amazon EMR が異なる VPC にある場合は、VPC ピアリングを設定するか、VPC 間のアクセスを有効にする必要があります。Amazon Redshift と Amazon EMR の両方が同じ仮想プライベートクラウド (VPC) 内にあると仮定すると、Spark ジョブまたはノートブックを作成して Amazon Redshift データウェアハウスに接続し、Amazon Redshift コネクタを使用する Spark コードを記述できます。

詳細については、AWS ドキュメントの「Amazon Redshift でコネクターを使用して Spark を使用する」を参照してください。

AWS Glue
AWS Glue 4.0 を使用する場合、スパークリッドシフトコネクタはソースとターゲットの両方として使用できます。Glue Studio では、ビルトインの Redshift ソースノードまたはターゲットノード内で使用する Redshift 接続を選択するだけで、ビジュアル ETL ジョブを使用して Redshift データウェアハウスへの読み取りまたは書き込みを行うことができます。

Redshift 接続には、Redshift 接続の詳細と、適切な権限で Redshift にアクセスするために必要な認証情報が含まれます。

開始するには、 Glue Studio コンソールの左側のメニューで [ジョブ] を選択します。どちらのビジュアルモードを使用しても、ソースノードまたはターゲットノードを簡単に追加および編集し、コードを書かずにデータに対してさまざまな変換を定義できます。

Create を選択すると、ジョブダイアグラムでソース、ターゲットノード、トランスフォームノードを簡単に追加および編集できます。このとき、ソースとターゲットとして Amazon Redshift を選択します。

完了すると、Apache Spark エンジンの Glue で Glue ジョブを実行できるようになります。このエンジンでは、最新のスパーク-レッドシフトコネクタが自動的に使用されます。

次の Python スクリプトは、spark-redshift コネクタを使用してダイナミックフレームを使用して Redshift に読み書きするジョブの例を示しています。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("================ DynamicFrame Read ===============")
url = "jdbc:redshift://<RedshiftEndpoint>:<Port>/dev"
read_options = {
    "url": url,
    "dbtable": dbtable,
    "redshiftTmpDir": redshiftTmpDir,
    "tempdir": redshiftTmpDir,
    "aws_iam_role": aws_iam_role,
    "autopushdown": "true",
    "include_column_list": "false"
}

redshift_read = glueContext.create_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options=read_options
) 

print("================ DynamicFrame Write ===============")

write_options = {
    "url": url,
    "dbtable": dbtable,
    "user": "awsuser",
    "password": "Password1",
    "redshiftTmpDir": redshiftTmpDir,
    "tempdir": redshiftTmpDir,
    "aws_iam_role": aws_iam_role,
    "autopushdown": "true",
    "DbUser": "awsuser"
}

print("================ dyf write result: check redshift table ===============")
redshift_write = glueContext.write_dynamic_frame.from_options(
    frame=redshift_read,
    connection_type="redshift",
    connection_options=write_options
)

ジョブの詳細を設定する場合、このインテグレーションに使用できるのは Glue 4.0 — Spark 3.3 Python 3 をサポートしているバージョンのみです

詳細については、AWS ドキュメントの「AWS Glue Studio での ETL ジョブの作成」と「AWS Glue Studio でのコネクタと接続の使用」を参照してください。

最高のパフォーマンスを実現
Apache Spark の Amazon Redshift インテグレーションでは、Spark コネクタが自動的に述語とクエリプッシュダウンを適用してパフォーマンスを最適化します。この統合では、アンロードに使用されるコネクタにデフォルトのParquet形式を使用することで、パフォーマンスを向上させることができます。

次のサンプルコードが示すように、Spark コネクタはサポートされている関数を SQL クエリに変換し、Amazon Redshift でクエリを実行します。

import sqlContext.implicits._val
sample= sqlContext.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url",jdbcURL )
.option("tempdir", tempS3Dir)
.option("unload_s3_format", "PARQUET")
.option("dbtable", "event")
.load()

//以前に作成したデータフレームの一時ビューを作成して、Spark SQL 経由でアクセスできるようにする
sales_df.createOrReplaceTempView("sales")
date_df.createOrReplaceTempView("date")
//Spark SQL API を使用して、特定の日付の総売上高を表示します
spark.sql(
"""SELECT sum(qtysold)
| FROM sales, date
| WHERE sales.dateid = date.dateid
| AND caldate = '2008-01-05'""".stripMargin).show()

Amazon Redshift と Apache Spark の統合により、ソート、集計、制限、結合、スカラー関数などの操作にプッシュダウン機能が追加され、関連するデータのみが Redshift データウェアハウスから使用側の Spark アプリケーションに移動されるため、パフォーマンスが向上します。

今すぐ利用可能
Apache Spark の Amazon Redshift インテグレーションは、Amazon EMR 6.9、AWS Glue 4.0、および Amazon Redshift をサポートするすべてのリージョンで利用できるようになりました。この機能は、新しい Spark 3.3.0 バージョンの EMR 6.9 および Glue Studio 4.0 から直接使用を開始できます。

お試しいただけた際は、AWS re:Post for Amazon Redshiftまたは通常の AWS サポート担当者を通じてフィードバックをお寄せください。

Channy

原文はこちらです。