AWS Compute Blog

Using custom consumer group ID support for AWS Lambda event sources for MSK and self-managed Kafka

This post is written by Adam Wagner, Principal Serverless Specialist SA.

AWS Lambda already supports Amazon Managed Streaming for Apache Kafka (MSK) and self-managed Apache Kafka clusters as event sources. Today, AWS adds support for specifying a custom consumer group ID for the Lambda event source mappings (ESMs) for MSK and self-managed Kafka event sources.

With this feature, you can create a Lambda ESM that uses a consumer group that has already been created. This enables you to use Lambda as a Kafka consumer for topics that are replicated with MirrorMaker v2 or with consumer groups you create to start consuming at a particular offset or timestamp.

Overview

This blog post shows how to use this feature to enable Lambda to consume a Kafka topic starting at a specific timestamp. This can be useful if you must reprocess some data but don’t want to reprocess all of the data in the topic.

In this example application, a client application writes to a topic on the MSK cluster. It creates a consumer group that points to a specific timestamp within that topic as the starting point for consuming messages. A Lambda ESM is created using that existing consumer group that triggers a Lambda function. This processes and writes the messages to an Amazon DynamoDB table.

Reference architecture

  1. A Kafka client writes messages to a topic in the MSK cluster.
  2. A Kafka consumer group is created with a starting point of a specific timestamp
  3. The Lambda ESM polls the MSK topic using the existing consumer group and triggers the Lambda function with batches of messages.
  4. The Lambda function writes the messages to DynamoDB

Step-by-step instructions

To get started, create an MSK cluster and a client Amazon EC2 instance from which to create topics and publish messages. If you don’t already have an MSK cluster, follow this blog on setting up an MSK cluster and using it as an event source for Lambda.

  1. On the client instance, set an environment variable to the MSK cluster bootstrap servers to make it easier to reference them in future commands:
    export MSKBOOTSTRAP='b-1.mskcluster.oy1hqd.c23.kafka.us-east-1.amazonaws.com:9094,b-2.mskcluster.oy1hqd.c23.kafka.us-east-1.amazonaws.com:9094,b-3.mskcluster.oy1hqd.c23.kafka.us-east-1.amazonaws.com:9094'
  2. Create the topic. This example has a three-node MSK cluster so the replication factor is also set to three. The partition count is set to three in this example. In your applications, set this according to throughput and parallelization needs.
    ./bin/kafka-topics.sh --create --bootstrap-server $MSKBOOTSTRAP --replication-factor 3 --partitions 3 --topic demoTopic01
  3. Write messages to the topic using this Python script:
    #!/usr/bin/env python3
    import json
    import time
    from random import randint
    from uuid import uuid4
    from kafka import KafkaProducer
    
    BROKERS = ['b-1.mskcluster.oy1hqd.c23.kafka.us-east-1.amazonaws.com:9094', 
            'b-2.mskcluster.oy1hqd.c23.kafka.us-east-1.amazonaws.com:9094',
            'b-3.mskcluster.oy1hqd.c23.kafka.us-east-1.amazonaws.com:9094']
    TOPIC = 'demoTopic01'
    
    producer = KafkaProducer(bootstrap_servers=BROKERS, security_protocol='SSL',
            value_serializer=lambda x: json.dumps(x).encode('utf-8'))
    
    def create_record(sequence_num):
        number = randint(1000000,10000000)
        record = {"id": sequence_num, "record_timestamp": int(time.time()), "random_number": number, "producer_id": str(uuid4()) }
        print(record)
        return record
    
    def publish_rec(seq):
        data = create_record(seq)
        producer.send(TOPIC, value=data).add_callback(on_send_success).add_errback(on_send_error)
        producer.flush()
    
    def on_send_success(record_metadata):
        print(record_metadata.topic, record_metadata.partition, record_metadata.offset)
    
    def on_send_error(excp):
        print('error writing to kafka', exc_info=excp)
    
    for num in range(1,10000000):
        publish_rec(num)
        time.sleep(0.5) 
    
  4. Copy the script into a file on the client instance named producer.py. The script uses the kafka-python library, so first create a virtual environment and install the library.
    python3 -m venv venv
    source venv/bin/activate
    pip3 install kafka-python
    
  5. Start the script. Leave it running for a few minutes to accumulate some messages in the topic.
    Output
  6. Previously, a Lambda function would choose between consuming messages starting at the beginning of the topic or starting with the latest messages. In this example, it starts consuming messages from a few hours ago at 14:30 UTC. To do this, first create a new consumer group on the client instance:
    ./bin/kafka-consumer-groups.sh --command-config client.properties --bootstrap-server $MSKBOOTSTRAP --topic demoTopic01 --group specificTimeCG --to-datetime 2022-08-10T16:00:00.000 --reset-offsets --execute
  7. In this case, specificTimeCG is the consumer group ID used when creating the Lambda ESM. Listing the consumer groups on the cluster shows the new group:
    ./bin/kafka-consumer-groups.sh --list --command-config client.properties --bootstrap-server $MSKBOOTSTRAP

    Output

  8. With the consumer group created, create the Lambda function along with the Event Source Mapping that uses this new consumer group. In this case, the Lambda function and DynamoDB table are already created. Create the ESM with the following AWS CLI Command:
    aws lambda create-event-source-mapping --region us-east-1 --event-source-arn arn:aws:kafka:us-east-1:0123456789:cluster/demo-us-east-1/78a8d1c1-fa31-4f59-9de3-aacdd77b79bb-23 --function-name msk-consumer-demo-ProcessMSKfunction-IrUhEoDY6X9N --batch-size 3 --amazon-managed-kafka-event-source-config '{"ConsumerGroupId":"specificTimeCG"}' --topics demoTopic01

    The event source in the Lambda console or CLI shows the starting position set to TRIM_HORIZON. However, if you specify a custom consumer group ID that already has existing offsets, those offsets take precedent.

  9. With the event source created, navigate to the DynamoDB console. Locate the DynamoDB table to see the records written by the Lambda function.
    DynamoDB table

