AWS Machine Learning Blog

Video analytics in the cloud and at the edge with AWS DeepLens and Kinesis Video Streams

Yesterday we announced the integration of AWS DeepLens with Amazon Kinesis Video Streams, allowing you to easily and securely stream a video feed from AWS DeepLens to Amazon Kinesis Video Streams for analytics, machine learning and other processing.

To help you understand the solution that integrates AWS DeepLens and Kinesis Video Streams, we’ll recap the concept of inference in machine learning (ML). Inference is the process of using a trained machine learning model to make predictions on new data samples. In IoT deployment scenarios with a connected device, such as a camera, we can build and train models in the cloud and then deploy them to the device. Performing inference locally on the device avoids the round-trip latency of sending device data to the cloud and lets the device act on the inference results right away. With a camera, for example, we might want to stream video only when an object is detected. We can then perform more sophisticated analysis of that video by using a service like Amazon Rekognition Video, or use the time-indexed video stored in Kinesis Video Streams for model training.

We love dogs at Amazon, so we thought we’d build a dog monitoring project, to help keep an eye on our furry friends, perhaps to see how often they are eating, or how many times they jump on the sofa when they are home alone! When AWS DeepLens detects a dog, it will send the next 60 seconds of video to Kinesis Video Streams, long enough to get a decent report of what the woofers are up to.

In this blog post, we will set up AWS DeepLens and deploy the object detection model that detects dogs. When a dog is detected, AWS DeepLens will stream the video directly to Amazon Kinesis Video Streams. We will then index the dog detection events in Amazon Elasticsearch Service using AWS IoT and Amazon Kinesis Data Firehose. This allows us to search the videos where dogs are detected and then play them in the Kinesis Video Streams console.

We use the new Amazon Kinesis Video Streams for the AWS DeepLens Video Library – a Python module that encapsulates the Kinesis Video Streams Producer SDK for AWS DeepLens. We can use this module to send video feeds from an AWS DeepLens device to Kinesis Video Streams, and to control when to start and stop video streaming from the device.

Amazon Kinesis Video Streams makes it easy to securely stream video from millions of connected devices to AWS for real-time machine learning, storage, and batch-oriented processing and analytics. The time-indexed video from AWS DeepLens can be stored durably for as long as you want, and you don’t need to manage any infrastructure.

We are going to build the following architecture using these AWS services:

  • AWS DeepLens (capture video)
  • Amazon Kinesis Video Streams (stream, store, and play back video)
  • AWS IoT (capture detected metadata)
  • Amazon Kinesis Data Firehose (stream dog-detection events from multiple cameras to Amazon Elasticsearch Service and Amazon S3)
  • Amazon Elasticsearch Service (querying)

We’ll use the following steps to implement the architecture:

  1. Set up AWS DeepLens and deploy the object detection model that detects dogs. When a dog is detected, the device streams video directly to Amazon Kinesis Video Streams and publishes events to AWS IoT.
  2. Set up Amazon Elasticsearch Service, Amazon Kinesis Data Firehose, and an AWS IoT rule. Also create an index for storing incoming events with appropriate mapping and index patterns for visualization. After the index is created, enable the IoT rule.
  3. Visualize detected dog events across the cameras using Kibana Dashboards.
  4. Play the video that is stored and indexed in Kinesis Video Streams.

Step 1: Set up AWS DeepLens and deploy the object detection model

We can use any device with the Kinesis Video Producer SDK or a device enabled by AWS Greengrass to capture video and stream it to Amazon Kinesis Video Streams for processing. Here we are using the new AWS DeepLens-KinesisVideo APIs. We are going to develop our code using AWS Greengrass, which runs on the IoT device (the AWS DeepLens camera in our case). The AWS Greengrass core can run AWS Lambda functions that are deployed to it. For more information, see the AWS Greengrass Getting Started Guide.

AWS DeepLens automates most of these steps, so we’ll focus in this blog post on developing an AWS Lambda function that streams h264 data (video data) from AWS DeepLens to Amazon Kinesis Video Streams when a particular object (“dog”) is found in the local inference.

Develop an AWS Lambda function to start the camera, capture the video, and send it to Kinesis Video Streams

AWS DeepLens runs the Ubuntu operating system (OS). AWS DeepLens is preloaded with AWS Greengrass, which lets you run local compute, messaging, data caching, sync, and ML inference capabilities in a secure way. We’ll use the AWS Lambda blueprint greengrass-hello-world and implement the function to stream video to Kinesis Video Streams.

Open the AWS Lambda console and select the deeplens-object-detection function:

We’ll select the Python 2.7 runtime and update the deeplens-object-detection function code.

For the Lambda function, follow these steps in the AWS Lambda console:

  1. Import appropriate modules.
  2. Create a Kinesis video stream.
  3. Apply logic that detects the “dog” object.
  4. Start the camera and stream the data to Kinesis Video Streams.
  5. Publish and deploy the function on AWS DeepLens.

Import appropriate modules

The AWS DeepLens device comes preinstalled with the library DeepLens_Kinesis_Video. Use this library with the Kinesis Video API to handle the video data that is captured. We also added the following libraries that are required to run the function:

from threading import Thread, Event  
import os  
import json  
import DeepLens_Kinesis_Video as dkv  
from botocore.session import Session  
import numpy as np  
import awscam  
import cv2  
import time  
import greengrasssdk 

Create a Kinesis Video Stream, apply logic that detects a “dog” object, publish MQTT with timestamp, and start the camera to stream the data to Kinesis Video Streams

As mentioned earlier, we modified the existing deeplens-object-detection function and published a new version. The function creates a new Kinesis video stream named “deeplens-dogstream”. (The code passes only “dogstream” as the stream name because the DeepLens_Kinesis_Video library adds the “deeplens” prefix.) When a dog is detected, the function streams 60 seconds of video to Kinesis Video Streams. You can change the code to detect other objects, but for this blog post we use the dog label. The function also publishes each detection, with its timestamp, to AWS IoT over MQTT, which gives us a log of when video was streamed.

class LocalDisplay(Thread):  
    """ Class for facilitating the local display of inference results 
        (as images). The class is designed to run on its own thread. In 
        particular the class dumps the inference results into a FIFO 
        located in the tmp directory (which lambda has access to). The 
        results can be rendered using mplayer by typing: 
        mplayer -demuxer lavf -lavfdopts format=mjpeg:probesize=32 /tmp/results.mjpeg 
    """  
    def __init__(self):  
        """Constructor"""  
        # Initialize the base class, so that the object can run on its own  
        # thread.  
        super(LocalDisplay, self).__init__()  
        # Initialize the default image to be a white canvas. Clients  
        # will update the image when ready.  
        self.frame = cv2.imencode('.jpg', 255*np.ones([640, 480, 3]))[1]  
        self.stop_request = Event()  
  
    def run(self):  
        """ Overridden method that continually dumps images to the desired 
            FIFO file. 
        """  
        # Path to the FIFO file. The lambda only has permissions to the tmp  
        # directory. Pointing to a FIFO file in another directory  
        # will cause the lambda to crash.  
        result_path = '/tmp/results.mjpeg'  
        # Create the FIFO file if it doesn't exist.  
        if not os.path.exists(result_path):  
            os.mkfifo(result_path)  
        # This call will block until a consumer is available  
        with open(result_path, 'w') as fifo_file:  
            while not self.stop_request.isSet():  
                try:  
                    # Write the data to the FIFO file. This call will block  
                    # meaning the code will come to a halt here until a consumer  
                    # is available.  
                    fifo_file.write(self.frame.tobytes())  
                except IOError:  
                    continue  
            fifo_file.flush()  
            fifo_file.close()  
  
    def set_frame_data(self, frame):  
        """ Method updates the image data. This currently encodes the 
            numpy array to jpg but can be modified to support other encodings. 
        """  
        ret, jpeg = cv2.imencode('.jpg', frame)  
        if not ret:  
            raise Exception('Failed to get frame from the stream')  
        self.frame = jpeg  
  
    def join(self):  
        self.stop_request.set()  
  
def greengrass_infinite_infer_run():  
    """ Entry point of the lambda function"""  
    try:  
        # This object detection model is implemented as single shot detector (ssd), since  
        # the number of labels is small we create a dictionary that will help us convert  
        # the machine labels to human readable labels.  
        model_type = 'ssd'  
        output_map = {1: 'aeroplane', 2: 'bicycle', 3: 'bird', 4: 'boat', 5: 'bottle', 6: 'bus',  
                      7 : 'car', 8 : 'cat', 9 : 'chair', 10 : 'cow', 11 : 'dinning table',  
                      12 : 'dog', 13 : 'horse', 14 : 'motorbike', 15 : 'person',  
                      16 : 'pottedplant', 17 : 'sheep', 18 : 'sofa', 19 : 'train',  
                      20 : 'tvmonitor'}  
        # Create an IoT client for sending messages to the cloud.
        client = greengrasssdk.client('iot-data')  
        iot_topic = '$aws/things/{}/infer'.format(os.environ['AWS_IOT_THING_NAME'])  
        # Create a local display instance that will dump the image bytes to a FIFO  
        # file that the image can be rendered locally.  
        local_display = LocalDisplay()  
        local_display.start()  
        # The sample projects come with optimized artifacts, hence only the artifact  
        # path is required.  
        model_path = '/opt/awscam/artifacts/mxnet_deploy_ssd_resnet50_300_FP16_FUSED.xml'  
        # Load the model onto the GPU.  
        client.publish(topic=iot_topic, payload='Loading object detection model')  
        model = awscam.Model(model_path, {'GPU': 1})  
        client.publish(topic=iot_topic, payload='Object detection model loaded')  
        # Set the threshold for detection  
        detection_threshold = 0.25  
        # The height and width of the training set images  
        input_height = 300  
        input_width = 300  
        # Use the boto session API to grab credentials  
        session = Session()  
        creds = session.get_credentials()  
        # Stream name and retention  
        stream_name = 'dogstream'  
        retention = 2 #hours  
        region = "us-east-1"  
        # Create producer and stream.  
        producer = dkv.createProducer(creds.access_key, creds.secret_key, creds.token, region)  
        client.publish(topic=iot_topic, payload="Producer created")  
        kvs_stream = producer.createStream(stream_name, retention)  
        client.publish(topic=iot_topic, payload="Stream {} created".format(stream_name))  
        # Create variable to track whether or not we are streaming to KVS  
        streaming = False  
        # Amount of time to stream - in seconds  
        wait_time = 60 #seconds  
        # Do inference until the lambda is killed.  
        while True:  
            # Get a frame from the video stream  
            ret, frame = awscam.getLastFrame()  
            if not ret:  
                raise Exception('Failed to get frame from the stream')  
            # Resize frame to the same size as the training set.  
            # cv2.resize expects the target size as (width, height).
            frame_resize = cv2.resize(frame, (input_width, input_height))
            # Run the images through the inference engine and parse the results using  
            # our parser API, note it is possible to get the output of doInference  
            # and do the parsing manually, but since it is a ssd model,  
            # a simple API is provided.  
            parsed_inference_results = model.parseResult(model_type,  
                                                         model.doInference(frame_resize))  
            # Compute the scale in order to draw bounding boxes on the full resolution  
            # image.  
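            # Convert to float before dividing; under Python 2.7 dividing two
            # integers would truncate the scale factor.
            yscale = float(frame.shape[0]) / input_height
            xscale = float(frame.shape[1]) / input_width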
            # Dictionary to be filled with labels and probabilities for MQTT  
            cloud_output = {}  
            # Track whether or not at least one dog is present
            dog_present = False  
            # Get the detected objects and probabilities  
            for obj in parsed_inference_results[model_type]:  
                if obj['prob'] > detection_threshold:  
                    # Machine label 12 corresponds to a dog, if the label is present,
                    # then at least one dog is in the frame  
                    if obj['label'] == 12:  
                        dog_present = True  
                        cloud_output['timestamp'] = time.strftime("%Y-%m-%d %H:%M:%S")  
                    # Add bounding boxes to full resolution frame  
                    xmin = int(xscale * obj['xmin']) \  
                           + int((obj['xmin'] - input_width/2) + input_width/2)  
                    ymin = int(yscale * obj['ymin'])  
                    xmax = int(xscale * obj['xmax']) \  
                           + int((obj['xmax'] - input_width/2) + input_width/2)  
                    ymax = int(yscale * obj['ymax'])  
                    # See https://docs.opencv.org/3.4.1/d6/d6e/group__imgproc__draw.html  
                    # for more information about the cv2.rectangle method.  
                    # Method signature: image, point1, point2, color, and thickness.
                    cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (255, 165, 20), 4)  
                    # Amount to offset the label/probability text above the bounding box.  
                    text_offset = 15  
                    # See https://docs.opencv.org/3.4.1/d6/d6e/group__imgproc__draw.html  
                    # for more information about the cv2.putText method.  
                    # Method signature: image, text, origin, font face, font scale, color,  
                    # and thickness  
                    cv2.putText(frame, "{}:    {:.2f}%".format(output_map[obj['label']],  
                                                               obj['prob'] * 100),  
                                (xmin, ymin-text_offset),  
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 165, 20), 4)  
                    # Store label and probability to send to cloud  
                    cloud_output[output_map[obj['label']]] = obj['prob']  
            # Place data into the KVS stream if at least one dog is detected by the model  
            if dog_present and not streaming:  
                client.publish(topic=iot_topic, payload=json.dumps(cloud_output))  
                client.publish(topic=iot_topic, payload='Start streaming video at {}'.format(time.strftime("%Y-%m-%d %H:%M:%S")))  
                kvs_stream.start()  
                #Streaming 60 seconds of Video  
                time.sleep(wait_time)  
                streaming = True  
            elif streaming:  
                kvs_stream.stop()  
                client.publish(topic=iot_topic, payload='Stop streaming video at {}'.format(time.strftime("%Y-%m-%d %H:%M:%S")))  
                streaming = False  
            # Set the next frame in the local display stream.  
            local_display.set_frame_data(frame)  
            # Send results to the cloud  
            client.publish(topic=iot_topic, payload=json.dumps(cloud_output))  
    except Exception as ex:  
        client.publish(topic=iot_topic, payload='Error in object detection lambda: {}'.format(ex))  
  
greengrass_infinite_infer_run()  

Then make sure you use the Existing role: AWSDeepLensLambdaRole. Review the AWS DeepLens Setup guide if this is the first time that you are using it.
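The inference loop above blocks inside time.sleep(wait_time) while video is streaming, so no frames are analyzed during those 60 seconds. As a minimal sketch of a non-blocking alternative, the start and stop decision can be kept in a small state object that is polled once per frame. StreamWindow, FakeStream, and the injected clock are hypothetical names used for illustration and are not part of the DeepLens_Kinesis_Video library; the stream object only needs start() and stop() methods, like the kvs_stream object used above.

```python
import time


class StreamWindow(object):
    """Hypothetical helper: keeps a stream open for `duration` seconds
    after a detection, without blocking the inference loop."""

    def __init__(self, stream, duration=60, clock=time.time):
        self.stream = stream        # any object with start() and stop()
        self.duration = duration    # seconds to keep streaming
        self.clock = clock          # injectable so the logic can be tested
        self.started_at = None      # None means "not streaming"

    @property
    def streaming(self):
        return self.started_at is not None

    def update(self, detected):
        """Call once per frame. Returns 'start', 'stop', or None."""
        now = self.clock()
        if not self.streaming:
            if detected:
                self.stream.start()
                self.started_at = now
                return 'start'
            return None
        if now - self.started_at >= self.duration:
            self.stream.stop()
            self.started_at = None
            return 'stop'
        return None


class FakeStream(object):
    """Stand-in for kvs_stream that records calls instead of streaming."""

    def __init__(self):
        self.calls = []

    def start(self):
        self.calls.append('start')

    def stop(self):
        self.calls.append('stop')
```

Inside the loop, a call such as event = window.update(dog_present) would replace the sleep, and the function would publish the start or stop MQTT message only when event is 'start' or 'stop'.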

Save, publish and deploy the function on AWS DeepLens

In the AWS DeepLens console, on the Object-detection project page, choose Action and select Publish New Version.

Deploy the function on your AWS DeepLens

To deploy the function on your AWS DeepLens device, we’ll use the AWS DeepLens console. We’ll use the existing Object-Detection model and edit the project.

Choose Edit Project.

Edit the function. We’ll use the updated function version we published earlier.

Make sure that you set the Memory size of the function to 3 GB and save the project.

Remember that every time you register a device, AWS DeepLens creates a unique MQTT topic.

We configured our function to publish inference output to an MQTT topic. This lets you see the model output on the AWS IoT console, or enable other Lambda functions to receive the output and take action on it.
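As a sketch of what gets published, the topic name and JSON message can be built by two small helpers. The helper names and the thing name deeplens_example are made up for illustration; the topic pattern and the timestamp format are the ones used in the Lambda code earlier in this post.

```python
import json
import time


def infer_topic(thing_name):
    """Topic pattern used by the Lambda function in this post."""
    return '$aws/things/{}/infer'.format(thing_name)


def detection_payload(detections, when=None):
    """Build the JSON message for a dict of {label: probability}.

    `when` is a time.struct_time; it defaults to the current local time,
    which is also what time.strftime() uses in the Lambda code.
    """
    stamp = time.strftime('%Y-%m-%d %H:%M:%S', when or time.localtime())
    message = dict(detections)
    message['timestamp'] = stamp
    return json.dumps(message, sort_keys=True)
```

Note that this helper always adds a timestamp, whereas the Lambda function adds one only when a dog is detected.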

And the last step is to save the project.

After the project is saved, we can deploy it on the AWS DeepLens device.

Upon successful deployment of the project, we can view the messages and the video on the Kinesis Video console.

In this example I used an image of my all-time favorite dog, Bono. I use this image to trigger AWS DeepLens to send 60 seconds of video to Kinesis Video Streams.

Now we go to the AWS IoT console and subscribe to the topic ID in the Inference output action.

We should see the MQTT message triggered by the dog detection. We will go to the Kinesis Video Streams console. We select the us-east-1 Region that we set in our Lambda function and select the stream deeplens-dogstream.

We choose deeplens-dogstream, and in the Video preview section we can see the live video ingested from AWS DeepLens.

Step 2: Set up Amazon Elasticsearch Service, Amazon Kinesis Data Firehose, an AWS IoT rule, and create an index

We’ll launch the AWS CloudFormation template – Videolog-FS-ES.yaml. This launches Amazon Elasticsearch Service and Amazon Kinesis Data Firehose. All events published to Amazon Kinesis Data Firehose will be indexed on the Amazon Elasticsearch Service domain. The template will ask for the following input parameters:

  • HTTPIP: The IP address from which the Amazon Elasticsearch Service domain can be accessed. The CloudFormation template restricts access to this IP address, but you can further secure the domain by adding a resource-based, identity-based, or IP-based access policy. You can also authenticate Kibana users with Amazon Cognito.
  • SQL: The SQL statement that the AWS IoT rule uses, for example SELECT * FROM '$aws/things/[YOUR-DEEPLENS-THING-ID]/infer', where you replace the placeholder with your own thing ID. This statement selects all attributes of every message published to the topic.
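Because the topic must be enclosed in single quotes, it can be easier to generate the SQL parameter than to type it. The following is a minimal sketch; rule_sql is a hypothetical helper and deeplens_example is a placeholder thing name.

```python
def rule_sql(thing_name):
    """Return the AWS IoT rule SQL statement for a device's infer topic."""
    topic = '$aws/things/{}/infer'.format(thing_name)
    if "'" in topic:
        # A quote inside the topic would end the quoted string early.
        raise ValueError('thing name must not contain a single quote')
    return "SELECT * FROM '{}'".format(topic)


print(rule_sql('deeplens_example'))
```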

After the template is launched, it provides the domain endpoint in the output section. We will create the index videolog (Elasticsearch index names must be lowercase) by using the following command:

curl -XPUT 'https://[DOMAIN-ENDPOINT-OBTAINED-FROM-OUTPUT]/videolog' -H 'Content-Type: application/json' -d '
{
    "settings" : {
        "number_of_shards" : 10,
        "number_of_replicas" : 1
    },
    "mappings": {
            "deeplens": {
                "properties": {
                    "boat": {
                        "type": "float"
                    },
                    "bottle": {
                        "type": "float"
                    },
                    "cat": {
                        "type": "float"
                    },
                    "dinning table": {
                        "type": "float"
                    },
                    "dog": {
                        "type": "float"
                    },
                    "person": {
                        "type": "float"
                    },
                    "sofa": {
                        "type": "float"
                    },
                    "timestamp": {
                        "type": "date",
                        "fields": {
                            "keyword": {
                                "type": "keyword"
                                }
                        },
                        "format": "yyyy-MM-dd HH:mm:ss"
                    },
                    "tvmonitor": {
                        "type": "float"
                    }
                }
            }
        }
}
'
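The mapping above lists only some of the labels that the model can emit. As a sketch, the same request body can be generated in Python for every label in output_map, which avoids hand-editing JSON. The function name index_body is hypothetical; the settings, the mapping type name, and the timestamp format are copied from the command above.

```python
import json

# Labels from output_map in the Lambda function. The spelling is kept as-is
# so that field names match what the device publishes.
LABELS = ['aeroplane', 'bicycle', 'bird', 'boat', 'bottle', 'bus', 'car',
          'cat', 'chair', 'cow', 'dinning table', 'dog', 'horse',
          'motorbike', 'person', 'pottedplant', 'sheep', 'sofa', 'train',
          'tvmonitor']


def index_body(labels=LABELS, shards=10, replicas=1):
    """Build the index settings and mapping as a Python dict."""
    properties = {label: {'type': 'float'} for label in labels}
    properties['timestamp'] = {
        'type': 'date',
        'format': 'yyyy-MM-dd HH:mm:ss',
        'fields': {'keyword': {'type': 'keyword'}},
    }
    return {
        'settings': {'number_of_shards': shards,
                     'number_of_replicas': replicas},
        'mappings': {'deeplens': {'properties': properties}},
    }


# JSON text that could be passed to curl with the -d option.
body = json.dumps(index_body(), indent=4)
```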

After the index is created, create an index pattern in Kibana for visualizations. Log in to Kibana (the Kibana endpoint is available as one of the output values of the template). Then, from the Management tab, add an index pattern on top of the videolog index, using timestamp as the time filter field.

Now that everything is configured, we can enable the IoT rule by using the following command.

aws iot enable-topic-rule --rule-name [iotrulename-output-from-template]

Events will start to flow into Amazon Elasticsearch Service.
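The Lambda function publishes plain-text status messages and per-frame JSON results on the same topic, and only frames with a dog carry a timestamp. The following sketch shows one way to tell detection events apart from the other messages, for example when inspecting what arrives on the topic. parse_event is a hypothetical helper, and the format string is the Python equivalent of the yyyy-MM-dd HH:mm:ss format in the index mapping.

```python
import json
from datetime import datetime

# Python equivalent of the yyyy-MM-dd HH:mm:ss format in the index mapping.
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'


def parse_event(payload):
    """Return the event dict if `payload` is a dog-detection event, else None."""
    try:
        event = json.loads(payload)
    except ValueError:
        return None  # plain-text status message
    if not isinstance(event, dict) or 'timestamp' not in event:
        return None  # JSON result for a frame without a dog
    try:
        datetime.strptime(event['timestamp'], TIMESTAMP_FORMAT)
    except (TypeError, ValueError):
        return None  # timestamp does not match the index mapping
    return event
```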

Step 3: Visualize detected dog events across cameras using Kibana Dashboards

As data flows in, we can start building visualizations using Kibana. One simple visualization here shows the presence of a dog object in Roy’s office over the last 3 days:

Step 4: Play the video that is stored and indexed in Kinesis Video Streams

As we visualize data using Kibana or query the domain directly, we may want to view the captured video for a particular timestamp. The Amazon Kinesis Video Streams console allows us to view stored video for a specific period as follows:
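The period to look up can be derived from the timestamp of a detection event. As a minimal sketch, the helper below (a hypothetical name) returns the start and end of the 60-second clip that follows a detection. It assumes the timestamp is in the format published by the Lambda function, which uses the device's local time, so check the time zone shown in the console before comparing values.

```python
from datetime import datetime, timedelta

TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'


def playback_window(timestamp, seconds=60):
    """Return (start, end) strings covering the clip that follows a detection."""
    start = datetime.strptime(timestamp, TIMESTAMP_FORMAT)
    end = start + timedelta(seconds=seconds)
    return start.strftime(TIMESTAMP_FORMAT), end.strftime(TIMESTAMP_FORMAT)
```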

Conclusion

This blog post shows you how AWS DeepLens and Amazon Kinesis Video Streams can be used as a DIY monitoring camera, how to run a machine learning inference model locally, and how to build a searchable video index so that you can use it for logging and monitoring.

If you have any questions, please use the comments after this post.

For more general information, take a look at the AWS DeepLens website or browse AWS DeepLens posts on the AWS Machine Learning blog.


About the Authors

Roy Ben-Alta is a Solutions Architect and Principal Business Development Manager at AWS. Roy leads strategic and technical business development initiatives for machine learning (ML) and data analytics, focusing on real-time video and data streaming analytics and ML. He helps customers build ML solutions, and works with multiple AWS organizations including product, marketing, sales, and support, to ensure customer success in their AI journeys.

 

 

Nehal Mehta is a Sr. Data Architect for AWS Professional Services. As part of the Professional Services team, he collaborates with sales, pre-sales, support, and product teams to enable partners and customers to benefit from big data analytics workloads, especially stream analytics workloads. He likes to spend time with friends and family, and has interests in technical singularity, politics, and, surprisingly, finance.