AWS Partner Network (APN) Blog
How Emumba Navigated the Challenges of Handling Streaming IoT Data at Scale
By Dr. Affan Syed, Director Cloud and Network Solutions – Emumba
Emumba |
As we digitize the physical world, we have to grow accustomed to dealing with millions of devices reporting short but regular data to a central cloud. This cloud needs to handle the velocity and amount of data while applying semantic meaning for it to be consumed for business purposes later on.
Emumba is an AWS Select Tier Services Partner and AWS Marketplace Seller that was involved in building the proof of concept (PoC) for just such a solution.
While we need to keep the client and exact use case confidential, this post intends to share the experience of implementing a big data streaming solution over Amazon Web Services (AWS) infrastructure.
Streaming, Ingesting, and Querying Real-Time Data at Scale
Emumba was asked to review a scenario where a few dozen million Internet of Things (IoT) devices will be frequently sending a binary payload which then gets translated into about 1.2 KB of JSON. This was sent to a central location, periodically, and then pushed to a Kafka cluster.
The challenge was to enable stream processing, ingestion, and querying of the data at the scale needed, while also proposing an architecture that remains cost-effective—a critical business need for this client.
The first thing we did was to convert the data processing needs at the target scale. The numbers we got, shown to a first-order approximation, were those that made us take stock of our architecture.
Sec | Day | |
Events | 250,000 | 22 billion |
Data size | 0.3 GB | 26 TB |
Not only was this a huge data size (26 TB is approximately 14,000 HD movies) required to be handled at ingestion and for stream processing, but we needed the ability to query this data in near-real time (seconds) for the purpose of operational dashboards.
We finally settled on an architecture that had the following components:
- Apache Kafka for ingestion of the data that was partitioned around a composite key resulting in individual time streams.
- Kafka Streams (KStreams) for stateful stream processing; needed to achieve the business objective
- Real-time analytic data warehouse that can support the ingestion speed and query requirements.
Solution Architecture and High-Level Details
Emumba built this system, deployed it, and tested it as a PoC with simulated data for only 100,000 devices. The PoC setup included a separate Kafka and KStreams cluster using m5.xlarge instances with Amazon Elastic Block Store (Amazon EBS) volumes. We were using the suppress functionality in KStreams API to provide a single packet per reporting interval, and had enabled rocksdb as the key-value store to keep the streaming state.
Since all components are natively scalable, we assumed that horizontally scaling the PoC setup would be a simple interpolation. We had instrumented the PoC cluster with monitoring using both Netdata and custom JMX scripts to track the consumption and production rate of the cluster, and thus had a good idea of the resource consumption for our PoC with 100,000 devices.
Figure 1 – High-level view of the proposed streaming architecture.
When the client asked us to demonstrate the architecture working at the target scale, we believed this would be a straightforward task. We estimated, using simple interpolation, that an eight-node cluster (for both Kafka and KStreams) would be sufficient, so we set up a much larger cluster to handle the case for the ~30x larger scale. Since this was to be a production setup, we also enabled replication on Kafka partitions.
Lo and behold, the entire architecture collapsed as soon as we tried to feed in data at the rate of ~250k events/sec.
What Goes Unnoticed in a PoC Doesn’t Stay Unnoticed at Scale
What comes next is a whirlwind tale of how Emumba got to the final solution. I will elaborate on each step to identify the process and mistakes we made scaling a stream processing architecture.
What went wrong? First, we observed the consumption rate (required at 250,000 events/sec) for our stream applications would periodically collapse to zero. We looked at our Kafka and KStream nodes and found the following CPU behavior.
Figure 2 – Kafka performs well, but KStreams had periodic CPU stalls without any discernable reason.
Clearly, the Kafka cluster doesn’t show any bottleneck, but the stream application was periodically idle, without any apparent reason.
We immediately tried to scale the streaming cluster both vertically (eight m5.2xlarge instances) and horizontally (13 m5.xlarge instances). However, both attempts yielded similar results. While scaling horizontally, we also increased the number of Kafka partitions.
A first root cause by looking at the memory usage indicated we needed to balance the JVM heap size on KStream nodes to handle in-process memory needs, but also allow memory for use by rocksdb (written in C++) to store a window’s worth of data. We calculated the memory needed on a KStream node beyond the heap with a rough calculation for the size of data that is stored in memory, per partition, for our 15-minute window size.
This resolved one problem, but we next found disk I/O related stalls and that the stream application’s consumption rate periodically went to zero when left to run for a longer time period.
To handle the disk I/O limitation, we decided to go with the storage-optimized i3 en.xlarge instance. We also realized, with monitoring and some research and development, that using the KStream suppress functionality was requiring significantly more memory and disk I/O than were needed for the basic stateful computation. We therefore went ahead and implemented a custom suppress functionality, since at our scale optimization would only help resource crunch and address the second problem.
Figure 3 – A steadily decreasing consumption rate after around 30 minutes.
These two changes improved the performance considerably, but we still had a blocker. While now the consumption rate would start at a very high rate (350,000 events/sec) it would consistently, after around 30 minutes, decrease to a much lower and unacceptable rate.
Disk/Network Bottlenecks and Rate-Limiting Instances
The team at Emumba were frankly confounded by this behavior and decided to take a deeper look into the instance metrics.
Figure 4 – Something strange happens to disk and network rates after 30 minutes.
This is when we delved into the details around network and I/O limits for AWS instances. We saw that most instance types, even with AWS Elastic Network Adapter, support a high burst rate for traffic but have a much lower baseline bandwidth. Thus, for our selected i3en.xlarge instance, while the burst rate is 25 Gbps the baseline (consistent) bandwidth is 4.2 Gbps.
To meet additional demand, you can use a network I/O credit mechanism to burst beyond the baseline bandwidth. Instances can use burst bandwidth for a limited time, typically from five to 60 minutes, depending on the instance size. To learn more, see the AWS documentation.
A similar behavior occurs for disk I/O with Amazon EBS-optimized instances, where maximum throughput is also bursty for certain instance types. These instance types can support maximum performance for 30 minutes at least once every 24 hours.
If you have a workload that requires sustained maximum performance for longer than 30 minutes, select an instance type according to baseline performance as shown. To learn more, see the AWS documentation.
Our storage-optimized i3en.xlarge instance was one of those instances that didn’t support sustained disk IOPs. Looking back, the nature of streaming operations defines sustained and consistent high network and disk I/O; thus, it was imperative to choose instance types that provide sustained throughput.
Defining the Sustained Network and Disk Throughput Needed
We then took a step back and decided to properly estimate the cluster-wide network and disk throughput needed for our streaming scenario. Here, understanding the architecture and replication strategy for Kafka and KStreams becomes essential.
In a production deployment, a Kafka cluster with a minimal replication factor of two (replicating data to different nodes) will result in doubling the network requirements (0.3 Gbps) and since Kafka writes to disk, a similar increase in disk I/O.
With KStream applications, resiliency is achieved by pushing state-store data to a changelog topic on the Kafka cluster at each node. Our applications had a per-device state (1.2 KB), and that translated to another 0.3 Gbps requirement to write to Kafka, implying both network and disk I/O.
Finally, the data warehouse ingestion needs to reads from the output topic, thus implying another factor of network and input/output operations per second (IOPS).
The above roughly equates to around 3 Gbps or 24 Gbps of sustained network and disk throughput, across the cluster, required to manage this scale.
We decided, in view of the above requirements and after a few tweaks, to go with i3.8xlarge instances for our Kafka and KStream cluster. This AWS instance has the following performance details:
i3.8xlarge instance specs | CPU cores | Memory | Network (sustained) | Disk (sustained) | Disk type |
32 | 244 GB | 10 Gbps | 875 Mbps | 4 x 1.9 TB NVME |
Our monitoring showed that both Kafka and KStream had complimentary resource needs. Thus, while the streaming application was CPU-intensive, Kafka (due to multiple in/out from Kafka) was disk-intensive.
Thus, we choose to combine the Kafka broker and KStream nodes on this three-node (initially) cluster.
Knowing your Pet Virtual Machines
We were quite confident that with this setup, our architecture would now support the 250,000 events/sec processing capability on a sustained basis.
However, when we set up the cluster and observed its performance it would not perform as per our calculation. We went back to monitoring the instance and observed that one of the nodes on the cluster had poor disk performance, which was surprising since our streaming data was evenly balanced and similar operations and disk usage is expected on all nodes.
After a day of debugging and trying many different tweaks, we finally decided to delete that instance and use a brand new instance in our cluster. Adding this fresh instance solved the problem—clearly that one node had a disk performance issue that was causing the flaky performance.
This leads us to such a counter-intuitive observation: we move to the cloud so that our virtual machines are no longer pets but cattle. However, when such a high-performing cluster is needed to support the target streaming use case, your cluster nodes become pets again.
While we set up this infrastructure ourselves due to the cost consideration of managed services, such incidents bring home the benefit of using managed services like Amazon Kinesis or Amazon Managed Streaming for Apache Kafka (Amazon MSK) to enable production-level big data and streaming workloads.
With the final change, we were able to deploy and demonstrate > 250,000 events/sec processing capability over a sustained period of time and were ready to roll this into production for our client.
AWS Architecture
Here is the final, relatively simple, architecture on AWS we used for this PoC effort. We were simulating the traffic at scale, and the ingestion from a real system was not part of the effort, but can conceivable be done through an internet gateway.
Figure 5 – Simple architecture for the streaming PoC.
The above is a basic architecture for a PoC. When migrating to production, you’ll have the following steps to consider to improve security posture:
- Authenticated and authorized access to the Kafka cluster; one practical implementation would be a virtual private network (VPN) tunnel from on-premises collector devices that aggregate a sizeable chuck of devices.
- Access to Druid or another data visualization tool via an API Gateway that integrates with AWS Identity and Access Management (IAM).
- Limited access from a secure IP subnet via a Bastion host for management operations.
- Web application access via an API Gateway and web application firewall will ensure application-level security.
Lessons Learned
- Do extensive back-of-envelope calculations: When dealing with data engineering problems, scaling from a functional PoC is not a simple interpolation of infrastructure resources needed. You need to first deeply understand the architecture of underlying distributed systems (Kafka and KStreams, in this case), especially their scaling strategies, and the interplay between them at big data volume and velocity.
. - Understanding disk and network base-throughput: The base-load for network and disk I/O comes from the computation calculation and from understanding the underlying distributed system behavior under production load. Typically, on a cloud infrastructure this calculation implies using high-end instances that support sustained rates. These, in turn, are expensive instances, and the choice of correct instances (memory vs compute vs. disk intensive) has a massive impact on the final cost of the infrastructure.
. - Complementary workloads: With the above instance constraint for high-end resource (i3.8x has 32 CPUs with 244 GB RAM, while a c5d.9x has 36 CPUs with 72 GB RAM), it’s important to optimize the placement of workloads that have complementary resource usage. Thus, placing Kafka and KStreams on the same node was essentially a resource optimization. When using Kubernetes or Amazon Elastic Kubernetes Service (Amazon EKS), using node affinity for such a purpose would be the appropriate solution.
. - Choosing appropriate instances: In our rapid-fire PoC stage, we chose instances we knew well from experience (disk I/O problem, go with i3 instance family). It would appear that choosing m5d.8xlarge instance would satisfy our needs, and would be more cost-effective as well. Thus, knowledge of your application needs and calibrating them to the fleet of instances available is an important step to take before going to final decision on instance sizing.
. - Managing pets is no fun; use managed services when possible: A significant amount of pain in our setup came from managing the underlying infra services (Kafka and KStreams) and their limitations. Using Amazon Kinesis and Kinesis Data Analytics would have reduced our headaches, but when cost constraints and being cloud-agnostic is a must, opting for custom solutions comes with operational costs that need to be properly understood. Thus, a proper total cost of ownership (TCO) of every solution that includes operational (and cognitive) cost is important.
Conclusion
In this post, I have gone over Emumba’s real-world experience handling streaming data from millions of devices—a prototypical use case for IoT and telemetry data ingestion.
I specifically explored how the scale of ingestion radically alters the design decisions, and also that there is no perfect solution for this use case. The lessons learned I’ve shared should help your business to make informed decisions given your unique set of conditions.
One key takeaway from this post should be that while managing streaming infrastructure at scale is an exciting area, managing it and taking it to production has multitudes of challenges that will distract you from your business objectives. As such, having a partner who knows the details can save you time and cost.
Please reach-out at hello@emumba.com if you would like to explore similar problems together. You can also learn more about Emumba in AWS Marketplace.
The content and opinions in this blog are those of the third-party author and AWS is not responsible for the content or accuracy of this post.
Emumba – AWS Partner Spotlight
Emumba is an AWS Partner that puts equal emphasis on product design, user experience, and solid engineering, bringing startup agility to enterprises.
Contact Emumba | Partner Overview | AWS Marketplace | Case Studies