Converting the record timestamp of the earliest record in DynamoDB, 1660147212, to a human-readable date shows that the first record was created on 2022-08-10T16:00:12.

In this example, the consumer group is created before the Lambda ESM so that you can specify the timestamp to start from.

If you create an ESM and specify a custom consumer group ID that does not exist, it is created. This is a convenient way to create a new consumer group for an ESM with an ID of your choosing.

Deleting an ESM does not delete the consumer group, regardless of whether it is created before, or during, the ESM creation.

Using the AWS Serverless Application Model (AWS SAM)

To create the event source mapping with a custom consumer group using an AWS Serverless Application Model (AWS SAM) template, use the following snippet:

Events:
  MyMskEvent:
    Type: MSK
    Properties:
      Stream: !Sub arn:aws:kafka:${AWS::Region}:012345678901:cluster/ demo-us-east-1/78a8d1c1-fa31-4f59-9de3-aacdd77b79bb-23
      Topics:
        - "demoTopic01"
      ConsumerGroupId: specificTimeCG

Other types of Kafka clusters

This example uses the custom consumer group ID feature when consuming a Kafka topic from an MSK cluster. In addition to MSK clusters, this feature also supports self-managed Kafka clusters. These could be clusters running on EC2 instances or managed Kafka clusters from a partner such as Confluent.

Conclusion

This post shows how to use the new custom consumer group ID feature of the Lambda event source mapping for Amazon MSK and self-managed Kafka. This feature can be used to consume messages with Lambda starting at a specific timestamp or offset within a Kafka topic. It can also be used to consume messages from a consumer group that is replicated from another Kafka cluster using MirrorMaker v2.

For more serverless learning resources, visit Serverless Land.