Tag: Amazon Kinesis

Build a Real-time Stream Processing Pipeline with Apache Flink on AWS

by Steffen Hausmann

This post has been translated into Japanese.

In today’s business environments, data is generated in a continuous fashion by a steadily increasing number of diverse data sources. Therefore, the ability to continuously capture, store, and process this data to quickly turn high-volume streams of raw data into actionable insights has become a substantial competitive advantage for organizations.

Apache Flink is an open source project that is well-suited to form the basis of such a stream processing pipeline. It offers unique capabilities that are tailored to the continuous analysis of streaming data. However, building and maintaining a pipeline based on Flink often requires considerable expertise, in addition to physical resources and operational efforts.

This post outlines a reference architecture for a consistent, scalable, and reliable stream processing pipeline that is based on Apache Flink using Amazon EMR, Amazon Kinesis, and Amazon Elasticsearch Service. An AWSLabs GitHub repository provides the artifacts that are required to explore the reference architecture in action. Resources include a producer application that ingests sample data into an Amazon Kinesis stream and a Flink program that analyses the data in real time and sends the result to Amazon ES for visualization.

Analyzing geospatial taxi data in real time

Consider a scenario related to optimizing taxi fleet operations. You continuously receive information from a fleet of taxis currently operating in New York City, and you want to optimize operations by analyzing this data in real time and making data-based decisions.

You would like, for instance, to identify hot spots—areas that are currently in high demand for taxis—so that you can direct unoccupied taxis there. You also want to track current traffic conditions so that you can give approximate trip durations to customers, for example, for rides to the nearby airports. Naturally, your decisions should be based on information that closely reflects the current demand and traffic conditions. The incoming data needs to be analyzed in a continuous and timely fashion. Relevant KPIs and derived insights should be accessible to real-time dashboards.

For the purpose of this post, you emulate a stream of trip events by replaying a dataset of historic taxi trips collected in New York City into Amazon Kinesis Streams. The dataset is available from the New York City Taxi & Limousine Commission website. It contains information on the geolocation and collected fares of individual taxi trips.
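Replaying the dataset into a stream can be sketched with the `put_records` call of the boto3 Kinesis client. This is a minimal sketch, not the producer from the AWSLabs repository: it assumes the trips are available as CSV text with a header row, the stream name and the partition-key choice are hypothetical, and the client is passed in so the batching logic can be exercised without AWS credentials. Because `PutRecords` can partially fail, the sketch resends only the rejected entries.

```python
import csv
import io
import json

MAX_BATCH = 500  # PutRecords accepts at most 500 records per request


def to_record(row, index):
    return {
        "Data": json.dumps(row).encode("utf-8"),
        # Hypothetical choice: a per-row key spreads trips evenly across shards.
        "PartitionKey": str(index),
    }


def put_with_retry(kinesis, stream_name, records, max_attempts=3):
    """Send one batch, resending only the entries the service rejected."""
    pending = records
    for _ in range(max_attempts):
        resp = kinesis.put_records(StreamName=stream_name, Records=pending)
        if resp["FailedRecordCount"] == 0:
            return []
        # Results come back in request order; failed entries carry an ErrorCode.
        pending = [rec for rec, res in zip(pending, resp["Records"]) if "ErrorCode" in res]
    return pending  # still unsent after max_attempts


def replay(csv_text, kinesis, stream_name):
    """Replay historic trips (one CSV row each) into a Kinesis stream."""
    batch, unsent = [], []
    for index, row in enumerate(csv.DictReader(io.StringIO(csv_text))):
        batch.append(to_record(row, index))
        if len(batch) == MAX_BATCH:
            unsent += put_with_retry(kinesis, stream_name, batch)
            batch = []
    if batch:
        unsent += put_with_retry(kinesis, stream_name, batch)
    return unsent
```

A real replay would probably also pace records according to their original timestamps and back off between retries; both are omitted here to keep the batching logic visible.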

In more realistic scenarios, you could leverage AWS IoT to collect the data from telemetry units installed in the taxis and then ingest the data into an Amazon Kinesis stream.

Architecture of a reliable and scalable stream processing pipeline

Because the pipeline serves as the central tool to operate and optimize the taxi fleet, it's crucial to build an architecture that is tolerant of the failure of single nodes. The pipeline should also adapt to changing rates of incoming events. Therefore, you should separate the ingestion of events, their actual processing, and the visualization of the gathered insights into different components. By loosely coupling these components of the infrastructure and using managed services, you can increase the robustness of the pipeline in case of failures. You can also scale the different parts of your infrastructure individually and reduce the effort required to build and operate the entire pipeline.


Encrypt and Decrypt Amazon Kinesis Records Using AWS KMS

by Temitayo Olajide

Customers with strict compliance or data security requirements often need data to be encrypted at all times, both at rest and in transit within the AWS Cloud. This post shows you how to build a real-time streaming application using Kinesis in which your records are encrypted both at rest and in transit.

Amazon Kinesis overview

The Amazon Kinesis platform enables you to build custom applications that analyze or process streaming data for specialized needs. Amazon Kinesis can continuously capture and store terabytes of data per hour from hundreds of thousands of sources such as website clickstreams, financial transactions, social media feeds, IT logs, and transaction tracking events.

Amazon Kinesis Streams uses HTTPS to encrypt data in flight between clients and the service, which protects against eavesdropping on records as they are transferred. However, records encrypted by HTTPS are decrypted once the data enters the service. This data is stored at rest for 24 hours (configurable up to 168 hours) so that your applications have enough headroom to process, replay, or catch up if they fall behind.


In this post, you build encryption and decryption into sample Kinesis producer and consumer applications using the Amazon Kinesis Producer Library (KPL), the Amazon Kinesis Client Library (KCL), AWS KMS, and the AWS Encryption SDK (aws-encryption-sdk). The methods and techniques used in this post to encrypt and decrypt Kinesis records can be easily replicated in your own architecture. Note the following constraints:

  • AWS charges for KMS API requests for encryption and decryption. For more information, see AWS KMS Pricing.
  • You cannot use Amazon Kinesis Analytics to query Amazon Kinesis Streams with records encrypted by clients in this sample application.
  • If your application requires low-latency processing, expect a slight increase in latency.
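The encrypt-before-put, decrypt-after-get pattern can be sketched with boto3. The post itself uses the AWS Encryption SDK together with the KPL and KCL; this sketch swaps in direct calls to the KMS `Encrypt` and `Decrypt` operations, which is simpler but limits each plaintext payload to 4 KB (the Encryption SDK's envelope encryption avoids that limit). The key alias and stream name are hypothetical, and the clients are passed in as parameters.

```python
KEY_ID = "alias/kinesis-records"  # hypothetical KMS key alias
MAX_KMS_PLAINTEXT = 4096  # KMS Encrypt accepts at most 4 KB of plaintext


def put_encrypted(kinesis, kms, stream_name, partition_key, payload: bytes):
    """Encrypt a payload with KMS, then write only the ciphertext to the stream."""
    if len(payload) > MAX_KMS_PLAINTEXT:
        raise ValueError("payload too large for direct KMS encryption; use envelope encryption")
    blob = kms.encrypt(KeyId=KEY_ID, Plaintext=payload)["CiphertextBlob"]
    # The stream only ever sees (and stores at rest) the ciphertext.
    return kinesis.put_record(StreamName=stream_name, Data=blob, PartitionKey=partition_key)


def decrypt_records(kms, records):
    """Decrypt records returned by get_records; KMS infers the key from the ciphertext."""
    return [kms.decrypt(CiphertextBlob=r["Data"])["Plaintext"] for r in records]
```

Because symmetric KMS ciphertext embeds a reference to its key, the consumer does not need to know the key ID, only to have `kms:Decrypt` permission on it.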

The following diagram shows the architecture of the solution.


Analyzing VPC Flow Logs with Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight

by Ian Robinson and Ben Snively

Many business and operational processes require you to analyze large volumes of frequently updated data. Log analysis, for example, involves querying and visualizing large volumes of log data to identify behavioral patterns, understand application processing flows, and investigate and diagnose issues.

VPC flow logs capture information about the IP traffic going to and from network interfaces in VPCs in the Amazon VPC service. The logs allow you to investigate network traffic patterns and identify threats and risks across your VPC estate. Flow log data is stored using Amazon CloudWatch Logs. After you’ve created a flow log, you can view and retrieve its data in Amazon CloudWatch Logs.

Flow logs can help you with a number of tasks. For example, you can use them to troubleshoot why specific traffic is not reaching an instance, which in turn can help you diagnose overly restrictive security group rules. You can also use flow logs as a security tool to monitor the traffic that is reaching your instance.

This blog post shows how to build a serverless architecture by using Amazon Kinesis Firehose, AWS Lambda, Amazon S3, Amazon Athena, and Amazon QuickSight to collect, store, query, and visualize flow logs. In building this solution, you will also learn how to implement Athena best practices with regard to compressing and partitioning data so as to reduce query latencies and drive down query costs.

Summary of the solution

The solution described here is divided into three parts:

  • Send VPC Flow Logs to S3 for Analysis with Athena. This section describes how to use Lambda and Firehose to publish flow log data to S3, and how to create a table in Athena so that you can query this data.
  • Visualize Your Logs in QuickSight. Here you’ll learn how to use QuickSight and its Athena connector to build flow log analysis dashboards that you can share with other users in your organization.
  • Partition Your Data in Athena for Improved Query Performance and Reduced Costs. This section shows how you can use a Lambda function to automatically partition Athena data as it arrives in S3. This function will work with any Firehose stream and any other delivery mechanism that writes data to S3 using a year/month/day/hour prefix.
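The partitioning step can be sketched as a function that turns a new S3 object key into an Athena DDL statement. This is a hypothetical sketch, not the Lambda function from the post: the table name `vpc_flow_logs`, the partition column names `year`, `month`, `day`, and `hour`, and the bucket layout are assumptions. It relies only on the year/month/day/hour prefix that Firehose writes by default (`YYYY/MM/DD/HH/`, in UTC).

```python
import re

# Matches <base>YYYY/MM/DD/HH/<object name> at the end of a key.
PREFIX = re.compile(
    r"^(?P<base>.*?)(?P<year>\d{4})/(?P<month>\d{2})/(?P<day>\d{2})/(?P<hour>\d{2})/[^/]+$"
)


def partition_ddl(bucket, key, table="vpc_flow_logs"):
    """Return an ALTER TABLE ... ADD PARTITION statement for the key, or None."""
    m = PREFIX.match(key)
    if m is None:
        return None  # not written under a year/month/day/hour prefix
    parts = {k: m.group(k) for k in ("year", "month", "day", "hour")}
    location = "s3://{}/{}{}/{}/{}/{}/".format(
        bucket, m.group("base"), parts["year"], parts["month"], parts["day"], parts["hour"]
    )
    spec = ", ".join("{}='{}'".format(k, v) for k, v in parts.items())
    # IF NOT EXISTS makes the statement safe to run for every object in the hour.
    return "ALTER TABLE {} ADD IF NOT EXISTS PARTITION ({}) LOCATION '{}'".format(table, spec, location)
```

Running the statement for every new object is redundant but harmless because of `IF NOT EXISTS`; the resulting string could then be submitted through Athena's `StartQueryExecution` API.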

Partitioning your data is one of three strategies for improving Athena query performance and reducing costs. The other two are compressing your data and converting it into columnar formats such as Apache Parquet. The solution described here automatically compresses your data, but it doesn't convert it into a columnar format. Even if you don't convert your data to a columnar format, as is the case here, it's always worth compressing and partitioning it. For any large-scale solution, you should also consider converting it to Parquet.

Serverless Architecture for Analyzing VPC Flow Logs

Below is a diagram showing how the various services work together.


When you create a flow log for a VPC, the log data is published to a log group in CloudWatch Logs. By using a CloudWatch Logs subscription, you can send a real-time feed of these log events to a Lambda function that uses Firehose to write the log data to S3.
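The Lambda function in this flow can be sketched as follows. CloudWatch Logs delivers subscription data to Lambda as base64-encoded, gzip-compressed JSON under `event["awslogs"]["data"]`; the sketch unpacks it and forwards each log line to Firehose with `put_record_batch`. It is a minimal sketch, not the function from the post: the delivery stream name is hypothetical, handling of partially failed batches is omitted, and the Firehose client can be injected for testing.

```python
import base64
import gzip
import json

DELIVERY_STREAM = "vpc-flow-logs"  # hypothetical Firehose delivery stream name
MAX_BATCH = 500  # PutRecordBatch accepts at most 500 records per call


def decode_subscription_event(event):
    """Unpack the base64 + gzip payload that CloudWatch Logs sends to Lambda."""
    payload = gzip.decompress(base64.b64decode(event["awslogs"]["data"]))
    return json.loads(payload)


def handler(event, context, firehose=None):
    data = decode_subscription_event(event)
    # CloudWatch Logs sends a CONTROL_MESSAGE to check that the destination is reachable.
    if data["messageType"] != "DATA_MESSAGE":
        return 0
    if firehose is None:
        import boto3  # only needed when running inside Lambda
        firehose = boto3.client("firehose")
    # Newline-delimit each flow log line so Athena reads one record per line.
    records = [{"Data": (e["message"] + "\n").encode("utf-8")} for e in data["logEvents"]]
    for i in range(0, len(records), MAX_BATCH):
        firehose.put_record_batch(DeliveryStreamName=DELIVERY_STREAM, Records=records[i:i + MAX_BATCH])
    return len(records)
```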


Implement Serverless Log Analytics Using Amazon Kinesis Analytics

by Nehal Mehta

Applications log a large amount of data that—when analyzed in real time—provides significant insight into your applications. Real-time log analysis can be used to ensure security compliance, troubleshoot operation events, identify application usage patterns, and much more.

Ingesting and analyzing this data in real time can be accomplished by using a variety of open source tools on Amazon EC2. Alternatively, you can use a set of simple, managed AWS services to perform serverless log analytics. The Amazon Kinesis platform includes the following managed services:

  • Amazon Kinesis Streams lets you collect, store, and process terabytes of streaming data per hour on AWS at a low cost.
  • Amazon Kinesis Firehose loads streaming data into Amazon Kinesis Analytics, Amazon S3, Amazon Redshift, or Amazon Elasticsearch Service.
  • Amazon Kinesis Analytics lets you analyze streaming data in near real time by writing SQL queries, without having to manage and monitor your own stream processing infrastructure. Analytics also lets you reference metadata stored in S3 from your SQL queries.

In this post, I show you how to implement a solution that analyzes streaming Apache access log data from an EC2 instance, aggregated over 5-minute windows. The solution helps you understand where requests to your applications are coming from. If the source is an unknown application, or if a particular source application is flooding your application with requests, you can contact the application owner.

This solution addresses the following challenges:

  • You do not want to maintain (patch or upgrade) the log application or the servers used for log analytics. You also want your log analytics to scale on demand by default, so all components are managed services.
  • Apache logs record the host IP address or host name. However, that information isn't useful on its own in the cloud, where servers are fungible and hosts change constantly as they scale or heal automatically. So you maintain a flat-file list of servers in an S3 bucket that can be updated by Auto Scaling policies and mapped to the streaming log data.
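The enrichment and windowing that the Analytics application performs can be illustrated in plain Python. This is a hypothetical sketch of the logic, not the Kinesis Analytics SQL from the post: it assumes Apache Common Log Format lines (whose first field is the requesting host), a metadata file already loaded into a dict that maps host IP addresses to an application name and contact, and 5-minute (300-second) tumbling windows keyed by epoch seconds.

```python
import re
from collections import Counter
from datetime import datetime

WINDOW_SECONDS = 300  # 5-minute tumbling windows

# Common Log Format: host ident user [time] "request" status bytes
CLF = re.compile(r'^(?P<host>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] "[^"]*" (?P<status>\d{3}) \S+')


def parse(line):
    """Return (requesting host, epoch seconds), or None for unparseable lines."""
    m = CLF.match(line)
    if m is None:
        return None
    ts = datetime.strptime(m.group("time"), "%d/%b/%Y:%H:%M:%S %z").timestamp()
    return m.group("host"), int(ts)


def aggregate(lines, metadata):
    """Count requests per (window start, application, contact)."""
    counts = Counter()
    for line in lines:
        parsed = parse(line)
        if parsed is None:
            continue
        host, ts = parsed
        # Unknown hosts are kept under "unknown" so they stand out instead of vanishing.
        app, contact = metadata.get(host, ("unknown", None))
        # Each event falls into exactly one non-overlapping window.
        window_start = ts - ts % WINDOW_SECONDS
        counts[(window_start, app, contact)] += 1
    return counts
```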


The following diagram shows how this solution works.


  • Application nodes run Apache applications and write Apache logs locally to disk. The Amazon Kinesis agent on each EC2 instance ingests the log stream into the Amazon Kinesis stream.
  • The log input stream from various application nodes is ingested into the Amazon Kinesis stream.
  • Metadata about each machine or application is stored in flat files in an S3 bucket. It maps host IP addresses to the application name and contact.
  • The Analytics application processes the streaming logs over tumbling windows, enriching them with the machine metadata referenced from S3.
  • The output stream, which contains the aggregated responses from the Analytics application, is written into a destination Amazon Kinesis stream.
  • The Lambda function consumes the aggregated response from the destination stream, processes it, and publishes it to Amazon CloudWatch. It is event driven: as soon as new records are pushed to the destination stream, they are processed in batches of 200 records.
  • The CloudWatch dashboard is used to view response trends.
  • Alarms on aggregated data are generated when specified thresholds are reached.
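The Lambda step can be sketched as follows. Kinesis delivers each record to Lambda base64-encoded under `record["kinesis"]["data"]`; the sketch decodes the aggregated rows and publishes them with `put_metric_data`. It is a hypothetical sketch: it assumes the Analytics application emits JSON rows with `APPLICATION_NAME` and `REQUEST_COUNT` fields, and the metric namespace and names are made up for illustration.

```python
import base64
import json

NAMESPACE = "LogAnalytics"  # hypothetical custom CloudWatch namespace
BATCH = 20  # conservative number of metrics per PutMetricData request


def to_metric(row):
    return {
        "MetricName": "RequestCount",
        "Dimensions": [{"Name": "Application", "Value": row["APPLICATION_NAME"]}],
        "Value": float(row["REQUEST_COUNT"]),
        "Unit": "Count",
    }


def handler(event, context, cloudwatch=None):
    # Each Kinesis record carries one aggregated row as base64-encoded JSON.
    rows = [json.loads(base64.b64decode(r["kinesis"]["data"])) for r in event["Records"]]
    metrics = [to_metric(row) for row in rows]
    if cloudwatch is None:
        import boto3  # only needed when running inside Lambda
        cloudwatch = boto3.client("cloudwatch")
    for i in range(0, len(metrics), BATCH):
        cloudwatch.put_metric_data(Namespace=NAMESPACE, MetricData=metrics[i:i + BATCH])
    return len(metrics)
```

With the event source mapping set to batches of 200 records, as described above, one invocation turns each batch into a handful of `PutMetricData` calls, and the dashboard and alarms then work on the published metric.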


Joining and Enriching Streaming Data on Amazon Kinesis

by Assaf Mentzer

Are you trying to move away from a batch-based ETL pipeline? You might do this, for example, to get real-time insights into your streaming data, such as clickstreams, financial transactions, sensor data, and customer interactions. If so, it's possible that as soon as you get down to requirements, you realize your streaming data doesn't have all of the fields you need for real-time processing, and you are really missing that JOIN keyword!

You might also have requirements to enrich the streaming data based on a static reference, a dynamic dataset, or even with another streaming data source.  How can you achieve this without sacrificing the velocity of the streaming data?

In this blog post, I provide three use cases and approaches for joining and enriching streaming data:

Joining streaming data with a relatively static dataset on Amazon S3 using Amazon Kinesis Analytics

In this use case, you can use Amazon Kinesis Analytics to define a reference data input on S3 and use it to enrich a streaming data source.

For example, bike-share systems around the world can publish data files about the available bikes and docks at each station in real time. Bike-share system data feeds that follow the General Bikeshare Feed Specification (GBFS) include a reference dataset that contains a static list of all stations, their capacities, and their locations.
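Enriching the changing availability data with the static station list amounts to a join on station ID. As a hypothetical plain-Python sketch (in Kinesis Analytics the same join would be written in SQL against the S3 reference table), assuming records shaped like the `station_status` and `station_information` feeds in the GBFS specification:

```python
def index_stations(station_information):
    """Build the static lookup once: station_id -> (lat, lon)."""
    return {s["station_id"]: (s["lat"], s["lon"]) for s in station_information}


def enrich(status_records, stations):
    """Attach coordinates to each availability record; unknown stations get None."""
    enriched = []
    for rec in status_records:
        lat, lon = stations.get(rec["station_id"], (None, None))
        enriched.append({**rec, "lat": lat, "lon": lon})
    return enriched
```

Because the reference data is small and rarely changes, loading it once into a dictionary keeps the per-record cost to a single lookup, which is what lets the enrichment keep pace with the stream.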

Let’s say you would like to enrich the bike-availability data (which changes throughout the day) with the bike station’s latitude and longitude (which is static) for downstream applications.  The architecture would look like this:



Scale Your Amazon Kinesis Stream Capacity with UpdateShardCount

by Allan MacInnis

Allan MacInnis is a Kinesis Solution Architect for Amazon Web Services.

Starting today, you can easily scale your Amazon Kinesis streams to respond in real time to changes in your streaming data needs. Customers use Amazon Kinesis to capture, store, and analyze terabytes of data per hour from clickstreams, financial transactions, social media feeds, and more.

With the new Amazon Kinesis Streams UpdateShardCount API operation, you can automatically scale your stream shard capacity by using Amazon CloudWatch alarms, Amazon SNS, and AWS Lambda. In this post, I walk you through an example of how you can automatically scale your shards using a few lines of code.

Getting started

In this example, I only demonstrate how to scale out capacity within your Amazon Kinesis stream. However, you can implement a similar strategy for when you need to scale in the number of shards in your stream, as the approach is nearly identical.

Consider a stream containing two shards, with a producer that is using a random partition key. As a reminder, each shard in a stream can ingest up to 1000 records per second, or up to 1 MB per second of data. If your data producer exceeds either of those values, Amazon Kinesis Streams raises an exception, and your producer needs to retry records that did not get written successfully. Retrying failed records is a valid approach if the spike is very short-lived. However, to ingest more than 1000 records per second for a longer duration, you need to scale the number of shards in your stream. Instead of scaling manually, how about building a system that can automatically scale the shards for you?

Pick your CloudWatch scaling thresholds

You need to consider the threshold at which the system should automatically add shards. For this post, double the total shard count of your stream when the number of records per second being written to the stream reaches 80% of the total records per second capacity. So, for a two-shard stream, the limit is 2000 records per second. You want the threshold to be set to 80% of that, or 1600 records per second. When this threshold is breached, you want the system to double the shard count in the stream to four, scaling up to a total ingestion capacity of 4000 records per second.

Create an alarm on the IncomingRecords metric of your stream, with a threshold of greater than or equal to 480000 for the Sum statistic of this metric. Set the alarm to monitor a single period of five minutes. In other words, if the total number of records written to your stream over a five-minute period reaches 480000, double the number of shards in the stream. The 480000 value is calculated using the 80% criterion (800 records per second per shard), multiplied by the number of shards in the stream (2), multiplied by the number of seconds in the period (300).

Automatically adjust your CloudWatch scaling thresholds

You also need to adjust the alarm threshold automatically to match the new shard capacity. For this example, configure the CloudWatch alarm with an action that publishes to an SNS topic when the alarm is triggered, and then update the alarm threshold to 80% of your new capacity (3200 records per second).

You can then create a Lambda function that subscribes to this SNS topic and calls the new UpdateShardCount API operation while adjusting the CloudWatch alarm threshold. To learn how to configure a CloudWatch alarm, see Creating Amazon CloudWatch Alarms. For information about how to invoke a Lambda function from SNS, see Invoking Lambda Functions Using Amazon SNS Notifications.

To scale in your shards, use a similar strategy: create a second CloudWatch alarm that is triggered when the input drops below a threshold, and have the Lambda function halve the number of shards and halve the CloudWatch alarm thresholds.

Example Lambda function

The following Python code doubles the shard count and adjusts the threshold for the alarm that triggered the scaling action. You can create a second, similar function to handle scale-in actions, or just adjust this one with some conditional logic to handle both scenarios.
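As a minimal sketch of such a function (not the original code), assuming the SNS message carries the standard CloudWatch alarm JSON with an `AlarmName` and a `Trigger` whose `Dimensions` include `StreamName`, the following uses boto3's `describe_stream_summary`, `update_shard_count`, and `put_metric_alarm`. The threshold follows the 80% rule described above, and the clients can be injected for testing.

```python
import json

RECORDS_PER_SHARD_PER_SEC = 1000  # per-shard write limit described above
TARGET_PERCENT = 80               # scale out at 80% of capacity
PERIOD_SECONDS = 300              # one five-minute alarm period


def alarm_threshold(shard_count, period=PERIOD_SECONDS):
    """Sum of IncomingRecords over one period that corresponds to 80% of capacity."""
    return RECORDS_PER_SHARD_PER_SEC * shard_count * period * TARGET_PERCENT // 100


def scale_out(stream_name, alarm_name, topic_arn, kinesis, cloudwatch):
    summary = kinesis.describe_stream_summary(StreamName=stream_name)
    target = summary["StreamDescriptionSummary"]["OpenShardCount"] * 2
    kinesis.update_shard_count(
        StreamName=stream_name,
        TargetShardCount=target,
        ScalingType="UNIFORM_SCALING",
    )
    # put_metric_alarm replaces the whole alarm definition, so every setting,
    # including the SNS action that invokes this function, is supplied again.
    cloudwatch.put_metric_alarm(
        AlarmName=alarm_name,
        Namespace="AWS/Kinesis",
        MetricName="IncomingRecords",
        Dimensions=[{"Name": "StreamName", "Value": stream_name}],
        Statistic="Sum",
        Period=PERIOD_SECONDS,
        EvaluationPeriods=1,
        Threshold=float(alarm_threshold(target)),
        ComparisonOperator="GreaterThanOrEqualToThreshold",
        AlarmActions=[topic_arn],
    )
    return target


def lambda_handler(event, context, kinesis=None, cloudwatch=None):
    sns = event["Records"][0]["Sns"]
    alarm = json.loads(sns["Message"])
    # Assumes the alarm was created with a StreamName dimension.
    dims = alarm["Trigger"]["Dimensions"]
    stream_name = next(d["value"] for d in dims if d["name"] == "StreamName")
    if kinesis is None or cloudwatch is None:
        import boto3  # only needed when running inside Lambda
        kinesis = boto3.client("kinesis")
        cloudwatch = boto3.client("cloudwatch")
    return scale_out(stream_name, alarm["AlarmName"], sns["TopicArn"], kinesis, cloudwatch)
```

For a two-shard stream this sets the new threshold to 960000 (80% of 4 shards × 1000 records per second × 300 seconds). A production version would likely also need to handle the case where the stream is still updating from a previous resharding and to cap the target at your account's shard limit.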


Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics

by Chris Marshall

Chris Marshall is a Solutions Architect for Amazon Web Services.

Analyzing web log traffic to gain insights that drive business decisions has historically been performed using batch processing. While effective, this approach results in delayed responses to emerging trends and user activities. There are solutions for processing data in real time using streaming and micro-batching technologies, but they can be complex to set up and maintain. Amazon Kinesis Analytics is a managed service that makes it easy to identify and respond to changes in behavior in real time.

One use case where it's valuable to have immediate insights is analyzing clickstream data. In the world of digital advertising, an impression is when an ad is displayed in a browser, and a clickthrough is when a user clicks on that ad. The clickthrough rate (CTR) is one way to monitor the ad's effectiveness, and it is calculated as CTR = Clicks / Impressions * 100. Digital marketers are interested in monitoring CTR to know when particular ads perform better than normal, giving them a chance to optimize placements within the ad campaign. They may also be interested in anomalously low CTR that could be the result of a bad image or bidding model.

In this post, I show an analytics pipeline which detects anomalies in real time for a web traffic stream, using the RANDOM_CUT_FOREST function available in Amazon Kinesis Analytics.


Amazon Kinesis Analytics includes a powerful set of analytics functions to analyze streams of data. One such function is RANDOM_CUT_FOREST, which detects anomalies by scoring data flowing through a dynamic data stream. This novel approach identifies a normal pattern in streaming data and then compares new data points against it. For more information, see Robust Random Cut Forest Based Anomaly Detection On Streams.
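RANDOM_CUT_FOREST itself builds an ensemble of random trees and is not reproduced here. To illustrate only the underlying idea of learning a normal pattern and scoring new points against it, the sketch below swaps in a much simpler technique, a rolling z-score test on per-window CTR. It is a hypothetical stand-in, not how RANDOM_CUT_FOREST works, and the history of 5 windows and the limit of 3 standard deviations are arbitrary choices.

```python
from statistics import mean, pstdev

# Stand-in for illustration only: a rolling z-score test, NOT Random Cut Forest.


def ctr(clicks, impressions):
    """Clickthrough rate in percent: clicks / impressions * 100."""
    return clicks / impressions * 100 if impressions else 0.0


def flag_anomalies(windows, history=5, z_limit=3.0):
    """Return indexes of windows whose CTR is far from the recent baseline.

    windows: list of (clicks, impressions) pairs, one per time window.
    A window is flagged when its CTR differs from the mean of the previous
    `history` windows by more than `z_limit` standard deviations.
    """
    rates = [ctr(c, i) for c, i in windows]
    flagged = []
    for idx in range(history, len(rates)):
        baseline = rates[idx - history:idx]
        mu, sigma = mean(baseline), pstdev(baseline)
        if sigma == 0:
            # A perfectly flat baseline: any change at all counts as unusual.
            if rates[idx] != mu:
                flagged.append(idx)
        elif abs(rates[idx] - mu) / sigma > z_limit:
            flagged.append(idx)
    return flagged
```

Unlike this fixed-threshold test, Random Cut Forest scores points without assuming a roughly normal distribution and can consider several dimensions at once, which is one reason the post relies on it.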

Analytics pipeline components

To demonstrate how the RANDOM_CUT_FOREST function can be used to detect anomalies in real-time clickthrough rates, I walk you through building an analytics pipeline and generating web traffic using a simple Python script. When your injected anomaly is detected, you get an email or SMS message alerting you to the event.

This post first walks through the components of the analytics pipeline, then discusses how to build and test it in your own AWS account. The accompanying AWS CloudFormation template builds the Amazon API Gateway API, Amazon Kinesis streams, AWS Lambda function, Amazon SNS components, and IAM roles required for this walkthrough. Then you manually create the Amazon Kinesis Analytics application that uses the RANDOM_CUT_FOREST function in SQL.


Writing SQL on Streaming Data with Amazon Kinesis Analytics – Part 2

by Ryan Nienhuis

Ryan Nienhuis is a Senior Product Manager for Amazon Kinesis.

This is the second of two AWS Big Data posts on Writing SQL on Streaming Data with Amazon Kinesis Analytics. In the last post, I provided an overview of streaming data and key concepts, such as the basics of streaming SQL, and completed a walkthrough using a simple example. In this post, I cover more advanced stream processing concepts using Amazon Kinesis Analytics, and you build an end-to-end application.

Amazon Kinesis Analytics allows you to easily write SQL on streaming data, providing a powerful way to build a stream processing application in minutes. The service allows you to connect to streaming data sources, process the data with sub-second latencies, and continuously emit results to downstream destinations for use in real-time alerts, dashboards, or further analysis.


Writing SQL on Streaming Data with Amazon Kinesis Analytics – Part 1

by Ryan Nienhuis

Ryan Nienhuis is a Senior Product Manager for Amazon Kinesis.

This is the first of two AWS Big Data blog posts on Writing SQL on Streaming Data with Amazon Kinesis Analytics. In this post, I provide an overview of streaming data and key concepts like the basics of streaming SQL, and complete a walkthrough using a simple example. In the next post, I will cover more advanced stream processing concepts using Amazon Kinesis Analytics.

Most organizations use batch data processing to perform their analytics in daily or hourly intervals to inform their business decisions and improve their customer experiences. However, you can derive significantly more value from your data if you are able to process and react in real time. Indeed, the value of insights in your data can decline rapidly over time – the faster you react, the better. For example:

  • Analyzing your company’s key performance indicators over the last 24 hours is a better reflection of your current business than analyzing last month’s metrics.
  • Reacting to an operational event as it is happening is far more valuable than discovering a week later that the event occurred.
  • Identifying that a customer is unable to complete a purchase on your ecommerce site so you can assist them in completing the order is much better than finding out next week that they were unable to complete the transaction.

Real-time insights are extremely valuable, but difficult to extract from streaming data. Processing data in real time is hard because it must be done quickly and continuously to keep up with the speed at which the data is produced. In addition, accurate results may require the data to be processed in the same order in which it was generated, which can be challenging due to the distributed nature of the data.

Because of these complexities, people start by implementing simple applications that perform streaming ETL, such as collecting, validating, and normalizing log data across different applications. Some then progress to basic processing like rolling min-max computations, while a select few implement sophisticated processing such as anomaly detection or correlating events by user sessions.  With each step, more and more value is extracted from the data but the difficulty level also increases.
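A rolling min-max computation of the kind mentioned above can be maintained in constant amortized time per event with two monotonic deques. This is a generic sketch over a count-based window of the last `size` values, not code from Kinesis Analytics, where the same result would come from a windowed SQL query.

```python
from collections import deque


class RollingMinMax:
    """Min and max over the last `size` values, O(1) amortized per update."""

    def __init__(self, size):
        self.size = size
        self.count = 0
        self.mins = deque()  # (index, value) pairs with increasing values
        self.maxs = deque()  # (index, value) pairs with decreasing values

    def add(self, value):
        i = self.count
        self.count += 1
        # Drop entries that can never be the window's min or max again.
        while self.mins and self.mins[-1][1] >= value:
            self.mins.pop()
        while self.maxs and self.maxs[-1][1] <= value:
            self.maxs.pop()
        self.mins.append((i, value))
        self.maxs.append((i, value))
        # Evict entries that have slid out of the window.
        oldest = i - self.size + 1
        while self.mins[0][0] < oldest:
            self.mins.popleft()
        while self.maxs[0][0] < oldest:
            self.maxs.popleft()
        return self.mins[0][1], self.maxs[0][1]
```

Each value enters and leaves each deque at most once, so the cost stays flat no matter how large the window is, which matters when the computation has to keep up with a continuous stream.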

With the launch of Amazon Kinesis Analytics, you can now easily write SQL on streaming data, providing a powerful way to build a stream processing application in minutes. The service allows you to connect to streaming data sources, process the data with sub-second latencies, and continuously emit results to downstream destinations for use in real-time alerts, dashboards, or further analysis.

This post introduces you to Amazon Kinesis Analytics, the fundamentals of writing ANSI-Standard SQL over streaming data, and works through a simple example application that continuously generates metrics over time windows.


How SmartNews Built a Lambda Architecture on AWS to Analyze Customer Behavior and Recommend Content

by SmartNews

This is a guest post by Takumi Sakamoto, a software engineer at SmartNews. SmartNews in their own words: “SmartNews is a machine learning-based news discovery app that delivers the very best stories on the Web for more than 18 million users worldwide.”

Data processing is one of the key technologies for SmartNews. Every team’s workload involves data processing for various purposes. The news team at SmartNews uses data as input to their machine learning algorithm for delivering the very best stories on the Web. The product team relies on data to run various A/B tests, to learn about how our customers consume news articles, and to make product decisions.

To meet the goals of both teams, we built a sustainable data platform based on the lambda architecture, a data-processing architecture that handles massive amounts of data by integrating batch and real-time processing within a single framework.
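The core idea of the lambda architecture can be illustrated with a query-time merge of a batch view and a real-time view. This hypothetical sketch is not SmartNews code: it assumes both layers produce per-article click counts, that the batch view covers everything up to `batch_cutoff`, and that the speed layer keeps timestamped events for recent activity.

```python
from collections import Counter


def serve(batch_view, speed_events, batch_cutoff):
    """Merge precomputed batch counts with real-time counts newer than the cutoff.

    batch_view:   {article_id: count} computed by the batch layer up to batch_cutoff
    speed_events: iterable of (timestamp, article_id) pairs from the speed layer
    """
    merged = Counter(batch_view)
    for ts, article_id in speed_events:
        # Events at or before the cutoff are already included in the batch view.
        if ts > batch_cutoff:
            merged[article_id] += 1
    return dict(merged)
```

The batch layer can recompute its view from the full history at its own pace, while the speed layer only has to cover the gap since the last batch run; the cutoff comparison keeps events from being counted twice.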

Thanks to AWS services and OSS technologies, our data platform is highly scalable and reliable, and is flexible enough to satisfy various requirements with minimum cost and effort.

Our current system generates tens of GBs of data from multiple data sources, and runs daily aggregation queries or machine learning algorithms on datasets with hundreds of GBs. Some outputs of the machine learning algorithms are joined with data streams to gather user feedback in near real time (e.g., the last 5 minutes). This lets us adapt our product for users with minimum latency. In this post, I'll show you how we built the SmartNews data platform on AWS.

The image below depicts the platform.