亚马逊AWS官方博客

新增功能 – Amazon Redshift 与 Apache Spark 集成

Apache Spark 是一种开源的分布式处理系统,通常用于大数据工作负载。在 Amazon EMRAmazon SageMakerAWS Glue 中工作的 Spark 应用程序开发人员经常会使用第三方 Apache Spark 连接器,以方便他们使用 Amazon Redshift 读取和写入数据。这些第三方连接器缺乏定期维护、支持,或者不会使用不同版本的 Spark 进行测试,以用于生产。

今天,我们宣布适用于 Apache Spark 的 Amazon Redshift 集成已全面开放,让客户可以轻松在 Amazon Redshift 和 Redshift Serverless 上构建和运行 Spark 应用程序,从而建立适用于更广泛 AWS 分析和机器学习(ML)解决方案的数据仓库。

借助适用于 Apache Spark 的 Amazon Redshift 集成,您可以在几秒钟内开始使用 Java、Scala 和 Python 等各种语言轻松构建 Apache Spark 应用程序。

应用程序可以从您的 Amazon Redshift 数据仓库读取和写入数据,而不会影响应用程序的性能或数据的事务一致性,还可以通过下推优化提高性能。

适用于 Apache Spark 的 Amazon Redshift 集成以一个 现有的开源连接器项目 为基础,包含了多项性能和安全性增强,可帮助客户将应用程序性能提高多达 10 倍。我们感谢项目的原始贡献者,此集成的成功要归功于他们与我们的协作。我们将不断进行增强,继续为此开源项目贡献力量。

适用于 Amazon Redshift 的 Spark 连接器入门
首先,您可以访问 AWS 分析和机器学习服务,在 Spark 作业或笔记本电脑实例中使用数据框或 Spark SQL 代码连接到 Amazon Redshift 数据仓库,然后在几秒钟内即可开始运行查询。

在本次发布中,Amazon EMR 6.9、EMR Serverless 和 AWS Glue 4.0 附带了预打包的连接器和 JDBC 驱动程序,让您可以直接开始编写代码。EMR 6.9 提供了示例笔记本电脑实例,EMR Serverless 也提供了 Spark 作业示例。

首先,您需要在 Redshift 和 Spark 之间,在 Amazon Simple Storage Service(Amazon S3)和 Spark 之间,以及在 Redshift 和 Amazon S3 之间设置 AWS Identity and Access Management(AWS IAM)身份验证。下图描述了 Amazon S3、Redshift、Spark 驱动程序和 Spark 执行程序之间的身份验证。

有关更多信息,请参阅 AWS 文档中的 Amazon Redshift 中的身份和访问管理

Amazon EMR
如果您已经有 Amazon Redshift 数据仓库和可用的数据,则可以创建数据库用户并为该数据库用户提供适当级别的授权。要将此与 Amazon EMR 结合使用,您需要升级到封装了 spark-redshift connector 的最新版本 Amazon EMR 6.9。在 Amazon EC2 上创建 EMR 集群时,请选择 emr-6.9.0 版本。

您可以通过 EMR Serverless 来创建将使用 emr-6.9.0 版本来运行工作负载的 Spark 应用程序。

EMR Studio 还提供了一个示例 Jupyter Notebook,该笔记本电脑实例配置为利用示例数据连接到 Amazon Redshift Serverless 端点。您可以使用这些示例数据快速入门。

这是一个使用 Spark Dataframe 和 Spark SQL 构建应用程序的 Scalar 示例。使用基于 IAM 的凭证连接到 Redshift,并使用 IAM 角色从 S3 卸载和加载数据。

// Create the JDBC connection URL and define the Redshift context
val jdbcURL = "jdbc:redshift:iam://<RedshiftEndpoint>:<Port>/<Database>?DbUser=<RsUser>"
val rsOptions = Map (
  "url" -> jdbcURL,
  "tempdir" -> tempS3Dir,
  "aws_iam_role" -> roleARN,
  )
// Reference the sales table from Redshift 
val sales_df = spark
  .read 
  .format("io.github.spark_redshift_community.spark.redshift") 
  .options(rsOptions) 
  .option("dbtable", "sales") 
  .load() 
sales_df.createOrReplaceTempView("sales") 
// Reference the date table from Redshift using Data Frame 
sales_df.join(date_df, sales_df("dateid") === date_df("dateid"))
  .where(col("caldate") === "2008-01-05")
  .groupBy().sum("qtysold")
  .select(col("sum(qtysold)"))
  .show() 

