How Baqend built a real-time web analytics platform using Amazon Kinesis Data Analytics for Apache Flink
September 8, 2021: Amazon Elasticsearch Service has been renamed to Amazon OpenSearch Service. See details.
This is a customer post written by the engineers from German startup Baqend and the AWS EMEA Prototyping Labs team.
Baqend is one of the fastest-growing software as a service (SaaS) startups in Germany, serving over 5,000 business customers with more than 100 million monthly users and 2 billion EUR in revenue per year. Baqend’s main product is Speed Kit, a one-click solution to accelerate ecommerce websites. By rerouting a portion of the web traffic through Speed Kit’s caching infrastructure, it typically makes page loads 1.5–3 times faster.
To measure the impact of Speed Kit and confirm its uplift to Baqend’s customers, we maintain several dashboards that display the technical and business performance improvements achieved by Speed Kit. This requires complex aggregations of tracking data collected during A/B tests on our customers’ websites.
The Challenge: Real-time analytics and reporting at scale
One of the key issues with our legacy solution for monitoring and reporting was the time it needed to process data. The raw tracking data from all users was batched through various systems, which resulted in processing delays of up to 24 hours for some analytics jobs. This impacted our operations monitoring and sales activities negatively, because our customers sometimes couldn’t analyze the impact of deployment changes until the next day. Furthermore, our legacy reporting service lacked any support for custom visualization development.
This post shows you how we transformed our batch-based analytics process into a continuous complex event-processing pipeline, which is managed by Amazon Kinesis Data Analytics for Apache Flink. The new solution exhibits less than a minute of end-to-end latency from data ingestion to visual output in the dashboard.
The key topics presented in this post are:
- Fully managed and highly scalable service architecture – Our architecture is based on fully managed AWS services that made it easy to scale from 1 to over 100 million users
- Continuous analytics – We transformed our high-volume batch analytics processes with daily updates into an efficient incremental aggregation process with sub-minute latency based on Kinesis Data Analytics for Apache Flink
- Real-time reporting – We connected our aggregation pipeline to a fully customizable dashboard for data exploration based on Amazon Elasticsearch Service (Amazon ES) and Kibana
Solution overview and key components
Following a remote planning phase in which we defined our requirements and laid out the basic design, we built the solution during an on-site prototyping engagement with AWS over the course of 4 weeks in early 2020 in Hamburg. Seven team members from Baqend and AWS EMEA Prototyping Labs implemented the following architecture.
The workflow includes the following steps:
- The performance tracking data is streamed by Speed Kit Amazon Elastic Compute Cloud (Amazon EC2) instances.
- This data goes into an Amazon Kinesis data stream.
- This data stream is consumed by a Kinesis Data Analytics for Apache Flink application.
- The data is ingested into Amazon ES.
- This streaming application relies on AWS Secrets Manager to store and access the credentials for Elasticsearch with basic HTTP authentication.
- An Nginx proxy server application hosted on EC2 instances in multiple public subnets and Availability Zones redirects user requests to Kibana with Amazon Cognito authentication (for more information, see How do I use an NGINX proxy to access Kibana from outside a VPC that’s using Amazon Cognito authentication?).
- The Apache Flink application also uses Amazon DynamoDB as a backend for long-living external states required for certain operations (covered later in this post).
- The streaming application also delivers the raw and intermediate data outputs to an Amazon Simple Storage Service (Amazon S3) bucket to enable historical data analysis and operational troubleshooting with Amazon Athena.
Although the prototyping engagement also covered other aspects, we focus on the Kinesis Data Analytics application in the following sections of this post.
Continuous aggregation with Kinesis Data Analytics
We need to collect all kinds of technical data points on every page load of a website visitor. Details on the individual page impressions (PI) help us analyze web performance for the websites of our customers. Speed Kit provides a performance tracking functionality that collects data within the browser of every website visitor and sends it to our analytics backend.
Aggregating page impressions
Intuitively, there should be only one data beacon for any given PI because the data could be aggregated in the browser before it’s sent to our backend. However, Speed Kit sends several data beacons during the page load to minimize the possibility of any data loss, as shown in the following figure. Our backend therefore has to combine all beacons that belong to the same PI.
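As a rough illustration of combining several beacons into one record per PI, the following Python sketch folds beacons that share a page impression ID. The field names (`pi_id`, `url`, `fcp_ms`, `lcp_ms`) and the rule that the first non-empty value wins are assumptions made for this example, not the actual Speed Kit beacon format or merge logic.

```python
from typing import Any, Dict, Iterable


def merge_beacons(beacons: Iterable[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Fold all beacons that share a page impression ID into one record per PI."""
    merged: Dict[str, Dict[str, Any]] = {}
    for beacon in beacons:
        pi = merged.setdefault(beacon["pi_id"], {"pi_id": beacon["pi_id"]})
        for key, value in beacon.items():
            # Assumed merge rule: the first non-empty value for a field wins.
            if value is not None and key not in pi:
                pi[key] = value
    return merged


# Hypothetical beacons: three partial beacons for pi-1, one for pi-2.
beacons = [
    {"pi_id": "pi-1", "url": "/shoes", "fcp_ms": None},
    {"pi_id": "pi-1", "fcp_ms": 820},
    {"pi_id": "pi-1", "lcp_ms": 1430},
    {"pi_id": "pi-2", "url": "/cart", "fcp_ms": 640},
]
page_impressions = merge_beacons(beacons)
```

In a streaming setting, the same fold would run incrementally in keyed state rather than over a complete list.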
Aggregating session data
Because some of the most interesting metrics are computed on the level of user sessions, aggregating all data beacons for the individual PIs isn’t enough to analyze web performance. For instance, the user engagement metrics are often quantified by the number of pages visited in one sitting (session length) or the share of users that left on the very first page (bounce rate).
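The two engagement metrics mentioned above can be sketched as follows. This is a minimal example that assumes page impressions arrive as `(session_id, pi_id)` pairs and that a bounce is a session with exactly one page impression; both are simplifications for illustration.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple


def session_metrics(page_impressions: Iterable[Tuple[str, str]]) -> Dict[str, float]:
    """Compute average session length and bounce rate from (session_id, pi_id) pairs."""
    pages_per_session = Counter(session_id for session_id, _ in page_impressions)
    sessions = len(pages_per_session)
    if sessions == 0:
        return {"sessions": 0, "avg_session_length": 0.0, "bounce_rate": 0.0}
    # Assumed definition: a bounce is a session with a single page impression.
    bounces = sum(1 for pages in pages_per_session.values() if pages == 1)
    return {
        "sessions": sessions,
        "avg_session_length": sum(pages_per_session.values()) / sessions,
        "bounce_rate": bounces / sessions,
    }


# Hypothetical data: s1 bounces, s2 views three pages, s3 views two.
pis = [
    ("s1", "pi-1"),
    ("s2", "pi-2"), ("s2", "pi-3"), ("s2", "pi-4"),
    ("s3", "pi-5"), ("s3", "pi-6"),
]
metrics = session_metrics(pis)
```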
Aggregating relevant information may even involve identifying and removing duplicates, as illustrated in the following figure.
Suppose the user first checks out the landing page and immediately leaves (Session 1), and then comes back later to browse through some products and buy some blue shoes (Session 2), and finally returns after a few hours to reload the order confirmation page and browse some more products (Session 3). Because Session 3 starts with a reload of the order confirmation page, tracking data on the order that was completed in Session 2 is transmitted a second time, which could cause the completed order to be counted twice. Therefore, our analytical backend needs to identify the duplicated tracking information as such and ignore it for further analysis. To enable this, we persistently store a salted hash of every order ID and simply have the aggregation pipeline drop the tracking data on any order that has already been written to the external key-value store (see the diagram in the following section).
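The deduplication idea can be sketched in a few lines of Python. This is an illustration under stated assumptions: the salt value, the choice of SHA-256, and the `OrderDeduplicator` class are hypothetical, and an in-memory set stands in for the external key-value store.

```python
import hashlib
from typing import Set

SALT = b"example-salt"  # hypothetical value; a real salt would be kept secret


def order_fingerprint(order_id: str, salt: bytes = SALT) -> str:
    """Return a salted hash so the raw order ID never has to be stored."""
    return hashlib.sha256(salt + order_id.encode("utf-8")).hexdigest()


class OrderDeduplicator:
    """Remembers fingerprints of orders that were already counted."""

    def __init__(self) -> None:
        self._seen: Set[str] = set()  # stand-in for the external key-value store

    def is_first_occurrence(self, order_id: str) -> bool:
        fingerprint = order_fingerprint(order_id)
        if fingerprint in self._seen:
            return False  # duplicate: the pipeline would drop this order data
        self._seen.add(fingerprint)
        return True


dedup = OrderDeduplicator()
# The third beacon repeats order A-1001, as in the reload scenario above.
results = [dedup.is_first_occurrence(o) for o in ["A-1001", "A-1002", "A-1001"]]
```

With a shared external store, the check and the write would need to happen atomically (for example, through a conditional write) so that two parallel workers cannot both accept the same order.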
Anatomy of the streaming application
The following diagram shows our event processing pipeline from raw data collection to the storage of aggregation results.
The workflow is as follows:
- The first step is tracking the data within the browsers of the end users.
- The data is sent to Kinesis Data Streams for consumption through a custom stateful Apache Flink process function within a Kinesis Data Analytics application.
- Raw data beacons are initially normalized and invalid data beacons are delivered to Amazon S3 via side outputs to facilitate later analysis of all data that has been sorted out.
- As mentioned earlier, we use a DynamoDB table to run a deduplication rule over all incoming order data (confirmation pages) using the DynamoDB Transactions API. We also use another DynamoDB table to identify bot traffic by storing the user agent strings that have been consistently associated with suspicious behavior (because they belong to web crawlers). Finally, the stream of cleaned tracking beacons is processed in stateful window aggregation steps for storage.
- We aggregate all beacons referring to the same PI and write them off to our data lake on Amazon S3 to enable offline analysis with Athena.
- Furthermore, we compile the tracking beacon stream into 1-minute summaries containing both PI and session data for storage via Amazon ES to enable efficient reporting with Kibana.
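The 1-minute summaries in the last step can be pictured as a tumbling window over beacon timestamps. The following sketch is not the Flink implementation; it only shows the windowing arithmetic, and the `(timestamp_ms, session_id, pi_id)` tuple layout is an assumption for this example.

```python
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple

WINDOW_MS = 60_000  # 1-minute tumbling windows


def window_start(timestamp_ms: int) -> int:
    """Align a timestamp to the start of its 1-minute window."""
    return timestamp_ms - timestamp_ms % WINDOW_MS


def summarize(beacons: Iterable[Tuple[int, str, str]]) -> Dict[int, Dict[str, int]]:
    """Count distinct PIs and sessions per window from (ts_ms, session_id, pi_id)."""
    pis: Dict[int, Set[str]] = defaultdict(set)
    sessions: Dict[int, Set[str]] = defaultdict(set)
    for ts_ms, session_id, pi_id in beacons:
        start = window_start(ts_ms)
        pis[start].add(pi_id)
        sessions[start].add(session_id)
    return {
        start: {"page_impressions": len(pis[start]), "sessions": len(sessions[start])}
        for start in sorted(pis)
    }


# Hypothetical beacons: two beacons for pi-1, one for pi-2, then pi-3 a minute later.
summary = summarize([
    (5_000, "s1", "pi-1"),
    (20_000, "s1", "pi-1"),
    (59_999, "s2", "pi-2"),
    (60_000, "s1", "pi-3"),
])
```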
State storage and application management
Most of the application state for the streaming application is held in the built-in RocksDB state backend with incremental checkpointing. This default built-in state storage mechanism depends on a 50 GB storage limit provided for each Kinesis Processing Unit (KPU) allocated to a Kinesis Data Analytics application. On the other hand, we used DynamoDB tables to store the state permanently for unique conversions and user agent strings in order to decouple historical state for these two data types from Apache Flink application management and to keep the checkpointing duration and size under control. Using DynamoDB for these two use cases helps to control the overhead for creating and restoring checkpoints and thereby controls the application startup time.
Workload distribution and scalability
As of February 2021, our processing pipeline handles over 2.8 billion tracking beacons per month, which corresponds to more than 500 million individual PIs from over 140 million user sessions and more than 100 million unique users. Achieving this scale requires even distribution of both processing and storage load across all stream partitions. Therefore, we use randomly generated session IDs as a partitioning key for the input Kinesis data stream and throughout most of the remaining sections of our pipeline.
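To see why high-cardinality session IDs spread load evenly, recall that Kinesis Data Streams maps each partition key to a shard through an MD5 hash of the key. The sketch below mimics that mapping under the assumption that all shards cover equally sized hash key ranges; the helper names and the key values are made up for illustration, and sequential labels replace random IDs only to keep the example reproducible.

```python
import hashlib
from collections import Counter
from typing import Iterable

HASH_SPACE = 2 ** 128  # an MD5 digest is a 128-bit integer


def shard_for_key(partition_key: str, num_shards: int) -> int:
    """Map a partition key to a shard index, assuming equally sized hash ranges."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_value = int.from_bytes(digest, "big")
    return hash_value * num_shards // HASH_SPACE


def shard_load(keys: Iterable[str], num_shards: int) -> Counter:
    """Count how many records land on each shard."""
    return Counter(shard_for_key(key, num_shards) for key in keys)


# Many distinct session IDs spread across all shards ...
balanced = shard_load((f"session-{i}" for i in range(10_000)), 4)
# ... whereas one repeated key sends every record to a single shard.
skewed = shard_load(["same-bot-key"] * 10_000, 4)
```

The second case is the kind of skew that a single very active client can cause when its traffic shares one key.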
In the presence of certain anomalies such as heavy bot traffic, a load skew may occur regardless, which may impair overall throughput or even crash the entire application in extreme cases. We monitor the number of incoming and outgoing records (to derive the current buffer size) for the individual Apache Flink operators in every stream partition to identify issues with the load distribution quickly and generate alert notifications via multiple channels (such as Slack and email) if the measurements for different stream partitions diverge significantly. For convenience, we further visualize custom Amazon CloudWatch metrics in a Grafana dashboard.
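A simple way to express "measurements diverge significantly" is to compare each partition's backlog (records in minus records out) with the median backlog. The following sketch is one possible rule, not the alerting logic used in production; the function name, the factor of 5, and the minimum backlog of 100 are arbitrary assumptions.

```python
import statistics
from typing import Dict, List, Tuple


def skewed_partitions(
    records: Dict[str, Tuple[int, int]],
    factor: float = 5.0,
    min_backlog: int = 100,
) -> List[str]:
    """Return partitions whose backlog (in - out) far exceeds the median backlog."""
    if not records:
        return []
    backlog = {name: rec_in - rec_out for name, (rec_in, rec_out) in records.items()}
    median = statistics.median(backlog.values())
    # min_backlog avoids alerts when all buffers are nearly empty.
    limit = max(min_backlog, factor * median)
    return sorted(name for name, size in backlog.items() if size > limit)


# Hypothetical per-partition counters as (records_in, records_out).
suspicious = skewed_partitions({
    "partition-0": (1_000, 990),
    "partition-1": (1_100, 1_088),
    "partition-2": (950, 942),
    "partition-3": (4_800, 1_200),
})
```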
Event processing, delivery semantics, and fault tolerance
Application restarts and downtime (such as during and after application deployment) can be handled seamlessly by using Apache Flink’s event time processing semantics, because the generated output is independent of the wall-clock time of the processing nodes. All processing is based on monotonically increasing ingestion timestamps to eliminate the possibility of late arrivers. While our data cleaning procedure identifies invalid records, it never drops any data items from the stream; instead, it only attaches information on the detected issue to the data item in question. This approach enables us to analyze the frequency and distribution of every problem in our aggregation pipeline by using the same Kibana dashboard.
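The "annotate instead of drop" approach might look like the following sketch. The two checks and the field names (`session_id`, `fcp_ms`, `issues`) are hypothetical examples, not the actual validation rules.

```python
from typing import Any, Dict, List


def annotate(beacon: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the beacon with a list of detected issues attached."""
    issues: List[str] = []
    if not beacon.get("session_id"):
        issues.append("missing_session_id")
    fcp_ms = beacon.get("fcp_ms")
    if fcp_ms is not None and fcp_ms < 0:
        issues.append("negative_timer")
    # The beacon stays in the stream; downstream steps can filter on "issues".
    return {**beacon, "issues": issues}


clean = annotate({"session_id": "s1", "fcp_ms": 820})
broken = annotate({"session_id": "", "fcp_ms": -5})
```

Because every record keeps its issue labels, counting records per label gives the frequency of each problem.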
Even though the data ingestion to Amazon ES provides at-least-once delivery guarantees by default, we managed to achieve exactly-once delivery guarantees from the source Kinesis data stream to the Elasticsearch index by generating document identifiers in a deterministic fashion. Therefore, the data stream can be replayed safely because the existing data records are overwritten on re-insertion into the Elasticsearch index.
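The following sketch illustrates why deterministic document identifiers make replays safe. It assumes that an identifier is derived from the customer, the window start, and the aggregation dimensions; the exact fields, the hash function, and the dictionary standing in for the Elasticsearch index are assumptions for this example.

```python
import hashlib
import json
from typing import Any, Dict


def document_id(customer: str, window_start_ms: int, dimensions: Dict[str, str]) -> str:
    """Derive the same ID for the same aggregate, no matter how often it is produced."""
    canonical = json.dumps(
        {"customer": customer, "window": window_start_ms, "dimensions": dimensions},
        sort_keys=True,  # key order must not influence the ID
        separators=(",", ":"),
    )
    return hashlib.sha1(canonical.encode("utf-8")).hexdigest()


index: Dict[str, Dict[str, Any]] = {}  # stand-in for an Elasticsearch index


def write(doc_id: str, document: Dict[str, Any]) -> None:
    """Indexing with an explicit ID overwrites any existing document with that ID."""
    index[doc_id] = document


dims = {"browser": "Firefox", "device": "mobile", "test_group": "A"}
doc_id = document_id("shop-a", 60_000, dims)
write(doc_id, {"page_impressions": 42})
# Replaying the stream produces the same ID, so the document is overwritten.
write(document_id("shop-a", 60_000, dims), {"page_impressions": 42})
```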
Data retention and multi-tenancy in Amazon ES
We store pre-aggregated data at the minute level in Amazon ES to make sure our Kibana dashboard remains responsive even when analyzing a scope of weeks or months. As illustrated in the following figure, the Apache Flink application summarizes the raw tracking data along different dimensions (browser, device, test group, and aggregation time in minutes) before writing it to Elasticsearch.
The Elasticsearch documents are composed of bucketed histogram data for performance timers such as the First Contentful Paint (FCP) instead of the actual timer values. Running queries over these aggregates instead of the raw data reduces query costs significantly: traffic-heavy customers may have tens of millions of raw tracking beacons in a single week, whereas the number of 1-minute buckets is several orders of magnitude lower (for small and large customers alike). We observe over 5 times more PIs and 30 times more raw beacons than aggregates stored in Elasticsearch across all of our customers.
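Bucketed histograms stay small and can be merged across minutes, which is what keeps queries over long time ranges cheap. The sketch below assumes a fixed bucket width of 100 ms and made-up FCP values; the real bucket layout isn't specified in this post.

```python
from collections import Counter
from typing import Iterable

BUCKET_MS = 100  # assumed bucket width


def histogram(timers_ms: Iterable[float]) -> Counter:
    """Count timer values per bucket, keyed by the bucket's lower bound in ms."""
    return Counter(int(value // BUCKET_MS) * BUCKET_MS for value in timers_ms)


def approx_percentile(hist: Counter, q: float) -> int:
    """Return the lower bound of the bucket that contains the q-quantile."""
    target = q * sum(hist.values())
    cumulative = 0
    for bucket in sorted(hist):
        cumulative += hist[bucket]
        if cumulative >= target:
            return bucket
    raise ValueError("empty histogram")


# Hypothetical FCP values (ms) from two consecutive 1-minute windows.
minute_1 = histogram([120, 180, 450, 460, 990])
minute_2 = histogram([130, 470, 1250])
merged = minute_1 + minute_2  # histograms merge by adding bucket counts
median_bucket = approx_percentile(merged, 0.5)
```

The trade-off is resolution: a percentile is only known up to the bucket width.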
We store the data for different customers in separate indexes generated for a fixed temporal rolling period by the Apache Flink Elasticsearch Sink Connector. We also implemented customer-specific retention policies in Amazon ES by deleting the old indexes as required. Our deployment is multi-tenant so that our customers can receive fine-grained access only to their own data stored in the indexes created for them.
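As an illustration of per-customer rolling indexes and retention, the following sketch assumes daily indexes named `<customer>-YYYY.MM.DD`. Both the naming scheme and the daily period are assumptions; the post only states that indexes are per customer and cover a fixed period.

```python
from datetime import date, datetime, timedelta
from typing import Iterable, List


def index_name(customer: str, day: date) -> str:
    """Build the (assumed) daily index name for a customer."""
    return f"{customer}-{day:%Y.%m.%d}"


def expired_indexes(
    index_names: Iterable[str], customer: str, today: date, retention_days: int
) -> List[str]:
    """Return this customer's indexes that are older than the retention period."""
    prefix = f"{customer}-"
    cutoff = today - timedelta(days=retention_days)
    expired = []
    for name in index_names:
        if not name.startswith(prefix):
            continue
        try:
            day = datetime.strptime(name[len(prefix):], "%Y.%m.%d").date()
        except ValueError:
            continue  # another customer's index or a different naming scheme
        if day < cutoff:
            expired.append(name)
    return expired


existing = ["shop-a-2021.01.01", "shop-a-2021.02.10", "shop-b-2021.01.01"]
to_delete = expired_indexes(existing, "shop-a", date(2021, 2, 15), retention_days=30)
```

Deleting whole indexes is cheaper than deleting individual documents, which is one reason to align index boundaries with the retention granularity.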
Kibana for continuous reporting
We used Kibana to build our dashboards because it provides powerful and easy-to-create built-in visualizations and virtually boundless flexibility through custom Vega chart visualizations. Kibana also works well in combination with Elasticsearch indexes, thereby facilitating the role-based access management that enables us to provide individual customers access to the data in our multi-tenant dashboard.
Easy data exploration
The following illustration shows one of the standard visualizations in Kibana that we use to understand the distribution of device types and browsers used by website visitors.
Real-time histogram visualization
Illustrating the distribution of performance metrics requires using a custom visualization. The following custom Vega histogram chart uses the Largest Contentful Paint (LCP) metric to illustrate how Speed Kit improves the webpage load time.
In comparison with the vanilla website, where page loads are almost never faster than 2 seconds (pink area), end users of the Speed Kit-accelerated website experience faster and even sub-second load times (blue area).
Because our main business revolves around accelerating our customers’ websites, visualizing the actual uplift is critical for all developers (to debug performance and identify issues quickly) as well as our customers (highlighting the value of our product). With the continuous aggregation and reporting solution outlined in this post, we were able to satisfy all these requirements in a scalable and fully managed fashion.
Conclusion and future directions
In this post, we shared our journey from a high-volume batch analytics solution to a continuous aggregation pipeline using Kinesis Data Analytics for Apache Flink. Key aspects are:
- End-to-end processing time is reduced from 24 hours to sub-minute latency.
- We implemented a fully functional prototype within 4 weeks. The AWS Prototyping team enabled us to build our system on a multitude of managed AWS services.
- The system was used with production load after 8 weeks.
- The new system based on the Kinesis Data Analytics for Apache Flink application exhibits extreme scalability as it handles workloads with ease that were infeasible for the old system. As of February 2021, our system processes more than 500 million page loads from over 100 million unique users every month.
- Elasticsearch and Kibana with customized Vega visualizations provide flexible and continuously updating dashboards for all our customers.
For more details on the challenges and solutions discussed in this article, we recommend the following resources:
- Speed Kit:
- Real user monitoring:
- Streaming Analytics Workshop
- Creating customized Vega visualizations in Amazon Elasticsearch Service
We would be glad to get feedback on our work, so please drop us a line in case of any remaining questions!
About the Authors
Wolfram “Wolle” Wingerath heads the data engineering team that is responsible for developing and operating Baqend’s infrastructure for analytics and reporting.
Florian Bücklers is Baqend’s Chief Technology Officer and therefore responsible for coordinating between the different teams for front-end and backend development, DevOps, onboarding, and data engineering.
Benjamin Wollmer develops data-intensive systems at Baqend, but he is also doing his PhD at the University of Hamburg and therefore likes to read and write about related topics.
Stephan Succo is one of the core developers of Baqend’s continuous analytics pipeline.
Jörn Domnik is a Senior Software Engineer at Baqend with a focus on backend development and reliability engineering.
As a DevOps engineer, Virginia Amberg monitors cluster health and keeps all systems running smoothly at Baqend.
As a Principal Prototyping Engagement Manager in AWS, Markus Bestehorn is responsible for building business-critical prototypes with AWS customers and is a specialist for IoT and machine learning.
As a Data Prototyping Architect in AWS, Anil Sener builds prototypes on big data analytics, data streaming, and machine learning, which accelerates the production journey on the AWS Cloud for top EMEA customers.
As B2B Strategic Account Manager for Startups at AWS, Daniel Zäeh works with customers to make their ideas come true and helps them grow, by connecting tech and business.