AWS Machine Learning Blog

Real-time data labeling pipeline for ML workflows using Amazon SageMaker Ground Truth

High-quality machine learning (ML) models depend on accurately labeled, high-quality training, validation, and test data. As ML and deep learning models are increasingly integrated into production environments, it’s becoming more important than ever to have customizable, real-time data labeling pipelines that can continuously receive and process unlabeled data.

For example, you may want to create a consumer-facing application that regularly collects and sends new data objects to a data labeling pipeline, which produces labels and builds a dataset for model training or retraining. This pipeline creates a positive feedback loop that leads to more accurate, sophisticated models.

Amazon SageMaker Ground Truth streaming labeling jobs provide infrastructure and resources to create a continuously running labeling job that receives new data objects on demand and sends them to human workers to be labeled. You can chain multiple streaming labeling jobs together to create more intricate and refined data labeling pipelines.

Use this blog post to learn how to set up and customize Ground Truth streaming labeling jobs.

Walkthrough overview

In addition to discussing the benefits of using a streaming labeling job, such as eliminating delays, enforcing idempotency, and customizing input data sources, this post features two Jupyter notebooks that you can use to set up streaming labeling jobs. You can use these notebooks or follow the console instructions in this post to create a streaming labeling job using a supported, language-specific AWS Software Development Kit (SDK) of your choice.

The first notebook shows you how to create Ground Truth streaming labeling jobs. This notebook supports built-in and custom task types, which allow you to quickly create data labeling pipelines for various data types such as image, text, video, video frame, 3D point cloud, and more. This walkthrough demonstrates how to use Amazon Simple Notification Service (Amazon SNS) to send secure, real-time messages to a streaming labeling job to feed new data objects to human workers for labeling. You learn how you can set up notifications to receive the output data from that labeling task in real time, as soon as workers finish labeling a data object.

When you create a streaming labeling job, you can route the output data of that job to another streaming labeling job to create more complex data labeling pipelines and for data label verification and adjustment. This is referred to as chaining labeling jobs. You can use the second notebook with this post to learn how to chain two streaming labeling jobs together.

You can run both notebooks on default mode, requiring little to no input. Default mode creates image object detection (bounding box) labeling jobs and demonstrates how to send data objects to these labeling jobs. If you have your own data objects that you want to use, you can turn off default mode.

To get started, complete the Prerequisites and Launching a notebook instance and setting up the demo notebook sections in this post to gather the resources you need to complete this tutorial and, optionally, set up the Jupyter notebooks ground_truth_create_streaming_labeling_job.ipynb and ground_truth_create_chained_streaming_labeling_job.ipynb in an Amazon SageMaker notebook instance.

The following diagram illustrates the solution architecture.

Advantages of streaming labeling jobs

The first notebook shows you how to create Ground Truth streaming labeling jobs.

Real-time input channel

You can feed objects in real time and continuously to a labeling job. Amazon SNS allows you to configure topics to feed objects in real time to a running labeling job.

Long-running workflows

You can launch labeling jobs that can run for a long time if they’re actively being fed objects. Streaming jobs are designed to be long-running workflows that keep running until you choose to stop them.

Ground Truth will stop the job if it is idle for a long time. A job is defined as idle if Ground Truth doesn’t detect any objects waiting to be labeled in the system over a certain number of days. For example, if Ground Truth doesn’t receive new data objects from the SNS input topic and all the objects fed to the system are already labeled, a timer for idle time starts. If the idle timer hits a certain number, Ground Truth stops the labeling job.

In short, if objects are actively flowing through the system at regular cadence, and you can achieve a long-running workflow. For more information about configuring idle timers, see Stop a Streaming Labeling Job.

Eliminate delays

With streaming labeling jobs, objects can flow through your data labeling pipeline faster. Streaming jobs work in a sliding window manner, where Ground Truth keeps sending objects for labeling as long as slots are available. The slots are defined by the parameter MaxConcurrentTaskCount, which defines the maximum number of objects (slots) that can be filled by objects to be sent for labeling. When MaxConcurrentTaskCount is reached, you can view the number of data objects queued in Amazon Simple Queue Services (Amazon SQS).