如果 Amazon Redshift 和 Amazon EMR 位于不同的 VPC 中,则必须配置 VPC 对等连接或启用跨 VPC 访问权限。假设 Amazon Redshift 和 Amazon EMR 位于同一虚拟私有云(VPC)中,则可以创建一个 Spark 作业或笔记本电脑实例,然后连接到 Amazon Redshift 数据仓库,并编写 Spark 代码以使用 Amazon Redshift 连接器。

要了解更多信息,请参阅 AWS 文档中的 通过连接器在 Amazon Redshift 上使用 Spark

AWS Glue
使用 AWS Glue 4.0 时,spark-redshift 连接器既可以作为源也可以作为目标。在 Glue Studio 中,您只需选择将在内置 Redshift 源节点或目标节点中使用的 Redshift 连接,即可使用可视化 ETL 作业对 Redshift 数据仓库进行读取或写入。

Redshift 连接包含 Redshift 连接详细信息以及使用适当权限访问 Redshift 所需的凭证。

要开始使用,请在 Glue Studio 控制台的左侧菜单中选择 Jobs (作业)。您可以使用任何一种可视化模式,轻松添加和编辑源节点或目标节点,并定义一个数据转换范围,而无需编写任何代码。

选择 Create(创建),即可轻松地在作业图中添加和编辑源、目标节点和转换节点。然后您需要选择 Amazon Redshift 作为 Source(源)和 Target(目标)。

完成后,可以在适用于 Apache Spark 引擎的 Glue 上执行 Glue 作业,这将自动使用最新版本的 spark-redshift 连接器。

以下 Python 脚本显示了通过使用 spark-redshift 连接器的 dynamicframe 读取和写入 Redshift 的示例作业。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("================ DynamicFrame Read ===============")
url = "jdbc:redshift://<RedshiftEndpoint>:<Port>/dev"
read_options = {
    "url": url,
    "dbtable": dbtable,
    "redshiftTmpDir": redshiftTmpDir,
    "tempdir": redshiftTmpDir,
    "aws_iam_role": aws_iam_role,
    "autopushdown": "true",
    "include_column_list": "false"
}

redshift_read = glueContext.create_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options=read_options
) 

print("================ DynamicFrame Write ===============")

write_options = {
    "url": url,
    "dbtable": dbtable,
    "user": "awsuser",
    "password": "Password1",
    "redshiftTmpDir": redshiftTmpDir,
    "tempdir": redshiftTmpDir,
    "aws_iam_role": aws_iam_role,
    "autopushdown": "true",
    "DbUser": "awsuser"
}

print("================ dyf write result: check redshift table ===============")
redshift_write = glueContext.write_dynamic_frame.from_options(
    frame=redshift_read,
    connection_type="redshift",
    connection_options=write_options
)

设置作业详细信息时,只能将 Glue 4.0 – 支持 spark 3.3 Python 3 版本用于此集成。

要了解更多信息,请参阅 AWS 文档中的 使用 AWS Glue Studio 创建 ETL 作业将连接器和连接与 AWS Glue Studio 结合使用

优化性能
在适用于 Apache Spark 的 Amazon Redshift 集成中,Spark 连接器会自动应用谓词和查询下推来优化性能。对于通过此集成卸载的连接器,使用默认的 Parquet 格式可以提高性能。

如以下示例代码所示,Spark 连接器会将支持的函数转换为 SQL 查询,并在 Amazon Redshift 中运行查询。

import sqlContext.implicits._val
sample= sqlContext.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url",jdbcURL )
.option("tempdir", tempS3Dir)
.option("unload_s3_format", "PARQUET")
.option("dbtable", "event")
.load()

// Create temporary views for data frames created earlier so they can be accessed via Spark SQL
sales_df.createOrReplaceTempView("sales")
date_df.createOrReplaceTempView("date")
// Show the total sales on a given date using Spark SQL API
spark.sql(
"""SELECT sum(qtysold)
| FROM sales, date
| WHERE sales.dateid = date.dateid
| AND caldate = '2008-01-05'""".stripMargin).show()

适用于 Apache Spark 的 Amazon Redshift 集成增加了排序、聚合、限制、连接和标量函数等操作的下推功能,因此仅需将相关数据从 Redshift 数据仓库转移到需要使用数据的 Spark 应用程序,从而提高性能。

现已开放
适用于 Apache Spark 的 Amazon Redshift 集成现已在所有支持 Amazon EMR 6.9、AWS Glue 4.0 和 Amazon Redshift 的区域开放。对于新的 Spark 3.3.0 版本,您可以直接从 EMR 6.9 和 Glue Studio 4.0 使用此功能。

欢迎试用并通过 AWS re:Post for Amazon Redshift 或通过您通常的 AWS Support 联系方式发送反馈。

Channy