AWS News Blog

New – Amazon Redshift Integration with Apache Spark

Voiced by Polly

Apache Spark is an open-source, distributed processing system commonly used for big data workloads. Spark application developers working in Amazon EMR, Amazon SageMaker, and AWS Glue often use third-party Apache Spark connectors that allow them to read and write the data with Amazon Redshift. These third-party connectors are not regularly maintained, supported, or tested with various versions of Spark for production.

Today we are announcing the general availability of Amazon Redshift integration for Apache Spark, which makes it easy to build and run Spark applications on Amazon Redshift and Redshift Serverless, enabling customers to open up the data warehouse for a broader set of AWS analytics and machine learning (ML) solutions.

With Amazon Redshift integration for Apache Spark, you can get started in seconds and effortlessly build Apache Spark applications in a variety of languages, such as Java, Scala, and Python.

Your applications can read from and write to your Amazon Redshift data warehouse without compromising on the performance of the applications or transactional consistency of the data, as well as performance improvements with pushdown optimizations.

Amazon Redshift integration for Apache Spark builds on an existing open source connector project and enhances it for performance and security, helping customers gain up to 10x faster application performance. We thank the original contributors on the project who collaborated with us to make this happen. As we make further enhancements we will continue to contribute back into the open source project.

Getting Started with Spark Connector for Amazon Redshift
To get started, you can go to AWS analytics and ML services, use data frame or Spark SQL code in a Spark job or Notebook to connect to the Amazon Redshift data warehouse, and start running queries in seconds.

In this launch, Amazon EMR 6.9, EMR Serverless, and AWS Glue 4.0 come with the pre-packaged connector and JDBC driver, and you can just start writing code. EMR 6.9 provides a sample notebook, and EMR Serverless provides a sample Spark Job too.

First, you should set AWS Identity and Access Management (AWS IAM) authentication between Redshift and Spark, between Amazon Simple Storage Service (Amazon S3) and Spark, and between Redshift and Amazon S3. The following diagram describes the authentication between Amazon S3, Redshift, the Spark driver, and Spark executors.

For more information, see Identity and access management in Amazon Redshift in the AWS documentation.

Amazon EMR
If you already have an Amazon Redshift data warehouse and the data available, you can create the database user and provide the right level of grants to the database user. To use this with Amazon EMR, you need to upgrade to the latest version of the Amazon EMR 6.9 that has the packaged spark-redshift connector. Select the emr-6.9.0 release when you create an EMR cluster on Amazon EC2.

You can use EMR Serverless to create your Spark application using the emr-6.9.0 release to run your workload.

EMR Studio also provides an example Jupyter Notebook configured to connect to an Amazon Redshift Serverless endpoint leveraging sample data that you can use to get started quickly.

Here is a Scala example to build your applications both with Spark Dataframe and Spark SQL. Use IAM-based credentials for connecting to Redshift and use IAM role for unloading and loading data from S3.

// Create the JDBC connection URL and define the Redshift context
val jdbcURL = "jdbc:redshift:iam://<RedshiftEndpoint>:<Port>/<Database>?DbUser=<RsUser>"
val rsOptions = Map (
  "url" -> jdbcURL,
  "tempdir" -> tempS3Dir, 
  "aws_iam_role" -> roleARN,
  )
// Reference the sales table from Redshift 
val sales_df = spark
  .read 
  .format("io.github.spark_redshift_community.spark.redshift") 
  .options(rsOptions) 
  .option("dbtable", "sales") 
  .load() 
sales_df.createOrReplaceTempView("sales") 
// Reference the date table from Redshift using Data Frame 
sales_df.join(date_df, sales_df("dateid") === date_df("dateid"))
  .where(col("caldate") === "2008-01-05")
  .groupBy().sum("qtysold")
  .select(col("sum(qtysold)"))
  .show() 

If Amazon Redshift and Amazon EMR are in different VPCs, you have to configure VPC peering or enable cross-VPC access. Assuming both Amazon Redshift and Amazon EMR are in the same virtual private cloud (VPC), you can create a Spark job or Notebook and connect to the Amazon Redshift data warehouse and write Spark code to use the Amazon Redshift connector.

