AWS Big Data Blog
Deep dive into the Amazon Managed Service for Apache Flink application lifecycle – Part 1
Apache Flink is an open source framework for stream and batch processing applications. It excels in handling real-time analytics, event-driven applications, and complex data processing with low latency and high throughput. Flink is designed for stateful computation with exactly-once consistency guarantees for the application state.
Amazon Managed Service for Apache Flink is a fully managed stream processing service that you can use to run Apache Flink jobs at scale without worrying about managing clusters and provisioning resources. You can focus on implementing your application using your integrated development environment (IDE) of choice, and build and package the application using standard build and continuous integration and delivery (CI/CD) tools.
With Managed Service for Apache Flink, you can control the application lifecycle through simple AWS API actions. You can use the API to start and stop the application, and to apply any changes to the code, runtime configuration, and scale. The service takes care of managing the underlying Flink cluster, giving you a serverless experience. You can implement automation such as CI/CD pipelines with tools that can interact with the AWS API or AWS Command Line Interface (AWS CLI).
You can control the application using the AWS Management Console, AWS CLI, AWS SDKs, and tools built on the AWS API, such as AWS CloudFormation or Terraform. The service is not prescriptive about the automation tool you use to deploy and orchestrate the application.
Paraphrasing Jackie Stewart, the famous racing driver: you don’t need to understand how to operate a Flink cluster to use Managed Service for Apache Flink, but some Mechanical Sympathy will help you implement robust and reliable automation.
In this two-part series, we explore what happens during an application’s lifecycle. This post covers core concepts and the application workflow during normal operations. In Part 2, we look at potential failures, how to detect them through monitoring, and ways to quickly resolve issues when they occur.
Definitions
Before examining the application lifecycle steps, we need to clarify the usage of certain terms in the context of Managed Service for Apache Flink:
- Application – The main resource you create, control, and run in Managed Service for Apache Flink is an application.
- Application code package – For each Managed Service for Apache Flink application, you implement an application code package (the application artifact) containing the Flink application code you want to run. This code is compiled and packaged along with its dependencies into a JAR or ZIP file that you upload to an Amazon Simple Storage Service (Amazon S3) bucket.
- Configuration – Each application has a configuration that contains the information to run it. The configuration points to the application code package in the S3 bucket and defines the parallelism, which in turn determines the application resources, in terms of Kinesis Processing Units (KPUs). It also defines security, networking, and runtime properties, which are passed to your application code at runtime.
- Job – When you start the application, Managed Service for Apache Flink creates a dedicated cluster for you and runs your application code as a Flink job.
The following diagram shows the relationship between these concepts.
There are two additional important concepts: checkpoints and savepoints, the mechanisms Flink uses to guarantee state consistency across failures and operations. In Managed Service for Apache Flink, both checkpoints and savepoints are fully managed.
- Checkpoints – These are controlled by the application configuration and enabled by default with an interval of 1 minute. In Managed Service for Apache Flink, checkpoints are used when a job automatically restarts after a runtime failure. Checkpoints are not durable: they are deleted when the application is stopped or updated, and when the application automatically scales.
- Savepoints – These are called snapshots in Managed Service for Apache Flink, and they persist the application state when the application is deliberately restarted by the user, due to an update or an automatic scaling event. Snapshots can also be triggered manually. Automatic use of snapshots to save and restore the application state when the application is stopped and restarted, for example to deploy a change or to automatically scale, is controlled by the application configuration (enabled by default when you create an application using the console).
Lifecycle of an application in Managed Service for Apache Flink
Starting with the happy path, a typical lifecycle of a Managed Service for Apache Flink application comprises the following steps:
- Create and configure a new application.
- Start the application.
- Deploy a change (update the runtime configuration, update the application code, change the parallelism to scale up or down).
- Stop the application.
Starting, stopping, and updating the application use snapshots (if enabled) to retain application state consistency during operations. We recommend enabling snapshots on every production and staging application, to support the persistence of the application state across operations.
In Managed Service for Apache Flink, the application lifecycle is controlled through the console, API actions in the kinesisanalyticsv2 API, or equivalent actions in the AWS CLI and SDK. On top of these fundamental operations, you can build your own automation using different tools, directly using low-level actions or using higher level infrastructure-as-code (IaC) tooling such as AWS CloudFormation or Terraform.
In this post, we refer to the low-level API actions used at each step. Any higher-level IaC tooling uses a combination of these operations. Understanding these operations is fundamental to designing robust automation.
The following diagram summarizes the application lifecycle, showing typical operations and application statuses.
The status of your application (READY, STARTING, RUNNING, UPDATING, and so on) can be observed on the console and using the DescribeApplication API action.
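For example, a minimal status check using the AWS SDK for Python (Boto3) might look like the following sketch; the application name is a placeholder:

```python
import boto3

# The kinesisanalyticsv2 client controls Managed Service for Apache Flink applications
client = boto3.client("kinesisanalyticsv2")

# "my-flink-app" is a placeholder application name
detail = client.describe_application(ApplicationName="my-flink-app")["ApplicationDetail"]

print(detail["ApplicationStatus"])     # READY, STARTING, RUNNING, UPDATING, and so on
print(detail["ApplicationVersionId"])  # incremented on every configuration change
```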
In the following sections, we analyze each lifecycle operation in more detail.
Create and configure the application
The first step is creating a new Managed Service for Apache Flink application, including defining the application configuration. You can do this in a single step using the CreateApplication action, or by creating the basic application configuration and then updating the configuration before starting it using UpdateApplication. The latter approach is what you do when you create an application from the console.
In this phase, you package the application you have implemented into a JAR file (for Java) or ZIP file (for Python) and upload it to an S3 bucket you have previously created. The bucket name and the path to the application code package are part of the configuration you define.
When UpdateApplication or CreateApplication is invoked, Managed Service for Apache Flink takes a copy of the application code package (JAR or ZIP file) referenced by the configuration. The configuration is rejected if the file it points to doesn’t exist.
The following diagram illustrates this workflow.
Simply updating the application code package in the S3 bucket doesn’t trigger an update. You need to run UpdateApplication to make the new file visible to the service and trigger the update, even when you overwrite the code package with the same name.
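To make the configuration concrete, the following is a minimal sketch of a CreateApplication call using Boto3. The application name, runtime version, role ARN, bucket, and file key are placeholder assumptions, not values from this post:

```python
import boto3

client = boto3.client("kinesisanalyticsv2")

client.create_application(
    ApplicationName="my-flink-app",
    RuntimeEnvironment="FLINK-1_20",  # the Flink runtime version you target
    ServiceExecutionRole="arn:aws:iam::111122223333:role/my-flink-app-role",
    ApplicationConfiguration={
        # Points to the code package; the service takes its own copy of this file
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "arn:aws:s3:::my-code-bucket",
                    "FileKey": "artifacts/my-flink-app-1.0.jar",
                }
            },
            "CodeContentType": "ZIPFILE",  # used for both JAR and ZIP packages
        },
        # Parallelism determines the application resources, in terms of KPUs
        "FlinkApplicationConfiguration": {
            "ParallelismConfiguration": {
                "ConfigurationType": "CUSTOM",
                "Parallelism": 4,
                "ParallelismPerKPU": 1,
                "AutoScalingEnabled": False,
            }
        },
        # Keep snapshots enabled so state persists across stop and update operations
        "ApplicationSnapshotConfiguration": {"SnapshotsEnabled": True},
    },
)
```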
Start the application
Managed Service for Apache Flink provisions resources only when the application is actually running, and you pay only for the resources of running applications. You explicitly control when to start the application by issuing a StartApplication command.
Managed Service for Apache Flink prioritizes high availability and runs your application in a dedicated Flink cluster. When you start the application, Managed Service for Apache Flink deploys a dedicated cluster, then deploys and runs the Flink job based on the configuration you defined.
When you start the application, its status moves from READY to STARTING, and then to RUNNING.
The following diagram illustrates this workflow.
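A minimal start call with Boto3 might look like the following sketch (the application name is a placeholder; the restore options are covered later in this post):

```python
import boto3

client = boto3.client("kinesisanalyticsv2")

# Start the application: the service provisions the dedicated cluster and
# deploys the Flink job, moving the status from READY to STARTING to RUNNING
client.start_application(
    ApplicationName="my-flink-app",
    RunConfiguration={
        "ApplicationRestoreConfiguration": {
            # No state to restore on the very first start
            "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
        }
    },
)
```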
Managed Service for Apache Flink supports both streaming mode, the default for Apache Flink, and batch mode:
- Streaming mode – In streaming mode, after an application is successfully started and goes into RUNNING status, it keeps running until you stop it explicitly. From this point on, the behavior on failure is to automatically restart the job from the latest checkpoint, so there is no data loss. We discuss this failure scenario in more detail later in this post.
- Batch mode – A Flink application running in batch mode behaves differently. After you start it, it goes into RUNNING status, and the job continues running until it completes the processing. At that point, the job gracefully stops, and the Managed Service for Apache Flink application goes back to READY status.
This post focuses on streaming applications only.
Update the application
In Managed Service for Apache Flink, you handle the following changes by updating the application configuration, using the console or the UpdateApplication API action:
- Application code changes, replacing the package (JAR or ZIP file) with one containing a new version
- Runtime properties changes
- Scaling, which implies changing the parallelism and the resources (KPUs)
- Operational parameter changes, such as checkpointing, logging level, and monitoring setup
- Networking configuration changes
When you modify the application configuration, Managed Service for Apache Flink creates a new configuration version, identified by a version ID number, automatically incremented at every change.
Update the code package
We mentioned how the service takes a copy of the code package (JAR or ZIP file) when you update the application configuration. The copy is associated with the new application configuration version that has been created. The service uses its own copy of the code package to start the application. You can safely replace or delete the code package after you have updated the configuration. The new package is not taken into account until you update the application configuration again.
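As a sketch of this flow with Boto3, note that UpdateApplication requires the current application version ID, which you can read with DescribeApplication (the application name and file key are placeholders):

```python
import boto3

client = boto3.client("kinesisanalyticsv2")

# UpdateApplication requires the current version ID for optimistic locking
current_version = client.describe_application(ApplicationName="my-flink-app")[
    "ApplicationDetail"
]["ApplicationVersionId"]

# Point the configuration at the new code package; the service takes its own copy
client.update_application(
    ApplicationName="my-flink-app",
    CurrentApplicationVersionId=current_version,
    ApplicationConfigurationUpdate={
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentTypeUpdate": "ZIPFILE",
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "artifacts/my-flink-app-1.1.jar"
                }
            },
        }
    },
)
```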
Update a READY (not running) application
If you update an application in READY status, nothing special happens beyond creating the new configuration version, which will be used the next time you start the application. However, in production, you will normally update the configuration of an application in RUNNING status to apply a change. Managed Service for Apache Flink automatically handles the operations required to update the application with no data loss.
Update a RUNNING application
To understand what happens when you update a running application, remember that Flink is designed for strong, exactly-once state consistency. To maintain this guarantee when a change is applied, Flink must stop the data processing, take a copy of the application state, restart the job with the changes, and restore the state before processing can resume.
This is standard Flink behavior, and it applies to any change, whether it’s a code change, a runtime configuration change, or a new parallelism to scale up or down. Managed Service for Apache Flink automatically orchestrates this process for you. If snapshots are enabled, the service takes a snapshot before stopping the processing and restarts from the snapshot when the change is deployed. This way, the change can be deployed with zero data loss.
If snapshots are disabled, the service restarts the job with the change, but the state will be empty, like the first time you started the application. This might cause data loss. You normally don’t want this to happen, particularly in production applications.
Let’s explore a practical example, illustrated by the following diagram. When you want to deploy a code change, the following steps typically happen (in this example, we assume that snapshots are enabled, as they should be in a production application):
- Make changes to the application code.
- The build process creates the application package (JAR or ZIP file), either manually or using CI/CD automation.
- Upload the new application package to an S3 bucket.
- Update the application configuration to point to the new application package.
- As soon as you successfully update the configuration, Managed Service for Apache Flink starts the operation to update the application. The application status changes to UPDATING. The Flink job is stopped, taking a snapshot of the application state.
- After the changes have been applied, the application is restarted using the new configuration, which in this case includes the new application code, and the job restores the state from the snapshot. When the process is complete, the application status goes back to RUNNING.
The process is similar for changes to the application configuration. For example, you can change the parallelism to scale the application by updating the application configuration, causing the application to be redeployed with the new parallelism and an amount of resources (CPU, memory, local storage) based on the new number of KPUs.
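For example, a scaling change might look like the following hedged sketch (the parallelism values are illustrative):

```python
import boto3

client = boto3.client("kinesisanalyticsv2")

current_version = client.describe_application(ApplicationName="my-flink-app")[
    "ApplicationDetail"
]["ApplicationVersionId"]

# Raise the parallelism; the service redeploys the job with resized resources
# (KPUs), taking a snapshot before the restart and restoring state from it
client.update_application(
    ApplicationName="my-flink-app",
    CurrentApplicationVersionId=current_version,
    ApplicationConfigurationUpdate={
        "FlinkApplicationConfigurationUpdate": {
            "ParallelismConfigurationUpdate": {
                "ConfigurationTypeUpdate": "CUSTOM",
                "ParallelismUpdate": 8,
                "ParallelismPerKPUUpdate": 1,
            }
        }
    },
)
```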
Update the application’s IAM role
The application configuration contains a reference to an AWS Identity and Access Management (IAM) role. In the unlikely case you want to point the application to a different role, you can update the application configuration using UpdateApplication. The process is the same as described earlier.
More commonly, you will want to modify the role itself, to add or remove permissions. This operation doesn’t go through the Managed Service for Apache Flink application lifecycle and can be done at any time; no application stop and restart is required. IAM changes take effect immediately, potentially inducing a failure if, for example, you inadvertently remove a required permission. In that case, the behavior of the Flink job might vary, depending on the affected component.
Stop the application
You can stop a running Managed Service for Apache Flink application using the StopApplication action or the console. The service gracefully stops the application. The status changes from RUNNING to STOPPING, and finally to READY.
When snapshots are enabled, the service will take a snapshot of the application state when it is stopped, as shown in the following diagram.
After you stop the application, any resource previously provisioned to run your application is reclaimed. You incur no cost while the application is not running (READY).
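A stop call is a single action; the following Boto3 sketch assumes the same placeholder application name:

```python
import boto3

client = boto3.client("kinesisanalyticsv2")

# Graceful stop: with snapshots enabled, the service takes a snapshot of the
# application state before stopping (RUNNING -> STOPPING -> READY)
client.stop_application(ApplicationName="my-flink-app", Force=False)
```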
Start the application from a snapshot
Sometimes, you might want to stop a production application and restart it later, restarting the processing from the point it was stopped. Managed Service for Apache Flink supports starting the application from a snapshot. The snapshot saves not only the application state, but also the point in the source—the offsets in a Kafka topic, for example—where the application stopped consuming.
When snapshots are enabled, Managed Service for Apache Flink automatically takes a snapshot when you stop the application. This snapshot can be used when you restart the application.
The StartApplication API command has three restore options:
- RESTORE_FROM_LATEST_SNAPSHOT – Restore from the latest snapshot.
- RESTORE_FROM_CUSTOM_SNAPSHOT – Restore from a custom snapshot (you need to specify which one).
- SKIP_RESTORE_FROM_SNAPSHOT – Skip restoring from a snapshot. The application starts with no state, as the very first time you ran it.
When you start the application for the very first time, no snapshot is available yet. Regardless of the restore option you choose, the application will start with no snapshot.
The process of starting the application from a snapshot is visualized in the following diagram.
In production, you normally want to restore from the latest snapshot (RESTORE_FROM_LATEST_SNAPSHOT). This automatically uses the snapshot the service created when you last stopped the application.
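In Boto3, this corresponds to the following sketch:

```python
import boto3

client = boto3.client("kinesisanalyticsv2")

# Resume processing from the snapshot taken on the last StopApplication
client.start_application(
    ApplicationName="my-flink-app",
    RunConfiguration={
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
        }
    },
)
```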
Snapshots are based on Flink’s savepoint mechanism and maintain the exactly-once consistency of the internal state. Also, the risk of reprocessing duplicate records from the source is minimized because the snapshot is taken synchronously while the Flink job is stopped.
Start the application from an older snapshot
In Managed Service for Apache Flink, you can schedule taking periodic snapshots of a running production application, for example using the Snapshot Manager. Taking a snapshot of a running application doesn’t stop the processing and only introduces minimal overhead (comparable to checkpointing). With the second option, RESTORE_FROM_CUSTOM_SNAPSHOT, you can restart the application from a point back in time, using a snapshot older than the one taken on the last StopApplication.
Because the source positions (for example, the offsets in a Kafka topic) are also restored with the snapshot, the application reverts to the point in the stream it was processing when the snapshot was taken. The state is also restored at that exact point, providing consistency.
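The following sketch lists the available snapshots and restarts from a specific one; the snapshot name is a placeholder:

```python
import boto3

client = boto3.client("kinesisanalyticsv2")

# Inspect available snapshots to choose the point in time to restore
snapshots = client.list_application_snapshots(
    ApplicationName="my-flink-app", Limit=10
)["SnapshotSummaries"]
for snap in snapshots:
    print(snap["SnapshotName"], snap["SnapshotStatus"])

# Restart from an older snapshot; the job reverts to the source positions
# and state captured at that point
client.start_application(
    ApplicationName="my-flink-app",
    RunConfiguration={
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "RESTORE_FROM_CUSTOM_SNAPSHOT",
            "SnapshotName": "my-app-snapshot-2025-01-15",  # placeholder name
        }
    },
)
```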
When you start an application from an older snapshot, there are two important considerations:
- Only restore snapshots taken within the source system retention period – If you restore a snapshot older than the source retention, data loss might occur, and the application behavior is unpredictable.
- Restarting from an older snapshot will likely generate duplicate output – This is often not a problem when the end-to-end system is designed to be idempotent. However, this might cause problems if you are using a Flink transactional connector, such as File System sink or Kafka sink with exactly-once guarantees enabled. Because these sinks are designed to guarantee no duplicates (preventing them at any cost), they might prevent your application from restarting from an older snapshot. There are workarounds to this operational problem, but they depend on the specific use case and are beyond the scope of this post.
Understanding what happens when you start your application
We have learned the fundamental operations in the lifecycle of an application. In Managed Service for Apache Flink, these operations are controlled by a few API actions, such as StartApplication, UpdateApplication, and StopApplication. The service controls every operation for you. You don’t have to provision or manage Flink clusters. However, a better understanding of what happens during the lifecycle will give you sufficient Mechanical Sympathy to recognize potential failure modes and implement a more robust automation.
Let’s see in detail what happens when you issue a StartApplication command on an application in READY status (not running). When you issue an UpdateApplication command on a RUNNING application, the application is first stopped with a snapshot and then restarted with the new configuration, following a process identical to the one described here.
Composition of a Flink cluster
To understand what happens when you start the application, we need to introduce a couple of additional concepts. A Flink cluster comprises two types of nodes:
- A single Job Manager, which acts as a coordinator
- One or more Task Managers, which do the actual data processing
In Managed Service for Apache Flink, you can see the cluster nodes in the Flink Dashboard, which you can access from the console.
Flink decomposes the data processing defined by your application code into one or more subtasks, which are distributed across the Task Manager nodes, as illustrated in the following diagram.
Remember, in Managed Service for Apache Flink, you don’t need to worry about provisioning and configuring the cluster. The service provides a dedicated cluster for your application. The total amount of vCPU, memory, and local storage of the Task Managers matches the number of KPUs you configured.
Starting your Managed Service for Apache Flink application
Now that we’ve discussed how a Flink cluster is composed, let’s explore what happens when you issue a StartApplication command, or when the application restarts after a change has been deployed with an UpdateApplication command.
The following diagram illustrates the process. Everything is carried out automatically for you.
The workflow consists of the following steps:
- A dedicated cluster, with the amount of resources you requested based on the number of KPUs, is provisioned for your application.
- The application code, runtime properties, and other configurations, such as the application parallelism, are passed to the Job Manager node, the coordinator of the cluster.
- The Java or Python code in the main() method of your application is executed. This generates the logical graph of operators of your application (called the dataflow). Based on the dataflow you defined and the application parallelism, Flink generates the subtasks, the actual units of work Flink will execute to process your data.
- Flink then distributes the job’s subtasks across Task Managers, the actual worker nodes of the cluster.
- When the previous step succeeds, the Flink job status and the Managed Service for Apache Flink application status change to RUNNING. However, the job is still not completely running and processing data: all subtasks must first be initialized.
- Each subtask independently restores its state, if starting from a snapshot, and initializes runtime resources. For example, Flink’s Kafka source connector restores the partition assignments and offsets from the savepoint (snapshot), establishes a connection to the Kafka cluster, and subscribes to the Kafka topic. From this step onward, a Flink job will stop and restart from its last checkpoint when encountering any unhandled error. If the problem causing the error is not transient, the job keeps stopping and restarting from the same checkpoint in a loop.
- When all subtasks are successfully initialized and change to RUNNING status, the Flink job starts processing data and is now properly running.
Conclusion
In this post, we discussed how the lifecycle of a Managed Service for Apache Flink application is controlled by simple AWS API commands, or the equivalent using the AWS SDK or AWS CLI. If you are using high-level automation tools such as AWS CloudFormation or Terraform, the low-level actions are also abstracted away for you. The service handles the complexity of operating the Flink cluster and orchestrating the Flink job lifecycle.
However, with a better understanding of how Flink works and what the service does for you, you can implement more robust automation and troubleshoot failures.
In Part 2, we examine failure scenarios that can happen during normal operations or when you deploy a change or scale the application, and how to monitor operations to detect and recover when something goes wrong.