AWS Big Data Blog

Amazon Managed Service for Apache Flink now supports Apache Flink version 1.19

Apache Flink is an open source distributed processing engine that offers powerful programming interfaces for both stream and batch processing, with first-class support for stateful processing and event time semantics. Apache Flink supports multiple programming languages (Java, Python, Scala, and SQL) and multiple APIs with different levels of abstraction, which can be used interchangeably in the same application.

Amazon Managed Service for Apache Flink offers a fully managed, serverless experience for running Apache Flink applications, and now supports Apache Flink 1.19.1, the latest stable version of Apache Flink at the time of writing. AWS led the community release of version 1.19.1, which introduces a number of bug fixes over version 1.19.0, released in March 2024.

In this post, we discuss some of the interesting new features and configuration changes available in Managed Service for Apache Flink with this new release. Every Apache Flink release includes exciting new experimental features; in this post, however, we focus on the features that are most accessible to users with this release.

Connectors

With the release of version 1.19.1, the Apache Flink community also released new connector versions for the 1.19 runtime. Starting from 1.16, Apache Flink introduced a new connector version numbering scheme, following the pattern <connector-version>-<flink-version>. We recommend using connectors built for the runtime version you are using. Refer to Using Apache Flink connectors to stay updated on any future changes regarding connector versions and compatibility.
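For example, a Maven dependency on the Kafka connector for the 1.19 runtime follows this pattern (the connector version shown is illustrative; check the connector documentation for the latest release compatible with your runtime):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <!-- <connector-version>-<flink-version>: connector 3.2.0 built for Flink 1.19 -->
    <version>3.2.0-1.19</version>
</dependency>
```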

SQL

Apache Flink 1.19 brings new features and improvements, particularly in the SQL API. These enhancements are designed to provide more flexibility, better performance, and ease of use for developers working with Flink’s SQL API. In this section, we delve into some of the most notable SQL enhancements introduced in this release.

State TTL per operator

Configuring state TTL at the operator level was introduced in Apache Flink 1.18 but wasn't easily accessible to the end user. To modify an operator's TTL, you had to export the compiled JSON plan at development time, modify it manually, and force Apache Flink to use the edited plan instead of generating a new one when the application starts. The new features added to Flink SQL in 1.19 simplify this process by allowing TTL configurations directly through SQL hints, eliminating the need for JSON plan manipulation.

The following code shows examples of how to use SQL hints to set state TTL:

-- State TTL for Joins
SELECT /*+ STATE_TTL('Orders' = '1d', 'Customers' = '20d') */ 
  *
FROM Orders 
LEFT OUTER JOIN Customers 
  ON Orders.o_custkey = Customers.c_custkey;

-- State TTL for Aggregations
SELECT /*+ STATE_TTL('o' = '1d') */ 
  o_orderkey, SUM(o_totalprice) AS revenue 
FROM Orders AS o 
GROUP BY o_orderkey;

Session window table-valued functions

Windows are at the heart of processing infinite streams in Apache Flink, splitting the stream into finite buckets for computations. Before 1.19, Apache Flink provided the following types of window table-valued functions (TVFs):

  • Tumble windows – Fixed-size, non-overlapping windows
  • Hop windows – Fixed-size, overlapping windows with a specified hop interval
  • Cumulate windows – Windows that share the same starting point but expand by a fixed step over time

The Apache Flink 1.19 release enhances the SQL capabilities by supporting session window TVFs in streaming mode, allowing for more sophisticated and flexible windowing operations directly within SQL queries. Applications can now create dynamic windows that group elements based on session gaps. The following code shows an example:

-- Session window with partition keys
SELECT 
  * 
FROM TABLE(
  SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));

-- Apply aggregation on the session windowed table with partition keys
SELECT 
  window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
  SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;

Mini-batch optimization for regular joins

When using the Table API or SQL, regular joins (standard equi-joins where time is not a factor) may induce considerable overhead for the state backend, especially when using RocksDB.

Normally, Apache Flink processes standard joins one record at a time: it looks up the state for a matching record on the other side of the join, updates the state with the input record, and emits the resulting record. This can put considerable pressure on RocksDB, with multiple reads and writes for each record.

Apache Flink 1.19 introduces the ability to use mini-batch processing with equi-joins (FLIP-415). When enabled, Apache Flink will process regular joins not one record at a time, but in small batches, substantially reducing the pressure on the RocksDB state backend. Mini-batching adds some latency, which is controllable by the user. See, for example, the following SQL code (embedded in Java):

TableConfig tableConfig = tableEnv.getConfig();
tableConfig.set("table.exec.mini-batch.enabled", "true");
tableConfig.set("table.exec.mini-batch.allow-latency", "5s");
tableConfig.set("table.exec.mini-batch.size", "5000");

tableEnv.executeSql("CREATE TEMPORARY VIEW ab AS " +
  "SELECT a.id AS a_id, a.a_content, b.id AS b_id, b.b_content " +
  "FROM a LEFT JOIN b ON a.id = b.id");

With this configuration, Apache Flink will buffer up to 5,000 records or up to 5 seconds, whichever comes first, before processing the join for the entire mini-batch.

In Apache Flink 1.19, mini-batching only works for regular joins, not windowed or temporal joins. Mini-batching is disabled by default; you have to explicitly enable it and set the batch size and latency for Flink to use it. Also, mini-batch settings are global, applied to all regular joins in your application. At the time of writing, it's not possible to configure mini-batching per join statement.

AsyncScalarFunction

Before version 1.19, an important limitation of SQL and the Table API, compared to the Java DataStream API, was the lack of asynchronous I/O support. Any request to an external system, for example a database or a REST API, or even any AWS API call using the AWS SDK, is synchronous and blocking. An Apache Flink subtask waits for the response before completing the processing of a record and proceeding to the next one. In practice, the round-trip latency of each request is added to the processing latency of each record. Apache Flink's Async I/O API removes this limitation, but it's only available for the DataStream API and Java. Until version 1.19, there was no simple, efficient workaround in SQL, the Table API, or Python.

Apache Flink 1.19 introduces the new AsyncScalarFunction, a user-defined function (UDF) that can be implemented using non-blocking calls to the external system, to support use cases similar to asynchronous I/O in SQL and the Table API.

This new type of UDF is only available in streaming mode and, at the moment, only supports ordered output. The DataStream Async I/O API also supports unordered results, which may further reduce latency when strict ordering isn't required.
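AsyncScalarFunction itself is part of Flink's Table API and requires a Flink runtime, but the latency benefit of non-blocking calls can be illustrated with a small standalone sketch (plain Python asyncio, not Flink code; the lookup function and its 50 ms round trip are hypothetical):

```python
import asyncio
import time

async def lookup(key: str) -> str:
    # Hypothetical call to an external system with a simulated 50 ms round trip
    await asyncio.sleep(0.05)
    return key.upper()

async def enrich(keys: list[str]) -> list[str]:
    start = time.perf_counter()
    # Issue all lookups concurrently instead of blocking on one record at a time
    results = await asyncio.gather(*(lookup(k) for k in keys))
    elapsed = time.perf_counter() - start
    # Total wall time stays close to one round trip, not len(keys) round trips
    assert elapsed < len(keys) * 0.05
    return results

print(asyncio.run(enrich(["a", "b", "c", "d"])))  # prints ['A', 'B', 'C', 'D']
```

In a similar way, an AsyncScalarFunction lets a subtask keep multiple requests in flight, so the round-trip latency is no longer paid serially for every record.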

Python 3.11 support

Python 3.11 is now supported, and Python 3.7 support has been completely removed (FLINK-33029). Managed Service for Apache Flink currently uses the Python 3.11 runtime to run PyFlink applications. Python 3.11 introduced several performance improvements and bug fixes, with no breaking API changes.

Performance improvements: Dynamic checkpoint interval

Apache Flink 1.19 brings significant enhancements to checkpoint behavior. This release gives applications the capability to adjust checkpointing intervals dynamically, based on whether the source is processing backlog data (FLIP-309).

You can now specify different checkpointing intervals depending on whether a source operator is processing backlog data. Reducing checkpoint frequency during backlog phases lets Apache Flink prioritize processing throughput over frequent state snapshots, improving overall job performance.

To enable this behavior, you define the execution.checkpointing.interval parameter for the regular interval and execution.checkpointing.interval-during-backlog for the longer interval to use while sources report that they are processing a backlog.

For example, if you want to run checkpoints every 60 seconds during normal processing, but extend to 10 minutes during the processing of backlogs, you can set the following:

  • execution.checkpointing.interval = 60s
  • execution.checkpointing.interval-during-backlog = 10m

In Amazon Managed Service for Apache Flink, the regular checkpointing interval is set in the application configuration (60 seconds by default), so you don't need to set execution.checkpointing.interval yourself. To use a longer checkpointing interval during backlog processing, you can raise a support case to modify execution.checkpointing.interval-during-backlog. See Modifiable Flink configuration properties for further details about modifying Apache Flink configurations.

At the time of writing, dynamic checkpointing intervals are only supported by the Apache Kafka and FileSystem source connectors. With any other source connector, the backlog interval is ignored, and Apache Flink keeps checkpointing at the regular interval during backlog processing.

In Apache Flink, checkpoints are always injected in the flow from the sources. This feature only involves source connectors. The sink connectors you use in your application don’t affect this feature. For a deep dive into the Apache Flink checkpoint mechanism, see Optimize checkpointing in your Amazon Managed Service for Apache Flink applications with buffer debloating and unaligned checkpoints.

More troubleshooting information: Job initialization and checkpoint traces

With FLIP-384, Apache Flink 1.19 introduces trace reporters, which show checkpointing and job initialization traces. As of 1.19, this trace information can be sent to the logs using Slf4j. In Managed Service for Apache Flink, this is now enabled by default. You can find checkpoint and job initialization details in Amazon CloudWatch Logs, with the other logs from the application.

Checkpoint traces contain valuable information about each checkpoint. You can find similar information on the Apache Flink Dashboard, but only for the latest checkpoints and only while the application is running. Conversely, in the logs, you can find the full history of checkpoints. The following is an example of a checkpoint trace:

SimpleSpan{
  scope=org.apache.flink.runtime.checkpoint.CheckpointStatsTracker, 
  name=Checkpoint, 
  startTsMillis=1718779769305, 
  endTsMillis=1718779769542, 
  attributes={
    jobId=1b418a2404cbcf47ef89071f83f2dff9, 
    checkpointId=9774, 
    checkpointStatus=COMPLETED, 
    fullSize=9585, 
    checkpointedSize=9585
  }
}

Job initialization traces are generated when the job starts and recovers the state from a checkpoint or savepoint. You can find valuable statistics you can’t normally find elsewhere, including the Apache Flink Dashboard. The following is an example of a job initialization trace:

SimpleSpan{
  scope=org.apache.flink.runtime.checkpoint.CheckpointStatsTracker,
  name=JobInitialization,
  startTsMillis=1718781201463,
  endTsMillis=1718781409657,
  attributes={
    maxReadOutputDataDurationMs=89,
    initializationStatus=COMPLETED,
    fullSize=26167879378,
    sumMailboxStartDurationMs=621,
    sumGateRestoreDurationMs=29,
    sumDownloadStateDurationMs=199482,
    sumRestoredStateSizeBytes.LOCAL_MEMORY=46764,
    checkpointId=270,
    sumRestoredStateSizeBytes.REMOTE=26167832614,
    maxDownloadStateDurationMs=199482,
    sumReadOutputDataDurationMs=90,
    maxRestoredStateSizeBytes.REMOTE=26167832614,
    maxInitializeStateDurationMs=201122,
    sumInitializeStateDurationMs=201241,
    jobId=8edb291c9f1c91c088db51b48de42308,
    maxGateRestoreDurationMs=22,
    maxMailboxStartDurationMs=391,
    maxRestoredStateSizeBytes.LOCAL_MEMORY=46764
  }
}

Checkpoint and job initialization traces are logged at INFO level. You can find them in CloudWatch Logs only if you configure a logging level of INFO or DEBUG in your Managed Service for Apache Flink application.

Managed Service for Apache Flink behavior change

As a fully managed service, Managed Service for Apache Flink controls some runtime configuration parameters to guarantee the stability of your application. For details about the Apache Flink settings that can be modified, see Apache Flink settings.

With the 1.19 runtime, if you programmatically modify a configuration parameter that is directly controlled by Managed Service for Apache Flink, you receive an explicit ProgramInvocationException when the application starts, explaining which parameter is causing the problem and preventing the application from starting. With runtime 1.18 or earlier, changes to parameters controlled by the managed service were silently ignored.

To learn more about how Managed Service for Apache Flink handles configuration changes in runtime 1.19 or later, refer to FlinkRuntimeException: “Not allowed configuration change(s) were detected”.

Conclusion

In this post, we explored some of the new relevant features and configuration changes introduced with Apache Flink 1.19, now supported by Managed Service for Apache Flink. This latest version brings numerous enhancements aimed at improving performance, flexibility, and usability for developers working with Apache Flink.

With support for Apache Flink 1.19, Managed Service for Apache Flink now offers the latest released Apache Flink version, and we have seen some of the interesting new features available for Flink SQL and PyFlink.

You can find more details about recent releases in the Apache Flink blog and release notes.

If you’re new to Apache Flink, we recommend our guide to choosing the right API and language and following the getting started guide to start using Managed Service for Apache Flink.

If you’re already running an application in Managed Service for Apache Flink, you can safely upgrade it in-place to the new 1.19 runtime.


About the Authors

Francisco Morillo is a Streaming Solutions Architect at AWS, specializing in real-time analytics architectures. With over five years in the streaming data space, Francisco has worked as a data analyst for startups and as a big data engineer for consultancies, building streaming data pipelines. He has deep expertise in Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon Managed Service for Apache Flink. Francisco collaborates closely with AWS customers to build scalable streaming data solutions and advanced streaming data lakes, ensuring seamless data processing and real-time insights.

Lorenzo Nicora works as Senior Streaming Solution Architect at AWS, helping customers across EMEA. He has been building cloud-centered, data-intensive systems for over 25 years, working in the finance industry both through consultancies and for FinTech product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.