AWS Database Blog

Implementing Alerting on Amazon Elasticsearch Data

Yash Pant is a solutions architect at Amazon Web Services.

Amazon Elasticsearch Service (Amazon ES) customers often ask how to implement alerting on information that is being indexed in their Amazon ES domain. In this blog post, I walk through the steps required to set up an alerting mechanism that sends near real-time alerts based on criteria you define.

The Approach

To set up alerting, we will use the scheduled event trigger in AWS Lambda, which allows a Lambda function to execute at a fixed rate (for example, every minute). Our Lambda function will contain Elasticsearch queries that we want to run against documents that are being indexed on our Amazon ES domain. If the results of our queries indicate that the information we are tracking has met some predefined criteria, then we will publish this information to an Amazon Simple Notification Service (SNS) topic. Subscribers to the SNS topic can then consume this information and take appropriate action (for example, sending out an email with details about this alert).
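Stripped of the AWS specifics, each scheduled run does three things: run a query, compare the result with a threshold, and publish an alert if the threshold is exceeded. The following is a minimal sketch of that control flow, not the code used later in this post. `runAlertCheck`, `runQuery`, and `publish` are hypothetical names, and the two functions are passed in so the logic can be exercised without an AWS account.

```javascript
/* Hypothetical helper: one alert check, independent of AWS.
 * runQuery(callback)          -> callback(err, hitCount)
 * publish(message, callback)  -> callback(err)
 * done(err, alerted)          -> called exactly once */
function runAlertCheck(runQuery, publish, threshold, done) {
    runQuery(function (err, hitCount) {
        if (err) {
            return done(err);
        }
        // Alert only when the count is strictly greater than the threshold
        if (hitCount <= threshold) {
            return done(null, false);
        }
        publish(hitCount + ' matching documents (threshold ' + threshold + ')', function (pubErr) {
            if (pubErr) {
                return done(pubErr);
            }
            done(null, true);
        });
    });
}

// Usage with stubbed dependencies
runAlertCheck(
    function (cb) { cb(null, 150); },
    function (msg, cb) { console.log('ALERT: ' + msg); cb(null); },
    100,
    function (err, alerted) { console.log('alerted:', alerted); }
);
```

In the Lambda function, `runQuery` would wrap the signed Elasticsearch request and `publish` would wrap the SNS call; keeping them injectable makes the threshold logic easy to test.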

Demo

In this example, I’ve set up an Amazon ES domain that captures and indexes information from VPC Flow Logs in real time. We will monitor an IP address in our VPC for a DoS attack. If the VPC Flow Logs reveal that there were more than 100 instances of rejected network traffic to the IP address in the last minute, this information will be published to an SNS topic to alert SNS consumers.

To set up an SNS topic:

  1. In the AWS Management Console, open the SNS console.
  2. Choose Create a topic.
  3. Give your topic a name and, if you want to send text message alerts, a display name. The display name will appear on all text messages you send through SNS. Your SNS topic has been created! Save the topic ARN because you will need it later.
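If you prefer to script this step, the same topic can be created with the AWS SDK for JavaScript (v2), which the Lambda function also uses. This is a minimal sketch: `createAlertTopic` is a hypothetical helper that takes an already constructed SNS client (for example, `new AWS.SNS({ region: 'us-west-2' })`), creates the topic with `createTopic`, and sets its display name with `setTopicAttributes`.

```javascript
/* Hypothetical helper: create an SNS topic and set its display name.
 * `sns` is an AWS SDK v2 SNS client, e.g. new AWS.SNS({ region: 'us-west-2' }).
 * callback(err, topicArn) receives the ARN to use for SNS_TOPIC_ARN. */
function createAlertTopic(sns, name, displayName, callback) {
    // If you already own a topic with this name, its existing ARN is returned
    sns.createTopic({ Name: name }, function (err, data) {
        if (err) {
            return callback(err);
        }
        var topicArn = data.TopicArn;
        // The display name appears on text messages sent through the topic
        sns.setTopicAttributes({
            TopicArn: topicArn,
            AttributeName: 'DisplayName',
            AttributeValue: displayName
        }, function (attrErr) {
            if (attrErr) {
                return callback(attrErr);
            }
            callback(null, topicArn);
        });
    });
}
```

Passing the client in, rather than constructing it inside the helper, keeps the region choice with the caller and lets the function be tested with a stub.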

Now that we have the Amazon ES domain and SNS topic, let’s create the Lambda function that will handle the alerting:

  1. In the AWS Lambda console, choose Create a Lambda function.
  2. On the Select blueprint page, choose Blank Function. We’re going to start from scratch.
  3. On the Configure triggers page, choose CloudWatch Events – Schedule as your trigger. In Rule name, type EveryMinuteLambda. From the Schedule expression drop-down list, choose rate(1 minute). This will trigger the execution of the Lambda function every minute. Select the Enable trigger check box so that Amazon CloudWatch Events has permission to invoke your Lambda function.
  4. On the Configure function page, give your function a name (for example, VPCFlowLogsAlerts). For Runtime, choose Node.js 4.3.
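The trigger configured in the console can also be scripted. This sketch uses the AWS SDK for JavaScript (v2) operations `putRule` and `putTargets` (CloudWatch Events) and `addPermission` (Lambda). `scheduleEveryMinute` and the statement ID it generates are hypothetical; the clients are passed in, for example `new AWS.CloudWatchEvents()` and `new AWS.Lambda()`.

```javascript
/* Hypothetical helper: invoke a Lambda function every minute via CloudWatch Events.
 * events: AWS SDK v2 CloudWatchEvents client; lambda: AWS SDK v2 Lambda client.
 * functionArn: ARN of the alerting function (e.g. VPCFlowLogsAlerts). */
function scheduleEveryMinute(events, lambda, ruleName, functionName, functionArn, callback) {
    // 1. Create (or update) a rule that fires every minute
    events.putRule({
        Name: ruleName,
        ScheduleExpression: 'rate(1 minute)',
        State: 'ENABLED'
    }, function (err, rule) {
        if (err) { return callback(err); }
        // 2. Allow CloudWatch Events to invoke the function
        //    (the console's Enable trigger check box takes care of this)
        lambda.addPermission({
            FunctionName: functionName,
            StatementId: ruleName + '-invoke',
            Action: 'lambda:InvokeFunction',
            Principal: 'events.amazonaws.com',
            SourceArn: rule.RuleArn
        }, function (permErr) {
            if (permErr) { return callback(permErr); }
            // 3. Point the rule at the function
            events.putTargets({
                Rule: ruleName,
                Targets: [{ Id: functionName, Arn: functionArn }]
            }, function (targetErr, data) {
                if (targetErr) { return callback(targetErr); }
                // putTargets can succeed overall while individual targets fail
                if (data.FailedEntryCount > 0) {
                    return callback(new Error('Failed to add target: ' + JSON.stringify(data.FailedEntries)));
                }
                callback(null, rule.RuleArn);
            });
        });
    });
}
```

`addPermission` fails if a statement with the same ID already exists on the function, so running this sketch twice against the same function would need extra handling.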

Now let’s get to the code. We’ll start with the import statements and then define some constants:

/* == Imports == */
var AWS = require('aws-sdk');

/* == Globals == */
//Declared with var so they are not implicit globals
var ES_REGION = 'us-west-2';
var ES_ENDPOINT = '[YOUR ENDPOINT HERE].us-west-2.es.amazonaws.com';
var SNS_TOPIC_ARN = 'arn:aws:sns:us-west-2:[YOUR ACCOUNT ID]:ElasticsearchAlerts';
var THRESHOLD = 100;
var IP_ADDRESS_TO_MONITOR = '172.31.19.62';

You’ll need the Amazon ES domain endpoint (the host name, without the https:// prefix), the region, and the SNS topic ARN you saved earlier. IP_ADDRESS_TO_MONITOR is the IP address we are monitoring for DoS attempts. THRESHOLD is the number of rejected-traffic records in the last minute that must be exceeded before an alert is triggered (here, 100). You can customize this code to meet your own alerting criteria.
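Hard-coding these values is fine for a demo. As one alternative, the values could be stored as Lambda environment variables. This sketch reads them from an environment object (in Lambda, `process.env`), falls back to this post's demo values for the region, threshold, and IP address, and fails fast on missing or invalid settings. `loadConfig` and the environment variable names are assumptions that simply mirror the constants above.

```javascript
/* Hypothetical alternative: read the alert settings from environment variables,
 * falling back to the demo values used in this post where a default makes sense. */
function loadConfig(env) {
    // The endpoint and topic ARN have no sensible defaults
    ['ES_ENDPOINT', 'SNS_TOPIC_ARN'].forEach(function (name) {
        if (!env[name]) {
            throw new Error(name + ' must be set');
        }
    });
    var threshold = Number(env.THRESHOLD || 100);
    if (!Number.isInteger(threshold) || threshold < 0) {
        throw new Error('THRESHOLD must be a non-negative integer, got ' + env.THRESHOLD);
    }
    return {
        esRegion: env.ES_REGION || 'us-west-2',
        esEndpoint: env.ES_ENDPOINT,
        snsTopicArn: env.SNS_TOPIC_ARN,
        threshold: threshold,
        ipAddressToMonitor: env.IP_ADDRESS_TO_MONITOR || '172.31.19.62'
    };
}

// In the Lambda function: var config = loadConfig(process.env);
```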

The next step is to define the query we are going to make against Elasticsearch. The query will request documents that match three conditions:

  • The destination IP address in the document (the dstaddr field) must be the IP address defined in the IP_ADDRESS_TO_MONITOR variable.
  • The document must say the traffic was rejected. VPC Flow Logs mark all network traffic as either “ACCEPT” or “REJECT.” We want to filter for rejected traffic only.
  • The document must be from the previous complete minute, because the Lambda function runs every minute. We use the @timestamp field of each VPC Flow Logs record to restrict the query to that window.

Here’s what this query looks like:

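/* == ES query to make == */
//Each clause in the bool "must" array is a query in its own right,
//so no extra "query" wrapper is needed around match or range
var query = {
    "query": {
        "bool": {
            "must": [
                //Traffic destined for the monitored address
                { "match": { "dstaddr": IP_ADDRESS_TO_MONITOR } },
                //Traffic that was rejected (VPC Flow Logs record ACCEPT or REJECT)
                { "match": { "action": "reject" } },
                //Records from the previous complete minute
                {
                    "range": {
                        "@timestamp": {
                            "gte": "now-1m/m",
                            "lt": "now/m"
                        }
                    }
                }
            ]
        }
    }
};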
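If you want the monitored address or the time window to be configurable, a query with the same three conditions can be generated by a function. In this sketch, `buildRejectedTrafficQuery` is a hypothetical helper. It also sets `size: 0`, which asks Elasticsearch for the hit count without returning the matching documents; the alert only needs the count.

```javascript
/* Hypothetical helper: build the rejected-traffic query for any address and window.
 * windowMinutes = 1 covers the previous complete minute. */
function buildRejectedTrafficQuery(ipAddress, windowMinutes) {
    return {
        size: 0, // only hits.total is needed, so return no documents
        query: {
            bool: {
                must: [
                    { match: { dstaddr: ipAddress } },
                    { match: { action: 'reject' } },
                    {
                        range: {
                            '@timestamp': {
                                // Window start, rounded down to the minute
                                gte: 'now-' + windowMinutes + 'm/m',
                                // Start of the current minute (exclusive)
                                lt: 'now/m'
                            }
                        }
                    }
                ]
            }
        }
    };
}

// Usage: var body = JSON.stringify(buildRejectedTrafficQuery(IP_ADDRESS_TO_MONITOR, 5));
```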

Now that some constants have been defined, let’s talk about the execution of the Lambda function.

The Amazon ES domain is secured with a user-based IAM policy, so every request to it must be signed with AWS Signature Version 4. The AWS SDK for Node.js can read the Lambda function's credentials from the environment (AWS.EnvironmentCredentials) and use them to sign an HTTP request (AWS.Signers.V4), which adds the signature to the request headers. We'll use it to send an HTTP POST request containing our query to the Amazon ES domain.

The HTTP POST request includes the Amazon ES domain endpoint, the query path, and the query itself. The path for querying Elasticsearch indices has the following syntax:

/<index-name>/<doctype>/_search

In this case, the index name is based on the date (for example, “cwl-2016.10.10”). Because we want to query the most recent minute of data, we build the index name from the current (UTC) date, padding the month and day to two digits. The rest of the path is constant because the doctype is “VPCFlowLogs.” We convert the query to a JSON string and set it as the body of the request. When the request is ready, we use the AWS SDK for Node.js to sign it and send it to Elasticsearch.
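Index naming is easy to get subtly wrong. The following sketch assumes, as in this setup, daily indices named `cwl-YYYY.MM.DD` after the UTC date of each record's timestamp. It zero-pads the month and day, and it handles one edge case: the query looks at the previous complete minute, so during the first minute after midnight UTC those records sit in the previous day's index. In that case the sketch searches both indices, using the comma-separated index list that Elasticsearch accepts in the path. `pad2`, `indexNameFor`, and `searchPathFor` are hypothetical helpers.

```javascript
/* Hypothetical helpers: build index names and the _search path from a Date, in UTC. */
function pad2(n) {
    return (n < 10 ? '0' : '') + n;
}

function indexNameFor(date, prefix) {
    return (prefix || 'cwl-') + date.getUTCFullYear() + '.' +
        pad2(date.getUTCMonth() + 1) + '.' + pad2(date.getUTCDate());
}

// The query window is the previous complete minute. Shortly after midnight UTC
// that minute belongs to yesterday's index, so include it when the days differ.
function searchPathFor(now, docType) {
    var windowStart = new Date(now.getTime() - 60 * 1000);
    var names = [indexNameFor(windowStart), indexNameFor(now)];
    if (names[0] === names[1]) {
        names = [names[1]];
    }
    return '/' + names.join(',') + '/' + docType + '/_search';
}

// Usage: req.path = searchPathFor(new Date(), 'VPCFlowLogs');
```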

After we get a response from Elasticsearch, the Lambda function parses it to see how many documents matched (the number of hits). If the number of hits exceeds our threshold (for this demo, 100), we publish this information to our SNS topic.

There are two functions for doing this. The first is the handler function, which begins execution when the Lambda function is triggered and makes the request. If the query reveals more than THRESHOLD (100) instances of rejected traffic in the last minute, the handler calls the second function, which publishes this information to our SNS topic.

/* Lambda "main": Execution begins here */
exports.handler = function(event, context) {

    /* Get AWS credentials from the Lambda environment to sign the request */
    var creds = new AWS.EnvironmentCredentials('AWS');

    //Create the HTTP request to the Amazon ES domain
    var endpoint = new AWS.Endpoint(ES_ENDPOINT);
    var req = new AWS.HttpRequest(endpoint);

    /* Set the HTTP request parameters */
    req.method = 'POST';

    //Build the path using today's date (UTC) for the index name, e.g. cwl-2016.10.05
    //Month and day are zero-padded to match the index name
    var today = new Date();
    var month = ('0' + (today.getUTCMonth() + 1)).slice(-2);
    var day = ('0' + today.getUTCDate()).slice(-2);
    req.path = '/cwl-' + today.getUTCFullYear() + '.' + month + '.' + day + '/VPCFlowLogs/_search';

    req.region = ES_REGION;
    req.headers['presigned-expires'] = false;
    req.headers['Host'] = endpoint.host;
    req.headers['content-type'] = 'application/json';

    //Stringify our ES query and set it as the body of the POST request
    req.body = JSON.stringify(query);

    //Sign the request using the AWS Node.js SDK
    var signer = new AWS.Signers.V4(req, 'es');
    signer.addAuthorization(creds, new Date());

    //Send the ES query
    var send = new AWS.NodeHttpClient();
    send.handleRequest(req, null, function(httpResp) {
        var respBody = '';

        //Build the response
        httpResp.on('data', function (chunk) {
            respBody += chunk;
        });

        //See how many hits we got at the end of the response
        httpResp.on('end', function () {
            var resp;
            try {
                resp = JSON.parse(respBody);
            } catch (e) {
                return context.fail('Could not parse Elasticsearch response: ' + respBody);
            }
            //Error responses (for example, a missing index) have no hits object
            if (!resp.hits) {
                return context.fail('Unexpected Elasticsearch response: ' + respBody);
            }
            //If the number of hits on our query is greater than our set threshold,
            //we publish this info to an SNS topic
            if (resp.hits.total > THRESHOLD) {
                publishToSNS(resp.hits.total, context);
            } else {
                //Below the threshold: finish the invocation without alerting
                context.succeed('No alert: ' + resp.hits.total + ' hits');
            }
        });
    },

    function(err) {
        context.fail('Lambda failed with error ' + err);
    });
};

/* This function publishes alerts to an SNS topic */
function publishToSNS(hits, context) {
    //Set up the SNS client using the AWS Node.js SDK
    var sns = new AWS.SNS();

    //Publish the message
    sns.publish({
        Message: hits + ' instances of rejected traffic on ' + IP_ADDRESS_TO_MONITOR,
        TopicArn: SNS_TOPIC_ARN
    }, function(err, data) {
        if (err) {
            //Return so the invocation is not also reported as finished successfully
            return context.fail('Failed to publish to SNS ' + err);
        }
        context.done(null, 'Function Finished!');
    });
}
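The handler compares `resp.hits.total` directly with THRESHOLD. That works with the Elasticsearch versions this post targets, where `hits.total` is a number, but Elasticsearch 7.0 and later return an object such as `{ "value": 150, "relation": "eq" }`. If you adapt this function to a newer domain, a helper along these lines can normalize the count and reject error responses. `extractHitCount` is hypothetical and not part of the function above.

```javascript
/* Hypothetical helper: extract the total hit count from a _search response body. */
function extractHitCount(responseBody) {
    var resp = JSON.parse(responseBody); // throws on malformed JSON
    if (!resp.hits) {
        // Error responses (for example, index_not_found_exception) have no hits object
        throw new Error('Unexpected Elasticsearch response: ' + responseBody);
    }
    var total = resp.hits.total;
    // Elasticsearch 7+ returns { value, relation }; older versions return a number
    return typeof total === 'object' ? total.value : total;
}
```

In Elasticsearch 7 the count is exact only up to 10,000 by default (beyond that `relation` is `"gte"`), which does not matter for a threshold of 100.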

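The final step is to set the IAM role for the Lambda function. For Existing role, you can choose the default lambda_basic_execution role, but note that this role only grants permission to write logs to Amazon CloudWatch Logs. The role also needs permission to publish to the SNS topic (sns:Publish). In addition, because the request to our Amazon ES domain is signed with the role's credentials, the role must be allowed to query the domain (for example, es:ESHttpPost), either in the role's IAM policy or in the domain's access policy.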
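The function's role needs permission to publish to the SNS topic and, because the Amazon ES request is signed with the role's credentials, permission to send that request to the domain. A sketch of such an identity-based policy document follows. `buildAlertingPolicy` is a hypothetical helper, and the domain ARN (of the form `arn:aws:es:<region>:<account-id>:domain/<domain-name>`) is a placeholder for your own. Because the handler only sends POST requests to `_search`, the sketch grants only `es:ESHttpPost`; widen it if your queries use other HTTP methods.

```javascript
/* Hypothetical helper: inline policy for the alerting function's role. */
function buildAlertingPolicy(snsTopicArn, esDomainArn) {
    return {
        Version: '2012-10-17',
        Statement: [
            {
                // Publish alerts to the topic
                Effect: 'Allow',
                Action: 'sns:Publish',
                Resource: snsTopicArn
            },
            {
                // Signed POST requests (such as _search) to any path in the domain
                Effect: 'Allow',
                Action: 'es:ESHttpPost',
                Resource: esDomainArn + '/*'
            }
        ]
    };
}

// Usage (AWS SDK v2):
// new AWS.IAM().putRolePolicy({
//     RoleName: 'lambda_basic_execution',
//     PolicyName: 'ElasticsearchAlerting',
//     PolicyDocument: JSON.stringify(buildAlertingPolicy(topicArn, domainArn))
// }, callback);
```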


We can now choose Create Lambda function to set up the alerting. If the VPC Flow Logs reveal a potential DoS attack on the IP address we are monitoring, our SNS topic will be notified within a minute or so. SNS can then deliver the alert by email or text message, or use it to trigger another Lambda function.
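For example, to receive alerts by email, subscribe an address to the topic. This is a minimal sketch using the AWS SDK for JavaScript (v2) `subscribe` operation; `subscribeEmail` is a hypothetical helper and the SNS client is passed in. Email subscriptions must be confirmed by the recipient before SNS delivers anything to them.

```javascript
/* Hypothetical helper: subscribe an email address to the alert topic.
 * `sns` is an AWS SDK v2 SNS client. */
function subscribeEmail(sns, topicArn, emailAddress, callback) {
    sns.subscribe({
        TopicArn: topicArn,
        Protocol: 'email',
        Endpoint: emailAddress
    }, function (err, data) {
        if (err) { return callback(err); }
        // Until the recipient confirms, SNS reports "pending confirmation"
        callback(null, data.SubscriptionArn);
    });
}
```

Text message alerts work the same way with `Protocol: 'sms'` and a phone number as the endpoint.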

Summary

You can use the approach outlined in this post for alerting based on any Elasticsearch query. Just use a Lambda function that is triggered on a regular interval to run queries against your Amazon ES domain. You can then parse the responses to those queries. If the response contains information that would make you want to send an alert, publish this information to an SNS topic.
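For instance, one scheduled function could evaluate several alert rules per run. In this sketch, each rule is a plain object with hypothetical `name`, `query`, and `threshold` fields, and `runQuery` and `publish` are hypothetical callback-style functions that would wrap the signed Elasticsearch request and the SNS publish call. Rules are evaluated one after another, and the first error stops the run.

```javascript
/* Hypothetical sketch: evaluate several alert rules in one scheduled run.
 * runQuery(query, cb)  -> cb(err, hitCount)
 * publish(message, cb) -> cb(err)
 * done(err, firedRuleNames) */
function evaluateRules(rules, runQuery, publish, done) {
    var fired = [];
    function next(i) {
        if (i >= rules.length) {
            return done(null, fired);
        }
        var rule = rules[i];
        runQuery(rule.query, function (err, hitCount) {
            if (err) { return done(err); }
            if (hitCount <= rule.threshold) {
                return next(i + 1);
            }
            fired.push(rule.name);
            publish(rule.name + ': ' + hitCount + ' hits (threshold ' + rule.threshold + ')', function (pubErr) {
                if (pubErr) { return done(pubErr); }
                next(i + 1);
            });
        });
    }
    next(0);
}
```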

We hope you found this post helpful. Feel free to leave your feedback in the comments.