AWS Open Source Blog
Building a Data Processing Pipeline with Amazon Kinesis Data Streams and Kubeless
If you’re already running Kubernetes, FaaS (Functions as a Service) platforms on Kubernetes can help you leverage your existing investment in EC2 by enabling serverless computing. The real significance of such platforms, however, lies in the number of data sources that can trigger the deployed function. The first part of this two-part series introduced one such FaaS platform, Kubeless. This second part explains how to use Amazon Kinesis Data Streams as a trigger for functions deployed in Kubeless.
– Arun
In our previous post, Running FaaS on Kubernetes Cluster on AWS using Kubeless, we went through the basics of the FaaS solution Kubeless and showed you how to get started with Kubeless functions in a kops-based cluster on AWS. In this second post, we’ll show you how to build a data processing pipeline by deploying Kubeless functions that are executed when events appear in an Amazon Kinesis Data Stream. The example we will show pushes #kubecon Tweets to a Kinesis data stream. A Kubeless function is then triggered automatically, which sends a mobile notification via SNS.
Introduction
Streaming data is data generated continuously by thousands of data sources that simultaneously send many small data records. There are many sources of streaming data, such as social network feeds, sensor networks, connected IoT devices, or financial transactions. This data needs to be processed sequentially and incrementally in real time, on a record-by-record basis, and can be used for a variety of purposes such as analytics. Processing streaming data has typically required a streaming data solution and complex event processing systems. However, with a managed data streaming service like Kinesis and a FaaS platform like Kubeless, it can be easier and quicker to build a solution that lets you get value from streaming data, without worrying about the infrastructure.
Amazon Kinesis Data Streams is a fully-managed streaming data service that makes it easy to collect, process, and analyze real-time, streaming data; it offers key capabilities to cost-effectively process streaming data at any scale. The unit of data stored by Kinesis Data Streams is a data record: a stream represents a group of data records. The scaling unit of a Kinesis stream is a shard. The data records in a stream are distributed into shards. Shards are also responsible for the partitioning of the stream — all records entering the stream are partitioned into a shard by a PartitionKey, which can be specified by the producer. All records with the same PartitionKey are guaranteed to be partitioned into the same shard so records are stored and read in the same order they were created.
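To make the partitioning rule concrete, here is a minimal sketch of how a partition key could map to a shard. Kinesis hashes the partition key with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains that value. The even split of the hash key space across shards and the helper names below are assumptions made for illustration, not the service’s actual layout.

```python
import hashlib

MAX_HASH_KEY = 2**128 - 1  # MD5 produces a 128-bit value


def hash_key(partition_key):
    """Return the 128-bit integer hash of a partition key."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_ranges(shard_count):
    """Split the hash key space evenly across shards (assumed layout)."""
    size = (MAX_HASH_KEY + 1) // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * size
        # The last shard absorbs any remainder of the key space.
        end = MAX_HASH_KEY if i == shard_count - 1 else (i + 1) * size - 1
        ranges.append((start, end))
    return ranges


def shard_for(partition_key, shard_count):
    """Return the ID of the shard whose range contains the key's hash."""
    value = hash_key(partition_key)
    for index, (start, end) in enumerate(shard_ranges(shard_count)):
        if start <= value <= end:
            return "shardId-%012d" % index
    raise ValueError("hash key outside all ranges")
```

Because the hash is deterministic, the same partition key always resolves to the same shard, which is what preserves ordering for records that share a key.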
As explained in our earlier post, Kubeless is a Kubernetes-native FaaS framework that lets you deploy functions without having to worry about the underlying infrastructure used for executing them. It is designed to be deployed on top of a Kubernetes cluster and take advantage of all the great Kubernetes primitives. Kubeless is built around the core concepts of functions, triggers, and runtimes. Triggers in Kubeless represent the event sources and the associated functions to be executed on occurrence of an event from a given event source. Kubeless supports a wide variety of event sources; in this post we’ll focus on support for Kinesis data streams. Kubeless will let you run functions in response to records being published to a Kinesis stream.
Triggering Kubeless Functions with Kinesis Data Streams
Before diving into the use case of processing tweets, let’s see how Kubeless triggers functions from a Kinesis data stream. The complete walk-through is available on GitHub. To keep it short, a custom controller is deployed inside your Kubernetes cluster. This controller watches an API endpoint that defines mappings between Kinesis streams and functions. When a new mapping is created, the controller creates a Kinesis consumer and calls the function over HTTP with the event.
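The flow described above can be sketched as a small polling loop. This is not the Kubeless controller’s code: `fetch_records` and `invoke_function` are hypothetical stand-ins for the Kinesis consumer and the HTTP call to the function, injected as plain callables so the sketch runs without a cluster or AWS access.

```python
def forward_records(fetch_records, invoke_function, max_polls):
    """Poll a stream and forward each record to a function.

    fetch_records() returns a list of record payloads (possibly empty).
    invoke_function(payload) stands in for the HTTP POST to the function.
    Returns the number of records forwarded.
    """
    forwarded = 0
    for _ in range(max_polls):
        for payload in fetch_records():
            invoke_function(payload)
            forwarded += 1
    return forwarded


# Simulated stream: three polls return records, later polls return nothing.
pending = [["tweet one", "tweet two"], [], ["tweet three"]]
received = []
count = forward_records(
    lambda: pending.pop(0) if pending else [],
    received.append,
    max_polls=5,
)
print(count, received)
```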
Starting from the working Kubeless deployment shown in the previous post, you need to create a custom resource definition which defines a new object type called KinesisTriggers and you need to launch a Kinesis controller that watches those new objects. You can do it in one command line as shown:
kubectl create -f https://github.com/kubeless/kubeless/releases/download/v1.0.0-alpha.4/kinesis-v1.0.0-alpha.4.yaml
The so-called Kinesis trigger controller will watch for KinesisTriggers objects. These objects will declare the mapping between a data stream in Kinesis and a function deployed by Kubeless. Once such a trigger is created, the controller will consume events in the stream and forward them to the function. A typical trigger will look like this:
$ kubectl get kinesistriggers.kubeless.io test -o yaml
apiVersion: kubeless.io/v1beta1
kind: KinesisTrigger
metadata:
  labels:
    created-by: kubeless
  name: test
  namespace: default
spec:
  aws-region: us-west-2
  function-name: post-python
  secret: ec2
  shard: shardId-000000000000
  stream: my-kinesis-stream
The trigger manifest above will consume events on the Kinesis data stream called my-kinesis-stream in the us-west-2 region. The AWS credentials necessary to connect to that data stream will be stored in a Kubernetes secret called ec2, and the events will be forwarded to the Kubeless function post-python. A shard is a unit of streaming capacity (up to 1 MB/sec in and up to 2 MB/sec out). Records that share a partition key land in the same shard and are ordered on a first-in basis, and shards are grouped by a logical construct called a data stream. You can easily add or remove shards to right-size the streaming capacity of your data stream. Each shard has an identifier called a shard ID. Shard shardId-000000000000 identifies the shard to be used within the Kinesis stream my-kinesis-stream. Creating such a trigger is straightforward using the Kubeless CLI, as shown later in this post.
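Because each shard has a fixed capacity, the number of shards a stream needs follows from simple arithmetic. The sketch below uses the per-shard figures quoted above (1 MB/sec in, 2 MB/sec out). The function name and the example workloads are assumptions for illustration, and the sketch ignores any per-record limits.

```python
import math

WRITE_MB_PER_SHARD = 1.0  # ingest capacity per shard, MB/sec
READ_MB_PER_SHARD = 2.0   # read capacity per shard, MB/sec


def shards_needed(write_mb_per_sec, read_mb_per_sec):
    """Return the smallest shard count that covers both write and read throughput."""
    by_write = math.ceil(write_mb_per_sec / WRITE_MB_PER_SHARD)
    by_read = math.ceil(read_mb_per_sec / READ_MB_PER_SHARD)
    # A stream always has at least one shard.
    return max(1, by_write, by_read)


print(shards_needed(0.5, 0.5))
print(shards_needed(3.2, 3.2))
```

A low-volume feed of short text records, like the one used in this post, should fit in a single shard.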
Use Case: Processing Tweets with Kinesis, Kubeless, and SNS
Now I’ll walk through a real-life scenario to illustrate the end-to-end picture and help you appreciate the power of Kinesis and Kubeless.
We will use a social network feed to get real-time insights; in this example, the tweets being produced during the KubeCon conference. We will run through a scenario where we would like to get a real-time notification whenever a topic of interest is mentioned: kubeless. 🙂
Creating a Kinesis Stream
I’ll start by creating a stream to persist streaming data in Kinesis, using the AWS web console to create a stream named KubelessDemo.
The snapshot above comes directly from the AWS management console. As you can see, there is very little configuration needed to get started with a stream.
Set Up Data Producer
Now that we have a Kinesis stream to take the data in, we will use a simple Python program that uses boto3 and tweepy to fetch tweets in real time and ingest them into the KubelessDemo Kinesis stream. Run the following Python script locally; it will filter for tweets that contain the hashtag #kubelessonaws and push their text into Kinesis.
#!/usr/bin/env python
# Requires boto3 and tweepy < 4.0 (tweepy.StreamListener was removed in tweepy 4.0).
import os
import json
import boto3
import tweepy

# Twitter application credentials, read from the environment.
consumer_key = os.getenv("consumer_key")
consumer_secret = os.getenv("consumer_secret")
access_token = os.getenv("access_token")
access_token_secret = os.getenv("access_token_secret")

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

kinesis_client = boto3.client('kinesis')


class KinesisStreamProducer(tweepy.StreamListener):
    """Forwards the text of each incoming tweet to the Kinesis data stream."""

    def __init__(self, kinesis_client):
        super().__init__()
        self.kinesis_client = kinesis_client

    def on_data(self, data):
        tweet = json.loads(data)
        # Skip stream messages that are not tweets (they carry no "text" field).
        if "text" in tweet:
            self.kinesis_client.put_record(StreamName='KubelessDemo', Data=tweet["text"], PartitionKey="key")
            print("Publishing record to the stream: ", tweet)
        return True

    def on_error(self, status):
        print("Error: " + str(status))


def main():
    mylistener = KinesisStreamProducer(kinesis_client)
    myStream = tweepy.Stream(auth=auth, listener=mylistener)
    # Only tweets containing this hashtag are delivered to the listener.
    myStream.filter(track=['#kubelessonaws'])


if __name__ == "__main__":
    main()
Note that the keys configured at the beginning of the script are for a Twitter application that allows you to invoke the Twitter API. See Twitter applications.
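The producer above sends every record with the same partition key, which is fine for a single-shard stream. As a sketch of one alternative, the helper below builds the keyword arguments for `put_record` using the tweet’s `id_str` field as the partition key, so that records would spread across shards if the stream had more than one. The helper name is hypothetical, and it only prepares the arguments; it does not call AWS.

```python
import json


def build_put_record_args(raw_message, stream_name):
    """Return keyword arguments for put_record, or None if the message is not a tweet."""
    message = json.loads(raw_message)
    # Messages without these fields are not tweets and are skipped.
    if "text" not in message or "id_str" not in message:
        return None
    return {
        "StreamName": stream_name,
        "Data": message["text"].encode("utf-8"),
        "PartitionKey": message["id_str"],
    }


sample = json.dumps({"id_str": "1001146620644417537", "text": "hello #kubelessonaws"})
print(build_put_record_args(sample, "KubelessDemo"))
```

The result could then be passed on as `kinesis_client.put_record(**args)` whenever it is not None.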
Record processing with Kubeless
Now that I have a producer of data that fills up our Kinesis stream, I want to call a Kubeless function that sends an SNS mobile notification when the keyword “kubeless” appears in a tweet. To do this, I use the Python script below and deploy it with the Kubeless CLI.
import json
import boto3

# Placeholder credentials, hardcoded for demonstration only.
access_key="fsdfsf"
secret_key="dfsdfsdfs"

def tweets_processor(event, context):
    # The record payload forwarded by the Kinesis trigger.
    tweet = event['data']
    print(tweet)
    # A tweet of interest arrived: push the record to SNS for mobile notification.
    client = boto3.client('sns',
                          region_name='eu-west-1',
                          aws_access_key_id=access_key,
                          aws_secret_access_key=secret_key)
    response = client.publish(TargetArn="arn:aws:sns:eu-west-1:587264368683:kubeless-tweets",
                              Message=json.dumps({'default': json.dumps(tweet)}),
                              MessageStructure='json')
    return
Some points to note about the function shown above:
- It requires the Python boto3 module, hence we will declare this dependency in a requirements.txt file.
- For ease of demonstration, I’m adding the credentials to the Python script directly. Clearly, in a real-world scenario you’ll want to use better security practices.
- Finally, the TargetArn should be modified to correspond to one of your SNS topics configured with a subscription to send an SMS message.
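The function above forwards every record it receives to SNS. If you only want a notification when a keyword such as “kubeless” appears, one option is to filter before publishing. The sketch below is a variant written for illustration, not the deployed function: `publish` is a hypothetical callable standing in for the SNS client’s `publish` method so the logic can be exercised without AWS credentials, the topic ARN is a placeholder, and `event['data']` is assumed to be the tweet text as a string.

```python
import json

KEYWORD = "kubeless"


def build_sns_message(tweet):
    """Wrap the tweet in the JSON envelope used with MessageStructure='json'."""
    return json.dumps({"default": json.dumps(tweet)})


def process_tweet(event, publish, target_arn):
    """Publish the tweet if it mentions KEYWORD; return True when published."""
    tweet = event["data"]
    # Case-insensitive keyword match on the tweet text.
    if KEYWORD not in tweet.lower():
        return False
    publish(TargetArn=target_arn,
            Message=build_sns_message(tweet),
            MessageStructure="json")
    return True


# Record the calls instead of contacting SNS.
calls = []
process_tweet({"data": "Trying Kubeless on AWS"}, lambda **kw: calls.append(kw), "arn:placeholder")
process_tweet({"data": "Unrelated tweet"}, lambda **kw: calls.append(kw), "arn:placeholder")
print(len(calls))
```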
Let’s now deploy the above Kubeless function to process the records from the Kinesis event stream using the CLI.
kubeless function deploy tweets --runtime python2.7 \
--handler sns.tweets_processor \
--from-file sns.py \
--dependencies requirements.txt
Now that the function is deployed, we need to create a trigger. A trigger is a mapping between an event source and a function. In our example, this trigger is a Kinesis event trigger that we can create via the Kubeless CLI.
First, Kubeless needs to be able to poll the Kinesis data stream to fetch the records. To do this, it needs to have access to AWS credentials. Kubeless uses Kubernetes secrets to store AWS keys (support for IAM roles is on the roadmap). I’ll now create a Kubernetes secret to store the AWS access and secret keys of a user that has access to our Kinesis data stream:
kubectl create secret generic ec2 \
--from-literal=aws_access_key_id=$AWS_ACCESS_KEY_ID \
--from-literal=aws_secret_access_key=$AWS_SECRET_ACCESS_KEY
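For reference, `kubectl create secret generic` stores each literal base64-encoded under the Secret’s `data` field. The sketch below builds an equivalent manifest in Python to show that encoding. The helper name and the dummy key values are placeholders, and note that base64 is an encoding, not encryption.

```python
import base64
import json


def build_secret_manifest(name, literals):
    """Return a generic (Opaque) Secret manifest with base64-encoded values."""
    data = {
        key: base64.b64encode(value.encode("utf-8")).decode("ascii")
        for key, value in literals.items()
    }
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name},
        "type": "Opaque",
        "data": data,
    }


# Dummy values for illustration only; never commit real keys.
manifest = build_secret_manifest("ec2", {
    "aws_access_key_id": "EXAMPLEKEYID",
    "aws_secret_access_key": "example-secret",
})
print(json.dumps(manifest, indent=2))
```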
Now that the function is deployed into Kubeless and a secret is available with the AWS credentials, I will set up the Kinesis trigger that associates the deployed function with the KubelessDemo Kinesis data stream that we created earlier. The Kubeless CLI has a subcommand, kubeless trigger kinesis, to do just that. Below is an example command:
kubeless trigger kinesis create kinesis-trigger --function-name tweets \
--aws-region us-west-2 \
--shard-id shardId-000000000000 \
--stream KubelessDemo \
--secret ec2
That’s it! At this point Kubeless will start polling for records in the Kinesis stream. When there are records to be processed, Kubeless will invoke the associated Kubeless functions. Kubeless will automatically scale up or scale down the resources it consumes (Kubernetes pods) as necessary to process the records in the stream.
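The CLI call above results in a KinesisTrigger object like the one shown earlier in this post. As a rough sketch, the same object could be assembled as a plain dictionary using only the fields that appear in that manifest. The helper name is hypothetical, and the exact set of fields the CLI writes may differ between Kubeless versions.

```python
import json


def build_kinesis_trigger(name, function_name, region, shard_id, stream, secret,
                          namespace="default"):
    """Return a KinesisTrigger manifest mirroring the fields shown earlier in the post."""
    return {
        "apiVersion": "kubeless.io/v1beta1",
        "kind": "KinesisTrigger",
        "metadata": {
            "labels": {"created-by": "kubeless"},
            "name": name,
            "namespace": namespace,
        },
        "spec": {
            "aws-region": region,
            "function-name": function_name,
            "secret": secret,
            "shard": shard_id,
            "stream": stream,
        },
    }


trigger = build_kinesis_trigger(
    name="kinesis-trigger",
    function_name="tweets",
    region="us-west-2",
    shard_id="shardId-000000000000",
    stream="KubelessDemo",
    secret="ec2",
)
print(json.dumps(trigger, indent=2))
```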
The Python script filtering the Twitter stream, looking for #kubelessonaws, will push the tweet to Kinesis, as shown in the logs below:
$ python ./streaming.py
Publishing record to the stream: {'created_at': 'Mon May 28 17:02:09 +0000 2018', 'id': 1001146620644417537, 'id_str': '1001146620644417537', 'text': 'this tweet will be pushed to Kinesis, will trigger a kubeless function and then be sent to my phone via SNS #kubelessonaws', ...
The controller will consume the event from the Kinesis stream and forward it to the function. Then, the logs of the function will show the tweet message:
$ kubectl logs -f tweets-85cfb996bc-vk7wf
Bottle v0.12.13 server starting up (using CherryPyServer())...
Listening on http://0.0.0.0:8080/
Hit Ctrl-C to quit.
172.17.0.1 - - [28/May/2018:17:01:25 +0000] "GET /healthz HTTP/1.1" 200 2 "" "kube-probe/." 0/297
172.17.0.1 - - [28/May/2018:17:01:55 +0000] "GET /healthz HTTP/1.1" 200 2 "" "kube-probe/." 0/91
this tweet will be pushed to Kinesis, will trigger a kubeless function and then be sent to my phone via SNS #kubelessonaws
172.17.0.5 - - [28/May/2018:17:02:10 +0000] "POST / HTTP/1.1" 200 0 "" "Go-http-client/1.1" 0/524932
If you have set up your SNS subscription properly and set it to forward notifications to your phone, you will get a text message like the snapshot below:
Conclusion
This simple scenario gives an overview of how easy it is to set up a streaming data processing pipeline with Kinesis Data Streams and Kubeless, without having to manage the infrastructure or its scaling yourself. As a Kubeless function author, you can focus on the application logic and not worry about how to connect to the Kinesis data stream, how to fetch the records, or how to scale up and scale down: Kubeless does it all automatically. I hope this walk-through gave you more insight into how efficient it is to build such a pipeline with Kinesis and Kubeless.
Murali Reddy is a senior Kubeless architect. He focuses on everything Kubeless and Kubernetes as it relates to application packaging and design. He is an avid open source contributor. He is also the author of the Kubernetes CNI kube-router networking plugin. Prior to working on Kubernetes he was a core contributor to CloudStack while at SF startup cloud.com and became an ASF Cloudstack committer.
The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.