AWS Big Data Blog

Increase Apache Kafka’s resiliency with a multi-Region deployment and MirrorMaker 2

Customers create business continuity plans and disaster recovery (DR) strategies to maximize resiliency for their applications, because downtime or data loss can result in losing revenue or halting operations. Ultimately, DR planning is all about enabling the business to continue running despite a Regional outage. This post explains how to make Apache Kafka resilient to issues that span more than a single Availability Zone using a multi-Region Apache Kafka architecture. We use Apache Kafka deployed as Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters in this example, but the same architecture also applies to self-managed Apache Kafka.

Amazon MSK is a fully managed service that makes it easy for you to build and run Apache Kafka to process streaming data. Amazon MSK provides high availability by offering Multi-AZ configurations to distribute brokers across multiple Availability Zones within an AWS Region. A single MSK cluster deployment provides message durability through intra-cluster data replication. Data replication with a replication factor of 3 and “min-ISR” value of 2 along with the producer setting acks=all provides the strongest availability guarantees, because it ensures that other brokers in the cluster acknowledge receiving the data before the leader broker responds to the producer. This design provides robust protection against single broker failure as well as Single-AZ failure. However, if an unlikely issue was impacting your applications or infrastructure across more than one Availability Zone, the architecture outlined in this post can help you prepare, respond, and recover from it.

For companies that can withstand a longer time to recover (Recovery Time Objective, RTO) but are sensitive to data loss on Amazon MSK (Recovery Point Objective, RPO), backing up data to Amazon Simple Storage Service (Amazon S3) and recovering the data from Amazon S3 is sufficient as a DR plan. However, most streaming use cases rely on the availability of the MSK cluster itself for your business continuity plan, and you may want a lower RTO as well. In these cases, setting up MSK clusters in multiple Regions and configuring them to replicate data from one cluster to another provides the required business resilience and continuity.

MirrorMaker

MirrorMaker is a utility bundled as part of Apache Kafka, which helps replicate the data between two Kafka clusters. MirrorMaker is essentially a Kafka high-level consumer and producer pair, efficiently moving data from the source cluster to the destination cluster. Use cases for MirrorMaker include aggregating data to a central cluster for analytics, isolating data based on use case, geo-proximity, migrating data from one Kafka cluster to another, and for highly resilient deployments.

In this post, we use MirrorMaker v2 (MM2), which is available as part of Apache Kafka version 2.4 onwards, because it enables us to sync topic properties and also sync offset mappings across clusters. This feature helps us migrate consumers from one cluster to another because the offsets are synced across clusters.

Solution overview

In this post, we dive into the details of how to configure Amazon MSK with cross-Region replication for the DR process. The following diagram illustrates our architecture.

We create two MSK clusters across the primary and secondary Regions (mapping to your chosen Regions), with the primary being active and secondary being passive. We can also extend this solution to an active-active setup. Our Kafka clients interact with the primary Region’s MSK cluster. The Kafka Connect cluster is deployed in the secondary Region’s MSK cluster and hosts the MirrorMaker connectors responsible for replication.

We go through the following steps to show the end-to-end process of setting up the deployment, failing over the clients if a Regional outage occurs, and failing back after the outage:

  1. Set up an MSK cluster in the primary Region.
  2. Set up an MSK cluster in the secondary Region.
  3. Set up connectivity between the two MSK clusters.
  4. Deploy Kafka Connect as containers using AWS Fargate.
  5. Deploy MirrorMaker connectors on the Kafka Connect cluster.
  6. Confirm data is replicated from one Region to another.
  7. Fail over clients to the secondary Region.
  8. Fail back clients to the primary Region.

Step 1: Set up an MSK cluster in the primary Region

To set up an MSK cluster in your primary Region, complete the following steps:

  1. Create an Amazon Virtual Private Cloud (Amazon VPC) in the Region where you want to have your primary MSK cluster.
  2. Create three (or at least two) subnets in the VPC.
  3. Create an MSK cluster using the AWS Command Line Interface (AWS CLI) or the AWS Management Console.

