AWS Big Data Blog

Back up and restore Kafka topic data using Amazon MSK Connect

You can use Apache Kafka to run your streaming workloads. Kafka provides resiliency to failures and protects your data out of the box by replicating data across the brokers of the cluster. This makes sure that the data in the cluster is durable. You can achieve your durability SLAs by changing the replication factor of the topic. However, streaming data stored in Kafka topics tends to be transient and typically has a retention time of days or weeks. You may want to back up the data stored in your Kafka topic long after its retention time expires for several reasons. For example, you might have compliance requirements that require you to store the data for several years. Or you may have curated synthetic data that needs to be repeatedly hydrated into Kafka topics before starting your workload’s integration tests. Or an upstream system that you don’t have control over produces bad data and you need to restore your topic to a previously well state.

Storing data indefinitely in Kafka topics is an option, but sometimes the use case calls for a separate copy. Tools such as MirrorMaker let you back up your data into another Kafka cluster. However, this requires another active Kafka cluster to be running as a backup, which increases compute costs and storage costs. A cost-effective and durable way of backing up the data of your Kafka cluster is to use an object storage service like Amazon Simple Storage Service (Amazon S3).

In this post, we walk through a solution that lets you back up your data for cold storage using Amazon MSK Connect. We restore the backed-up data to another Kafka topic and reset the consumer offsets based on your use case.

Overview of solution

Kafka Connect is a component of Apache Kafka that simplifies streaming data between Kafka topics and external systems like object stores, databases, and file systems. It uses sink connectors to stream data from Kafka topics to external systems, and source connectors to stream data from external systems to Kafka topics. You can use off-the-shelf connectors written by third parties or write your own connectors to meet your specific requirements.

MSK Connect is a feature of Amazon Managed Streaming for Apache Kafka (Amazon MSK) that lets you run fully managed Kafka Connect workloads. It works with MSK clusters and with compatible self-managed Kafka clusters. In this post, we use the Lenses AWS S3 Connector to back up the data stored in a topic in an Amazon MSK cluster to Amazon S3 and restore this data back to another topic. The following diagram shows our solution architecture.

To implement this solution, we complete the following steps:

  1. Back up the data using an MSK Connect sink connector to an S3 bucket.
  2. Restore the data using an MSK Connect source connector to a new Kafka topic.
  3. Reset consumer offsets based on different scenarios.

Prerequisites

Make sure to complete the following steps as prerequisites:

  1. Set up the required resources for Amazon MSK, Amazon S3, and AWS Identity and Access Management (IAM).
  2. Create two Kafka topics in the MSK cluster: source_topic and target_topic.
  3. Create an MSK Connect plugin using the Lenses AWS S3 Connector.
  4. Install the Kafka CLI by following Step 1 of Apache Kafka Quickstart.
  5. Install the kcat utility to send test messages to the Kafka topic.

Back up your topics

Depending on the use case, you may want to back up all the topics in your Kafka cluster or back up some specific topics. In this post, we cover how to back up a single topic, but you can extend the solution to back up multiple topics.

The format in which the data is stored in Amazon S3 is important. You may want to inspect the data that is stored in Amazon S3 to debug issues like the introduction of bad data. You can examine data stored as JSON or plain text by using text editors and looking in the time frames that are of interest to you. You can also examine large amounts of data stored in Amazon S3 as JSON or Parquet using AWS services like Amazon Athena. The Lenses AWS S3 Connector supports storing objects as JSON, Avro, Parquet, plaintext, or binary.

In this post, we send JSON data to the Kafka topic and store it in Amazon S3. Depending on the data type that meets your requirements, update the connect.s3.kcql statement and *.converter configuration. You can refer to the Lenses sink connector documentation for details of the formats supported and the related configurations. If the existing connectors don’t work for your use case, you can also write your own connector or extend existing connectors. You can partition the data stored in Amazon S3 based on fields of primitive types in the message header or payload. We use the date fields stored in the header to partition the data on Amazon S3.