To learn more, see Use Spark on Amazon Redshift with a connector in the AWS documentation.

AWS Glue
When you use AWS Glue 4.0, the spark-redshift connector is available both as a source and target. In Glue Studio, you can use a visual ETL job to read or write to a Redshift data warehouse simply by selecting a Redshift connection to use within a built-in Redshift source or target node.

The Redshift connection contains Redshift connection details along with the credentials needed to access Redshift with the proper permissions.

To get started, choose Jobs in the left menu of the Glue Studio console. Using either of the Visual modes, you can easily add and edit a source or target node and define a range of transformations on the data without writing any code.

Choose Create and you can easily add and edit a source, target node, and the transform node in the job diagram. At this time, you will choose Amazon Redshift as Source and Target.

Once completed, the Glue job can be executed on Glue for the Apache Spark engine, which will automatically use the latest spark-redshift connector.

The following Python script shows an example job to read and write to Redshift with dynamicframe using the spark-redshift connector.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("================ DynamicFrame Read ===============")
url = "jdbc:redshift://<RedshiftEndpoint>:<Port>/dev"
read_options = {
    "url": url,
    "dbtable": dbtable,
    "redshiftTmpDir": redshiftTmpDir,
    "tempdir": redshiftTmpDir,
    "aws_iam_role": aws_iam_role,
    "autopushdown": "true",
    "include_column_list": "false"
}

redshift_read = glueContext.create_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options=read_options
) 

print("================ DynamicFrame Write ===============")

write_options = {
    "url": url,
    "dbtable": dbtable,
    "user": user,
    "password": password,
    "redshiftTmpDir": redshiftTmpDir,
    "tempdir": redshiftTmpDir,
    "aws_iam_role": aws_iam_role,
    "autopushdown": "true",
    "DbUser": user
}

print("================ dyf write result: check redshift table ===============")
redshift_write = glueContext.write_dynamic_frame.from_options(
    frame=redshift_read,
    connection_type="redshift",
    connection_options=write_options
)

When you set up your job detail, you can only use the Glue 4.0 – Supports spark 3.3 Python 3 version for this integration.

To learn more, see Creating ETL jobs with AWS Glue Studio and Using connectors and connections with AWS Glue Studio in the AWS documentation.

Gaining the Best Performance
In the Amazon Redshift integration for Apache Spark, the Spark connector automatically applies predicate and query pushdown to optimize for performance. You can gain performance improvement by using the default Parquet format for the connector used for unloading with this integration.

As the following sample code shows, the Spark connector will turn the supported function into a SQL query and run the query in Amazon Redshift.

import sqlContext.implicits._val
sample= sqlContext.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url",jdbcURL )
.option("tempdir", tempS3Dir)
.option("unload_s3_format", "PARQUET")
.option("dbtable", "event")
.load()

// Create temporary views for data frames created earlier so they can be accessed via Spark SQL
sales_df.createOrReplaceTempView("sales")
date_df.createOrReplaceTempView("date")
// Show the total sales on a given date using Spark SQL API
spark.sql(
"""SELECT sum(qtysold)
| FROM sales, date
| WHERE sales.dateid = date.dateid
| AND caldate = '2008-01-05'""".stripMargin).show()

Amazon Redshift integration for Apache Spark adds pushdown capabilities for operations such as sort, aggregate, limit, join, and scalar functions so that only the relevant data is moved from the Redshift data warehouse to the consuming Spark application, thereby improving performance.

Available Now
The Amazon Redshift integration for Apache Spark is now available in all Regions that support Amazon EMR 6.9, AWS Glue 4.0, and Amazon Redshift. You can start using the feature directly from EMR 6.9 and Glue Studio 4.0 with the new Spark 3.3.0 version.

Give it a try, and please send us feedback either in the AWS re:Post for Amazon Redshift or through your usual AWS support contacts.

Channy

Channy Yun

Channy Yun

Channy Yun is a Principal Developer Advocate for AWS, and passionate about helping developers to build modern applications on latest AWS services. A pragmatic developer and blogger at heart, he loves community-driven learning and sharing of technology, which has funneled developers to global AWS Usergroups. His main topics are open-source, container, storage, network & security, and IoT. Follow him on Twitter at @channyun.