For this post, we use the console. For instructions, see Creating an Amazon MSK Cluster.

  1. Choose the Kafka version as 2.7 or higher.
  2. Pick the broker instance type based on your use case and configuration needs.
  3. Choose the VPC and subnets created to make sure the brokers in your MSK clusters are spread across multiple Availability Zones.
  4. For Encrypt Data in transit, choose TLS encryption between brokers and between client and brokers.
  5. For Authentication, you can choose IAM access control, TLS-based authentication, or username/password authentication.

We use SASL/SCRAM (Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism) authentication to authenticate Apache Kafka clients using usernames and passwords for clusters secured by AWS Secrets Manager. AWS has since launched IAM Access Control which could be used as authentication for this solution, For more information about IAM Access Control, see Securing Apache Kafka is easy and familiar with IAM Access Control for Amazon MSK.

  1. Create the secret in Secrets Manager and associate it to the MSK cluster. For instructions, see Username and password authentication with AWS Secrets Manager.

Make sure the secrets are encrypted with a customer managed key via AWS Key Management Service (AWS KMS).

Step 2: Set up an MSK cluster in the secondary Region

To set up an MSK cluster in our secondary Region, complete the following steps:

  1. Create an MSK cluster in another Region with similar configuration to the first.
  2. Make sure the number of brokers and instance type match what was configured in the primary.

This makes sure the secondary cluster has the same capacity and performance metrics as the primary cluster.

  1. For Encrypt Data in transit, choose TLS encryption between brokers and between client and brokers.
  2. For Authentication, choose the same authentication mechanism as with the cluster in the primary Region.
  3. Create a secret in Secrets Manager and secure with a customer managed KMS key in the Region of the MSK cluster.

Step 3: Set up connectivity between the two MSK clusters

For data to replicate between the two MSK clusters, you need to allow the clusters in different VPCs to communicate with each other, where VPCs are within the same or a different AWS account, or the same or different Region. You have the following options for resources in either VPC to communicate with each other as if they’re within the same network:

For more information about access options, see Accessing an Amazon MSK Cluster.

VPC peering is more suited for environments that have a high degree of trust between the parties that are peering their VPCs. This is because, after a VPC peering connection is established, the resources in either VPC can initiate a connection. You’re responsible for implementing fine-grained network access controls with security groups to make sure that only specific resources intended to be reachable are accessible between the peered VPCs. For our data replication use case, we assume that the two VPCs are trusted and therefore we can use VPC peering connectivity to replicate data between the primary and secondary MSK clusters. For instructions on setting up VPC peering connections between two VPCs across two Regions, see Creating and accepting a VPC peering connection.

When you set up VPC peering, enable DNS resolution support. This allows you to resolve public IPv4 DNS hostnames to private IPv4 addresses when queried from instances in the peer VPC. To enable DNS resolution on VPC peering, you must have the two peering VPCs enabled for DNS hostnames and DNS resolution. This step is important for you to be able to access the MSK cluster using DNS names across the VPCs.

Step 4: Deploy Kafka Connect as containers using AWS Fargate

Kafka Connect is a scalable and reliable framework to stream data between a Kafka cluster and external systems. Connectors in Kafka Connect define where data should be copied to and from. Each connector instance coordinates a set of tasks that copy the data. Connectors and tasks are logical units of work and must be scheduled to run in a process. Kafka Connect calls these processes workers and has two types of workers: standalone and distributed.

Deploying Kafka Connect in a distributed mode provides scalability and automatic fault tolerance for the tasks that are deployed in the worker. In distributed mode, you start many worker processes using the same group ID, and they automatically coordinate to schedule running connectors and tasks across all available workers. If you add a worker, shut down a worker, or a worker fails unexpectedly, the rest of the workers detect this and automatically coordinate to redistribute connectors and tasks across the updated set of available workers.

Kafka Connect in distributed mode lends itself to be deployed as containers (workers) and scales based on the number of tasks and connectors that are being deployed on Kafka Connect.

