AWS Machine Learning Blog

Analyze live video at scale in real time using Amazon Kinesis Video Streams and Amazon SageMaker

We are excited to announce the launch of the Amazon Kinesis Video Streams Inference Template (KIT) for Amazon SageMaker. This capability enables customers to attach Kinesis Video streams to Amazon SageMaker endpoints in minutes, driving real-time inferences without having to use any other libraries or write custom software to integrate the services. KIT comprises the Kinesis Video Client Library software packaged as a Docker container and an AWS CloudFormation template that automates the deployment of all required AWS resources. Amazon Kinesis Video Streams makes it easy to securely stream audio, video, and related metadata from connected devices to AWS for analytics, machine learning (ML), playback, and other processing. Amazon SageMaker is the managed platform for developers and data scientists to build, train, and deploy ML models quickly and easily.

Customers ingest audio and video feeds from sources like home security cameras, enterprise IP cameras, traffic cameras, AWS DeepLens, cellphones, and more into Kinesis Video Streams. Developers and data scientists across industry verticals ranging from smart homes to smart cities, from intelligent manufacturing to retail, want to deploy their own machine learning algorithms to analyze these video feeds on the AWS Cloud. These customers want a reliable way to connect Kinesis Video Streams to their Amazon SageMaker endpoints, so that they can build scalable, real-time, ML-driven video analytics pipelines with minimal operating overhead.

In this blog post, we'll introduce this new capability and explain the functionality of both the Kinesis Video Streams Client Library and the CloudFormation template. We'll also provide a step-by-step working example of integrating Kinesis Video Streams with Amazon SageMaker using KIT.

Kinesis Video Streams and machine learning-driven analytics

Amazon Kinesis Video Streams launched at re:Invent 2017. At launch it was already integrated with Amazon Rekognition Video, enabling an easy way to perform real-time face recognition using a private database of face metadata. This earlier blog post details how to use facial recognition to deliver a high-end consumer experience with Amazon Kinesis Video Streams and Amazon Rekognition Video.

As customers ingest a variety of video feeds using Kinesis Video Streams, their use cases, training data sets, and types of inferences being performed are also diversifying. For example, a leading home security provider wants to ingest audio and video from its home security cameras using Kinesis Video Streams, and then attach its own custom ML models running in Amazon SageMaker to detect and analyze pets and objects to build richer user experiences. An in-store retail intelligence provider wants to stream videos from cameras placed inside stores to train a custom person-counting model using Amazon SageMaker. This will enable it to make real-time inferences that estimate the number of shoppers in the store to inform store operations.

Kinesis Video Streams integration with Amazon SageMaker using KIT

We’ll now discuss the two components that constitute KIT for Amazon SageMaker.

The Kinesis Video Streams client library enables scalable, at-least-once processing of the media across a distributed set of workers, manages the reliable invocation of Amazon SageMaker endpoints, and publishes inference results into a Kinesis data stream for subsequent processing. Specifically, the library determines the Kinesis Video streams that have to be processed, connects to the streams, and refreshes them periodically to include or exclude streams for processing. The software instantiates a worker that runs consumers, each of which is responsible for processing one Kinesis Video stream at any given time. As part of this, it also maintains leases for every consumer running in (and across) workers, so that the workers can coordinate among themselves which streams each one processes. It also ensures reliable, at-least-once processing of the media fragments by managing checkpoints on a per-lease, per-stream basis.
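
To make the lease and checkpoint mechanics concrete, here is a minimal sketch assuming a hypothetical DynamoDB table layout. The table name, attribute names, and helper functions are illustrative only and are not the library's actual schema; the point is that a conditional write lets workers coordinate stream ownership without talking to each other directly.

    import time
    import boto3
    
    dynamodb = boto3.client('dynamodb')
    
    LEASE_TABLE = 'KITLeases'  # hypothetical table name, not the library's actual schema
    LEASE_TTL_SECONDS = 60
    
    def try_acquire_lease(stream_name, worker_id):
        """Attempt to take over the lease for one Kinesis Video stream.
        The conditional write succeeds only if no other worker holds a
        live (unexpired) lease for that stream."""
        now = int(time.time())
        try:
            dynamodb.put_item(
                TableName=LEASE_TABLE,
                Item={
                    'streamName': {'S': stream_name},
                    'workerId': {'S': worker_id},
                    'leaseExpiry': {'N': str(now + LEASE_TTL_SECONDS)},
                },
                ConditionExpression='attribute_not_exists(streamName) OR leaseExpiry < :now',
                ExpressionAttributeValues={':now': {'N': str(now)}},
            )
            return True
        except dynamodb.exceptions.ConditionalCheckFailedException:
            return False  # another worker holds a live lease
    
    def checkpoint(stream_name, fragment_number):
        """Record the last fully processed fragment so that whichever worker
        holds the lease next resumes from this point. Because a fragment is
        checkpointed only after processing completes, a crash can cause a
        fragment to be reprocessed but never skipped (at-least-once)."""
        dynamodb.update_item(
            TableName=LEASE_TABLE,
            Key={'streamName': {'S': stream_name}},
            UpdateExpression='SET checkpointFragment = :f',
            ExpressionAttributeValues={':f': {'S': fragment_number}},
        )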

The software pulls media fragments from the streams using the real-time Kinesis Video Streams GetMedia API operation, parses the media fragments to extract the H.264 chunks, samples the frames that need decoding, then decodes the I-frames and converts them into image formats such as JPEG or PNG before invoking the Amazon SageMaker endpoint. As the Amazon SageMaker-hosted model returns inferences, KIT captures and publishes those results into a Kinesis data stream. Customers can then consume those results using their favorite service, such as AWS Lambda. Finally, the library publishes a variety of metrics into Amazon CloudWatch so that customers can build dashboards, monitor, and alarm on thresholds as they deploy into production.
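
As an illustration of the last two hops in that pipeline, the following is a minimal sketch of invoking an image-accepting Amazon SageMaker endpoint with a decoded JPEG frame and publishing the result into a Kinesis data stream. The endpoint and stream names are placeholders; the actual library performs this inside the Docker container, with retries and metrics around each call.

    import boto3
    
    sagemaker_runtime = boto3.client('sagemaker-runtime')
    kinesis = boto3.client('kinesis')
    
    ENDPOINT_NAME = 'my-object-detection-endpoint'  # placeholder
    RESULT_STREAM = 'my-inference-results'          # placeholder
    
    def infer_and_publish(jpeg_bytes, stream_name):
        # Invoke the hosted model with a single decoded frame.
        response = sagemaker_runtime.invoke_endpoint(
            EndpointName=ENDPOINT_NAME,
            ContentType='image/jpeg',
            Body=jpeg_bytes,
        )
        inference = response['Body'].read()
    
        # Publish the inference into a Kinesis data stream, partitioned by
        # the source video stream so results for one camera stay ordered.
        kinesis.put_record(
            StreamName=RESULT_STREAM,
            Data=inference,
            PartitionKey=stream_name,
        )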

The AWS CloudFormation template automates the deployment of all relevant AWS infrastructure in the customer's own account to read media from Kinesis Video Streams and invoke the Amazon SageMaker endpoint for ML-based analytics. This saves customers the time it would otherwise take to build, operate, and scale the integration themselves.

The CloudFormation template first creates an Amazon Elastic Container Service (Amazon ECS) cluster using the AWS Fargate compute engine, which runs the library software hosted in a Docker container.

It also spins up an Amazon DynamoDB table for maintaining checkpoints and related state across workers that run on Fargate tasks, and an Amazon Kinesis data stream to capture the inference outputs generated from Amazon SageMaker. The template also creates the requisite AWS Identity and Access Management (IAM) policies and Amazon CloudWatch resources to monitor the entire infrastructure. KIT for Amazon SageMaker is compatible with any Amazon SageMaker endpoint that accepts image data. Customers can modify the template as needed to fit their specific use case.

How to set up KIT

Prerequisites

Step-by-step instructions for KIT deployment

  • You’ll deploy the KIT infrastructure by means of an AWS CloudFormation template.
  • CloudFormation is a powerful tool that lets you describe your infrastructure as code, so that resource deployments are repeatable.
    1. Log in to your AWS account by means of the following URL, replacing the Xs with your account number: https://xxxxxxxxxxxx.signin.aws.amazon.com/console. If you are already logged in, skip to step 2.
    2. In the AWS services search bar, choose CloudFormation.
    3. Select the CloudFormation template for your target Region from this location.
    4. Name the stack and fill out the parameters, then choose Next. (The same parameters are used in the scripted deployment example that follows these steps.)
      • AppName – A unique application name that is used for creating all resources
      • DockerImageRepository – Docker Image for Kinesis Video Streams and SageMaker Driver
      • EndPointAcceptContentType – image/jpeg or image/png are the image formats currently supported for invoking the SageMaker endpoint
      • LambdaFunctionBucket – Amazon S3 bucket location for your custom Lambda function
      • LambdaFunctionKey – Amazon S3 object key for your custom Lambda function code zip file
      • SageMakerEndpoint – Amazon SageMaker endpoint that hosts your custom machine learning model
      • StreamNames – CSV list of strings specifying stream names
      • TagFilters – JSON string of Tag filters
    5. Leave the parameters on the Options page as default and choose Next.
    6. Review the configuration information on the Review page, select the check box acknowledging the creation of IAM roles, and choose Create.
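
If you prefer to script the deployment rather than drive it through the console, the following is a minimal boto3 sketch of steps 3 through 6. The stack name, template URL, and parameter values are placeholders for your own; the parameter keys mirror the list in step 4, so confirm them against the template you downloaded.

    import boto3
    
    cfn = boto3.client('cloudformation')
    
    cfn.create_stack(
        StackName='kit-sagemaker-demo',  # placeholder
        # Placeholder URL; use the template location for your target Region.
        TemplateURL='https://s3.amazonaws.com/<<YOUR TEMPLATE BUCKET>>/kit-template.yaml',
        Parameters=[
            {'ParameterKey': 'AppName', 'ParameterValue': 'kit-demo'},
            {'ParameterKey': 'EndPointAcceptContentType', 'ParameterValue': 'image/jpeg'},
            {'ParameterKey': 'SageMakerEndpoint', 'ParameterValue': 'my-object-detection-endpoint'},
            {'ParameterKey': 'StreamNames', 'ParameterValue': 'my-kvs-stream'},
        ],
        # Equivalent of the check box in step 6: the template creates IAM roles.
        Capabilities=['CAPABILITY_IAM'],
    )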

Extending the Solution

Depending on your use case, this solution can be extended by updating the Lambda function and integrating with other AWS services.

In this example, we’ll retrieve the Kinesis Video fragment and store it in an Amazon S3 bucket along with detection data.

  1. Create an Amazon S3 bucket.
  2. Add the following additional permissions to the AWS Lambda execution role, replacing the placeholders with the correct bucket name and Kinesis Video stream ARNs. These additional permissions enable AWS Lambda to retrieve the fragment from the Kinesis Video stream and write to an S3 bucket.
    {
        "Effect": "Allow",
        "Action": [
            "s3:PutObject"
        ],
        "Resource": [
            "arn:aws:s3:::<<YOUR BUCKET>>/*"
        ]
    },
    {
        "Effect": "Allow",
        "Action": [
            "kinesisvideo:GetMediaForFragmentList",
            "kinesisvideo:GetDataEndpoint"
        ],
        "Resource": [
            "<<YOUR KINESIS VIDEO STREAM ARNs>>"
        ]
    }
    
  3. Replace <<YOUR BUCKET>> in the following code, and then use it to replace the Lambda function code.
    from __future__ import print_function
    import base64
    import json
    import boto3
    import datetime
    import time
    
    bucket='<<YOUR BUCKET>>'
    
    #Lambda function is written based on output from an Amazon SageMaker example: 
    #https://github.com/awslabs/amazon-sagemaker-examples/blob/master/introduction_to_amazon_algorithms/object_detection_pascalvoc_coco/object_detection_image_json_format.ipynb
    object_categories = ['person', 'bicycle', 'car',  'motorbike', 'aeroplane', 'bus', 'train', 'truck', 'boat', 
                         'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog',
                         'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag',
                         'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat',
                         'baseball glove', 'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup',
                         'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot',
                         'hot dog', 'pizza', 'donut', 'cake', 'chair', 'sofa', 'pottedplant', 'bed', 'diningtable',
                         'toilet', 'tvmonitor', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', 'oven',
                         'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
                         'toothbrush']
    
    def lambda_handler(event, context):
      for record in event['Records']:
        payload = base64.b64decode(record['kinesis']['data'])
        #Get the JSON output from the Kinesis data stream
        result = json.loads(payload)
        #Get the fragment metadata, a "FragmentMetadata{...}" string of
        #comma-separated key=value pairs
        fragment = result['fragmentMetaData']
        
        #Strip the wrapper (the leading 17-character "FragmentMetadata{" and the
        #trailing "}"), then extract the fragment ID and server-side timestamp (ms)
        frag_id = fragment[17:-1].split(",")[0].split("=")[1]
        srv_ts = datetime.datetime.fromtimestamp(float(fragment[17:-1].split(",")[1].split("=")[1])/1000)
        srv_ts1 = srv_ts.strftime("%A, %d %B %Y %H:%M:%S")
        
        #Get FrameMetaData
        frame = result['frameMetaData']
        #Get StreamName
        streamName = result['streamName']
       
        #Get the SageMaker response in JSON format
        sageMakerOutput = json.loads(base64.b64decode(result['sageMakerOutput']))
        #Print the 5 detected objects with the highest probability
        for i in range(5):
          print("detected object: " + object_categories[int(sageMakerOutput['prediction'][i][0])] + ", with probability: " + str(sageMakerOutput['prediction'][i][1]))
        
        detections={}
        detections['StreamName']=streamName
        detections['fragmentMetaData']=fragment
        detections['frameMetaData']=frame
        detections['sageMakerOutput']=sageMakerOutput
    
        #Get KVS fragment and write .webm file and detection details to S3
        s3 = boto3.client('s3')
        kv = boto3.client('kinesisvideo')
        get_ep = kv.get_data_endpoint(StreamName=streamName, APIName='GET_MEDIA_FOR_FRAGMENT_LIST')
        kvam_ep = get_ep['DataEndpoint']
        kvam = boto3.client('kinesis-video-archived-media', endpoint_url=kvam_ep)
        getmedia = kvam.get_media_for_fragment_list(
                                StreamName=streamName,
                                Fragments=[frag_id])
        base_key=streamName+"_"+time.strftime("%Y%m%d-%H%M%S")
        webm_key=base_key+'.webm'
        text_key=base_key+'.txt'
        s3.put_object(Bucket=bucket, Key=webm_key, Body=getmedia['Payload'].read())
        s3.put_object(Bucket=bucket, Key=text_key, Body=json.dumps(detections))
        print("Detection details and fragment stored in the S3 bucket "+bucket+" with object names : "+webm_key+" & "+text_key)
      return 'Successfully processed {} records.'.format(len(event['Records']))
    

S3 Bucket with video fragments and detection details

The following screenshot shows that KIT for Amazon SageMaker is emitting detected video fragments and corresponding inferences into the Amazon S3 bucket.
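
To spot-check the output from a script instead of the console, a quick listing like the following shows the .webm fragments and .txt detection files the Lambda function has written (the bucket name is a placeholder):

    import boto3
    
    s3 = boto3.client('s3')
    
    # List the fragment and detection objects written by the Lambda function.
    response = s3.list_objects_v2(Bucket='<<YOUR BUCKET>>')
    for obj in response.get('Contents', []):
        print(obj['Key'], obj['Size'])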

AWS Lambda function logs showing processed output

This solution can be extended for various use cases. For example, by combining the OpenCV computer vision library with the Amazon SageMaker prediction details, bounding boxes can be added to the detected objects in the video frames and fed into a real-time alerting portal, as sketched below.
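
The following helper is a minimal sketch of that extension. It assumes the prediction rows follow the format from the referenced SageMaker object detection example ([class_index, score, xmin, ymin, xmax, ymax], with coordinates normalized to [0, 1]) and reuses the object_categories list from the Lambda function above.

    import cv2
    
    def draw_detections(frame, predictions, threshold=0.5):
        """Draw a labeled box for every detection above the confidence
        threshold. frame is a BGR image (for example, from cv2.imread or a
        video decoder); predictions is sageMakerOutput['prediction']."""
        height, width = frame.shape[:2]
        for class_idx, score, xmin, ymin, xmax, ymax in predictions:
            if score < threshold:
                continue
            # Scale normalized coordinates to pixel positions.
            pt1 = (int(xmin * width), int(ymin * height))
            pt2 = (int(xmax * width), int(ymax * height))
            cv2.rectangle(frame, pt1, pt2, (0, 255, 0), 2)
            label = '%s: %.2f' % (object_categories[int(class_idx)], score)
            cv2.putText(frame, label, (pt1[0], pt1[1] - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
        return frame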

Monitoring the KIT-managed infrastructure

The library software vends a variety of CloudWatch metrics by default that customers can use to monitor the progress being made in processing individual streams. These include metrics that track the resource consumption of the workers in their cluster, the rates at which the Amazon SageMaker endpoint is being invoked, and how the inference results are published into their Kinesis data stream. The CloudFormation template creates a ready-to-use CloudWatch dashboard that customers can further extend for their purposes. By default, the dashboard captures the key metrics for the underlying services that power KIT, as well as custom metrics specific to the latency, reliability, and scaling characteristics of the software.
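
As one example of alarming on those metrics, the following sketch creates a CloudWatch alarm with boto3. The namespace and metric name are hypothetical placeholders; copy the real ones from the dashboard that the template creates for your deployment.

    import boto3
    
    cloudwatch = boto3.client('cloudwatch')
    
    cloudwatch.put_metric_alarm(
        AlarmName='kit-sagemaker-invoke-errors',       # placeholder
        Namespace='<<KIT METRIC NAMESPACE>>',          # placeholder
        MetricName='<<SAGEMAKER INVOCATION ERRORS>>',  # placeholder
        Statistic='Sum',
        Period=300,
        EvaluationPeriods=1,
        Threshold=1.0,
        ComparisonOperator='GreaterThanOrEqualToThreshold',
        TreatMissingData='notBreaching',
    )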

CloudWatch dashboard – KIT metrics

Conclusion

Through KIT for Amazon SageMaker, we have simplified the real-time, ML-driven processing of media streams in a reliable and scalable manner. Customers can attach all of their Kinesis Video streams to their Amazon SageMaker endpoints to power their ML-driven use cases with minimal operational overhead. You can read more about this capability in our documentation. We look forward to iterating on the underlying Kinesis Video Client Library software based on customer feedback, so that all developers can further customize it for their use cases.


About the Authors

Aditya Krishnan is the head of Amazon Kinesis Video Streams. In this role he has the good fortune of working with customers, hardware and software partners, and a phenomenal engineering team to deliver on the vision of making it ridiculously easy to stream video from internet-enabled camera devices at massive scale.

Jagadeesh Pusapadi is a Solutions Architect with AWS working with customers on their strategic initiatives. He helps customers build innovative solutions on AWS Cloud by providing architectural guidance to achieve desired business outcomes.