AWS Big Data Blog

Writing SQL on Streaming Data with Amazon Kinesis Analytics – Part 1

Ryan Nienhuis is a Senior Product Manager for Amazon Kinesis

This is the first of two AWS Big Data blog posts on Writing SQL on Streaming Data with Amazon Kinesis Analytics. In this post, I provide an overview of streaming data and key concepts like the basics of streaming SQL, and complete a walkthrough using a simple example. In the next post, I will cover more advanced stream processing concepts using Amazon Kinesis Analytics.

Most organizations use batch data processing to perform their analytics in daily or hourly intervals to inform their business decisions and improve their customer experiences. However, you can derive significantly more value from your data if you are able to process and react in real time. Indeed, the value of insights in your data can decline rapidly over time – the faster you react, the better. For example:

  • Analyzing your company’s key performance indicators over the last 24 hours is a better reflection of your current business than analyzing last month’s metrics.
  • Reacting to an operational event as it is happening is far more valuable than discovering a week later that the event occurred.
  • Identifying that a customer is unable to complete a purchase on your ecommerce site so you can assist them in completing the order is much better than finding out next week that they were unable to complete the transaction.

Real-time insights are extremely valuable, but difficult to extract from streaming data. Processing data in real time can be difficult because it needs to be done quickly and continuously to keep up with the speed at which the data is produced. In addition, the analysis may require data to be processed in the same order in which it was generated for accurate results, which can be hard due to the distributed nature of the data.

Because of these complexities, people start by implementing simple applications that perform streaming ETL, such as collecting, validating, and normalizing log data across different applications. Some then progress to basic processing like rolling min-max computations, while a select few implement sophisticated processing such as anomaly detection or correlating events by user sessions.  With each step, more and more value is extracted from the data but the difficulty level also increases.

With the launch of Amazon Kinesis Analytics, you can now easily write SQL on streaming data, providing a powerful way to build a stream processing application in minutes. The service allows you to connect to streaming data sources, process the data with sub-second latencies, and continuously emit results to downstream destinations for use in real-time alerts, dashboards, or further analysis.

This post introduces Amazon Kinesis Analytics and the fundamentals of writing ANSI-standard SQL over streaming data, then works through a simple example application that continuously generates metrics over time windows.

What is streaming data?

Today, data is generated continuously from a large variety of sources, including clickstream data from mobile and web applications, ecommerce transactions, application logs from servers, telemetry from connected devices, and many other sources.

Typically, hundreds to millions of these sources create data that is usually small (on the order of kilobytes) and occurs in a sequence. For example, your ecommerce site has thousands of individuals concurrently interacting with the site, each generating a sequence of events based upon their activity (click product, add to cart, purchase product, etc.). When these sequences are captured continuously from these sources as events occur, the data is categorized as streaming data.

Amazon Kinesis Streams

Capturing event data with low latency and durably storing it in a highly available, scalable data store, such as Amazon Kinesis Streams, is the foundation for streaming data. Streams enables you to capture and store data for ordered, replayable, real-time processing using a streaming application. You configure your data sources to emit data into the stream, then build applications that read and process data from that stream in real time. To build your applications, you can use the Amazon Kinesis Client Library (KCL), AWS Lambda, Apache Storm, and a number of other solutions, including Amazon Kinesis Analytics.

Amazon Kinesis Firehose

One of the more common use cases for streaming data is to capture it and then load it into a cloud storage service, a database, or another analytics service. Amazon Kinesis Firehose is a fully managed service that offers an easy-to-use solution to collect and deliver streaming data to Amazon S3, Amazon Redshift, and Amazon Elasticsearch Service.

With Firehose, you create delivery streams using the AWS Management Console to specify your destinations of choice and choose from configuration options that allow you to batch, compress, and encrypt your data before it is loaded into the destination. From there, you set up your data sources to start sending data to the Firehose delivery stream, which loads it continuously to your destinations with no ongoing administration.

Amazon Kinesis Analytics

Amazon Kinesis Analytics provides an easy and powerful way to process and analyze streaming data with standard SQL. Using Analytics, you build applications that continuously read data from streaming sources, process it in real time using SQL code, and emit the results downstream to your configured destinations.

An Analytics application can ingest data from Streams and Firehose. The service detects a schema associated with the data in your source for common formats, which you can further refine using an interactive schema editor. Your application’s SQL code can be anything from a simple count or average, to more advanced analytics like correlating events over time windows. You author your SQL using an interactive editor, and then test it with live streaming data.

Finally, you configure your application to emit SQL results to up to four destinations, including S3, Amazon Redshift, and Amazon Elasticsearch Service (through a Firehose delivery stream); or to an Amazon Kinesis stream. After setup, the service scales your application to handle your query complexity and streaming data throughput – you don’t have to provision or manage any servers.

Walkthrough (part 1): Run your first SQL query using Amazon Kinesis Analytics

The easiest way to understand Amazon Kinesis Analytics is to try it out. You need an AWS account to get started. This walkthrough is in two parts. The first part sets up an application that consists of a basic streaming query on some demo data. Then I step back to describe what you’ve just created, since this might be your first introduction to some streaming data concepts like pumps and tumbling time windows. The second part of the walkthrough augments the application by adding more aggregation functions and an intermediate stream.

A streaming application consists of three components:

  • Streaming data sources
  • Analytics written in SQL
  • Destinations for the results

The application continuously reads data from a streaming source, generates analytics using your SQL code, and emits those results to up to four destinations. This walkthrough covers the first two components and points you in the right direction for completing an end-to-end application by adding a destination for your SQL results.

Create an Amazon Kinesis Analytics application

  1. Open the Amazon Kinesis Analytics console and choose Create a new application.

  2. Provide a name and an optional description for your application, and choose Continue.

You are taken to the application hub page.

Create a streaming data source

Analytics supports Amazon Kinesis Streams and Amazon Kinesis Firehose as streaming data inputs, and Amazon S3 as a reference data input. The primary difference between the two kinds of input is that streaming data is read continuously, whereas reference data is read all at once. Reference data sources are used for joining against the incoming stream to enrich the data.

In Amazon Kinesis Analytics, choose Connect to a source.

If you have existing Amazon Kinesis streams or Firehose delivery streams, they are shown here.

For the purposes of this post, you will be using a demo stream, which creates and populates a stream with sample data on your behalf. The demo stream is created under your account with a single shard, which supports up to 1 MB/sec of write throughput and 2 MB/sec of read throughput. Analytics will write simulated stock ticker data to the demo stream directly from your browser. Your application will read data from the stream in real time.
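To put the single-shard figures in context, here is a minimal Python sketch of the capacity arithmetic. It uses only the two per-shard throughput numbers quoted above; the function name `shards_needed` is made up for illustration, and the sketch ignores any other per-shard limits that may apply.

```python
import math

# Per-shard throughput figures quoted in this post.
WRITE_MB_PER_SHARD = 1.0
READ_MB_PER_SHARD = 2.0


def shards_needed(write_mb_per_sec: float, read_mb_per_sec: float) -> int:
    """Smallest shard count that covers both the write and the read rate.

    Hypothetical helper: it considers only MB/sec throughput.
    """
    for_writes = math.ceil(write_mb_per_sec / WRITE_MB_PER_SHARD)
    for_reads = math.ceil(read_mb_per_sec / READ_MB_PER_SHARD)
    return max(1, for_writes, for_reads)


demo_stream = shards_needed(0.2, 0.2)    # a light demo workload fits in one shard
busier_stream = shards_needed(3.5, 5.0)  # writes dominate: 4 shards
read_heavy = shards_needed(1.0, 6.0)     # reads dominate: 3 shards
```

The low-volume demo data fits comfortably within one shard, which is why the demo stream is created with a single shard.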

Next, choose Configure a new stream and Create demo stream.

Later, you will refer to the demo stream in your SQL code as “SOURCE_SQL_STREAM_001”. Analytics calls the DiscoverInputSchema API action, which infers a schema by sampling records from your selected input data stream. You can see the applied schema on your data in the formatted sample shown in the browser, as well as the original sample taken from the raw stream. You can then edit the schema to fine tune it to your needs.
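To get a feel for what inferring a schema from sampled records means, consider the following Python sketch. It is not the algorithm behind DiscoverInputSchema, which this post does not describe. The JSON record layout, the sample values, and the helper names `infer_sql_type` and `infer_schema` are all assumptions made for illustration.

```python
import json


def infer_sql_type(values):
    """Pick a coarse SQL-like type that fits every sampled value of one field."""
    if all(isinstance(v, bool) for v in values):
        return "BOOLEAN"
    if all(isinstance(v, int) and not isinstance(v, bool) for v in values):
        return "INTEGER"
    if all(isinstance(v, (int, float)) and not isinstance(v, bool) for v in values):
        return "REAL"
    # Fall back to a string column sized to the longest sampled value.
    longest = max(len(str(v)) for v in values)
    return f"VARCHAR({longest})"


def infer_schema(raw_records):
    """Infer {field: type} from a small sample of JSON-encoded records."""
    columns = {}
    for raw in raw_records:
        for field, value in json.loads(raw).items():
            columns.setdefault(field, []).append(value)
    return {field: infer_sql_type(values) for field, values in columns.items()}


# Made-up sample records shaped like stock ticker data.
sample = [
    '{"TICKER_SYMBOL": "ABC", "SECTOR": "TECHNOLOGY", "CHANGE": -0.5, "PRICE": 42.25}',
    '{"TICKER_SYMBOL": "WXYZ", "SECTOR": "ENERGY", "CHANGE": 1, "PRICE": 17.5}',
]
schema = infer_schema(sample)
```

A small sample can mislead an inference like this one (for example, a string column sized to the longest value seen so far), which is one reason the ability to edit the schema afterward is useful.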

Feel free to explore; when you are ready, choose Save and continue. You are taken back to the streaming application hub.

Create a SQL query for analyzing data

On the streaming application hub, choose Go to SQL Editor and Run Application.

This SQL editor is the development environment for Amazon Kinesis Analytics. On the top portion of the screen, there is a text editor with syntax highlighting and intelligent auto-complete, as well as a number of SQL templates to help you get started. On the bottom portion of the screen, there is an area for you to explore your source data, your intermediate SQL results, and the data you are writing to your destinations. You can view the entire flow of your application here, end-to-end.

Next, choose Add SQL from Template.

Amazon Kinesis Analytics provides a number of SQL templates that work with the demo stream. Feel free to explore; when you’re ready, choose the COUNT, AVG, etc. (aggregate functions) + Tumbling time window template and choose Add SQL to Editor.

The SELECT statement in this SQL template performs a count over a 10-second tumbling window. A window is used to group rows together relative to the current row that the Amazon Kinesis Analytics application is processing.

Choose Save and run SQL. Congratulations, you just wrote your first SQL query on streaming data!

Streaming SQL with Amazon Kinesis Analytics

In a relational database, you work with tables of data, using INSERT statements to add records and SELECT statements to query the data in a table. In Amazon Kinesis Analytics, you work with in-application streams, which are similar to tables in that you can CREATE, INSERT, and SELECT from them. However, unlike a table, data is continuously inserted into an in-application stream, even while you are executing a SQL statement against it. The data in an in-application stream is therefore unbounded.

In your application code, you interact primarily with in-application streams. For instance, a source in-application stream represents your configured Amazon Kinesis stream or Firehose delivery stream in the application, which by default is named “SOURCE_SQL_STREAM_001”. A destination in-application stream represents your configured destinations, which by default is named “DESTINATION_SQL_STREAM”. When interacting with in-application streams, the following is true:

  • The SELECT statement is used in the context of an INSERT statement. That is, when you select rows from one in-application stream, you insert results into another in-application stream.
  • The INSERT statement is always used in the context of a pump. That is, you use pumps to write to an in-application stream. A pump is the mechanism used to make an INSERT statement continuous.

There are two separate SQL statements in the template you selected in the first part of the walkthrough. The first statement creates a target in-application stream for the SQL results; the second statement creates a pump for inserting into that stream and includes the SELECT statement.
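If pumps are new to you, the following Python sketch may help build intuition. It is a toy model, not how the service is implemented: the class `InAppStream`, the functions `source_rows` and `pump`, and the sample rows are all hypothetical. The source is an endless generator, which mirrors the unbounded nature of an in-application stream.

```python
from collections import deque
from itertools import count, islice


class InAppStream:
    """Toy stand-in for an in-application stream: an append-only queue of rows."""

    def __init__(self, name):
        self.name = name
        self.rows = deque()

    def insert(self, row):
        self.rows.append(row)


def source_rows():
    """Endless generator standing in for SOURCE_SQL_STREAM_001 (made-up rows)."""
    symbols = ["ABC", "XYZ"]
    for i in count():
        yield {"ticker_symbol": symbols[i % 2], "price": 10.0 + i}


def pump(rows, select, target):
    """Mimic a pump: run `select` on every arriving row and insert the result."""
    for row in rows:
        target.insert(select(row))


destination = InAppStream("DESTINATION_SQL_STREAM")

# The source never ends, so this demo reads only the first five rows.
pump(
    islice(source_rows(), 5),
    lambda row: (row["ticker_symbol"], row["price"]),
    destination,
)
```

Without the `islice` cap, the call to `pump` would never return. That is the sense in which a pump makes an INSERT statement continuous.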

Generating real-time analytics using windows

In the console, look at the SQL results from the walkthrough, which are sampled and continuously streamed to the console.

In the example application you just built, you used a 10-second tumbling time window to perform an aggregation of records. Notice the special column called ROWTIME, which represents the time a row was inserted into the first in-application stream. The ROWTIME value increments every 10 seconds with each new set of SQL results. (Some 10-second windows may not be shown in the console because we sample results on the high-speed stream.) You use this special column in your tumbling time window to help define the start and end of each result set.

Windows are important because they define the bounds within which your query operates. The starting bound is usually the current row that Amazon Kinesis Analytics is processing, and the window defines the ending bound. Windows are required with any query that works across rows, because the in-application stream is unbounded; a window bounds the result set and makes the query deterministic. Analytics supports three types of windows: tumbling, sliding, and custom. These concepts will be covered in depth in our next blog post.

Tumbling windows, like the one you selected in your template, are useful for periodic reports. You can use a tumbling window to compute the average number of visitors to your website over the last 5 minutes, or the maximum over the past hour. At the end of each window, a single result is emitted for each key in the group, as specified by the GROUP BY clause.
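The way a tumbling window emits results can be pictured with a short Python generator. This is a simplified sketch under stated assumptions, not the service's implementation: rows are assumed to arrive in non-decreasing time order, the function name `tumbling_counts` is invented for this example, and the ticker symbols are made up.

```python
def tumbling_counts(rows, width=10):
    """Yield (window_start, key, count) each time a tumbling window closes.

    `rows` is an iterable of (rowtime_seconds, key) pairs, assumed to arrive
    in non-decreasing rowtime order.
    """
    current = None  # start time of the window currently being filled
    counts = {}
    for rowtime, key in rows:
        start = int(rowtime // width) * width
        if current is not None and start != current:
            # A row for a later window arrived: emit one result per key.
            for k in sorted(counts):
                yield (current, k, counts[k])
            counts = {}
        current = start
        counts[key] = counts.get(key, 0) + 1
    # A truly unbounded stream never gets here; flush so that the finite
    # demo input below also shows its last window.
    for k in sorted(counts):
        yield (current, k, counts[k])


rows = [(300.0, "ABC"), (302.0, "XYZ"), (309.0, "ABC"), (311.0, "ABC"), (325.0, "XYZ")]
results = list(tumbling_counts(rows))
```

Each row lands in exactly one window, and nothing is emitted for a window until a row from a later window shows that it has ended.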

In streaming data, there are different types of time, and the one you choose matters to your analytics. Our example uses ROWTIME, or the processing time, which works well for some use cases. However, in many scenarios you want a time that more accurately reflects when the event occurred, such as the event time or ingest time. Amazon Kinesis Analytics supports all three time semantics for processing data: processing, event, and ingest time. These concepts will be covered in depth in our next blog post.
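A small Python sketch shows why the choice of time matters. The `Record` class, its field names, and the timestamps are hypothetical; the only point is that one record can fall into different 10-second windows depending on which of its three times you group by.

```python
from dataclasses import dataclass


@dataclass
class Record:
    event_time: float       # when the event happened at the producer
    ingest_time: float      # when the stream received the record
    processing_time: float  # when the application read the row


def window_start(timestamp: float, width: int = 10) -> int:
    """Start of the tumbling window (in seconds) that contains `timestamp`."""
    return int(timestamp // width) * width


# A made-up record that reached the application a few seconds after it occurred.
late = Record(event_time=98.0, ingest_time=99.5, processing_time=101.2)

by_event = window_start(late.event_time)
by_ingest = window_start(late.ingest_time)
by_processing = window_start(late.processing_time)
```

Here the event and ingest times place the record in the window that starts at 90 seconds, while the processing time places it in the window that starts at 100 seconds.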

Part 2: Run your second SQL query using Amazon Kinesis Analytics

The next part of the walkthrough adds some additional metrics to your first SQL query and adds a second step to your application.

Add metrics to the SQL statement

In the SQL editor, add some additional SQL code.

First, add some metrics including the average price, average change, maximum price, and minimum price over the same window. Note that you need to add these in your SELECT statement as well as the in-application stream you are inserting into, DESTINATION_SQL_STREAM.

Second, add the sector to the query so you have additional information about the stock ticker. Note that the sector must be added to both the SELECT and GROUP BY clauses.

When you are finished, your SQL code should look like the following:

-- Target in-application stream for the per-window metrics.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    ticker_symbol VARCHAR(4),
    sector VARCHAR(16),
    ticker_symbol_count INTEGER,
    avg_price REAL,
    avg_change REAL,
    max_price REAL,
    min_price REAL);

-- Pump that continuously inserts the aggregated rows into the stream above.
-- The GROUP BY clause groups by ticker, sector, and 10-second tumbling window.
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM   ticker_symbol,
                sector,
                COUNT(*) AS ticker_symbol_count,
                AVG(price) AS avg_price,
                AVG(change) AS avg_change,
                MAX(price) AS max_price,
                MIN(price) AS min_price
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, sector, FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

Choose Save and run SQL.
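The FLOOR expression in the GROUP BY clause can look cryptic at first. As I read it, it takes the time elapsed since the Unix epoch, divides it by 10, and truncates the result to whole seconds, so every row within the same 10-second span yields the same value. The Python sketch below mirrors that arithmetic; the function names and sample timestamps are made up for illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_bucket(rowtime: datetime, width_seconds: int = 10) -> int:
    """Bucket id: whole seconds since the epoch, divided by the width and floored."""
    elapsed = rowtime - EPOCH
    return int(elapsed.total_seconds()) // width_seconds


def bucket_start(bucket: int, width_seconds: int = 10) -> datetime:
    """Translate a bucket id back into the time at which its window starts."""
    return EPOCH + timedelta(seconds=bucket * width_seconds)


# Three made-up row times: the first two share a window, the third starts a new one.
first = datetime(2016, 8, 1, 12, 0, 3, tzinfo=timezone.utc)
second = datetime(2016, 8, 1, 12, 0, 9, 900000, tzinfo=timezone.utc)
third = datetime(2016, 8, 1, 12, 0, 10, tzinfo=timezone.utc)

same_window = tumbling_bucket(first) == tumbling_bucket(second)
next_window = tumbling_bucket(third) == tumbling_bucket(first) + 1
window_opens_at = bucket_start(tumbling_bucket(first))
```

Changing the divisor changes the window size. For example, dividing by 60 would group rows into one-minute windows.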

Add a second step to your SQL code

Next, add a second step to your SQL code. You can use in-application streams to store intermediate SQL results, which can then be used as input for additional SQL statements. This allows you to build applications that run multiple steps serially before sending the results to the destination of your choice. You can also use in-application streams to perform multiple steps in parallel and send results to multiple destinations.

First, change the DESTINATION_SQL_STREAM name in your two SQL statements to be INTERMEDIATE_SQL_STREAM.

Next, add a second SQL step that selects from INTERMEDIATE_SQL_STREAM and inserts into DESTINATION_SQL_STREAM. The SELECT statement should keep only companies in the TECHNOLOGY sector, using a simple WHERE clause. You must also create the DESTINATION_SQL_STREAM to insert SQL results into. Your final application code should look like the following:

-- Step 1: intermediate in-application stream for the per-window metrics.
CREATE OR REPLACE STREAM "INTERMEDIATE_SQL_STREAM" (
    ticker_symbol VARCHAR(4),
    sector VARCHAR(16),
    ticker_symbol_count INTEGER,
    avg_price REAL,
    avg_change REAL,
    max_price REAL,
    min_price REAL);

-- Pump that aggregates the source rows over 10-second tumbling windows.
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "INTERMEDIATE_SQL_STREAM"
SELECT STREAM   ticker_symbol,
                sector,
                COUNT(*) AS ticker_symbol_count,
                AVG(price) AS avg_price,
                AVG(change) AS avg_change,
                MAX(price) AS max_price,
                MIN(price) AS min_price
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, sector, FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

-- Step 2: destination in-application stream with the same columns.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    ticker_symbol VARCHAR(4),
    sector VARCHAR(16),
    ticker_symbol_count INTEGER,
    avg_price REAL,
    avg_change REAL,
    max_price REAL,
    min_price REAL);

-- Pump that forwards only the TECHNOLOGY rows from the intermediate stream.
CREATE OR REPLACE PUMP "STREAM_PUMP_02" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM   ticker_symbol, sector, ticker_symbol_count, avg_price, avg_change, max_price, min_price
FROM "INTERMEDIATE_SQL_STREAM"
WHERE sector = 'TECHNOLOGY';

Choose Save and run SQL.

You can see both of the in-application streams on the left side of the Real-time analytics tab, and select either to see each step in your application for end-to-end visibility.
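To check your understanding of the two steps, here is a plain-Python model of the same flow on a handful of made-up rows. It is only a sketch: it processes a finite list in one pass rather than a continuous stream, it omits the change metrics for brevity, and the names `aggregate` and `technology_only` are invented for this example.

```python
from collections import defaultdict


def aggregate(rows, width=10):
    """Step 1: per (window, ticker, sector), compute count, avg, max, and min price."""
    groups = defaultdict(list)
    for rowtime, ticker, sector, price in rows:
        window = int(rowtime // width) * width
        groups[(window, ticker, sector)].append(price)
    return [
        {
            "window": window,
            "ticker_symbol": ticker,
            "sector": sector,
            "ticker_symbol_count": len(prices),
            "avg_price": sum(prices) / len(prices),
            "max_price": max(prices),
            "min_price": min(prices),
        }
        for (window, ticker, sector), prices in sorted(groups.items())
    ]


def technology_only(intermediate_rows):
    """Step 2: keep only the rows whose sector is TECHNOLOGY."""
    return [row for row in intermediate_rows if row["sector"] == "TECHNOLOGY"]


# Made-up source rows: (rowtime in seconds, ticker, sector, price).
source = [
    (100.0, "ABC", "TECHNOLOGY", 10.0),
    (103.0, "ABC", "TECHNOLOGY", 20.0),
    (105.0, "XYZ", "ENERGY", 5.0),
    (112.0, "ABC", "TECHNOLOGY", 30.0),
]

intermediate = aggregate(source)             # plays the role of INTERMEDIATE_SQL_STREAM
destination = technology_only(intermediate)  # plays the role of DESTINATION_SQL_STREAM
```

The intermediate result holds one row per window, ticker, and sector, and the destination holds the subset from the TECHNOLOGY sector. Selecting each in-application stream in the console shows the same relationship on live data.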

From here, you can add a destination for your SQL results, such as an Amazon S3 bucket. After setup, your application continuously reads data from the streaming source, processes it using your SQL code, and emits the results to your configured destination.

Clean up

The final step is to clean up. Take the following steps to avoid incurring charges.

  1. Delete the Streams demo stream.
  2. Stop the Analytics application.

Summary

Previously, real-time stream data processing was only accessible to those with the technical skills to build and manage a complex application. With Amazon Kinesis Analytics, anyone familiar with the ANSI SQL standard can build and deploy a stream data processing application in minutes.

The application you just built provides a managed and elastic data processing pipeline using Analytics that calculates useful results over streaming data. Results are calculated as data arrives, and you can configure a destination to deliver them to a persistent store like Amazon S3.

It’s simple to get this solution working for your use case. All that is required is to replace the Amazon Kinesis demo stream with your own, and then set up data producers. From there, configure the analytics and you have an end-to-end solution for capturing, processing, and durably storing streaming data.

If you have questions or suggestions, please comment below.


 