Fargate is a serverless compute engine for containers that works with both Amazon Elastic Container Service (Amazon ECS) and Amazon Elastic Kubernetes Service (Amazon EKS). Fargate makes it easy for you to focus on building your applications. Fargate removes the need to provision and manage servers, lets you specify and pay for resources per application, and improves security through application isolation by design.

For replicating data using MirrorMaker, the pattern of remote-consume and local-produce is recommended, so in the simplest source-destination replication pair, you want to deploy your MirrorMaker connectors on Kafka Connect in your destination MSK cluster. This avoids loss of data because data is replicated across Regions. In this step, we build and run a distributed Kafka Connect in a Fargate cluster.

The Docker container for Kafka Connect is available on GitHub. For more details on the Docker container and its content, refer to the README.md file.

  1. Clone the code from GitHub and build the code.
  2. Push the image into a repository in Amazon Elastic Container Registry (Amazon ECR).
  3. Create a Fargate cluster in your secondary Region, in the same VPC as your MSK cluster.
  4. Deploy the Fargate cluster.
  5. Deploy the Kafka Connect containers.

The task definition JSON to deploy Kafka Connect containers is available on GitHub. The JSON file refers to a Docker container that was pushed into Amazon ECR earlier.

  1. Replace the IMAGE_URL string in the JSON file with the actual image from Amazon ECR.
  2. Replace the IAM_ROLE string with the ARN of your AWS Identity and Access Management (IAM) role.

The IAM role for the Amazon ECS task should have permission to interact with MSK clusters, read secrets from Secret Manager, decrypt the KMS key used to encrypt the secret, read images from Amazon ECR, and write logs to Amazon CloudWatch.

  1. Make sure to update in the following environment variables with the appropriate values in the task definition:
    1. BROKERS – The bootstrap servers connection string of the MSK cluster in the secondary Region.
    2. USERNAME – The username that was created as a secret in Secrets Manager and associated with the MSK cluster in the secondary Region.
    3. PASSWORD – The password that was created as a secret in Secrets Manager and associated with the MSK cluster in the secondary Region.
    4. GROUP – The Kafka Connect group ID to register all containers to the same group.
  2. Create a service based on the task definition and deploy at least two tasks on the Fargate cluster.
  3. Wait until the tasks are provisioned and in running status.
  4. Log in from a bastion host or any Amazon Elastic Compute Cloud (Amazon EC2) instance in the VPC that you can log in to (using SSH or AWS Systems Manager Session Manager).

You use this EC2 instance for your administration of the Kafka Connect cluster. Because you use this host to interact with MSK clusters, you have to download Kafka binary (greater than 2.7 version).

Each running Fargate task gets its own elastic network interface (ENI) and IPV4 address, which you can use to connect to the application running on the task. You can view the ENI attachment information for tasks on the Amazon ECS console or with the DescribeTasks API operation.

  1. Connect to one of the Amazon ECS task IPs and check if the Kafka Connect cluster is up and running (Kafka Connect runs on port 8083):
    curl <ip-address>:8083 | jq .
    
    {
      "version": "2.7.0",
      "commit": "448719dc99a19793",
      "kafka_cluster_id": "J1xVaRK9QW-1eJq3jJvbsQ"
    }

Step 5: Deploy MirrorMaker connectors on the Kafka Connect cluster

MirrorMaker 2 is based on the Kafka Connect framework and runs based on Kafka source connectors. In the Kafka Connect configuration, a source connector reads data from any data repository and writes data into a Kafka cluster, and a sink connector reads data from a Kafka cluster and writes to any data repository.

MM2 creates remote topics, which are replicated topics that refer back to the source cluster topics using an alias. This is handled by a class called the replication policy class; a default class is provided by Apache Kafka.

For example, the following diagram shows TopicA in a source cluster with alias Primary, which gets replicated to a destination cluster with the topic name Primary.TopicA.

In a failover scenario, when you move your Kafka clients from one cluster to another, you have to modify your clients to pick from a different topic as it fails over, or you have to configure them to pick from both these topics to handle the failover scenario. For example, a consumer reading from TopicA in the primary cluster upon failover has to be modified to start reading from Primary.TopicA. Alternatively, the consumers can always be configured to read from both topics.