Follow these steps to back up your topic:

  1. Create a new Amazon MSK sink connector by running the following command:
    aws kafkaconnect create-connector \
    --capacity "autoScaling={maxWorkerCount=2,mcuCount=1,minWorkerCount=1,scaleInPolicy={cpuUtilizationPercentage=10},scaleOutPolicy={cpuUtilizationPercentage=80}}" \
    --connector-configuration \
    "connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector, \
    key.converter.schemas.enable=false, \
    connect.s3.kcql=INSERT INTO <<S3 Bucket Name>>:my_workload SELECT * FROM source_topic PARTITIONBY _header.year\,_header.month\,_header.day\,_header.hour STOREAS \`JSON\` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 5, \
    aws.region=us-east-1, \
    tasks.max=2, \
    topics=source_topic, \
    schema.enable=false, \
    errors.log.enable=true, \
    value.converter=org.apache.kafka.connect.storage.StringConverter, \
    key.converter=org.apache.kafka.connect.storage.StringConverter " \
    --connector-name "backup-msk-to-s3-v1" \
    --kafka-cluster '{"apacheKafkaCluster": {"bootstrapServers": "<<MSK broker list>>","vpc": {"securityGroups": [ <<Security Group>> ],"subnets": [ <<Subnet List>> ]}}}' \
    --kafka-cluster-client-authentication "authenticationType=NONE" \
    --kafka-cluster-encryption-in-transit "encryptionType=PLAINTEXT" \
    --kafka-connect-version "2.7.1" \
    --plugins "customPlugin={customPluginArn=<< ARN of the MSK Connect Plugin >>,revision=1}" \
    --service-execution-role-arn " <<ARN of the IAM Role>> "
  2. Send data to the topic using kcat:
    ./kcat -b <<broker list>> -t source_topic -H "year=$(date +"%Y")" -H "month=$(date +"%m")" -H "day=$(date +"%d")" -H "hour=$(date +"%H")" -P
    {"message":"interesset eros vel elit salutatus"}
    {"message":"impetus deterruisset per aliquam luctus"}
    {"message":"ridens vocibus feugait vitae cras"}
    {"message":"interesset eros vel elit salutatus"}
    {"message":"impetus deterruisset per aliquam luctus"}
    {"message":"ridens vocibus feugait vitae cras"}
  3. Check the S3 bucket to make sure the data is being written.

MSK Connect publishes metrics to Amazon CloudWatch that you can use to monitor your backup process. Important metrics are SinkRecordReadRate and SinkRecordSendRate, which measure the average number of records read from Kafka and written to Amazon S3, respectively.

Also, make sure that the backup connector is keeping up with the rate at which the Kafka topic is receiving messages by monitoring the offset lag of the connector. If you’re using Amazon MSK, you can do this by turning on partition-level metrics on Amazon MSK and monitoring the OffsetLag metric of all the partitions for the backup connector’s consumer group. You should keep this as close to 0 as possible by adjusting the maximum number of MSK Connect worker instances. The command that we used in the previous step sets MSK Connect to automatically scale up to two workers. Adjust the --capacity setting to increase or decrease the maximum worker count of MSK Connect workers based on the OffsetLag metric.

Restore data to your topics

You can restore your backed-up data to a new topic with the same name in the same Kafka cluster, a different topic in the same Kafka cluster, or a different topic in a different Kafka cluster altogether. In this post, we walk through the scenario of restoring data that was backed up in Amazon S3 to a different topic, target_topic, in the same Kafka cluster. You can extend this to other scenarios by changing the topic and broker details in the connector configuration.

Follow these steps to restore the data:

  1. Create an Amazon MSK source connector by running the following command:
    aws kafkaconnect create-connector \
    --capacity "autoScaling={maxWorkerCount=2,mcuCount=1,minWorkerCount=1,scaleInPolicy={cpuUtilizationPercentage=10},scaleOutPolicy={cpuUtilizationPercentage=80}}"   \
    --connector-configuration \
        "connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector, \
         key.converter.schemas.enable=false, \
         connect.s3.kcql=INSERT INTO target_topic SELECT * FROM <<S3 Bucket Name>>:my_workload PARTITIONBY _header.year\,_header.month\,_header.day\,_header.hour STOREAS \`JSON\` WITHPARTITIONER=KeysAndValues WITH_FLUSH_COUNT = 5 , \
         aws.region=us-east-1, \
         tasks.max=2, \
         topics=target_topic, \
         schema.enable=false, \
         errors.log.enable=true, \
         value.converter=org.apache.kafka.connect.storage.StringConverter, \
         key.converter=org.apache.kafka.connect.storage.StringConverter " \
    --connector-name "restore-s3-to-msk-v1" \
    --kafka-cluster '{"apacheKafkaCluster": {"bootstrapServers": "<<MSK broker list>>","vpc": {"securityGroups": [<<Security Group>>],"subnets": [ <<Subnet List>> ]}}}' \
    --kafka-cluster-client-authentication "authenticationType=NONE" \
    --kafka-cluster-encryption-in-transit "encryptionType=PLAINTEXT" \
    --kafka-connect-version "2.7.1" \
    --plugins "customPlugin={customPluginArn=<< ARN of the MSK Connect Plugin >>,revision=1}" \
    --service-execution-role-arn " <<ARN of the IAM Role>> "

