AWS Machine Learning Blog

Use Amazon Mechanical Turk with Amazon SageMaker for supervised learning

Supervised learning needs labels, or annotations, that tell the algorithm what the right answers are in the training phases of your project. In fact, many of the examples of using MXNet, TensorFlow, and PyTorch start with annotated data sets you can use to explore the various features of those frameworks. Unfortunately, when you move from the examples to application, it’s much less common to have a fully annotated set of data at your fingertips. This tutorial will show you how you can use Amazon Mechanical Turk (MTurk) from within your Amazon SageMaker notebook to get annotations for your data set and use them for training.

TensorFlow provides an example of using an Estimator to classify irises using a neural network classifier. This example can also be found in the SageMaker sample-notebooks library or in the SageMaker Examples project on GitHub. The tutorial and sample notebook both use the Iris data set, which contains four measurements for each of 150 flowers from three related iris species, along with the species of each flower. Those data points are then used to train a model that can predict the species of an iris from its four measurements.

From left to right, Iris setosa (by Radomil, CC BY-SA 3.0), Iris versicolor (by Dlanglois, CC BY-SA 3.0), and Iris virginica (by Frank Mayfield, CC BY-SA 2.0).

This is a great example for getting started with TensorFlow Estimators, but it would be much more difficult if all you had were images of each Iris and the associated measurements. Without annotated data, you could find yourself manually annotating the images and wasting a lot of time that you’d rather spend developing your model.

MTurk provides human intelligence through an API and is ideally suited to providing a wide range of annotations that can then be used in your training. With MTurk you get access to a 24x7x365 global workforce that can supply annotations for your data set. Simply define a task that you want Workers to complete for each of your data points, post it to the marketplace, and retrieve your results in a matter of minutes or hours. MTurk allows you to quickly get the annotations you need without the need to hire a team or spend your own time.

From within your SageMaker notebook you can quickly send tasks to MTurk Workers for annotation, review the results, and move on to your training. In this tutorial we’ll build a version of the Iris data set that includes images instead of species information, and then ask MTurk Workers to identify the species based on those images. Finally, we’ll use the Amazon SageMaker Python SDK to construct and train a neural network classifier using TensorFlow’s tf.estimator. The notebook for this tutorial can be downloaded here. Note that running an Amazon SageMaker notebook, gathering annotations with MTurk, training your model, and deploying it all incur charges. Remember to delete your resources when you finish so that you aren’t billed for them afterward.

You’ll perform eight steps to annotate the data and train a model:

  1. Set up your Amazon Mechanical Turk Requester account and link it to your AWS account.
  2. Load the Iris data set and modify it to include images.
  3. Define an MTurk task for annotating the data set.
  4. Submit the task to MTurk Workers for annotation.
  5. Retrieve and reconcile the results provided by Workers.
  6. Construct and train a model using the training data.
  7. Deploy the model to an endpoint and use it to classify new samples.
  8. Clean up your resources.

Note that this project makes use of the images above within both the data set and the MTurk task itself. These images are used here under the Creative Commons Attribution-ShareAlike 2.0 and 3.0 licenses and have been provided by Radomil (Iris setosa, CC BY-SA 3.0), Dlanglois (Iris versicolor, CC BY-SA 3.0), and Frank Mayfield (Iris virginica, CC BY-SA 2.0).

Step 1. Amazon Mechanical Turk Account setup

If you haven’t already, you’ll need to set up an Amazon Mechanical Turk Requester account that is linked to the AWS account you’re using with Amazon SageMaker. To get started, visit https://requester.mturk.com and create a new account.

After you’ve set up your MTurk account, you need to link it to the AWS account you’re using with Amazon SageMaker. Sign into your AWS account as a root user by using the email address for the root of your account. If you see the IAM user sign-in page, choose Sign-in using root account credentials. Then go to https://requester.mturk.com/developer to link them together.

We now need to update the permissions for your Amazon SageMaker role to include the AmazonMechanicalTurkFullAccess policy. To do this, in the AWS Management Console open the IAM console and view your roles. Select the IAM role associated with your SageMaker instance and choose Attach policy. Search for AmazonMechanicalTurkFullAccess and add the policy to the role.

To post tasks to MTurk, you first need to purchase prepaid HITs at https://requester.mturk.com/account; this balance is used to reward the Workers who complete your tasks. A HIT (Human Intelligence Task) is how an individual task is represented in MTurk.

Note: As defined in the code that follows, the tutorial will cost you between $18.00 and $19.00 in Worker rewards and fees.

We will be using the xmltodict library to handle results returned from MTurk, so you will want to install it in your SageMaker notebook instance.

!pip install xmltodict

We’ll also import xmltodict and a few other libraries that we’ll need. Specify the Amazon S3 bucket we’ll use for storing our annotated data, which will be used for training.

import xmltodict
import boto3
import json
import pandas as pd
import sagemaker
import io

s3 = boto3.resource('s3')
training_bucket_name = '<bucket-name>'
training_bucket = s3.Bucket(training_bucket_name)

MTurk has two environments you can use, Production and Sandbox. When you use the Production environment your tasks are visible to Workers at https://worker.mturk.com. There is also a Sandbox environment that can be used for testing. Workers won’t see your tasks in the Sandbox, but you can visit https://workersandbox.mturk.com to do them yourself and test your task interfaces. There is no cost to use the Sandbox environment, and it’s recommended that you test there first to make sure your task returns the data you need before moving to the Production environment. If you want to test in the Sandbox, note that you will need to create an additional account in the Sandbox and link it to your AWS account.

The following instructions create a client in one of the two environments, depending on the value of create_hits_in_production. If you want to test in the Sandbox, change this value to False.

create_hits_in_production = True
environments = {
        "production": {
            "endpoint": "https://mturk-requester.us-east-1.amazonaws.com",
            "preview": "https://www.mturk.com/mturk/preview"
        },
        "sandbox": {
            "endpoint": "https://mturk-requester-sandbox.us-east-1.amazonaws.com",
            "preview": "https://workersandbox.mturk.com/mturk/preview"
        },
}
mturk_environment = environments["production"] if create_hits_in_production else environments["sandbox"]

mturk = boto3.client(
    service_name='mturk',
    region_name='us-east-1',
    endpoint_url=mturk_environment['endpoint'],
)

To confirm your account is set up correctly, make a call to get your account balance. If you’ve connected to the Sandbox your balance is always $10,000. This task will require between $18.00 and $19.00 to complete so you should add $19.00 to your Production account. The funds must be in your account before you submit tasks for Workers to complete.

print(mturk.get_account_balance()['AvailableBalance'])
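
Because the funds must be in place before any HITs are created, it can help to turn the printed balance into an explicit check. The sketch below assumes that AvailableBalance arrives as a decimal string such as '10000.00' (the Sandbox value mentioned above). The names has_sufficient_funds and required are hypothetical and are introduced here only for illustration.

```python
from decimal import Decimal

def has_sufficient_funds(available_balance, required='19.00'):
    """Return True if the balance string covers the required amount."""
    # Decimal avoids floating-point rounding when comparing currency amounts
    return Decimal(available_balance) >= Decimal(required)

# Example with the fixed Sandbox balance; in the notebook you would pass
# mturk.get_account_balance()['AvailableBalance'] instead
print(has_sufficient_funds('10000.00'))
print(has_sufficient_funds('5.00'))
```

The default of $19.00 simply mirrors the upper estimate given above; adjust it if you change the reward or the number of Assignments.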

Step 2. Import the training data set

We’ll get started by importing the iris data set from the UCI Machine Learning Repository into a pandas DataFrame. We appreciate the assistance provided in making this dataset available: Dua, D. and Karra Taniskidou, E. (2017). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.

For this tutorial we’ll replace the species information already included in this data set with images of each flower. Note that in this example we’ll be using the same three images for all of the data points. In a real world scenario each of the images would be different, but the resulting label would be the same.

# Load the iris data set
training_df = pd.read_csv('https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data', header=None)

# Name the columns
training_df.columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']

# Add an image_url column and remove the species column
def species_to_url(species):
    if (species == 'Iris-setosa'): return 'https://upload.wikimedia.org/wikipedia/commons/5/56/Kosaciec_szczecinkowaty_Iris_setosa.jpg'
    elif (species == 'Iris-versicolor'): return 'https://upload.wikimedia.org/wikipedia/commons/4/41/Iris_versicolor_3.jpg'
    else: return 'https://upload.wikimedia.org/wikipedia/commons/3/38/Iris_virginica_-_NRCS.jpg'
image_urls = [ species_to_url(row.species) for index, row in training_df.iterrows() ]
training_df['image_url'] = image_urls
del training_df['species']

training_df

Step 3. Define an MTurk task

MTurk accepts an XML document containing the HTML that will be displayed to Workers. Workers will see this HTML for each item that is published. The HTML file for this task can be downloaded here.

html_layout = open('./IrisAnnotation.html', 'r').read()
QUESTION_XML = """<HTMLQuestion xmlns="http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2011-11-11/HTMLQuestion.xsd">
        <HTMLContent><![CDATA[{}]]></HTMLContent>
        <FrameHeight>1000</FrameHeight>
        </HTMLQuestion>"""
question_xml = QUESTION_XML.format(html_layout)

The HTML loaded in this step contains a placeholder, ${image_url}, that will be replaced with the URL of an image for each data point when the tasks are created.
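
To make the substitution concrete, here is a minimal sketch that wraps a layout in the HTMLQuestion XML and fills in the ${image_url} placeholder that the code in Step 4 replaces. The layout below is a hypothetical stand-in, since the contents of IrisAnnotation.html are not shown here, and build_question and the example.com URL are illustrative names only. Parsing the result with the standard library is one way to confirm that the XML is still well formed.

```python
import xml.etree.ElementTree as ET

# Hypothetical stand-in for IrisAnnotation.html; the real layout is not shown here
html_layout = """<!DOCTYPE html>
<html><body>
<img src="${image_url}" alt="Iris to classify"/>
</body></html>"""

QUESTION_XML = """<HTMLQuestion xmlns="http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2011-11-11/HTMLQuestion.xsd">
<HTMLContent><![CDATA[{}]]></HTMLContent>
<FrameHeight>1000</FrameHeight>
</HTMLQuestion>"""

def build_question(layout, image_url):
    """Wrap the layout in HTMLQuestion XML and fill in the image placeholder."""
    return QUESTION_XML.format(layout).replace('${image_url}', image_url)

question = build_question(html_layout, 'https://example.com/iris.jpg')

# Parsing confirms the document is still well-formed XML after substitution
root = ET.fromstring(question)
namespace = root.tag[1:].split('}')[0]
html_content = root.find('{%s}HTMLContent' % namespace).text
print('${image_url}' in html_content)
print('https://example.com/iris.jpg' in html_content)
```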

In MTurk each task is represented by a Human Intelligence Task (HIT), which is an individual item you want to be annotated by one or more Workers. We want two Workers to complete each item so we’ll set the MaxAssignments at two. Assignments represent each individual Worker response. The definition that follows also requests that the HIT remain live on the worker.mturk.com website for no more than four hours and that Workers provide a response for each item in less than five minutes. Each response has a reward of $0.05, so the total Worker reward for each HIT will cost $0.10 plus $0.02 in MTurk fees. An appropriate title, description, and keywords are also provided to let Workers know what is involved in this task.

task_attributes = {
    'MaxAssignments': 2,                 
    'LifetimeInSeconds': 60*60*4,         # How long the task will be available on the MTurk website (4 hours)
    'AssignmentDurationInSeconds': 60*5,  # How long will Workers have to complete each item (5 minutes)
    'Reward': '0.05',                     # The reward you will offer Workers for each response
    'Title': 'Classify images of flowers',
    'Description': 'Provide the species of iris in each image',
    'Keywords': 'classification, image'
}

We assign each item to two Workers because we want at least two Workers to agree on an annotation before we use it for training. If two Workers don’t agree on a data point, we’ll add Assignments until at least two Workers agree. This logic is implemented later, in Step 5.
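
The reward and fee figures above account for the cost range quoted in Step 1. The following sketch works through that arithmetic. It assumes 150 data points (the size of the Iris data set) and a fee of $0.01 per response, which is the $0.02 per-HIT fee quoted above divided across two Assignments. The assumption that each tie-breaker Assignment costs the same, and the figure of 10 extra Assignments, are illustrative. estimate_cost is a hypothetical helper.

```python
from decimal import Decimal

def estimate_cost(num_items, assignments_per_item, reward, fee_per_assignment,
                  extra_assignments=0):
    """Total cost of rewards plus fees, including any tie-breaker Assignments."""
    per_assignment = Decimal(reward) + Decimal(fee_per_assignment)
    total_assignments = num_items * assignments_per_item + extra_assignments
    return per_assignment * total_assignments

# 150 data points, two Workers each, $0.05 reward and $0.01 fee per response
baseline = estimate_cost(150, 2, '0.05', '0.01')
# Same task if 10 items need one extra Assignment (10 is a made-up figure)
with_ties = estimate_cost(150, 2, '0.05', '0.01', extra_assignments=10)
print(baseline, with_ties)
```

Under these assumptions the baseline is $18.00, and each tie-breaker adds $0.06, which is why the total depends on how often Workers disagree.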

Step 4. Publish the tasks to MTurk

Now that we’ve defined our task and dataset, we’re ready to send it to MTurk Workers to be annotated. For each row in the dataset we’ll substitute the image_url into our question and create a HIT on MTurk. When the HIT is created we’ll capture the HITId that is assigned to this item so that we can retrieve the results later.

hit_type_id = ''
results = []

for index, row in training_df.iterrows():
    response = mturk.create_hit(
        **task_attributes,
        Question=question_xml.replace('${image_url}',row['image_url'])
    )
    hit_type_id = response['HIT']['HITTypeId']
    results.append({
        'hit_id': response['HIT']['HITId']
    })

print("You can view the HITs here:")
print(mturk_environment['preview'] + "?groupId={}".format(hit_type_id))

Step 5. Retrieve the annotation results

In many cases, results of annotation tasks will be available in a matter of minutes, although larger tasks can take a few hours or more. We can check the progress of the project any time by retrieving the status of each HIT and getting the results for each Assignment.

For each item we’ll start by getting the status of the HIT. The most common states are:

  • Assignable: When the task is available to be accepted by a Worker
  • Unassignable: When the task is being worked on by Workers and can no longer be accepted by additional Workers
  • Reviewable: When all of the responses have been received

Next we’ll retrieve all of the Worker responses for this item by retrieving the Assignments. We’ll extract the Worker’s response from each Assignment and store it in an array of answers. We’ll also approve each Assignment as soon as we retrieve it so that Workers are credited with their reward right away.
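
Each Assignment carries the Worker's response as an XML string. The sketch below shows the general shape of that document and one way to read it using only the standard library rather than xmltodict. The sample XML is illustrative: the field name species is hypothetical, since the form fields in IrisAnnotation.html are not shown here, and extract_answers is a helper introduced for this example.

```python
import xml.etree.ElementTree as ET

# Illustrative answer document; 'species' is a hypothetical field name
SAMPLE_ANSWER = """<QuestionFormAnswers xmlns="http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd">
  <Answer>
    <QuestionIdentifier>species</QuestionIdentifier>
    <FreeText>1</FreeText>
  </Answer>
</QuestionFormAnswers>"""

def extract_answers(answer_xml):
    """Map each QuestionIdentifier to its FreeText value, ignoring namespaces."""
    answers = {}
    for element in ET.fromstring(answer_xml).iter():
        # Tags look like '{namespace}Answer'; keep only the local name
        if element.tag.rsplit('}', 1)[-1] != 'Answer':
            continue
        fields = {child.tag.rsplit('}', 1)[-1]: child.text for child in element}
        answers[fields['QuestionIdentifier']] = fields.get('FreeText')
    return answers

print(extract_answers(SAMPLE_ANSWER))
```

Keying the result by QuestionIdentifier keeps the lookup stable if the form later gains more fields. With xmltodict, a document that has several Answer elements yields a list rather than a single dictionary, so a direct ['Answer']['FreeText'] lookup only works for single-field forms.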

After we’ve retrieved all of the answers for an item, we can check to see if at least two Workers agree. If they agree, we use that as our annotation. If not, we’ll add an additional Assignment to get additional Worker input.

species_count = 0
for item in results:
    
    # Get the status of the HIT
    hit = mturk.get_hit(HITId=item['hit_id'])
    item['status'] = hit['HIT']['HITStatus']

    # Get a list of the Assignments that have been submitted by Workers
    assignmentsList = mturk.list_assignments_for_hit(
        HITId=item['hit_id'],
        AssignmentStatuses=['Submitted', 'Approved'],
        MaxResults=10
    )

    # Get the assignments that have been submitted and capture a count in the results
    assignments = assignmentsList['Assignments']
    item['assignments_submitted_count'] = len(assignments)

    answers = []
    for assignment in assignments:
    
        # Retrieve the attributes for each Assignment
        worker_id = assignment['WorkerId']
        assignment_id = assignment['AssignmentId']
        
        # Retrieve the value submitted by the Worker from the XML
        answer_dict = xmltodict.parse(assignment['Answer'])
        answer = answer_dict['QuestionFormAnswers']['Answer']['FreeText']
        answers.append(int(answer))
        
        # Approve the Assignment (if it hasn't already been approved)
        if assignment['AssignmentStatus'] == 'Submitted':
            mturk.approve_assignment(
                AssignmentId=assignment_id,
                OverrideRejection=False
            )
    
    # Add the answers that have been retrieved for this item to the results
    item['answers'] = answers
    
    # If we've received at least 2 answers for the same category, use that answer
    if len(answers) > 1:
        for species in [0,1,2]:
            if answers.count(species) >= 2:
                item['species'] = species
                species_count += 1
                
    # If none of the Workers agree after all answers have been provided, 
    # add an additional Assignment to break the tie
    if len(answers) == hit['HIT']['MaxAssignments'] and 'species' not in item:
        extend_result = mturk.create_additional_assignments_for_hit(HITId=item['hit_id'], NumberOfAdditionalAssignments=1)
        print("Extended HIT {} to {} assignments".format(item['hit_id'], hit['HIT']['MaxAssignments'] + 1))

print("Irises annotated: {}".format(species_count))
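
The agreement rule used in the loop above can also be isolated as a small pure function, which makes it easy to test without calling MTurk. This is a sketch of the same idea using collections.Counter, not the code the notebook runs; reconcile and min_agreement are hypothetical names.

```python
from collections import Counter

def reconcile(answers, min_agreement=2):
    """Return the label chosen by at least min_agreement Workers, else None."""
    if not answers:
        return None
    label, votes = Counter(answers).most_common(1)[0]
    return label if votes >= min_agreement else None

print(reconcile([1, 1]))     # two Workers agree
print(reconcile([0, 2]))     # no agreement yet, so a tie-breaker is needed
print(reconcile([0, 2, 2]))  # the third Worker breaks the tie
```

A result of None corresponds to the case where the loop requests an additional Assignment.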

Now that we’ve retrieved all of our annotations, we can merge the results back into our DataFrame. We’ll also split the DataFrame into train and test datasets that we’ll use to train the model.

# merge the results back into the dataframe
results_df = pd.merge(training_df, pd.DataFrame(results), left_index=True, right_index=True)[['sepal_length','sepal_width','petal_length','petal_width','species']]

# ensure the species column is defined as an int
results_df['species'] = results_df['species'].astype(int)

# split the frame into training and test
test_df = results_df.sample(n=30)
# rename the columns so that the CSV header row reads: row count, feature count, class names
test_df.columns  = [30,4,'setosa','versicolor','virginica']
train_df = results_df.drop(test_df.index)
train_df.columns = [120,4,'setosa','versicolor','virginica']

train_df
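
The renamed columns become the first row of each CSV file. They appear to follow the header convention of TensorFlow's sample iris CSV files, in which the first row carries the number of rows, the number of features, and the class names; that reading is an assumption, since the training script is not shown here. The sketch below builds such a file with the standard csv module and derives the row count from the data, which avoids a mismatch if fewer than 150 items end up annotated. The rows and the helper to_csv_with_header are made up for illustration.

```python
import csv
import io

SPECIES_NAMES = ['setosa', 'versicolor', 'virginica']

def to_csv_with_header(rows, n_features=4):
    """Serialize rows with a header of row count, feature count, and class names."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator='\n')
    # Derive the row count from the data instead of hard-coding 30 or 120
    writer.writerow([len(rows), n_features] + SPECIES_NAMES)
    writer.writerows(rows)
    return buffer.getvalue()

# Two made-up rows: four measurements followed by the species label
sample_rows = [[5.1, 3.5, 1.4, 0.2, 0], [6.4, 3.2, 4.5, 1.5, 1]]
print(to_csv_with_header(sample_rows))
```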

Now we can store them to S3 as CSV files so that we can begin training.

csv_buffer = io.StringIO()
train_df.to_csv(csv_buffer, index=False)
s3.Object(training_bucket_name, 'trainingdata/iris_training.csv').put(Body=csv_buffer.getvalue())

csv_buffer = io.StringIO()
test_df.to_csv(csv_buffer, index=False)
s3.Object(training_bucket_name, 'trainingdata/iris_test.csv').put(Body=csv_buffer.getvalue())

Step 6. Train the model

We can now train the estimator using this data. Here we’ll use iris_dnn_classifier.py, the training script that is used in the Amazon SageMaker example. This can be downloaded here. For more information about how this model is set up, see the notebook associated with this sample.

We’ll start by initializing a few variables.

from sagemaker import get_execution_role

#Bucket location to save your custom code in tar.gz format.
custom_code_upload_location = 's3://{}/customcode/tensorflow_iris'.format(training_bucket_name)

#Bucket location where results of model training are saved.
model_artifacts_location = 's3://{}/artifacts'.format(training_bucket_name)

#IAM execution role that gives SageMaker access to resources in your AWS account.
role = get_execution_role()

We can use the Amazon SageMaker Python SDK to run our local training script inside the pre-built SageMaker TensorFlow container by passing the path to the iris_dnn_classifier.py file to the constructor of the sagemaker.tensorflow.TensorFlow class. That script contains the functions that define the estimator.

To train the model, we’ll pass the location of our annotated data to the fit() method.

from sagemaker.tensorflow import TensorFlow

iris_estimator = TensorFlow(entry_point='iris_dnn_classifier.py',
                            role=role,
                            output_path=model_artifacts_location,
                            code_location=custom_code_upload_location,
                            train_instance_count=1,
                            train_instance_type='ml.c4.xlarge',
                            training_steps=1000,
                            evaluation_steps=100)

train_data_location = 's3://{}/trainingdata'.format(training_bucket_name)
iris_estimator.fit(train_data_location)

Step 7. Deploy the Trained Model

The deploy() method creates an endpoint that serves prediction requests in real time so that we can invoke it using predict().

iris_predictor = iris_estimator.deploy(initial_instance_count=1,
                                       instance_type='ml.m4.xlarge')

iris_predictor.predict([6.4, 3.2, 4.5, 1.5])

Step 8. Clean up your resources

When you’re done with this tutorial, you can avoid incurring unnecessary charges by using the AWS Management Console to delete the resources that you created for this exercise.

  1. Open the Amazon SageMaker console at https://console.aws.amazon.com/sagemaker/ and delete the endpoint, endpoint configuration, and the model. Then stop your notebook instance and delete it.
  2. Open the Amazon S3 console at https://console.aws.amazon.com/s3/ and delete the bucket that you created for storing model artifacts and the training dataset.
  3. Open the IAM console at https://console.aws.amazon.com/iam/ and delete the IAM role. If you created permission policies, you can delete them, too.
  4. Open the Amazon CloudWatch console at https://console.aws.amazon.com/cloudwatch/ and delete all of the log groups that have names starting with /aws/sagemaker/.
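
If you prefer to script the first of these steps instead of using the console, the endpoint, endpoint configuration, and model can be deleted through the boto3 SageMaker client. The following is a minimal sketch under the assumption that you already know the three resource names; delete_inference_resources is a hypothetical helper, and the names in the usage comment are placeholders.

```python
def delete_inference_resources(sagemaker_client, endpoint_name,
                               endpoint_config_name, model_name):
    """Delete the endpoint, its configuration, and the model, in that order."""
    sagemaker_client.delete_endpoint(EndpointName=endpoint_name)
    sagemaker_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
    sagemaker_client.delete_model(ModelName=model_name)

# Usage in the notebook (the names below are placeholders, not real resources):
#   import boto3
#   delete_inference_resources(boto3.client('sagemaker'),
#                              '<endpoint-name>', '<endpoint-config-name>', '<model-name>')
```

Passing the client in as an argument keeps the helper easy to exercise with a stub before pointing it at a real account.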

Conclusion

As you can see, MTurk makes it possible to build the annotated data you need for your project. We started with an unannotated data set, created an MTurk task to annotate it, retrieved our Worker-provided results, and then trained our model, all within a SageMaker notebook. As a next step, you could use MTurk Workers to validate the results provided by your model. This is particularly powerful when your model returns low-confidence predictions for a portion of your data. By using MTurk to review those items, you can get new annotations that you’ll be able to use to re-train your model. By iteratively using MTurk to provide annotations, you can steadily refine your model as new data becomes available. For more examples of how to use MTurk in a variety of contexts, check out the tutorials provided on the MTurk Blog.
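
As a rough illustration of that review loop, the sketch below picks out the predictions whose most likely class falls below a confidence threshold, so that only those items would be sent back to Workers. The probabilities, the sample names, the 0.8 threshold, and the helper needs_review are all made up for this example; the actual response format of the deployed endpoint is not shown here.

```python
def needs_review(probabilities, threshold=0.8):
    """Return True when the most likely class falls below the confidence threshold."""
    return max(probabilities) < threshold

# Made-up class probabilities for three samples (setosa, versicolor, virginica)
predictions = {
    'sample-1': [0.97, 0.02, 0.01],
    'sample-2': [0.05, 0.50, 0.45],
    'sample-3': [0.10, 0.15, 0.75],
}
review_queue = [name for name, probs in predictions.items() if needs_review(probs)]
print(review_queue)
```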

Amazon Mechanical Turk is a powerful tool for any data scientist that needs to collect, annotate, or validate data for training their models. With easy access to MTurk from within Amazon SageMaker, you get access to an on-demand workforce that can help you quickly and economically get the data you need for your project.


About the Author

Dave Schultz leads Business Development for Amazon Mechanical Turk. He helps customers find ways to apply human intelligence to complex problems in machine learning and data management. In his spare time, he enjoys woodworking and the odd programming project.