If you want to have the same topic name across your clusters after replication, because you want to minimize changes to your clients, you can use a custom replication policy that overrides MM2’s default behavior of creating remote topics. You can find sample code on GitHub.

For an active-active setup, you have to use Kafka’s default replication policy for creating remote topics with a prefix. Having the same topic names across clusters using a custom replication policy causes an infinite loop of replication.

In this post, we use a custom replication policy with active-passive setup, in which your Kafka clients fail over in a Regional outage scenario and fail back when the outage is over.

To run a successful MirrorMaker 2 deployment, you need several connectors:

  • MirrorSourceConnector – Responsible for replicating data from topics as well as metadata about topics and partitions. This connector reads from a cluster and writes it to the cluster on which Kafka Connect is deployed.
  • HeartBeatConnector – Emits a heartbeat that gets replicated to demonstrate connectivity across clusters. We can use the internal topic heartbeats to verify that the connector is running and the cluster where the connector is running is available.
  • CheckpointConnector – Responsible for emitting checkpoints in the secondary cluster containing offsets for each consumer group in the primary cluster. To do that, it creates an internal topic called <primary-alias>.checkpoints.internal in the secondary cluster. In addition, this connector also creates the topic mm2-offset-syncs.<primary-alias>.internal in the secondary cluster, where it stores consumer offsets that are translated into the ones that make sense in another cluster. This is required as the clients fail over from the primary cluster to secondary to be able to read the messages from the secondary cluster at the correct offset. Prior to Apache Kafka 2.7, MM2 didn’t have a mechanism to sync the offsets for individual consumer groups with the __consumer_offsets internal topic in the secondary cluster. Syncing with __consumer_offsets can allow consumers to simply fail over and continue to process messages from the last offset retrieved from __consumer_offsets in the secondary cluster. Consequently, this had to be done outside of MM2 with an asynchronous process utilizing custom code. The following sample project contains code to do this translation. However, in Apache 2.7, a new feature was released that takes care of synchronizing the translated offsets directly to the _consumer_offsets topic in the cluster, so that when you switch over, you can start from last known offset. To enable this feature, you need to include the property group.offsets.enabled = true in the connector configuration.

Sample connector configurations for each of these connectors are available on GitHub. The configurations contain SASL/SCRAM-related information to connect to the cluster. Make sure the number of tasks match the number of partitions in your Kafka topics. This enables parallel processing to read multiple partitions in parallel. The configuration also uses CustomMM2ReplicationPolicy to make sure the topics are replicated with the same name across clusters. You can remove this line as long as you update the Kafka client to read from topic names with a prefix when using the MSK cluster in the secondary Region.

To deploy these connectors on the Kafka Connect cluster, log back in to the bastion host machine that acts as your administrative console. Make sure the bastion host has an IAM role that has access to the KMS key encrypting your Secrets Manager secrets corresponding to your MSK cluster. Find the IP address of one of the containers running your Kafka Connect cluster.

For instructions on reviewing your connector configuration and deploying it on Kafka Connect, see Configure and start MirrorMaker 2 connectors.

Check the topics that are created in your primary and secondary cluster. The primary MSK cluster should have a new mm2-offset-syncs.sec.internal topic and the secondary MSK cluster should have the heartbeats and pri.checkpoints.internal topics.

Step 6: Confirm the data is replicated from one Region to another

With the connectors up and running on the Kafka Connect cluster, you should now create a topic in your primary cluster and see it replicate to the secondary cluster (with a prefix, if you used the default replication policy).

After the topic replication is configured, you can start producing data into the new topic. You can use the following sample producer code for testing. You can also use a Kafka console producer or your own producer. Make sure the producer can support SASL/SCRAM based connectivity.

If you use the sample producer code, make sure to create a producer.properties file and provide the bootstrap server information of your primary cluster to the BOOTSTRAP_SERVERS_CONFIG property. Then start your producer with the following code:

