AWS Big Data Blog

Best practices from Delhivery on migrating from Apache Kafka to Amazon MSK

August 30, 2023: Amazon Kinesis Data Analytics has been renamed to Amazon Managed Service for Apache Flink. Read the announcement in the AWS News Blog and learn more.

This is a guest post by Delhivery.

In this post, we describe the steps Delhivery took to migrate from self-managed Apache Kafka running on Amazon Elastic Compute Cloud (Amazon EC2) to Amazon Managed Streaming for Apache Kafka (Amazon MSK). “We’ve been in production for over a year now,” said Akash Deep Verma, Senior Technical Architect at Delhivery. “We have more than 350 applications running on Amazon MSK that are producing and consuming data every second of every day. Overall, it’s a happy experience to be working with Amazon MSK!”

Delhivery is India’s leading fulfilment platform for digital commerce. With its nationwide network extending beyond 18,000 pin codes and 2,500 cities, the company provides a full suite of logistics services like express parcel transportation, LTL and FTL freight, reverse logistics, cross-border, B2B and B2C warehousing, and technology services.

“Our vision is to become the operating system for commerce in India, through a combination of world-class infrastructure, logistics operations of the highest quality, and cutting-edge engineering and technology capabilities,” Verma says. “Our team has successfully fulfilled over 650 million orders to more than 120 million households across India. We operate 24 automated sort centers, 75 fulfilment centers, 70 hubs, more than 2,500 direct delivery centers, more than 8,000 partner centers, and more than 14,000 vehicles. Over 40,000 team members make it possible to deliver a million packages a day, 24 hours a day, 7 days a week, 365 days a year.”

Self-managing Apache Kafka was difficult

We process close to 1 TB of data per day to serve various analytical functions. The data comes from shipment tracking, order tracking, GPS, biometrics, handheld devices, sorters, weights, clients, and facilities. It moves through various systems and services using several real-time and batch pipelines. The data is processed and enriched to serve both business and technical use cases. Due to the nature of our business, messages and events arrived on Apache Kafka at a steady pace with intermittent spikes, with 10,000–12,000 messages coming in per second and 50,000–55,000 messages going out per second.

Apache Kafka serves as the critical messaging and events backbone to these dependent systems and services.

We were self-managing Apache Kafka brokers and their associated components, such as Apache ZooKeeper, on Amazon EC2 instances.

With the growth in our business, managing these components and ensuring uptime became a significant, resource-intensive operation. We had to dedicate two developers full time to managing our Apache Kafka infrastructure and maintaining uptime. This undifferentiated heavy lifting caused a productivity loss, because those developers couldn’t contribute effectively to business feature development.

“We wanted a managed Apache Kafka service to reduce the time and resources we used for infrastructure management,” says Verma. “This would enable us to reprioritize our technical team to focus on feature development that added more business value.”

Getting time back by migrating to Amazon MSK

We looked at several options to replace our self-hosted Apache Kafka on EC2 instances, and chose Amazon MSK. With Amazon MSK, we could continue to use native Apache Kafka APIs and run our existing applications on AWS without changing the code. It provisions, configures, and maintains Apache Kafka clusters and Apache ZooKeeper nodes for us. This enables us to shift developers from managing infrastructure to writing more creative applications for our business.

On advice from the AWS team, we took the following steps:

  1. Size the MSK cluster
  2. Migrate individual Apache Kafka topics to Amazon MSK
  3. Monitor on Amazon MSK

Sizing the MSK cluster

To properly size our MSK cluster, we needed to understand our existing workload dynamics. We retrieved the following metrics from our existing Amazon EC2-based Apache Kafka clusters (a rough sketch of how these inputs combine follows the list):

  • Ingestion rate from producers – We considered the broker-level metric BytesInPerSec, took the average value for each broker, and aggregated the values across all brokers in the cluster to estimate the net ingestion rate (not to be confused with ReplicationBytesInPerSec, which gives the ingestion rate from other brokers).
  • Consumption rate from consumers – We considered the broker-level metric BytesOutPerSec, took the average value for each broker, and aggregated the values across all brokers in the cluster to estimate the net consumption rate for the cluster (not to be confused with ReplicationBytesOutPerSec, which gives the consumption rate to other brokers).
  • Data replication strategy – We determined the highest replication factor in effect, comparing the cluster-wide parameter default.replication.factor with any values specified for individual topics.
  • Data retention strategy and target percentage of disk utilization – We determined the highest data retention in effect, comparing the cluster-wide parameter log.retention.hours with any values specified for individual topics. We also specified the target percentage of used storage and estimated the headroom we needed to be comfortable with our use case.
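
As a rough illustration of how these four inputs combine into a storage estimate, here is a minimal sketch; the numbers are hypothetical, and our actual sizing was done with the AWS spreadsheet described next:

NET_INGESTION_MBPS=12    # aggregated average BytesInPerSec across all brokers, in MB/s
RETENTION_HOURS=168      # highest of log.retention.hours and per-topic overrides
REPLICATION_FACTOR=3     # highest of default.replication.factor and per-topic overrides
TARGET_UTILIZATION=70    # percentage of disk we are comfortable using

# retained data = ingestion rate x retention window x replication factor;
# required storage = retained data scaled up to leave the desired headroom
echo "$NET_INGESTION_MBPS $RETENTION_HOURS $REPLICATION_FACTOR $TARGET_UTILIZATION" | \
awk '{ printf "Required cluster storage: %.1f TB\n", $1*3600*$2*$3/($4/100)/1e6 }'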

AWS provided us with an Amazon MSK Sizing and Pricing spreadsheet to help us estimate the number of brokers that we needed in our MSK cluster. We subsequently performed POCs in our environment and found the suggested cluster sizing from the spreadsheet accurate. The spreadsheet also helped us to easily estimate the cluster pricing well in advance. For more information, see Numbers of brokers per cluster.

Migrating individual Apache Kafka topics to Amazon MSK

We considered several options to migrate individual topics to Amazon MSK:

  • MirrorMaker 1.0, which ships with Apache Kafka and is a standalone tool that can move data from self-managed Apache Kafka clusters to Amazon MSK with minimal downtime.
  • Using consumers to read data from self-managed Apache Kafka clusters and write to Amazon MSK. This migration approach required some application downtime.
  • Other replication tools that we had experience with.

We used a combination of the first two approaches for our migration. For time-critical topics, we used MirrorMaker 1.0 to migrate data to Amazon MSK. For non-time-critical topics, our internal SLAs allowed us to perform a redirection of application traffic from our self-managed Apache Kafka clusters to Amazon MSK.

The MirrorMaker option involved setting up a MirrorMaker 1.0 daemon on a self-managed EC2 instance to consume messages from the source cluster and republish them to the target MSK cluster. Each MirrorMaker 1.0 thread is equipped with a separate consumer instance, and all threads share one common producer. The process is as follows:

  1. The MirrorMaker 1.0 instance spawns a consumer process that interacts with the Apache ZooKeeper ensemble supporting the source Apache Kafka cluster for topic discovery.
  2. The consumer process reads payload from the concerned topic.
  3. MirrorMaker 1.0 spawns a producer process that interacts with the managed Apache ZooKeeper fleet of Amazon MSK via the Apache ZooKeeper endpoint.
  4. The producer process relays the payload retrieved by the consumer process to the respective topic of Amazon MSK via the broker endpoint.

The following diagram shows our migration topology.

The MSK cluster creation process requires subnet IDs as input so that the broker and Apache ZooKeeper nodes can map to the customer VPC. This mapping is achieved by creating ENIs within these subnets with a primary private IPv4 address. The broker and Apache ZooKeeper endpoints attached to the MSK cluster resolve to these private IPv4 addresses.
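
For illustration, this is roughly what cluster creation looks like with the AWS CLI; the cluster name, Kafka version, subnet IDs, and security group below are placeholders, not our actual values:

aws kafka create-cluster \
--cluster-name "my-msk-cluster" \
--kafka-version "2.2.1" \
--number-of-broker-nodes 6 \
--enhanced-monitoring PER_TOPIC_PER_BROKER \
--broker-node-group-info '{
  "InstanceType": "kafka.m5.4xlarge",
  "ClientSubnets": ["subnet-aaaa1111", "subnet-bbbb2222", "subnet-cccc3333"],
  "SecurityGroups": ["sg-dddd4444"]
}'

The number of broker nodes must be a multiple of the number of subnets supplied, so the six brokers here are spread evenly across the three subnets (and therefore across three Availability Zones).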

We used the following command to mirror all topics from the Amazon EC2-based source cluster to Amazon MSK:

kafka-mirror-maker.sh \
--consumer.config config/mirrormaker-consumer.properties \
--producer.config config/mirrormaker-producer.properties \
--whitelist '*'

The command contains the following details:

  • --consumer.config – Points to mirrormaker-consumer.properties, which holds the connection details of the source Amazon EC2-based Apache Kafka cluster.
  • --producer.config – Points to mirrormaker-producer.properties, which holds the connection details of the target MSK cluster.
  • --whitelist – A pattern selecting the topics to mirror; '*' mirrors all topics.
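
For reference, a minimal sketch of what the two properties files might contain; the broker addresses are placeholders:

# mirrormaker-consumer.properties (sketch; source cluster addresses are placeholders).
# Older MirrorMaker 1.0 consumers instead set zookeeper.connect to the source
# cluster's ZooKeeper ensemble, which is the topic-discovery flow described above.
bootstrap.servers=source-broker-1:9092,source-broker-2:9092
group.id=mirrormaker-group
auto.offset.reset=earliest

# mirrormaker-producer.properties (sketch; use the bootstrap broker string
# returned for your MSK cluster by the GetBootstrapBrokers API)
bootstrap.servers=b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9092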

With the acceptance of KIP-382, Amazon MSK now supports MirrorMaker 2.0, and you can benefit from its advantages. For instructions, configuration files, sample code, and labs using MirrorMaker 2.0 to migrate a self-managed Apache Kafka cluster to an MSK cluster, see the MirrorMaker2 on Amazon EC2 workshop.

You can migrate an existing Apache Kafka cluster to Amazon MSK using Amazon Kinesis Data Analytics, a fully managed service for Apache Flink. This enables you to use fully managed Apache Flink applications to process streaming data stored in Amazon MSK. For more information about using Amazon Kinesis Data Analytics with Amazon MSK, see Tutorial: Using a Kinesis Data Analytics application to Replicate Data from One MSK Cluster to Another in a VPC and the Clickstream Lab.

At a steady state, our MSK cluster in production uses the following configuration:

  • Broker nodes – 6 x m5.4xlarge
  • Replication factor – 3
  • Number of producers – 110+
  • Number of consumers – 300+
  • Topics – 500+
  • Kafka Streams applications running against the cluster – 55+

Monitoring on Amazon MSK

Amazon MSK provides Amazon CloudWatch metrics under three levels of granularity:

  • DEFAULT
  • PER_BROKER
  • PER_TOPIC_PER_BROKER
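
The granularity level is set at cluster creation and can be raised later via the UpdateMonitoring API. For example, with the AWS CLI it looks roughly like this; the cluster ARN and current cluster version are placeholders:

aws kafka update-monitoring \
--cluster-arn <cluster-arn> \
--current-version <current-cluster-version> \
--enhanced-monitoring PER_TOPIC_PER_BROKER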

For our use case, we used the highest granularity, PER_TOPIC_PER_BROKER, for optimum visibility. To automate the detection of operational issues, we developed custom CloudWatch alarms with the following metrics:

  • UnderReplicatedPartitions – Alarm when the value is not 0. If partition replicas fall too far behind their leaders, the follower is removed from the ISR pool. A corresponding increase in the IsrShrinksPerSec metric is also expected.
  • OfflinePartitionsCount – Alarm when the value is not 0. This metric gives the number of partitions without an active leader. Any partition without an active leader is completely inaccessible, and both consumers and producers of that partition are blocked until a leader becomes available.
  • IsrShrinksPerSec – Alarm when it increases without a corresponding increase in IsrExpandsPerSec. A replica can be removed from the ISR pool for a couple of reasons:
      • The replica is too far behind the leader’s offset.
      • The replica hasn’t contacted the leader for some time (configurable with replica.socket.timeout.ms).
      • Brokers are being added to the cluster (IsrExpandsPerSec should increase as well).
      • Partitions are removed (IsrExpandsPerSec should increase as well).
  • UncleanLeaderElectionsPerSec – Alarm when the value is not 0. An unclean leader election is a special case in which no in-sync replicas are available. Because each partition must have a leader, an election is held among the out-of-sync replicas and a leader is chosen, meaning any messages that weren’t synced prior to the loss of the former leader are lost forever.
  • ActiveControllerCount – Alarm when the value is not 1. The controller in an Apache Kafka cluster is responsible for maintaining the list of partition leaders and coordinating leadership transitions (in the event a partition leader becomes unavailable).
  • KafkaDataLogsDiskUsed – The percentage of space used in the EBS volume per broker. Alarm when disk utilization exceeds 85%, and add storage (via the API or the console) so that utilization drops to 70% or below.
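
For illustration, an alarm like the UnderReplicatedPartitions one above can be created with the AWS CLI; the cluster name, broker ID, and SNS topic ARN are placeholders:

aws cloudwatch put-metric-alarm \
--alarm-name "msk-broker1-under-replicated-partitions" \
--namespace "AWS/Kafka" \
--metric-name "UnderReplicatedPartitions" \
--dimensions 'Name=Cluster Name,Value=<cluster-name>' 'Name=Broker ID,Value=1' \
--statistic Maximum \
--period 300 \
--evaluation-periods 1 \
--threshold 0 \
--comparison-operator GreaterThanThreshold \
--alarm-actions <sns-topic-arn>

One such alarm is needed per broker (or per topic and broker at the PER_TOPIC_PER_BROKER level), so creating them across all brokers is easily scripted.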

Amazon MSK also exposes metrics via port 11001 (for the JMX Exporter) and port 11002 (for the Node Exporter), which can be captured using monitoring tools such as Prometheus and a variety of third-party tools compatible with Prometheus-formatted metrics, like Datadog, New Relic, and Sumo Logic. For more information, see Open Monitoring with Prometheus. For instructions on configuring open monitoring, see the Open Monitoring lab.
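
As an example, a minimal prometheus.yml that scrapes both exporter ports might look like the following; the broker DNS names are placeholders, and open monitoring (with both the JMX Exporter and Node Exporter enabled) must be turned on for the cluster before these ports accept scrapes:

# prometheus.yml (sketch; broker DNS names are placeholders)
global:
  scrape_interval: 60s
scrape_configs:
  - job_name: 'msk-jmx-exporter'
    static_configs:
      - targets: ['b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:11001',
                  'b-2.mycluster.abc123.kafka.us-east-1.amazonaws.com:11001']
  - job_name: 'msk-node-exporter'
    static_configs:
      - targets: ['b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:11002',
                  'b-2.mycluster.abc123.kafka.us-east-1.amazonaws.com:11002']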

Conclusion

Self-managing Apache Kafka brokers meant that we had to make sure Apache ZooKeeper always maintained quorum, monitor the network connectivity between brokers, and watch auxiliary Apache Kafka processes such as the LogCleaner. All of these parts failed at some point, so we were constantly fixing issues with our infrastructure and monitoring. The time we spent taking care of Apache Kafka could have been better spent delivering actual business value.

Amazon MSK enabled us to be more productive by reducing the time we spent maintaining our infrastructure, finding and fixing issues, and maintaining our brokers. It takes care of Apache Kafka maintenance in the background, gives us the level of monitoring that we needed, and frees our team to take on more activities to improve our applications and provide value to our customers. 


About the Authors

Akash Deep Verma is a Senior Technical Architect at Delhivery. Akash joined Delhivery in 2016 and worked on many projects related to Big Data and Analytics. Most recently, he led architecture initiatives related to Democratization of Data. In his free time, he enjoys playing table tennis and watching mind-boggling movies.

Dipta S. Bhattacharya is an Enterprise Solutions Architect at AWS. Dipta joined AWS in 2018. He works with large startup customers to design and develop architectures on AWS and support their journey on the cloud.

Nikhil Khokhar is a Solutions Architect at AWS. He joined AWS in 2016 and specializes in building and supporting data streaming solutions that help customers analyze and get value out of their data. In his free time, he makes use of his 3D printing skills to solve everyday problems.