For example, if MaxConcurrentTaskCount is 10, and 25 objects are sent via the input SNS topic, Ground Truth sends a maximum of 10 objects to the workers at a time and a maximum of 15 remaining objects are in the Amazon Simple Queue Service (Amazon SQS) queue. If a worker works on and submits 2 objects out of the 10 that were sent, only 8 slots are currently filled, and 2 more are sent to workers from the remaining 15 objects. This way, workers have a constant flow of objects coming in from your inputs, up to a maximum of 10 objects. There aren’t any delays resulting from batching objects. As workers work on objects, new objects are pumped in constantly and you can achieve data labeling with greater speed.

Rate limiting

You can limit and control how and when you feed data to workers. When you feed objects to your input SNS topic, they’re collected in an SQS queue in your account, named GroundTruth-<labeling-job-name>. If more objects are sent to the labeling job than the MaxConcurrentTaskCount, they remain in the SQS queue. Otherwise, they are sent to workers to be labeled. Any object in the SQS queue is available for a maximum of 14 days.

For example, if MaxConcurrentTaskCount is 1000, and 2,500 objects are sent to a streaming labeling job via an input SNS topic, Ground Truth sends a maximum of 1,000 objects to the workers at a time, and initially, 1,500 remain in the SQS queue. The speed of the workers determines how quickly the 1,500 objects in the queue are sent for labeling. If these objects remain in the queue for longer than expected, this serves as an indicator that you have sent more objects than can be worked on by workers in a given timeframe. If the data objects take longer than expected to label, you can adjust input to feed objects to Amazon SNS at a slower pace. You can also change the value of MaxConcurrentTaskCount to suit the pace of the worker.

To monitor the speed and quantity of data objects being fed into the SQS queue associated with a streaming labeling job, you can set up alarms for the queue with Amazon CloudWatch. For more information, see Available CloudWatch metrics for Amazon SQS. For example, you can set up an alarm on the ApproximateAgeOfOldestMessage metric to see how close your oldest data object is to the 14-day limit. When this alarm is trigged, you can take appropriate actions, like resending the object to the input SNS topic or notifying workers that tasks will expire if not completed within a given timeframe.

Output notifications

A new SNS channel is added as an output channel for your labeling job. When a worker completes a labeling job task from a Ground Truth streaming labeling job, Ground Truth uses your output topic to publish output data to one or more endpoints that you specify. To receive notifications when a worker finishes a labeling task, you must subscribe an endpoint to your SNS output topic. For example, you can subscribe an email, an AWS Lambda function, or an SQS queue to the SNS output topic used for labeling job, and any object labeled through Ground Truth appears in real time after labeling.

In addition to the SNS output topic, you can also use the frequent Amazon Simple Storage Service (Amazon S3) output file updates in the Amazon S3 output path. All labels are added to an output manifest file in Amazon S3. You can reference this file if, for example, the real-time output notifications were missed. If the S3 bucket is versioned, you can view and access different versions of the output manifest file.

Idempotency

You can use a unique identifier to distinguish the objects you feed to a labeling job and track them in the output. You can bring your own unique identifier, or take advantage of an auto-generated identifier Ground Truth creates if you don’t supply one.

When you send a data object to your streaming labeling job using an Amazon SNS message, you can specify your deduplication key and deduplication ID. The unique identifier helps make sure that each object sent for labeling is unique. If you send two objects with the same unique identifier, the latter object is considered a duplicate. This prevents in accidental injection of objects that weren’t intended and also provides an ID to track output data when labels are generated. For more information, see Duplicate Message Handling.

Drop objects into Amazon S3

You can set up your S3 buckets to automatically publish data labeling requests to your SNS input topic any time a data object is added to the bucket. With this setup, you can drop objects into the S3 bucket and they are automatically sent to your streaming labeling job.

For more information about setting up your S3 bucket and notifications, see the Sending data objects to your labeling job using an S3 bucket section below.

Solution overview

To complete this use case, use the notebook ground_truth_create_streaming_labeling_job.ipynb in the Amazon SageMaker Examples GitHub repo.

