AWS Big Data Blog

Visualizing Real-time, Geotagged Data with Amazon Kinesis

Nick Corbett is a Big Data Consultant for AWS Professional Services

Amazon Kinesis is a fully managed service for processing real-time data at massive scale.  Whether you are building a system that collects data from remote sensors, aggregating log files from multiple servers, or creating the latest Internet of Things (IoT) solution, Amazon Kinesis lets you capture and process terabytes of data every hour from hundreds of thousands of sources.

For many of these systems, the location where the data was generated is important to users. For example, an alarm from a remote sensor is not much use unless a user can see where the event occurred. Plotting data on a map is the most obvious way to help your users easily visualize geographic data. This blog post shows you how to use Amazon Kinesis to build a system that handles geotagged streaming data, and presents two simple visualizations that can provide insight to users. The first visualization plots events on a globe and is useful as a high-impact way of displaying a relatively small number of events:

The second visualization can scale to cope with a much larger number of events.  This plots a heat-map of events over a period of time:

The basic architecture of the system that we will build is shown below. A Producer puts data into Amazon Kinesis. An Amazon Kinesis Application processes this information, and the relevant geo information is stored in an Amazon ElastiCache Redis cluster. A node.js web server, running in AWS Elastic Beanstalk, provides a visualization of this data.

The code needed to build this system is written in Java and JavaScript, but don’t worry if your development environment is not set up for these languages; we will compile all our code using Amazon Elastic Compute Cloud (Amazon EC2) servers.

Important

When you build the system described in this blog on AWS, it will incur nominal charges for Amazon Kinesis and other AWS services.  Where possible, we will use resources that are eligible for the AWS Free Tier.  Otherwise, you’ll incur the standard Amazon usage fees.  The total charges to complete the processes in this blog post are minimal (typically only a few dollars).

Source of Geotagged Data

Since most of us don't have access to a network of connected devices or sensors, we need to find an alternative source for our geotagged data. About 500 million Tweets are sent every day, and Twitter makes a small sample of these available to developers via its streaming APIs. If a Tweet has been sent from a mobile device, it may be geotagged, letting you know the Tweeter's location. You can sign up for a Twitter developer account and then create an application. Make sure your application is set for 'read-only' access and then click the button Create My Access Token at the bottom of the Keys and Access Tokens tab. By this point you should have four Twitter application keys: Consumer Key (API Key), Consumer Secret (API Secret), Access Token, and Access Token Secret. Once you have these keys, you are ready to start building your AWS solution.

Create a Security Group

Before we can start building our system, we need to create a Security Group for our servers. From the AWS Console, create a Security Group with two rules. The first should allow SSH (port 22) traffic so that you can connect to your servers. Next, add a rule that allows traffic on all ports for all protocols when the source is the ID of the Security Group you are creating. This allows inbound traffic from instances assigned the same Security Group and ensures that all the servers in our system can communicate.
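
If you prefer to script this step, the same Security Group can be created with the AWS SDK for Java. The following is a minimal sketch, assuming the SDK is on your classpath; the group name and description are placeholders:

AmazonEC2Client ec2 = new AmazonEC2Client();

// Create the group and capture its ID
String groupId = ec2.createSecurityGroup(new CreateSecurityGroupRequest()
        .withGroupName("kinesis-viz")
        .withDescription("Servers for the Kinesis visualization"))
        .getGroupId();

// Rule 1: allow SSH (port 22) from anywhere
IpPermission ssh = new IpPermission()
        .withIpProtocol("tcp").withFromPort(22).withToPort(22)
        .withIpRanges("0.0.0.0/0");

// Rule 2: allow all traffic from members of this same group
IpPermission internal = new IpPermission()
        .withIpProtocol("-1")
        .withUserIdGroupPairs(new UserIdGroupPair().withGroupId(groupId));

ec2.authorizeSecurityGroupIngress(new AuthorizeSecurityGroupIngressRequest()
        .withGroupId(groupId)
        .withIpPermissions(ssh, internal));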

Create an Amazon Kinesis Stream

The next task is to create an Amazon Kinesis Stream. From the AWS Console, go to the Amazon Kinesis page for your region and click the Create Stream button. Give your stream a name and select the number of shards. An Amazon Kinesis stream consists of one or more shards, each of which provides a capacity of 1MB/sec data input and 2MB/sec data output. The total capacity of the stream is simply the sum of all its shards; to scale up the stream, shards are added, and to scale down, they are removed. Each shard can process up to 1000 write transactions per second. While testing the processes in this blog post, I found that the Twitter streaming API has always run below 1000 Tweets per second, so you will need only one shard.
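
The stream can also be created programmatically. Below is a minimal sketch using the AWS SDK for Java; the stream name is a placeholder:

AmazonKinesisClient kinesis = new AmazonKinesisClient();

// One shard gives 1MB/sec input, 2MB/sec output, and 1000 writes/sec
kinesis.createStream("twitter-stream", 1);

// The stream is ready to use once its status becomes ACTIVE
String status = kinesis.describeStream("twitter-stream")
        .getStreamDescription().getStreamStatus();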

Create an Amazon ElastiCache Instance

Next, use the AWS Console to go to the Amazon ElastiCache page and click Launch Cache Cluster.  Use the next screen to configure a single-node Redis cluster.  Keep the cache port at the default (6379) and select cache.m3.medium as the Node Type.  Make sure you use the Security Group you created in the previous section.  Once the Amazon ElastiCache server has been created, you will be able to find the name of the endpoint by going to Cache Clusters in the left-hand navigation and clicking the Nodes link for your cluster.  The other parts of the system that read and write data to the cluster will use this endpoint name and port number.
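
Before moving on, it is worth confirming that the endpoint is reachable from an instance inside your Security Group. A quick check with the Jedis client might look like the sketch below; the endpoint name is a placeholder:

// Connect to the cluster endpoint and issue a PING
Jedis jedis = new Jedis("mycluster.abc123.0001.usw1.cache.amazonaws.com", 6379);
System.out.println(jedis.ping()); // prints PONG if the node is reachable
jedis.close();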

Producer

If you were collecting data from remote sensors, these could send data directly to Amazon Kinesis.  In our architecture, we need a 'Producer' to extract the data from Twitter and forward it to Amazon Kinesis.  Twitter has open sourced a Java library called Hosebird Client (HBC) that can be used to extract data from their streaming APIs.  The code used to configure the client is shown below:

/**
 * Set up your blocking queue: be sure to size this properly based
 * on the expected TPS of your stream
 */
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(10000);

/**
 * Declare the host you want to connect to, the endpoint, and
 * authentication (basic auth or oauth)
 */
StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();

// Track anything that is geotagged
endpoint.addQueryParameter("locations", "-180,-90,180,90");

// These secrets should be read from a config file
Authentication hosebirdAuth = new OAuth1(consumerKey,
        consumerSecret, token, secret);

// Create a new basic client - by default gzip is enabled
Client client = new ClientBuilder().hosts(Constants.STREAM_HOST)
        .endpoint(endpoint).authentication(hosebirdAuth)
        .processor(new StringDelimitedProcessor(msgQueue)).build();

client.connect();

As you can see from the code, we use 'addQueryParameter' on the endpoint to filter the data so only geotagged Tweets are sent. The examples on HBC's GitHub page show how to filter for search terms. For example, you could restrict your analysis to Tweets with a certain hashtag. One thing to note: if you use more than one filter, they will be 'OR'ed together. If you add an extra line to the above code to call 'trackTerms', the results you get back will either contain your search term or be geotagged, which isn't what we want. The way forward here is to provide HBC with the most restrictive filter (usually the search term) to limit the amount of data sent back from Twitter, and to build the second filter directly into your code, as in the sketch below.
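
A minimal sketch of this approach, assuming you want to track a hashtag (the '#aws' term and the use of Jackson here are illustrative), might look like this:

// Most restrictive filter goes to Twitter: only Tweets with the hashtag
endpoint.trackTerms(Arrays.asList("#aws"));

// The second filter is applied in code: drop anything that isn't geotagged
String msg = msgQueue.take();
JsonNode node = mapper.readTree(msg);
JsonNode geo = node.findValue("geo");
if (geo != null && !geo.isNull()) {
    producer.post(String.valueOf(System.currentTimeMillis()), msg);
}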

The Amazon Kinesis stream we created can handle up to 1000 transactions per second. This presents a problem when sending data from a single node: to send at this rate you will need to process one Tweet every 1ms. Allowing for network latency alone, it is unlikely that you’ll get your calls to Amazon Kinesis working that quickly. The producer therefore uses a thread pool to make multiple concurrent calls. The code below shows how this is set up:

// Create the producer
ProducerClient producer = new ProducerBuilder()
        .withName("Twitter")
        .withStreamName(streamName)
        .withRegion(regionName)
        .withThreads(10)
        .build();

producer.connect();

The 'name' of the producer is used for debugging purposes, and the stream and region names are set using variables extracted from the configuration file. The above call sets up a pool of 10 threads to process messages that you want to send to Amazon Kinesis. The code below shows how the Tweets from HBC are wired to the producer:

// Get a message from the HBC queue
String msg = msgQueue.take();

// Use a 'random' partition key
String key = String.valueOf(System.currentTimeMillis());

// Send to Kinesis
producer.post(key, msg);

Each Tweet is represented as a JSON string. This is taken from the HBC queue (msgQueue) and posted to the producer. Internally, the producer puts this in a queue and a worker thread will send the data to Amazon Kinesis. A random partition key is used as we don’t need to group messages on the shards.
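
Internally, each worker thread ends up sending its queued messages with the Kinesis PutRecord API. A simplified sketch of such a call with the AWS SDK for Java (error handling and retries omitted) might look like this:

// Wrap the Tweet as bytes and send it to the stream
PutRecordRequest request = new PutRecordRequest()
        .withStreamName(streamName)
        .withPartitionKey(key)
        .withData(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));

kinesis.putRecord(request); // kinesis is an AmazonKinesisClient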

Before you can run the producer on Amazon EC2, you need to set up an IAM role. This will allow our Amazon EC2 instance to access Amazon Kinesis and Amazon S3. Go to the AWS console, click IAM, and create a new role. The role should be an AWS Service Role for Amazon EC2 with the following custom policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt1392290776000",
      "Effect": "Allow",
      "Action": [
        "kinesis:*", "s3:Get*", "s3:List*"
      ],
      "Resource": [
        "*"
      ]
    }
  ]
}

Start a new m3.medium Amazon EC2 instance using the Amazon Linux AMI. Assign the IAM role and the Security Group that you created. Modify the following script by adding the name of your Amazon Kinesis Stream, the region you are using (e.g., us-west-1), and the four keys that you got when you set up your Twitter application. Paste this into the User Data for the instance (found under the 'Advanced' section of the 'Instance Configuration' tab). Finally, review and launch your instance, remembering to supply a key pair so that you can connect to it later.

#!/bin/bash
# update the instance
yum update -y

# install jdk
yum install java-1.8.0-openjdk -y
yum install java-1.8.0-openjdk-devel -y
yum install git -y

# point 'java' at the 1.8.0 JRE (note: this path changes with the
# OpenJDK build number; adjust it to match the version installed above)
update-alternatives --set java /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.25-0.b18.4.amzn1.x86_64/jre/bin/java

cd /home/ec2-user

# install Apache Maven
wget https://archive.apache.org/dist/maven/maven-3/3.2.3/binaries/apache-maven-3.2.3-bin.tar.gz
tar -xzvf apache-maven-3.2.3-bin.tar.gz

# get the code
git clone https://github.com/awslabs/aws-big-data-blog.git
cp ./aws-big-data-blog/aws-blog-kinesis-data-visualization/TwitterProducer/* /home/ec2-user -r

# create the config file
echo "aws.streamName = Name of Your Amazon Kinesis Stream" > AwsUserData.properties
echo "aws.regionName = Name of your region" >> AwsUserData.properties
echo "twitter.consumerKey = Twitter Consumer Key" >> AwsUserData.properties
echo "twitter.consumerSecret = Twitter Consumer Secret Key" >> AwsUserData.properties
echo "twitter.token = Twitter Access Token" >> AwsUserData.properties
echo "twitter.secret = Twitter Access Token Secret" >> AwsUserData.properties
echo "twitter.hashtags = " >> AwsUserData.properties

# do the build
/home/ec2-user/apache-maven-3.2.3/bin/mvn package

This script runs after the instance has booted. It installs OpenJDK and Apache Maven, downloads the source code for the Producer from GitHub, and creates the configuration file. Finally, Apache Maven is used to build the source code. To start the producer, SSH to the instance and type:

java -jar target/TwitterProducer-0.0.1-SNAPSHOT.jar AwsUserData.properties 

You may have to wait a couple of minutes after the server has booted for the Apache Maven build to complete and the jar file to appear on the server. At this point, you should see messages indicating that Tweets are being sent to Amazon Kinesis.

Amazon Kinesis Application

The Amazon Kinesis Application will retrieve the JSON Tweet from Amazon Kinesis, extract the geo information, and publish this to the Amazon ElastiCache Redis Cluster.

The starting point for the application was the sample Amazon Kinesis Application that is hosted on the AWS GitHub pages (this should be the starting point for all your Amazon Kinesis Applications). The Amazon Kinesis Application uses Jedis to interface with the Amazon ElastiCache Redis server. The details of the Redis server are passed to the Record Processor when it is constructed, and a reference to Jedis is created when the object is initialized. To process records coming from Twitter, the following code has been added to the Record Processor:

Coordinate c = null;

try {
    // For this app, we interpret the payload as UTF-8 chars.
    data = decoder.decode(record.getData()).toString();

    // Use the ObjectMapper to read the JSON string and create a tree
    JsonNode node = mapper.readTree(data);

    JsonNode geo = node.findValue("geo");
    JsonNode coords = geo.findValue("coordinates");

    Iterator<JsonNode> elements = coords.elements();

    double lat = elements.next().asDouble();
    double lng = elements.next().asDouble();

    c = new Coordinate(lat, lng);

} catch (Exception e) {
    // If we get here, it's bad data; ignore and move on to the next record
}

if (c != null) {
    String jsonCoords = mapper.writeValueAsString(c);
    jedis.publish("loc", jsonCoords);
}

As you can see from the code, the coordinates of the Tweet are extracted from the JSON message we receive from Amazon Kinesis. This information is wrapped as a JSON object and published to a Redis channel. If you want to run an Amazon Kinesis Application in production, consider hosting the code in Elastic Beanstalk following the instructions in our previous blog post, "Hosting Amazon Kinesis Applications on AWS Elastic Beanstalk." Alternatively, you can build the application as a JAR and run it directly on a server. Either way, you will need to set up an IAM role for your target server. Go to the IAM console and create an AWS Service Role with the following policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt1392290776000",
      "Effect": "Allow",
      "Action": [
        "kinesis:*", "cloudwatch:*", "dynamodb:*", "elasticache:*", "s3:Get*", "s3:List*"
      ],
      "Resource": [
        "*"
      ]
    }
  ]
}

If you are running your Amazon Kinesis Application outside of Elastic Beanstalk, start an Amazon EC2 instance (m3.medium, Amazon Linux AMI) and assign the IAM role that you just created. Once again, use the same Security Group that you created above; this will allow your instance to communicate with Redis. Modify the script below by entering the name of the Amazon Kinesis stream and region, together with the name and port of the Amazon ElastiCache Redis cluster that you created. You can then paste this script into the 'User Data' section of the server configuration.

#!/bin/bash

# update instance
yum update -y

# install jdk
yum install java-1.8.0-openjdk -y
yum install java-1.8.0-openjdk-devel -y
yum install git -y

# point 'java' at the 1.8.0 JRE (note: this path changes with the
# OpenJDK build number; adjust it to match the version installed above)
update-alternatives --set java /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.25-0.b18.4.amzn1.x86_64/jre/bin/java

cd /home/ec2-user

# install Apache Maven
wget https://archive.apache.org/dist/maven/maven-3/3.2.3/binaries/apache-maven-3.2.3-bin.tar.gz
tar -xzvf apache-maven-3.2.3-bin.tar.gz

# get the code
git clone https://github.com/awslabs/aws-big-data-blog.git
cp ./aws-big-data-blog/aws-blog-kinesis-data-visualization/KinesisApplication/* /home/ec2-user -r

# create the config file
echo "appName = DataVizAnalyzer" > KinesisClient.properties
echo "kinesisEndpoint = Name of your region" >> KinesisClient.properties
echo "redisEndpoint = Name of the Redis node" >> KinesisClient.properties
echo "redisPort = The port number used by Redis" >> KinesisClient.properties
echo "kinesisStreamName = The name of your Amazon Kinesis Stream" >> KinesisClient.properties

# build
/home/ec2-user/apache-maven-3.2.3/bin/mvn package

This script works in the same way as before. OpenJDK and Apache Maven are installed, and the code for the Amazon Kinesis Application is downloaded from GitHub and built using Apache Maven. SSH to the instance. Once the jar has been built, type the following command:

java -jar target/KinesisClient-0.0.1-SNAPSHOT.jar KinesisClient.properties

You should soon see some messages to indicate that data is being extracted from Amazon Kinesis and coordinate data is being sent to Redis.

node.js Server

You can download the code used for the node.js server.  The node.js server uses two important libraries: node_redis to connect to Redis and socket.io to establish a socket connection between the browser and the server.  The socket is used to push new Tweet information to any connected users.

When the node.js server starts, it registers to listen for Redis publishing changes:

// Subscribe to listen to events from redis
redisClient.on("ready", function () {
    redisClient.subscribe("loc");
});

If Redis publishes a change, it is a very simple process to push that change to any connected users:

// When we get a message from redis, we send the message down the socket to the client
redisClient.on('message', function(channel, message) {	
	var coord = JSON.parse(message);
	io.emit('tweet', coord);
});

The node.js code that you downloaded also contains the HTML and JavaScript code to visualize the geotagged data.

The code contains two methods for visualization, both of which use Three.js to do 3D rendering. The starting point for the first visualization is a spinning globe, which is based on an excellent blog post by Bjørn Sandvik. Thanks also to Tom Paterson, who created the open source textures used. The main code for the globe is in public/js/earth.js. You can see calls to createEarth, createClouds, createStars, createSun and createLensFlare to set up the environment.

The final part of the setup is to create 50 lights, all of which are switched off. When socket.io detects that details of a Tweet have been sent to the browser, the pool of lights is searched for one that is not in use. The light is moved to the location of the Tweet and, over the next few seconds, its brightness is increased and then decreased. This has the effect of creating a flash of light for each Tweet. The time taken to flash and the color of the light are specified when the flash is started. Below is the code that triggers this:

var socket = io();

socket.on('tweet', function(coord) {
    startLight(coord.lat, coord.lng, 2000, 0x6DAEE1);
});

The spinning globe and flashing lights combine to make a striking display, but this visualization is limited by the amount of data it can display. Creating the graphic and flashing lights is very resource-intensive, and you may find that if you try to display more than 25 events per second your computer will start to slow down.

The second visualization will scale to show many more data points. The main code for this is in public/js/earth2.js. The structure of the code is very similar to the first file. An environment is set up with createEarth and createBackgroundLight. This is a much simpler view; the Earth is rendered flat and the camera stays still to save on processing cycles. Instead of creating a pool of lights, this visualization creates a PointCloud with 50,000 points. Each point is created at an infinite position so that users won't see it.

When socket.io reports a Tweet, the code searches for a point that is not in use and moves it to the Tweet's location. Each point uses Additive Blending, which means that as points are added to the same location, the color intensity builds up. By keeping each point for a relatively long period of time (10 seconds), you can build some great heat maps to show the density of Tweets in geographic areas.

To get the node.js application running in Elastic Beanstalk, you first need to create a zip archive of your code. Zip the contents of the directory (files such as globe.html, heatmap.html, and server.js), not the parent folder. Next, create an IAM role for the application with the following policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt1392290776000",
      "Effect": "Allow",
      "Action": [
        "elasticache:*"
      ],
      "Resource": [
        "*"
      ]
    }
  ]
}

Now you can go to the AWS Console for Elastic Beanstalk and create a new application.

  1. Select Node.js as the pre-defined environment and specify the zip file of the code as the application source.
  2. On the Configuration Details screen, specify the IAM role that you just created.
  3. Once the Elastic Beanstalk application has been created, select Configuration in the left-hand navigation on the environment, and then select the Software Configuration panel (click the cog in the upper-right corner).
  4. Under Environment Properties, enter the name of the Redis ElastiCache server in parameter PARAM1.
  5. Finally, select the Instances panel and add the name of the Security Group you created to the list.

If you go back to the main application dashboard, you will see the address of your visualization application. You can access this after the AWS Elastic Beanstalk Application has finished building. If you get a warning page telling you that WebGL is not supported, make sure you are using a compatible browser.

Summary

This blog post has shown you how Amazon Kinesis can be used to capture and ingest real-time, geotagged data, and how to visualize that information using a web-based client. The visualizations shown here are fairly basic; it is easy to imagine how they could be extended to use color to represent another dimension of the data. When the Amazon Kinesis Application processes a Tweet, it currently extracts only the coordinates from the JSON. It could easily be modified to extract other data that can be used to enhance the visualization.

The solution presented here is also scalable. Shards could easily be added to the Amazon Kinesis Stream and the Amazon Kinesis Application can be hosted across a fleet of servers to increase the throughput.

If you have questions or suggestions, please leave a comment below. And if you make your own visualization and want to share, please let us know in a comment!

Do more with Amazon Kinesis!

Snakes in the Stream – Feeding and Eating Amazon Kinesis Streams with Python

Hosting Amazon Kinesis Applications on AWS Elastic Beanstalk