AWS Developer Blog

Serverless Service Discovery: Part 3: Registration

by Magnus Bjorkman | in Python

In this, the third part of our serverless service discovery series, we will show how to configure Amazon API Gateway to require AWS Identity and Access Management (IAM) for authentication and how to create a V4 signature to call our register and deregister methods.

We have created all the functions required to manage our API and code, so we can jump directly into creating our new functions.

Registering and Deregistering Services

We start by creating a Lambda function for registering a service:


def lambda_handler(api_parameters, context):
    """Lambda hander for registering a service."""
    logger.info("lambda_handler - service_name: %s"
                " service_version: %s"
                % (api_parameters["service_name"],
                   api_parameters["service_version"]))

    table = boto3.resource('dynamodb',
                           region_name='us-east-1').Table('Services')

    table.put_item(
           Item={
                'name': api_parameters["service_name"],
                'version': api_parameters["service_version"],
                'endpoint_url': api_parameters["endpoint_url"],
                'ttl': int(api_parameters["ttl"]),
                'status': api_parameters["status"],
            }
        )

This function takes the input and stores it in Amazon DynamoDB. If you call the function with the same service name and version (our DynamoDB key), then it will overwrite the existing item.
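
Before wiring the function into API Gateway, you can exercise the handler locally with a dict that mimics the mapped request body (a quick sketch; the values are made up, and it assumes the function file's imports, logger, and the Services table are in place):

sample_event = {
    "service_name": "testservice1",
    "service_version": "1.0",
    "endpoint_url": "notarealurl1",
    "ttl": "300",
    "status": "healthy"
}

# the handler never uses the context argument, so None is fine for a local test
lambda_handler(sample_event, None)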

Next is the function to deregister a service:


def lambda_handler(api_parameters, context):
    """Lambda hander for deregistering a service."""
    logger.info("lambda_handler - service_name: %s"
                " service_version: %s"
                % (api_parameters["service_name"],
                   api_parameters["service_version"]))

    table = boto3.resource('dynamodb',
                           region_name='us-east-1').Table('Services')

    table.delete_item(
            Key={
                'name': api_parameters["service_name"],
                'version': api_parameters["service_version"]
            }
        )

The function removes the item from the DynamoDB table based on the service name and version.

We need to add the new functions and API methods to the Swagger file:


{
  "swagger": "2.0",
  "info": {
    "title": "catalog_service",
    "version": "1.0.0"
  },
  "basePath": "/v1",
  "schemes": ["https"],
  "consumes": ["application/json"],
  "produces": ["application/json"],
  "paths": {
    "/catalog/{serviceName}/{serviceVersion}": {
      "parameters": [{
        "name": "serviceName",
        "in": "path",
        "description": "The name of the service to look up.",
        "required": true,
        "type": "string"
      },
      {
        "name": "serviceVersion",
        "in": "path",
        "description": "The version of the service to look up.",
        "required": true,
        "type": "string"
      }],
      "get": {
        "responses": {
          "200": {
            "description": "version information"
          },
          "404": {
            "description": "service not found"
          }
        },
        "x-amazon-apigateway-integration": {
          "type": "aws",
          "uri": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/$catalog_serviceARN$/invocations",
          "httpMethod": "POST",
          "requestTemplates": {
            "application/json": "{\"service_name\": \"$input.params('serviceName')\",\"service_version\": \"$input.params('serviceVersion')\"}"
          },
          "responses": {
            "default": {
              "statusCode": "200",
              "schema": {
                "$ref": "#/definitions/CatalogServiceModel"
              }
            },
            ".*NotFound.*": {
              "statusCode": "404",
              "responseTemplates" : {
                 "application/json": "{\"error_message\":\"Service Not Found\"}"
                } 
            } 
          }
        }
      }
    },
    "/catalog/register": {
      "post": {
        "responses": {
          "201": {
            "description": "service registered"
          }
        },
        "parameters": [{
          "name": "body",
          "in": "body",
          "description": "body object",
          "required": true,
          "schema": {
            "$ref":"#/definitions/CatalogRegisterModel"
          }
        }],
        "x-amazon-apigateway-auth" : {
          "type" : "aws_iam" 
        },
        "x-amazon-apigateway-integration": {
          "type": "aws",
          "uri": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/$catalog_registerARN$/invocations",
          "httpMethod": "POST",
          "requestTemplates": {
            "application/json": "$input.json('$')"
          },
          "responses": {
            "default": {
              "statusCode": "201"
            } 
          }
        }
      } 
    },
    "/catalog/deregister": {
      "post": {
        "responses": {
          "201": {
            "description": "service deregistered"
          }
        },
        "parameters": [{
          "name": "body",
          "in": "body",
          "description": "body object",
          "required": true,
          "schema": {
            "$ref":"#/definitions/CatalogDeregisterModel"
          }
        }],
        "x-amazon-apigateway-auth" : {
          "type" : "aws_iam" 
        },
        "x-amazon-apigateway-integration": {
          "type": "aws",
          "uri": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/$catalog_deregisterARN$/invocations",
          "httpMethod": "POST",
          "requestTemplates": {
            "application/json": "$input.json('$')"
          },
          "responses": {
            "default": {
              "statusCode": "201"
            } 
          }
        }
      } 
    }
  },
  "definitions": {
    "CatalogServiceModel": {
      "type": "object",
      "properties": {
        "endpoint_url": {
          "type": "string"
        },
        "ttl": {
          "type": "integer"
        },
        "status": {
          "type": "string"
        }
      },
      "required": ["endpoint_url", "ttl", "status"]
    },
    "CatalogRegisterModel": {
      "type": "object",
      "properties": {
        "service_name": {
          "type": "string"
        },
        "service_version": {
          "type": "string"
        },
        "endpoint_url": {
          "type": "string"
        },
        "ttl": {
          "type": "integer"
        },
        "status": {
          "type": "string"
        }
      },
      "required": ["service_name","service_version","endpoint_url", "ttl", "status"]
    },
    "CatalogDeregisterModel": {
      "type": "object",
      "properties": {
        "service_name": {
          "type": "string"
        },
        "service_version": {
          "type": "string"
        }
      },
      "required": ["service_name","service_version"]
    }
  }
}

The new methods will be POST-based, so we need to define models (CatalogRegisterModel and CatalogDeregisterModel) for the data passed through the method body. After API Gateway processes the models, the JSON objects will be passed, as is, to the Lambda functions.

We set the x-amazon-apigateway-auth element to aws_iam for the register and deregister methods, so API Gateway will require a Signature Version 4 signature when we access them.
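
For a caller to pass that check, the IAM user or role making the request must also be allowed to invoke the API. A minimal identity policy might look like the following (illustrative only; the account number, API ID, and stage are placeholders):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "execute-api:Invoke",
      "Resource": "arn:aws:execute-api:us-east-1:123456789012:your_rest_api_id/dev/POST/catalog/*"
    }
  ]
}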

We can now deploy our new functions:


ACCOUNT_NUMBER = _your account number_

create_deployment_package("/tmp/catalog_register.zip", ["catalog_register.py"])
catalog_register_arn = create_lambda_function(
                       "/tmp/catalog_register.zip",
                       "catalog_register",
                       "arn:aws:iam::"+ACCOUNT_NUMBER+":role/lambda_s3",
                       "catalog_register.lambda_handler",
                       "Registering a service.",
                       ACCOUNT_NUMBER)
replace_instances_in_file("swagger.json",
                          "/tmp/swagger_with_arn.json",
                          "$catalog_registerARN$", catalog_register_arn)
create_deployment_package("/tmp/catalog_deregister.zip",
                          ["catalog_deregister.py"])
catalog_deregister_arn = create_lambda_function(
                       "/tmp/catalog_deregister.zip",
                       "catalog_deregister",
                       "arn:aws:iam::"+ACCOUNT_NUMBER+":role/lambda_s3",
                       "catalog_deregister.lambda_handler",
                       "Deregistering a service.",
                       ACCOUNT_NUMBER)
replace_instances_in_file("/tmp/swagger_with_arn.json",
                          "/tmp/swagger_with_arn.json",
                          "$catalog_deregisterARN$", catalog_deregister_arn)
catalog_service_arn = get_function_arn("catalog_service")
replace_instances_in_file("/tmp/swagger_with_arn.json",
                          "/tmp/swagger_with_arn.json",
                          "$catalog_serviceARN$", catalog_service_arn)
api_id = update_api("/tmp/swagger_with_arn.json")
deploy_api(api_id, "/tmp/swagger_with_arn.json", "dev")

We can try out the new register service like this:


json_body = {
            "service_name": "registerservice3",
            "service_version": "1.0",
            "endpoint_url": "notarealurlregister3",
            "ttl": "300",
            "status": "healthy"
            }
request_url = "https://yourrestapi.execute-api.us-east-1.amazonaws.com/"\
              "dev/catalog/register"
response = requests.post(
            request_url,
            data=json.dumps(json_body))
if(not response.ok):
    logger.error("Error code: %i" % (response.status_code,))


We should get something like this:


ERROR:root:Error code: 403

Signing a Request with Signature Version 4

To successfully call our new services, we need to implement a client that will sign the request to the API with a Signature Version 4 signature. First, we implement the functions that create the signature:


from botocore.credentials import get_credentials
from botocore.session import get_session
import requests
import json
import logging
import sys
import datetime
import hashlib
import hmac
import urlparse
import urllib
from collections import OrderedDict

def sign(key, msg):
    """Sign string with key."""
    return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()


def getSignatureKey(key, dateStamp, regionName, serviceName):
    """Create signature key."""
    kDate = sign(('AWS4' + key).encode('utf-8'), dateStamp)
    kRegion = sign(kDate, regionName)
    kService = sign(kRegion, serviceName)
    kSigning = sign(kService, 'aws4_request')
    return kSigning


def create_canonical_querystring(params):
    """Create canonical query string."""
    ordered_params = OrderedDict(sorted(params.items(), key=lambda t: t[0]))
    canonical_querystring = ""
    for key, values in ordered_params.iteritems():
        # urlparse.parse_qs returns a list of values for each parameter;
        # Signature Version 4 expects URI-encoded key=value pairs joined by "&"
        for value in values:
            if len(canonical_querystring) > 0:
                canonical_querystring += "&"
            canonical_querystring += (urllib.quote(key, safe='-_.~') + "=" +
                                      urllib.quote(value, safe='-_.~'))
    return canonical_querystring


def sign_request(method, url, credentials, region, service, body=''):
    """Sign a HTTP request with AWS V4 signature."""
    ###############################
    # 1. Create a Canonical Request
    ###############################
    t = datetime.datetime.utcnow()
    amzdate = t.strftime('%Y%m%dT%H%M%SZ')
    # Date w/o time, used in credential scope
    datestamp = t.strftime('%Y%m%d')

    # Create the different parts of the request, with content sorted
    # in the prescribed order
    parsed_url = urlparse.urlparse(url)
    canonical_uri = parsed_url.path
    canonical_querystring = create_canonical_querystring(
                              urlparse.parse_qs(parsed_url.query))
    canonical_headers = ("host:%s\n"
                         "x-amz-date:%s\n" %
                         (parsed_url.hostname, amzdate))
    signed_headers = 'host;x-amz-date'
    if (not (credentials.token is None)):
        canonical_headers += ("x-amz-security-token:%s\n") % (credentials.token,)
        signed_headers += ';x-amz-security-token'

    payload_hash = hashlib.sha256(body).hexdigest()
    canonical_request = ("%s\n%s\n%s\n%s\n%s\n%s" %
                         (method,
                          urllib.quote(canonical_uri),
                          canonical_querystring,
                          canonical_headers,
                          signed_headers,
                          payload_hash))

    #####################################
    # 2. Create a String to Sign
    #####################################
    algorithm = 'AWS4-HMAC-SHA256'
    credential_scope = ("%s/%s/%s/aws4_request" % 
                        (datestamp,
                         region,
                         service))
    string_to_sign = ("%s\n%s\n%s\n%s" %
                       (algorithm,
                        amzdate,
                        credential_scope,
                        hashlib.sha256(canonical_request).hexdigest()))
    #####################################
    # 3. Create a Signature
    #####################################
    signing_key = getSignatureKey(credentials.secret_key,
                                  datestamp, region, service)
    signature = hmac.new(signing_key, (string_to_sign).encode('utf-8'),
                         hashlib.sha256).hexdigest()

    ######################################################
    # 4. Assemble the request so it can be used for submission
    ######################################################
    authorization_header = ("%s Credential=%s/%s, "
                            "SignedHeaders=%s, "
                            "Signature=%s" %
                            (algorithm,
                             credentials.access_key,
                             credential_scope,
                             signed_headers,
                             signature))
    headers = {'x-amz-date': amzdate, 'Authorization': authorization_header}
    if (not (credentials.token is None)):
        headers['x-amz-security-token'] = credentials.token
    request_url = ("%s://%s%s" % 
                   (parsed_url.scheme,parsed_url.netloc,canonical_uri))
    if (len(canonical_querystring) > 0):
        request_url += ("?%s" % (canonical_querystring,))

    return request_url, headers, body

The main function, sign_request, can sign requests for both POST and GET methods. It also works with both short-term and long-term credentials. For more information about creating Signature Version 4 requests, see Signing Requests in the AWS documentation.

We implement the following method to submit a POST request:


def signed_post(url, region, service, data, **kwargs):
    """Signed post with AWS V4 Signature."""
    credentials = get_credentials(get_session())

    request_url, headers, body = sign_request("POST", url, credentials, region,
                                              service, body=data)

    return requests.post(request_url, headers=headers, data=body, **kwargs)

We are using botocore functionality to get the configured credentials on the instance we are running on. If we are running on an Amazon EC2 instance or in AWS Lambda, botocore will use the configured IAM role.
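
Because sign_request also handles GET, a matching helper for signed GET requests follows the same pattern (shown here as a sketch; we don't need it for the register call, but it is handy for testing protected GET methods):

def signed_get(url, region, service, **kwargs):
    """Signed GET with AWS V4 Signature."""
    credentials = get_credentials(get_session())

    # GET requests are signed with an empty body
    request_url, headers, body = sign_request("GET", url, credentials, region,
                                              service, body='')

    return requests.get(request_url, headers=headers, **kwargs)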

We can now test the service by calling register:


json_body = {
            "service_name": "registerservice6",
            "service_version": "1.0",
            "endpoint_url": "notarealurlregister6",
            "ttl": "300",
            "status": "healthy"
            }
request_url = "https://yourrestapiid.execute-api.us-east-1.amazonaws.com/"\
              "dev/catalog/register"
response = signed_post(
            request_url,
            "us-east-1",
            "execute-api",
            json.dumps(json_body))
if(not response.ok):
    logger.error("Error code: %i" % (response.status_code,))
else:
    logger.info("Successfully registered the service.")

The test should complete without a failure. To verify, we can look up the item we just registered:


request_url="https://your_rest_api_id.execute-api.us-east-1.amazonaws.com/"\
            "dev/v1/catalog/registerservice6/1.0"
response = requests.get(request_url)
json_response = json.loads(response.content)
logging.info("Endpoint URL: %s" % (json_response['endpoint_url'],))
logging.info("TTL: %i" % (json_response['ttl'],))
logging.info("Status: %s" % (json_response['status'],))

You should get the following output:


INFO:root:Endpoint URL: notarealurlregister6
INFO:root:TTL: 300
INFO:root:Status: healthy
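
Deregistering works the same way; the body needs only the service name and version. Here is a sketch that reuses the signed_post helper (the API ID is a placeholder):

json_body = {
            "service_name": "registerservice6",
            "service_version": "1.0"
            }
request_url = "https://yourrestapiid.execute-api.us-east-1.amazonaws.com/"\
              "dev/catalog/deregister"
response = signed_post(
            request_url,
            "us-east-1",
            "execute-api",
            json.dumps(json_body))
if not response.ok:
    logger.error("Error code: %i" % (response.status_code,))
else:
    logger.info("Successfully deregistered the service.")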

Using a Thread Pool with the AWS SDK for C++

by Jonathan Henson | in C++

The default thread executor implementation we provide for asynchronous operations spins up a thread and then detaches it. On modern operating systems, this is often exactly what we want. However, there are other use cases for which this simply will not work. For example, suppose we want to fire off asynchronous calls to Amazon Kinesis as quickly as we receive events, and suppose we sometimes receive those events at a rate of 10 per millisecond. Even if we are calling Amazon Kinesis from an Amazon Elastic Compute Cloud (EC2) instance in the same data center as our Amazon Kinesis stream, the latency will eventually cause the number of threads on our system to bloat and possibly exhaust the operating system's limits.

Here is an example of what this code might look like:


#include <aws/kinesis/model/PutRecordsRequest.h>
#include <aws/kinesis/KinesisClient.h>
#include <aws/core/utils/Outcome.h>
#include <aws/core/utils/memory/AWSMemory.h>
#include <aws/core/Aws.h>

using namespace Aws::Client;
using namespace Aws::Utils;
using namespace Aws::Kinesis;
using namespace Aws::Kinesis::Model;

class KinesisProducer
{
public:
    KinesisProducer(const Aws::String& streamName, const Aws::String& partition) : m_partition(partition), m_streamName(streamName)
    {
        ClientConfiguration clientConfiguration;
        m_client = Aws::New<KinesisClient>("kinesis-sample", clientConfiguration);
    }

    ~KinesisProducer()
    {
        Aws::Delete(m_client);
    }

    void StreamData(const Aws::Vector<ByteBuffer>& data)
    {
        PutRecordsRequest putRecordsRequest;
        putRecordsRequest.SetStreamName(m_streamName);

        for(auto& datum : data)
        {
            PutRecordsRequestEntry putRecordsRequestEntry;
            putRecordsRequestEntry.WithData(datum)
                    .WithPartitionKey(m_partition);

            putRecordsRequest.AddRecords(putRecordsRequestEntry);
        }

        m_client->PutRecordsAsync(putRecordsRequest,
               std::bind(&KinesisProducer::OnPutRecordsAsyncOutcomeReceived, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
    }

private:
    void OnPutRecordsAsyncOutcomeReceived(const KinesisClient*, const Model::PutRecordsRequest&,
                                          const Model::PutRecordsOutcome& outcome, const std::shared_ptr<const Aws::Client::AsyncCallerContext>&)
    {
        if(outcome.IsSuccess())
        {
            std::cout << "Records Put Successfully " << std::endl;
        }
        else
        {
            std::cout << "Put Records Failed with error " << outcome.GetError().GetMessage() << std::endl;
        }
    }

    KinesisClient* m_client;
    Aws::String m_partition;
    Aws::String m_streamName;
};


int main()
{
    Aws::SDKOptions options;
    Aws::InitAPI(options);
	{
		KinesisProducer producer("kinesis-sample", "announcements");

		while(true)
		{
			Aws::String event1("Event #1");
			Aws::String event2("Event #2");

			producer.StreamData( {
										 ByteBuffer((unsigned char*)event1.c_str(), event1.length()),
										 ByteBuffer((unsigned char*)event2.c_str(), event2.length())
								 });
		}
	}
    Aws::ShutdownAPI(options);
    return 0;
}


This example is intended to show how exhausting the available threads from the operating system will ultimately result in a program crash. Most systems with this problem would be bursty and would not create such a sustained load. Still, we need a better way to handle our threads for such a scenario.

This week, we released a thread pool executor implementation. Simply include the aws/core/utils/threading/Executor.h file. The class name is PooledThreadExecutor. You can set two options: the number of threads for the pool to use and the overflow policy.

Currently, there are two overflow policy modes:

QUEUE_TASKS_EVENLY_ACROSS_THREADS will allow you to push as many tasks as you want to the executor. It will make sure tasks are queued and pulled by each thread as quickly as possible. For most cases, QUEUE_TASKS_EVENLY_ACROSS_THREADS is the preferred option.

REJECT_IMMEDIATELY will reject the task submission if the queued task length ever exceeds the size of the thread pool.

Let’s revise our example to use a thread pool:


#include <aws/kinesis/model/PutRecordsRequest.h>
#include <aws/kinesis/KinesisClient.h>
#include <aws/core/utils/Outcome.h>
#include <aws/core/utils/memory/AWSMemory.h>
#include <aws/core/utils/threading/Executor.h>

using namespace Aws::Client;
using namespace Aws::Utils::Threading;
using namespace Aws::Kinesis;
using namespace Aws::Kinesis::Model;

class KinesisProducer
{
public:
    KinesisProducer(const Aws::String& streamName, const Aws::String& partition) : m_partition(partition), m_streamName(streamName)
    {
        ClientConfiguration clientConfiguration;
        clientConfiguration.executor = Aws::MakeShared<PooledThreadExecutor>("kinesis-sample", 10);
        m_client = Aws::New<KinesisClient>("kinesis-sample", clientConfiguration);
    }

    ....

The only change we need to make to add the thread pool to our configuration is to assign an instance of the new executor implementation to our ClientConfiguration object.

As always, we welcome your feedback, and even pull requests, about how we can improve this feature.

Serverless Service Discovery: Part 2: Lookup

by Magnus Bjorkman | in Python

In this, the second part of our serverless service discovery series, we will use Amazon DynamoDB to store information about the services in our service discovery service and update our AWS Lambda function to read information from the DynamoDB table.

Updating Amazon API Gateway and AWS Lambda

As part of adding new functionality to what we are implementing, we need to update the Service Discovery API and the code in API Gateway and AWS Lambda. First, we update the code in AWS Lambda:


def get_function_arn(function_name):
    """Return ARN given the Function Name.

    :param function_name: The name of the Lambda function.
    :return: The ARN for the Lambda function.
    """
    client = boto3.client('lambda')

    response = client.get_function(
        FunctionName=function_name
    )

    return response['Configuration']['FunctionArn']


def update_lambda_function(package_name, function_name):
    """Update a Lambda function from zip-file.

    :param package_name: The name of the package. Full or relative path.
    :param function_name: The name of the Lambda function.
    :return: The ARN for the Lambda function.
    """
    with open(package_name, "rb") as package_file:
        package_data = package_file.read()

    # connect to Lambda API
    client = boto3.client('lambda')

    # update the function code
    client.update_function_code(
        FunctionName=function_name,
        ZipFile=package_data,
        Publish=True
    )

    # get function configuration to get top level ARN
    return get_function_arn(function_name)

The get_function_arn function returns the top-level ARN for the Lambda function. When we update the code, we get back the ARN of the uploaded version, but we need to use the top-level ARN with Swagger.

The update_lambda_function function updates only the code of the Lambda function. The Lambda API provides other calls for updating the rest of a function's configuration.
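
For example, a function's timeout or memory size can be changed with update_function_configuration (a brief sketch; the defaults shown are arbitrary):

def update_lambda_configuration(function_name, timeout=60, memory_size=128):
    """Update basic settings of an existing Lambda function.

    :param function_name: The name of the Lambda function.
    :param timeout: The function timeout in seconds.
    :param memory_size: The memory allocated to the function in MB.
    :return: The ARN for the Lambda function.
    """
    client = boto3.client('lambda')

    # update configuration only; the code package is left untouched
    client.update_function_configuration(
        FunctionName=function_name,
        Timeout=timeout,
        MemorySize=memory_size
    )

    return get_function_arn(function_name)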

Next we update the API with Swagger:


def update_api(swagger_file_name):
    """Update an API defined in Swagger.

    :param swagger_file_name: The name of the swagger file.
                              Full or relative path.
    :return: The id of the REST API.
    """
    # get the API Gateway ID of the existing API
    rest_api_name = get_rest_api_name(swagger_file_name)
    client = boto3.client('apigateway')
    paginator = client.get_paginator('get_rest_apis')
    rest_api_id = ""
    for response in paginator.paginate():
        for item in response["items"]:
            if (rest_api_name == item["name"]):
                rest_api_id = item["id"]

    with open(swagger_file_name, "r") as swagger_file:
        swagger_data = swagger_file.read()

    response = client.put_rest_api(restApiId=rest_api_id,
                                   body=swagger_data)

    return response['id']

We first query API Gateway for the REST API identifier based on the name in the Swagger file. We need to submit this identifier for the update to work.

We can use the same deployment method we used in the first blog post to deploy the update to the API Gateway stage. If the stage already exists, it will just be updated.

Creating a DynamoDB Table

We need to be able to persistently store the information about a service, and we need to be able to create, update, and query that information. DynamoDB is a fully managed serverless service that works well with AWS Lambda. We will start by creating a table for our discovery service:


dynamodb = boto3.resource('dynamodb', region_name = 'us-east-1')  

# create the table
table = dynamodb.create_table(
    TableName='Services',
    KeySchema=[ { 'AttributeName': 'name',
                  'KeyType': 'HASH' }, 
                { 'AttributeName': 'version',
                  'KeyType': 'RANGE' } ],
    AttributeDefinitions=[ { 'AttributeName': 'name',
                             'AttributeType': 'S' },
                           { 'AttributeName': 'version',
                             'AttributeType': 'S' }, ],
    ProvisionedThroughput={ 'ReadCapacityUnits': 10,
                            'WriteCapacityUnits': 10 } )

# wait for the table to be ready
# this will block until the table is ACTIVE
table = boto3.resource('dynamodb').Table('Services')
table.wait_until_exists()

# insert some test data
with table.batch_writer() as batch:
    batch.put_item(Item={
                'name': 'testservice1', 
                'version': '1.0', 
                'endpoint_url': 'notarealurl1',
                'ttl': 300,
                'status': 'healthy' })
    batch.put_item(Item={
                'name': 'testservice2', 
                'version': '1.0', 
                'endpoint_url': 'notarealurl2',
                'ttl': 600,
                'status': 'healthy' })

The only attributes that we need to define are the keys. We will store additional attributes, but we can add them as we store items in the table.
We are using the name and version as the key because that matches our access pattern.
We are provisioning 10 read and write capacity units for the table. The number can be adjusted, depending on the amount of traffic the service receives and the effectiveness of the client caching.
After the table is created and active, we then prepare for the testing of our lookup service by inserting a couple of test records.
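
To confirm the test records were written, we can read one back with the same table resource (a quick check; it is not needed for the rest of the walkthrough):

response = table.get_item(Key={'name': 'testservice1', 'version': '1.0'})
print(response['Item']['endpoint_url'])  # notarealurl1
print(response['Item']['ttl'])           # 300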

Looking Up Service Information from a DynamoDB Table

We are now ready to update our Lambda function so we can start using our DynamoDB table:


def lambda_handler(api_parameters, context):
    """Lambda hander for service lookup."""
    logger.info("lambda_handler - service_name: %s"
                " service_version: %s"
                % (api_parameters["service_name"],api_parameters["service_version"]))

    table = boto3.resource('dynamodb',region_name='us-east-1').Table('Services')

    dynamodb_response = table.get_item(
                    Key={
                        'name': str(api_parameters["service_name"]),
                        'version': str(api_parameters["service_version"])
                    }
                )

    if ('Item' in dynamodb_response):
        logger.info("found service with: %s" %
                     (dynamodb_response['Item']['endpoint_url'],))
        return {
            "endpoint_url": dynamodb_response['Item']['endpoint_url'],
            "ttl": dynamodb_response['Item']['ttl'],
            "status": dynamodb_response['Item']['status']
            }
    else:
        raise Exception('NotFound')

The function gets the item from the table, and then returns a JSON object with the information to the client.

Notice that we throw an exception if we don’t find a record in DynamoDB. We can use this exception in API Gateway to map to a 404 HTTP code. We update the two response sections in the Swagger file to make that happen:


{
  "swagger": "2.0",
  "info": {
    "title": "catalog_service",
    "version": "1.0.0"
  },
  "basePath": "/v1",
  "schemes": ["https"],
  "consumes": ["application/json"],
  "produces": ["application/json"],
  "paths": {
    "/catalog/{serviceName}/{serviceVersion}": {
      "parameters": [{
        "name": "serviceName",
        "in": "path",
        "description": "The name of the service to look up.",
        "required": true,
        "type": "string"
      },
      {
        "name": "serviceVersion",
        "in": "path",
        "description": "The version of the service to look up.",
        "required": true,
        "type": "string"
      }],
      "get": {
        "responses": {
          "200": {
            "description": "version information"
          },
          "404": {
            "description": "service not found"
          }
        },
        "x-amazon-apigateway-integration": {
          "type": "aws",
          "uri": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/$catalog_serviceARN$/invocations",
          "httpMethod": "POST",
          "requestTemplates": {
            "application/json": "{\"service_name\": \"$input.params('serviceName')\",\"service_version\": \"$input.params('serviceVersion')\"}"
          },
          "responses": {
            "default": {
              "statusCode": "200",
              "schema": {
                "$ref": "#/definitions/CatalogServiceModel"
              }
            },
            ".*NotFound.*": {
              "statusCode": "404",
              "responseTemplates" : {
                 "application/json": "{\"error_message\":\"Service Not Found\"}"
                } 
            } 
          }
        }
      }
    }
  },
  "definitions": {
    "CatalogServiceModel": {
      "type": "object",
      "properties": {
        "endpoint_url": {
          "type": "string"
        },
        "ttl": {
          "type": "integer"
        },
        "status": {
          "type": "string"
        }
      },
      "required": ["endpoint_url", "ttl", "status"]
    }
  }
}

We use a regular expression (.*NotFound.*) under x-amazon-apigateway-integration -> responses to catch our exception and map it to a static JSON message.

We can now put everything together and update the code and API:


create_deployment_package("/tmp/catalog_service.zip", ["catalog_service.py"])
function_arn = update_lambda_function(
                       "/tmp/catalog_service.zip",
                       "catalog_service")
replace_instances_in_file("swagger.json",
                          "/tmp/swagger_with_arn.json",
                          "$catalog_serviceARN$", function_arn)
api_id = update_api("/tmp/swagger_with_arn.json")
deploy_api(api_id, "/tmp/swagger_with_arn.json", "dev")

We can use the following to test our deployment:


request_url="https://yourrestapiid.execute-api.us-east-1.amazonaws.com/"\
            "dev/catalog/testservice2/1.0"
response = requests.get(request_url)
json_response = json.loads(response.content)
logging.info("Endpoint URL: %s" % (json_response['endpoint_url'],))
logging.info("TTL: %i" % (json_response['ttl'],))
logging.info("Status: %s" % (json_response['status'],))

That should give us the following results:


INFO:root:Endpoint URL: notarealurl2
INFO:root:TTL: 600
INFO:root:Status: healthy
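
We can also exercise the 404 mapping by looking up a service that was never registered (the API ID is a placeholder; any unknown service name will do):

request_url = "https://yourrestapiid.execute-api.us-east-1.amazonaws.com/"\
              "dev/catalog/doesnotexist/1.0"
response = requests.get(request_url)
logging.info("Status code: %i" % (response.status_code,))
logging.info("Body: %s" % (response.content,))

This should log a 404 status code and the static Service Not Found message from the response template.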

Serverless Service Discovery – Part 1: Get Started

by Magnus Bjorkman | in Python

AWS provides a lot of features and services for writing serverless architectures with Python. In this four-part series, we will show how you can use Python to manage and implement Amazon API Gateway, AWS Lambda, and Amazon DynamoDB. We will use a common use case, service discovery, to showcase a simple way to do this with Python and boto3. Service discovery is a foundational service for microservices. There are many implementations running on servers or in containers, including Consul by HashiCorp and ZooKeeper from Apache.

This four-part series will cover the following topics:

  • Part 1: Get Started: Using Python and Swagger to Deploy to Amazon API Gateway and AWS Lambda
  • Part 2: Lookup: Looking Up Service Information in Amazon DynamoDB from AWS Lambda
  • Part 3: Registration: Using Signature Version 4 Authentication to API Gateway and AWS Lambda
  • Part 4: Registrar: Using a Registrar Agent in AWS Lambda to Manage Service Registration

By the end of the series, we will have built the system shown in this diagram:

Then we will be able to use a client that can look up a Hello World service in the discovery service, and call the Hello World service. We will also implement a registrar agent with the Hello World service that will keep the information about the Hello World service up-to-date in the discovery service.

Today’s post covers the first part of our overall design: the basics of setting up a service running on API Gateway and Lambda. To keep this first step simple, the service will return hard-coded values.

We will create a few functions to make it easy to manage our serverless architecture. These are all of the imports used by management functions in this series:

import json
import os
import logging
import zipfile
import boto3

We set the log level to INFO:


logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)

Creating an AWS Lambda Function

We start with a couple of utility methods that will help us package a list of files or directories into a zip file that can be used with AWS Lambda:

def zipdir(path, ziph):
    """Add directory to zip file.

    :param path: The top level path to traverse to discover files to add.
    :param ziph: A handle to a zip file to add files to.
    """
    for root, dirs, files in os.walk(path):
        for file in files:
            ziph.write(os.path.join(root, file))


def create_deployment_package(package_name, file_names):
    """Create a deployment package for Lambda.

    :param package_name: The name of the package. Full or relative path.
    :param file_names: Files or folders to add to the package.
    """
    ziph = zipfile.ZipFile(package_name, "w", zipfile.ZIP_DEFLATED)
    for file_name in file_names:
        if (os.path.isdir(file_name)):
            zipdir(file_name, ziph)
        else:
            ziph.write(file_name)
    ziph.close()

The next function will use the package we just created to create the Lambda function:


def create_lambda_function(package_name, function_name, role,
                           handler, description, account_number):
    """Create a Lambda function from zip-file.

    :param package_name: The name of the package. Full or relative path.
    :param function_name: The name of the Lambda function to create.
    :param role: The Role ARN to use when executing Lambda function
    :param handler: The handler to execute when the Lambda function is called.
    :param description: The description of the Lambda function.
    :param: account_number: The Account number of the API Gateway using this
                            function.
    :return: The ARN for the Lambda function.
    """
    with open(package_name, "rb") as package_file:
        package_data = package_file.read()

    # connect to Lambda API
    client = boto3.client('lambda')

    # create the function
    response = client.create_function(
        FunctionName=function_name,
        Runtime="python2.7",
        Role=role,
        Handler=handler,
        Code={'ZipFile': package_data},
        Description=description,
        Timeout=60,
        MemorySize=128,
        Publish=True
    )

    # store away the name and arn for later use
    function_arn = response['FunctionArn']
    function_name = response['FunctionName']

    # add permissions for the function to be called by API Gateway
    response = client.add_permission(
        FunctionName=response['FunctionArn'],
        StatementId=response['FunctionName']+"-invoke",
        Action="lambda:InvokeFunction",
        Principal="apigateway.amazonaws.com",
        SourceArn='arn:aws:execute-api:us-east-1:'+account_number+':*'
    )

    return function_arn

We read the package into memory and provide it directly to the create_function method that creates our Lambda function. For a large package, you may prefer to upload it to Amazon S3 first and pass a reference to it instead.
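
A variation along those lines might look like this (a sketch; the bucket and key names are placeholders, and the add_permission call from the function above would still be needed):

def create_lambda_function_from_s3(function_name, role, handler,
                                   description, bucket, key):
    """Create a Lambda function from a deployment package stored in S3.

    :param function_name: The name of the Lambda function to create.
    :param role: The Role ARN to use when executing the Lambda function.
    :param handler: The handler to execute when the Lambda function is called.
    :param description: The description of the Lambda function.
    :param bucket: The S3 bucket holding the deployment package.
    :param key: The S3 key of the deployment package.
    :return: The ARN for the Lambda function.
    """
    client = boto3.client('lambda')

    # reference the package in S3 instead of passing the bytes inline
    response = client.create_function(
        FunctionName=function_name,
        Runtime="python2.7",
        Role=role,
        Handler=handler,
        Code={'S3Bucket': bucket, 'S3Key': key},
        Description=description,
        Timeout=60,
        MemorySize=128,
        Publish=True
    )

    return response['FunctionArn']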

We need to give permissions to API Gateway to call our Lambda function. We do that using the AWS Lambda resource policies, adding the ARN of the API Gateway service for our account to the Lambda permissions.

Creating an API with Swagger

We again start with a couple of utility methods.


def replace_instances_in_file(filename_source, filename_target, old, new):
    """Replace string occurence in file.

    :param filename_source: The name of the file to read in.
    :param filename_target: The name of the file to write to.
    :param old: The string to find in the file.
    :param new: The string to replace any found occurrences with.
    """
    with open(filename_source, 'r') as f:
        newlines = []
        for line in f.readlines():
            newlines.append(line.replace(old, new))
    with open(filename_target, 'w') as f:
        for line in newlines:
            f.write(line)


def get_rest_api_name(swagger_file):
    """Get Rest API Name from Swagger file.

    :param swagger_file: The name of the swagger file. Full or relative path.
    :return: The name of the API defined in the Swagger file.
    """
    with open(swagger_file) as json_data:
        api_def = json.load(json_data)
        json_data.close()
        rest_api_name = api_def["info"]["title"]
        return rest_api_name

The replace_instances_in_file function allows us to put Lambda function ARNs into specific places in the Swagger file. We put a marker string in the Swagger file; this function finds the marker and replaces it with the Lambda ARN.

The get_rest_api_name function allows us to get the name of the REST API specified in the Swagger file so we can use it with calls to the API Gateway API.

In the following function, we are using the newly released API function to import an API defined in Swagger:


def create_api(swagger_file_name):
    """Create an API defined in Swagger.

    :param swagger_file_name: The name of the swagger file.
                              Full or relative path.
    :return: The id of the REST API.
    """
    with open(swagger_file_name, "r") as swagger_file:
        swagger_data = swagger_file.read()

    client = boto3.client('apigateway')
    response = client.import_rest_api(body=swagger_data)

    return response['id']

Like the creation of the Lambda function, we read the Swagger file into memory and submit it directly to the function.

The last management function deploys the API to an API Gateway stage so we have a public host name that we can use to access the API:


def deploy_api(api_id, swagger_file, stage):
    """Deploy API to the given stage.

    :param api_id: The id of the API.
    :param swagger_file: The name of the swagger file. Full or relative path.
    :param stage: The name of the stage to deploy to.
    :return: Tuple of REST API ID, stage, and endpoint URL.
    """
    client = boto3.client('apigateway')

    with open(swagger_file) as json_data:
        api_def = json.load(json_data)
        json_data.close()
        logger.info("deploying: "+api_id+" to "+stage)
        client.create_deployment(restApiId=api_id,
                                 stageName=stage)

        # print the end points
        logger.info("--------------------- END POINTS (START) ---------------")
        for path, path_object in api_def["paths"].iteritems():
            logger.info("End Point: https://%s"
                        ".execute-api.us-east-1.amazonaws.com/"
                        "%s%s" % (api_id, stage, path))
        logger.info("--------------------- END POINTS (END) -----------------")

        enpoint_url = ("https://%s"
                       ".execute-api.us-east-1.amazonaws.com/"
                       "%s" % (api_id, stage))
        return api_id, stage, enpoint_url

Deploying a Skeleton Service

We are now ready to test the functions with a simple skeleton of our service lookup function. The function is minimal and includes hard-coded values:


def lambda_handler(api_parameters, context):
    """Lambda hander for service lookup."""
    logger.info("lambda_handler - service_name: %s"
                " service_version: %s"
                % (api_parameters["service_name"],
                   api_parameters["service_version"]))

    response = {
            "endpoint_url": "notarealurl",
            "ttl": "300",
            "status": "healthy"
         }

    return response

Given a service name and a service version, the function will return three values:

  • The endpoint URL from which the service can be accessed.
  • The time to live (TTL) for this information so that a client knows for how long to cache this information and can avoid unnecessary calls to the service.
  • The status of the service, either healthy or unhealthy.

We define the API in a Swagger file for the preceding Lambda function:


{
  "swagger": "2.0",
  "info": {
    "title": "catalog_service",
    "version": "1.0.0"
  },
  "schemes": ["https"],
  "consumes": ["application/json"],
  "produces": ["application/json"],
  "paths": {
    "/catalog/{serviceName}/{serviceVersion}": {
      "parameters": [{
        "name": "serviceName",
        "in": "path",
        "description": "The name of the service to look up.",
        "required": true,
        "type": "string"
      },
      {
        "name": "serviceVersion",
        "in": "path",
        "description": "The version of the service to look up.",
        "required": true,
        "type": "string"
      }],
      "get": {
        "responses": {
          "200": {
            "description": "version information"
          }
        },
        "x-amazon-apigateway-integration": {
          "type": "aws",
          "uri": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/$catalog_serviceARN$/invocations",
          "httpMethod": "POST",
          "requestTemplates": {
            "application/json": "{\"service_name\": \"$input.params('serviceName')\",\"service_version\": \"$input.params('serviceVersion')\"}"
          },
          "responses": {
            "default": {
              "statusCode": "200",
              "schema": {
                "$ref": "#/definitions/CatalogServiceModel"
              }
            } 
          }
        }
      }
    }
  },
  "definitions": {
    "CatalogServiceModel": {
      "type": "object",
      "properties": {
        "endpoint_url": {
          "type": "string"
        },
        "ttl": {
          "type": "integer"
        },
        "status": {
          "type": "string"
        }
      },
      "required": ["endpoint_url", "ttl", "status"]
    }
  }
}

We define our service method as a GET method that will take the service name and service version as part of the path. We have also defined a response model (CatalogServiceModel) that specifies our return properties as the endpoint URL, the TTL, and the status.

The x-amazon-apigateway-integration element specifies how Amazon API Gateway will be integrated with AWS Lambda. The marker $catalog_serviceARN$ will be replaced with the AWS Lambda function ARN when this service is deployed.

We can now use all of the above to deploy our service to Lambda and API Gateway:


ACCOUNT_NUMBER = _your AWS account number_

create_deployment_package("/tmp/catalog_service.zip", ["catalog_service.py"])
function_arn = create_lambda_function(
                       "/tmp/catalog_service.zip",
                       "catalog_service",
                       "arn:aws:iam::"+ACCOUNT_NUMBER+":role/lambda_s3",
                       "catalog_service.lambda_handler",
                       "Looking up service information.",
                       ACCOUNT_NUMBER)
replace_instances_in_file("swagger.json",
                          "/tmp/swagger_with_arn.json",
                          "$catalog_serviceARN$", function_arn)
api_id = create_api("/tmp/swagger_with_arn.json")
deploy_api(api_id, "/tmp/swagger_with_arn.json", "dev")

We can use this to test our new deployment:


import requests
import json
import logging

request_url = "https://yourrestapiid.execute-api.us-east-1.amazonaws.com/"\
              "dev/catalog/testservice1/1.0"
response = requests.get(request_url)
json_response = json.loads(response.content)
logging.info("Endpoint URL: %s" % (json_response['endpoint_url'],))
logging.info("TTL: %i" % (json_response['ttl'],))
logging.info("Status: %s" % (json_response['status'],))

That should give us the following results:


Endpoint URL: notarealurl
TTL: 300
Status: healthy

AWS Lambda Support for Node.js 4.3.2 Runtime

We are pleased to announce that, in conjunction with the availability of Node.js 4.3.2 in AWS Lambda, version 2.3.2 of the AWS SDK for JavaScript (in both Node.js and the browser) supports passing "nodejs4.3" as the value of the Runtime parameter for the createFunction and updateFunctionConfiguration operations. Now that the Node.js 4.3.2 runtime is available in Lambda, you can take advantage of ES6 features in your Lambda functions, such as native Promise support, destructuring assignment, block-scoped variables, and more. For more information about using Node.js 4.3.2 in Lambda, see the AWS Compute Blog.

Since Version 2.3.0, the AWS SDK for JavaScript supports a Promise interface on all requests. Lambda will soon update its default version of the SDK to include this feature, but you can leverage the SDK’s Promise support in Lambda now by bundling the latest version of the SDK in your Lambda functions. For more information about the SDK’s Promise support, see our blog post.

We are excited about Lambda support for Node.js 4.3 and the possibilities it opens up for our customers. We would love to hear your feedback. You can leave comments here on our blog, open an issue on GitHub, or tweet @AWSforJS!

AWS SDK for .NET Version 2 Status

by Norm Johanson | in .NET

Version 3 of the AWS SDK for .NET has been generally available since 7/28/2015. Although the legacy version (v2) of the SDK will continue to work, we strongly recommend that all customers migrate to the latest version 3 to take advantage of various improvements including modularized architecture, portable class library support, and .NET Core support. There are only a few backward-incompatible changes between version 2 and version 3 (see the migration guide for details). Additionally, the last few legacy releases of the version 2 SDK (versions 2.3.50 and later) have all the classes and methods that are changed in version 3 marked obsolete, so compile-time warnings can help you make forward-compatible updates before upgrading to the version 3 SDK.

To help customers plan their migration, our current maintenance timeline for the version 2 SDK is provided below.

Security issues and critical bugs

Critical bugs with no reasonable workaround as well as any security-related issues will be addressed with the highest priority. We will continue to support fixing such issues indefinitely.

Non-critical bugs

We will continue to address non-critical bugs in the version 2 SDK until the end of 2016. These bugs will be fixed in relative priority order. Factors considered in prioritization include:

  • Number of affected customers
  • Severity of the problem (broken feature vs. typo fix in documentation)
  • Whether the issue is already fixed in version 3
  • Risk of the fix causing unintended side effects

Service API updates

We will continue to add API updates to existing service clients based on customer request (GitHub Issue) until 8/1/2016.

New service clients

New service clients will not be added to the version 2 SDK. They will only be added to version 3.

As always, please find us in the Issues section of the SDK repository on GitHub if you would like to report bugs, request service API updates, or ask general questions.

Support for Promises in the SDK

Today’s release of the AWS SDK for JavaScript (v2.3.0) introduces support for promises when calling service operations. Promises provide an alternative to callback functions for managing asynchronous flow. They allow you to treat an asynchronous call as a variable, simplifying error handling and giving you greater control over how results are handled.

For more information about promises, check out this entry on MDN!

This post describes how to use promises within the context of the SDK to handle completing asynchronous tasks.

Setting a Promise Library

By default, the AWS SDK for JavaScript will check for a globally defined Promise function. If found, it adds the promise() method on AWS.Request objects. Some environments, such as Internet Explorer or earlier versions of Node.js, don’t support promises natively. You can use the AWS.config.setPromisesDependency() method to supply a Promise constructor. After this method is called, promise() will exist on all AWS.Request objects. The following example shows how you can set a custom promise implementation for the AWS SDK to use.

Node.js Example

// Use bluebird implementation of Promise
AWS.config.setPromisesDependency(require('bluebird'));
// Use Q implementation of Promise
AWS.config.setPromisesDependency(require('Q').Promise);
// Use 'rsvp' implementation of Promise
AWS.config.setPromisesDependency(require('rsvp').Promise);
// Revert back to using native or globally available Promise
AWS.config.setPromisesDependency(null);

Browser Example

In the browser, a library that implements promises and provides a global Promise namespace (such as bluebird) should be loaded before the AWS SDK is loaded. The AWS SDK will then find the namespace and use it internally. If the AWS SDK is loaded before the library, or the library uses a namespace other than Promise, the namespace can be provided by calling AWS.config.setPromisesDependency():

// AWS SDK was loaded after bluebird, set promise dependency
AWS.config.setPromisesDependency(Promise);

Making Requests by Using Promises

The AWS.Request.promise() method provides a way to call a service operation and return a promise to manage asynchronous flow instead of callbacks. In Node.js and the browser, an AWS.Request is returned when a service operation is called without a callback function. You can call send() on the request to make the service call, or call promise(), which immediately starts the service call and returns a promise.

The following examples provide a comparison between the use of a simplified callback method and promises to make a simple request.

Simplified Callback Method

In the following example, a callback method that accepts error and data objects is supplied. The logic to handle errors or successful requests is contained in the callback method.

var s3 = new AWS.S3({apiVersion: '2006-03-01', region: 'us-west-2'});
var params = {
  Bucket: 'bucket',
  Key: 'example1.txt',
  Body: 'Uploaded text using the simplified callback method!'
};
s3.putObject(params, function(err, data) {
  if (err) {
    console.log(err);
  } else {
    console.log('Success');
  }
});

Promise-Based Method

In the following example, a promise is returned that is fulfilled with a data object, or rejected with an error object. Using promises, a single callback isn’t responsible for detecting errors. Instead, the correct callback will be called based on the success or failure of a request.

var s3 = new AWS.S3({apiVersion: '2006-03-01', region: 'us-west-2'});
var params = {
  Bucket: 'bucket',
  Key: 'example2.txt',
  Body: 'Uploaded text using the promise-based method!'
};
var putObjectPromise = s3.putObject(params).promise();
putObjectPromise.then(function(data) {
  console.log('Success');
}).catch(function(err) {
  console.log(err);
});

The following example demonstrates how to write a simple script that uploads a directory of files to an Amazon S3 bucket, and then sends an email after the upload is complete.

Promises Example

var fs = require('fs');
var AWS = require('aws-sdk');

// Check if environment supports native promises
if (typeof Promise === 'undefined') {
  AWS.config.setPromisesDependency(require('bluebird'));
}

var s3 = new AWS.S3({apiVersion: '2006-03-01', region: 'us-west-2'});
var ses = new AWS.SES({apiVersion: '2010-12-01', region: 'us-west-2'});

// Take a list of objects containing file data and send an email
var sendEmail = function sendEmail(files) {
  var keys = files.map(function(file) {
    return file.key;
  });
  var body = keys.join('\n') + '\n\nobjects were successfully uploaded.';
  var params = {
    Source: 'from@email.com',
    Destination: {
      ToAddresses: ['to@email.com']
    },
    Message: {
      Subject: {
        Data: 'Batch PutObject job completed'
      },
      Body: {
        Text: {
          Data: body
        }
      }
    }
  };
  return ses.sendEmail(params).promise();
};

// Upload a list of files to an S3 bucket
var putBatch = function putBatch(bucket, files) {
  // Make all the putObject calls immediately
  // Will return rejected promise if any requests fail
  return Promise.all(files.map(function(file) {
    var params = {
      Bucket: bucket,
      Key: file.key,
      Body: file.stream
    };
    return s3.putObject(params).promise();
  }));
};

// Create streams for files
var fileNames = fs.readdirSync('/path/to/dir/');
var files = fileNames.map(function(fileName) {
  return {
    key: fileName,
    stream: fs.createReadStream('/path/to/dir/' + fileName)
  };
});

//Upload directory of files to S3 bucket and send an email on success
putBatch('myBucket', files)
  .then(sendEmail.bind(null, files))
  .catch(console.error.bind(console));

You’ll notice we aren’t checking for errors inside our callback functions to determine the action to take. Instead, we can attach a single catch callback to our promise chain that will handle any errors thrown by our functions. Our code also reads as a series of steps to take (putBatch then sendEmail) instead of having to nest callbacks inside callbacks.

When a promise is fulfilled, the registered callback will have access to the data object as the first argument. We could have also provided a second callback that would be executed if the promise was rejected, but used a single catch statement instead. Rejected promises will pass an error object as the first argument to a callback.

Give It a Try!

We would love to hear what you think of this new feature. Give the AWS SDK for JavaScript v2.3.0 a try and leave your feedback in the comments or on GitHub!

Testing Lambda functions using the AWS Toolkit for Eclipse

In this blog post, I will introduce how to test AWS Lambda functions in Eclipse by using the AWS Toolkit for Eclipse. The AWS Toolkit for Eclipse Lambda Plugin provides a feature in which a JUnit test class is created automatically upon creation of a new AWS Lambda Java project. I will give you step-by-step instructions for creating an AWS Lambda Java project, creating an AWS Lambda function, unit testing the AWS Lambda function, uploading the AWS Lambda function to AWS Lambda, and testing the AWS Lambda function remotely. Currently, AWS Lambda supports five AWS service event sources for Java: S3 Event, SNS Event, Kinesis Event, Cognito Event, and DynamoDB Event. You can also define a custom event. In this post, I will give examples using the S3 Event and a custom event. The other event types can be used in the same way.

Prerequisites

  1. Install Eclipse version 3.6 (Helios) or later on your computer.
  2. Follow the instructions on the AWS Toolkit for Eclipse page to Install the AWS Toolkit for Eclipse.
  3. After it is installed, the AWS Toolkit for Eclipse icon will appear on the toolbar.

Steps for testing a Lambda function

  1. Create an AWS Lambda Java project.
    • Choose the AWS Toolkit for Eclipse or New icon, and then choose AWS Lambda Java Project.
    • When you enter a name for the project and package, you should see the corresponding changes in the Preview text area. For Input Type, you can choose from the five AWS service event sources plus the custom event. You can also complete the Class Name, Input Type, and Output Type fields. The AWS Toolkit for Eclipse will auto-generate the Lambda function class in the src/ folder and the unit test class in the tst/ folder. In this example, we set Project Name to S3EventDemo, Package Name to com.lambda.demo.s3, and left the other settings at their defaults.
    • The AWS Toolkit for Eclipse will create the following folder structure for the S3 Event.

      The LambdaFunctionHandler class is an implementation of the RequestHandler interface that defines the Lambda function you need to implement. The LambdaFunctionHandlerTest class is where the unit tests reside. The TestContext class is an implementation of the Context interface, which acts as a parameter for the Lambda function. The TestUtils class is a supporting class for parsing JSON files. The s3-event.put.json file is the sample S3 event source configuration you can use for testing.
  2. Create an AWS Lambda function.
    You need to implement the Lambda function handleRequest in the LambdaFunctionHandler class. It takes S3Event and Context as parameters, and returns an Object. You can always define a custom output class instead of the default Object class. The following is the sample implementation of the Lambda function, which returns a string of the bucket name from the S3 Event.

    @Override
    public Object handleRequest(S3Event input, Context context) {
        context.getLogger().log("Input: " + input);
        return input.getRecords().get(0).getS3().getBucket().getName();
    }
    
  3. Unit-test the AWS Lambda function.
    In the unit test, the S3Event parameter is loaded from the s3-event.put.json file in the tst/ folder, and the Context is provided by the TestContext implementation you instantiate for testing. The default unit test in the LambdaFunctionHandlerTest class simply prints the output. You may want to change this to a validation, as shown in the following code. Based on the s3-event.put.json file, the bucket name returned from the Lambda function is expected to be “sourcebucket”.

    @Test
    public void testLambdaFunctionHandler() {
        LambdaFunctionHandler handler = new LambdaFunctionHandler();
        Context ctx = createContext();
        Object output = handler.handleRequest(input, ctx);
        if (output != null) {
            System.out.println(output.toString());
        }
        Assert.assertEquals("sourcebucket", output);
    }

    This is the simplest way to write the test case. When you run the unit test, output like that shown in the following screenshot will appear in the console.
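    For reference, the input field used in the test is the S3Event that the generated test class loads from the sample JSON file. A minimal sketch of that setup, assuming the generated TestUtils helper (the exact resource path may differ in your project):

    private static S3Event input;

    @BeforeClass
    public static void createInput() throws IOException {
        // Parse the sample S3 event from the tst/ folder into an S3Event object
        input = TestUtils.parse("/s3-event.put.json", S3Event.class);
    }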

  4. Upload and run the AWS Lambda function. You can also test the Lambda function after you upload it to AWS Lambda. To do this, right-click anywhere in the workspace of the project, choose Amazon Web Services, and choose Run function on AWS Lambda…, as shown in the following screenshot.

    You will be asked to select the JSON file as the S3Event input. Choose the default one provided by the AWS Toolkit for Eclipse, as shown in the following screenshot.

    Choose Invoke. You will see output similar to the following screenshot in the console. The function output is the bucket name returned by the Lambda function.

  5. Test the custom event Lambda function.
    The workflow for testing a custom event is very similar to testing the S3 Event. Let’s define a Lambda function that calculates the maximum value from a list of integer values.

    • First, define the custom event input class.
      public class CustomEventInput {
          private List<Integer> values;
          public List<Integer> getValues() {
              return values;
          }
          public void setValues(List<Integer> values) {
              this.values = values;
          }
      }
      
    • Second, define the custom event output class.
      public class CustomEventOutput {
          private Integer value;
          public CustomEventOutput(int value) {
              setValue(value);
          }
          public Integer getValue() {
              return value;
          }
          public void setValue(Integer value) {
              this.value = value;
          }
      }
      
    • Third, implement the Lambda function.
      @Override
      public CustomEventOutput handleRequest(CustomEventInput input, Context context) {
          context.getLogger().log("Input: " + input);
      
          int maxValue = Integer.MIN_VALUE;
          for (Integer value : input.getValues()) {
              if (value > maxValue) {
                  maxValue = value;
              }
          }
          return new CustomEventOutput(maxValue);
      }
      
    • Fourth, prepare a sample JSON file as the CustomEventInput object for testing. AWS Lambda will use JSON format to represent the object you defined. Here is an example using POJOs for handler input/output.
      {
          "values" : [34, 52, 335, 32]
      }
      
    • Lastly, upload this Lambda function to AWS Lambda, and test remotely. You should see console output similar to the following. The output is the JSON format of the CustomEventOutput object returned by the Lambda function.
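      For the sample input above, the maximum of 34, 52, 335, and 32 is 335, so the serialized response would look something like the following (the field name comes from the CustomEventOutput getter):
      {
          "value" : 335
      }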

This is how a typical Lambda function is written and tested using the AWS Toolkit for Eclipse. For more advanced use cases, you can use the S3 Event and DynamoDB Event examples provided by AWS Lambda.

Announcing the AWS Encryption SDK

by Andrew Shore | on | in Java | Permalink | Comments |  Share

We’ve published several posts on client-side encryption using Java tools over the past couple of years, including ones on the S3 Encryption Client and the DynamoDB Encryption Client. Both of these clients assume a specific AWS service as the storage layer for data encrypted by the client. Today, the AWS Cryptography team released the AWS Encryption SDK for Java, a library that you can use to encrypt your data without assuming a particular storage layer. The SDK makes envelope encryption easier for developers while minimizing errors that could lower the security of your applications. The SDK doesn’t require you to use any specific AWS services, but we’ve provided ready-to-use samples for AWS customers who do use AWS CloudHSM or AWS Key Management Service (KMS).
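To give you a sense of the API, here is a minimal sketch along the lines of the KMS sample in the repository. The key ARN is a placeholder, and the class and method names below reflect the samples at the time of writing, so check the repository for complete, current code:

import com.amazonaws.encryptionsdk.AwsCrypto;
import com.amazonaws.encryptionsdk.CryptoResult;
import com.amazonaws.encryptionsdk.kms.KmsMasterKey;
import com.amazonaws.encryptionsdk.kms.KmsMasterKeyProvider;

public class EncryptionSdkExample {
    public static void main(String[] args) {
        // Placeholder ARN for a KMS customer master key you own
        final String keyArn = "arn:aws:kms:us-east-1:123456789012:key/example-key-id";

        final AwsCrypto crypto = new AwsCrypto();
        final KmsMasterKeyProvider prov = new KmsMasterKeyProvider(keyArn);

        // Envelope-encrypt a string; the data key is generated and protected by KMS
        final String ciphertext = crypto.encryptString(prov, "hello world").getResult();

        // Decrypt and verify that we get the original plaintext back
        final CryptoResult<String, KmsMasterKey> result = crypto.decryptString(prov, ciphertext);
        System.out.println(result.getResult());
    }
}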

Check out the AWS Encryption SDK on AWS Labs. You should also read Greg Rubin’s post on the AWS Security Blog on how to use the SDK. Let us know what you think!

Introducing Retry Throttling

by Jonathan Breedlove | on | in Java | Permalink | Comments |  Share

Client-side retries are used to avoid surfacing unnecessary exceptions back to the caller in the case of transient network or service issues. In these situations, a subsequent retry will likely succeed. Although this process incurs a time penalty, it is often better than the noise from oversensitive client-side exceptions. Retries are less useful for longer-running issues where subsequent retries will almost always fail. An extended retry loop for each request ties up a client application thread that could otherwise be moving on to another task, only to return an exception in the end. In cases of service degradation, the explosion of retried requests from clients can exacerbate problems for the service, which hurts recovery times and prolongs the client-side impact. To address this issue, we are pleased to announce the introduction of a client retry throttling feature.

Retry throttling is designed to throttle back retry attempts when a large percentage of requests are failing and retries are unsuccessful. With retry throttling enabled, the client drains an internal retry capacity pool and slowly backs off from retry attempts until there is no remaining capacity. At that point, subsequent retries are not attempted until the client gets successful responses, at which time the retry capacity pool slowly begins to refill and retries are once again permitted. Because retry throttling kicks in only when a large number of requests fail and retries are unsuccessful, transient retries are still permitted and unaffected by this feature. Retries resulting from provisioned capacity exceptions are not throttled.
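To make the drain-and-refill behavior concrete, here is a rough sketch of the bookkeeping described above. This is not the SDK's internal implementation; the class and field names are made up purely for illustration:

// Illustrative only: a simple retry capacity pool in the spirit of retry throttling
class RetryCapacity {
    private final int maxCapacity;      // total capacity of the pool
    private final int retryCost;        // drained for every retry attempt
    private final int refillIncrement;  // restored for every successful response
    private int available;

    RetryCapacity(int maxCapacity, int retryCost, int refillIncrement) {
        this.maxCapacity = maxCapacity;
        this.retryCost = retryCost;
        this.refillIncrement = refillIncrement;
        this.available = maxCapacity;
    }

    // Called before a retry: proceed only if there is capacity left to drain
    synchronized boolean tryAcquireForRetry() {
        if (available < retryCost) {
            return false; // capacity exhausted; fail fast instead of retrying
        }
        available -= retryCost;
        return true;
    }

    // Called on a successful response: slowly refill the pool
    synchronized void onSuccess() {
        available = Math.min(maxCapacity, available + refillIncrement);
    }
}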

Behavior compared

To test the effectiveness of this new feature we set up a controlled environment in which we could subject the AWS SDK for Java to various failure scenarios.  For this test, we drove a consistent request load through the client and placed a fault injection proxy between the client and service.  The fault proxy was set up to return 5xx responses for a certain percentage of requests.  Each test run lasted 30 minutes. The test, which initially began with no errors, slowly ramped up to a 100% error rate, and then back down to 0% by the end of the run.

No throttling

With the default retry behavior and no throttling, you can clearly see the client ramping up retries proportional to the number of 5xx responses it sees.  At the middle of the test run we hit the 100% error rate and retries are pegged at their maximum level.  Even though none of these retries result in successful responses, the client continues retrying at the same pace, tying up application threads and client connections and hammering the service with wasteful requests.

Throttling enabled

With retry throttling enabled, you can see the client initially ramp up its retry attempts as 5xx errors are introduced, but then begin to tail off as errors increase.  After the 100% error rate is reached, the client abandons retry attempts because there are no successful responses. As the error rate drops below 100% and the client begins to get successful responses, retries are slowly re-enabled.

In situations where retries have been throttled, this feature results in fail-fast behavior from the client. Because retries are circumvented, an exception is returned to the caller immediately if the initial request is unsuccessful.  Although this will result in more up-front exceptions, it avoids tying up connections and client application threads for longer periods of time. This is particularly important in latency-sensitive applications.

Enabling retry throttling

Retry throttling can be enabled by explicitly setting it on the ClientConfiguration, as shown in this example:

ClientConfiguration clientConfig = new ClientConfiguration();
clientConfig.setUseThrottleRetries(true);
AmazonSQS sqs = new AmazonSQSClient(clientConfig);

Alternatively, it can be enabled by including this system property when you start up the JVM. Retry throttling will apply to all client instances running in that VM:

    -Dcom.amazonaws.sdk.enableThrottledRetry
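
For example, an illustrative startup command (the application jar name is a placeholder):

    java -Dcom.amazonaws.sdk.enableThrottledRetry -jar my-application.jar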

As you can see, it’s easy to opt in to this feature. Retry throttling can improve the ability of the SDK to adapt to suboptimal situations.  Have you used this feature? Feel free to leave questions or comments below!