After completing the prerequisites, you can use this walkthrough to do the following:

  1. Launch a notebook instance and set up the demo notebook
  2. Launch a streaming job
  3. Monitor the job
  4. Send objects to an ongoing job
  5. Stop the labeling job

Streaming labeling jobs are launched using the Ground Truth API operation CreateLabelingJob in a supported language-specific AWS SDK.

Prerequisites

If you’re a new user of Ground Truth streaming labeling jobs, it’s recommended you review Ground Truth Streaming Labeling Jobs before completing this walkthrough.

To complete this walkthrough, you need the following:

  • An AWS account.
  • An S3 bucket in the same AWS Region you use to launch your streaming labeling job. If you’re using a demo notebook, this bucket must also be in the same Region as your Amazon SageMaker notebook instance. You can either specify this bucket in the notebook variable BUCKET, or use the default bucket in the Region that you create your notebook instance in. For more information, see How do I create an S3 Bucket?
  • An AWS Identity and Access Management (IAM) execution role with required permissions. The notebook automatically uses the role you used to create your notebook instance (see the next item in this list). Add the following permissions to this IAM role:
    • Attach managed policies AmazonSageMakerGroundTruthExecution. The following GIF demonstrates how to attach this policy to the role on the IAM console.

    • When you create your role, you specify Amazon S3 permissions. You can either allow that role to access all your resources in Amazon S3, or you can specify particular buckets. Make sure that your IAM role has access to the S3 bucket that you plan to use. This bucket must be in the same Region as your notebook instance.
  • A work team. A work team is a group of people that you select to label your data. A work team is a group of workers from a workforce, which is made up of workers engaged through Amazon Mechanical Turk, vendor-managed workers, or your own private workers that you invite to work on your tasks. Whichever workforce type you choose, Ground Truth takes care of sending tasks to workers. To preview the worker UI, use a private workforce and add yourself to the work team you use in the notebook.
    • To use a private or vendor workforce, record the Amazon Resource Name (ARN) of the work team you use—you need it in the accompanying Jupyter notebooks. The following GIF demonstrates how to quickly create a private work team on the Amazon SageMaker console.

    • If you don’t specify a private or vendor workforce, the notebook automatically uses the Mechanical Turk workforce. When you create the labeling job, you can specify the total amount you pay an individual worker for labeling a data object. To learn more, see Amazon SageMaker Ground Truth pricing.
  • If you’re not using default mode in the notebooks, you must supply a HTML worker task template. This template is used to render the human task UI that your workers use to complete tasks. You can copy your template directly to the notebooks, which provides logic to write the template to Amazon S3, or you can add the template to your S3 bucket and record the template Amazon S3 URI. For more information about sample templates, see Built-in Task Types. For more information about custom labeling workflows, see Step 2: Creating your custom labeling task template.
  • A list of label categories. The notebooks use this list to create a label category configuration file and upload it to Amazon S3. When you use default mode in the notebooks, this list is provided.
  • If you’re not using the notebooks, you need two Lambda functions to pre-process your input data (PreHumanTaskLambdaArn) and output data (AnnotationConsolidationLambdaArn). If you use one of the built-in task types, Ground Truth provides these functions.

Launching a notebook instance and setting up the demo notebook

To use the notebooks, you can launch an Amazon SageMaker notebook instance. For more information, see Create a Notebook Instance. When your notebook instance is active, complete the following steps to use the notebooks:

  1. On the Amazon SageMaker console in Notebook instances, locate your notebook instance.
  2. Choose Open Jupyter or Open Jupyter Lab.
  3. In Jupyter, choose the SageMaker Examples In Jupyter Lab, choose the Amazon SageMaker icon to see a list of example notebooks.
  4. In the Ground Truth Labeling Jobs section, select one of the following notebooks to use alongside this post. In Jupyter, choose Use next to a notebook to start using it. In Jupyter Lab, select the notebook, then choose Create Copy.
    1. ground_truth_create_streaming_labeling_job.ipynb
    2. ground_truth_create_chained_streaming_labeling_job.ipynb

Launching a streaming job

Streaming labeling jobs are created using the same API operation, CreateLabelingJob, as non-streaming labeling jobs. To create a streaming labeling job, you specify an input topic as your input data source, and an output topic as your output data source. New data objects are continuously sent to your labeling job through the input topic, and output data is sent to the output topic as soon as workers complete labeling tasks. You can configure your output topic to send a notification or trigger an event any time output data is received.