java -jar KafkaClickstreamClient-1.0-SNAPSHOT.jar -t <topic-name> -pfp <properties_file_path> -nt 8 -rf 300 -sse -ssu <user name> -gsr -grn <glue schema registry name >  -gar > /tmp/producer.log 2>&1 &

The GitHub project has more details on the command line parameters and how to change the rate at which messages are produced.

Confirm the messages produced in the topic in the primary cluster are all flowing to the topic in the destination cluster by checking the message count. For this post, we use kafkacat, which supports SASL/SCRAM to count the messages:

docker run -it --network=host edenhill/kafkacat:1.6.0 -b <bootstrap-servers> -X security.protocol=SASL_SSL -X sasl.mechanism=SCRAM-SHA-512 -X sasl.username=<username>  -X sasl.password=<pwd> -t <topicname> -C -e -q| wc -l

In production environments, if the message counts are large, use your traditional monitoring tool to check the message count, because tools like kafkacat take a long time to consume messages and report on a message count.

Now that you have confirmed the producer is actively producing messages to the topic and that the data is replicated, we can spin up a consumer to consume the messages from the topic. We spin up the consumer in the primary Region, because in an active-passive setup, all activities happen in the primary Region until an outage occurs and you fail over.

You can use the following sample consumer code for testing. You can also use a Kafka console consumer or your own consumer. Make sure that the consumer code can support connectivity with SASL/SCRAM.

For the sample code, make sure to create a consumer.properties file that contains the Amazon MSK bootstrap broker information of the primary cluster for the BOOTSTRAP_SERVERS_CONFIG property. Run the following code to start the consumer:

java -jar KafkaClickstreamConsumer-1.0-SNAPSHOT.jar -t <topic> -pfp <properties file path> -nt 3 -rf 10800 -sse -ssu <username> -src <primary cluster alias> -gsr -grn <glue schema registry name> /tmp/consumer_dest.log 2>&1 &

As explained before, the consumer offsets of this consumer group get replicated to the secondary cluster and get synced up to the _consumer_offsets table. It takes a few minutes for the consumer group offset to sync from the secondary to the primary depending on the value of sync.group.offsets.interval.seconds in the checkpoint connector configuration. See the following code:

./bin/kafka-consumer-groups.sh --bootstrap-server <bootstrap-url>  --command-config /opt/ssl-user-config.properties --describe --group mm2TestConsumer1

Make sure ssl-user-config.properties contains the connectivity information:

sasl.mechanism=SCRAM-SHA-512
# Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
security.protocol=SASL_SSL
sasl.jaas.config=<jaas -config>

The consumer group offsets are now synced up from the primary to the secondary cluster. This helps us fail over clients to the secondary cluster because the consumer started in the primary cluster can start consuming from the secondary cluster and read from where it left off after failover.

Step 7: Fail over clients to the secondary Region

In a DR scenario, if you need to fail clients from your cluster in the primary Region to a secondary Region, follow the steps in this section.

You want to start with shutting off your consumer in the primary Region. Start the consumer in the secondary Region by updating the bootstrap server’s information pointing to the secondary MSK cluster. Because the topic name is the same across both Regions, you don’t need to change the consumer client code. This consumer starts consuming the messages from the topic even if new messages are still being produced by producers in the primary Region.

Now you can stop the producer in the primary Region (if not already stopped due to Regional failure) and start it on the secondary Region by updating the bootstrap server’s information. The consumer in the secondary Region keeps consuming messages on the topic, including the ones now being produced in the secondary Region. After the consumer and producer are failed over, you can delete the MM2 connectors on the Kafka Connect cluster using the HTTP endpoints of the connectors. See the following code:

curl -X DELETE http://<ip-address>:8083/connectors/mm2-msc

This stops all replication activities from the primary cluster to the secondary cluster. Now the MSK cluster in the primary Region is available for upgrade or any other activities.

If a Regional outage is impacting Amazon MSK or Apache Kafka on Amazon EC2, it’s highly probable that the clients, producers, and consumers running on Amazon EC2, Amazon ECS, Amazon EKS, and AWS Lambda are also impacted. In this case, you have to stop the MM2 connectors in the DR cluster because the source cluster isn’t available to replicate from. To recover clients, you can start the consumers on the secondary cluster. The consumers read messages from the topic from where it left off. Start the producers on the secondary cluster and push new messages to the topic in the secondary cluster.

