Build and Deploy a Serverless REST API in Minutes Using Chalice

by Leah Rivers

Chalice is a serverless microframework that makes it simple for you to use AWS Lambda and Amazon API Gateway to build serverless apps. We’ve improved Chalice based on community feedback from GitHub, and we’re eager for you to take our latest version for a spin. Hopefully, you’ll find Chalice a fast and effective way to build serverless apps.

To help you get started with Chalice, here’s a quick five-step review:

   Step 1: Install Chalice
   Step 2: Configure credentials
   Step 3: Create a project
   Step 4: Deploy your API
   Step 5: You’re done launching a simple API. Now add something to your app!

Let’s dig in.

Step 1: Install Chalice.
Chalice requires Python 2.7 or 3.6, the versions Lambda supports. We recommend installing it with pip inside a virtual environment, as follows.

 $ pip install virtualenv
 $ virtualenv ~/.virtualenvs/chalice-demo
 $ source ~/.virtualenvs/chalice-demo/bin/activate
 $ pip install chalice

Step 2: Add credentials if you haven’t previously configured boto3 or the AWS CLI.
(If you’re already running boto3 or the AWS CLI, you’re all good. Move on to Step 3.)

If this is your first time configuring credentials for AWS, use the following.

 $ mkdir ~/.aws
 $ cat >> ~/.aws/config
 [default]
 aws_access_key_id=YOUR_ACCESS_KEY_HERE
 aws_secret_access_key=YOUR_SECRET_ACCESS_KEY
 region=YOUR_REGION (such as us-west-2, us-west-1, etc)

For more information on all the supported methods for configuring credentials, see the boto3 docs.

Step 3: Create a project using the chalice command line.
Use the new-project command to create a sample app that defines a single view.

 $ chalice new-project helloworld
 $ cd helloworld

Take a moment to check out what you’ve created. In app.py, you have a sample app that defines a single view, /, which returns the JSON body {"hello": "world"} when called.
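
The generated app.py should look something like this (your version of Chalice may generate slightly different boilerplate):

from chalice import Chalice

app = Chalice(app_name='helloworld')

@app.route('/')
def index():
    return {'hello': 'world'}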

Step 4: Deploy your app.
Alright, double-check that you’re still in your project directory; you’re ready to deploy!
From the command line, run chalice deploy.

 $ chalice deploy
 ...
 Initiating first time deployment...
 https://qxea58oupc.execute-api.us-west-2.amazonaws.com/dev/

You now have an API up and running using API Gateway and Lambda.

 $ curl https://qxea58oupc.execute-api.us-west-2.amazonaws.com/dev/
 {"hello": "world"}

Step 5: Add something to your app!
From this point, there’s a bunch of stuff you can do, including adding URL parameters, adding routing, or customizing the HTTP response. Find tutorials and examples here.
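
For example, capturing a URL parameter takes only a few lines (a sketch using Chalice's routing syntax):

from chalice import Chalice

app = Chalice(app_name='helloworld')

# {name} is captured from the URL and passed to the view function.
@app.route('/hello/{name}')
def hello(name):
    return {'hello': name}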

Have fun!

Using Python and Amazon SQS FIFO Queues to Preserve Message Sequencing

by Tara Van Unen

Thanks to Alexandre Pinhel, Solutions Architect from our team for writing this post!

Amazon SQS is a managed message queuing service that makes it simple to decouple application components. We recently announced an entirely new queue type, SQS FIFO (first-in, first-out) queues with exactly-once processing and deduplication. SQS FIFO queues are now available in the US East (Ohio) and US West (Oregon) regions, with more regions to follow. This new type of queue lets you use Amazon SQS for systems that depend on receiving messages in exact order, and exactly once, such as financial services and e-commerce applications. For example, FIFO queues help ensure mobile banking transactions are processed in the correct sequence, and that inventory updates for online retail sites are processed in the right order. In this post, we show how to use FIFO queues to preserve message sequencing with Python.

FIFO queues complement our existing SQS standard queues, which offer higher throughput, best-effort ordering, and at-least-once delivery. The following diagram compares the features of standard queues vs. FIFO queues. The same API functions apply to both types of queues.

The following use case provides an example of how you can now use SQS FIFO queues to exchange sequence-sensitive information. For more information about developing applications using Amazon SQS, see the Amazon SQS Developer Guide.

SQS FIFO Queues Example

In the capital markets industry, some of the most common patterns for exchanging messages with partners and customers are based on messaging technologies with two types of scenarios:

  1. Communication channels between two messaging managers (one sender channel and one receiver channel). Each messaging manager hosts the local queue and has an alias to the remote queue hosted on the other side (an MQ manager). The messages sent from an MQ manager are not stored locally. The receiving MQ manager stores the messages for the client applications of the named queues.
  2. A single messaging manager that hosts all the queues and that has the associated responsibility for message exchange and backup.

You can use Amazon SQS to decouple the components of an application so that these components can run independently, as expected in a messaging use case. The following diagram shows a sample architecture using an SQS queue with processing servers.


To preserve the order of messages, we use FIFO queues. These queues help ensure that trades are received in the correct order, and a book event is received before an update event or a cancel event.

Important: The name of a FIFO queue must end with the .fifo suffix.

The following diagram shows a financial use case, where Amazon SQS FIFO queues are used with different processing servers based on the type of messages being managed.

In FIFO queues, Amazon SQS also provides content-based deduplication. Content-based deduplication allows SQS to distinguish the contents of one message from the contents of another message using the message body. This helps eliminate duplicates in referential systems such as those that manage pricing.

In the following example, we simulate the two parts of a capital market exchange. In the first part, we simulate the application that reports trade status by sending messages to the queue named Trade Status. (In Amazon SQS, the queue will be named TradeStatus.fifo.) The application regularly sends the trade status received during the trade lifecycle to the queue (for example, trade received, trade checked, trade confirmed, and so on). In the second part, we simulate a client application that gets the trade status to update an internal website or to send status update notifications to other tools. The script stops after the message is read.

To accomplish this, you can use the following two Python code examples. These examples use boto3, the AWS SDK for Python.

This first script sends an XML message to a queue named TradeStatus.fifo, and the second script receives the message from the same queue. Messages can contain up to 256 KB of text in any format. Any component can later retrieve the messages programmatically using the Amazon SQS API. You can manage messages larger than 256 KB by using the SQS Extended Client Library for Java, which uses Amazon S3 to store larger payloads.

For queue creation, see the Amazon SQS Developer Guide.

Name: TradeStatus.fifo

URL: https://sqs.us-west-2.amazonaws.com/12345678/TradeStatus.fifo
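
If you prefer to create the queue programmatically, a minimal boto3 sketch might look like this (the attribute values are assumptions; adjust them to your needs):

import boto3

sqs = boto3.resource('sqs')

# Create the FIFO queue; the name must end in .fifo, and enabling
# ContentBasedDeduplication lets SQS deduplicate on the message body.
queue = sqs.create_queue(
    QueueName='TradeStatus.fifo',
    Attributes={
        'FifoQueue': 'true',
        'ContentBasedDeduplication': 'true'
    }
)
print(queue.url)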

The scripts below are written in Python 2.

import boto3

# Get the service resource
sqs = boto3.resource('sqs')

# Get the queue
queue = sqs.get_queue_by_name(QueueName='TradeStatus.fifo')

try:
    userInput = raw_input("Please enter file name: ")
except NameError:
    # raw_input was renamed input in Python 3; fall back accordingly
    userInput = input("Please enter file name: ")

with open(userInput, 'r') as myfile:
    data = myfile.read()

response = queue.send_message(
    MessageBody=data,
    MessageGroupId='messageGroup1'
)

# The response is NOT a resource, but gives you a message ID and MD5
print(response.get('MessageId'))
print(response.get('MD5OfMessageBody'))

The following Python code receives the message from the TradeStatus.fifo queue and deletes the message when it’s received. Afterward, the message is no longer available.

import boto3

# Get the service resource
sqs = boto3.resource('sqs')

# Get the queue
queue = sqs.get_queue_by_name(QueueName='TradeStatus.fifo')

# Process messages by printing out body
for message in queue.receive_messages():
    # Print out the body of the message
    print('Hello, {0}'.format(message.body))

    # Let the queue know that the message is processed
    message.delete()

Note: With boto3, you need only the name of the queue; get_queue_by_name resolves the queue URL for you.

More Resources

In this post, we showed how you can use Amazon SQS FIFO queues to exchange data between distributed systems that depend on receiving messages in exact order, and exactly once. You can get started with SQS FIFO queues using just three simple commands. For more information, see the Amazon SQS Developer Guide.

Chalice Version 0.6.0 is Now Available

by James Saryerwinnie

The latest preview version of Chalice, our microframework for Python serverless application development, now includes a couple of commonly requested features:

  • Customizing the HTTP response. A new Response class, chalice.Response, enables you to customize the HTTP response by specifying the status code, body, and a mapping of HTTP headers to return. The tutorial in the chalice documentation shows how to use this new functionality to return a non-JSON response to the user.
  • Vendoring binary packages. You can create a top-level vendor/ directory in your application source directory. This vendor directory is automatically included as part of the AWS Lambda deployment package when you deploy your application. You can use this feature for any private Python packages that can’t be specified in your requirements.txt file, as well as any binary content that includes Python packages with C extensions. For more information, see the packaging docs.
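
For the second feature, a project using vendored packages might be laid out like this (a sketch; the package name is illustrative):

helloworld/
├── app.py
├── requirements.txt
└── vendor/
    └── myprivatepkg/
        └── __init__.py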

Let’s look at the first feature in more detail.

Customizing the HTTP Response

The following example shows a view function that returns a plain text response to the user.

from chalice import Chalice, Response

app = Chalice(app_name='helloworld')

@app.route('/')
def hello_world():
    return Response(
        status_code=200,
        body='hello world',
        headers={'Content-Type': 'text/plain'})

The existing default behavior of returning a JSON response is still preserved. To return a JSON response, you can just return the equivalent Python value directly from your view function.

from chalice import Chalice, Response

app = Chalice(app_name='helloworld')

@app.route('/')
def hello_world():
    return {'hello': 'world'}

You can also use the chalice.Response class to return HTTP redirects to users. In this view function, we accept a URL in the request body and generate a redirect to that URL:

from chalice import Chalice, Response

app = Chalice(app_name='redirect')

@app.route('/redirect', content_types=['text/plain'])
def hello_world():
    url = app.current_request.raw_body.strip()
    return Response(
        status_code=301,
        body='',
        headers={'Location': url})

See the 0.6.0 upgrade notes for more information.

Try out the latest version of Chalice today and let us know what you think. You can chat with us on our Gitter channel and file feature requests on our GitHub repo. We look forward to your feedback and suggestions.

Chalice 0.4 & 0.5 Deliver Local Testing and Multifile Application Capabilities for Python Serverless Application Development

by Leah Rivers

We’re continuing to add features to Chalice, a preview release of our microframework for Python serverless application development using AWS Lambda and Amazon API Gateway. Chalice is designed to make it simple and fast for Python developers to create REST APIs built in a serverless framework.

In our latest releases, we’ve added initial versions for a couple of the most commonly requested features:

  1. Save time by testing APIs locally before deploying to Amazon API Gateway. In this first version of local testing support for Chalice, we’ve delivered a local HTTP server you can use to test and debug a local version of your Python app. This lets you validate your APIs without first deploying to API Gateway.
  2. Build more complex applications with initial support for multifile Python apps. Chalice 0.4 enables Python developers to maintain their preferred best practices and coding styles for applications that would not normally be contained within a single file, and to include files of other types as part of the deployment package. This improves on earlier Chalice releases, where deployment packages were limited to the app.py file.
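
For example, the local testing support in item 1 boils down to one command (the output shown is illustrative and may vary by version):

 $ chalice local
 Serving on localhost:8000

 $ curl localhost:8000/
 {"hello": "world"}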

We’ve also improved existing capabilities that make it easier to build and manage your serverless apps.

  1. More configurable logging with improved readability. We’ve added the ability to configure logging for the app object, where previously logging was configured on the root logger. This update enables you to configure logging levels and log format, and eliminates some duplicate log entries seen in previous versions of Chalice.
  2. Improved ability to retrieve your app’s Amazon API Gateway URL. We’ve included a chalice url command which enables you to programmatically retrieve the URL of your API; in previous versions this was a manual process.
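
For example, reusing the sample endpoint from the deployment walkthrough earlier on this page:

 $ chalice url
 https://qxea58oupc.execute-api.us-west-2.amazonaws.com/dev/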

Our releases continue to be focused on feedback and requests from the developer community. Want to learn more? Here are a few suggestions.

Try building a serverless application with Chalice. Chalice is available on PyPI (pip install chalice) and GitHub (https://github.com/awslabs/chalice – check out the README for tutorials). It’s published as a preview project and is not yet recommended for production APIs. You can also see our original Chalice blog post where we introduced a preview release of Chalice.

Stay tuned for new capabilities to be released. You can check out the working list of features for our upcoming release here: https://github.com/awslabs/chalice/blob/master/CHANGELOG.rst#next-release-tbd

Let us know what you think. We look forward to your feedback and suggestions. Feel free to leave comments here or come talk to us on GitHub.

Planning to attend AWS re:Invent? Come check out our re:Invent session focused on Chalice, where we will present new features and go through demos, such as how to deploy a REST API in less than 30 seconds. You can add this session to your re:Invent schedule here, or sign up for the re:Invent live stream.

Preview the Python Serverless Microframework for AWS

by Peter Moon

Serverless computing is one of the most talked-about subjects among AWS customers. The AWS serverless offerings, AWS Lambda and Amazon API Gateway, make it possible for developers to create and run API applications with built-in, virtually unlimited scalability without managing any servers. Today the AWS Developer Tools team is excited to announce the preview of the Python Serverless Microframework for AWS.

This three-minute video shows how quickly you can start building serverless APIs using the framework and its command-line tool, chalice.

In just 45 seconds, I created a new Hello World project, inspected its code file (app.py), deployed it to a public API endpoint, and using curl, made a successful HTTP GET request to the endpoint. Because our goal is to minimize the time it takes to get started, we hope you’ll enjoy the simple and fast experience offered by the new microframework.

In the next minute of the video, I added a new API feature to the app.py file, redeployed the API, and then verified that it works as expected.

If you’ve noticed the programming model feels familiar, that’s because it’s based on the one used by Flask, a popular Python microframework praised by the Python community for its simplicity and ease of use. We believe adopting a similarly succinct and intuitive style will help Python developers build serverless APIs as quickly as possible.

In the last part of the video, you’ll see how the framework makes it easy to consume AWS Lambda’s built-in logging feature available through Amazon CloudWatch Logs. Using the chalice logs and chalice deploy commands together, you can iterate quickly over test-diagnose-fix-deploy cycles in a live environment. The chalice deploy command can optionally take a deployment stage name, and you can deploy different versions of your code to different stages. Using this feature, you can leave your production stage intact while modifying your development stage. Then you can deploy to the production stage when the changes are ready to go out.
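
For example (the stage name is illustrative):

 $ chalice logs           # fetch the app's recent CloudWatch Logs output
 $ chalice deploy prod    # deploy the current code to a separate 'prod' stage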

The Python Serverless Microframework for AWS is available on PyPI (pip install chalice) and GitHub (https://github.com/awslabs/chalice). It is published as a preview project and is not yet recommended for production APIs.

We look forward to your feedback and suggestions. Feel free to leave comments here or come talk to us on GitHub!

How to Analyze AWS Config Snapshots with ElasticSearch and Kibana

by Vladimir Budilov

Introduction
In this blog post, I will walk you through a turn-key solution that includes one of our most recently released services, AWS Config. This solution shows how to automate the ingestion of your AWS Config snapshots into the ElasticSearch/Logstash/Kibana (ELK) stack for searching and mapping your AWS environments. Using this functionality, you can do free-form searches, such as “How many EC2 instances are tagged PROD?” or “How many EC2 instances are currently connected to this master security group?”

Prerequisites
In this post, I assume that you have an ELK stack up and running. (Although the “L” isn’t really required, the ELK acronym has stuck, so I’ll continue to use it.)

Here are some ways to get the ELK stack up and running:

  1. You can use our Amazon ElasticSearch Service, which provides the two main components you’ll be using: ElasticSearch and Kibana.
  2. Take a look at this excellent post by Logz.io. It provides step-by-step instructions for installing the ELK stack on an EC2 instance.
  3. You can install Docker locally or create an Amazon EC2 Container Service (Amazon ECS) cluster and then install the ELK Docker image. Follow the instructions here.

You can download the Python app referenced in this post from https://github.com/awslabs/aws-config-to-elasticsearch.

Why AWS Config?
AWS Config provides a detailed view of the configuration of your AWS resources and their relationships to other resources. For example, you can find out which resources are set up in your default VPC or which Availability Zone has the most EC2 instances. AWS Config also captures the history of configuration changes made to these resources and allows you to look them up through an API. The service allows you to create one-time snapshots or turn on configuration recording, which provides change snapshots and notifications.

Why ELK?
ElasticSearch and Kibana are some of the most popular free, open-source solutions out there to analyze and visualize data. ElasticSearch, which is built on the Lucene search engine, allows for schema-less data ingestion and querying. It provides out-of-the-box data analysis queries and filters, such as data aggregates and term counts. Kibana is the visualization and searching UI that opens up the ElasticSearch data to the regular user.

The Solution
I’ve created a Python app that automates the process of getting AWS Config data from your AWS account to ELK. In short, it asks AWS Config to take a snapshot in each region in which you have the service enabled; waits until the snapshot is uploaded to the configured Amazon S3 bucket; copies the snapshot from the S3 bucket; parses the snapshot (which is just a huge JSON blob); and ingests the JSON array elements into ELK.
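
As a rough sketch of the first step only (this is not the app's actual code, and the delivery channel name 'default' is an assumption), you can trigger a snapshot in a single region with boto3:

import boto3

# Ask AWS Config to deliver a snapshot to the delivery channel's S3 bucket.
config = boto3.client('config', region_name='us-east-1')
response = config.deliver_config_snapshot(deliveryChannelName='default')
print(response['configSnapshotId'])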

Running the Script
You have a couple of options when you run the app. You can specify the region that you want to export and load by including -r and the region name as shown:

./esingest.py -d localhost:9200 -r us-east-1

Or you can simply include the destination (which is required). The app will loop over all of the regions. The following output is an example of what you would see if you don’t specify the region:

./esingest.py -d localhost:9200

[Figure 1: app output]

Working with Kibana
Now that you have ingested the data into ElasticSearch, you need to use Kibana to index the data. The first time you open Kibana, the Settings page will be displayed. Use this page to configure the searchable index. For simplicity’s sake, under Index name or pattern, type *, and for Time-field name, choose snapshotTimeIso. You can use any date field from the drop-down list, such as resourceCreationTime:

[Figure 2: Kibana configuration]

This will index all of your ElasticSearch indices and use the snapshotTimeIso as the time-series field. You will have duplicates if you run esingest without deleting the current ELK indices, but you will be able to include the snapshot time in your search queries to get time-based results.

Now that we have indexed the data in Kibana, let’s do some searching. Choose the Discover tab and change the time filter by clicking the text in the upper-right corner:

[Figure 3: Kibana Discover tab]

For now, choose Last 5 years, and then minimize the Time Filter section.

For our first search, type resourceType: "aws::ec2::instance" in the text field. You will see all of your EC2 instances in the search results. The time graph shows when they were added to ElasticSearch. Because I ran esingest just once, there’s only one Config snapshot loaded, and only one timestamp will show up.

[Figure 4: Kibana search results for EC2 instances]

There are many other search queries you can use. Kibana supports the Lucene query syntax, so see this tutorial for examples and ideas.

As you can see, the time filter shows when the data was ingested into ElasticSearch. You might have duplicates here, so you can specify the instance ID and the exact snapshot time (input: resourceType: "*Instance*" AND "sg-a6f641c0*").

[Figure 5: search for instances and security group]

Kibana Visualize Functionality
In addition to search functionality, Kibana provides a way to visualize search results and create search slices. Let’s look at some real-world use cases that I’ve encountered while talking to customers. Click the Visualize tab, choose Pie Chart, and start exploring!

What’s my EC2 distribution between Availability Zones?
Input: resourceType: "aws::ec2::Instance"

[Figure 6: Kibana visualization of instances by Availability Zone]

Let’s create a sub-aggregation and add the tags that are assigned to those EC2 instances:

Input: resourceType: "aws::ec2::Instance"

[Figure 7: Kibana visualization with tag sub-aggregation]

Which AMIs were used to create your EC2 instances, and when were they created?
Input: *

[Figure 8: Kibana visualization of instances by AMI and creation time]

How many instances use a security group that you have set up?
Input: "sg-a6f641c0*"

[Figure 9: Kibana visualization of instances using the security group]

Conclusion
AWS Config is a useful tool for understanding what’s running in your AWS account. The combination of ELK and AWS Config offers AWS admins a lot of advantages that are worth exploring.

Serverless Service Discovery: Part 4: Registrar

by Magnus Bjorkman

In this, the last part of our serverless service discovery series, we will show how to register and look up a new service. We will add the components described below.

AWS Lambda Registrar Agent

In Docker, it is common to have container agents that add functionality to your Docker deployment. We will borrow from this concept and build a Lambda registrar agent that will manage the registration and monitoring of a service.


# Imports needed by this agent: boto3/botocore for the AWS APIs, plus
# signed_post, the V4-signing helper built in Part 3 of this series
# (the module name used here is an assumption).
import json
import logging

import boto3
import botocore.exceptions

from signed_requests import signed_post  # hypothetical module name

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def component_status(lambda_functions, rest_api_id):
    """Checking component status of REST API."""
    any_existing = False
    any_gone = False
    client = boto3.client('lambda')
    for lambda_function in lambda_functions:
        try:
            logger.info("checking Lambda: %s" % (lambda_function,))
            client.get_function_configuration(
                            FunctionName=lambda_function)
            any_existing = True
        except botocore.exceptions.ClientError:
            any_gone = True

    client = boto3.client('apigateway')
    try:
        logger.info("checking Rest API: %s" % (rest_api_id,))
        client.get_rest_api(restApiId=rest_api_id)
        any_existing = True
    except botocore.exceptions.ClientError:
        any_gone = True

    if (not any_existing):
        return "service_removed"
    elif (any_gone):
        return "unhealthy"
    else:
        return "healthy"


def lambda_handler(event, context):
    """Lambda hander for agent service registration."""
    with open('tmp/service_properties.json') as json_data:
        service_properties = json.load(json_data)

    logger.info("service_name: %s" % (service_properties['service_name'],))
    logger.info("service_version: %s" % (service_properties['service_version'],))

    status = component_status(service_properties['lambda_functions'],
                              service_properties['rest_api_id'])

    register_request = {
            "service_name": service_properties['service_name'],
            "service_version": service_properties['service_version'],
            "endpoint_url": service_properties['endpoint_url'],
            "ttl": "300"
            }
    if (status == 'healthy'):
        logger.info('registering healthy service')

        register_request["status"] = 'healthy'

        response = signed_post(
          service_properties['discovery_service_endpoint']+"/catalog/register",
          "us-east-1",
          "execute-api",
          json.dumps(register_request))


    elif (status == 'unhealthy'):
        logger.info('registering unhealthy service')

        register_request["status"] = 'unhealthy'

        response = signed_post(
          service_properties['discovery_service_endpoint']+"/catalog/register",
          "us-east-1",
          "execute-api",
          json.dumps(register_request))

    else:
        logger.info('removing service and registrar')

        deregister_request = {
            "service_name": service_properties['service_name'],
            "service_version": service_properties['service_version']
            }

        response = signed_post(
            service_properties['discovery_service_endpoint'] +
            "/catalog/deregister",
            "us-east-1",
            "execute-api",
            json.dumps(deregister_request))

        client = boto3.client('lambda')
        client.delete_function(
                 FunctionName=service_properties['registrar_name'])

The Lambda registrar agent is packaged with a property file that defines the Lambda functions and Amazon API Gateway deployment that are part of the service. The registrar agent uses the component_status function to inspect the state of those parts and takes action, depending on what it discovers:

  • If all of the parts are there, the service is considered healthy. The register function is called with the service information and a healthy status.
  • If only some of the parts are there, the service is considered unhealthy. The register function is called with the service information and an unhealthy status.
  • If none of the parts are there, the service is considered to have been removed. The deregister function is called, and the Lambda agent will delete itself because it is no longer needed.

Subsequent register function calls will overwrite the information, so as the health status of our services changes, we can call the function repeatedly. In fact, when we deploy the agent with our Hello World service, we will show how to put the Lambda registrar agent on a five-minute schedule to continuously monitor our service.

Deploy the Hello World Service with the Lambda Agent

We will first implement our simple Hello World Lambda function:


def lambda_handler(api_parameters, context):
    """Hello World Lambda function."""
    return {
            "message": "Hello "+api_parameters['name']
            }

We will create a Swagger file for the service:


{
  "swagger": "2.0",
  "info": {
    "title": "helloworld_service",
    "version": "1.0.0"
  },
  "basePath": "/v1",
  "schemes": ["https"],
  "consumes": ["application/json"],
  "produces": ["application/json"],
  "paths": {
    "/helloworld/{name}": {
      "parameters": [{
        "name": "name",
        "in": "path",
        "description": "The name to say hello to.",
        "required": true,
        "type": "string"
      }],
      "get": {
        "responses": {
          "200": {
            "description": "Hello World message"
          }
        },
        "x-amazon-apigateway-integration": {
          "type": "aws",
          "uri": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/$helloworld_serviceARN$/invocations",
          "httpMethod": "POST",
          "requestTemplates": {
            "application/json": "{\"name\": \"$input.params('name')\"}"
          },
          "responses": {
            "default": {
              "statusCode": "200",
              "schema": {
                "$ref": "#/definitions/HelloWorldModel"
              }
            }
          }
        }
      }
    }
  },
  "definitions": {
    "HelloWorldModel": {
      "type": "object",
      "properties": {
        "message": {
          "type": "string"
        }
      },
      "required": ["message"]
    }
  }
}

Now we are ready to pull everything we have done in this blog series together: we will deploy this service with a Lambda registrar agent that registers and deregisters it with our serverless discovery service. First, we need to add the requests Python module to the directory we are deploying from, because our Lambda registrar agent depends on it.


pip install requests -t /path/to/project-dir

Second, we deploy the Hello World service and the Lambda registrar agent:


ACCOUNT_NUMBER = "<your AWS account number>"

######################################
# Deploy Hello World Service
######################################
create_deployment_package("/tmp/helloworld.zip", ["helloworld_service.py"])
hello_world_arn = create_lambda_function(
                       "/tmp/helloworld.zip",
                       "helloworld_service",
                       "arn:aws:iam::"+ACCOUNT_NUMBER+":role/lambda_s3",
                       "helloworld_service.lambda_handler",
                       "Hello World service.",
                       ACCOUNT_NUMBER)
replace_instances_in_file("swagger.json",
                          "/tmp/swagger_with_arn.json",
                          "$helloworld_serviceARN$",
                          hello_world_arn)
api_id = create_api("/tmp/swagger_with_arn.json")
rest_api_id, stage, endpoint_url = deploy_api(api_id, "/tmp/swagger_with_arn.json", "dev")

######################################
# Deploy Lambda Registrar Agent
######################################
with open('/tmp/service_properties.json',
          'w') as outfile:
    json.dump(
      {
       "lambda_functions": ["helloworld_service"],
       "rest_api_id": rest_api_id,
       "stage": stage,
       "endpoint_url": endpoint_url,
       "service_name": "helloworld",
       "service_version": "1.0",
       "discovery_service_endpoint":
       "https://1vvw0qvh4i.execute-api.us-east-1.amazonaws.com/dev",
       "registrar_name": "registrar_"+rest_api_id
       }, outfile)

create_deployment_package("/tmp/helloworld_registrar.zip",
                          ["registrar.py", "/tmp/service_properties.json",
                           "requests"])
registrar_arn = create_lambda_function(
                       "/tmp/helloworld_registrar.zip",
                       "registrar_"+rest_api_id,
                       "arn:aws:iam::"+ACCOUNT_NUMBER+":role/lambda_s3",
                       "registrar.lambda_handler",
                       "Registrar for Hello World service.",
                       ACCOUNT_NUMBER)

After we have deployed the Hello World service, we create a JSON file (service_properties.json) with some of the outputs from that deployment. This JSON file is packaged with the Lambda registrar agent.

Both the service and the agent are now deployed, but nothing is triggering the agent to execute. We will use the following to create a five-minute monitoring schedule using CloudWatch Events:


client = boto3.client('events')
response = client.put_rule(
    Name="registrar_"+rest_api_id,
    ScheduleExpression='rate(5 minutes)',
    State='ENABLED'
)
rule_arn = response['RuleArn']

lambda_client = boto3.client('lambda')
response = lambda_client.add_permission(
        FunctionName=registrar_arn,
        StatementId="registrar_"+rest_api_id,
        Action="lambda:InvokeFunction",
        Principal="events.amazonaws.com",
        SourceArn=rule_arn
    )

response = client.put_targets(
    Rule="registrar_"+rest_api_id,
    Targets=[
        {
            'Id': "registrar_"+rest_api_id,
            'Arn': registrar_arn
        },
    ]
)

Now we have deployed a service that is being continuously updated in the discovery service. We can use it like this:


############################
# 1. Do service lookup
############################
request_url="https://yourrestapiid.execute-api.us-east-1.amazonaws.com/"\
            "dev/catalog/helloworld/1.0"
response = requests.get(request_url)
json_response = json.loads(response.content)


############################
# 2. Use the service
############################
request_url=("%s/helloworld/Magnus" % (json_response['endpoint_url'],))

response = requests.get(request_url)
json_response = json.loads(response.content)
logger.info("Message: %s" % (json_response['message'],))

We should get the following output:


INFO:root:Message: Hello Magnus

Summary

We have implemented a fairly simple but functional discovery service without provisioning any servers or containers. We can build on this by adding more advanced monitoring, circuit breakers, caching, additional protocols for discovery, etc. By providing a stable host name for our discovery service (instead of the one generated by API Gateway), we can make that a central part of our microservices architecture.

We showed how to use Amazon API Gateway and AWS Lambda to build a discovery service using Python, but the approach is general. It should work for other services you want to build. The examples provided for creating and updating the services can be enhanced and integrated into any CI/CD platforms to create a fully automated deployment pipeline.

Serverless Service Discovery: Part 3: Registration

by Magnus Bjorkman

In this, the third part of our serverless service discovery series, we will show how to configure Amazon API Gateway to require AWS Identity and Access Management (IAM) for authentication and how to create a V4 signature to call our register and deregister methods.

We have created all the functions required to manage our API and code, so we can jump directly into creating our new functions.

Registering and Deregistering Services

We start by creating a Lambda function for registering a service:


def lambda_handler(api_parameters, context):
    """Lambda hander for registering a service."""
    logger.info("lambda_handler - service_name: %s"
                " service_version: %s"
                % (api_parameters["service_name"],
                   api_parameters["service_version"]))

    table = boto3.resource('dynamodb',
                           region_name='us-east-1').Table('Services')

    table.put_item(
           Item={
                'name': api_parameters["service_name"],
                'version': api_parameters["service_version"],
                'endpoint_url': api_parameters["endpoint_url"],
                'ttl': int(api_parameters["ttl"]),
                'status': api_parameters["status"],
            }
        )

This function takes the input and stores it in Amazon DynamoDB. If you call the function with the same service name and version (our DynamoDB key), then it will overwrite the existing item.

Next, the function to deregister a service:


def lambda_handler(api_parameters, context):
    """Lambda hander for deregistering a service."""
    logger.info("lambda_handler - service_name: %s"
                " service_version: %s"
                % (api_parameters["service_name"],
                   api_parameters["service_version"]))

    table = boto3.resource('dynamodb',
                           region_name='us-east-1').Table('Services')

    table.delete_item(
            Key={
                'name': api_parameters["service_name"],
                'version': api_parameters["service_version"]
            }
        )

The function removes the item from the DynamoDB table based on the service name and version.

We need to add the new functions and API methods to the Swagger file:


{
  "swagger": "2.0",
  "info": {
    "title": "catalog_service",
    "version": "1.0.0"
  },
  "basePath": "/v1",
  "schemes": ["https"],
  "consumes": ["application/json"],
  "produces": ["application/json"],
  "paths": {
    "/catalog/{serviceName}/{serviceVersion}": {
      "parameters": [{
        "name": "serviceName",
        "in": "path",
        "description": "The name of the service to look up.",
        "required": true,
        "type": "string"
      },
      {
        "name": "serviceVersion",
        "in": "path",
        "description": "The version of the service to look up.",
        "required": true,
        "type": "string"
      }],
      "get": {
        "responses": {
          "200": {
            "description": "version information"
          },
          "404": {
            "description": "service not found"
          }
        },
        "x-amazon-apigateway-integration": {
          "type": "aws",
          "uri": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/$catalog_serviceARN$/invocations",
          "httpMethod": "POST",
          "requestTemplates": {
            "application/json": "{\"service_name\": \"$input.params('serviceName')\",\"service_version\": \"$input.params('serviceVersion')\"}"
          },
          "responses": {
            "default": {
              "statusCode": "200",
              "schema": {
                "$ref": "#/definitions/CatalogServiceModel"
              }
            },
            ".*NotFound.*": {
              "statusCode": "404",
              "responseTemplates" : {
                 "application/json": "{\"error_message\":\"Service Not Found\"}"
                } 
            } 
          }
        }
      }
    },
    "/catalog/register": {
      "post": {
        "responses": {
          "201": {
            "description": "service registered"
          }
        },
        "parameters": [{
          "name": "body",
          "in": "body",
          "description": "body object",
          "required": true,
          "schema": {
            "$ref":"#/definitions/CatalogRegisterModel"
          }
        }],
        "x-amazon-apigateway-auth" : {
          "type" : "aws_iam" 
        },
        "x-amazon-apigateway-integration": {
          "type": "aws",
          "uri": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/$catalog_registerARN$/invocations",
          "httpMethod": "POST",
          "requestTemplates": {
            "application/json": "$input.json('$')"
          },
          "responses": {
            "default": {
              "statusCode": "201"
            } 
          }
        }
      } 
    },
    "/catalog/deregister": {
      "post": {
        "responses": {
          "201": {
            "description": "service deregistered"
          }
        },
        "parameters": [{
          "name": "body",
          "in": "body",
          "description": "body object",
          "required": true,
          "schema": {
            "$ref":"#/definitions/CatalogDeregisterModel"
          }
        }],
        "x-amazon-apigateway-auth" : {
          "type" : "aws_iam" 
        },
        "x-amazon-apigateway-integration": {
          "type": "aws",
          "uri": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/$catalog_deregisterARN$/invocations",
          "httpMethod": "POST",
          "requestTemplates": {
            "application/json": "$input.json('$')"
          },
          "responses": {
            "default": {
              "statusCode": "201"
            } 
          }
        }
      } 
    }
  },
  "definitions": {
    "CatalogServiceModel": {
      "type": "object",
      "properties": {
        "endpoint_url": {
          "type": "string"
        },
        "ttl": {
          "type": "integer"
        },
        "status": {
          "type": "string"
        }
      },
      "required": ["endpoint_url", "ttl", "status"]
    },
    "CatalogRegisterModel": {
      "type": "object",
      "properties": {
        "service_name": {
          "type": "string"
        },
        "service_version": {
          "type": "string"
        },
        "endpoint_url": {
          "type": "string"
        },
        "ttl": {
          "type": "integer"
        },
        "status": {
          "type": "string"
        }
      },
      "required": ["service_name","service_version","endpoint_url", "ttl", "status"]
    },
    "CatalogDeregisterModel": {
      "type": "object",
      "properties": {
        "service_name": {
          "type": "string"
        },
        "service_version": {
          "type": "string"
        }
      },
      "required": ["service_name","service_version"]
    }
  }
}

The new methods will be POST-based, so we need to define models (CatalogRegisterModel and CatalogDeregisterModel) for the data passed through the method body. After API Gateway processes the models, the JSON objects will be passed, as is, to the Lambda functions.

We set the x-amazon-apigateway-auth element to the aws_iam type for the register and deregister methods, so API Gateway will require a V4 signature when we access them.

We can now deploy our new functions:


ACCOUNT_NUMBER = "<your AWS account number>"

create_deployment_package("/tmp/catalog_register.zip", ["catalog_register.py"])
catalog_register_arn = create_lambda_function(
                       "/tmp/catalog_register.zip",
                       "catalog_register",
                       "arn:aws:iam::"+ACCOUNT_NUMBER+":role/lambda_s3",
                       "catalog_register.lambda_handler",
                       "Registering a service.",
                       ACCOUNT_NUMBER)
replace_instances_in_file("swagger.json",
                          "/tmp/swagger_with_arn.json",
                          "$catalog_registerARN$", catalog_register_arn)
create_deployment_package("/tmp/catalog_deregister.zip",
                          ["catalog_deregister.py"])
catalog_deregister_arn = create_lambda_function(
                       "/tmp/catalog_deregister.zip",
                       "catalog_deregister",
                       "arn:aws:iam::"+ACCOUNT_NUMBER+":role/lambda_s3",
                       "catalog_deregister.lambda_handler",
                       "Deregistering a service.",
                       ACCOUNT_NUMBER)
replace_instances_in_file("/tmp/swagger_with_arn.json",
                          "/tmp/swagger_with_arn.json",
                          "$catalog_deregisterARN$", catalog_deregister_arn)
catalog_service_arn = get_function_arn("catalog_service")
replace_instances_in_file("/tmp/swagger_with_arn.json",
                          "/tmp/swagger_with_arn.json",
                          "$catalog_serviceARN$", catalog_service_arn)
api_id = update_api("/tmp/swagger_with_arn.json")
deploy_api(api_id, "/tmp/swagger_with_arn.json", "dev")

We can try out the new register service like this:


json_body = {
            "service_name": "registerservice3",
            "service_version": "1.0",
            "endpoint_url": "notarealurlregister3",
            "ttl": "300",
            "status": "healthy"
            }
request_url = "https://yourrestapi.execute-api.us-east-1.amazonaws.com/"\
              "dev/catalog/register"
response = requests.post(
            request_url,
            data=json.dumps(json_body))
if(not response.ok):
    logger.error("Error code: %i" % (response.status_code,))


We should get something like this:


ERROR:root:Error code: 403

Signing a Request with Signature Version 4

To successfully call our new services, we need to implement a client that will sign requests to the API with a Version 4 signature. First, we implement the functions that create the signature:


from botocore.credentials import get_credentials
from botocore.session import get_session
import requests
import json
import logging
import sys
import datetime
import hashlib
import hmac
import urlparse
import urllib
from collections import OrderedDict

def sign(key, msg):
    """Sign string with key."""
    return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()


def getSignatureKey(key, dateStamp, regionName, serviceName):
    """Create signature key."""
    kDate = sign(('AWS4' + key).encode('utf-8'), dateStamp)
    kRegion = sign(kDate, regionName)
    kService = sign(kRegion, serviceName)
    kSigning = sign(kService, 'aws4_request')
    return kSigning


def create_canonical_querystring(params):
    """Create canonical query string.

    `params` comes from urlparse.parse_qs, which maps each key to a
    list of values. Per the SigV4 spec, the sorted, URI-encoded
    key=value pairs are joined with '&' (not ',').
    """
    ordered_params = OrderedDict(sorted(params.items(), key=lambda t: t[0]))
    pairs = []
    for key, values in ordered_params.iteritems():
        for value in values:
            pairs.append(urllib.quote(key, safe='') + "=" +
                         urllib.quote(value, safe=''))
    return "&".join(pairs)


def sign_request(method, url, credentials, region, service, body=''):
    """Sign a HTTP request with AWS V4 signature."""
    ###############################
    # 1. Create a Canonical Request
    ###############################
    t = datetime.datetime.utcnow()
    amzdate = t.strftime('%Y%m%dT%H%M%SZ')
    # Date w/o time, used in credential scope
    datestamp = t.strftime('%Y%m%d')

    # Create the different parts of the request, with content sorted
    # in the prescribed order
    parsed_url = urlparse.urlparse(url)
    canonical_uri = parsed_url.path
    canonical_querystring = create_canonical_querystring(
                              urlparse.parse_qs(parsed_url.query))
    canonical_headers = ("host:%s\n"
                         "x-amz-date:%s\n" %
                         (parsed_url.hostname, amzdate))
    signed_headers = 'host;x-amz-date'
    if (not (credentials.token is None)):
        canonical_headers += ("x-amz-security-token:%s\n") % (credentials.token,)
        signed_headers += ';x-amz-security-token'

    payload_hash = hashlib.sha256(body).hexdigest()
    canonical_request = ("%s\n%s\n%s\n%s\n%s\n%s" %
                         (method,
                          urllib.quote(canonical_uri),
                          canonical_querystring,
                          canonical_headers,
                          signed_headers,
                          payload_hash))

    #####################################
    # 2. Create a String to Sign
    #####################################
    algorithm = 'AWS4-HMAC-SHA256'
    credential_scope = ("%s/%s/%s/aws4_request" % 
                        (datestamp,
                         region,
                         service))
    string_to_sign = ("%s\n%s\n%s\n%s" %
                       (algorithm,
                        amzdate,
                        credential_scope,
                        hashlib.sha256(canonical_request).hexdigest()))
    #####################################
    # 3. Create a Signature
    #####################################
    signing_key = getSignatureKey(credentials.secret_key,
                                  datestamp, region, service)
    signature = hmac.new(signing_key, (string_to_sign).encode('utf-8'),
                         hashlib.sha256).hexdigest()

    ######################################################
    # 4. Assemble request to it can be used for submission
    ######################################################
    authorization_header = ("%s Credential=%s/%s, "
                            "SignedHeaders=%s, "
                            "Signature=%s" %
                            (algorithm,
                             credentials.access_key,
                             credential_scope,
                             signed_headers,
                             signature))
    headers = {'x-amz-date': amzdate, 'Authorization': authorization_header}
    if (not (credentials.token is None)):
        headers['x-amz-security-token'] = credentials.token
    request_url = ("%s://%s%s" % 
                   (parsed_url.scheme,parsed_url.netloc,canonical_uri))
    if (len(canonical_querystring) > 0):
        request_url += ("?%s" % (canonical_querystring,))

    return request_url, headers, body

The main function, sign_request, can sign requests for both POST and GET methods. It also works with both short- and long-term credentials. For more information about creating Signature Version 4 requests, see Signing Requests.

We implement the following method to submit a POST request:


def signed_post(url, region, service, data, **kwargs):
    """Signed post with AWS V4 Signature."""
    credentials = get_credentials(get_session())

    request_url, headers, body = sign_request("POST", url, credentials, region,
                                              service, body=data)

    return requests.post(request_url, headers=headers, data=body, **kwargs)

We are using botocore functionality to get the credentials configured on the instance we are running on. If we are running on an Amazon EC2 instance or in AWS Lambda, botocore will use the configured IAM role.
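
Although only the POST variant is needed here, a GET counterpart would follow the same pattern (a sketch that is not part of the original series):

def signed_get(url, region, service, **kwargs):
    """Signed GET with AWS V4 Signature (sketch mirroring signed_post)."""
    credentials = get_credentials(get_session())

    request_url, headers, body = sign_request("GET", url, credentials,
                                              region, service)

    return requests.get(request_url, headers=headers, **kwargs)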

We can now test the service by calling register:


json_body = {
            "service_name": "registerservice6",
            "service_version": "1.0",
            "endpoint_url": "notarealurlregister6",
            "ttl": "300",
            "status": "healthy"
            }
request_url = "https://yourrestapiid.execute-api.us-east-1.amazonaws.com/"\
              "dev/catalog/register"
response = signed_post(
            request_url,
            "us-east-1",
            "execute-api",
            json.dumps(json_body))
if(not response.ok):
    logger.error("Error code: %i" % (response.status_code,))
else:
    logger.info("Successfully registered the service.")

The test should complete without a failure. To verify, look up the item we just registered:


request_url="https://your_rest_api_id.execute-api.us-east-1.amazonaws.com/"\
            "dev/v1/catalog/registerservice6/1.0"
response = requests.get(request_url)
json_response = json.loads(response.content)
logging.info("Endpoint URL: %s" % (json_response['endpoint_url'],))
logging.info("TTL: %i" % (json_response['ttl'],))
logging.info("Status: %s" % (json_response['status'],))

You should get the following output:


INFO:root:Endpoint URL: notarealurlregister6
INFO:root:TTL: 300
INFO:root:Status: healthy

Serverless Service Discovery: Part 2: Lookup

by Magnus Bjorkman

In this, the second part of our serverless service discovery series, we will use Amazon DynamoDB to store information about the services in our service discovery service and update our AWS Lambda function to read information from the DynamoDB table.

Updating Amazon API Gateway and AWS Lambda

As part of adding new functionality to what we are implementing, we need to update the Service Discovery API and the code in API Gateway and AWS Lambda. First, we update the code in AWS Lambda:


def get_function_arn(function_name):
    """Return ARN given the Function Name.

    :param function_name: The name of the Lambda function.
    :return: The ARN for the Lambda function.
    """
    client = boto3.client('lambda')

    response = client.get_function(
        FunctionName=function_name
    )

    return response['Configuration']['FunctionArn']


def update_lambda_function(package_name, function_name):
    """Update a Lambda function from zip-file.

    :param package_name: The name of the package. Full or relative path.
    :param function_name: The name of the Lambda function.
    :return: The ARN for the Lambda function.
    """
    with open(package_name, "rb") as package_file:
        package_data = package_file.read()

    # connect to Lambda API
    client = boto3.client('lambda')

    # update the function code
    client.update_function_code(
        FunctionName=function_name,
        ZipFile=package_data,
        Publish=True
    )

    # get function configuration to get top level ARN
    return get_function_arn(function_name)

The get_function_arn function returns the top-level ARN for the Lambda function. When we update the code, we get back the ARN for the version that was uploaded, but we need to use the top-level ARN with Swagger.
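
To make the distinction concrete, the two forms differ only in a trailing version qualifier (account ID and version number are illustrative):

arn:aws:lambda:us-east-1:123456789012:function:catalog_service:3   (version-qualified)
arn:aws:lambda:us-east-1:123456789012:function:catalog_service     (top-level)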

The update_lambda_function updates the code of the Lambda function only. There are other functions in the Lambda API to update other configurations for Lambda functions.

Next we update the API with Swagger:


def update_api(swagger_file_name):
    """Update an API defined in Swagger.

    :param swagger_file_name: The name of the swagger file.
                              Full or relative path.
    :return: The id of the REST API.
    """
    # get the API Gateway ID of the existing API
    rest_api_name = get_rest_api_name(swagger_file_name)
    client = boto3.client('apigateway')
    paginator = client.get_paginator('get_rest_apis')
    rest_api_id = ""
    for response in paginator.paginate():
        for item in response["items"]:
            if (rest_api_name == item["name"]):
                rest_api_id = item["id"]

    with open(swagger_file_name, "r") as swagger_file:
        swagger_data = swagger_file.read()

    response = client.put_rest_api(restApiId=rest_api_id,
                                   body=swagger_data)

    return response['id']

We first query API Gateway for the REST API identifier based on the name in the Swagger file. We need to submit this identifier for the update to work.

We can use the same deployment method we used in the first blog post to deploy the update to the API Gateway stage. If the stage already exists, it will just be updated.

Creating a DynamoDB Table

We need to be able to persistently store the information about a service, and we need to be able to create, update, and query that information. DynamoDB is a fully managed serverless service that works well with AWS Lambda. We will start by creating a table for our discovery service:


dynamodb = boto3.resource('dynamodb', region_name = 'us-east-1')  

# create the table
table = dynamodb.create_table(
    TableName='Services',
    KeySchema=[ { 'AttributeName': 'name',
                  'KeyType': 'HASH' }, 
                { 'AttributeName': 'version',
                  'KeyType': 'RANGE' } ],
    AttributeDefinitions=[ { 'AttributeName': 'name',
                             'AttributeType': 'S' },
                           { 'AttributeName': 'version',
                             'AttributeType': 'S' }, ],
    ProvisionedThroughput={ 'ReadCapacityUnits': 10,
                            'WriteCapacityUnits': 10 } )

# wait for the table to be ready
# this will block until the table is ACTIVE
table = boto3.resource('dynamodb').Table('Services')
table.wait_until_exists()

# insert some test data
with table.batch_writer() as batch:
    batch.put_item(Item={
                'name': 'testservice1', 
                'version': '1.0', 
                'endpoint_url': 'notarealurl1',
                'ttl': 300,
                'status': 'healthy' })
    batch.put_item(Item={
                'name': 'testservice2', 
                'version': '1.0', 
                'endpoint_url': 'notarealurl2',
                'ttl': 600,
                'status': 'healthy' })

The only attributes that we need to define are the keys. We will store additional attributes, but we can add them as we store items in the table.
We are using the name and version as the key because that matches our access pattern.
We are provisioning 10 read and write capacity units for the table. The number can be adjusted, depending on the amount of traffic the service receives and the effectiveness of the client caching.
After the table is created and active, we then prepare for the testing of our lookup service by inserting a couple of test records.
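
To sanity-check the test data, you can read one of the items back with the same table resource (a quick sketch):

# Fetch one of the test records by its composite key.
response = table.get_item(Key={'name': 'testservice1', 'version': '1.0'})
print(response['Item']['endpoint_url'])  # prints: notarealurl1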

Looking Up Service Information from a DynamoDB Table

We are now ready to update our Lambda function so we can start using our DynamoDB table:


def lambda_handler(api_parameters, context):
    """Lambda hander for service lookup."""
    logger.info("lambda_handler - service_name: %s"
                " service_version: %s"
                % (api_parameters["service_name"],api_parameters["service_version"]))

    table = boto3.resource('dynamodb', region_name='us-east-1').Table('Services')

    dynamodb_response = table.get_item(
                    Key={
                        'name': str(api_parameters["service_name"]),
                        'version': str(api_parameters["service_version"])
                    }
                )

    if 'Item' in dynamodb_response:
        logger.info("found service with endpoint_url: %s" %
                    (dynamodb_response['Item']['endpoint_url'],))
        return {
            "endpoint_url": dynamodb_response['Item']['endpoint_url'],
            "ttl": dynamodb_response['Item']['ttl'],
            "status": dynamodb_response['Item']['status']
            }
    else:
        raise Exception('NotFound')

The function gets the item from the table, and then returns a JSON object with the information to the client.

Notice that we raise an exception if we don’t find a record in DynamoDB. API Gateway can match this exception and map it to a 404 HTTP status code. We update the two response sections in the Swagger file to make that happen:


{
  "swagger": "2.0",
  "info": {
    "title": "catalog_service",
    "version": "1.0.0"
  },
  "basePath": "/v1",
  "schemes": ["https"],
  "consumes": ["application/json"],
  "produces": ["application/json"],
  "paths": {
    "/catalog/{serviceName}/{serviceVersion}": {
      "parameters": [{
        "name": "serviceName",
        "in": "path",
        "description": "The name of the service to look up.",
        "required": true,
        "type": "string"
      },
      {
        "name": "serviceVersion",
        "in": "path",
        "description": "The version of the service to look up.",
        "required": true,
        "type": "string"
      }],
      "get": {
        "responses": {
          "200": {
            "description": "version information"
          },
          "404": {
            "description": "service not found"
          }
        },
        "x-amazon-apigateway-integration": {
          "type": "aws",
          "uri": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/$catalog_serviceARN$/invocations",
          "httpMethod": "POST",
          "requestTemplates": {
            "application/json": "{\"service_name\": \"$input.params('serviceName')\",\"service_version\": \"$input.params('serviceVersion')\"}"
          },
          "responses": {
            "default": {
              "statusCode": "200",
              "schema": {
                "$ref": "#/definitions/CatalogServiceModel"
              }
            },
            ".*NotFound.*": {
              "statusCode": "404",
              "responseTemplates" : {
                 "application/json": "{\"error_message\":\"Service Not Found\"}"
                } 
            } 
          }
        }
      }
    }
  },
  "definitions": {
    "CatalogServiceModel": {
      "type": "object",
      "properties": {
        "endpoint_url": {
          "type": "string"
        },
        "ttl": {
          "type": "integer"
        },
        "status": {
          "type": "string"
        }
      },
      "required": ["endpoint_url", "ttl", "status"]
    }
  }
}

We use a regular expression (.*NotFound.*) under x-amazon-apigateway-integration -> responses to catch our exception and map it to a static JSON message.
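We can verify the mapping by requesting a service name that is not in the table. Assuming the updated API has been deployed as before (the URL below is a placeholder), the call should come back with a 404:

import requests

# request a service that does not exist in the Services table
request_url = ("https://yourrestapiid.execute-api.us-east-1.amazonaws.com/"
               "dev/catalog/nosuchservice/1.0")
response = requests.get(request_url)
print(response.status_code)  # expected: 404
print(response.content)      # expected: {"error_message":"Service Not Found"}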

We can now put everything together and update the code and API:


create_deployment_package("/tmp/catalog_service.zip", ["catalog_service.py"])
function_arn = update_lambda_function(
                       "/tmp/catalog_service.zip",
                       "catalog_service")
replace_instances_in_file("swagger.json",
                          "/tmp/swagger_with_arn.json",
                          "$catalog_serviceARN$", function_arn)
api_id = update_api("/tmp/swagger_with_arn.json")
deploy_api(api_id, "/tmp/swagger_with_arn.json", "dev")

We can use the following to test our deployment:


import requests
import json
import logging

logging.basicConfig(level=logging.INFO)

request_url = ("https://yourrestapiid.execute-api.us-east-1.amazonaws.com/"
               "dev/catalog/testservice2/1.0")
response = requests.get(request_url)
json_response = json.loads(response.content)
logging.info("Endpoint URL: %s" % (json_response['endpoint_url'],))
logging.info("TTL: %i" % (json_response['ttl'],))
logging.info("Status: %s" % (json_response['status'],))

That should give us the following results:


INFO:root:Endpoint URL: notarealurl2
INFO:root:TTL: 600
INFO:root:Status: healthy

Serverless Service Discovery – Part 1: Get Started

by Magnus Bjorkman | on | in Python | | Comments

AWS provides a lot of features and services for writing serverless architectures with Python. In this four-part series, we will show how you can use Python to manage and implement Amazon API Gateway, AWS Lambda, and Amazon DynamoDB. We will use a common use case, service discovery, to showcase a simple way to do this with Python and boto3. Service discovery is a foundational service for microservices. There are many implementations running on servers or in containers, including Consul by HashiCorp and ZooKeeper from Apache.

This four-part series will cover the following topics:

  • Part 1: Get Started: Using Python and Swagger to Deploy to Amazon API Gateway and AWS Lambda
  • Part 2: Lookup: Looking Up Service Information in Amazon DynamoDB from AWS Lambda
  • Part 3: Registration: Using Signature Version 4 Authentication to API Gateway and AWS Lambda
  • Part 4: Registrar: Using a Registrar Agent in AWS Lambda to Manage Service Registration

By the end of the series, we will have built the complete system: a client will be able to look up a Hello World service in the discovery service and then call it, and a registrar agent deployed with the Hello World service will keep the information about the Hello World service up-to-date in the discovery service.

Today’s post covers the first piece of that design: setting up the basics of a service running on API Gateway and Lambda. To keep this first step easy, the service will return hard-coded values.

We will create a few functions to make it easy to manage our serverless architecture. These are all of the imports used by management functions in this series:

import json
import os
import logging
import zipfile
import boto3

We set the log level to INFO:


logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)

Creating an AWS Lambda Function

We start with a couple of utility methods that will help us package a list of files or directories into a zip file that can be used with AWS Lambda:

def zipdir(path, ziph):
    """Add directory to zip file.

    :param path: The top level path to traverse to discover files to add.
    :param ziph: A handle to a zip file to add files to.
    """
    for root, dirs, files in os.walk(path):
        for file in files:
            ziph.write(os.path.join(root, file))


def create_deployment_package(package_name, file_names):
    """Create a deployment package for Lambda.

    :param package_name: The name of the package. Full or relative path.
    :param file_names: Files or folders to add to the package.
    """
    with zipfile.ZipFile(package_name, "w", zipfile.ZIP_DEFLATED) as ziph:
        for file_name in file_names:
            if os.path.isdir(file_name):
                zipdir(file_name, ziph)
            else:
                ziph.write(file_name)

The next function will use the package we just created to create the Lambda function:


def create_lambda_function(package_name, function_name, role,
                           handler, description, account_number):
    """Create a Lambda function from zip-file.

    :param package_name: The name of the package. Full or relative path.
    :param function_name: The name of the Lambda function to create.
    :param role: The role ARN to use when executing the Lambda function.
    :param handler: The handler to execute when the Lambda function is called.
    :param description: The description of the Lambda function.
    :param account_number: The account number of the API Gateway using this
                           function.
    :return: The ARN for the Lambda function.
    """
    with open(package_name, "rb") as package_file:
        package_data = package_file.read()

    # connect to Lambda API
    client = boto3.client('lambda')

    # create the function
    response = client.create_function(
        FunctionName=function_name,
        Runtime="python2.7",
        Role=role,
        Handler=handler,
        Code={'ZipFile': package_data},
        Description=description,
        Timeout=60,
        MemorySize=128,
        Publish=True
    )

    # store away the name and arn for later use
    function_arn = response['FunctionArn']
    function_name = response['FunctionName']

    # add permissions for the function to be called by API Gateway
    response = client.add_permission(
        FunctionName=response['FunctionArn'],
        StatementId=response['FunctionName']+"-invoke",
        Action="lambda:InvokeFunction",
        Principal="apigateway.amazonaws.com",
        SourceArn='arn:aws:execute-api:us-east-1:'+account_number+':*'
    )

    return function_arn

We read the package into memory and provide it directly to the create_function method that creates our Lambda function. For a large package, you might instead upload it to Amazon S3 and submit a reference to the object.
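A sketch of that variant, assuming the bucket already exists (the bucket and key names here are placeholders):

# upload the deployment package to S3 and reference it in create_function
# (bucket and key names are placeholders)
s3 = boto3.client('s3')
s3.upload_file(package_name, 'my-deployment-bucket', 'catalog_service.zip')

response = client.create_function(
    FunctionName=function_name,
    Runtime="python2.7",
    Role=role,
    Handler=handler,
    Code={'S3Bucket': 'my-deployment-bucket',
          'S3Key': 'catalog_service.zip'},
    Description=description,
    Timeout=60,
    MemorySize=128,
    Publish=True
)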

We also need to give API Gateway permission to call our Lambda function. We do that through the Lambda resource policy, adding the ARN of the API Gateway service for our account to the Lambda permissions.
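To confirm that the permission was attached, we can fetch the function’s resource policy; a quick check, not part of the deployment flow:

# inspect the resource policy attached to the Lambda function
client = boto3.client('lambda')
policy_response = client.get_policy(FunctionName='catalog_service')
print(policy_response['Policy'])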

Creating an API with Swagger

We again start with a couple of utility methods.


def replace_instances_in_file(filename_source, filename_target, old, new):
    """Replace string occurence in file.

    :param filename_source: The name of the file to read in.
    :param filename_target: The name of the file to write to.
    :param old: The string to find in the file.
    :param new: The string to replace any found occurrences with.
    """
    with open(filename_source, 'r') as f:
        newlines = []
        for line in f.readlines():
            newlines.append(line.replace(old, new))
    with open(filename_target, 'w') as f:
        for line in newlines:
            f.write(line)


def get_rest_api_name(swagger_file):
    """Get the Rest API name from a Swagger file.

    :param swagger_file: The name of the swagger file. Full or relative path.
    :return: The name of the API defined in the Swagger file.
    """
    with open(swagger_file) as json_data:
        api_def = json.load(json_data)
    return api_def["info"]["title"]

The replace_instances_in_file function lets us put Lambda function ARNs into specific places in the Swagger file: we put a marker string in the Swagger file, and the function finds that marker and replaces it with the Lambda ARN.

The get_rest_api_name function returns the name of the REST API specified in the Swagger file, so we can use that name in calls to the API Gateway API.

In the following function, we use the newly released import_rest_api operation to import an API defined in Swagger:


def create_api(swagger_file_name):
    """Create an API defined in Swagger.

    :param swagger_file_name: The name of the swagger file.
                              Full or relative path.
    :return: The id of the REST API.
    """
    with open(swagger_file_name, "r") as swagger_file:
        swagger_data = swagger_file.read()

    client = boto3.client('apigateway')
    response = client.import_rest_api(body=swagger_data)

    return response['id']

As with the Lambda deployment package, we read the Swagger file into memory and submit it directly to the call.

The last management function deploys the API to an API Gateway stage so we have a public host name that we can use to access the API:


def deploy_api(api_id, swagger_file, stage):
    """Deploy API to the given stage.

    :param api_id: The id of the API.
    :param swagger_file: The name of the swagger file. Full or relative path.
    :param stage: The name of the stage to deploy to.
    :return: Tuple of Rest API ID, stage, and endpoint URL.
    """
    client = boto3.client('apigateway')

    with open(swagger_file) as json_data:
        api_def = json.load(json_data)

    logger.info("deploying: "+api_id+" to "+stage)
    client.create_deployment(restApiId=api_id,
                             stageName=stage)

    # print the end points
    logger.info("--------------------- END POINTS (START) ---------------")
    for path, path_object in api_def["paths"].iteritems():
        logger.info("End Point: https://%s"
                    ".execute-api.us-east-1.amazonaws.com/"
                    "%s%s" % (api_id, stage, path))
    logger.info("--------------------- END POINTS (END) -----------------")

    endpoint_url = ("https://%s"
                    ".execute-api.us-east-1.amazonaws.com/"
                    "%s" % (api_id, stage))
    return api_id, stage, endpoint_url
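When run against our Swagger file, the logging in deploy_api prints each deployed end point; the output looks something like this (the API id is a placeholder):

INFO:root:deploying: yourrestapiid to dev
INFO:root:--------------------- END POINTS (START) ---------------
INFO:root:End Point: https://yourrestapiid.execute-api.us-east-1.amazonaws.com/dev/catalog/{serviceName}/{serviceVersion}
INFO:root:--------------------- END POINTS (END) -----------------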

Deploying a Skeleton Service

We are now ready to test the functions with a simple skeleton of our service lookup function. The function is minimal and includes hard-coded values:


def lambda_handler(api_parameters, context):
    """Lambda handler for service lookup."""
    logger.info("lambda_handler - service_name: %s"
                " service_version: %s"
                % (api_parameters["service_name"],
                   api_parameters["service_version"]))

    response = {
            "endpoint_url": "notarealurl",
            "ttl": 300,
            "status": "healthy"
         }

    return response

Given a service name and a service version, the function will return three values:

  • The endpoint URL from which the service can be accessed.
  • The time to live (TTL) for this information, so a client knows how long it can cache the result and avoid unnecessary calls to the service.
  • The status of the service, either healthy or unhealthy.
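For the skeleton above, a lookup simply returns the hard-coded placeholder values, so the JSON document the client receives looks like this:

{
  "endpoint_url": "notarealurl",
  "ttl": 300,
  "status": "healthy"
}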

We define the API for the preceding Lambda function in a Swagger file:


{
  "swagger": "2.0",
  "info": {
    "title": "catalog_service",
    "version": "1.0.0"
  },
  "schemes": ["https"],
  "consumes": ["application/json"],
  "produces": ["application/json"],
  "paths": {
    "/catalog/{serviceName}/{serviceVersion}": {
      "parameters": [{
        "name": "serviceName",
        "in": "path",
        "description": "The name of the service to look up.",
        "required": true,
        "type": "string"
      },
      {
        "name": "serviceVersion",
        "in": "path",
        "description": "The version of the service to look up.",
        "required": true,
        "type": "string"
      }],
      "get": {
        "responses": {
          "200": {
            "description": "version information"
          }
        },
        "x-amazon-apigateway-integration": {
          "type": "aws",
          "uri": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/$catalog_serviceARN$/invocations",
          "httpMethod": "POST",
          "requestTemplates": {
            "application/json": "{\"service_name\": \"$input.params('serviceName')\",\"service_version\": \"$input.params('serviceVersion')\"}"
          },
          "responses": {
            "default": {
              "statusCode": "200",
              "schema": {
                "$ref": "#/definitions/CatalogServiceModel"
              }
            } 
          }
        }
      }
    }
  },
  "definitions": {
    "CatalogServiceModel": {
      "type": "object",
      "properties": {
        "endpoint_url": {
          "type": "string"
        },
        "ttl": {
          "type": "integer"
        },
        "status": {
          "type": "string"
        }
      },
      "required": ["endpoint_url", "ttl", "status"]
    }
  }
}

We define our service method as a GET method that will take the service name and service version as part of the path. We have also defined a response model (CatalogServiceModel) that specifies our return properties as the endpoint URL, the TTL, and the status.

The x-amazon-apigateway-integration element specifies how Amazon API Gateway will be integrated with AWS Lambda. The marker $catalog_serviceARN$ will be replaced with the AWS Lambda function ARN when this service is deployed.
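The requestTemplates mapping turns the path parameters into the event document that our Lambda handler receives. For example, a call to GET /catalog/testservice1/1.0 produces this event:

{
  "service_name": "testservice1",
  "service_version": "1.0"
}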

We can now use all of the above to deploy our service to Lambda and API Gateway:


ACCOUNT_NUMBER = "<your AWS account number>"

create_deployment_package("/tmp/catalog_service.zip", ["catalog_service.py"])
function_arn = create_lambda_function(
                       "/tmp/catalog_service.zip",
                       "catalog_service",
                       "arn:aws:iam::"+ACCOUNT_NUMBER+":role/lambda_s3",
                       "catalog_service.lambda_handler",
                       "Looking up service information.",
                       ACCOUNT_NUMBER)
replace_instances_in_file("swagger.json",
                          "/tmp/swagger_with_arn.json",
                          "$catalog_serviceARN$", function_arn)
api_id = create_api("/tmp/swagger_with_arn.json")
deploy_api(api_id, "/tmp/swagger_with_arn.json", "dev")

We can use this to test our new deployment:


import requests
import json
import logging

logging.basicConfig(level=logging.INFO)

request_url = ("https://yourrestapiid.execute-api.us-east-1.amazonaws.com/"
               "dev/catalog/testservice1/1.0")
response = requests.get(request_url)
json_response = json.loads(response.content)
logging.info("Endpoint URL: %s" % (json_response['endpoint_url'],))
logging.info("TTL: %i" % (json_response['ttl'],))
logging.info("Status: %s" % (json_response['status'],))

That should give us the following results:


INFO:root:Endpoint URL: notarealurl
INFO:root:TTL: 300
INFO:root:Status: healthy