AWS Database Blog

Indexing Metadata in Amazon Elasticsearch Service Using AWS Lambda and Python

by Amit Sharma | in Elasticsearch, Lambda

Amit Sharma (@amitksh44) is a solutions architect at Amazon Web Services.

You can use Amazon S3 to implement a data lake architecture as the single source of truth for all your data. Taking this approach not only allows you to reliably store massive amounts of data but also enables you to ingest the data at a very high speed and do further analytics on it. Ease of analytics is important because as the number of objects you store increases, it becomes difficult to find a particular object—one needle in a haystack of billions.

Objects in S3 contain metadata that identifies those objects along with their properties. When the number of objects is large, this metadata can be the magnet that allows you to find what you’re looking for. Although you can’t search this metadata directly, you can employ Amazon Elasticsearch Service to store and search all of your S3 metadata. This blog post gives step-by-step instructions about how to store the metadata in Amazon Elasticsearch Service (Amazon ES) using Python and AWS Lambda.

Services

Using S3 event notifications and Lambda triggers
In this post, we use S3 event notifications and Lambda triggers to maintain metadata for S3 objects in Amazon ES. S3 event notifications enable you to receive notifications when certain events happen in your bucket. These events cover actions on objects in the bucket, such as PUT, COPY, POST, DELETE, and so on. More details about S3 event notifications are available in the AWS documentation.

S3 event notifications integrate with Lambda using triggers. Using this integration, you can write Lambda functions that process Amazon S3 events. To do this, in Amazon S3 you add a bucket notification configuration that identifies the type of event that you want Amazon S3 to publish and the Lambda function that you want to invoke.
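As a rough illustration of what such a function receives, the sketch below pulls the bucket name and object key out of an S3 event. The sample_event dictionary is a trimmed, made-up example that keeps only the fields used later in this post (real events carry many more), and the helper name extract_bucket_and_key is hypothetical.

```python
try:
    from urllib.parse import unquote_plus  # Python 3
except ImportError:
    from urllib import unquote_plus  # Python 2.7


def extract_bucket_and_key(event):
    """Return (bucket, key) for the first record of an S3 event."""
    record = event['Records'][0]['s3']
    bucket = record['bucket']['name']
    # Object keys arrive URL-encoded in the event (a space becomes '+').
    key = unquote_plus(record['object']['key'])
    return bucket, key


# Trimmed, made-up event for illustration only.
sample_event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'my-example-bucket'},
            'object': {'key': 'appTier/appServer1/access+log.log'},
        }
    }]
}

print(extract_bucket_and_key(sample_event))
```

Decoding the key matters because an object named "access log.log" would otherwise be looked up under the wrong name.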

AWSAccount

High-level flow between S3 and Lambda

Putting it together
To put all these parts together, you can take the following steps.

Configuring AWS Lambda with Amazon S3
To configure Lambda with S3, start by choosing AWS Lambda on the console.

Lambda1

If this is the first time you’ve created a Lambda function, choose Get Started Now.

GetStartedLambda

Choose Configure triggers.

ConfigureTriggers

On the next page, you should be able to select the triggers you want to work with.

ConfigureTriggersNext

Choose the S3 bucket and the type of event that you want to capture. You can leave the Prefix and Suffix fields blank or, based on your use case, fill them in.

For example, if you expect all files to come in a folder called /appTier/appServer1, you can use that path as the Prefix value. Similarly, if you expect the files to arrive with a certain suffix like .log, .jpg, .avi, and so on, you can use that in the Suffix field. Events are triggered for an object only if both the Prefix and Suffix fields are matched. Also, select the Enable Trigger check box.
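The matching rule can be pictured with the small sketch below. It only illustrates the behavior described above and is not how S3 implements filtering. The function name matches_filter and the sample keys are made up, and the example assumes keys are stored without a leading slash.

```python
def matches_filter(key, prefix='', suffix=''):
    """Return True when an object key satisfies both filter fields.

    An empty prefix or suffix matches every key, which mirrors
    leaving the field blank in the console.
    """
    return key.startswith(prefix) and key.endswith(suffix)


# Made-up object keys for illustration.
keys = [
    'appTier/appServer1/2016-10-27.log',
    'appTier/appServer1/photo.jpg',
    'webTier/webServer1/2016-10-27.log',
]
for key in keys:
    print(key, matches_filter(key, prefix='appTier/appServer1', suffix='.log'))
```

Only the first key satisfies both conditions, so only that object would trigger the function.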

EnableTrigger

Next, provide a name and description and choose Python 2.7 as the run-time environment. Because we are going to upload the code separately, choose Upload a .ZIP file for Code entry. Leave the handler information as the default: lambda_function.lambda_handler.

ConfigureFunction1

Now, let’s create the AWS Identity and Access Management (IAM) roles and related permissions so that our Lambda function can access the AWS resources we need. To do this, choose Create a new role from template(s), give a name to this new role, and for Policy templates, choose S3 object read-only-permission.

ChooseTemplate

ChooseTemplate1

In Advanced settings, leave the Memory, Timeout, and VPC settings as the default. Choosing Next creates the Lambda function and also associates the right permissions so that S3 can invoke this Lambda function. You can verify this in the S3 console. To do this, go to the properties of the S3 bucket you specified earlier and open the Events section, as shown following:

Events

Choose the modify icon to see the details and verify the name of the Lambda function.

Creating the Amazon ES domain
Now, let’s create the Amazon ES domain. Go to Services, and choose Elasticsearch Service in Analytics:

ES

Choose the Get Started button on the front page and type a name for your domain (I chose my-es-cluster):

ESDomain

As shown following, choose an instance type and an instance count (both can be changed later if necessary). We recommend choosing m3.medium or larger if you are planning to put this feature into production. Alternatively, t2.micro is a good choice if you are creating a development environment or a small proof of concept.

For storage, you have choices between instance-based storage and various types of Amazon EBS volumes (General Purpose, Provisioned IOPS and Magnetic). Start with a General Purpose EBS volume and monitor the overall performance with the FreeStorageSpace, JVMMemoryPressure, and CPUUtilization metrics and metrics about query response times before changing the storage type. For a good reference to handling errors and mitigations, see the AWS documentation.

An important question is: How much storage do you need? For example, if every object uploaded to S3 has metadata sized 1 KB and you expect 10 million objects, you should provision a total of at least 20 GB: 10 GB for the primary instance and an additional 10 GB for the replica. For a more detailed discussion on scaling and capacity planning for Elasticsearch, see the Elasticsearch documentation.
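The arithmetic behind that estimate can be sketched as follows. This is a back-of-the-envelope calculation only: it assumes 1 KB = 1,000 bytes and one replica, and it ignores any indexing overhead that Elasticsearch adds. The function name estimate_storage_gb is hypothetical.

```python
def estimate_storage_gb(object_count, metadata_bytes_per_object, replica_count=1):
    """Estimate raw index storage in GB (1 GB = 10**9 bytes)."""
    primary_bytes = object_count * metadata_bytes_per_object
    # Each replica holds a full copy of the primary data.
    total_bytes = primary_bytes * (1 + replica_count)
    return total_bytes / 1e9


# 10 million objects with 1 KB of metadata each, plus one replica.
print(estimate_storage_gb(10 * 1000 * 1000, 1000))  # 20.0
```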

ConfigureCluster

Next, set the access policy. I chose to make mine wide open in order to simplify testing, but don’t do this for your cluster. I could also have used one of the IP-based or user-based templates in the wizard to create a more restrictive policy. For more details on controlling access to your cluster, see this blog post.
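For comparison, an IP-based policy might look roughly like the document built below. This is a sketch, not the exact output of the wizard template: the ARN contains placeholders, the address range 192.0.2.0/24 is a documentation-only range you would replace with your own, and the helper name ip_restricted_policy is hypothetical.

```python
import json


def ip_restricted_policy(domain_arn, allowed_cidrs):
    """Build an access policy that allows requests only from given IP ranges."""
    return {
        'Version': '2012-10-17',
        'Statement': [{
            'Effect': 'Allow',
            'Principal': {'AWS': '*'},
            'Action': 'es:*',
            'Resource': domain_arn + '/*',
            # Requests from any other source address are not allowed.
            'Condition': {'IpAddress': {'aws:SourceIp': list(allowed_cidrs)}},
        }],
    }


policy = ip_restricted_policy(
    'arn:aws:es:REGION:ACCOUNT_ID:domain/my-es-cluster',  # placeholder ARN
    ['192.0.2.0/24'],  # placeholder range; use your own addresses
)
print(json.dumps(policy, indent=2))
```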

SetupAccess

Finally, review the settings and choose Confirm and create. That’s it! The cluster will be created in a few minutes.

ESCluster

Creating the Lambda function
Now comes the main code that will actually push the metadata coming from every trigger generated by object creation events. Remember that Lambda has been configured with an execution role that has read-only permissions to read from S3. At a high level, the Python code does the following:

  • Reads the metadata from S3 event
  • Connects to the Amazon ES domain endpoint
  • Creates an index if one has not already been created
  • Writes the metadata into Amazon ES

To connect to Amazon ES, the Python code uses a few specific libraries such as Elasticsearch, RequestsHttpConnection, and urllib. Because we upload the code to the Lambda function as a .ZIP file, these libraries must be packaged with it. You can download them into your project folder by using the following commands. But first, make sure pip is installed; you can find steps to do this on the pip website. Note that the sample code available for download includes all the required libraries, so this step is optional and given here mainly for your understanding:

pip install requests -t /path/to/project-dir
pip install elasticsearch -t /path/to/project-dir
pip install urllib3 -t /path/to/project-dir

Make sure these libraries are now available in the project directory. Now we are ready to look at the code.

The following function connects to Amazon ES:

from elasticsearch import Elasticsearch, RequestsHttpConnection

def connectES(esEndPoint):
    # Build a client that talks to the domain endpoint over HTTPS (port 443).
    print('Connecting to the ES Endpoint {0}'.format(esEndPoint))
    try:
        esClient = Elasticsearch(
            hosts=[{'host': esEndPoint, 'port': 443}],
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection)
        return esClient
    except Exception as E:
        print("Unable to connect to {0}".format(esEndPoint))
        print(E)
        exit(3)

This function takes the domain endpoint as an argument and returns the Elasticsearch client instance. Be sure to use your domain’s endpoint to declare esClient:

esClient = connectES("search-domainname-yourDomainEndpoint.REGION.es.amazonaws.com")

The following function creates an Amazon ES index:

def createIndex(esClient):
    # Create the 'metadata-store' index only if it does not exist yet.
    try:
        res = esClient.indices.exists('metadata-store')
        print("Index Exists ... {}".format(res))
        if res is False:
            esClient.indices.create('metadata-store', body=indexDoc)
            return 1
    except Exception as E:
        print("Unable to Create Index {0}".format("metadata-store"))
        print(E)
        exit(4)

Note that this function takes esClient, the instance of the Elasticsearch client returned by the connectES function. Also note that ‘metadata-store’ is the name of the index we are trying to create and ‘indexDoc’ is its mapping. The ‘indexDoc’ mapping is defined as follows:

indexDoc = {
    # The create-index API expects type mappings under the "mappings" key.
    "mappings": {
        "dataRecord": {
            "properties": {
                "createdDate": {
                    "type": "date",
                    "format": "dateOptionalTime"
                },
                # A string field takes no date format.
                "objectKey": {
                    "type": "string"
                },
                "content_type": {
                    "type": "string"
                },
                "content_length": {
                    "type": "long"
                },
                "metadata": {
                    "type": "string"
                }
            }
        }
    },
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    }
}

We are storing five fields:

  • createdDate
  • objectKey
  • content_type
  • content_length
  • metadata

As part of this, there are a couple of important points to consider.

First, it’s important to plan your shards. The best number of primary and replica shards depends upon multiple things such as instance sizes, amount of data, frequency of new data being generated and old data being purged, query types, and so on. To give an example, for time-series data (for example, log files) you can maintain a different index per hour, per day, or per week, depending upon the speed of data being generated. We recommend daily indexes in most cases. Because older logs are less likely to be queried, you can re-index those to lower primary shard numbers or else drop the entire index. A more detailed discussion is provided in the Elasticsearch documentation.
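One way to picture daily indexes is to derive the index name from the date of each record, as in the sketch below. The naming scheme (base name plus a year.month.day suffix) is an assumption borrowed from a common logging convention, not something this post's code uses, and the function name daily_index_name is hypothetical.

```python
from datetime import datetime


def daily_index_name(base_name, when):
    """Return an index name with a per-day suffix."""
    return '{0}-{1}'.format(base_name, when.strftime('%Y.%m.%d'))


print(daily_index_name('metadata-store', datetime(2016, 10, 27, 13, 15, 54)))
# metadata-store-2016.10.27
```

With names like this, dropping old data becomes a matter of deleting whole indexes rather than individual documents.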

Also, consider using bulk indexing. The code sample in this post works fine for many use cases with low to moderate traffic, for example, up to 100 PUTs per second on S3 with 1 KB of metadata. However, for higher traffic volumes we recommend using larger instances and, instead of indexing every document individually, using the _bulk API call to load the data into the Elasticsearch cluster efficiently. In a follow-up blog, we will give architectural patterns and recommendations on how to do _bulk indexing efficiently and cost-effectively.
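To give a feel for what a _bulk request carries, the sketch below serializes several documents into the newline-delimited format the API expects: an action line followed by the document itself, repeated for each document. It is a minimal illustration rather than a production pattern. The function name build_bulk_body and the sample documents are made up, and batching, retries, and error handling are left out.

```python
import json


def build_bulk_body(documents, index='metadata-store', doc_type='images'):
    """Serialize documents into the newline-delimited _bulk request format."""
    lines = []
    for doc in documents:
        # Action line: tells Elasticsearch to index the document that follows.
        lines.append(json.dumps({'index': {'_index': index, '_type': doc_type}}))
        # Source line: the document itself.
        lines.append(json.dumps(doc))
    # The _bulk API requires the body to end with a newline.
    return '\n'.join(lines) + '\n'


# Made-up documents for illustration.
docs = [
    {'objectKey': 'a.jpg', 'content_length': 100},
    {'objectKey': 'b.jpg', 'content_length': 200},
]
body = build_bulk_body(docs)
print(body)
```

A body built this way could then be passed to the client's bulk method, for example esClient.bulk(body=body), so that many documents travel in one request.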

For a detailed explanation about shard settings as part of the cluster planning, refer to the Elasticsearch documentation.

Following is the function that actually writes metadata into Elasticsearch:

import json

def indexDocElement(esClient, key, response):
    # Index one document built from the S3 get_object response.
    try:
        indexObjectKey = key
        indexcreatedDate = response['LastModified']
        indexcontent_length = response['ContentLength']
        indexcontent_type = response['ContentType']
        # User-defined metadata is stored as a JSON string.
        indexmetadata = json.dumps(response['Metadata'])
        retval = esClient.index(index='metadata-store', doc_type='images', body={
            'createdDate': indexcreatedDate,
            'objectKey': indexObjectKey,
            'content_type': indexcontent_type,
            'content_length': indexcontent_length,
            'metadata': indexmetadata
        })
    except Exception as E:
        print("Doc not indexed")
        print("Error: ", E)
        exit(5)

This function takes esClient, an S3 object key, and the complete response of the S3.get_object function. This response contains the actual metadata. The elements in response are indexed by calling esClient.index. The document ID is autogenerated by Elasticsearch. You can see all the index options in the Elasticsearch documentation.

Finally, following is the main Lambda handler code, which calls all these functions each time the function is triggered:

import json
import urllib  # urllib.unquote_plus is the Python 2.7 API
import boto3

s3 = boto3.client('s3')

def lambda_handler(event, context):
    esClient = connectES("search-domainname-yourDomainEndpoint.REGION.es.amazonaws.com")
    createIndex(esClient)

    # Get the bucket name and object key from the event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        print(response)
        print("KEY: " + key)
        print("CONTENT TYPE: " + response['ContentType'])
        print("Metadata : " + json.dumps(response['Metadata']))
        # Custom headers are optional, so read them without raising KeyError
        headers = response['ResponseMetadata']['HTTPHeaders']
        print("Custom 1: {0}".format(headers.get('x-amz-meta-custom1')))
        print("Custom 2: {0}".format(headers.get('x-amz-meta-custom2')))
        indexDocElement(esClient, key, response)
        return response['ContentType']
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e

You can also download the entire handler code from here for the index creation.

To verify that the metadata has been entered into Elasticsearch, you can use Kibana and search using the standard Elasticsearch API calls and queries. For example, you can query by object name as shown following:

bash# curl -XGET https://search-domainname-yourDomainEndpoint.REGION.es.amazonaws.com/metadata-store/images/_search?pretty\&q=objectKey:YOURFILENAME

{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 9.516893,
    "hits" : [ {
      "_index" : "metadata-store",
      "_type" : "images",
      "_id" : "AVgGSFxdQ43eQcLduwj9",
      "_score" : 9.516893,
      "_source" : {
        "content_length" : 61194,
        "objectKey" : "YOURFILENAME",
        "metadata" : "{\"custom1\": \"banana\", \"custom2\": \"shake\"}",
        "content_type" : "application/octet-stream",
        "createdDate" : "2016-10-27T13:15:54+00:00"
      }
    } ]
  }
}
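If you read such a response in Python rather than with curl, keep in mind that the metadata field was indexed as a JSON string and needs to be decoded again. The sketch below shows one way to do that. The sample_response dictionary is a trimmed copy of the output above, and the function name summarize_hits is hypothetical.

```python
import json


def summarize_hits(search_response):
    """Return (objectKey, metadata dict) pairs from a search response."""
    results = []
    for hit in search_response['hits']['hits']:
        source = hit['_source']
        # The metadata field was indexed as a JSON string, so decode it.
        results.append((source['objectKey'], json.loads(source['metadata'])))
    return results


# Trimmed copy of the search output shown above.
sample_response = {
    'hits': {
        'total': 1,
        'hits': [{
            '_id': 'AVgGSFxdQ43eQcLduwj9',
            '_source': {
                'objectKey': 'YOURFILENAME',
                'metadata': '{"custom1": "banana", "custom2": "shake"}',
            },
        }],
    }
}
print(summarize_hits(sample_response))
```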

Following is a screenshot of Kibana after indexing a few documents:

Kibana

Deleting metadata when an S3 object is deleted
To delete the related metadata when you delete an S3 object, follow the same steps as before, except that for the event type you choose Object Removed Event, as shown following:

ObjectREmoved

The rest of the steps remain the same. Create an additional trigger for object removal for a total of two triggers and two Lambda functions for two different types of events—object PUT, COPY, or POST and object DELETE.

Following is the main handler code:

def lambda_handler(event, context):
    esClient = connectES("search-domainname-yourDomainEndpoint.REGION.es.amazonaws.com")

    # Get the bucket name and object key from the event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    try:
        clearMetaData(esClient, key)
        return 'Removed metadata for ' + key
    except Exception as e:
        print(e)
        print('Error removing object metadata from Elasticsearch Domain.')
        raise e

The clearMetaData function is defined as follows:

def clearMetaData(esClient, key):
    try:
        retval = esClient.search(index='metadata-store', doc_type='images', q='objectKey:' + key, fielddata_fields='_id')
        # Loop over the hits actually returned; 'total' can exceed the page size
        for hit in retval['hits']['hits']:
            docId = hit['_id']
            print("Deleting: " + docId)
            removeDocElement(esClient, docId)
        return 1
    except Exception as E:
        print("Removing metadata failed")
        print("Error: ", E)
        exit(5)

This function searches the domain for the given S3 object name and, for each match, calls another function, removeDocElement, passing the document ID, which is unique in the domain. The removeDocElement function is defined as follows:

def removeDocElement(esClient,docId):
  try:
    retval = esClient.delete(index='metadata-store', doc_type='images', id=docId)
    print("Deleted: " + docId)
    return 1
  except Exception as E:
    print("DocId delete command failed at Elasticsearch.")
    print("Error: ",E)
    exit(5)

This code deletes all the references to that S3 key by using the unique document ID. You can confirm the deletion from the Elasticsearch index by using the following command:

bash# curl -XGET https://search-domainname-yourDomainEndpoint.REGION.es.amazonaws.com/metadata-store/images/_search?pretty\&q=objectKey:train.csv
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "hits" : {
    "total" : 0,
    "max_score" : null,
    "hits" : [ ]
  }
}

Following is the Amazon CloudWatch monitoring snapshot for the Elasticsearch cluster. You can see a number of metrics such as those dealing with searchable documents, free storage space, cluster health, and so on. These metrics can help you decide how to scale the cluster from both a compute and a storage perspective. For example, by monitoring FreeStorageSpace or CPUUtilization you can decide whether to scale out or scale up the Elasticsearch cluster nodes.

Monitoring

You can also download the entire handler code from here for the index deletion.

Thanks for exploring these technologies with me. For deeper information, take a look at Amazon Elasticsearch Service and AWS Lambda. Let me know in the comments below how this post works for you!