Now that you have successfully failed over from the MSK cluster in the primary Region to the secondary Region, we can see how to fail back after your MSK cluster in the primary Region is ready to be operational.

Step 8: Fail back clients to the primary Region

Check the topics in your MSK cluster in the primary Region. Depending on the activity on the secondary (now primary) cluster during the DR period, you might want to start with fresh data from your secondary cluster. Follow these steps to get your primary cluster synced up with all data required:

  1. Delete all topics (if any) except the _consumer_offsets topic in the primary cluster.
  2. Create a Kafka Connect cluster deploying Fargate containers (as we walked through earlier), with brokers pointing to the MSK cluster in the primary Region.

MirrorSourceConnector can write only to the cluster where Kafka Connect is deployed. Because we want to replicate from the secondary to the primary, we need a Kafka Connect cluster associated to the primary Region.

  1. Deploy the MirrorMaker connectors (similar to what we did earlier) using the configuration samples This time, make sure the source and target bootstrap broker information is flipped on all configuration files.

These connectors are responsible for replicating data from the secondary back to the primary. Make sure to list the topic names containing your data in topics of the MirrorSourceConnector file. You don’t want to replicate topics created by Kafka Connect and MirrorMaker in the secondary Region, because that creates confusion.

This process starts replication activities from your MSK cluster in the secondary Region to the MSK cluster in the primary Region. This happens in parallel because the producers and consumers are actively writing and reading in the MSK cluster in the secondary Region.

  1. Wait until all the data topics and their messages are replicated from the MSK cluster in the secondary Region to the MSK cluster in the primary Region.

The time it takes depends on the number of messages in the topic.

  1. Check the number of messages in the topics on the MSK cluster in the primary Region.

When the number of messages is close to the number of messages in the MSK cluster in the secondary Region, it’s time to fail back your Kafka clients.

  1. Stop the consumers from the secondary Region one by one and move them to point to the primary cluster.

When the consumer is up and running, it should be able to continue to read the messages produced by producers pointing to the MSK cluster in the secondary Region. When all the consumers are healthy in the secondary, it’s time to fail back the producers as well.

  1. Stop the producers in the secondary Region’s MSK cluster and start them by pointing to the primary Region’s MSK cluster.

To enable the MirrorMaker replication back from the primary to secondary, you have to stop the MirrorMaker connectors replicating from the secondary to primary. Because we’re using CustomReplicationPolicy, which tries to use the same topic names, it’s important to have replication of data flowing only one direction, otherwise it creates a recursive loop. You have to repeat similar cleanup steps to get the replication flowing back from the primary to secondary.

Using the default replication policy in MirrorMaker 2

When you use MirrorMaker 2’s default replication policy, it creates topics with a prefix, as explained earlier. This enables you to run dual-way replication because MirrorMaker 2 ignores the topics with a prefix when replicating. This is convenient because you don’t have to delete the MirrorMaker connect configuration moving from one side to another, which makes failover and failback easier.

Make sure to update your clients to read from topics with and without prefixes, because it can read from either of the cluster as part of failover and failback. In addition, if you have a use case to enable active-active setup, it’s imperative that you choose MirrorMaker 2’s default replication policy.

Conclusion

In this post, I reviewed how to set up a highly resilient deployment across Regions for an MSK cluster using MirrorMaker 2 deployed on a distributed Kafka Connect cluster in Fargate. You can use this solution to build a data redundancy capability to meet regulatory compliance, business continuity, and DR requirements. With MirrorMaker 2, you can also set up an active-active MSK cluster, enabling clients to consume from an MSK cluster that has geographical proximity.


About the Author

Anusha Dharmalingam is a Solutions Architect at Amazon Web Services, with a passion for Application Development and Big Data solutions. Anusha works with enterprise customers to help them architect, build, and scale applications to achieve their business goals.