AWS Big Data Blog

Improve the resilience of Amazon Managed Service for Apache Flink application with system-rollback feature

“Everything fails all the time” – Werner Vogels, CTO Amazon

Although customers always take precautionary measures when they build applications, application code and configuration errors can still happen, causing application downtime. To mitigate this, Amazon Managed Service for Apache Flink has built a new layer of resilience by allowing customers to opt for the system-rollback feature that will seamlessly revert the application to a previous running version, thereby improving application stability and high availability.

Apache Flink is an open source distributed processing engine that offers powerful programming interfaces for stream and batch processing. It also offers first-class support for stateful processing and event time semantics. Apache Flink supports multiple programming languages, including Java, Python, Scala, SQL, and multiple APIs with different levels of abstraction. These APIs can be used interchangeably in the same application.

Managed Service for Apache Flink is a fully managed, serverless experience for running Apache Flink applications. It now supports Apache Flink 1.19.1, the latest released version of Apache Flink at the time of this writing.

This post explores how to use the system-rollback feature in Managed Service for Apache Flink. We discuss how this functionality improves your application’s resilience by keeping the Flink application highly available. Through an example, you will also learn how to use the APIs to gain more visibility into the application’s operations, which helps when troubleshooting application and configuration issues.

Error scenarios for system-rollback

Managed Service for Apache Flink operates under a shared responsibility model. This means the service owns the infrastructure to run Flink applications that are secure, durable, and highly available. Customers are responsible for making sure application code and configurations are correct. There have been cases where updating the Flink application failed due to code bugs, incorrect configuration, or insufficient permissions. Here are a few examples of common error scenarios:

  1. Code bugs, including any runtime errors encountered. For example, null values are not appropriately handled in the code, resulting in a NullPointerException.
  2. The Flink application is updated with a parallelism higher than the max parallelism configured for the application.
  3. The application is updated to run with incorrect subnets for a virtual private cloud (VPC) application, which results in a failure at Flink job startup.

As of this writing, the Managed Service for Apache Flink application still shows a RUNNING status when such errors occur, despite the fact that the underlying Flink application cannot process the incoming events and recover from the errors.

Errors can also happen during application auto scaling. For example, the application scales up but fails to restore from a savepoint because of an operator mismatch between the snapshot and the Flink job graph. This can happen if you did not set the operator ID using the uid method, or if you changed it in the new version of the application.

You may also receive a snapshot compatibility error when upgrading to a new Apache Flink version. Stateful version upgrades of the Apache Flink runtime are generally compatible, with very few exceptions. For details, refer to the Apache Flink state compatibility table and the Managed Service for Apache Flink documentation.

In such scenarios, you can either perform a force-stop operation, which stops the application without taking a snapshot, or you can roll back the application to the previous version using the RollbackApplication API. Both processes need customer intervention to recover from the issue.
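As a rough sketch, the two manual options could be wrapped with the AWS CLI as follows. The function names and their arguments (application name, then Region) are hypothetical. The sketch assumes that the version ID passed to rollback-application is the application's current version ID, which it reads with describe-application.

```shell
# Hypothetical helpers; the caller passes the application name and the Region.

# Force-stop the application without taking a snapshot.
force_stop_app() (
  aws kinesisanalyticsv2 stop-application \
    --application-name "$1" \
    --force \
    --region "$2"
)

# Roll back to the previous version. The current version ID is looked up first.
manual_rollback_app() (
  version="$(aws kinesisanalyticsv2 describe-application \
    --application-name "$1" \
    --region "$2" \
    --query 'ApplicationDetail.ApplicationVersionId' \
    --output text)" || exit 1
  aws kinesisanalyticsv2 rollback-application \
    --application-name "$1" \
    --current-application-version-id "$version" \
    --region "$2"
)
```

For example, `manual_rollback_app sample-app-system-rollback-test us-west-1` would request a rollback of the sample application used later in this post.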

Automatic rollback to the previous application version

With the system-rollback feature, Managed Service for Apache Flink performs an automatic RollbackApplication operation to restore the application to the previous version when an update operation or a scaling operation fails because of the error scenarios discussed previously.

If the rollback is successful, the Flink application is restored to the previous application version with the latest snapshot. The Flink application is put into a RUNNING state and continues processing events. This keeps the Flink application highly available, with minimal downtime. If the system-rollback fails, the Flink application will be in a READY state. In that case, you need to fix the error and restart the application.

However, if a Managed Service for Apache Flink application is started with application or configuration issues, the service will not start the application. Instead, the application returns to the READY state. This is the default behavior regardless of whether system-rollback is enabled.

System-rollback is performed before the application transitions to RUNNING status. Automatic rollback will not be performed if a Managed Service for Apache Flink application has already successfully transitioned to RUNNING status and later faces runtime issues such as checkpoint failures or job failures. However, customers can trigger the RollbackApplication API themselves if they want to roll back on runtime errors.

Here is the state transition flowchart of system-rollback.

Amazon Managed Service for Apache Flink State Transition

System-rollback is an opt-in feature that you enable using the console or the API. It is available for all Apache Flink versions supported by Managed Service for Apache Flink.

Each Managed Service for Apache Flink application has a version ID, which tracks the application code and configuration for that specific version. You can get the current application version ID from the AWS console of the Managed Service for Apache Flink application.

To enable system-rollback using the API, invoke the UpdateApplication API with the current version ID and the following configuration.

aws kinesisanalyticsv2 update-application \
	--application-name sample-app-system-rollback-test \
	--current-application-version-id 5 \
	--application-configuration-update "{\"ApplicationSystemRollbackConfigurationUpdate\": {\"RollbackEnabledUpdate\": true}}" \
	--region us-west-1
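If you prefer not to copy the version ID from the console, a minimal sketch like the following reads it with describe-application before calling update-application, and then checks the setting. The function names are hypothetical. The field path used in is_rollback_enabled is an assumption about the describe-application response, so verify it against the output for your application.

```shell
# Hypothetical helpers; arguments are the application name and the Region.

# Print the current application version ID.
get_version_id() (
  aws kinesisanalyticsv2 describe-application \
    --application-name "$1" \
    --region "$2" \
    --query 'ApplicationDetail.ApplicationVersionId' \
    --output text
)

# Enable system-rollback using the version ID looked up above.
enable_system_rollback() (
  version="$(get_version_id "$1" "$2")" || exit 1
  aws kinesisanalyticsv2 update-application \
    --application-name "$1" \
    --current-application-version-id "$version" \
    --application-configuration-update '{"ApplicationSystemRollbackConfigurationUpdate": {"RollbackEnabledUpdate": true}}' \
    --region "$2"
)

# Print the rollback setting (field path assumed, see the note above).
is_rollback_enabled() (
  aws kinesisanalyticsv2 describe-application \
    --application-name "$1" \
    --region "$2" \
    --query 'ApplicationDetail.ApplicationConfigurationDescription.ApplicationSystemRollbackConfigurationDescription.RollbackEnabled' \
    --output text
)
```

Looking up the version ID immediately before the update reduces the chance of sending a stale ID, although another update could still happen between the two calls.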

Application operations observability

Because Flink applications can be rolled back seamlessly from a newly upgraded version to the previous version when application or configuration errors occur, it is important to be able to observe application version changes. First, visibility of the version history provides chronological information about the operations performed on the application. Second, it helps with debugging because it shows the underlying error and why the application was rolled back, so that you can fix the issue and retry the update.

For this, you have two additional APIs to invoke from the AWS Command Line Interface (AWS CLI):

  1. ListApplicationOperations – This API will list all the operations, such as UpdateApplication, ApplicationMaintenance, and RollbackApplication, performed on the application in a reverse chronological order.
  2. DescribeApplicationOperation – This API will provide details of a specific operation listed by the ListApplicationOperations API including the failure details.

Although these two new APIs can help you understand the error, you should also refer to the Amazon CloudWatch logs for your Flink application for troubleshooting help. In the logs, you can find additional details, including the stack trace. Once you identify the issue, fix it and update the Flink application.

For troubleshooting information, refer to the documentation.

System-rollback process flow

The following image shows a Managed Service for Apache Flink application in RUNNING state with Version ID: 3. The application is consuming data successfully from the Amazon Kinesis Data Stream source, processing it, and writing it into another Kinesis Data Stream sink.

Also, from the Apache Flink Dashboard, you can see the Status of the Flink application is RUNNING.

To demonstrate the system-rollback, we updated the application code to intentionally introduce an error. From the application main method, an exception is thrown, as shown in the following code.

throw new Exception("Exception thrown to demonstrate system-rollback");

While updating the application with the latest jar, the Version ID is incremented to 4, and the application Status shows it is UPDATING, as shown in the following screenshot.

After some time, the application rolls back to the previous version, Version ID: 3, as shown in the following screenshot.

The application now has successfully gone back to version 3 and continues to process events, as shown by Status RUNNING in the following screenshot.
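The same transition can be followed from the AWS CLI instead of the console. The sketch below is an illustration only: the function name, the polling interval, and the attempt limit are hypothetical, and it treats RUNNING and READY as the states to wait for because those are the end states described in this post.

```shell
# Hypothetical helper: poll until the application reports RUNNING or READY,
# then print the status and the version ID (tab-separated).
# Arguments: application name, Region, seconds between polls (default 10),
# maximum number of polls (default 60).
wait_for_stable_status() (
  interval="${3:-10}"
  attempts="${4:-60}"
  while [ "$attempts" -gt 0 ]; do
    result="$(aws kinesisanalyticsv2 describe-application \
      --application-name "$1" \
      --region "$2" \
      --query 'ApplicationDetail.[ApplicationStatus, ApplicationVersionId]' \
      --output text)" || exit 1
    case "$result" in
      RUNNING*|READY*) echo "$result"; exit 0 ;;
    esac
    attempts=$((attempts - 1))
    sleep "$interval"
  done
  echo "timed out waiting for RUNNING or READY" >&2
  exit 1
)
```

Running `wait_for_stable_status sample-app-system-rollback-test us-west-1` after a failed update would print the status and version ID that the application settles on, which this walkthrough reports as RUNNING at Version ID 3.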

To troubleshoot what went wrong in version 4, list all the operations performed on the Managed Service for Apache Flink application: sample-app-system-rollback-test.

aws kinesisanalyticsv2 list-application-operations \
    --application-name sample-app-system-rollback-test \
    --region us-west-1

This shows the list of operations performed on the Flink application sample-app-system-rollback-test.

{
  "ApplicationOperationInfoList": [
    {
      "Operation": "SystemRollbackApplication",
      "OperationId": "Z4mg9iXiXXXX",
      "StartTime": "2024-06-20T16:52:13+01:00",
      "EndTime": "2024-06-20T16:54:49+01:00",
      "OperationStatus": "SUCCESSFUL"
    },
    {
      "Operation": "UpdateApplication",
      "OperationId": "zIxXBZfQXXXX",
      "StartTime": "2024-06-20T16:50:04+01:00",
      "EndTime": "2024-06-20T16:52:13+01:00",
      "OperationStatus": "FAILED"
    },
    {
      "Operation": "StartApplication",
      "OperationId": "BPyrMrrlXXXX",
      "StartTime": "2024-06-20T15:26:03+01:00",
      "EndTime": "2024-06-20T15:28:05+01:00",
      "OperationStatus": "SUCCESSFUL"
    }
  ]
}

Review the details of the failed UpdateApplication operation and note its OperationId. If you use the AWS CLI and APIs to update the application, you can also obtain the OperationId from the UpdateApplication API response.

To investigate what went wrong, pass the OperationId to describe-application-operation, as in the following command.

aws kinesisanalyticsv2 describe-application-operation \
    --application-name sample-app-system-rollback-test \
    --operation-id zIxXBZfQXXXX \
    --region us-west-1

This will show the details of the operation, including the error.

{
    "ApplicationOperationInfoDetails": {
        "Operation": "UpdateApplication",
        "StartTime": "2024-06-20T16:50:04+01:00",
        "EndTime": "2024-06-20T16:52:13+01:00",
        "OperationStatus": "FAILED",
        "ApplicationVersionChangeDetails": {
            "ApplicationVersionUpdatedFrom": 3,
            "ApplicationVersionUpdatedTo": 4
        },
        "OperationFailureDetails": {
            "RollbackOperationId": "Z4mg9iXiXXXX",
            "ErrorInfo": {
                "ErrorString": "org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:248)\n\tat java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)\n\tat java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)\n\tat java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)\n\tat java.ba"
            }
        }
    }
}
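The two lookups above can also be chained. The following sketch uses the CLI's client-side --query option to pick the most recent FAILED operation and to print only its error string. The function names are hypothetical, and the sketch only looks at the first page of results returned by list-application-operations.

```shell
# Hypothetical helpers; arguments are the application name and the Region.

# Print the OperationId of the most recent FAILED operation.
# The list is returned in reverse chronological order, so index 0 is the newest.
latest_failed_operation_id() (
  aws kinesisanalyticsv2 list-application-operations \
    --application-name "$1" \
    --region "$2" \
    --query "ApplicationOperationInfoList[?OperationStatus=='FAILED'] | [0].OperationId" \
    --output text
)

# Print the error string recorded for an operation (third argument: OperationId).
operation_error() (
  aws kinesisanalyticsv2 describe-application-operation \
    --application-name "$1" \
    --operation-id "$3" \
    --region "$2" \
    --query 'ApplicationOperationInfoDetails.OperationFailureDetails.ErrorInfo.ErrorString' \
    --output text
)
```

A possible usage is `operation_error my-app us-west-1 "$(latest_failed_operation_id my-app us-west-1)"`. As the output above shows, the error string can be truncated, so the full stack trace still has to come from the logs.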

Review the CloudWatch logs for the actual error information. The following log output shows the same error with the complete stack trace, which reveals the underlying problem.

Amazon Managed Service for Apache Flink failed to transition the application to the desired state. The application is being rolled-back to the previous state. Please investigate the following error. org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:248)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
...
...
...
Caused by: java.lang.Exception: Exception thrown to demonstrate system-rollback
at com.amazonaws.services.msf.StreamingJob.main(StreamingJob.java:101)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 12 more
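One way to pull such entries from the command line is the filter-log-events command of CloudWatch Logs, sketched below. The function name is hypothetical, and the log group name is whatever you configured for the application. The quoted phrase filter is a simple text match and may need adjusting to the format of your log events.

```shell
# Hypothetical helper: print log messages that contain a phrase.
# Arguments: log group name, Region, phrase to search for (default "Caused by").
find_log_errors() (
  aws logs filter-log-events \
    --log-group-name "$1" \
    --region "$2" \
    --filter-pattern "\"${3:-Caused by}\"" \
    --query 'events[].message' \
    --output text
)
```

Searching for "Caused by" targets the part of the stack trace that names the root exception, such as the one thrown from the main method in this example.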

Finally, you need to fix the issue and redeploy the Flink application.

Conclusion

This post explained how to enable the system-rollback feature, how it minimizes application downtime after a bad deployment, how the feature works, and how to troubleshoot the underlying problems. We hope it gave you insight into improving the resilience and availability of your Flink applications, and we encourage you to enable the feature for your Managed Service for Apache Flink applications.

To learn more about system-rollback, refer to the AWS documentation.


About the author

Subham Rakshit is a Senior Streaming Solutions Architect for Analytics at AWS based in the UK. He works with customers to design and build streaming architectures so they can get value from analyzing their streaming data. His two little daughters keep him occupied most of the time outside work, and he loves solving jigsaw puzzles with them. Connect with him on LinkedIn.