Tag: vertx


Writing Custom Metrics to Amazon CloudWatch Using the AWS SDK for Java

by Sascha Moellering | on | in Java | Permalink | Comments |  Share

Metrics measure the performance of your system. Several AWS services provide free metrics, such as the CPU usage of an Amazon EC2 instance. You can create Amazon CloudWatch alarms based on metrics and send Amazon SNS messages when the alarm state changes. You can use this mechanism to implement elastic scaling if the message is sent to an Auto Scaling group to change the desired capacity of the group. For many workloads, metrics like CPU usage are sufficient. However, from time to time, workloads have specific requirements and need a more complex metric to scale efficiently. It’s possible to publish your own metrics to CloudWatch, known as custom metrics, by using the AWS CLI, an API, or the CloudWatch collectd plugin. In this blog post, we’ll show you a more complex example of using the capabilities of the AWS SDK for Java to implement a framework integration to publish framework-related custom metrics to CloudWatch.

Integrating Vert.x and Amazon CloudWatch

Vert.x is an event-driven, reactive, nonblocking, and polyglot framework to implement microservices. It runs on the Java virtual machine (JVM) by using the low-level IO library Netty. You can write applications in Java, JavaScript, Groovy, Ruby, and Ceylon. The framework offers a simple and scalable actor-like concurrency model: Vert.x calls handlers by using a thread known as an event loop. To use this model, you have to write code known as verticles. Those verticles share certain similarities with actors in the Actor Model, and to use them, you have to implement the `Verticle` interface.
The following example shows a basic verticle implementation.

public class SimpleVerticle extends AbstractVerticle {
      // Method is called when the verticle is deployed
      public void start() {
      }

      // Optional method, called when verticle is undeployed
      public void stop() {
      }
}

Verticles communicate with each other using a single event bus. Those messages are sent on the event bus to a specific address, and verticles can register to this address by using handlers. In our example, we use the default event bus address cloudwatch.metrics. Then we register this address to consume all messages and push this data into CloudWatch.

With only a few exceptions, none of the APIs in Vert.x block the calling thread. Similar to Node.js, Vert.x uses the reactor pattern. However, in contrast to Node.js, Vert.x uses several event loops. Unfortunately, not all APIs in the Java ecosystem are written asynchronously, for example, the JDBC API. Vert.x offers a possibility to run this, blocking APIs without blocking the event loop. These special verticles are called worker verticles. You don’t execute worker verticles by using the standard Vert.x event loops, but by using a dedicated thread from a worker pool. Basically, this means that worker verticles don’t block the event loop.

If you start writing low-latency applications, you can reach a certain point where internal metrics of frameworks are required for further optimization. By default, Vert.x doesn’t record any metrics, but offers a Service Provider Interface (SPI) that you can implement to get more information about the behavior of Vert.x internals. The interface that you have to implement is described in the API documentation.
Vert.x provides an in-depth look into the framework by offering metrics for the following:

  • Datagram/UDP
  • Vert.x event bus
  • HTTP client
  • HTTP server
  • TCP client
  • TCP server
  • Pools used by Vert.x, such as execute blocking or worker verticle

To receive metrics from Vert.x, for example, HTTP server metrics, you have to implement the `HttpServerMetrics` interface and the following method from the `VertxMetrics` interface :

HttpServerMetrics<?, ?, ?> createMetrics(HttpServer httpServer, SocketAddress address, HttpServerOptions serverOptions);

The following code snippet shows a typical implementation of `HttpServerMetrics`.

private final LongAdder processingTime = new LongAdder();
    private final LongAdder requestCount = new LongAdder();
    private final LongAdder requests = new LongAdder();
    private final SocketAddress localAddress;
    private final HttpServerMetricsSupplier httpServerMetricsSupplier;

    public  HttpServerMetricsImpl(SocketAddress localAddress, HttpServerMetricsSupplier httpServerMetricsSupplier) {
        this.localAddress = localAddress;
        this.httpServerMetricsSupplier = httpServerMetricsSupplier;
        httpServerMetricsSupplier.register(this);
    }

    @Override
    public void responseEnd(Long nanoStart, HttpServerResponse response) {
        long requestProcessingTime = System.nanoTime() - nanoStart;
        processingTime.add(requestProcessingTime);
        requestCount.increment();
        requests.decrement();
    }