The connector reads the data from the S3 bucket and replays it back to target_topic.

  1. Verify if the data is being written to the Kafka topic by running the following command:
    ./kafka-console-consumer.sh --bootstrap-server <<MSK broker list>> --topic target_topic --from-beginning

MSK Connect connectors run indefinitely, waiting for new data to be written to the source. However, while restoring, you have to stop the connector after all the data is copied to the topic. MSK Connect publishes the SourceRecordPollRate and SourceRecordWriteRate metrics to CloudWatch, which measure the average number of records polled from Amazon S3 and number of records written to the Kafka cluster, respectively. You can monitor these metrics to track the status of the restore process. When these metrics reach 0, the data from Amazon S3 is restored to the target_topic. You can get notified of the completion by setting up a CloudWatch alarm on these metrics. You can extend the automation to invoke an AWS Lambda function that deletes the connector when the restore is complete.

As with the backup process, you can speed up the restore process by scaling out the number of MSK Connect workers. Change the --capacity parameter to adjust the maximum and minimum workers to a number that meets the restore SLAs of your workload.

Reset consumer offsets

Depending on the requirements of restoring the data to a new Kafka topic, you may also need to reset the offsets of the consumer group before consuming or producing to them. Identifying the actual offset that you want to reset to depends on your specific business use case and involves manual work to identify this. You can use tools like Amazon S3 Select, Athena, or other custom tools to inspect the objects. The following screenshot demonstrates reading the records ending at offset 14 of partition 2 of topic source_topic using S3 Select.

After you identify the new start offsets for your consumer groups, you have to reset them on your Kafka cluster. You can do this using the CLI tools that come bundled with Kafka.

Existing consumer groups

If you want to use the same consumer group name after restoring the topic, you can do this by running the following command for each partition of the restored topic:

 ./kafka-consumer-groups.sh --bootstrap-server <<broker list>> --group <<consumer group>> --topic target_topic:<<partition>> --to-offset <<desired offset>> --reset-offsets --execute

Verify this by running the --describe option of the command:

./kafka-consumer-groups.sh --bootstrap-server <<broker list>> --group <<consumer group>>  --describe
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        ...
source_topic  0          211006          188417765       188206759  ...
source_topic  1          212847          192997707       192784860  ...
source_topic  2          211147          196410627       196199480  ...
target_topic  0          211006          188417765       188206759  ...
target_topic  1          212847          192997707       192784860  ...
target_topic  2          211147          196410627       196199480  ...

New consumer group

If you want your workload to create a new consumer group and seek to custom offsets, you can do this by invoking the seek method in your Kafka consumer for each partition. Alternatively, you can create the new consumer group by running the following code:

./kafka-console-consumer.sh --bootstrap-server <<broker list>> --topic target_topic --group <<consumer group>> --from-beginning --max-messages 1

Reset the offset to the desired offsets for each partition by running the following command:

./kafka-consumer-groups.sh --bootstrap-server <<broker list>> --group <<New consumer group>> --topic target_topic:<<partition>> --to-offset <<desired offset>> --reset-offsets --execute

Clean up

To avoid incurring ongoing charges, complete the following cleanup steps:

  1. Delete the MSK Connect connectors and plugin.
  2. Delete the MSK cluster.
  3. Delete the S3 buckets.
  4. Delete any CloudWatch resources you created.

Conclusion

In this post, we showed you how to back up and restore Kafka topic data using MSK Connect. You can extend this solution to multiple topics and other data formats based on your workload. Be sure to test various scenarios that your workloads may face and document the runbook for each of those scenarios.

For more information, see the following resources:


About the Author

Rakshith Rao is a Senior Solutions Architect at AWS. He works with AWS’s strategic customers to build and operate their key workloads on AWS.