When you create a streaming labeling job, the input manifest file is optional.

You can use the Amazon SNS API operation CreateTopic to create your input and output topics, or you can use the Amazon SNS console. The response to a successful request to CreateTopic includes the topic ARN. You use the topic ARNs of your input and output topics in CreateLabelingJob in the parameters.

If the name of the topic contains GroundTruth (not case-sensitive) or SageMaker (not case-sensitive), the policy AmazonSageMakerGroundTruthExecution grants sufficient permissions to publish messages to your labeling job. If not, make sure to grant your IAM role permission to perform the actions sns:Publish and sns:Subscribe for your SNS topics.

Creating an SNS topic using the Amazon Python (Boto3) SDK

The notebook ground_truth_create_streaming_labeling_job.ipynb creates SNS topics using the AWS Python (Boto3) SDK. In the following code, replace LABELING_JOB_NAME with the name of the labeling job:

sns = boto3.client('sns')

# Create Input Topic
input_response = sns.create_topic(Name= LABELING_JOB_NAME + '-Input')
INPUT_SNS_TOPIC_ARN = input_response['TopicArn']

# Create Output Topic
output_response = sns.create_topic(Name= LABELING_JOB_NAME + '-Output')
OUTPUT_SNS_TOPIC_ARN = output_response['TopicArn']

Creating an SNS topic on the Amazon SNS console

To create an SNS topic on the Amazon SNS console, complete the following steps:

  1. On the Amazon SNS console, choose Topics.
  2. Choose Create topic.

  1. For Name, enter a name.
  2. For Display name, enter an optional display name.
  3. If required, add additional configurations for your topic, such as Encryption, Access policy, Delivery retry policy, Delivery status logging, and

After the topics are created, feed the input topic ARN to LabelingJobSnsDataSource.SnsTopicArn and the output topic ARN to OutputConfig.SnsTopicArn.

Creating a streaming labeling job using CreateLabelingJob

You must create Ground Truth streaming labeling jobs with the Amazon SageMaker API operation CreateLabelingJob.

The ground_truth_create_streaming_labeling_job.ipynb notebook walks you through creating the resources required and configuring the request.

If you’re not using this notebook, use an AWS SDK supported by CreateLabelingJob. For more information about using an API request to create a streaming labeling job, see Example: Use SageMaker API To Create Streaming Labeling Job. If you’re a new user of Ground Truth, it’s recommended that you use one of the image or text based built-in task types to familiarize yourself with Ground Truth streaming labeling jobs.

After you fill in the parameters of your request, submit the request to create a labeling job. Refer to the Use the CreateLabelingJob API to create a streaming labeling job section in the ground_truth_create_streaming_labeling_job.ipynb notebook. You can also use the AWS Command Line Interface (AWS CLI) or AWS SDK. For more information, see Example: Use SageMaker API To Create Streaming Labeling Job.

Monitoring the job

You can call DescribeLabelingJob after the job is created. Refer to the Use the DescribeLabelingJob API to describe a streaming labeling job section in the ground_truth_create_streaming_labeling_job.ipynb notebook.

Make sure the LabelingJobStatus is InProgress before feeding objects via the SNS channel. The following code is an example of how you can use DescribeLabelingJob (using the AWS Python (Boto3) SDK) to retrieve the labeling job status:

sagemaker = boto3.client('sagemaker')
sagemaker.describe_labeling_job(LabelingJobName=LABELING_JOB_NAME)['LabelingJobStatus']

If you specified the optional field S3DataSource.ManifestS3Uri in the CreateLabelingJob request, the objects in the Amazon S3 file are automatically sent to workers as soon as the labeling job starts. The LabelCounters element of the response to your DescribeLabelingJob request shows these objects as Unlabeled initially, and then HumanLabeled after they have been annotated and workers have submitted their work.

Amazon SQS offers a secure, durable, and available hosted queue. Streaming labeling jobs create an SQS queue in your account. You can check for the queue by the name GroundTruth-LABELING_JOB_NAME. The following code is an example of how you can use GetQueueUrl (using the AWS Python (Boto3) SDK) to retrieve the labeling job status:

sqs = boto3.client('sqs')
response = sqs.get_queue_url(QueueName='GroundTruth-' + LABELING_JOB_NAME.lower())

Sending objects to an ongoing job

After your labeling job has started, data objects can be fed to it through the console or the Amazon SNS API. For more information, see Send Data Objects Using Amazon SNS. The format of the SNS message that you use to send a data object to your labeling job is the same as the augmented manifest format.

For example, to send a new image object to an image classification labeling job, your message may look similar to the following:

{"source-ref": "s3://awsexamplebucket/example-image.jpg"}

If you create a text-based labeling job, your request may look similar to the following:

{"source": "Lorem ipsum dolor sit amet"}

Publishing a request on the Amazon SNS console

To publish a request to your labeling job on the Amazon SNS console, complete the following steps:

  1. On the Amazon SNS console, choose Topics.
  2. Choose your input topic.
  3. Choose Publish message.

Publishing a request using the Publish API operation

You can use the Amazon SNS API operation Publish to send a request to label a data object to your streaming labeling job via a supported AWS SDK.

The notebook demonstrates how to publish a message using this operation.

The following code is an example of how you can use the AWS Python (Boto3) SDK to send a request to Publish. Replace INPUT_TOPIC_ARN with the ARN of your input topic, and replace REQUEST with a request similar to the preceding examples.

sns = boto3.client('sns')
published_message = sns.publish(TopicArn=INPUT_TOPIC_ARN,Message=REQUEST)

After you publish a request, a call to DescribeLabelingJob shows Unlabeled incremented by 1:

"LabelCounters" : {
    'TotalLabeled': 0, 
    'HumanLabeled': 0, 
    'MachineLabeled': 0,  
    'FailedNonRetryableError': 0,  
    'Unlabeled': 1
}

Previewing the worker task

If you used a private workforce and made yourself a worker on the work team used to create the labeling job, you can navigate to your worker portal to preview the worker task. You can find the worker portal link in the Labeling workforces page on the Ground Truth console (on the Amazon SageMaker console) in the Region you used to launch the labeling job. This link is also included in the welcome email sent to you when you were added to the work team.

When a worker submits a data object after labeling it, it is sent to your output topic. Additionally, the results are periodically added to the S3 output bucket you specified when you created your labeling job in S3OutputPath.

Stopping the labeling job

You can use streaming labeling jobs in long-running workflows, and they run until you stop them. This allows you to continuously feed objects to the labeling job.

However, if the system detects no objects are available in the system to be labeled and is idle continuously for more than a certain number of days, GroundTruth attempts to stop the job. For more information, see Stopping Streaming Jobs.

You can stop your labeling job on the Ground Truth console or using the Ground Truth API operation StopLabelingJob. To use the console, complete the following steps:

  1. On the Amazon SageMaker console, choose Ground Truth. Be sure to use the Region you used to launch your labeling job.
  2. Select the labeling job you want to stop.
  3. From the Actions drop-down menu, choose Stop job.

The final cells in the notebook demonstrate how you can stop a labeling job using the AWS Python (Boto3) SDK:

sagemaker = boto3.client('sagemaker')
sagemaker.stop_labeling_job(LabelingJobName=LABELING_JOB_NAME)

When a labeling job has been successfully stopped, its status shows as Stopped.

Other Features of Streaming Labeling Jobs

The following sections cover additional features of streaming labeling jobs: sending objects to your labeling job by dropping them in an S3 bucket, and chaining multiple labeling jobs together.

Sending data objects to your labeling job using an S3 bucket

You can set up your S3 buckets to automatically publish data labeling requests to your SNS input topic any time a data object is added to the bucket. With this setup, you can drop objects into the S3 bucket and they are automatically sent to your streaming labeling job.

To configure an S3 bucket to automatically send data objects to your SNS input topic, you need to add an access policy to the input topic to allow Amazon S3 to add an event to it. The following code illustrates the type of policy to attach with your topic ARN (replace SNS-topic-ARN):

