AWS Big Data Blog

Integrating Splunk with Amazon Kinesis Streams

Prahlad Rao is a Solutions Architect with AWS.

It is important not only to stream and ingest terabytes of data at scale, but also to quickly get insights from that data and visualize it using available tools and technologies. The Amazon Kinesis platform of managed services enables continuous capture and storage of terabytes of data per hour from hundreds or thousands of sources for real-time data processing over large distributed streams. Splunk enables data insights, transformation, and visualization. Both Splunk and Amazon Kinesis can be used for direct ingestion from your data producers.

This powerful combination lets you quickly capture, analyze, transform, and visualize streams of data without needing to write complex code using Amazon Kinesis client libraries. In this blog post, I show you how to integrate Amazon Kinesis with Splunk by taking Twitter feeds as the input data source and using Splunk to visualize the data.

Why is this architecture important?

Amazon Kinesis allows you to build a common ingestion mechanism for multiple downstream consumers (Splunk being one of them) to process without having to go back to the source multiple times. For example, you can integrate with Splunk for analytics and visualization at the same time you enable streaming data to be emitted to other data sources such as Amazon S3, Amazon Redshift, or even an AWS Lambda function for additional processing and transformation. Another common practice is to use S3 as a landing point for data after ingestion into Amazon Kinesis, which ensures that data can be stored persistently long-term. This post assumes that you have a fair understanding of Amazon Kinesis and Splunk usage and configuration.

Amazon Kinesis Streams

Amazon Kinesis Streams is a fully managed service for real-time processing of data streams at massive scale. You can configure hundreds of thousands of data producers to continuously put data into an Amazon Kinesis stream. The data from the stream is consumed by different Amazon Kinesis applications. Streams allows as many consumers of the data stream as your solution requires without a performance penalty.

Splunk

Splunk is a platform for real-time, operational intelligence. It is an easy, fast, and secure way to analyze and visualize massive streams of data that could be generated by either IT systems or technology infrastructure. In this post, the data is being generated by Twitter feeds.

Flow of data

Here’s the data flow for this post.

The Twitter feeds related to a particular topic are captured by using the Tweepy API. For this post, you capture how users are tweeting about ‘beer’, ‘wine’, and ‘whiskey’. The output is then fed into an Amazon Kinesis stream using a simple Python Boto3 script. The consumer (in this case, Splunk), installed on an Amazon EC2 instance, reads from the stream, extracts useful information, and builds a dashboard for analysis and visualization. The stream can also feed multiple consumers, including Lambda.

Get access to the Twitter API

You need to get access to the Twitter streaming API so you can access the Twitter feeds from Python using Tweepy. For more information about how to set up and access Tweepy, see Streaming With Tweepy.

  1. Sign in with your Twitter account at https://apps.twitter.com.
  2. Create a new application (just a placeholder to generate access keys).
  3. Generate the consumer key, consumer secret, access token, and access token secret.
  4. Use OAuth and keys in the Python script.

Install Python boto3

Python was chosen as the programming language for this post because it is fairly simple to set up Tweepy to access Twitter and to use Boto3, the Python library that provides SDK access to AWS services. AWS provides an easy-to-read guide for getting started with Boto3.

Create an Amazon Kinesis stream

After you have completed the steps to access the Twitter API and set up Python Boto3, create an Amazon Kinesis stream to ingest Twitter data feeds using the AWS Management Console, CLI, or the Boto3 API. For this example, your stream is called ‘Kinesis_Twitter’ and has one shard.

The unit of data stored by Amazon Kinesis is a data record, and a stream represents an ordered sequence of data records distributed into shards (or groups). When you create a stream, you specify the number of shards for each stream. A producer (in this case, Twitter feeds) puts data records into shards and a consumer (in this case, Splunk) gets data records from shards. You can dynamically resize your stream or add and remove shards after a stream is created.

aws kinesis create-stream --stream-name Kinesis_Twitter --shard-count 1
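The same stream can also be created through the Boto3 API. The following is a minimal sketch rather than a definitive implementation: create_stream_and_wait is a hypothetical helper name, the polling interval and poll count are arbitrary assumptions, and the client argument is expected to be an object returned by boto3.client('kinesis'). It calls create_stream and then polls describe_stream until the stream reports ACTIVE.

```python
import time


def create_stream_and_wait(client, stream_name, shard_count=1,
                           poll_seconds=1.0, max_polls=60):
    """Create a stream, then poll DescribeStream until it is ACTIVE.

    `client` is assumed to be a Boto3 Kinesis client. The helper name and the
    polling defaults are illustrative assumptions, not part of the AWS SDK.
    """
    client.create_stream(StreamName=stream_name, ShardCount=shard_count)
    for _ in range(max_polls):
        response = client.describe_stream(StreamName=stream_name)
        description = response['StreamDescription']
        if description['StreamStatus'] == 'ACTIVE':
            return description
        time.sleep(poll_seconds)
    raise RuntimeError('Stream %s did not become ACTIVE in time' % stream_name)
```

With credentials configured, calling create_stream_and_wait(boto3.client('kinesis'), 'Kinesis_Twitter') would create the one-shard stream used in this post.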

Verify the stream creation by using the CLI as follows:

aws kinesis describe-stream --stream-name Kinesis_Twitter
{
    "StreamDescription": {
        "StreamStatus": "ACTIVE",
        "StreamName": "Kinesis_Twitter",
        "StreamARN": "arn:aws:kinesis:us-west-2:904672585901:stream/Kinesis_Twitter",
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "EndingHashKey": "340282366920938463463374607431768211455",
                    "StartingHashKey": "0"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49554309958127460123837282410803325391382383410689343490"
                }
            }
        ]
    }
}
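The HashKeyRange in the output above is how Amazon Kinesis decides which shard receives a record: the partition key is hashed with MD5 into a 128-bit integer, and the record goes to the shard whose range contains that value. The sketch below reproduces that mapping locally for illustration. The names hash_key_for and shard_for are hypothetical helpers, not part of any AWS library.

```python
import hashlib


def hash_key_for(partition_key):
    """Return the 128-bit integer MD5 hash of a partition key."""
    digest = hashlib.md5(partition_key.encode('utf-8')).hexdigest()
    return int(digest, 16)


def shard_for(partition_key, shards):
    """Return the ShardId whose HashKeyRange contains the key's hash.

    `shards` is a list shaped like the "Shards" array in the
    describe-stream output. Returns None if no range matches.
    """
    key = hash_key_for(partition_key)
    for shard in shards:
        key_range = shard['HashKeyRange']
        start = int(key_range['StartingHashKey'])
        end = int(key_range['EndingHashKey'])
        if start <= key <= end:
            return shard['ShardId']
    return None
```

With a single shard covering the entire range, as in this post, every partition key maps to shardId-000000000000. Partition keys start to matter for load distribution once the stream has more than one shard.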

Set up Python for Twitter authentication

After you have registered your client application with Twitter, you should have your consumer key, consumer secret, access token, and access token secret. Tweepy supports OAuth authentication, which is handled by the tweepy.OAuthHandler class.

Import the required modules:

import tweepy
from tweepy.streaming import StreamListener
from tweepy import Stream
import json
import boto3

Set the following, replacing the placeholder values with your own:

  • client = boto3.client('kinesis')
  • consumer_key = << consumer key >>
  • consumer_secret = << consumer secret key >>
  • access_token = << access token >>
  • access_secret = << access secret >>
  • auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
  • auth.set_access_token(access_token, access_secret)
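Hardcoding keys in a script makes them easy to leak. One possible alternative, sketched below, reads the four Twitter values from environment variables. The variable names (TWITTER_CONSUMER_KEY and so on) and the load_twitter_credentials helper are assumptions made for this illustration. They are not defined by Tweepy or Twitter.

```python
import os

# Hypothetical environment variable names chosen for this example.
REQUIRED_VARS = (
    'TWITTER_CONSUMER_KEY',
    'TWITTER_CONSUMER_SECRET',
    'TWITTER_ACCESS_TOKEN',
    'TWITTER_ACCESS_SECRET',
)


def load_twitter_credentials(environ=None):
    """Return a dict with consumer_key, consumer_secret, access_token,
    and access_secret, read from the environment.

    Raises KeyError listing every variable that is missing or empty.
    """
    environ = os.environ if environ is None else environ
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise KeyError('Missing environment variables: ' + ', '.join(missing))
    prefix = 'TWITTER_'
    return dict((name[len(prefix):].lower(), environ[name])
                for name in REQUIRED_VARS)
```

The returned values can then be passed to tweepy.OAuthHandler and set_access_token in the same way as the variables in the list above.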

Feed Twitter data into Amazon Kinesis

Simple code enables you to pull feeds from Twitter and feed that data into Amazon Kinesis (limiting the number of records to 100 for this post). Load the Twitter data that contains the track keywords ‘beer’, ‘wine’, and ‘whiskey’ into the stream so you can analyze data about what users are tweeting, including location and other interesting information.

You can write to Amazon Kinesis a single record at a time using the PutRecord API operation, or multiple records at one time using PutRecords. When you have more streaming data from producers, we recommend that you combine multiple records into batches and write bigger groupings of objects into the stream using PutRecords. PutRecords writes multiple data records from a producer into a stream in a single call. Each shard can support up to 1000 records written per second, up to a maximum total of 1 MB of data written per second. A stream can have as many shards as you need. Specify the name of the stream and an array of request records, with each record in the array requiring a partition key and a data blob. In this post, you feed data into the stream called Kinesis_Twitter by batching up to 10 records in a container and calling PutRecords in a loop until you reach the self-imposed limit of 100 records.

For a thorough implementation of streaming large volumes of data, we recommend that you consider a producer library. The Amazon Kinesis Producer Library (KPL) provides necessary management of retries, failed records, and batch records when implementing producer applications to efficiently feed data into Amazon Kinesis at scale.  For more information, see the Implementing Efficient and Reliable Producers with the Amazon Kinesis Producer Library blog post.
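To make the retry handling mentioned above concrete, the following sketch resends only the records that PutRecords reports as failed. It is a simplified illustration and not a replacement for the KPL: put_records_with_retry is a hypothetical helper, the attempt count and backoff values are arbitrary assumptions, and client is expected to be an object returned by boto3.client('kinesis'). It relies on the PutRecords response listing one result per request record, in request order, with an ErrorCode on each failed entry.

```python
import time


def put_records_with_retry(client, stream_name, records,
                           max_attempts=3, backoff_seconds=0.5):
    """Send records with PutRecords, resending only the entries that failed.

    Returns the records that still failed after max_attempts
    (an empty list when everything was written).
    """
    pending = list(records)
    for attempt in range(max_attempts):
        if not pending:
            break
        response = client.put_records(Records=pending, StreamName=stream_name)
        if response.get('FailedRecordCount', 0) == 0:
            return []
        # Results come back in request order; failed entries carry an ErrorCode.
        pending = [record
                   for record, result in zip(pending, response['Records'])
                   if 'ErrorCode' in result]
        if attempt < max_attempts - 1:
            time.sleep(backoff_seconds * (2 ** attempt))  # exponential backoff
    return pending
```

A PutRecords call can partially succeed, so checking FailedRecordCount matters even when the call itself does not raise an error.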

class StdOutListener(StreamListener):
    """Buffers tweets and writes them to Amazon Kinesis in batches of 10."""

    def __init__(self):
        super(StdOutListener, self).__init__()
        self.container = []  # records waiting for the next PutRecords call
        self.counter = 0     # records written to the stream so far
        self.limit = 100     # stop after this many records

    def on_data(self, data):
        data = json.loads(data)
        record = {'Data': json.dumps(data), 'PartitionKey': 'partition1'}
        self.container.append(record)
        if len(self.container) >= 10:
            client.put_records(Records=self.container, StreamName='Kinesis_Twitter')
            self.counter += len(self.container)
            self.container = []
        # Returning False tells Tweepy to disconnect the stream.
        return self.counter < self.limit

    def on_error(self, status):
        print(status)


def main():
    out = StdOutListener()
    stream = Stream(auth, out)
    track = ['beer', 'wine', 'whiskey']
    try:
        stream.filter(track=track)
    except (Exception, KeyboardInterrupt):
        stream.disconnect()


if __name__ == '__main__':
    main()

Sample partial output of the Python script above:

[{'PartitionKey': 'partition1', 'Data': '{"contributors": null, "truncated": false, "text": "in case anyone's wondering how walking to work is going, someone poured beer on my leg this morning. (what? http://t.co/oDwcG5Kz5k)", "in_reply_to_status_id": null, "id": 641965701952094208, "favorite_count": 0, "source": "Twitter for iPhone", "retweeted": false, "coordinates": null, "timestamp_ms": "1441891525148", "entities": {"user_mentions": [], "symbols": [], "trends": [], "hashtags": [], "urls": [{"url": "http://t.co/oDwcG5Kz5k", 
……. More blob data >>

Verify and read data in Amazon Kinesis

To read data continuously from a stream, the Amazon Kinesis API provides the getShardIterator and getRecords methods, representing a pull model that draws data directly from specified shards in the stream. You retrieve records from the stream on a per-shard basis; for each shard and each batch of records, you need to obtain a shard iterator, which specifies the position in the shard from which to start reading data records sequentially. Obtain the initial shard iterator using getShardIterator. Next, instantiate a GetRecordsRequest object and specify the iterator for the request using the setShardIterator method. Obtain shard iterators for additional batches of records using the getNextShardIterator method. To get the data records, call the getRecords method and continue to loop through the next shard iterator as follows.

The following code specifies TRIM_HORIZON as the iterator type when you obtain the initial shard iterator, which means records should be returned beginning with the first record added to the shard. For more information about using shard iterators, see Using Shard Iterators.

import time

import boto3

client = boto3.client('kinesis')
shard_id = 'shardId-000000000000'

# TRIM_HORIZON starts reading at the oldest record still in the shard.
shard_iterator = client.get_shard_iterator(
    StreamName='Kinesis_Twitter',
    ShardId=shard_id,
    ShardIteratorType='TRIM_HORIZON')['ShardIterator']

# Poll the shard 100 times, fetching up to 5 records per call.
for _ in range(100):
    out = client.get_records(ShardIterator=shard_iterator, Limit=5)
    shard_iterator = out['NextShardIterator']
    print(out)
    time.sleep(0.5)
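The loop above prints raw responses. As a rough sketch of how a consumer might turn them back into tweets, the helper below decodes the Data blob of each record and keeps a few fields. extract_tweets is a hypothetical name, and the fields chosen (text, user.location, user.lang) are simply the ones used later in this post. The sketch assumes that Data arrives as bytes containing the JSON written by the producer script.

```python
import json


def extract_tweets(get_records_response):
    """Decode the tweets contained in one GetRecords response.

    Returns a list of small dicts, one per record, in stream order.
    """
    tweets = []
    for record in get_records_response.get('Records', []):
        data = record['Data']
        if isinstance(data, bytes):
            data = data.decode('utf-8')
        tweet = json.loads(data)
        user = tweet.get('user') or {}
        tweets.append({
            'sequence_number': record.get('SequenceNumber'),
            'text': tweet.get('text'),
            'location': user.get('location'),
            'lang': user.get('lang'),
        })
    return tweets
```

Inside the polling loop, extract_tweets(out) could replace the print call to work with parsed tweets instead of raw responses.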

So far, you’ve set up Tweepy to access Twitter feeds, configured Amazon Kinesis to ingest Twitter feeds, and verified data in the stream.  Now, set up Splunk on an EC2 instance, connect Splunk to an Amazon Kinesis stream, and finally visualize Twitter data in a Splunk dashboard.

Install and set up Splunk

Splunk Enterprise is available as an Amazon Machine Image on the AWS Marketplace.

Splunk is available on AWS Marketplace

The latest version 6.2.1 is available on Linux as a 64-bit AMI. For more information about setup, see the Splunk documentation.

From the AWS Marketplace, choose Splunk Enterprise HVM AMI. On the overview page, choose Continue.

On the Launch on EC2 page, enter the following:

  • Select the appropriate EC2 instance type.
  • Select the AWS region in which to set up Splunk.
  • Choose Launch with 1-click. For production deployments, we recommend following Splunk capacity planning guidelines and best practices.
  • Select appropriate VPC and security group rules for your environment, including a key pair.
  • Select the security group ports to be opened: TCP (554), UDP 8089 (management), 8000 (Splunkweb), 9997 (forwarder), 22 (ssh), 443 (SSL/https).

After the instance launches and Splunk is running, log in to the Splunk console.

For a production deployment, you need to set up Splunk indexers and other related configuration; for this post, you can use the default values.

Set up Amazon Kinesis Modular Input

To ingest stream data into Splunk for indexing, install the free Amazon Kinesis Modular Input app.  The Amazon Kinesis Modular Input app is a type of consuming application that enables stream data to be indexed into Splunk. This is very much like a connector application between Amazon Kinesis and Splunk.

On the app home page, choose Settings and Data inputs.

Choose Settings and Data Inputs

On the Data inputs page, you should now see Amazon Kinesis listed as a local input type.  Under Actions, choose Add new.

Specify the following Amazon Kinesis and AWS configuration parameters.

Stanza Name:  Any name associated with Amazon Kinesis data (such as Kinesis_Tweet).

Kinesis App Name: Any name associated with Amazon Kinesis data (such as Kinesis_Tweet).

Kinesis Stream Name: The name of an Amazon Kinesis stream as configured in your Amazon Kinesis environment. Match the exact name (such as Kinesis_Twitter), and repeat this configuration for each stream.

Kinesis Endpoint: Use us-west-2.

Initial Stream Position: Defaults to TRIM_HORIZON, which causes the ShardIterator to point to the last untrimmed record in the shard (the oldest data in the shard). You can also use LATEST to start reading just after the most recent record in the shard. For now, use TRIM_HORIZON. For more information, see GetShardIterator in the Amazon Kinesis Streams API Reference.

AWS Access Key ID: Your AWS access key (IAM user account).

AWS Secret: Your AWS secret key (IAM user account).

Backoff Time (Millis): Defaults to 3000.

Number of Retries: Defaults to 10.

Checkpoint Interval (Millis): Defaults to 60000.

Message Processing: This field is used for additional custom handling and formatting of messages consumed from Amazon Kinesis before they are indexed by Splunk. If this field is empty (as for this post), the default handler is used. For more information about custom message handling and examples, see the Customized Message Handling section in the Splunk documentation. You can find example message handlers in the GitHub SplunkModularInputsJavaFramework repository.

Set Source Type: Manual (allows you to specify a type for Amazon Kinesis streams).

Source Type: Kinesis.

Choose Next, verify the configuration, and enable it by choosing Data Inputs and Kinesis Records.

Enable the configuration

Verify Amazon Kinesis stream data in Splunk

You should now be all wired up, from streaming Twitter data into Amazon Kinesis using Python Tweepy to connecting the Amazon Kinesis stream to your Splunk indexer. Now, verify Twitter data in your Splunk indexer by using the Splunk search console: Choose Data Summary, Sources, and kinesis://kinesis_twitter. You should now see Twitter records show up on the search console.

Twitter records in the search console

As the records are in JSON format, you can use Splunk rex commands to extract fields from the records, which can then be used for analysis and dashboards:

source="kinesis://kinesis_twitter" | rex "\s+record=\"(?<jsontest>[^\n]+})\"\ssequence" | spath input=jsontest

All the fields extracted from each Amazon Kinesis record are displayed to the left.  Choose the user.location field to display users tweeting by location, or user.time_zone to display users tweeting by time zone.

Users tweeting by time zone
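To see what the rex and spath steps are doing, the following Python sketch applies the same pattern to a single event. The raw event string is a made-up example: the exact layout of events written by the modular input is an assumption here, inferred only from the search pattern (a record="..." field followed by a sequence field). Python spells the named group as (?P<jsontest>...) instead of (?<jsontest>...).

```python
import json
import re

# Same idea as the rex pattern: capture everything between record=" and
# the closing brace that is followed by a quote and the sequence field.
RECORD_PATTERN = re.compile(r'\s+record="(?P<jsontest>[^\n]+\})"\ssequence')


def extract_record(raw_event):
    """Return the tweet embedded in a raw event as a dict, or None."""
    match = RECORD_PATTERN.search(raw_event)
    if match is None:
        return None
    # Roughly what spath does: parse the captured JSON into fields.
    return json.loads(match.group('jsontest'))


# Hypothetical raw event, invented for this illustration.
sample_event = (
    'partition_key=partition1 '
    'record="{"text": "cold beer", '
    '"user": {"location": "Seattle", "time_zone": "Pacific Time (US & Canada)"}}" '
    'sequence=49554309958127460123837282410803325391382383410689343490'
)
tweet = extract_record(sample_event)
```

After parsing, tweet['user']['location'] and tweet['user']['time_zone'] correspond to the user.location and user.time_zone fields shown in Splunk.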

Build a simple dashboard and visualization of Twitter data on Splunk

Now, build a simple dashboard displaying the top locations and languages of tweeting users. The following search extracts user locations:

source="kinesis://Kinesis_Tweet" | rex "\s+record=\"(?<jsontest>[^\n]+})\"\ssequence" | spath input=jsontest | rename user.location as location | stats count by location | sort -count limit=10

After the data is extracted, choose Save As and save the search as a report so you can reuse it later. Choose the Visualization tab next to Statistics and choose Pie as the chart type.

Pie chart visualization

Select Save As again, save as Dashboard Panel, enter a value for Dashboard Title, and choose Save.

Repeat the steps above for additional dashboard charts.

The following search extracts user language:

source="kinesis://Kinesis_Tweet" | rex "\s+record=\"(?<jsontest>[^\n]+})\"\ssequence" | spath input=jsontest | rename user.lang as language | stats count by language | sort -count limit=10

The following search extracts user hashtags:

source="kinesis://Kinesis_Tweet" | rex "\s+record=\"(?<jsontest>[^\n]+})\"\ssequence" | spath input=jsontest | rename entities.hashtags{}.text as hashtag | stats count by hashtag | sort -count limit=10
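The stats count by and sort -count limit=10 portion of these searches is a plain top-N count. The following Python sketch shows a comparable aggregation with collections.Counter. top_values, language_of, and hashtags_of are hypothetical helpers, the tweets they operate on are assumed to be parsed JSON dicts, and ordering among tied counts may differ from what Splunk returns.

```python
from collections import Counter


def top_values(tweets, extract, limit=10):
    """Count the values produced by `extract` for each tweet and return
    the `limit` most common as (value, count) pairs, highest count first."""
    counts = Counter()
    for tweet in tweets:
        counts.update(value for value in extract(tweet) if value)
    return counts.most_common(limit)


def language_of(tweet):
    """Single-value field, comparable to user.lang."""
    return [(tweet.get('user') or {}).get('lang')]


def hashtags_of(tweet):
    """Multi-value field, comparable to entities.hashtags{}.text."""
    hashtags = (tweet.get('entities') or {}).get('hashtags', [])
    return [tag.get('text') for tag in hashtags]
```

For example, top_values(tweets, hashtags_of) counts every hashtag in every tweet, which mirrors how the hashtag search treats the multi-value field.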

Access your dashboard by clicking on the Kinesis_TA app, and choose Default Views and Dashboards.  Select the dashboard (Twitter data dashboard) that you just created.

Here’s your simple dashboard displaying Twitter user location, language, and hashtag statistics.

Dashboard

When you finish, make sure to delete the streams and terminate the Splunk EC2 instances if you no longer need them.
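The cleanup can also be scripted. The sketch below assumes that kinesis_client and ec2_client are objects returned by boto3.client('kinesis') and boto3.client('ec2'). clean_up is a hypothetical helper, and the instance IDs are values you would supply for your own Splunk instances. Both calls are destructive, so treat this as an illustration to adapt, not a script to run as is.

```python
def clean_up(kinesis_client, ec2_client, stream_name, instance_ids):
    """Delete the Kinesis stream and terminate the given EC2 instances.

    `instance_ids` is a list of EC2 instance ID strings supplied by the
    caller; nothing is terminated when the list is empty.
    """
    kinesis_client.delete_stream(StreamName=stream_name)
    if instance_ids:
        ec2_client.terminate_instances(InstanceIds=list(instance_ids))
```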

Conclusion

The combination of Amazon Kinesis and Splunk enables powerful capabilities: you can ingest massive data at scale, consume data for analytics, create visualizations or custom data processing using Splunk, and potentially tie in Lambda functions for multiple consumer needs, all while ingesting data into Amazon Kinesis one time. This is a win-win combination for customers who are already using Splunk and AWS services, or customers looking to implement scalable data ingestion and data insight mechanisms for their big data needs.

In a future post, I’ll continue from here to extract data from an Amazon Kinesis stream and store that data in a DynamoDB table or an S3 bucket using a Lambda function.

Until then, happy tweeting, streaming, and Splunk-ing all at once!
If you have questions or suggestions, please leave a comment below.
