AWS Big Data Blog

Build a Real-time Stream Processing Pipeline with Apache Flink on AWS

NOTE: As of November 2018, you can run Apache Flink programs with Amazon Kinesis Data Analytics for Java Applications in a fully managed environment. You can find further details in a new blog post on the AWS Big Data Blog and in this GitHub repository.

September 8, 2021: Amazon Elasticsearch Service has been renamed to Amazon OpenSearch Service.


In today’s business environments, data is generated in a continuous fashion by a steadily increasing number of diverse data sources. Therefore, the ability to continuously capture, store, and process this data to quickly turn high-volume streams of raw data into actionable insights has become a substantial competitive advantage for organizations.

Apache Flink is an open source project that is well-suited to form the basis of such a stream processing pipeline. It offers unique capabilities that are tailored to the continuous analysis of streaming data. However, building and maintaining a pipeline based on Flink often requires considerable expertise, in addition to physical resources and operational efforts.

This post outlines a reference architecture for a consistent, scalable, and reliable stream processing pipeline that is based on Apache Flink using Amazon EMR, Amazon Kinesis, and Amazon OpenSearch Service. An AWSLabs GitHub repository provides the artifacts that are required to explore the reference architecture in action. Resources include a producer application that ingests sample data into an Amazon Kinesis stream and a Flink program that analyzes the data in real time and sends the result to Amazon OpenSearch Service for visualization.

Analyzing geospatial taxi data in real time

Consider a scenario related to optimizing taxi fleet operations. You continuously obtain information from a fleet of taxis currently operating in New York City, and you want to optimize the fleet’s operations by analyzing this data in real time and making data-based decisions.

You would like, for instance, to identify hot spots—areas that are currently in high demand for taxis—so that you can direct unoccupied taxis there. You also want to track current traffic conditions so that you can give approximate trip durations to customers, for example, for rides to the nearby airports. Naturally, your decisions should be based on information that closely reflects the current demand and traffic conditions. The incoming data needs to be analyzed in a continuous and timely fashion. Relevant KPIs and derived insights should be accessible to real-time dashboards.

For the purpose of this post, you emulate a stream of trip events by replaying a dataset of historic taxi trips collected in New York City into Amazon Kinesis Streams. The dataset is available from the New York City Taxi & Limousine Commission website. It contains information on the geolocation and collected fares of individual taxi trips.

In more realistic scenarios, you could leverage AWS IoT to collect the data from telemetry units installed in the taxis and then ingest the data into an Amazon Kinesis stream.

Architecture of a reliable and scalable stream processing pipeline

Because the pipeline serves as the central tool to operate and optimize the taxi fleet, it’s crucial to build an architecture that is tolerant of single-node failures. The pipeline should also adapt to changing rates of incoming events. Therefore, you should separate the ingestion of events, their actual processing, and the visualization of the gathered insights into different components. By loosely coupling these components and using managed services, you can increase the robustness of the pipeline in case of failures. You can also scale the different parts of your infrastructure individually and reduce the effort required to build and operate the entire pipeline.

By decoupling the ingestion and storage of events sent by the taxis from the computation of queries deriving the desired insights, you can substantially increase the robustness of the infrastructure.

Events are initially persisted by means of Amazon Kinesis Streams, which holds a replayable, ordered log and redundantly stores events in multiple Availability Zones. Later, the events are read from the stream and processed by Apache Flink. Because Flink continuously snapshots its internal state, it can recover from the failure of an operator or an entire node by restoring the internal state from a snapshot and replaying the events that need to be reprocessed from the stream.
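
To illustrate what this mechanism looks like in code, the following minimal sketch enables periodic checkpointing; the interval is an illustrative value, and the reference application configures its snapshot location through the --checkpoint parameter shown later in this post:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// take a snapshot of the internal application state every 10 seconds so that
// failed operators can be restored from the most recent snapshot
env.enableCheckpointing(10000);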

Another advantage of a central log for storing events is the ability to consume the data with multiple applications. It is feasible to run different versions of a Flink application side by side for benchmarking and testing purposes. Or, you could use Amazon Kinesis Firehose to persist the data from the stream to Amazon S3 for long-term archival and then run thorough historical analytics using Amazon Athena.

Because Amazon Kinesis Streams, Amazon EMR, and Amazon OpenSearch Service are managed services that can be created and scaled by means of simple API calls, using these services allows you to focus your expertise on providing business value. Let AWS do the undifferentiated heavy lifting that is required to build and, more importantly, operate and scale the entire pipeline. The creation of the pipeline can be fully automated with AWS CloudFormation and individual components can be monitored and automatically scaled by means of Amazon CloudWatch. Failures are detected and automatically mitigated.

For the rest of this post, I focus on aspects that are related to building and running the reference architecture on AWS.

Building and running the reference architecture

To see the taxi trip analysis application in action, use two CloudFormation templates to build and run the reference architecture:

  • The first template builds the runtime artifacts for ingesting taxi trips into the stream and for analyzing trips with Flink
  • The second template creates the resources of the infrastructure that run the application

The resources that are required to build and run the reference architecture, including the source code of the Flink application and the CloudFormation templates, are available from the flink-stream-processing-refarch AWSLabs GitHub repository.

Building the runtime artifacts and creating the infrastructure

Execute the first CloudFormation template to create an AWS CodePipeline pipeline, which builds the artifacts by means of AWS CodeBuild in a serverless fashion. You can also install Maven and build the Flink Amazon Kinesis connector and the other runtime artifacts manually. After all stages of the pipeline complete successfully, you can retrieve the artifacts from the S3 bucket that is specified in the output section of the CloudFormation template.

After the first template has been created and the runtime artifacts have been built, execute the second CloudFormation template, which creates the resources of the reference architecture described earlier.

Wait until both templates have been created successfully before proceeding to the next step. This takes up to 15 minutes, so feel free to get a fresh cup of coffee while CloudFormation does all the work for you.

Starting the Flink runtime and submitting a Flink program

To start the Flink runtime and submit the Flink program that is doing the analysis, connect to the EMR master node. The parameters of this and later commands can be obtained from the output sections of the two CloudFormation templates, which have been used to provision the infrastructure and build the runtime artifacts.

$ ssh -C -D 8157 hadoop@«EMR master node IP»

The EMR cluster that is provisioned by the CloudFormation template comes with two c4.large core nodes with two vCPUs each. Generally, you match the number of node cores to the number of slots per task manager. For this post, it is reasonable to start a long-running Flink cluster with two task managers and two slots per task manager:

$ flink-yarn-session -n 2 -s 2 -jm 768 -tm 1024 -d

After the Flink runtime is up and running, the taxi stream processor program can be submitted to the Flink runtime to start the real-time analysis of the trip events in the Amazon Kinesis stream.

$ aws s3 cp s3://«Artifact bucket»/artifacts/flink-taxi-stream-processor-1.3.jar .

$ flink run -p 4 flink-taxi-stream-processor-1.3.jar --region «AWS region» --stream «Kinesis stream name» --es-endpoint https://«Elasticsearch endpoint» --checkpoint s3://«Checkpoint bucket»

Now that the Flink application is running, it is reading the incoming events from the stream, aggregating them in time windows according to the time of the events, and sending the results to Amazon OpenSearch Service. The Flink application takes care of batching records so as not to overload the Elasticsearch cluster with small requests and of signing the batched requests to enable a secure configuration of the Elasticsearch cluster.
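
Conceptually, the core of the Flink application resembles the following sketch. The names EventSchema, getPickupCell, TripCountFunction, and AmazonElasticsearchSink are hypothetical placeholders used for illustration only; the actual classes are part of the flink-taxi-stream-processor application:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties consumerConfig = new Properties();
consumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

env.addSource(new FlinkKinesisConsumer<>("«Kinesis stream name»", new EventSchema(), consumerConfig))
    .assignTimestampsAndWatermarks(new PunctuatedAssigner())   // event time, discussed later in this post
    .keyBy(event -> event.getPickupCell())                     // partition events by the cell of the pickup location
    .timeWindow(Time.minutes(5))                               // aggregate events in tumbling event-time windows
    .apply(new TripCountFunction())                            // count the trips per cell and window
    .addSink(new AmazonElasticsearchSink("«Elasticsearch endpoint»"));  // custom sink, discussed later in this post

env.execute("taxi-stream-processor");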

If you have activated a proxy in your browser, you can explore the Flink web interface through the dynamic port forwarding that has been established by the SSH session to the master node.

Ingesting trip events into the Amazon Kinesis stream

To ingest the events, use the taxi stream producer application, which replays a historic dataset of taxi trips recorded in New York City from S3 into an Amazon Kinesis stream with eight shards. In addition to the taxi trips, the producer application also ingests watermark events into the stream so that the Flink application can determine the time up to which the producer has replayed the historic dataset.

$ ssh -C ec2-user@«producer instance IP»

$ aws s3 cp s3://«Artifact bucket»/artifacts/kinesis-taxi-stream-producer-1.3.jar .

$ java -jar kinesis-taxi-stream-producer-1.3.jar -speedup 6480 -stream «Kinesis stream name» -region «AWS region»

This application is by no means specific to the reference architecture discussed in this post. You can easily reuse it for other purposes as well, for example, building a similar stream processing architecture based on Amazon Kinesis Analytics instead of Apache Flink.

Exploring the Kibana dashboard

Now that the entire pipeline is running, you can finally explore the Kibana dashboard that displays insights that are derived in real time by the Flink application:

https://«Elasticsearch endpoint»/_plugin/kibana/app/kibana#/dashboard/Taxi-Trips-Dashboard

For the purpose of this post, the Elasticsearch cluster is configured to accept connections from the IP address range specified as a parameter of the CloudFormation template that creates the infrastructure. For production-ready applications, this may not always be desirable or possible. For more information about how to securely connect to your Elasticsearch cluster, see the Set Access Control for Amazon OpenSearch Service post on the AWS Database blog.

In the Kibana dashboard, the map on the left visualizes the start points of taxi trips. The redder a rectangle is, the more taxi trips started in that location. The line chart on the right visualizes the average duration of taxi trips to John F. Kennedy International Airport and LaGuardia Airport, respectively.

Given this information, taxi fleet operations can be optimized by proactively sending unoccupied taxis to locations that are currently in high demand, and by estimating trip durations to the local airports more precisely.

You can now scale the underlying infrastructure. For example, scale the shard capacity of the stream, change the instance count or the instance types of the Elasticsearch cluster, and verify that the entire pipeline remains functional and responsive even during the rescale operation.
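
For example, the shard count of the stream can be changed with a single API call. The following sketch uses the AWS SDK for Java with illustrative values:

AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

// double the capacity of the eight-shard stream; UNIFORM_SCALING splits or
// merges shards evenly, and the stream remains readable and writable while
// the resharding operation is in progress
kinesis.updateShardCount(new UpdateShardCountRequest()
    .withStreamName("«Kinesis stream name»")
    .withTargetShardCount(16)
    .withScalingType(ScalingType.UNIFORM_SCALING));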

Running Apache Flink on AWS

As you have just seen, the Flink runtime can be deployed by means of YARN, so EMR is well suited to run Flink on AWS. However, there are some AWS-related considerations that need to be addressed to build and run the Flink application:

  • Building the Flink Amazon Kinesis connector
  • Adapting the Amazon Kinesis consumer configuration
  • Enabling event time processing by submitting watermarks to Amazon Kinesis
  • Connecting Flink to Amazon OpenSearch Service

Building the Flink Amazon Kinesis connector

Flink provides a connector for Amazon Kinesis Streams. In contrast to other Flink artifacts, the Amazon Kinesis connector is not available from Maven Central, so you need to build it yourself. I recommend building Flink with Maven 3.2.x instead of the more recent Maven 3.3.x release, as Maven 3.3.x may produce outputs with improperly shaded dependencies.

$ wget -qO- https://github.com/apache/flink/archive/release-1.2.0.zip | bsdtar -xf-

$ cd flink-release-1.2.0

$ mvn clean package -Pinclude-kinesis -DskipTests -Dhadoop-two.version=2.7.3 -Daws.sdk.version=1.11.113 -Daws.kinesis-kcl.version=1.7.5 -Daws.kinesis-kpl.version=0.12.3

After you have obtained the Flink Amazon Kinesis connector, you can import the respective .jar file to your local Maven repository:

$ mvn install:install-file -Dfile=flink-connector-kinesis_2.10-1.2.0.jar -DpomFile=flink-connector-kinesis_2.10-1.2.0.pom.xml

Adapting the Amazon Kinesis consumer configuration

Flink recently introduced support for obtaining AWS credentials from the role that is associated with an EMR cluster. Enable this functionality in the Flink application source code by setting the AWS_CREDENTIALS_PROVIDER property to AUTO and by omitting any AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY parameters from the Properties object.

Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

Credentials are automatically retrieved from the instance’s metadata and there is no need to store long-term credentials in the source code of the Flink application or on the EMR cluster.

As the producer application ingests thousands of events per second into the stream, it helps to increase the number of records fetched by Flink in a single GetRecords call. Change this value to the maximum value that is supported by Amazon Kinesis.

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");

Enabling event time processing by submitting watermarks to Amazon Kinesis

Flink supports several notions of time, most notably event time. Event time is desirable for streaming applications because it results in very stable query semantics: the time of an event is determined by the producer, or close to the producer, so the reordering of events due to network effects has substantially less impact on query results.

To realize event time, Flink relies on watermarks that are sent by the producer at regular intervals to signal the current time at the source to the Flink runtime. When integrating with Amazon Kinesis Streams, there are two different ways of supplying watermarks to Flink:

  • Manually adding watermarks to the stream
  • Relying on ApproximateArrivalTimestamp, which is automatically added to events when they are ingested into the stream

By just setting the time model to event time on an Amazon Kinesis stream, Flink automatically uses the ApproximateArrivalTimestamp value supplied by Amazon Kinesis.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Alternatively, you can choose to use the time that is determined by the producer by specifying a custom timestamp assigner operator that extracts the watermark information from the corresponding events of the stream.

DataStream<Event> kinesis = env
	.addSource(new FlinkKinesisConsumer<>(...))
	.assignTimestampsAndWatermarks(new PunctuatedAssigner());
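
The following minimal sketch shows what PunctuatedAssigner can look like. It assumes a hypothetical Event class that exposes the timestamp determined by the producer and indicates whether it is one of the dedicated watermark events; the actual implementation is available in the GitHub repository referenced below:

public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<Event> {
    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        // the timestamp has been determined by the producer of the event
        return element.getTimestamp();
    }

    @Override
    public Watermark checkAndGetNextWatermark(Event lastElement, long extractedTimestamp) {
        // emit a watermark only for the dedicated watermark events that the
        // producer ingests into the stream
        return lastElement.isWatermark() ? new Watermark(extractedTimestamp) : null;
    }
}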

If you rely on PunctuatedAssigner, it is important to ingest watermarks to all individual shards, as Flink processes each shard of a stream individually. This can be realized by enumerating the shards of a stream. Ingest watermarks to specific shards by explicitly setting the hash key to the hash range of the shard to which the watermark should be sent.
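
The following sketch illustrates this approach with the AWS SDK for Java; streamName and serializedWatermark are placeholders, and the actual producer code is available in the repository referenced below:

AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

// enumerate the shards of the stream (a single DescribeStream call suffices
// for the eight shards used in this post)
for (Shard shard : kinesis.describeStream(streamName).getStreamDescription().getShards()) {
    // route the watermark to this specific shard by using the start of the
    // shard's hash key range as the explicit hash key
    kinesis.putRecord(new PutRecordRequest()
        .withStreamName(streamName)
        .withPartitionKey("watermark")
        .withExplicitHashKey(shard.getHashKeyRange().getStartingHashKey())
        .withData(ByteBuffer.wrap(serializedWatermark)));
}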

The producer that is ingesting the taxi trips into Amazon Kinesis uses the latter approach. You can explore the details of the implementation in the flink-stream-processing-refarch AWSLabs GitHub repository.

Connecting Flink to Amazon OpenSearch Service

Flink provides several connectors for Elasticsearch. However, these connectors only support the TCP transport protocol of Elasticsearch, whereas Amazon OpenSearch Service relies on the HTTP protocol. Moreover, as of Elasticsearch 5, the TCP transport protocol is deprecated. While an Elasticsearch connector for Flink that supports the HTTP protocol is still in the works, you can use the Jest library to build a custom sink that connects to Amazon OpenSearch Service and signs requests with IAM credentials.
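
A minimal sketch of such a Jest-based sink follows. The index and type names are assumptions, and the batching and request signing mentioned above are omitted for brevity; see the repository referenced below for the complete implementation:

public class AmazonElasticsearchSink extends RichSinkFunction<String> {
    private final String elasticsearchEndpoint;
    private transient JestClient client;

    public AmazonElasticsearchSink(String elasticsearchEndpoint) {
        this.elasticsearchEndpoint = elasticsearchEndpoint;
    }

    @Override
    public void open(Configuration parameters) {
        // create an HTTP client for the Amazon OpenSearch Service endpoint
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(new HttpClientConfig.Builder(elasticsearchEndpoint).build());
        client = factory.getObject();
    }

    @Override
    public void invoke(String document) throws Exception {
        // index a single JSON document; the actual sink buffers documents and
        // sends them to Elasticsearch as signed bulk requests
        client.execute(new Index.Builder(document).index("taxi-trips").type("trip").build());
    }

    @Override
    public void close() {
        if (client != null) {
            client.shutdownClient();
        }
    }
}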

For the full implementation details of the Elasticsearch sink, see the flink-taxi-stream-processor AWSLabs GitHub repository, which contains the source code of the Flink application.

Summary

This post discussed how to build a consistent, scalable, and reliable stream processing architecture based on Apache Flink. It illustrated how to leverage managed services to reduce the expertise and operational effort that is usually required to build and maintain a low-latency, high-throughput stream processing pipeline, so that you can focus your expertise on providing business value.

Start using Apache Flink on Amazon EMR today. The AWSLabs GitHub repository contains the resources that are required to run through the given example and includes further information that helps you to get started quickly.

If you have questions or suggestions, please comment below.


About the Author

Dr. Steffen Hausmann is a Solutions Architect with Amazon Web Services. He has a strong background in the area of complex event and stream processing and supports customers on their cloud journey. In his spare time, he likes hiking in the nearby mountains.


Related

Use Apache Flink on Amazon EMR