{
 "Version": "2012-10-17",
 "Id": "example-ID",
 "Statement": [
  {
   "Sid": "example-statement-ID",
   "Effect": "Allow",
   "Principal": {
    "AWS":"*"  
   },
   "Action": [
    "SNS:Publish"
   ],
   "Resource": "SNS-topic-ARN",
   "Condition": {
      "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:<bucket-name>" },
      "StringEquals": { "aws:SourceAccount": "<bucket-owner-account-id>" }
   }
  }
 ]
}

To set up your S3 bucket to send data objects to your streaming labeling job on the Amazon S3 console, complete the following steps:

  1. On the Amazon S3 console, choose the bucket that you want to use to send data objects to your labeling job.
  2. On the Properties tab, under Advanced settings, choose Events.
  3. Choose Add notification.
  4. Give your notification a name.
  5. Select All object create events.
  6. Optionally, enter a prefix if you want to drop data objects into a prefix within the S3 bucket.
  7. If you only want to send specific types of data objects to your SNS input topic, specify a suffix. For example, to ensure only image files are sent to your SNS input topic, you can enter .jpg,.png,.jpeg.
  8. From the Send to drop-down menu, choose SNS Topic.
  9. Choose the SNS input topic you used or will use to create your labeling job.
  10. Choose Save.

The following GIF demonstrates how to set up this configuration on the Amazon S3 console.

Chaining

To create sophisticated, persistent, real-time data labeling pipelines that allow you to add multiple types of annotations to data objects, audit and verify labels, and more, you can chain multiple streaming labeling jobs.

Chaining allows you to send the output of one streaming labeling job to another streaming labeling job. For example, the output data of Job 1 can be sent to Job 2 as input, the output data of Job 2 can flow to Job n-1, and the data of Job n-1 can flow to Job n in real time.

As an example use case, you could use Job 1 to add a semantic segmentation mask to a sequence of video frames. You then use Job 2 to add bounding boxes to identify and localize data objects in each frame. Finally, you use Job 3 to verify and adjust labels as needed.

To set this up, you use the output SNS topic of Job 1 as the input SNS topic of Job 2. Similarly, you use the output SNS topic of Job 2 as the input SNS topic of Job 3, and so on. The following diagram illustrates this architecture.

After you set up your jobs this way, a data object flowing through Job 1 makes its way to Job 2 automatically after passing through Job 1. The following are some possibilities for chaining with two jobs:

  • Specify different label attribute names for jobs with a similar task type. For example, Job 1 (label data) chains to Job 2 (adjust, review, and verify annotations from Job 1).
  • Use different label attribute names for jobs with different task types. For example, Job 1 (labeling for image classification) chains to Job 2 (labeling for object detection).
  • Use the same label attribute names for both jobs. For example, Job 1 (labeling) chains to Job 2 (partial labeled data of Job 1 flows to Job 2).

You can use the notebook ground_truth_create_chained_streaming_labeling_job.ipynb to learn how to chain two streaming labeling jobs. This example demonstrates the first use case in the preceding list (different label attribute names for jobs with similar task types). When used on default mode, this notebook chains a bounding box (object detection) job (Job 1) to a bounding box audit job. Any bounding boxes annotated in Job 1 can be adjusted in Job 2 in real time. You can generalize this use case to set up quality-check workflows, in which a work team reviews the annotations of another work team.

You can also use the notebook to set up any kind of chained streaming jobs to achieve multiple job-chaining configurations.

Conclusion

This post covers the benefits of using Ground Truth streaming feature and how to create and chain streaming labeling jobs. This post merely scratches the surface of what Ground Truth streaming can do.

To get started, use one of the notebooks included in this post to launch and experiment with streaming labeling jobs, or see Create a Streaming Labeling Job.

Let us know what you think in the comments.

 


About the Authors

Priyanka Gopalakrishna is a software engineer at Amazon AI. Her focus is on solving labeling problems using machine learning and building scalable AI solutions using distributed systems.

 

 

 

 

Talia Chopra is a Technical Writer in AWS specializing in machine learning and artificial intelligence. She works with multiple teams in AWS to create technical documentation and tutorials for customers using Amazon SageMaker, MxNet, and AutoGluon.