Using Amazon Redshift Spectrum, Amazon Athena, and AWS Glue with Node.js in Production
This is a guest post by Rafi Ton, founder and CEO of NUVIAD. NUVIAD is, in their own words, “a mobile marketing platform providing professional marketers, agencies and local businesses state of the art tools to promote their products and services through hyper targeting, big data analytics and advanced machine learning tools.”
At NUVIAD, we’ve been using Amazon Redshift as our main data warehouse solution for more than three years.
We store massive amounts of ad transaction data that our users and partners analyze to determine ad campaign strategies. When running real-time bidding (RTB) campaigns at large scale, data freshness is critical so that our users can respond rapidly to changes in campaign performance. We chose Amazon Redshift because of its simplicity, scalability, performance, and ability to load new data in near real time.
Over the past three years, our customer base grew significantly and so did our data. We saw our Amazon Redshift cluster grow from three nodes to 65 nodes. To balance cost and analytics performance, we looked for a way to store large amounts of less-frequently analyzed data at a lower cost. Yet, we still wanted to have the data immediately available for user queries and to meet their expectations for fast performance. We turned to Amazon Redshift Spectrum.
In this post, I explain the reasons why we extended Amazon Redshift with Redshift Spectrum as our modern data warehouse. I cover how our data growth and the need to balance cost and performance led us to adopt Redshift Spectrum. I also share key performance metrics in our environment, and discuss the additional AWS services that provide a scalable and fast environment, with data available for immediate querying by our growing user base.
Amazon Redshift as our foundation
The ability to provide fresh, up-to-the-minute data to our customers and partners was always a main goal with our platform. We saw other solutions provide data that was a few hours old, but this was not good enough for us. We insisted on providing the freshest data possible. For us, that meant loading Amazon Redshift in frequent micro batches and allowing our customers to query Amazon Redshift directly to get results in near real time.
The benefits were immediately evident. Our customers could see how their campaigns performed faster than with other solutions, and react sooner to the ever-changing media supply pricing and availability. They were very happy.
However, this approach required Amazon Redshift to store a lot of data for long periods, and our data grew substantially. At our peak, we maintained a cluster running 65 DC1.large nodes. The impact on our Amazon Redshift cluster was evident, and we saw our CPU utilization grow to 90%.
Why we extended Amazon Redshift to Redshift Spectrum
Redshift Spectrum gives us the ability to run SQL queries using the powerful Amazon Redshift query engine against data stored in Amazon S3, without needing to load the data. With Redshift Spectrum, we store data where we want, at the cost that we want. We have the data available for analytics when our users need it with the performance they expect.
Seamless scalability, high performance, and unlimited concurrency
Scaling Redshift Spectrum is a simple process. First, it allows us to leverage Amazon S3 as the storage engine and get practically unlimited data capacity.
Second, if we need more compute power, we can leverage Redshift Spectrum’s distributed compute engine over thousands of nodes to provide superior performance – perfect for complex queries running against massive amounts of data.
Third, all Redshift Spectrum clusters access the same data catalog so that we don’t have to worry about data migration at all, making scaling effortless and seamless.
Lastly, because Redshift Spectrum distributes queries across potentially thousands of nodes, queries are not affected by one another, which provides much more stable performance and unlimited concurrency.
Keeping it SQL
Redshift Spectrum uses the same query engine as Amazon Redshift. This means that we did not need to change our BI tools or query syntax, whether we used complex queries across a single table or joins across multiple tables.
An interesting capability introduced recently is the ability to create a view that spans both Amazon Redshift and Redshift Spectrum external tables. With this feature, you can query frequently accessed data in your Amazon Redshift cluster and less-frequently accessed data in Amazon S3, using a single view.
Leveraging Parquet for higher performance
Parquet is a columnar data format that provides superior performance and allows Redshift Spectrum (or Amazon Athena) to scan significantly less data. With less I/O, queries run faster and we pay less per query. You can read all about Parquet at https://parquet.apache.org/ or https://en.wikipedia.org/wiki/Apache_Parquet.
From a cost perspective, we pay standard rates for our data in Amazon S3, and only small amounts per query to analyze data with Redshift Spectrum. Using the Parquet format, we can significantly reduce the amount of data scanned. Our costs are now lower, and our users get fast results even for large complex queries.
What we learned about Amazon Redshift vs. Redshift Spectrum performance
When we first started looking at Redshift Spectrum, we wanted to put it to the test. We wanted to know how it would compare to Amazon Redshift, so we looked at two key questions:
- What is the performance difference between Amazon Redshift and Redshift Spectrum on simple and complex queries?
- Does the data format impact performance?
During the migration phase, we had our dataset stored in Amazon Redshift and S3 as CSV/GZIP and as Parquet file formats. We tested three configurations:
- Amazon Redshift cluster with 28 DC1.large nodes
- Redshift Spectrum using CSV/GZIP
- Redshift Spectrum using Parquet
We performed benchmarks for simple and complex queries on one month’s worth of data. We tested how much time it took to perform the query, and how consistent the results were when running the same query multiple times. The data we used for the tests was already partitioned by date and hour. Properly partitioning the data improves performance significantly and reduces query times.
First, we tested a simple query aggregating billing data across a month:
We ran the same query seven times and measured the response times (red marking the longest time and green the shortest time):
|Execution Time (seconds)|
|Amazon Redshift|Redshift Spectrum CSV|Redshift Spectrum Parquet|
For simple queries, Amazon Redshift performed better than Redshift Spectrum, as we expected, because the data is local to Amazon Redshift.
What was surprising was that using Parquet data format in Redshift Spectrum significantly beat ‘traditional’ Amazon Redshift performance. For our queries, using Parquet data format with Redshift Spectrum delivered an average 40% performance gain over traditional Amazon Redshift. Furthermore, Redshift Spectrum showed high consistency in execution time with a smaller difference between the slowest run and the fastest run.
Comparing the amount of data scanned when using CSV/GZIP and Parquet, the difference was also significant:
|Data Scanned (GB)|
Because we pay only for the data scanned by Redshift Spectrum, the cost saving of using Parquet is evident and substantial.
Next, we compared the same three configurations with a complex query.
|Execution Time (seconds)|
|Amazon Redshift|Redshift Spectrum CSV|Redshift Spectrum Parquet|
This time, Redshift Spectrum using Parquet cut the average query time by 80% compared to traditional Amazon Redshift!
Bottom line: For complex queries, Redshift Spectrum provided a 67% performance gain over Amazon Redshift. Using the Parquet data format, Redshift Spectrum delivered an 80% performance improvement over Amazon Redshift. For us, this was substantial.
Optimizing the data structure for different workloads
Because S3 is relatively inexpensive and we pay only for the data scanned by each query, we believe that it makes sense to keep our data in different formats for different workloads and different analytics engines. It is important to note that we can have any number of tables pointing to the same data on S3. It all depends on how we partition the data and update the table partitions.
For example, we have a process that runs every minute and generates statistics for the last minute of data collected. With Amazon Redshift, this would be done by running a query on the table along the following lines:
(Assuming ‘ts’ is your column storing the time stamp for each event.)
With Redshift Spectrum, we pay for the data scanned in each query. If the data is partitioned by the minute instead of the hour, a query looking at one minute would be 1/60th the cost. If we use a temporary table that points only to the data of the last minute, we save that unnecessary cost.
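The 1/60th figure can be checked with a little arithmetic. The Python sketch below is an illustration only: the price per terabyte scanned and the partition size are assumptions (check current AWS pricing), and it assumes events are spread evenly across the hour.

```python
# Assumed price in USD per terabyte scanned; verify against current AWS pricing.
PRICE_PER_TB_USD = 5.0

# Hypothetical size of one hourly partition (6 GiB), spread evenly over 60 minutes.
HOUR_PARTITION_BYTES = 6 * 1024**3
MINUTE_PARTITION_BYTES = HOUR_PARTITION_BYTES / 60


def scan_cost_usd(bytes_scanned: float, price_per_tb: float = PRICE_PER_TB_USD) -> float:
    """Cost of a single query that scans `bytes_scanned` bytes."""
    return bytes_scanned / 1024**4 * price_per_tb


def daily_cost_usd(bytes_per_query: float, queries_per_day: int = 24 * 60) -> float:
    """Cost of running the same query once a minute for a full day."""
    return scan_cost_usd(bytes_per_query) * queries_per_day


if __name__ == "__main__":
    print("hour partitions:  ", daily_cost_usd(HOUR_PARTITION_BYTES))
    print("minute partitions:", daily_cost_usd(MINUTE_PARTITION_BYTES))
```

The ratio between the two daily costs is 60 regardless of the assumed price, which is why a per-minute job benefits from a table that points only at the last minute of data.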
Creating Parquet data efficiently
On average, we have 800 instances that process our traffic. Each instance sends events that are eventually loaded into Amazon Redshift. When we started three years ago, we would offload data from each server to S3 and then perform a periodic copy command from S3 to Amazon Redshift.
Recently, Amazon Kinesis Firehose added the capability to offload data directly to Amazon Redshift. While this is now a viable option, we kept the same collection process that worked flawlessly and efficiently for three years.
This changed, however, when we incorporated Redshift Spectrum. With Redshift Spectrum, we needed to find a way to:
- Collect the event data from the instances.
- Save the data in Parquet format.
- Partition the data effectively.
To accomplish this, we save the data as CSV and then transform it to Parquet. The most effective method to generate the Parquet files is to:
- Send the data in one-minute intervals from the instances to Kinesis Firehose with an S3 temporary bucket as the destination.
- Aggregate hourly data and convert it to Parquet using AWS Lambda and AWS Glue.
- Add the Parquet data to S3 by updating the table partitions.
With this new process, we had to give more attention to validating the data before we sent it to Kinesis Firehose, because a single corrupted record in a partition fails queries on that partition.
To store our click data in a table, we considered the following SQL create table command:
The above statement defines a new external table (all Redshift Spectrum tables are external tables) with a few attributes. We stored ‘ts’ as a Unix time stamp and not as Timestamp, and billing data is stored as float and not decimal (more on that later). We also specified that the data is partitioned by date and hour, and then stored as Parquet on S3.
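A statement with these properties could look roughly like the output of the Python sketch below. It is an illustration rather than the original command: the schema name, table name, S3 location, and the `user_id` and `lat` columns are assumptions chosen for the example, while `ts`, `billing`, and the date and hour partitions follow the description above.

```python
from typing import List, Tuple

# Illustrative column list: ts is a Unix time stamp (bigint), billing is float.
# user_id and lat are hypothetical columns added for the example.
EVENT_COLUMNS: List[Tuple[str, str]] = [
    ("user_id", "varchar(64)"),
    ("ts", "bigint"),
    ("billing", "float"),
    ("lat", "float"),
]


def create_external_table_sql(schema: str, table: str, s3_location: str) -> str:
    """Build a CREATE EXTERNAL TABLE statement partitioned by date and hour."""
    column_sql = ",\n".join(f"  {name} {sql_type}" for name, sql_type in EVENT_COLUMNS)
    return (
        f"CREATE EXTERNAL TABLE {schema}.{table} (\n"
        f"{column_sql}\n"
        ")\n"
        "PARTITIONED BY (date date, hour smallint)\n"
        "STORED AS PARQUET\n"
        f"LOCATION '{s3_location}';"
    )


if __name__ == "__main__":
    # Hypothetical schema, table, and bucket names.
    print(create_external_table_sql("spectrum", "events", "s3://example-bucket/events/"))
```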
First, we need to get the table definitions. This can be achieved by running the following query:
This query lists all the columns in the table with their respective definitions:
Now we can use this data to create a validation schema for our data:
Next, we create a function that uses this schema to validate data:
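The Python sketch below shows one way such a schema and validation function could fit together. It is a sketch under assumptions, not the original code: the column definitions are hypothetical and only assumed to resemble the rows that a query against a system view such as SVV_EXTERNAL_COLUMNS returns, and the type mapping covers just the types used here.

```python
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical column definitions (name and external type per column).
COLUMN_DEFINITIONS = [
    {"columnname": "user_id", "external_type": "varchar(64)"},
    {"columnname": "ts", "external_type": "bigint"},
    {"columnname": "billing", "external_type": "double"},
]

Rule = Tuple[str, Optional[int]]  # (kind, max length for strings)


def build_schema(definitions: List[Dict[str, str]]) -> Dict[str, Rule]:
    """Translate column definitions into simple validation rules."""
    schema: Dict[str, Rule] = {}
    for column in definitions:
        name = column["columnname"]
        ext_type = column["external_type"].lower()
        if ext_type.startswith("varchar("):
            schema[name] = ("string", int(ext_type[len("varchar("):-1]))
        elif ext_type in ("smallint", "int", "bigint"):
            schema[name] = ("integer", None)
        elif ext_type in ("float", "double"):
            schema[name] = ("number", None)
        else:
            raise ValueError(f"unsupported column type: {ext_type}")
    return schema


def validate(record: Dict[str, Any], schema: Dict[str, Rule]) -> List[str]:
    """Return a list of problems; an empty list means the record is valid."""
    errors: List[str] = []
    for name, (kind, max_len) in schema.items():
        value = record.get(name)
        if value is None:
            errors.append(f"{name}: missing")
        elif kind == "string":
            if not isinstance(value, str):
                errors.append(f"{name}: expected a string")
            elif max_len is not None and len(value) > max_len:
                errors.append(f"{name}: longer than {max_len} characters")
        elif kind == "integer":
            # bool is a subclass of int in Python, so reject it explicitly.
            if isinstance(value, bool) or not isinstance(value, int):
                errors.append(f"{name}: expected an integer")
        elif kind == "number":
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                errors.append(f"{name}: expected a number")
    return errors
```

Records that return a non-empty error list would be rejected or logged before they reach Kinesis Firehose, so that they never end up in a partition.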
Near real-time data loading with Kinesis Firehose
On Kinesis Firehose, we created a new delivery stream to handle the events as follows:
This delivery stream aggregates event data every minute, or up to 100 MB, and writes the data to an S3 bucket as a CSV/GZIP compressed file. Next, after we have the data validated, we can safely send it to our Kinesis Firehose API:
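As an illustration of the sending step, the Python sketch below serializes a validated record as one CSV line and hands it to a Firehose client. The stream name and column order are hypothetical. The client is passed in as a parameter and is assumed to be a boto3 Firehose client (`boto3.client("firehose")`), whose `put_record` call takes `DeliveryStreamName` and `Record`.

```python
import csv
import io
from typing import Any, Dict, List


def to_csv_line(record: Dict[str, Any], columns: List[str]) -> str:
    """Serialize one record as a single CSV line in a fixed column order."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="\n").writerow([record[c] for c in columns])
    return buffer.getvalue()


def send_event(firehose_client: Any, stream_name: str,
               record: Dict[str, Any], columns: List[str]) -> Any:
    """Send one already-validated record to a Kinesis Firehose delivery stream."""
    line = to_csv_line(record, columns)
    return firehose_client.put_record(
        DeliveryStreamName=stream_name,
        Record={"Data": line.encode("utf-8")},
    )
```

The trailing newline matters: Firehose concatenates records as they arrive, so each record has to carry its own line terminator for the resulting S3 object to be valid CSV.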
Now, we have a single CSV file representing one minute of event data stored in S3. The files are named automatically by Kinesis Firehose by adding a UTC time prefix in the format YYYY/MM/DD/HH before writing objects to S3. Because we use the date and hour as partitions, we need to change the file naming and location to fit our Redshift Spectrum schema.
Automating data distribution using AWS Lambda
We created a simple Lambda function triggered by an S3 put event that copies the file to a different location (or locations), while renaming it to fit our data structure and processing flow. As mentioned before, the files generated by Kinesis Firehose are structured in a pre-defined hierarchy, such as:
All we need to do is parse the object name and restructure it as we see fit. In our case, we did the following (the event is an object received in the Lambda function with all the data about the object written to S3):
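A parsing step of this kind can be sketched in Python as follows. This is an illustration under assumptions: the destination prefix and the `date=.../hour=...` layout are hypothetical, and the key is assumed to end with the YYYY/MM/DD/HH prefix that Kinesis Firehose adds, followed by the file name.

```python
import re
from typing import Any, Dict
from urllib.parse import unquote_plus

# Matches the trailing YYYY/MM/DD/HH/<file name> part of a Firehose object key.
FIREHOSE_KEY = re.compile(
    r"(?P<year>\d{4})/(?P<month>\d{2})/(?P<day>\d{2})/(?P<hour>\d{2})/(?P<name>[^/]+)$"
)


def partitioned_key(event: Dict[str, Any], dest_prefix: str) -> str:
    """Map the key in an S3 put event to a date/hour partitioned key."""
    # Object keys in S3 event notifications are URL-encoded.
    key = unquote_plus(event["Records"][0]["s3"]["object"]["key"])
    match = FIREHOSE_KEY.search(key)
    if match is None:
        raise ValueError(f"unexpected object key: {key}")
    date = "{year}-{month}-{day}".format(**match.groupdict())
    hour = int(match["hour"])  # drop the leading zero, for example "02" becomes 2
    return f"{dest_prefix}/date={date}/hour={hour}/{match['name']}"
```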
Now, we can redistribute the file to the two destinations we need—one for the minute processing task and the other for hourly aggregation:
Kinesis Firehose stores the data in a temporary folder. We copy the object to another folder that holds the data for the last processed minute. This folder is connected to a small Redshift Spectrum table where the data is being processed without needing to scan a much larger dataset. We also copy the data to a folder that holds the data for the entire hour, to be later aggregated and converted to Parquet.
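The two copies described above can be sketched in Python like this. The bucket and key names are hypothetical, and the client is assumed to be a boto3 S3 client (`boto3.client("s3")`) passed in by the caller. `copy_object` is the S3 call that copies an object without downloading it.

```python
from typing import Any, List


def redistribute(s3_client: Any, bucket: str, source_key: str,
                 minute_key: str, hourly_key: str) -> List[str]:
    """Copy one Firehose object to the minute folder and to the hourly folder."""
    copy_source = {"Bucket": bucket, "Key": source_key}
    destinations = [minute_key, hourly_key]
    for dest_key in destinations:
        s3_client.copy_object(Bucket=bucket, Key=dest_key, CopySource=copy_source)
    return destinations
```

In this sketch both destinations live in the same bucket. Copying across buckets would only change the `Bucket` argument of each call.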
Because we partition the data by date and hour, we created a new partition on the Redshift Spectrum table if the processed minute is the first minute in the hour (that is, minute 0). We ran the following:
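A statement of this kind can be generated as in the Python sketch below. The table name and S3 prefix are hypothetical, and the sketch only builds the SQL text. Running it against the cluster is left to whatever database client is already in use.

```python
from datetime import datetime
from typing import Optional


def add_partition_sql(table: str, date: str, hour: int, s3_prefix: str) -> str:
    """Build an ALTER TABLE ... ADD PARTITION statement for one date and hour."""
    location = f"{s3_prefix}/date={date}/hour={hour}/"
    return (
        f"ALTER TABLE {table} ADD IF NOT EXISTS "
        f"PARTITION (date='{date}', hour={hour}) "
        f"LOCATION '{location}';"
    )


def partition_statement_for(table: str, minute_start: datetime,
                            s3_prefix: str) -> Optional[str]:
    """Return the statement only for the first minute of the hour (minute 0)."""
    if minute_start.minute != 0:
        return None
    return add_partition_sql(
        table, minute_start.strftime("%Y-%m-%d"), minute_start.hour, s3_prefix
    )
```

Using `IF NOT EXISTS` keeps the statement safe to repeat if the minute-0 file is processed more than once.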
After the data is processed and added to the table, we delete the processed data from the temporary Kinesis Firehose storage and from the minute storage folder.
Migrating CSV to Parquet using AWS Glue and Amazon EMR
The simplest way we found to run an hourly job converting our CSV data to Parquet is using Lambda and AWS Glue (and thanks to the awesome AWS Big Data team for their help with this).
Creating AWS Glue jobs
What this simple AWS Glue script does:
- Gets parameters for the job, date, and hour to be processed
- Creates a Spark EMR context allowing us to run Spark code
- Reads CSV data into a DataFrame
- Writes the data as Parquet to the destination S3 bucket
- Adds or modifies the Redshift Spectrum / Amazon Athena table partition for the table
Note: Because Redshift Spectrum and Athena both use the AWS Glue Data Catalog, we could use the Athena client to add the partition to the table.
Here are a few words about float, decimal, and double. Using decimal proved to be more challenging than we expected, as it seems that Redshift Spectrum and Spark use them differently. Whenever we used decimal in Redshift Spectrum and in Spark, we kept getting errors, such as:
S3 Query Exception (Fetch). Task failed due to an internal error. File 'https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parquet has an incompatible Parquet schema for column 's3://nuviad-events/events.lat'. Column type: DECIMAL(18, 8), Parquet schema:\noptional float lat [i:4 d:1 r:0]\n (https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parq
We had to experiment with a few floating-point formats until we found that the only combination that worked was to define the column as double in the Spark code and float in Spectrum. This is the reason you see billing defined as float in Spectrum and double in the Spark code.
Creating a Lambda function to trigger conversion
Next, we created a simple Lambda function to trigger the AWS Glue script hourly using simple Python code:
Using Amazon CloudWatch Events, we trigger this function hourly. This function triggers an AWS Glue job named ‘convertEventsParquetHourly’ and runs it for the previous hour, passing job names and values of the partitions to process to AWS Glue.
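A trigger function along these lines is sketched below in Python. It is an illustration rather than the original function: the argument names `--date` and `--hour` are hypothetical, and the client is assumed to be a boto3 Glue client (`boto3.client("glue")`), passed in so that the date logic can be exercised without AWS access.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

JOB_NAME = "convertEventsParquetHourly"


def previous_hour_arguments(now: datetime) -> Dict[str, str]:
    """Job arguments identifying the partition for the hour before `now`."""
    previous = now - timedelta(hours=1)
    return {
        "--date": previous.strftime("%Y-%m-%d"),  # hypothetical argument name
        "--hour": str(previous.hour),             # hypothetical argument name
    }


def trigger_conversion(glue_client: Any, now: Optional[datetime] = None) -> Any:
    """Start the hourly Glue job for the previous hour (UTC)."""
    if now is None:
        now = datetime.now(timezone.utc)
    return glue_client.start_job_run(
        JobName=JOB_NAME,
        Arguments=previous_hour_arguments(now),
    )
```

Subtracting a full hour before formatting handles the midnight case: a run at 00:05 on August 1 targets hour 23 of July 31.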
Redshift Spectrum and Node.js
Our development stack is based on Node.js, which is well-suited for high-speed, light servers that need to process a huge number of transactions. However, a few limitations of the Node.js environment required us to create workarounds and use other tools to complete the process.
Node.js and Parquet
The lack of Parquet modules for Node.js required us to implement an AWS Glue/Amazon EMR process to effectively migrate data from CSV to Parquet. We would rather save directly to Parquet, but we couldn’t find an effective way to do it.
One interesting project in the works is the development of a Parquet npm package by Marc Vertes called node-parquet (https://www.npmjs.com/package/node-parquet). It is not in a production state yet, but we think it would be well worth following the progress of this package.
Timestamp data type
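The Parquet format defines its timestamp types as 64-bit integers, but a JavaScript number is a 64-bit double that can represent integers exactly only up to 53 bits. The result is that you cannot store Timestamp correctly in Parquet using Node.js. The solution is to store Timestamp as string and cast the type to Timestamp in the query. Using this method, we did not observe any performance degradation.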
You can benefit from our trial-and-error experience.
Lesson #1: Data validation is critical
As mentioned earlier, a single corrupt entry in a partition can fail queries running against this partition, especially when using Parquet, which is harder to edit than a simple CSV file. Make sure that you validate your data before scanning it with Redshift Spectrum.
Lesson #2: Structure and partition data effectively
One of the biggest benefits of using Redshift Spectrum (or Athena for that matter) is that you don’t need to keep nodes up and running all the time. You pay only for the queries you perform and only for the data scanned per query.
Keeping different permutations of your data for different queries makes a lot of sense in this case. For example, you can partition your data by date and hour to run time-based queries, and also have another set partitioned by user_id and date to run user-based queries. This results in faster and more efficient performance of your data warehouse.
Storing data in the right format
Use Parquet whenever you can. The benefits are substantial: faster performance, less data to scan, and a much more efficient columnar format. However, it is not supported out-of-the-box by Kinesis Firehose, so you need to implement your own ETL. AWS Glue is a great option.
Creating small tables for frequent tasks
When we started using Redshift Spectrum, we saw our Amazon Redshift costs jump by hundreds of dollars per day. Then we realized that we were unnecessarily scanning a full day’s worth of data every minute. Take advantage of the ability to define multiple tables on the same S3 bucket or folder, and create temporary and small tables for frequent queries.
Lesson #3: Combine Athena and Redshift Spectrum for optimal performance
Moving to Redshift Spectrum also allowed us to take advantage of Athena as both use the AWS Glue Data Catalog. Run fast and simple queries using Athena while taking advantage of the advanced Amazon Redshift query engine for complex queries using Redshift Spectrum.
Redshift Spectrum excels when running complex queries. It can push many compute-intensive tasks, such as predicate filtering and aggregation, down to the Redshift Spectrum layer, so that queries use much less of your cluster’s processing capacity.
Lesson #4: Sort your Parquet data within the partition
We achieved another performance improvement by sorting data within the partition using sortWithinPartitions(sort_field). For example:
We were extremely pleased with using Amazon Redshift as our core data warehouse for over three years. But as our client base and volume of data grew substantially, we extended Amazon Redshift to take advantage of scalability, performance, and cost with Redshift Spectrum.
Redshift Spectrum lets us scale to virtually unlimited storage, scale compute transparently, and deliver super-fast results for our users. With Redshift Spectrum, we store data where we want at the cost we want, and have the data available for analytics when our users need it with the performance they expect.
About the Author
With 7 years of experience in the AdTech industry and 15 years in leading technology companies, Rafi Ton is the founder and CEO of NUVIAD. He enjoys exploring new technologies and putting them to use in cutting edge products and services that generate real money in the real world. Being an experienced entrepreneur, Rafi believes in practical programming and fast adaptation of new technologies to achieve a significant market advantage.