AWS News Blog

Speak to Amazon Kinesis in Python

My colleague Rahul Patil sent me a nice guest post. In the post, Rahul shows you how to use the new Kinesis Client Library (KCL) for Python.

Jeff;


The Amazon Kinesis team is excited to release the Kinesis Client Library (KCL) for Python developers! Developers can use the KCL to build distributed applications that process streaming data reliably at scale. The KCL takes care of many of the complex tasks associated with distributed computing, such as load-balancing across multiple instances, responding to instance failures, checkpointing processed records, and reacting to changes in stream volume.

You can download the KCL for Python from GitHub or PyPI.

Getting Started
Once you are familiar with key concepts of Kinesis and KCL, you are ready to write your first application. Your code has the following duties:

  1. Set up application configuration parameters.
  2. Implement a record processor.

The application configuration parameters are specified by adding a properties file. For example:


# The python executable script 
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = words

# Unique KCL application name
applicationName = PythonKCLSample

# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON

The above example configures KCL to process a Kinesis stream called “words” using the record processor supplied in sample_kclpy_app.py. The unique application name is used to coordinate amongst workers running on multiple instances.
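Because a typo in the properties file only shows up once the application starts, it can help to check the file first. The following is a minimal sketch of such a check, not part of the KCL: parse_properties and REQUIRED_KEYS are hypothetical names, and the list of required keys is an assumption based on the example above.

```python
def parse_properties(text):
    """Parse simple `key = value` lines, skipping blanks and # comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        settings[key.strip()] = value.strip()
    return settings


# Same settings as the example properties file above.
SAMPLE = """\
# The python executable script
executableName = sample_kclpy_app.py
streamName = words
applicationName = PythonKCLSample
initialPositionInStream = TRIM_HORIZON
"""

# Assumption: these are the keys this application cannot run without.
REQUIRED_KEYS = ("executableName", "streamName", "applicationName")

settings = parse_properties(SAMPLE)
missing = [key for key in REQUIRED_KEYS if key not in settings]
print(settings["streamName"], missing)
```

An empty missing list means every key the check knows about is present; it says nothing about whether the stream itself exists.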

Developers have to implement the following three methods in their record processor:


initialize(self, shard_id)
process_records(self, records, checkpointer)
shutdown(self, checkpointer, reason)

initialize() and shutdown() are self-explanatory; they are called once in the lifecycle of the record processor to initialize and clean up the record processor respectively. If the shutdown reason is TERMINATE (because the shard has ended due to split/merge operations), then you must also take care to checkpoint all of the processed records.
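To make the lifecycle concrete, here is a rough sketch of a word-counting processor that implements the three methods, including the checkpoint on TERMINATE. It is driven by hand so that it runs on its own: FakeCheckpointer and encode are hypothetical helpers standing in for what the KCL would supply, and the shard ID is made up for illustration.

```python
import base64


class WordCountProcessor:
    """Sketch of the three-method record processor interface."""

    def initialize(self, shard_id):
        # Called once, before any records arrive for this shard.
        self.shard_id = shard_id
        self.counts = {}

    def process_records(self, records, checkpointer):
        for record in records:
            # Record data is base64 encoded.
            word = base64.b64decode(record.get('data')).decode('utf-8')
            self.counts[word] = self.counts.get(word, 0) + 1
        # Checkpoint once the whole batch has been handled.
        checkpointer.checkpoint()

    def shutdown(self, checkpointer, reason):
        # The shard has ended (split/merge), so checkpoint to record
        # that everything in it has been processed.
        if reason == 'TERMINATE':
            checkpointer.checkpoint()


class FakeCheckpointer:
    """Hypothetical stand-in for the checkpointer that the KCL passes in."""

    def __init__(self):
        self.calls = 0

    def checkpoint(self):
        self.calls += 1


def encode(word):
    """Hypothetical helper: base64-encode a word like a stream record."""
    return base64.b64encode(word.encode('utf-8')).decode('ascii')


processor = WordCountProcessor()
checkpointer = FakeCheckpointer()
processor.initialize('shardId-000000000000')
processor.process_records(
    [{'data': encode('cat')}, {'data': encode('dog')}, {'data': encode('cat')}],
    checkpointer,
)
processor.shutdown(checkpointer, 'TERMINATE')
print(processor.counts, checkpointer.calls)
```

In a real application the KCL makes these calls for you; the sketch only shows the order in which they happen and where the checkpoints fall.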

You implement the record processing logic inside the process_records() method. The code should loop through the batch of records and checkpoint at the end of the call. When you checkpoint, the KCL assumes that all of the records delivered up to that point have been processed. If the worker fails, the KCL uses the checkpoint to restart processing of the shard at the last checkpointed record.


    # Process records and checkpoint at the end of the batch
    def process_records(self, records, checkpointer):
        for record in records:
            # Record data is base64 encoded
            data = base64.b64decode(record.get('data'))
            ####################################
            # Insert your processing logic here
            ####################################

        # Checkpoint after you are done processing the batch
        checkpointer.checkpoint()

The KCL connects to the stream, enumerates shards, and instantiates a record processor for each shard. It pulls data records from the stream and pushes them into the corresponding record processor. The record processor is also responsible for checkpointing processed records.
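The effect of checkpointing on recovery can be illustrated with a small simulation. This is a toy model under stated assumptions, not the KCL's implementation: ShardCheckpoint and run_worker are hypothetical names, the shard is a plain list, and the checkpoint is a list index held in memory.

```python
class ShardCheckpoint:
    """Hypothetical in-memory store for the last checkpointed position."""

    def __init__(self):
        self.position = 0  # index of the next unprocessed record

    def checkpoint(self, position):
        self.position = position


def run_worker(shard, store, batch_size, fail_after_batches=None):
    """Process a shard in batches, starting from the last checkpoint.

    Returns the records this worker handled. If fail_after_batches is set,
    the worker "crashes" partway through the following batch, before it
    can checkpoint.
    """
    handled = []
    batches_done = 0
    position = store.position
    while position < len(shard):
        batch = shard[position:position + batch_size]
        if fail_after_batches is not None and batches_done == fail_after_batches:
            handled.append(batch[0])  # partial work, never checkpointed
            return handled
        handled.extend(batch)
        position += len(batch)
        store.checkpoint(position)
        batches_done += 1
    return handled


shard = ['cat', 'dog', 'bird', 'cat', 'dog']
store = ShardCheckpoint()

# The first worker finishes one batch, then fails during the second.
first = run_worker(shard, store, batch_size=2, fail_after_batches=1)
# A replacement worker resumes from the last checkpoint.
second = run_worker(shard, store, batch_size=2)
print(first)
print(second)
```

In this run the record 'bird' is handled by both workers, because the first worker failed after touching it but before checkpointing. Processing logic should therefore tolerate seeing a record more than once.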

Since each record processor is associated with a unique shard, multiple record processors can run in parallel. To take advantage of multiple CPUs on the machine, each Python record processor runs in a separate process. If you run the same KCL application on multiple machines, the record processors will be load-balanced across these machines. This way, KCL enables you to seamlessly change machine types or alter the size of the fleet.

Running the Sample
The release also comes with a sample word counting application. Navigate to the amazon_kclpy directory and install the package.


$ python setup.py download_jars
$ python setup.py install

A sample putter is provided to create a Kinesis stream called “words” and put random words into that stream. To start the sample putter, run:


$ sample_kinesis_wordputter.py --stream words -p 1 -w cat -w dog -w bird

You can now run the sample Python application that processes records from the stream you just created:


$ amazon_kclpy_helper.py --print_command --java <path-to-java> --properties samples/sample.properties

Before running the samples, you’ll want to make sure that your environment is configured to allow the samples to use your AWS credentials via the default AWS Credentials Provider Chain.

Under the Hood – What You Should Know
KCL for Python uses KCL for Java. We have implemented a Java-based daemon, called MultiLangDaemon, that does all the heavy lifting. The daemon spawns a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over STDIN and STDOUT using a defined protocol. There is a one-to-one correspondence among record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and expose an interface that lets you focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and a similar parallel processing model across all languages.
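The parent/child arrangement can be pictured as a small message loop in the child process. The sketch below assumes one JSON message per line with an "action" field and a status reply after each call; those message shapes are illustrative assumptions rather than the documented MultiLangDaemon protocol, and serve and RecordingProcessor are hypothetical names. Checkpointing is left out to keep the loop short.

```python
import io
import json


def serve(processor, stdin, stdout):
    """Read one JSON message per line and dispatch it to the processor.

    The message shapes here are illustrative assumptions, not the
    actual MultiLangDaemon protocol.
    """
    for line in stdin:
        line = line.strip()
        if not line:
            continue
        message = json.loads(line)
        action = message['action']
        if action == 'initialize':
            processor.initialize(message['shardId'])
        elif action == 'processRecords':
            processor.process_records(message['records'])
        elif action == 'shutdown':
            processor.shutdown(message['reason'])
        # Acknowledge so the parent process knows the call completed.
        reply = {'action': 'status', 'responseFor': action}
        stdout.write(json.dumps(reply) + '\n')
        stdout.flush()


class RecordingProcessor:
    """Hypothetical processor that just remembers what it was asked to do."""

    def __init__(self):
        self.log = []

    def initialize(self, shard_id):
        self.log.append(('initialize', shard_id))

    def process_records(self, records):
        self.log.append(('process_records', len(records)))

    def shutdown(self, reason):
        self.log.append(('shutdown', reason))


# In a real child process these would be sys.stdin and sys.stdout.
fake_stdin = io.StringIO(
    '{"action": "initialize", "shardId": "shardId-000000000000"}\n'
    '{"action": "processRecords", "records": [{"data": "Y2F0"}]}\n'
    '{"action": "shutdown", "reason": "TERMINATE"}\n'
)
fake_stdout = io.StringIO()
processor = RecordingProcessor()
serve(processor, fake_stdin, fake_stdout)
replies = [json.loads(line) for line in fake_stdout.getvalue().splitlines()]
print(processor.log)
print(replies)
```

Passing the streams in as arguments keeps the loop testable with in-memory buffers. Since STDOUT carries protocol messages in this design, anything else a record processor prints to it would likely interfere, so diagnostic output is better sent to STDERR or a log file.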

Join the Kinesis Team
The Amazon Kinesis team is looking for talented Web Developers and Software Development Engineers to push the boundaries of stream data processing! Here are some of our open positions:

— Rahul Patil

Jeff Barr

Jeff Barr is Chief Evangelist for AWS. He started this blog in 2004 and has been writing posts just about non-stop ever since.