如何使用 EMR 叢集中的 Spark 連線至 Redshift 叢集?
上次更新日期:2022 年 12 月 23 日
我想在我的 Amazon EMR 叢集中使用 Apache Spark 連接 Amazon Redshift 叢集。
解決方案
注意:請先設定您的 Redshift 叢集和 EMR 叢集並安裝 Spark 服務,再繼續執行下列步驟。
測試從 EMR 叢集到 Redshift 叢集的連線能力
1. 確認針對 TCP 連接埠 5439 的 Redshift 安全性群組 (輸入規則) 允許 EMR 主要、核心和任務節點安全群組。如果 EMR 和 Redshift 叢集部署在兩個不同的 Amazon Virtual Private Clouds (Amazon VPC) 中,請設定 VPC 對等互連。
2. 使用 SSH 連線至 EMR 主節點,並執行下列 Telnet 命令。這個 Telnet 命令會驗證您是否可以在 EMR 叢集和 Redshift 叢集之間建立連線。在下列命令中,將 Redshift_Endpoint 替換為您 Redshift 叢集的正確端點。
telnet Redshift_Endpoint 5439
以下是成功連線的輸出範例:
telnet redshift-cluster-1.XXXX.us-east-1.redshift.amazonaws.com 5439
Trying 172.31.48.21...
Connected to redshift-cluster-1.XXXXX.us-east-1.redshift.amazonaws.com.
Escape character is
使用 EMR-5.x.x 系列叢集中的 Spark 連線至 Redshift 叢集
使用 Databrick 的 spark-redshift 套件 (程式庫)。此程式庫會將資料從 Amazon Redshift 載入 Spark SQL DataFrames,並同時將 DataFrames 存回 Amazon Redshift 資料表。
2. 若要使用 spark-redshift 程式庫,請將下列 .jar 檔案下載至 EMR 叢集:
wget https://repo1.maven.org/maven2/com/databricks/spark-redshift_2.11/2.0.1/spark-redshift_2.11-2.0.1.jar
wget https://github.com/ralfstx/minimal-json/releases/download/0.9.4/minimal-json-0.9.4.jar
3. 將下載的 JAR 檔案複製到預設的 Spark 程式庫。Spark 程式庫的路徑是 /usr/lib/spark/jars/。
sudo cp spark-redshift_2.11-2.0.1.jar /usr/lib/spark/jars/
sudo cp minimal-json-0.9.4.jar /usr/lib/spark/jars/
4. 使用 Amazon Redshift JDBC 驅動程式執行 spark-shell 命令,以連線到 Redshift 叢集。JDBC 驅動程式包含在 Amazon EMR 4.7.0 及更新版本中。
spark-shell --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar
5. spark-shell 工作階段初始化完成後,請執行下列步驟以連線至 Redshift 叢集。在以下命令中,根據您的使用案例更新 Amazon Redshift 端點、Amazon Simple Storage Service (Amazon S3) 儲存貯體名稱和資料表詳細資料。
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider
// Instance Profile for authentication to AWS resources
val provider = new InstanceProfileCredentialsProvider();
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials];
val token = credentials.getSessionToken;
val awsAccessKey = credentials.getAWSAccessKeyId;
val awsSecretKey = credentials.getAWSSecretKey
// Set JDBC URL of Redshift
val jdbcUrl = "jdbc:redshift://<cluster-name>.<id>.<region>.redshift.amazonaws.com:5439/<database>?user=<user>&password=<password>"
// Create DataFrame by loading Redshift query
val df = spark.read.format("com.databricks.spark.redshift").option("url", jdbcUrl).option("tempdir", "s3://<S3-path-to-store-temp-data>").option("query", "select * from <table-name>").option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token", token).load()
df.show(2)
使用 Amazon EMR-6.x.x 系列叢集中的 Spark 連接至 Redshift 叢集
Amazon EMR 6.x 及更新版本使用 Scala 版本 2.12。Amazon EMR 5.x 使用 Scala 版本 2.11。Amazon EMR 5.x 版本使用的 spark-redshift_2.11-2.0.1.jar 檔案與 Amazon EMR 6.x 及更新版本不相容。因此,請使用 Amazon EMR 6.x 及更新版本叢集中的 spark-redshift_2.12-4.2.0.jar 連接器。
2. 若要使用 spark-redshift 程式庫,請將下列 .jar 檔案下載至 EMR 叢集:
wget https://repo1.maven.org/maven2/io/github/spark-redshift-community/spark-redshift_2.12/4.2.0/spark-redshift_2.12-4.2.0.jar
wget https://github.com/ralfstx/minimal-json/releases/download/0.9.4/minimal-json-0.9.4.jar
3. 將下載的 JAR 檔案複製到預設的 Spark 程式庫。Spark 程式庫的路徑是 /usr/lib/spark/jars/。
sudo cp spark-redshift_2.12-4.2.0.jar /usr/lib/spark/jars/
sudo cp minimal-json-0.9.4.jar /usr/lib/spark/jars/
4. 使用 Amazon Redshift JDBC 驅動程式執行 spark-shell 命令,以連線到 Redshift 叢集。JDBC 驅動程式包含在 EMR 4.7.0 及更新版本中。
spark-shell --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar
5. spark-shell 工作階段初始化完成後,請執行下列步驟以連線至 Redshift 叢集。在以下命令中,根據您的使用案例更新 Amazon Redshift 端點、S3 儲存貯體名稱和資料表詳細資料。
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider
// Instance Profile for authentication to AWS resources
val provider = new InstanceProfileCredentialsProvider();
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials];
val token = credentials.getSessionToken;
val awsAccessKey = credentials.getAWSAccessKeyId;
val awsSecretKey = credentials.getAWSSecretKey
// Set JDBC URL of Redshift
val jdbcUrl = "jdbc:redshift://<cluster-name>.<id>.<region>.redshift.amazonaws.com:5439/<database>?user=<user>&password=<password>"
// Create DataFrame by loading Redshift query
val df = spark.read.format("io.github.spark_redshift_community.spark.redshift").option("url", jdbcUrl).option("tempdir", "s3://bucket/tmp/").option("query", "select * from <table>").option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token", token).load()
df.show(2)