In this example, the `responseEnd` method is called if an HTTP server response has ended. The processing time of the request is calculated, the number of requests is incremented, and the number of current requests is decremented. Now we have to send the data we collected to CloudWatch.
To collect metrics data and send it to CloudWatch, we need to implement the `MetricSupplier` interface and override the `collect()` method. Each metric value is represented by an object of type `CloudWatchDataPoint`. This data point class is a simple POJO containing the name of the metric, the value, the timestamp of collection, and a CloudWatch StandardUnit. The `StandardUnit` enumeration represents the unit of the data point in CloudWatch (e.g., Bytes). After collecting a list of data points, the `Sender` class pushes the data to CloudWatch. To connect to CloudWatch, the Sender class uses the AWS SDK for Java and the `DefaultAWSCredentialsProviderChain`. This enables you to use Vert.x-CloudWatch SPI on an EC2 instance, as well as on your local development workstation.

    public Sender(Vertx vertx, VertxCloudwatchOptions options, Context context) {
        this.vertx = vertx;

        // Configuring the CloudWatch client
        // AWS credentials provider chain that looks for credentials in this order:
        //      - Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (RECOMMENDED since they are recognized by all the AWS SDKs and CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by the SDK for Java)
        //      - Java System Properties - aws.accessKeyId and aws.secretKey
        //      - Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI
        //      - Instance profile credentials delivered through the Amazon EC2 metadata service
        this.cloudWatchClient = initCloudWatchClient(options.getCloudwatchRegion());
        this.namespace = options.getNamespace();
        this.instanceId = options.getInstanceId();

        batchSize = options.getBatchSize();
        batchDelay = NANOSECONDS.convert(options.getBatchDelay(), SECONDS);
        queue = new ArrayList<>(batchSize);
        sendTime = System.nanoTime();

        context.runOnContext(aVoid -> timerId = vertx.setPeriodic(MILLISECONDS.convert(batchDelay, NANOSECONDS), this::flushIfIdle));
    }

    ...

    private void send(List<CloudWatchDataPoint> dataPoints) {
        List<MetricDatum> cwData = toCloudwatchData(dataPoints);
        PutMetricDataRequest metricDataRequest = new PutMetricDataRequest();
        metricDataRequest.setMetricData(cwData);
        metricDataRequest.setNamespace(this.namespace);
        Future future = cloudWatchClient.putMetricDataAsync(metricDataRequest);
        sendTime = System.nanoTime();

        try {
            future.get();
        } catch (Exception exc) {
            LOG.error(exc);
        }
    }

    private List<MetricDatum> toCloudwatchData(List<CloudWatchDataPoint> dataPoints) {
        List<MetricDatum> metrics = new ArrayList<>();

        dataPoints.forEach(metric -> {

            MetricDatum point = new MetricDatum();

            point.setTimestamp(new Date(metric.getTimestamp()));
            point.setValue((double) metric.getValue());
            point.setMetricName(metric.getName());
            point.setUnit(metric.getStandardUnit());
            List<Dimension> dimensionList = new ArrayList<>();
            dimensionList.add(new Dimension().withName("InstanceId").withValue(this.instanceId));

            point.setDimensions(dimensionList);
            metrics.add(point);
        });

        return metrics;
    }

To use the CloudWatch Vert.x SPI implementation, we have to set the necessary metrics options. In our case, we want to use the CloudWatch namespace `Vertx/CloudWatch`. Let’s assume that the application runs on an EC2 instance. In this case, the CloudWatch SPI automatically detects the region that the EC2 instance is running in and the instance ID. This information is determined by using the EC2MetadataUtils-class.
After setting the metrics options, we initiate a Vert.x instance and create a simple HTTP server on port 8080 that returns “Hello Vert.x!” in plain text. The SPI automatically detects that an HTTP server is created and collects HTTP server-related metrics such as the number of HTTP connections, the number of bytes sent, and a set of other metrics.
In addition to that, we want to send the consumed memory of the JVM to CloudWatch. This custom metric isn’t collected by the SPI, so we have to calculate the consumed memory by using the Runtime-class. A timer sends this data as a JSON message every five seconds over the event bus to the CloudWatch SPI. The SPI collects the data and sends it to CloudWatch.

    VertxOptions options = new VertxOptions().setMetricsOptions(
                new VertxCloudwatchOptions()
                        .setEnabled(true)
                        .setMetricsBridgeEnabled(true)
                        .setBatchSize(10)
                        .setBatchDelay(30)
                        .setNamespace("Vertx/Cloudwatch"));
    vertx = Vertx.vertx(options);

    // Creating HTTP server for metrics
    HttpServer server = vertx.createHttpServer();

    server.requestHandler(request -> {

        // This handler is called for each request that arrives on the server
        HttpServerResponse response = request.response();
        response.putHeader("content-type", "text/plain");

        // Write to the response and end it
        response.end("Hello Vert.x!");
    });

    vertx.setPeriodic(5000, id -> {
        long usedMem = this.getUsedMemory();
        JsonObject message = new JsonObject()
                .put("metricName", "JVMMemory")
                .put("unit", StandardUnit.Megabytes.toString())
                .put("value", usedMem);

        vertx.eventBus().publish("cloudwatch.metrics", message);
    });

    server.listen(8080);

The following figure shows metrics such as the number of HTTP connections, the number of requests, the amount of bytes sent, and the consumed memory displayed as a graph in CloudWatch.

Vert.x metrics

Note that a custom metric is defined as the unique combination of metric name and dimensions associated with the metric. Custom metrics are priced based on monthly usage per metric. See CloudWatch pricing for details.

Summary

In this blog post we created a Vert.x SPI implementation to write framework metrics to CloudWatch. We used the capabilities of the AWS SDK for Java not only for the communication with CloudWatch, but also to get insights about the instance and the region using EC2 metadata. We hope we’ve given you ideas for creating your own applications and framework integrations by using the AWS SDK for Java. Feel free to share your ideas and thoughts in the comments below!