AWS Big Data Blog
Best practices to implement near-real-time analytics using Amazon Redshift Streaming Ingestion with Amazon MSK
Amazon Redshift is a fully managed, scalable cloud data warehouse that accelerates your time to insights with fast, straightforward, and secure analytics at scale. Tens of thousands of customers rely on Amazon Redshift to analyze exabytes of data and run complex analytical queries, making it the most widely used cloud data warehouse. You can run and scale analytics in seconds on all your data, without having to manage your data warehouse infrastructure.
You can use the Amazon Redshift Streaming Ingestion capability to update your analytics databases in near-real time. Amazon Redshift streaming ingestion simplifies data pipelines by letting you create materialized views directly on top of data streams. With this capability in Amazon Redshift, you can use Structured Query Language (SQL) to connect to and directly ingest data from data streams, such as Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK) data streams, and pull data directly to Amazon Redshift.
In this post, we discuss the best practices to implement near-real-time analytics using Amazon Redshift streaming ingestion with Amazon MSK.
Overview of solution
We walk through an example pipeline to ingest data from an MSK topic into Amazon Redshift using Amazon Redshift streaming ingestion. We also show how to unnest JSON data using dot notation in Amazon Redshift. The following diagram illustrates our solution architecture.
The process flow consists of the following steps:
- Create a streaming materialized view in your Redshift cluster to consume live streaming data from the MSK topics.
- Use a stored procedure to implement change data capture (CDC) using the unique combination of Kafka Partition and Kafka Offset at the record level for the ingested MSK topic.
- Create a user-facing table in the Redshift cluster and use dot notation to unnest the JSON document from the streaming materialized view into data columns of the table. You can continuously load fresh data by calling the stored procedure at regular intervals.
- Establish connectivity between an Amazon QuickSight dashboard and Amazon Redshift to deliver visualization and insights.
As part of this post, we also discuss the following topics:
- Steps to configure cross-account streaming ingestion from Amazon MSK to Amazon Redshift
- Best practices to achieve optimized performance from streaming materialized views
- Monitoring techniques to track failures in Amazon Redshift streaming ingestion
Prerequisites
You must have the following:
- An AWS account.
- One of the following resources, depending on your use case:
- A Redshift cluster if you are using Amazon Redshift Provisioned. For instructions, refer to Create a sample Amazon Redshift cluster.
- A Redshift workgroup if you are using Amazon Redshift Serverless. For instructions, refer to Create a workgroup with a namespace.
- An MSK cluster. For instructions, refer to Create an Amazon MSK cluster.
- A topic in your MSK cluster where your data producer can publish data.
- A data producer to write data to the topic in your MSK cluster.
Considerations while setting up your MSK topic
Keep in mind the following considerations when configuring your MSK topic:
- Make sure that the name of your MSK topic is no longer than 128 characters.
- As of this writing, MSK records containing compressed data can’t be directly queried in Amazon Redshift. Amazon Redshift doesn’t support any native decompression methods for client-side compressed data in an MSK topic.
- Follow best practices while setting up your MSK cluster.
- Review the streaming ingestion limitations for any other considerations.
Set up streaming ingestion
To set up streaming ingestion, complete the following steps:
- Set up the AWS Identity and Access Management (IAM) role and trust policy required for streaming ingestion. For instructions, refer to Setting up IAM and performing streaming ingestion from Kafka.
- Make sure that data is flowing into your MSK topic using Amazon CloudWatch metrics (for example, BytesOutPerSec).
- Launch the query editor v2 from the Amazon Redshift console or use your preferred SQL client to connect to your Redshift cluster for the next steps. The following steps were run in query editor v2.
- Create an external schema to map to the MSK cluster. Replace your IAM role ARN and the MSK cluster ARN in the following statement:
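The original statement isn't reproduced here; the following is a minimal sketch, assuming a placeholder schema name (custschema), IAM authentication, and placeholder ARNs that you would replace with your own:

```sql
CREATE EXTERNAL SCHEMA custschema
FROM MSK
-- Replace with your Redshift IAM role ARN
IAM_ROLE 'arn:aws:iam::0123456789:role/MyRedshiftRole'
-- Use AUTHENTICATION none if your MSK cluster allows unauthenticated access
AUTHENTICATION iam
-- Replace with your MSK cluster ARN
CLUSTER_ARN 'arn:aws:kafka:us-east-1:0123456789:cluster/MyTestCluster/<cluster-uuid>';
```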
- Optionally, if your topic names are case sensitive, you need to enable enable_case_sensitive_identifier to access them in Amazon Redshift. Set enable_case_sensitive_identifier to true at the session, user, or cluster level (the SET statement is included in the sketch after the next step).
- Create a materialized view to consume the stream data from the MSK topic:
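A minimal sketch of such a streaming materialized view follows, assuming the placeholder external schema custschema from the earlier step and a placeholder topic name ORDERSTOPIC; the optional SET statement covers the case-sensitivity step above:

```sql
-- Optional: needed only if your topic name is case sensitive
SET enable_case_sensitive_identifier TO true;

-- Streaming materialized view over the MSK topic; kafka_value is parsed into a SUPER column
CREATE MATERIALIZED VIEW Orders_Stream_MV AS
SELECT kafka_partition,
       kafka_offset,
       refresh_time,
       JSON_PARSE(kafka_value) AS Data
FROM custschema."ORDERSTOPIC"
WHERE CAN_JSON_PARSE(kafka_value);
```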
The metadata column kafka_value that arrives from Amazon MSK is stored in VARBYTE format in Amazon Redshift. For this post, you use the JSON_PARSE function to convert kafka_value to a SUPER data type. You also use the CAN_JSON_PARSE function in the filter condition to skip invalid JSON records and guard against errors due to JSON parsing failures. We discuss how to store the invalid data for future debugging later in this post.
- Refresh the streaming materialized view, which triggers Amazon Redshift to read from the MSK topic and load data into the materialized view:
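For example, using the materialized view created earlier:

```sql
REFRESH MATERIALIZED VIEW Orders_Stream_MV;
```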
You can also set your streaming materialized view to use auto refresh capabilities. This will automatically refresh your materialized view as data arrives in the stream. See CREATE MATERIALIZED VIEW for instructions to create a materialized view with auto refresh.
Unnest the JSON document
The following is a sample of a JSON document that was ingested from the MSK topic to the Data column of SUPER type in the streaming materialized view Orders_Stream_MV:
Use dot notation as shown in the following code to unnest your JSON payload:
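The original sample payload isn't reproduced here; the following sketch assumes hypothetical order fields (orderid, customerid, ordervalue, orderstatus) in the SUPER column Data and casts them to scalar types:

```sql
-- Assumed (hypothetical) payload shape:
-- {"orderid": 12345, "customerid": 678, "ordervalue": 99.95, "orderstatus": "SHIPPED"}
SELECT data."orderid"::INT              AS order_id,
       data."customerid"::INT           AS customer_id,
       data."ordervalue"::DECIMAL(10,2) AS order_value,
       data."orderstatus"::VARCHAR(20)  AS order_status
FROM Orders_Stream_MV;
```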
The following screenshot shows what the result looks like after unnesting.
If you have arrays in your JSON document, consider unnesting your data using PartiQL statements in Amazon Redshift. For more information, refer to the section Unnest the JSON document in the post Near-real-time analytics using Amazon Redshift streaming ingestion with Amazon Kinesis Data Streams and Amazon DynamoDB.
Incremental data load strategy
Complete the following steps to implement an incremental data load:
- Create a table called Orders in Amazon Redshift, which end-users will use for visualization and business analysis:
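The exact column list depends on your payload; a sketch using the hypothetical fields from the earlier dot-notation example might be:

```sql
CREATE TABLE public.Orders (
    order_id     INT,
    customer_id  INT,
    order_value  DECIMAL(10,2),
    order_status VARCHAR(20)
);
```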
Next, you create a stored procedure called SP_Orders_Load to implement CDC from the streaming materialized view and load the data into the final Orders table. You use the combination of Kafka_Partition and Kafka_Offset, available in the streaming materialized view as system columns, to implement CDC. The combination of these two columns is always unique within an MSK topic, which ensures that no records are missed during the process. The stored procedure contains the following components:
- To use case-sensitive identifiers, set enable_case_sensitive_identifier to true at the session, user, or cluster level.
- Refresh the streaming materialized view manually if auto refresh is not enabled.
- Create an audit table called Orders_Streaming_Audit if it doesn't exist to keep track of the last offset for each partition that was loaded into the Orders table during the last run of the stored procedure.
- Unnest and insert only new or changed data into a staging table called Orders_Staging_Table, reading from the streaming materialized view Orders_Stream_MV, where Kafka_Offset is greater than the last processed Kafka_Offset recorded in the audit table Orders_Streaming_Audit for the Kafka_Partition being processed.
- When loading for the first time using this stored procedure, there is no data in the Orders_Streaming_Audit table, so all the data from Orders_Stream_MV gets loaded into the Orders table.
- Insert only business-relevant columns into the user-facing Orders table, selecting from the staging table Orders_Staging_Table.
- Insert the max Kafka_Offset for every loaded Kafka_Partition into the audit table Orders_Streaming_Audit.
We added the intermediate staging table Orders_Staging_Table to this solution to help with debugging and traceability in case of unexpected failures. Skipping the staging step and loading directly into the final table from Orders_Stream_MV can provide lower latency, depending on your use case.
- Create the stored procedure with the following code:
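The full procedure depends on your schema; the following is a condensed sketch that assumes the hypothetical Orders, Orders_Staging_Table, and Orders_Streaming_Audit structures and payload fields used in this post, so treat it as a starting point rather than a drop-in implementation:

```sql
CREATE OR REPLACE PROCEDURE SP_Orders_Load()
AS $$
BEGIN
    -- Note: enable_case_sensitive_identifier may need to be set to true (session, user,
    -- or cluster level) before calling this procedure if your topic or field names are
    -- case sensitive.

    -- Refresh the streaming materialized view (skip if auto refresh is enabled)
    REFRESH MATERIALIZED VIEW Orders_Stream_MV;

    -- Audit table that tracks the last offset loaded per partition
    CREATE TABLE IF NOT EXISTS Orders_Streaming_Audit (
        kafka_partition BIGINT,
        kafka_offset    BIGINT
    );

    -- Stage only records with an offset greater than the last processed offset per partition
    DROP TABLE IF EXISTS Orders_Staging_Table;
    CREATE TABLE Orders_Staging_Table AS
    SELECT s.kafka_partition,
           s.kafka_offset,
           s.data."orderid"::INT              AS order_id,     -- hypothetical payload fields
           s.data."customerid"::INT           AS customer_id,
           s.data."ordervalue"::DECIMAL(10,2) AS order_value,
           s.data."orderstatus"::VARCHAR(20)  AS order_status
    FROM Orders_Stream_MV s
    LEFT JOIN Orders_Streaming_Audit a
           ON s.kafka_partition = a.kafka_partition
    WHERE a.kafka_offset IS NULL
       OR s.kafka_offset > a.kafka_offset;

    -- Load only business-relevant columns into the user-facing table
    INSERT INTO Orders (order_id, customer_id, order_value, order_status)
    SELECT order_id, customer_id, order_value, order_status
    FROM Orders_Staging_Table;

    -- Record the max offset loaded for each partition
    DELETE FROM Orders_Streaming_Audit
    USING Orders_Staging_Table st
    WHERE Orders_Streaming_Audit.kafka_partition = st.kafka_partition;

    INSERT INTO Orders_Streaming_Audit (kafka_partition, kafka_offset)
    SELECT kafka_partition, MAX(kafka_offset)
    FROM Orders_Staging_Table
    GROUP BY kafka_partition;
END;
$$ LANGUAGE plpgsql;
```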
- Run the stored procedure to load data into the Orders table:
- Validate data in the Orders table.
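For example, assuming the procedure and table sketched above:

```sql
CALL SP_Orders_Load();

-- Simple validation of the loaded data
SELECT COUNT(*) FROM Orders;
SELECT * FROM Orders LIMIT 10;
```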
Establish cross-account streaming ingestion
If your MSK cluster belongs to a different account, complete the following steps to create IAM roles to set up cross-account streaming ingestion. Let’s assume the Redshift cluster is in account A and the MSK cluster is in account B, as shown in the following diagram.
Complete the following steps:
- In account B, create an IAM role called MyRedshiftMSKRole that allows Amazon Redshift (account A) to communicate with the MSK cluster (account B) named MyTestCluster. Depending on whether your MSK cluster uses IAM authentication or unauthenticated access to connect, you need to create an IAM role with one of the following policies:
  - An IAM policy for Amazon MSK using unauthenticated access:
  - An IAM policy for Amazon MSK when using IAM authentication:
The Resource section in the preceding example gives access to all topics in the MyTestCluster MSK cluster. If you need to restrict the IAM role to specific topics, you need to replace the topic resource with a more restrictive resource policy.
- After you create the IAM role in account B, take note of the IAM role ARN (for example, arn:aws:iam::0123456789:role/MyRedshiftMSKRole).
- In account A, create a Redshift customizable IAM role called MyRedshiftRole that Amazon Redshift will assume when connecting to Amazon MSK. The role should have a policy like the following, which allows the Amazon Redshift IAM role in account A to assume the Amazon MSK role in account B:
- Take note of the role ARN for the Amazon Redshift IAM role (for example, arn:aws:iam::9876543210:role/MyRedshiftRole).
- Go back to account B and add this role to the trust policy of the IAM role arn:aws:iam::0123456789:role/MyRedshiftMSKRole to allow account B to trust the IAM role from account A. The trust policy should look like the following code:
- Sign in to the Amazon Redshift console as account A.
- Launch the query editor v2 or your preferred SQL client and run the following statements to access the MSK topic in account B. To map to the MSK cluster, create an external schema using role chaining by specifying IAM role ARNs, separated by a comma without any spaces around it. The role attached to the Redshift cluster comes first in the chain.
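A sketch of the external schema definition with role chaining, using the example role ARNs above, a placeholder schema name, and a placeholder cluster ARN, could look like the following (note that there are no spaces around the comma between the two role ARNs):

```sql
CREATE EXTERNAL SCHEMA custschema_crossaccount
FROM MSK
-- Role attached to the Redshift cluster (account A) first, then the MSK role (account B)
IAM_ROLE 'arn:aws:iam::9876543210:role/MyRedshiftRole,arn:aws:iam::0123456789:role/MyRedshiftMSKRole'
AUTHENTICATION iam
-- Replace with the ARN of the MSK cluster in account B
CLUSTER_ARN 'arn:aws:kafka:us-east-1:0123456789:cluster/MyTestCluster/<cluster-uuid>';
```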
Performance considerations
Keep in mind the following performance considerations:
- Keep the streaming materialized view simple and move transformations like unnesting, aggregation, and case expressions to a later step—for example, by creating another materialized view on top of the streaming materialized view.
- Consider creating only one streaming materialized view in a single Redshift cluster or workgroup for a given MSK topic. Creation of multiple materialized views per MSK topic can slow down the ingestion performance because each materialized view becomes a consumer for that topic and shares the Amazon MSK bandwidth for that topic. Live streaming data in a streaming materialized view can be shared across multiple Redshift clusters or Redshift Serverless workgroups using data sharing.
- While defining your streaming materialized view, avoid using Json_Extract_Path_Text to pre-shred data, because Json_Extract_Path_Text operates on the data row by row, which significantly impacts ingestion throughput. It is preferable to land the data as is from the stream and then shred it later.
- Where possible, consider skipping the sort key in the streaming materialized view to accelerate ingestion. When a streaming materialized view has a sort key, a sort operation occurs with every batch of ingested data from the stream. Sorting has a performance overhead depending on the sort key data type, the number of sort key columns, and the amount of data ingested in each batch. This sorting step can increase the latency before the streaming data is available to query. You should weigh which is more important: latency on ingestion or latency on querying the data.
- For optimized performance of the streaming materialized view and to reduce storage usage, occasionally purge data from the materialized view using delete, truncate, or alter table append.
- If you need to ingest multiple MSK topics in parallel into Amazon Redshift, start with a smaller number of streaming materialized views and keep adding more materialized views to evaluate the overall ingestion performance within a cluster or workgroup.
- Increasing the number of nodes in a Redshift provisioned cluster or the base RPU of a Redshift Serverless workgroup can help boost the ingestion performance of a streaming materialized view. For optimal performance, you should aim to have as many slices in your Redshift provisioned cluster as there are partitions in your MSK topic, or 8 RPU for every four partitions in your MSK topic.
Monitoring techniques
Records in the topic that exceed the size of the target materialized view column at the time of ingestion will be skipped. Records that are skipped by the materialized view refresh will be logged in the SYS_STREAM_SCAN_ERRORS system table.
Errors that occur when processing a record due to a calculation or a data type conversion or some other logic in the materialized view definition will result in the materialized view refresh failure until the offending record has expired from the topic. To avoid these types of issues, test the logic of your materialized view definition carefully; otherwise, land the records into the default VARBYTE column and process them later.
The following are available monitoring views:
- SYS_MV_REFRESH_HISTORY – Use this view to gather information about the refresh history of your streaming materialized views. The results include the refresh type, such as manual or auto, and the status of the most recent refresh. The following query shows the refresh history for a streaming materialized view:
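As a sketch, assuming the columns mv_name, refresh_type, status, start_time, and end_time and the materialized view name used in this post:

```sql
SELECT mv_name, refresh_type, status, start_time, end_time
FROM SYS_MV_REFRESH_HISTORY
WHERE mv_name = 'orders_stream_mv'
ORDER BY start_time DESC;
```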
- SYS_STREAM_SCAN_ERRORS – Use this view to check the reason why a record failed to load via streaming ingestion from an MSK topic. As of writing this post, when ingesting from Amazon MSK, this view only logs errors when the record is larger than the materialized view column size. This view will also show the unique identifier (offset) of the MSK record in the position column. The following query shows the error code and error reason when a record exceeded the maximum size limit:
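As a sketch; the column names below (stream_name, record_time, position, error_code, error_reason) follow the view description above, so verify them against the SYS_STREAM_SCAN_ERRORS documentation for your Redshift version:

```sql
SELECT stream_name, record_time, position, error_code, error_reason
FROM SYS_STREAM_SCAN_ERRORS
ORDER BY record_time DESC;
```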
- SYS_STREAM_SCAN_STATES – Use this view to monitor the number of records scanned at a given record_time. This view also tracks the offset of the last record read in the batch. The following query shows topic data for a specific materialized view:
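As a sketch; because the exact column list may differ by Redshift version, this example selects all columns and assumes an mv_name column for filtering:

```sql
SELECT *
FROM SYS_STREAM_SCAN_STATES
WHERE mv_name = 'orders_stream_mv'   -- assumed filter column
ORDER BY record_time DESC;
```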
- SYS_QUERY_HISTORY – Use this view to check the overall metrics for a streaming materialized view refresh. This will also log errors in the error_message column for errors that don’t show up in SYS_STREAM_SCAN_ERRORS. The following query shows the error causing the refresh failure of a streaming materialized view:
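As a sketch, filtering on query text that references the streaming materialized view used in this post:

```sql
SELECT query_id, start_time, end_time, status, error_message
FROM SYS_QUERY_HISTORY
WHERE query_text ILIKE '%orders_stream_mv%'
ORDER BY start_time DESC;
```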
Additional considerations for implementation
You can optionally create a materialized view on top of a streaming materialized view, allowing you to unnest and precompute results for end-users. This approach eliminates the need to store the results in a final table using a stored procedure.
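For example, a second materialized view defined on top of Orders_Stream_MV could precompute the unnested columns, using the same hypothetical payload fields as earlier:

```sql
CREATE MATERIALIZED VIEW Orders_Extracted_MV AS
SELECT kafka_partition,
       kafka_offset,
       refresh_time,
       data."orderid"::INT              AS order_id,     -- hypothetical payload fields
       data."customerid"::INT           AS customer_id,
       data."ordervalue"::DECIMAL(10,2) AS order_value
FROM Orders_Stream_MV;
```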
In this post, you use the CAN_JSON_PARSE function to guard against any errors to more successfully ingest data—in this case, the streaming records that can’t be parsed are skipped by Amazon Redshift. However, if you want to keep track of your error records, consider storing them in a column using the following SQL when creating the streaming materialized view:
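A sketch of such a definition follows; it keeps the parsed SUPER value for valid records and the raw VARBYTE value for records that fail parsing (the schema and topic names are the placeholders used earlier):

```sql
CREATE MATERIALIZED VIEW Orders_Stream_MV AS
SELECT kafka_partition,
       kafka_offset,
       refresh_time,
       -- Parsed payload for valid JSON records
       CASE WHEN CAN_JSON_PARSE(kafka_value)
            THEN JSON_PARSE(kafka_value) END AS Data,
       -- Raw value retained for records that fail JSON parsing
       CASE WHEN NOT CAN_JSON_PARSE(kafka_value)
            THEN kafka_value END AS Invalid_Data
FROM custschema."ORDERSTOPIC";
```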
You can also consider unloading data from the view SYS_STREAM_SCAN_ERRORS into an Amazon Simple Storage Service (Amazon S3) bucket and get alerts by sending a report via email using Amazon Simple Notification Service (Amazon SNS) notifications whenever a new S3 object is created.
Lastly, based on your data freshness requirement, you can use Amazon EventBridge to schedule the jobs in your data warehouse to call the aforementioned SP_Orders_Load stored procedure on a regular basis. EventBridge does this at fixed intervals, and you may need a mechanism (for example, an AWS Step Functions state machine) to monitor whether the previous call to the procedure completed. For more information, refer to Creating an Amazon EventBridge rule that runs on a schedule. You can also refer to Accelerate orchestration of an ELT process using AWS Step Functions and Amazon Redshift Data API. Another option is to use Amazon Redshift query editor v2 to schedule the refresh. For details, refer to Scheduling a query with query editor v2.
Conclusion
In this post, we discussed best practices to implement near-real-time analytics using Amazon Redshift streaming ingestion with Amazon MSK. We showed you an example pipeline to ingest data from an MSK topic into Amazon Redshift using streaming ingestion. We also showed a reliable strategy to perform incremental streaming data load into Amazon Redshift using Kafka Partition and Kafka Offset. Additionally, we demonstrated the steps to configure cross-account streaming ingestion from Amazon MSK to Amazon Redshift and discussed performance considerations for optimized ingestion rate. Lastly, we discussed monitoring techniques to track failures in Amazon Redshift streaming ingestion.
If you have any questions, leave them in the comments section.
About the Authors
Poulomi Dasgupta is a Senior Analytics Solutions Architect with AWS. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems. Outside of work, she likes travelling and spending time with her family.
Adekunle Adedotun is a Sr. Database Engineer with Amazon Redshift service. He has been working on MPP databases for 6 years with a focus on performance tuning. He also provides guidance to the development team for new and existing service features.