AWS Architecture Blog
Temporal data lake architecture for benchmark and indices analytics
Financial trading houses and stock exchanges generate enormous volumes of data in near real-time, making it difficult to perform bi-temporal calculations that yield accurate results. Achieving this requires a processing architecture that can handle large volumes of data during peak bursts, meet strict latency requirements, and scale according to incoming volumes.
In this post, we’ll describe a scenario for an industry leader in the financial services sector and explain how AWS services are used for bi-temporal processing with state management and scale based on variable workloads during the day, all while meeting strict service-level agreement (SLA) requirements.
Problem statement
To design and implement a fully temporal transactional data lake with the repeatable read isolation level for queries is a challenge, particularly with burst events that need the overall architecture to scale accordingly. The data store in the overall architecture needs to record the value history of data at different times, which is especially important for financial data. Financial data can include corporate actions, annual or quarterly reports, or fixed-income securities, like bonds that have variable rates. It’s crucial to be able to correct data inaccuracies during the reporting period.
The example customer seeks a data processing platform architecture to dynamically scale based on the workloads with a capacity of processing 150 million records under 5 minutes. Their platform should be capable of meeting the end-to-end SLA of 15 minutes, from ingestion to reporting, with lowest total cost of ownership. Additionally, managing bi-temporal data requires a database that has critical features, such as ACID (atomicity, consistency, isolation, durability) compliance, time-travel capability, full-schema evolution, partition layout and evolution, rollback to prior versions, and SQL-like query experience.
Solution overview
The solution architecture key building blocks are Amazon Kinesis Data Streams for streaming data, Amazon Kinesis Data Analytics with Apache Flink as processing engine, Flink’s RocksDB for state management, and Apache Iceberg on Amazon Simple Storage Service (Amazon S3) as the storage engine (Figure 1).
Data processing
Here’s how it works:
- A publisher application receives the data from the source systems and publishes data into Kinesis Data Streams using a well-defined JSON format structure.
- Kinesis Data Streams holds the data for a duration that is configurable so data is not lost and can auto scale based on the data volume ingested.
- Kinesis Data Analytics runs an Apache Flink application, with state management (RocksDB), to handle bi-temporal calculations. The Apache Flink application consumes data from Kinesis Data Streams and performs the following computations:
- Transforms the JSON stream into a row-type record, compatible with a SQL table-like structure, resolving nesting and parent–child relationships present within the stream
- Checks whether the record has already an existing state in in-memory RocksDB or disk attached to Kinesis Data Analytics computational node to avoid read latency from the database, which is critical for meeting the performance requirements
- Performs bi-temporal calculations and creates the resultant records in an in-memory data structure before invoking the Apache Iceberg sink operator
- The Apache Flink application sink operator appends the temporal states, expressed as records into existing Apache Iceberg data store. This will comply with key principles of time series data, which is immutable, and the ability to time-travel along with ACID compliance, schema evolution, and partition evolution
- Kinesis Data Analytics is resilient and provides a no-data-loss capability, with features like periodic checkpoints and savepoints. They are used to store the state management in a secure Amazon S3 location that can be accessed outside of Kinesis Data Analytics. This savepoints mechanism can be used to programmatically to scale the cluster size based on the workloads using time-driven scheduling and AWS Lambda functions.
- If the time-to-live feature of RocksDB is implemented, old records are stored in Apache Iceberg on Amazon S3. When performing temporal calculations, if the state is not found in memory, data is read from Apache Iceberg into RocksDB and the processing is completed. However, this step is optional and can be circumvented if the Kinesis Data Analytics cluster is initialized with right number of Kinesis processing units to hold the historical information, as per requirements.
- Because the data is stored in an Apache Iceberg table format in Amazon S3, data is queried using Trino, which supports Apache Iceberg table format.
- The end user queries data using any SQL tool that supports the Trino query engine.
Apache Iceberg maintenance jobs, such as data compaction, expire snapshot, delete orphan files, can be launched using Amazon Athena to optimize performance out of Apache Iceberg data store. Details of each processing step performed in Apache Flink application are captured using Amazon CloudWatch, which logs all the events.
Scalability
Amazon EventBridge scheduler invokes a Lambda function to scale the Kinesis Data Analytics. Kinesis Data Analytics has a short outage during rescaling that is proportional to the amount of data stored in RocksDB, which is why a state management strategy is necessary for the proper operation of the system.
Figure 2 shows the scaling process, which depicts:
- Before peak load: The Kinesis Data Analytics cluster is processing off-peak records with minimum configuration before the peak load. A scheduled event is launched from EventBridge that invokes a Lambda function, which shuts down the cluster using the savepoint mechanism and scales up the Kinesis Data Analytics cluster to required Kinesis processing units.
- During peak load: When the peak data burst happens, the Kinesis Data Analytics cluster is ready to handle the volume of data from Kinesis Data Stream, and processes it within the SLA of 5 minutes.
- After peak load: A scheduled event from EventBridge invokes a Lambda function to scale down the Kinesis Data Analytics cluster to the minimum configuration that holds the required state for the entire volume of records.
Performance insights
With the discussed architecture, we want to demonstrate that the we are able to meet the SLAs, in terms of performance and processing times. We have taken a subset of benchmarks and indices data and processed the same with the end-to-end architecture. During the process, we observed some very interesting findings, which we would like to share.
Processing time for Apache Iceberg Upsert vs Append operations: During our tests, we expected Upsert operation to be faster than append. But on the contrary, we noticed that Append operations were faster compared to Upsert even though more computations are performed in the Apache Flink application. In our test with 3,500,000 records, Append operation took 1556 seconds while Upsert took 1675 seconds to process the data (Figure 3).
Compute consumption for Apache Iceberg Upsert vs. Append operations: Comparing the compute consumption for 10,000,000 records, we noticed that Append operation was able to process the data in the same amount of time as Upsert operation but with less compute resources. In our tests, we have noted that Append operation only consumed 64 Kinesis processing units, whereas Upsert consumed 78 Kinesis processing units (Figure 4).
Scalability vs performance: To achieve the desired data processing performance, we need a specific configuration of Kinesis processing units, Kinesis Data Streams, and Iceberg parallelism. In our test with the data that we chose, we started with four Kinesis processing units and four Kinesis data streams for data processing. We observed an 80% performance improvement in data processing with 16 Kinesis data processing units. An additional 6% performance improvement was demonstrated when we scaled to 32 Kinesis processing units. When we increased the Kinesis data streams to 16, we observed an additional 2% performance improvement (Figure 5).
Data volume processing times for Upsert vs. Append: For this test, we started with 350,000 records of data. When we increased data volume to 3.5M records, we observed that Append performing better than Upsert, demonstrating a five-fold increase in processing time (Figure 6).
Conclusion
The architecture we explored today scales based on the data-volume requirements of the customer and is capable of meeting the end-to-end SLA of 15 minutes, with a potential lowered total cost of ownership. Additionally, the solution is capable of handling high-volume, bi-temporal computations with ACID compliance, time travel, full-schema evolution, partition layout evolution, rollback to prior versions and SQL-like query experience.
Further reading
- Enhanced monitoring and automatic scaling for Apache Flink
- Resilience in Amazon Kinesis Data Analytics for Apache Flink
- Kinesis Data Analytics for Apache Flink: How It Works
- Creating a Kinesis Data Analytics for Apache Flink Application
- State TTL in Flink 1.8.0: How to Automatically Cleanup Application State in Apache Flink
- Application Scaling in Kinesis Data Analytics for Apache Flink