AWS Big Data Blog

Implementing Efficient and Reliable Producers with the Amazon Kinesis Producer Library

by Kevin Deng

Kevin Deng is an SDE with the Amazon Kinesis team and is the lead author of the Amazon Kinesis Producer Library (KPL).

How do you vertically scale an Amazon Kinesis producer application by 100x? While it’s easy to get started with streaming data into Amazon Kinesis, streaming large volumes of data efficiently and reliably presents some challenges. When a host needs to send many records per second (RPS) to Amazon Kinesis, simply calling the basic PutRecord API action in a loop is inadequate.

To reduce overhead and increase throughput, the application must batch records and implement parallel HTTP requests. It also must deal with the transient failures inherent to any network application and perform retries as needed. And in a large-scale application, monitoring is needed to allow operators to diagnose and troubleshoot any issues that arise.

In this post, you’ll develop a sample producer application and evolve it through several stages of complexity to learn about these challenges. I discuss each challenge at a conceptual level and present sample solutions for some of them. The goal is to help you understand what problems the KPL solves and, at a high level, how it solves them.

All code in this post is available in the AWS Big Data Blog GitHub repository.

Scenario

Consider a typical use case of Amazon Kinesis: realtime clickstream analysis. In this setup, a webserver receives requests from the browser about the links a user has clicked on a website. A typical request might contain a payload that looks like this:

{
  "sessionId": "seXACNTS3FoQuqTVxAM",
  "fmt": "1",
  "num": "1",
  "cv": "7",
  "frm": "2",
  "url": "http%3A//www.mywebsite.com/activityi%3Bsrc%3D1782317%3Btype%3D2015s004%3Bcat%3Dgs6pr0%3Bord%3D3497210042551.1597%3F",
  "ref": "http%3A//www.mywebsite.com/us/explore/myproduct-features-and-specs/%3Fcid%3Dppc-",
  "random": "3833153354"
}

This payload is 342 bytes long, and the examples in this post assume that most of the requests coming in are roughly this size (~350 bytes).

After receiving the request, the webserver sends it to Amazon Kinesis to be consumed by an analysis application. This application provides the business intelligence in near-realtime that can be used for various purposes. Don’t worry about the consumer in this post; focus only on the producer, which in this case is the webserver.

Overall Strategy

To avoid interfering with other workloads such as actually serving pages or receiving other requests, the webserver doesn’t try to send the data to Amazon Kinesis within its request handler. Instead, it places the payload of each request into a queue to be processed by a separate component. Call this component ClickEventsToKinesis; its implementation is what I discuss in the rest of this post. Start with the following abstract class that all subsequent implementations inherit from:

public abstract class AbstractClickEventsToKinesis implements Runnable {
    protected final static String STREAM_NAME = "myStream";
    protected final static String REGION = "us-east-1";

    protected final BlockingQueue<ClickEvent> inputQueue;
    protected volatile boolean shutdown = false;
    protected final AtomicLong recordsPut = new AtomicLong(0);

    protected AbstractClickEventsToKinesis(
            BlockingQueue<ClickEvent> inputQueue) {
        this.inputQueue = inputQueue;
    }

    @Override
    public void run() {
        while (!shutdown) {
            try {
                runOnce();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public long recordsPut() {
        return recordsPut.get();
    }

    public void stop() {
        shutdown = true;
    }

    protected abstract void runOnce() throws Exception;
}

In this code, ClickEvent is a trivial wrapper around the payload, with the sessionId field extracted. Use the sessionId as your partition key when putting into Amazon Kinesis.

public class ClickEvent {
    private String sessionId;
    private String payload;

    public ClickEvent(String sessionId, String payload) {
        this.sessionId = sessionId;
        this.payload = payload;
    }

    public String getSessionId() {
        return sessionId;
    }

    public String getPayload() {
        return payload;
    }
}

For simplicity, assume that the server runs on an Amazon EC2 instance that has an appropriate IAM role such that you can simply use DefaultAWSCredentialsProviderChain for credentials.

Basic Producer

Imagine that you have just launched your website. Traffic is fairly low at the moment, and each one of your servers is getting about 50 ClickEvent objects per second. Start with the simplest possible implementation: take the ClickEvent objects from the queue one at a time and send each to Amazon Kinesis synchronously. At this point, make no attempt at handling errors or performing retries should any of the puts fail.

public class BasicClickEventsToKinesis extends AbstractClickEventsToKinesis {
    private final AmazonKinesis kinesis;

    public BasicClickEventsToKinesis(BlockingQueue<ClickEvent> inputQueue) {
        super(inputQueue);
        kinesis = new AmazonKinesisClient().withRegion(
                Regions.fromName(REGION));
    }

    @Override
    protected void runOnce() throws Exception {
        ClickEvent event = inputQueue.take();
        String partitionKey = event.getSessionId();
        ByteBuffer data = ByteBuffer.wrap(
                event.getPayload().getBytes("UTF-8"));
        kinesis.putRecord(STREAM_NAME, data, partitionKey);
        recordsPut.getAndIncrement();
    }
}

That didn’t take long. This implementation attains a throughput of about 90 records per second (RPS) in us-east-1 on a c3.2xlarge Ubuntu 15 instance, which is sufficient for your present needs.

Using the KPL – Quick Preview

Take a look at a similar implementation that uses the KPL instead:

public class KPLClickEventsToKinesis extends AbstractClickEventsToKinesis {
    private final KinesisProducer kinesis;

    public KPLClickEventsToKinesis(BlockingQueue<ClickEvent> inputQueue) {
        super(inputQueue);
        kinesis = new KinesisProducer(new KinesisProducerConfiguration()
                .setRegion(REGION)
                .setRecordMaxBufferedTime(5000));
    }

    @Override
    protected void runOnce() throws Exception {
        ClickEvent event = inputQueue.take();
        String partitionKey = event.getSessionId();
        ByteBuffer data = ByteBuffer.wrap(
                event.getPayload().getBytes("UTF-8"));
        while (kinesis.getOutstandingRecordsCount() > 5e4) {
            Thread.sleep(1);
        }
        kinesis.addUserRecord(STREAM_NAME, partitionKey, data);
        recordsPut.getAndIncrement();
    }
}

While the two implementations are very similar on the surface, the KPL-based implementation performs a significant amount of additional logic under the covers. It achieves a throughput of about 76,000 RPS when run on the same host, which is about 800 times greater. As you go through the rest of the post, it should become clear why that is the case. I’ll show you a more advanced example of using the KPL later on and go through the various API methods, but for now I’ll discuss the problems with the basic implementation you created earlier.

Throughput

Imagine now that your website has gone viral, and as a result each one of your servers is receiving about 5000 ClickEvent objects per second instead of 50. Because BasicClickEventsToKinesis can only do 90 RPS, you’ll need to make some changes to get the throughput you need.

Multithreading

One approach would be to create multiple instances of BasicClickEventsToKinesis, pass each one a reference to the same queue, and then submit all of them to an ExecutorService object. If all goes well, this should multiply your throughput by some large fraction of the number of instances you create. To get to 5000 RPS at 90 RPS each, you would require 56 instances in the ideal case, but in reality you’ll need more because the scaling is not going to be perfectly linear.

Code implementing the above approach is available in the repository as the class MultithreadedClickEventsToKinesis.
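As a rough illustration of the idea (not the repository's class), the sketch below computes the ideal-case worker count and lets several workers drain one shared queue through an ExecutorService. The class and method names are hypothetical, and the loop body only counts events where a real worker would call putRecord.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class WorkerPoolSketch {
    /** Minimum number of workers if scaling were perfectly linear. */
    static int workersNeeded(int targetRps, int perWorkerRps) {
        return (targetRps + perWorkerRps - 1) / perWorkerRps; // ceiling division
    }

    /** Lets `workers` threads drain one shared queue; returns the number of events handled. */
    static long drain(BlockingQueue<String> queue, int workers) {
        AtomicLong handled = new AtomicLong();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                // poll() returns null once the queue is empty, which ends this worker
                while (queue.poll() != null) {
                    // a real worker would send the event with putRecord here
                    handled.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return handled.get();
    }
}
```

Calling workersNeeded(5000, 90) gives the 56 instances mentioned above; the real number will be higher because every additional thread adds scheduling overhead.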

The problem with this approach is that it incurs a large amount of CPU overhead from context switching between threads. In addition to this, sending records one by one also incurs large overheads from signature version 4 and HTTP, further consuming CPU cycles as well as network bandwidth. I’ll discuss the bandwidth aspect next.

Bandwidth Overhead

If you take a look at what’s actually going over the wire, you’ll see that each request is considerably larger than the record it carries.

Even though your original data and partition key are only about 370 bytes in size, the HTTP request ends up consuming about 1200 bytes. Some of this is due to expansion from base64-encoding, but the bulk of it comes from the HTTP headers, which constitute just over half of the entire request.
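To put numbers on this, here is a small sketch that works out the base64 expansion of the 342-byte sample payload and the share of a roughly 1200-byte request that is actual record content. The class and method names are hypothetical; the byte counts are the approximate figures quoted in this post.

```java
import java.util.Base64;

final class RequestOverheadSketch {
    /** Length of the base64 text produced for a payload of the given size. */
    static int base64Length(int rawBytes) {
        return Base64.getEncoder().encodeToString(new byte[rawBytes]).length();
    }

    /** Fraction of the request occupied by the original record (data plus partition key). */
    static double usefulFraction(int recordBytes, int requestBytes) {
        return (double) recordBytes / requestBytes;
    }
}
```

With these figures, base64Length(342) is 456, and usefulFraction(370, 1200) is about 0.31, so less than a third of each request is the record itself.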

Batching with PutRecords

To improve efficiency, the service team introduced the PutRecords API method in December 2014. This allows many records to be sent with a single HTTP request, thus amortizing the cost of the headers and fixed-cost components of signature version 4. Another benefit of using PutRecords is that you don’t have to incur a round-trip of latency between every single record, even if you don’t use multiple connections in parallel.

The following code example implements a new class called BatchedClickEventsToKinesis to take advantage of these benefits of PutRecords.

public class BatchedClickEventsToKinesis extends AbstractClickEventsToKinesis {
    protected AmazonKinesis kinesis;
    protected List<PutRecordsRequestEntry> entries;

    private int dataSize;

    public BatchedClickEventsToKinesis(BlockingQueue<ClickEvent> inputQueue) {
        super(inputQueue);
        kinesis = new AmazonKinesisClient().withRegion(
                Regions.fromName(REGION));
        entries = new ArrayList<>();
        dataSize = 0;
    }

    @Override
    protected void runOnce() throws Exception {
        ClickEvent event = inputQueue.take();
        String partitionKey = event.getSessionId();
        ByteBuffer data = ByteBuffer.wrap(
                event.getPayload().getBytes("UTF-8"));
        recordsPut.getAndIncrement();

        addEntry(new PutRecordsRequestEntry()
                .withPartitionKey(partitionKey)
                .withData(data));
    }

    @Override
    public long recordsPut() {
        return super.recordsPut() - entries.size();
    }

    @Override
    public void stop() {
        super.stop();
        flush();
    }

    protected void flush() {
        kinesis.putRecords(new PutRecordsRequest()
                .withStreamName(STREAM_NAME)
                .withRecords(entries));
        entries.clear();
    }

    protected void addEntry(PutRecordsRequestEntry entry) {
        int newDataSize = dataSize + entry.getData().remaining() +
                entry.getPartitionKey().length();
        if (newDataSize <= 5 * 1024 * 1024 && entries.size() < 500) {
            dataSize = newDataSize;
            entries.add(entry);
        } else {
            flush();
            dataSize = 0;
            addEntry(entry);
        }
    }
}

This implementation gives you 5500 RPS, a dramatic improvement in the throughput and efficiency of the producer.

PutRecords is not Atomic

There is, however, a subtle issue with PutRecords: it is not an atomic operation. Unlike the other API methods, which either fail or succeed, PutRecords can partially fail. When a batch of multiple records is sent with PutRecords, it’s possible for every record to individually fail or succeed, independent of the other ones in the same batch. Unless an error prevents the entire HTTP request containing the batch from being delivered, PutRecords always returns a 200 result, even if some records within the batch have failed. This means that instead of simply relying on exceptions to indicate errors, you must write code that examines the PutRecordsResult objects to detect individual record failures and take appropriate action.
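The core of that check can be sketched without the SDK types. The helper below is hypothetical: it assumes you have already extracted one error code per result entry (null meaning success) and that results come back in the same order as the request entries, which is what lets them be matched up by index.

```java
import java.util.ArrayList;
import java.util.List;

final class PartialFailureSketch {
    /**
     * Returns the request entries whose result carries an error code.
     * errorCodes.get(i) is the outcome for requests.get(i); null means success.
     */
    static <T> List<T> failedEntries(List<T> requests, List<String> errorCodes) {
        if (requests.size() != errorCodes.size()) {
            throw new IllegalArgumentException("results must match requests one-to-one");
        }
        List<T> failed = new ArrayList<>();
        for (int i = 0; i < requests.size(); i++) {
            if (errorCodes.get(i) != null) {
                failed.add(requests.get(i)); // this record did not make it into the stream
            }
        }
        return failed;
    }
}
```

An empty result means the whole batch succeeded; anything else is a candidate for a retry or for logging.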

Retries

For various reasons, calls to the Amazon Kinesis API can occasionally fail. To avoid losing data because of this, you’ll want to retry failed puts.

Add retries to the BatchedClickEventsToKinesis from earlier by re-implementing the flush() method in a subclass:

public class RetryingBatchedClickEventsToKinesis
        extends BatchedClickEventsToKinesis {
    private static final int MAX_BACKOFF = 30000;
    private static final int MAX_ATTEMPTS = 5;
    private static final Set<String> RETRYABLE_ERR_CODES = ImmutableSet.of(
        "ProvisionedThroughputExceededException",
        "InternalFailure",
        "ServiceUnavailable");

    private int backoff;
    private int attempt;
    private Map<PutRecordsRequestEntry, Integer> recordAttempts;

    public RetryingBatchedClickEventsToKinesis(
            BlockingQueue<ClickEvent> inputQueue) {
        super(inputQueue);
        reset();
        recordAttempts = new HashMap<>();
    }

    @Override
    protected void flush() {
        PutRecordsRequest req = new PutRecordsRequest()
                .withStreamName(STREAM_NAME)
                .withRecords(entries);

        PutRecordsResult res = null;
        try {
            res = kinesis.putRecords(req);
            reset();
        } catch (AmazonClientException e) {
            if ((e instanceof AmazonServiceException
                    && ((AmazonServiceException) e).getStatusCode() / 100 == 4)
                    || attempt == MAX_ATTEMPTS) {
                reset();
                throw e;
            }

            try {
                Thread.sleep(backoff);
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
            // Double the backoff for the next attempt, capped at MAX_BACKOFF
            backoff = Math.min(MAX_BACKOFF, backoff * 2);

            attempt++;
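            flush();
            // The recursive call has already processed the results;
            // res is still null here, so don't fall through.
            return;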
        }

        final List<PutRecordsResultEntry> results = res.getRecords();
        List<PutRecordsRequestEntry> retries = IntStream
            .range(0, results.size())
            .mapToObj(i -> {
                PutRecordsRequestEntry e = entries.get(i);
                String errorCode = results.get(i).getErrorCode();
                int n = recordAttempts.getOrDefault(e, 1) + 1;
                // Determine whether the record should be retried
                if (errorCode != null &&
                        RETRYABLE_ERR_CODES.contains(errorCode) &&
                        n < MAX_ATTEMPTS) {
                    recordAttempts.put(e, n);
                    return Optional.of(e);
                } else {
                    recordAttempts.remove(e);
                    return Optional.<PutRecordsRequestEntry>empty();
                }
            })
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toList());

        entries.clear();
        retries.forEach(e -> addEntry(e));
    }

    private void reset() {
        attempt = 1;
        backoff = 100;
    }
}

There’s a lot going on here, so I’ll walk you through all the issues.

Different Types of Errors

The first type of error this code attempts to handle is a complete request failure, indicated by HTTP status code 4XX or 5XX. This occurs when the entire PutRecordsRequest request fails to get processed, for example because the service had an internal error, or if the credentials are invalid. 4XX errors are pointless to retry because they are not recoverable. For 5XX errors, on the other hand, perform an exponential backoff followed by a retry, but only if you haven’t yet reached the maximum number of attempts for this request.
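The backoff schedule itself is easy to isolate. This sketch (hypothetical class and method names) computes the delay before a given attempt by doubling from an initial value and capping at a maximum, using the same 100 ms start and 30,000 ms cap as the class above.

```java
final class BackoffSketch {
    /** Delay before retry number `attempt` (1-based): doubles each time, capped at maxMillis. */
    static long delayMillis(int attempt, long initialMillis, long maxMillis) {
        long delay = initialMillis;
        // Stop doubling once the cap is reached so the value cannot overflow
        for (int i = 1; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }
}
```

With those values, the first five delays are 100, 200, 400, 800, and 1600 ms, and the cap is reached by the tenth attempt.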

Partial Failures

As I discussed earlier, PutRecords calls can partially fail. The second half of the flush() method deals with this. For each entry in PutRecordsResult, match it up with the original PutRecordsRequestEntry request that you tried to put. Then, check whether you need to retry the record based on the error code and number of attempts already made.

Backoff Strategies

Despite all of this, you’re not quite done yet. Notice that you have backoffs if the entire request fails, but not for individual records. What if certain shards had problems but not others? If you performed backoff whenever any record failed, you would unnecessarily delay records and reduce throughput to those shards that are not affected.

On the other hand, if you retried failed records without any backoff (as in the example), then you might end up spamming a shard even though it’s already having problems. Regardless of how you mitigate this problem, you would still want a way to know if it, or some other problem, is occurring in production. You need to add monitoring to the application, so that operators can quickly respond to and fix any issues that occur.
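One possible mitigation, sketched here under assumptions, is to keep backoff state per shard so that only records headed for a struggling shard are delayed. The class below is hypothetical and is not taken from the sample code. It assumes you can tell which shard a record maps to, for example by hashing its partition key against the stream's shard ranges, which is not shown.

```java
import java.util.HashMap;
import java.util.Map;

final class PerShardBackoffSketch {
    private final long initialMillis;
    private final long maxMillis;
    // shardId -> current backoff in ms; an absent key means the shard is healthy
    private final Map<String, Long> currentBackoff = new HashMap<>();
    // shardId -> earliest time (ms) at which the next put may be sent
    private final Map<String, Long> notBefore = new HashMap<>();

    PerShardBackoffSketch(long initialMillis, long maxMillis) {
        this.initialMillis = initialMillis;
        this.maxMillis = maxMillis;
    }

    /** Records a failure for one shard: doubles its backoff and leaves other shards alone. */
    void onFailure(String shardId, long nowMillis) {
        Long previous = currentBackoff.get(shardId);
        long next = previous == null ? initialMillis : Math.min(maxMillis, previous * 2);
        currentBackoff.put(shardId, next);
        notBefore.put(shardId, nowMillis + next);
    }

    /** Clears the backoff state once a put to the shard succeeds. */
    void onSuccess(String shardId) {
        currentBackoff.remove(shardId);
        notBefore.remove(shardId);
    }

    /** True if a record for this shard may be sent at the given time. */
    boolean canSend(String shardId, long nowMillis) {
        return nowMillis >= notBefore.getOrDefault(shardId, 0L);
    }
}
```

Passing the current time in as a parameter keeps the sketch easy to test; a producer would typically pass System.currentTimeMillis().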

Monitoring

The two main methods of monitoring are metrics and realtime log analysis (RTLA). In AWS, these are offered by Amazon CloudWatch and Amazon CloudWatch Logs respectively. CloudWatch metrics allow users to submit metric data, which is then aggregated over several time windows and made available for retrieval.

Here’s how you modify BasicClickEventsToKinesis to emit a metric:

public class MetricsEmittingBasicClickEventsToKinesis
        extends AbstractClickEventsToKinesis {
    private final AmazonKinesis kinesis;
    private final AmazonCloudWatch cw;

    protected MetricsEmittingBasicClickEventsToKinesis(
            BlockingQueue<ClickEvent> inputQueue) {
        super(inputQueue);
        kinesis = new AmazonKinesisClient().withRegion(
                Regions.fromName(REGION));
        cw = new AmazonCloudWatchClient().withRegion(Regions.fromName(REGION));
    }

    @Override
    protected void runOnce() throws Exception {
        ClickEvent event = inputQueue.take();
        String partitionKey = event.getSessionId();
        ByteBuffer data = ByteBuffer.wrap(
                event.getPayload().getBytes("UTF-8"));
        recordsPut.getAndIncrement();

        PutRecordResult res = kinesis.putRecord(
                STREAM_NAME, data, partitionKey);

        MetricDatum d = new MetricDatum()
            .withDimensions(
                new Dimension().withName("StreamName").withValue(STREAM_NAME),
                new Dimension().withName("ShardId").withValue(res.getShardId()),
                new Dimension().withName("Host").withValue(
                        InetAddress.getLocalHost().toString()))
            .withValue(1.0)
            .withMetricName("RecordsPut");
        cw.putMetricData(new PutMetricDataRequest()
            .withMetricData(d)
            .withNamespace("MySampleProducer"));
    }
}

As you can see, the code to emit the metric ended up being longer than the code that does the actual work (i.e., putting the record). In practice, to get a good picture of what the system is doing, you’ll need far more than just one metric. This means you’ll need to create utilities to simplify the call site.

Another problem is that every PutMetricData call makes an HTTP request, and the overhead from this is quite substantial because the payload size is small. Perhaps unsurprisingly, the solution to this problem is basically the same for CloudWatch metrics as it is for Amazon Kinesis records.
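A minimal sketch of that idea, assuming you only care about sums: accumulate data points in memory and hand them off periodically, so that one upload replaces many. The class is hypothetical, and the upload step (a single PutMetricData call carrying the drained values) is left out.

```java
import java.util.HashMap;
import java.util.Map;

final class MetricBufferSketch {
    private final Map<String, Double> sums = new HashMap<>();

    /** Adds a data point locally; no network call is made here. */
    synchronized void add(String metricName, double value) {
        sums.merge(metricName, value, Double::sum);
    }

    /** Returns the accumulated sums and clears the buffer; the caller uploads them in one request. */
    synchronized Map<String, Double> drain() {
        Map<String, Double> snapshot = new HashMap<>(sums);
        sums.clear();
        return snapshot;
    }
}
```

A timer thread calling drain() every few seconds would turn thousands of per-record requests into a handful of uploads, at the cost of slightly delayed metrics.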

To address these issues, you’ll end up basically writing a different application, all because you wanted to keep track of how the original application was behaving.

Aggregation

I’m going to shift the focus now from the throughput of sending records to the throughput of the Amazon Kinesis shards actually receiving them.

Provisioned Capacity

Amazon Kinesis requires the provisioning of capacity through the concept of shards. Each shard can ingest 1 MiB or 1000 records per second, and is throttled if either limit is reached. In a cost-optimal architecture, we all want to have as few shards as possible, but still be able to ingest all data promptly at peak traffic.

In the example here, your records are a mere 350 bytes each. Putting 1000 of these a second results in a throughput of 0.33 MiB/s, only a third of what the shard would’ve allowed you to do in terms of bandwidth. If you can increase your data-to-record ratio by 3x or more, you can get away with having only 1/3 as many shards.
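The arithmetic can be captured in a few lines. This sketch (hypothetical names) takes a record rate and record size and returns how many shards are needed, using the two per-shard limits stated above; whichever limit is hit first decides the answer.

```java
final class ShardMathSketch {
    static final long BYTES_PER_SHARD = 1024L * 1024L; // 1 MiB per second
    static final long RECORDS_PER_SHARD = 1000L;       // 1000 records per second

    /** Shards needed to ingest the given load without throttling. */
    static long shardsNeeded(long recordsPerSecond, long bytesPerRecord) {
        long byRecords = ceilDiv(recordsPerSecond, RECORDS_PER_SHARD);
        long byBytes = ceilDiv(recordsPerSecond * bytesPerRecord, BYTES_PER_SHARD);
        return Math.max(byRecords, byBytes);
    }

    private static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }
}
```

For example, 30,000 records per second at 350 bytes each needs 30 shards because of the record limit, while the same data packed into 10,000 records of 1050 bytes needs only 11.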

Increasing Capacity Utilization with Aggregation

So how can you increase your shard utilization? The obvious answer is to concatenate multiple records together. If you combined three of your ClickEvent objects, you’d have a 1050-byte record. This is called aggregation.

Aggregation is not always as simple as concatenating strings. There are various additional design considerations to keep in mind:

  • You want to ensure that the consumer application can still access all the records with the same partition key without having to perform distributed sorting or resorting to an application level map. This means you have to be careful about how you group records together.
  • You want a buffering scheme that imposes a limit on how long records can wait in buffers in order to prevent excessive delays. This means you need to add timestamps and timers to your components.
  • You want a binary format that’s unambiguous between the aggregated and non-aggregated case, such that every record can be deserialized correctly at the consumer. This requires code not just in the producer, but the consumer as well.

Because of these intricacies, implementing record aggregators can be challenging and time-consuming.
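To make the third consideration concrete, here is a deliberately simplified container format: a marker prefix, a record count, and length-prefixed payloads. It is a hypothetical format invented for illustration, not the one the KPL uses, and it ignores partition keys and buffering deadlines entirely. It assumes ordinary records never begin with the marker bytes.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AggregationSketch {
    // Hypothetical marker; it must never appear at the start of an ordinary record
    private static final byte[] MAGIC = {0x00, 'A', 'G', 'G'};

    /** Packs several records into one blob: MAGIC, count, then (length, bytes) pairs. */
    static byte[] aggregate(List<byte[]> records) {
        int size = MAGIC.length + 4;
        for (byte[] r : records) {
            size += 4 + r.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.put(MAGIC).putInt(records.size());
        for (byte[] r : records) {
            buf.putInt(r.length).put(r);
        }
        return buf.array();
    }

    /** True if the blob starts with the marker, i.e. it was produced by aggregate(). */
    static boolean isAggregated(byte[] blob) {
        return blob.length >= MAGIC.length + 4
                && Arrays.equals(Arrays.copyOf(blob, MAGIC.length), MAGIC);
    }

    /** Unpacks an aggregated blob; a plain record is returned unchanged as a single element. */
    static List<byte[]> deaggregate(byte[] blob) {
        List<byte[]> out = new ArrayList<>();
        if (!isAggregated(blob)) {
            out.add(blob);
            return out;
        }
        ByteBuffer buf = ByteBuffer.wrap(blob);
        buf.position(MAGIC.length);
        int count = buf.getInt();
        for (int i = 0; i < count; i++) {
            byte[] r = new byte[buf.getInt()];
            buf.get(r);
            out.add(r);
        }
        return out;
    }
}
```

Even this toy version needs matching code on the consumer side, and it would still have to group records by destination shard and flush on a timer before it could be used for real traffic.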

Amazon Kinesis Producer Library

Because the challenges of batching, retries, and monitoring are shared by the majority of Amazon Kinesis application developers, we decided to build the Amazon Kinesis Producer Library (KPL). The KPL implements solutions to these problems and frees developers from having to worry about how best to stream their data into Amazon Kinesis, allowing them to focus on the actual business logic of their applications instead.

Architecture

The diagram below shows an overview of the internal architecture of the KPL:

KPL Architecture

Native Process

The KPL is split into a native process and a wrapper. The native process does all the actual work of processing and sending records, while the wrapper manages the native process and communicates with it.

Each KPL native process can process records from multiple streams at once, as long as they’re in the same region and use the same credentials for API calls. Each stream is handled by a separate pipeline, which contains components performing the functions I’ve discussed throughout this post.

IPC

Communication between the wrapper and the native process takes place over named pipes (either Unix or Windows), and the messages are implemented with Google Protocol Buffers. It’s possible to build your own KPL wrapper using the protobuf message definitions available in the source code. The native process is agnostic to how the wrapper is implemented as long as the messages sent to it are valid.

Multithreading

The native process is multithreaded, and can process multiple records simultaneously. This allows it to scale to higher throughputs on systems with more CPU cores. The threads running the pipelines do not participate in I/O to avoid unnecessarily blocking internal processing. Dedicated threads service the IPC manager and HTTP client. The latter uses asynchronous I/O to avoid needing a thread pool.

In practice, all the details are abstracted away by the wrapper, so that for the most part, the KPL should behave just like a regular library in the wrapper’s language.

Using the KPL

The KPL provides the following features out of the box:

  • Batching of puts using PutRecords (the Collector in the architecture diagram)
  • Tracking of record age and enforcement of maximum buffering times (all components)
  • Per-shard record aggregation (the Aggregator)
  • Retries in case of errors, with ability to distinguish between retryable and non-retryable errors (the Retrier)
  • Per-shard rate limiting to prevent excessive and pointless spamming (the Limiter)
  • Useful metrics and a highly efficient CloudWatch client (not shown in diagram)

Configuration

Many configuration options are available to customize the behavior of many of the components. To get started, however, the only things you need are AWS credentials and the AWS region. The KPL attempts to use the DefaultAWSCredentialsProviderChain if no credentials providers are given in the config. In addition, the KPL automatically attempts to fetch the region from Amazon EC2 metadata, so when you are running in Amazon EC2, no configuration may be needed at all.

Currently, configuration is global and applies to all streams. This includes the credentials used for putting records and CloudWatch metrics. If you wish to have different configurations for different streams, it’s feasible to create multiple instances of the KPL, as long as the number is reasonably small (e.g., 10 to 15). Because the core is native code, the fixed overhead per instance is small.

API

Now take a look at the class AdvancedKPLClickEventsToKinesis, to take a tour of the features of the KPL Java API:

public class AdvancedKPLClickEventsToKinesis
        extends AbstractClickEventsToKinesis {
    private static final Random RANDOM = new Random();
    private static final Log log = LogFactory.getLog(
            AdvancedKPLClickEventsToKinesis.class);

    private final KinesisProducer kinesis;

    protected AdvancedKPLClickEventsToKinesis(
            BlockingQueue<ClickEvent> inputQueue) {
        super(inputQueue);
        kinesis = new KinesisProducer(new KinesisProducerConfiguration()
                .setRegion(REGION));
    }

    @Override
    protected void runOnce() throws Exception {
        ClickEvent event = inputQueue.take();
        String partitionKey = event.getSessionId();
        String payload =  event.getPayload();
        ByteBuffer data = ByteBuffer.wrap(payload.getBytes("UTF-8"));
        while (kinesis.getOutstandingRecordsCount() > 1e4) {
            Thread.sleep(1);
        }
        recordsPut.getAndIncrement();

        ListenableFuture<UserRecordResult> f =
                kinesis.addUserRecord(STREAM_NAME, partitionKey, data);
        Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
            @Override
            public void onSuccess(UserRecordResult result) {
                long totalTime = result.getAttempts().stream()
                        .mapToLong(a -> a.getDelay() + a.getDuration())
                        .sum();
                // Only log with a small probability, otherwise it'll be very
                // spammy
                if (RANDOM.nextDouble() < 1e-5) {
                    log.info(String.format(
                            "Successfully put record, partitionKey=%s, "
                                    + "payload=%s, sequenceNumber=%s, "
                                    + "shardId=%s, took %d attempts, "
                                    + "totalling %s ms",
                            partitionKey, payload, result.getSequenceNumber(),
                            result.getShardId(), result.getAttempts().size(),
                            totalTime));
                }
            }

            @Override
            public void onFailure(Throwable t) {
                if (t instanceof UserRecordFailedException) {
                    UserRecordFailedException e = 
                           (UserRecordFailedException) t;
                    UserRecordResult result = e.getResult();

                    String errorList =
                        StringUtils.join(result.getAttempts().stream()
                            .map(a -> String.format(
                                "Delay after prev attempt: %d ms, "
                                        + "Duration: %d ms, Code: %s, "
                                        + "Message: %s",
                                a.getDelay(), a.getDuration(), 
                                a.getErrorCode(),
                                a.getErrorMessage()))
                            .collect(Collectors.toList()), "\n");

                    log.error(String.format(
                            "Record failed to put, partitionKey=%s, "
                                    + "payload=%s, attempts:\n%s",
                            partitionKey, payload, errorList));
                }
            }
        });
    }

    @Override
    public long recordsPut() {
        try {
            return kinesis.getMetrics("UserRecordsPut").stream()
                .filter(m -> m.getDimensions().size() == 2)
                .findFirst()
                .map(Metric::getSum)
                .orElse(0.0)
                .longValue();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void stop() {
        super.stop();
        kinesis.flushSync();
        kinesis.destroy();
    }
}

Backpressure

First, examine the runOnce() method. You’ll notice that you’re calling getOutstandingRecordsCount() in a loop. This is a measure of the backpressure in the system, which should be checked before putting more records, to avoid exhausting system resources.
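The polling loop can be factored into a small helper. The sketch below is hypothetical and takes the outstanding count as a LongSupplier, so it works with any counter; with the KPL you would likely pass something like kinesis::getOutstandingRecordsCount, which is an assumption about how you wire it up rather than part of the sample.

```java
import java.util.function.LongSupplier;

final class BackpressureSketch {
    /** Blocks until outstanding work is at or below the limit; returns how many times it slept. */
    static int awaitCapacity(LongSupplier outstanding, long limit, long sleepMillis) {
        int waits = 0;
        while (outstanding.getAsLong() > limit) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
                break;
            }
            waits++;
        }
        return waits;
    }
}
```

Returning the number of sleeps gives you a cheap signal to log or emit as a metric when the producer is regularly being held back.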

Async and Futures

Moving on, the KPL has an asynchronous interface that does not block. When addUserRecord is called, the record is placed into a queue serviced by another thread and the method returns immediately with a ListenableFuture object. To this, a FutureCallback object can be added that is invoked when the record completes. Adding the callback is optional, which means the API can be used in a fire-and-forget fashion.

The KPL always returns a UserRecordResult object in the future, whether the record succeeded or not, but because of the way ListenableFuture is designed, retrieving the UserRecordResult is slightly different between the success and failure cases. In the latter, it is wrapped in a UserRecordFailedException object.

Information about Put Attempts

Every UserRecordResult object contains a list of Attempt objects describing each attempt at sending a particular record to Amazon Kinesis. Timing information and error messages from failures are available. This is useful for application logging and debugging, as demonstrated in the onFailure() method. The same information is also uploaded to CloudWatch as metrics named BufferingTime and RequestTime, and there are individual error count metrics (one per unique error code) as well. Attempt objects provide this information at a per-record granularity, while the CloudWatch metrics are aggregated over a shard or stream.

Metrics

Now take a look at the recordsPut() method. You can retrieve metrics from the current KinesisProducer instance directly with the getMetrics() methods. These are the same metrics as those that are uploaded to CloudWatch, the difference being that getMetrics() retrieves the data for the instance on which it is called, whereas the data in CloudWatch is aggregated over all instances sharing the same metrics namespace. The overload getMetrics(String, int) allows you to get data aggregated over a specified number of seconds. This is useful for tracking sliding-window statistics. If called without the int argument, getMetrics() returns the cumulative statistics from the time the KinesisProducer instance started to the present moment. For a list of the available metrics, see the Monitoring Amazon Kinesis with Amazon CloudWatch topic in the Amazon Kinesis Developer’s Guide.

Termination

Finally, look at the stop() method. Because the KPL uses a child process, it is necessary to explicitly terminate the child if you wish to do so without exiting the JVM. This is what the destroy() method does. Note that destroy() does not attempt to wait for buffered or in-flight records. To ensure all records are sent before killing the child, call flushSync(). This method blocks until all records are complete. A non-blocking flush() method is also available.

Conclusion

In this post, I’ve discussed several challenges that are commonly encountered when implementing producer applications for Amazon Kinesis: the need to aggregate records and use batch puts while controlling buffering delays, the need to retry records in case of failures, and the need to monitor the application using metrics.

At the same time, I showed you how to develop a sample application and evolve it through several stages of increasing complexity in an attempt to address those needs. Despite having a non-trivial amount of code, the sample falls far short of the requirements for an efficient and reliable producer library that can be reused across many different applications.

The KPL was developed in recognition of the fact that these same challenges are faced by many customers, and it presents a general solution that customers can leverage to save time and effort when developing Amazon Kinesis-based solutions.

You can get started using the KPL with just a few lines of code, but the API is also flexible enough to cater to more advanced use cases, including retrieving detailed information about record puts and realtime metrics from the currently running instance.

Hopefully, this post has given you a deeper understanding of Amazon Kinesis producer applications and provided some insight into how to develop or evolve your own solution by leveraging the KPL.

If you have questions or suggestions, please leave a comment below.
