AWS Open Source Blog

Tracing End-to-End Performance of Messaging Applications Built on Apache Kafka Using AWS X-Ray

Developers are using a microservices architecture to build large and complex distributed applications. Distributed applications make it possible to isolate an issue to a specific microservice and optimize that microservice independently of the others. While such distributed applications scale well, it can be challenging to identify performance issues in disparate parts of the application. AWS X-Ray helps developers analyze and debug production applications built using microservices, identify performance issues, and quantify customer impact.

Apache Kafka is an open source distributed platform predominantly used in building real-time streaming and messaging systems. It enables developers to build robust large-scale message processing and streaming applications.

The primary challenge in understanding the end-to-end performance of any messaging system is that the requests that publish records are disconnected from the requests that consume them, so you must aggregate request latencies manually. X-Ray can automate this step, showing the latency of each request that publishes a record alongside the latency of the corresponding request that consumes it.

This post walks you through how to enable end-to-end performance analysis for messaging applications built on Kafka. We’ll focus on two of Kafka’s four APIs: the Producer API, which publishes streams of records to a Kafka Topic, and the Consumer API, which processes streams of records published to the Topic. The steps to get started with X-Ray in Kafka (as well as some limitations) are described below.

How X-Ray Works

An X-Ray trace identifier (ID) tracks the path of a request through your application. The trace collects all the segments generated by a single request. The unique identifier for a specific segment is the combination of the segment’s identifier and the trace’s identifier. At a high level, you store these identifiers in each record published to your Kafka Topic. When the Consumer reads a record, it sets the stored trace ID on its own segment and uses the stored segment ID as that segment’s parent ID. With this workaround, the request to the Producer and the request to the Consumer are connected in a single trace.
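For reference, here is a small sketch of the two identifiers that travel in each record. It assumes the formats documented for X-Ray: a trace ID made of the version 1, the request start time as 8 hex digits, and a 96-bit random part as 24 hex digits; and a segment ID of 16 hex digits. The class name XRayIds is hypothetical, and the :: delimiter matches the one used later in this post.

```java
import java.util.regex.Pattern;

// Hypothetical helper (not part of the X-Ray SDK) that checks the identifier formats
// and builds the single "traceId::segmentId" string stored in each Kafka record.
final class XRayIds {
    // Trace ID: "1-" + 8 hex digits (request start time, epoch seconds) + "-" + 24 hex digits
    private static final Pattern TRACE_ID = Pattern.compile("1-[0-9a-fA-F]{8}-[0-9a-fA-F]{24}");
    // Segment ID: 16 hex digits (64 bits)
    private static final Pattern SEGMENT_ID = Pattern.compile("[0-9a-fA-F]{16}");

    private XRayIds() {}

    static boolean isValidTraceId(String traceId) {
        return traceId != null && TRACE_ID.matcher(traceId).matches();
    }

    static boolean isValidSegmentId(String segmentId) {
        return segmentId != null && SEGMENT_ID.matcher(segmentId).matches();
    }

    // Neither ID can contain "::", so the delimiter splits the string unambiguously
    static String traceInformation(String traceId, String segmentId) {
        if (!isValidTraceId(traceId) || !isValidSegmentId(segmentId)) {
            throw new IllegalArgumentException("Invalid X-Ray trace ID or segment ID");
        }
        return traceId + "::" + segmentId;
    }
}
```

For example, XRayIds.traceInformation("1-58406520-a006649127e371903a2de979", "53995c3f42cd8ad8") returns "1-58406520-a006649127e371903a2de979::53995c3f42cd8ad8". Rejecting malformed IDs on the Producer side keeps bad values out of the topic.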

Use our guide to get started with X-Ray.

Get AWS X-Ray Working with Apache Kafka

I have a sample chat application to guide you as well.

At a high level, you have to do three things:

  1. Add a variable to the data object that is streamed, to store the trace ID and segment ID.
  2. Create a class that implements the Kafka ProducerInterceptor interface. Override the onSend(ProducerRecord<K,V> record) method to store the trace ID and segment ID in the variable in the data object.
  3. In the Kafka Consumer, set the stored trace ID and segment ID on the current segment.

The following steps walk you through these tasks in detail.

Update the Kafka Producer Implementation Code

1. Add a String variable to the data object (message) that will be published to the Kafka Topic to store the trace ID and segment ID.

// Add this variable to the message object to hold the trace ID and segment ID
private String traceInformation;

// Getter, used by the Kafka Consumer to read the trace information
public String getTraceInformation() {
    return this.traceInformation;
}

// Setter
public void setTraceInformation(String traceInformation) {
    this.traceInformation = traceInformation;
}
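The interceptor in step 3 builds a new Message from a recipient name, the message text, and the trace information. For context, here is a minimal sketch of a complete data object that fits that usage. The field names toUserName and messageText and the three-argument constructor are inferred from the interceptor code rather than taken from the sample app, so treat them as assumptions.

```java
// Hypothetical chat message data object; field names are inferred from how the
// interceptor code in this post uses Message, not copied from the sample app.
final class Message {
    private String toUserName;
    private String messageText;
    // "traceId::segmentId", filled in by the ProducerInterceptor
    private String traceInformation;

    // No-argument constructor, which many JSON (de)serializers expect
    public Message() {}

    public Message(String toUserName, String messageText, String traceInformation) {
        this.toUserName = toUserName;
        this.messageText = messageText;
        this.traceInformation = traceInformation;
    }

    public String getToUserName() { return toUserName; }
    public String getMessageText() { return messageText; }
    public String getTraceInformation() { return traceInformation; }

    public void setToUserName(String toUserName) { this.toUserName = toUserName; }
    public void setMessageText(String messageText) { this.messageText = messageText; }
    public void setTraceInformation(String traceInformation) { this.traceInformation = traceInformation; }
}
```

Whatever serializer you use for the message value must include traceInformation; otherwise the field is lost before the record reaches the Consumer.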

2. Create a class implementing the Kafka ProducerInterceptor interface.

3. Override the onSend(ProducerRecord<K,V> record) method to store the trace ID and segment ID in the variable in the data object.

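import com.amazonaws.xray.AWSXRay;
import com.amazonaws.xray.AWSXRayRecorder;
import com.amazonaws.xray.entities.TraceID;
import org.apache.kafka.clients.producer.ProducerRecord;

// Inside the class that implements ProducerInterceptor<String, Message>
// (String keys are assumed here; use your own key type)
@Override
public ProducerRecord<String, Message> onSend(ProducerRecord<String, Message> record) {
    // Use the global recorder, the same one the rest of the application records with
    AWSXRayRecorder xrayRecorder = AWSXRay.getGlobalRecorder();
    xrayRecorder.beginSubsegment("KafkaProducerInterceptorSS");
    try {
        // Get the ID of the current (Producer) segment and its trace ID
        String parentId = xrayRecorder.getCurrentSegment().getId();
        TraceID traceId = xrayRecorder.getCurrentSegment().getTraceId();

        // Format the trace information as "traceId::parentId" before saving it
        String traceInformation = traceId.toString() + "::" + parentId;

        // Read the original message from the intercepted record
        Message interceptedMessage = record.value();

        // Build a new message that carries the trace information
        // (the sample app also prefixes the text to show that the interceptor ran)
        String changedMessage = "intercepted-and-changed-" + interceptedMessage.getMessageText();
        Message messageWithTraceInformation =
                new Message(interceptedMessage.getToUserName(), changedMessage, traceInformation);

        // Send the changed message with the same topic, partition, timestamp, key, and headers
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), messageWithTraceInformation, record.headers());
    } finally {
        // End the subsegment even if an exception is thrown
        xrayRecorder.endSubsegment();
    }
}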

4. Publish the record to the Kafka Topic from the Producer as usual. Because the interceptor has added the trace information to the message, the Consumer retrieves it along with the message.
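Kafka only calls onSend() if the interceptor is registered in the producer configuration through the interceptor.classes setting. The sketch below builds such a configuration with java.util.Properties; the interceptor class name com.example.chat.XRayProducerInterceptor and the value serializer com.example.chat.MessageSerializer are hypothetical placeholders for your own classes.

```java
import java.util.Properties;

// Sketch of a producer configuration that registers the X-Ray interceptor.
// The two com.example.chat class names are hypothetical placeholders.
final class ChatProducerConfig {
    private ChatProducerConfig() {}

    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Hypothetical serializer that writes the whole Message, including traceInformation
        props.setProperty("value.serializer", "com.example.chat.MessageSerializer");
        // Comma-separated list of ProducerInterceptor classes; Kafka calls their onSend()
        // from send(), before the key and value are serialized
        props.setProperty("interceptor.classes", "com.example.chat.XRayProducerInterceptor");
        return props;
    }
}
```

You would then pass these properties to the KafkaProducer constructor. Because onSend() runs on the thread that calls send(), the interceptor can read the X-Ray segment that is current on that thread.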

Update the Kafka Consumer Implementation Code

Because the request that publishes a record to the Kafka Topic is not the same request that retrieves it, the Kafka Consumer is recorded as a separate segment in the X-Ray trace.

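1. Create a new X-Ray segment in your Kafka Consumer:

AWSXRayRecorder xrayRecorder = AWSXRay.getGlobalRecorder();
Segment kafkaConsumerSS = xrayRecorder.beginSegment("KafkaConsumerSegment");
// ... consume and process the records ...
// When processing is finished, call xrayRecorder.endSegment() so the segment is sent to X-Ray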

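2. Follow these steps only if the Kafka Consumer runs on a separate thread on the consumer side. The X-Ray recorder tracks the current segment per thread, so that thread does not see the segment created in step 1 unless you pass it along: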

a. Pass the current trace entity to the Kafka Consumer when you create it:

KafkaConsumerChat chat = new KafkaConsumerChat(xrayRecorder.getTraceEntity());

b. On the Kafka Consumer’s thread, set that trace entity on the X-Ray global recorder:

this.recorder.setTraceEntity(this.traceEn);
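Steps a and b are needed because the X-Ray SDK for Java keeps track of the current segment per thread, so a thread you start yourself begins with no segment. The toy model below uses a plain ThreadLocal as a stand-in for the recorder’s context to show the effect of the hand-off; it is an illustration, not X-Ray SDK code, and ThreadHandOffDemo is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicReference;

// Toy model: a ThreadLocal stands in for the X-Ray recorder's per-thread context.
final class ThreadHandOffDemo {
    private static final ThreadLocal<String> CURRENT_ENTITY = new ThreadLocal<>();

    private ThreadHandOffDemo() {}

    // Returns the entity the consumer thread sees, with or without the explicit hand-off.
    static String entitySeenByConsumerThread(boolean handOff) {
        CURRENT_ENTITY.set("KafkaConsumerSegment");
        // Step a: capture the entity on the thread that began the segment
        String captured = CURRENT_ENTITY.get();
        AtomicReference<String> seen = new AtomicReference<>();

        Thread consumerThread = new Thread(() -> {
            if (handOff) {
                // Step b: set the captured entity on the consumer thread
                CURRENT_ENTITY.set(captured);
            }
            seen.set(CURRENT_ENTITY.get());
        });
        consumerThread.start();
        try {
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the consumer thread", e);
        } finally {
            CURRENT_ENTITY.remove();
        }
        return seen.get();
    }
}
```

Without the hand-off the new thread sees null, which mirrors a consumer thread that has no current X-Ray segment; with it, the thread sees the captured entity.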

3. Read the trace ID and segment ID that were stored in the Message object, and set them on the current segment created in the Kafka Consumer: the trace ID becomes the segment’s trace ID, and the Producer’s segment ID becomes its parent ID. Notice that the trace ID and segment ID were stored in a single String variable with a delimiter. You can instead use two String variables, one for the trace ID and one for the segment ID.

// Requires com.amazonaws.xray.entities.TraceID from the AWS X-Ray SDK for Java

// Get the "traceId::parentId" string that the Producer's interceptor stored in the message
String[] traceParts = receivedMessage.getTraceInformation().split("::", 2);
String traceIdValue = traceParts[0];
String parentValue = traceParts[1];

// Convert the trace ID string back into the X-Ray TraceID type
TraceID traceIdFromString = TraceID.fromString(traceIdValue);

// Set the trace ID and parent ID on the current Kafka Consumer segment
xrayRecorder.getCurrentSegment().setTraceId(traceIdFromString);
xrayRecorder.getCurrentSegment().setParentId(parentValue);
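The snippet above assumes every message carries well-formed trace information; a record published before the interceptor was registered, for example, would make it fail. As a hedged sketch, a small parser can return an empty result in that case so the Consumer can skip the linking step. TraceInformation is a hypothetical class name, and the :: delimiter matches the one used by the interceptor in this post.

```java
import java.util.Optional;

// Hypothetical parser for the "traceId::parentId" string stored in each message.
final class TraceInformation {
    final String traceId;
    final String parentId;

    private TraceInformation(String traceId, String parentId) {
        this.traceId = traceId;
        this.parentId = parentId;
    }

    // Returns Optional.empty() for a missing or malformed value instead of throwing
    static Optional<TraceInformation> parse(String value) {
        if (value == null) {
            return Optional.empty();
        }
        // A limit of -1 keeps empty trailing parts, so "abc::" is rejected below
        String[] parts = value.split("::", -1);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new TraceInformation(parts[0], parts[1]));
    }
}
```

In the Consumer, you would call TraceID.fromString(info.traceId) and setParentId(info.parentId) only inside TraceInformation.parse(receivedMessage.getTraceInformation()).ifPresent(...), so records without trace information simply stay in the Consumer’s own trace.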

After going through these steps, you should see a service map like the one below:

X-Ray Kafka service map


You should then see a trace timeline like the example below, where the requests to the Producer and the Consumer appear in one trace.

X-Ray Kafka trace timeline


Although connecting the trace segments between the Producer and Consumer provides information on end-to-end performance for messaging and streaming applications using Kafka, this solution has some limitations. When the Producer publishes records one at a time but the Consumer retrieves them all in a single request, only the first request that published a record shows up as having a corresponding request from the Consumer. Because of this limitation, you’ll see fewer requests from the Consumer than from the Producer.

We hope this post helped you get performance insights from X-Ray for your messaging application built on Kafka. As always, feel free to leave comments or feedback below. Also, try out the sample app.