AWS Compute Blog

Offset lag metric for Amazon MSK as an event source for Lambda

This post written by Adam Wagner, Principal Serverless Solutions Architect.

Last year, AWS announced support for Amazon Managed Streaming for Apache Kafka (MSK) and self-managed Apache Kafka clusters as event sources for AWS Lambda. Today, AWS adds a new OffsetLag metric to Lambda functions with MSK or self-managed Apache Kafka event sources.

Offset in Apache Kafka is an integer that marks the current position of a consumer. OffsetLag is the difference in offset between the last record written to the Kafka topic and the last record processed by Lambda. Kafka expresses this in the number of records, not a measure of time. This metric provides visibility into whether your Lambda function is keeping up with the records added to the topic it is processing.

This blog walks through using the OffsetLag metric along with other Lambda and MSK metrics to understand your streaming application and optimize your Lambda function.

Overview

In this example application, a producer writes messages to a topic on the MSK cluster that is an event source for a Lambda function. Each message contains a number and the Lambda function finds the factors of that number. It outputs the input number and results to an Amazon DynamoDB table.

Finding all the factors of a number is fast if the number is small but takes longer for larger numbers. This difference means the size of the number written to the MSK topic influences the Lambda function duration.

Example application architecture

Example application architecture

  1. A Kafka client writes messages to a topic in the MSK cluster.
  2. The Lambda event source polls the MSK topic on your behalf for new messages and triggers your Lambda function with batches of messages.
  3. The Lambda function factors the number in each message and then writes the results to DynamoDB.

In this application, several factors can contribute to offset lag. The first is the volume and size of messages. If more messages are coming in, the Lambda may take longer to process them. Other factors are the number of partitions in the topic, and the number of concurrent Lambda functions processing messages. A full explanation of how Lambda concurrency scales with the MSK event source is in the documentation.

If the average duration of your Lambda function increases, this also tends to increase the offset lag. This lag could be latency in a downstream service or due to the complexity of the incoming messages. Lastly, if your Lambda function errors, the MSK event source retries the identical records set until they succeed. This retry functionality also increases offset lag.

Measuring OffsetLag

To understand how the new OffsetLag metric works, you first need a working MSK topic as an event source for a Lambda function. Follow this blog post to set up an MSK instance.

To find the OffsetLag metric, go to the CloudWatch console, select All Metrics from the left-hand menu. Then select Lambda, followed by By Function Name to see a list of metrics by Lambda function. Scroll or use the search bar to find the metrics for this function and select OffsetLag.

OffsetLag metric example

OffsetLag metric example

To make it easier to look at multiple metrics at once, create a CloudWatch dashboard starting with the OffsetLag metric. Select Actions -> Add to Dashboard. Select the Create new button, provide the dashboard a name. Choose Create, keeping the rest of the options at the defaults.

Adding OffsetLag to dashboard

Adding OffsetLag to dashboard

After choosing Add to dashboard, the new dashboard appears. Choose the Add widget button to add the Lambda duration metric from the same function. Add another widget that combines both Lambda errors and invocations for the function. Finally, add a widget for the BytesInPerSec metric for the MSK topic. Find this metric under AWS/Kafka -> Broker ID, Cluster Name, Topic. Finally, click Save dashboard.

After a few minutes, you see a steady stream of invocations, as you would expect when consuming from a busy topic.

Data incoming to dashboard

Data incoming to dashboard

This example is a CloudWatch dashboard showing the Lambda OffsetLag, Duration, Errors, and Invocations, along with the BytesInPerSec for the MSK topic.

In this example, the OffSetLag metric is averaging about eight, indicating that the Lambda function is eight records behind the latest record in the topic. While this is acceptable, there is room for improvement.

The first thing to look for is Lambda function errors, which can drive up offset lag. The metrics show that there are no errors so the next step is to evaluate and optimize the code.

The Lambda handler function loops through the records and calls the process_msg function on each record:

def lambda_handler(event, context):
    for batch in event['records'].keys():
        for record in event['records'][batch]:
            try:
                process_msg(record)
            except:
                print("error processing record:", record)
    return()

The process_msg function handles base64 decoding, calls a factor function to factor the number, and writes the record to a DynamoDB table:

def process_msg(record):
    #messages are base64 encoded, so we decode it here
    msg_value = base64.b64decode(record['value']).decode()
    msg_dict = json.loads(msg_value)
    #using the number as the hash key in the dynamodb table
    msg_id = f"{msg_dict['number']}"
    if msg_dict['number'] <= MAX_NUMBER:
        factors = factor_number(msg_dict['number'])
        print(f"number: {msg_dict['number']} has factors: {factors}")
        item = {'msg_id': msg_id, 'msg':msg_value, 'factors':factors}
        resp = ddb_table.put_item(Item=item)
    else:
        print(f"ERROR: {msg_dict['number']} is >= limit of {MAX_NUMBER}")

The heavy computation takes place in the factor function:

def factor(number):
    factors = [1,number]
    for x in range(2, (int(1 + number / 2))):
        if (number % x) == 0:
            factors.append(x)
    return factors

The code loops through all numbers up to the factored number divided by two. The code is optimized by only looping up to the square root of the number.

def factor(number):
    factors = [1,number]
    for x in range(2, 1 + int(number**0.5)):
        if (number % x) == 0:
            factors.append(x)
            factors.append(number // x)
    return factors

There are further optimizations and libraries for factoring numbers but this provides a noticeable performance improvement in this example.

Data after optimization

Data after optimization

After deploying the code, refresh the metrics after a while to see the improvements:

The average Lambda duration has dropped to single-digit milliseconds and the OffsetLag is now averaging two.

If you see a noticeable change in the OffsetLag metric, there are several things to investigate. The input side of the system, increased messages per second, or a significant increase in the size of the message are a few options.

Conclusion

This post walks through implementing the OffsetLag metric to understand latency between the latest messages in the MSK topic and the records a Lambda function is processing. It also reviews other metrics that help understand the underlying cause of increases to the offset lag. For more information on this topic, refer to the documentation and other MSK Lambda metrics.

For more serverless learning resources, visit Serverless Land.