如何在 EMR 中使用 Spark 连接到 Redshift 集群?

3 分钟阅读
0

我想要在 Amazon EMR 集群中使用 Apache Spark 连接到 Amazon Redshift 集群。

解决方法

**注意:**配置 Redshift 集群和 EMR 集群,并安装 Spark 服务,然后再继续执行后续步骤。

测试 EMR 集群到 Redshift 集群的连接。

1.    验证 TCP 端口 5439 的 Redshift 安全组(入站规则)中是否允许 EMR 主节点、核心节点和任务节点安全组。如果在两个不同的 Amazon Virtual Private Cloud(Amazon VPC)中部署 EMR 和 Redshift 集群,请配置 VPC 对等连接。

2.    使用 SSH 连接到 EMR 主节点,然后运行以下 Telnet 命令。此 Telnet 命令可以验证您是否可在 EMR 集群与 Redshift 集群之间建立连接。在以下命令中,将 Redshift_Endpoint 替换为 Redshift 集群的正确端点。

telnet Redshift_Endpoint 5439

以下是成功连接的示例输出:

telnet redshift-cluster-1.XXXX.us-east-1.redshift.amazonaws.com 5439
Trying 172.31.48.21...
Connected to redshift-cluster-1.XXXXX.us-east-1.redshift.amazonaws.com.
Escape character is

在 EMR-5.x.x 系列集群中使用 Spark 连接到 Redshift 集群

使用 Databrick 的 spark-redshift 包(库)。该库会将数据从 Amazon Redshift 加载到 Spark SQL DataFrames,并且还会将 DataFrames 保存回 Amazon Redshift 表。

1.    使用 SSH 连接到 EMR 主节点

2.    要使用 spark-redshift 库,请将以下 .jar 文件下载到 EMR 集群:

wget https://repo1.maven.org/maven2/com/databricks/spark-redshift_2.11/2.0.1/spark-redshift_2.11-2.0.1.jar
wget https://github.com/ralfstx/minimal-json/releases/download/0.9.4/minimal-json-0.9.4.jar

3.    将下载的 JAR 文件复制到默认 Spark 库。Spark 库的路径为 /usr/lib/spark/jars/

sudo cp spark-redshift_2.11-2.0.1.jar /usr/lib/spark/jars/
sudo cp minimal-json-0.9.4.jar /usr/lib/spark/jars/

4.    使用 Amazon Redshift JDBC 驱动程序运行 spark-shell 命令,以连接到 Redshift 集群。Amazon EMR 版本 4.7.0 及更高版本中包含 JDBC 驱动程序。

spark-shell --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar

5.    初始化 spark-shell 会话之后,运行以下步骤以连接到 Redshift 集群。在以下命令中,根据您的使用案例更新 Amazon Redshift 端点、Amazon Simple Storage Service(Amazon S3)存储桶名称和表详细信息。

import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider

// Instance Profile for authentication to AWS resources
val provider = new InstanceProfileCredentialsProvider();
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials];
val token = credentials.getSessionToken;
val awsAccessKey = credentials.getAWSAccessKeyId;
val awsSecretKey = credentials.getAWSSecretKey

// Set JDBC URL of Redshift
val jdbcUrl = "jdbc:redshift://<cluster-name>.<id>.<region>.redshift.amazonaws.com:5439/<database>?user=<user>&password=<password>"

// Create DataFrame by loading Redshift query
val df = spark.read.format("com.databricks.spark.redshift").option("url", jdbcUrl).option("tempdir", "s3://<S3-path-to-store-temp-data>").option("query", "select * from <table-name>").option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token", token).load()
df.show(2)

在 Amazon EMR-6.x.x 系列集群中使用 Spark 连接到 Redshift 集群

Amazon EMR 版本 6.x 及更高版本使用 Scala 版本 2.12。Amazon EMR 5.x 使用 Scala 版本 2.11。Amazon EMR 5.x 使用的 spark-redshift_2.11-2.0.1.jar 文件与 Amazon EMR 版本 6.x 及更高版本不兼容。因此,在 Amazon EMR 6.x 及更高版本的集群中,请使用 spark-redshift_2.12-4.2.0.jar 连接器

1.    使用 SSH 连接到 EMR 主节点

2.    要使用 spark-redshift 库,请将以下 .jar 文件下载到 EMR 集群:

wget https://repo1.maven.org/maven2/io/github/spark-redshift-community/spark-redshift_2.12/4.2.0/spark-redshift_2.12-4.2.0.jar
wget https://github.com/ralfstx/minimal-json/releases/download/0.9.4/minimal-json-0.9.4.jar

3.    将下载的 JAR 文件复制到默认 Spark 库。Spark 库的路径为 /usr/lib/spark/jars/

sudo cp spark-redshift_2.12-4.2.0.jar /usr/lib/spark/jars/
sudo cp minimal-json-0.9.4.jar /usr/lib/spark/jars/

4.    使用 Amazon Redshift JDBC 驱动程序运行 spark-shell 命令,以连接到 Redshift 集群。EMR 版本 4.7.0 及更高版本中包含 JDBC 驱动程序。

spark-shell --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar

5.    初始化 spark-shell 会话之后,运行以下步骤以连接到 Redshift 集群。在以下命令中,根据您的使用案例更新 Amazon Redshift 端点、S3 存储桶名称和表详细信息。

import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider

// Instance Profile for authentication to AWS resources
val provider = new InstanceProfileCredentialsProvider();
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials];
val token = credentials.getSessionToken;
val awsAccessKey = credentials.getAWSAccessKeyId;
val awsSecretKey = credentials.getAWSSecretKey

// Set JDBC URL of Redshift
val jdbcUrl = "jdbc:redshift://<cluster-name>.<id>.<region>.redshift.amazonaws.com:5439/<database>?user=<user>&password=<password>"

// Create DataFrame by loading Redshift query
val df = spark.read.format("io.github.spark_redshift_community.spark.redshift").option("url", jdbcUrl).option("tempdir", "s3://bucket/tmp/").option("query", "select * from <table>").option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token", token).load()
df.show(2)

AWS 官方
AWS 官方已更新 1 年前