EMR クラスターで Spark を使用して Redshift クラスターに接続する方法を教えてください。

所要時間3分
0

Amazon EMR クラスターで Apache Spark を使用して Amazon Redshift クラスターに接続したいと考えています。

解決方法

注: 次の手順に進む前に、Redshift クラスターと EMR クラスターを設定し、Spark サービスをインストールします。

EMR クラスターから Redshift クラスターへの接続をテストする

1.    EMR プライマリ、コア、およびタスクノードのセキュリティグループが TCP ポート 5439 の Redshift のセキュリティグループ (インバウンドルール) で許可されていることを確認します。EMR クラスターと Redshift クラスターが 2 つの異なる Amazon Virtual Private Cloud (Amazon VPC) にデプロイされている場合は、VPC ピアリングを設定します。

2.    SSH を使用して EMR プライマリノードに接続し、次の Telnet コマンドを実行します。この Telnet コマンドは、EMR クラスターと Redshift クラスター間の接続を確立できることを確認します。以下のコマンドで、Redshift_Endpoint を Redshift クラスターの正しいエンドポイントに置き換えます。

telnet Redshift_Endpoint 5439

以下は、接続が成功した場合の出力例です。

telnet redshift-cluster-1.XXXX.us-east-1.redshift.amazonaws.com 5439
Trying 172.31.48.21...
Connected to redshift-cluster-1.XXXXX.us-east-1.redshift.amazonaws.com.
Escape character is

EMR-5.x.x シリーズクラスターで Spark を使用して Redshift クラスターに接続する

Databrick の Spark-Redshift パッケージ (ライブラリ) を使用してください。このライブラリは Amazon Redshift から Spark SQL DataFrames にデータをロードし、DataFrames を Amazon Redshift テーブルに戻して保存します。

1.    SSH を使用して EMR プライマリノードに接続します

2.    Spark-Redshift ライブラリを使用するには、以下の .jar ファイルを EMR クラスターにダウンロードします。

wget https://repo1.maven.org/maven2/com/databricks/spark-redshift_2.11/2.0.1/spark-redshift_2.11-2.0.1.jar
wget https://github.com/ralfstx/minimal-json/releases/download/0.9.4/minimal-json-0.9.4.jar

3.    ダウンロードした JAR ファイルを、デフォルトの Spark ライブラリにコピーします。Spark ライブラリのパスは /usr/lib/spark/jars/ です。

sudo cp spark-redshift_2.11-2.0.1.jar /usr/lib/spark/jars/
sudo cp minimal-json-0.9.4.jar /usr/lib/spark/jars/

4.    Amazon Redshift JDBC ドライバーを使用して spark-shell コマンドを実行し、Redshift クラスターに接続します。JDBC ドライバーは Amazon EMR バージョン 4.7.0 以降に含まれています。

spark-shell --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar

5.    spark-shell セッションが初期化されたら、以下の手順を実行して Redshift クラスターに接続します。以下のコマンドで、Amazon Redshift エンドポイント、Amazon Simple Storage Service (Amazon S3) バケット名、およびテーブルの詳細をユースケースに応じて更新します。

import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider

// Instance Profile for authentication to AWS resources
val provider = new InstanceProfileCredentialsProvider();
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials];
val token = credentials.getSessionToken;
val awsAccessKey = credentials.getAWSAccessKeyId;
val awsSecretKey = credentials.getAWSSecretKey

// Set JDBC URL of Redshift
val jdbcUrl = "jdbc:redshift://<cluster-name>.<id>.<region>.redshift.amazonaws.com:5439/<database>?user=<user>&password=<password>"

// Create DataFrame by loading Redshift query
val df = spark.read.format("com.databricks.spark.redshift").option("url", jdbcUrl).option("tempdir", "s3://<S3-path-to-store-temp-data>").option("query", "select * from <table-name>").option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token", token).load()
df.show(2)

Amazon EMR-6.x.x シリーズクラスターで Spark を使用して Redshift クラスターに接続する

Amazon EMR バージョン 6.x 以降では Scala バージョン 2.12 が使用されています。Amazon EMR 5.x では Scala バージョン 2.11 が使用されています。Amazon EMR 5.x バージョンが使用する spark-redshift_2.11-2.0.1.jar ファイルは、Amazon EMR バージョン 6.x 以降と互換性がありません。そのため、Amazon EMR 6.x 以降のクラスターでは spark-redshift_2.12-4.2.0.jar コネクタを使用してください。

1.    SSH を使用して EMR プライマリノードに接続します

2.    Spark-Redshift ライブラリを使用するには、以下の .jar ファイルを EMR クラスターにダウンロードします。

wget https://repo1.maven.org/maven2/io/github/spark-redshift-community/spark-redshift_2.12/4.2.0/spark-redshift_2.12-4.2.0.jar
wget https://github.com/ralfstx/minimal-json/releases/download/0.9.4/minimal-json-0.9.4.jar

3.    ダウンロードした JAR ファイルを、デフォルトの Spark ライブラリにコピーします。Spark ライブラリのパスは /usr/lib/spark/jars/ です。

sudo cp spark-redshift_2.12-4.2.0.jar /usr/lib/spark/jars/
sudo cp minimal-json-0.9.4.jar /usr/lib/spark/jars/

4.    Amazon Redshift JDBC ドライバーを使用して spark-shell コマンドを実行し、Redshift クラスターに接続します。JDBC ドライバーは EMR バージョン 4.7.0 以降に含まれています。

spark-shell --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar

5.    spark-shell セッションが初期化されたら、以下の手順を実行して Redshift クラスターに接続します。以下のコマンドで、Amazon Redshift エンドポイント、S3 バケット名、テーブルの詳細をユースケースに応じて更新します。

import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider

// Instance Profile for authentication to AWS resources
val provider = new InstanceProfileCredentialsProvider();
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials];
val token = credentials.getSessionToken;
val awsAccessKey = credentials.getAWSAccessKeyId;
val awsSecretKey = credentials.getAWSSecretKey

// Set JDBC URL of Redshift
val jdbcUrl = "jdbc:redshift://<cluster-name>.<id>.<region>.redshift.amazonaws.com:5439/<database>?user=<user>&password=<password>"

// Create DataFrame by loading Redshift query
val df = spark.read.format("io.github.spark_redshift_community.spark.redshift").option("url", jdbcUrl).option("tempdir", "s3://bucket/tmp/").option("query", "select * from <table>").option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token", token).load()
df.show(2)

AWS公式
AWS公式更新しました 1年前
コメントはありません