AWS Management & Governance Blog

Collecting Apache Flink metrics in the Amazon CloudWatch agent

Apache Flink is a distributed stream processing engine. You can run Flink on Amazon EMR as a YARN application. You can view Flink metrics through its web UI, but what if you want to react to them? In this blog post, I’ll show you how to use the CloudWatch agent to collect Flink metrics into Amazon CloudWatch, where you can monitor them, add them to a dashboard, and trigger alerts or event-driven processes.

Overview of solution

Users start a job by connecting through AWS Systems Manager to an EMR cluster. The cluster runs a Flink job, which reports metrics through the StatsD Reporter to the CloudWatch agent's local StatsD interface. The CloudWatch agent pulls configuration from the AWS Systems Manager Parameter Store. The CloudWatch agent publishes metrics to Amazon CloudWatch, where they can be viewed by users.

Figure 1: Solution workflow

Flink supports sending metrics to external metric systems through metric reporters. The CloudWatch agent supports the collection of custom metrics through local interfaces for some of those same external systems. The solution I describe in this post uses StatsD, a metric system supported by Flink and the CloudWatch agent.

This solution uses an Amazon EMR bootstrap script stored in an Amazon Simple Storage Service (Amazon S3) bucket. This script downloads, installs, and configures the CloudWatch agent on each node. The agent pulls its configuration from a centrally managed AWS Systems Manager Parameter Store, as opposed to using local configuration files that would require distribution across the cluster.

In more recent versions of EMR, the Flink StatsD metric reporter library must be built or downloaded onto the cluster and shared across the hosts. This solution demonstrates an EMR cluster step that downloads the library from a public repository and distributes it through a shared path.

The result is a EMR cluster with the CloudWatch agent on every node, each configured from a central Parameter Store. Flink metrics are emitted through the Flink StatsD metric reporter to the StatsD interface on the CloudWatch agent on each node. The CloudWatch agent collects and publishes these metrics to CloudWatch under a custom metric namespace, where they can be viewed in the CloudWatch console. Users use Systems Manager Session Manager for remote connections to run commands on cluster nodes, such as starting and stopping Flink jobs.

Figure 1 illustrates this approach. Users start a job by connecting through AWS Systems Manager to an EMR cluster. The cluster runs a Flink job, which reports metrics through the StatsD Reporter to the CloudWatch agent’s local StatsD interface. The CloudWatch agent pulls configuration from the AWS Systems Manager Parameter Store. The CloudWatch agent publishes metrics to Amazon CloudWatch, where they can be viewed by users.

Walkthrough

You use the console to perform most of the steps in this walkthrough, but you can also use the AWS CLI or AWS CloudFormation templates.

Here are the steps:

  1. Create an AWS Identity and Access Management (IAM) instance profile role for EMR.
  2. Create a Systems Manager parameter with the CloudWatch agent’s configuration.
  3. Create and upload the bootstrap script to an S3 bucket.
  4. Create an EMR cluster with:
    • Flink as an installed application.
    • Configuration properties to report Flink metrics through the StatsD library.
    • A step to download and install the Flink StatsD metric reporter library.
    • A step to start the Flink cluster.
  5. Connect to the EMR cluster through Systems Manager Session Manager and start a long-running Flink job.
  6. Monitor the Flink metrics in the CloudWatch console.

Steps 1-4 are handled for you in this example CloudFormation template.

Click Launch Stack to launch a CloudFormation stack in your account and deploy the template.

CloudFormation quicklink button labeled “Launch Stack”.

This template creates an IAM role, IAM instance profile, SSM Parameter, and EMR cluster. The cluster will download a Flink library from Maven and start a Flink application. You will be billed for the AWS resources used if you create a stack from this template.

The CloudFormation wizard will ask you to modify or provide these parameters:

  • InstanceType: The type of instance for all instance groups. Defaults to m4.xlarge.
  • InstanceCountCore: The number of instances in the core instance group. Defaults to 2.
  • EMRReleaseLabel: The EMR release label you wish to use. Defaults to emr-5.32.0.
  • BootstrapScriptPath: The S3 path of your CloudWatch Agent installation bootstrap script. If you launch your stack through the Launch Stack quicklink, this parameter will be set to the S3 path of an Amazon-hosted copy of the bootstrap script.
  • Subnet: The EC2 subnet to launch cluster into. You must provide this parameter.
  • EC2KeyPairName: An optional EC2 keypair for connecting to cluster nodes, as an alternative to Session Manager.

If you use this CloudFormation template, skip to “Step 5: Connect to the cluster and run a Flink job” after the stack has been successfully created.

Prerequisites

For this walkthrough, you need the following:

  • An AWS account.
  • An S3 bucket for storing the bootstrap script.
  • A VPC created in Amazon Virtual Private Cloud (Amazon VPC), where your EMR cluster will be launched.
    • Default IAM service roles for Amazon EMR permissions to AWS services and resources. You can create these roles with the aws emr create-default-roles AWS CLI command.
  • An optional EC2 key pair, if you plan to connect to your cluster through SSH rather than Systems Manager Session Manager.

Step 1: Create the IAM instance profile role

This solution uses the default EMR service role, but creates a custom instance profile role. I recommend a custom instance profile role because you will attach nonstandard IAM policies to the role, and you might not want those policies applied to other EMR clusters you create with the default instance profile role.

For more information, see Creating IAM roles in the IAM User Guide and Configure IAM Service Roles for Amazon EMR Permissions to AWS Services and Resources in the Amazon EMR Management Guide.

To create the IAM instance profile role

  1. In the IAM console, from the navigation pane, choose Roles, and then choose Create role.
  2. For the type of trusted entity, choose AWS service. For the use case, choose EC2 and then choose Next.
  3. Filter and attach the AmazonElasticMapReduceforEC2Role
  4. Filter and attach the AmazonSSMManagedInstanceCore
  5. Filter and attach the CloudWatchAgentServerPolicy
  6. Choose Next and then choose Next
  7. Enter a name for the role, and then choose Create role.

Step 2: Create the Systems Manager parameter

This configuration sets a CloudWatch Metrics custom namespace (FlinkSystem) and configures a StatsD interface on the host at port 8125. For more information, see Publishing Custom Metrics, Retrieve Custom Metrics with StatsD, and Manually Create or Edit the CloudWatch Agent Configuration File in the Amazon CloudWatch User Guide and AWS Systems Manager Parameter Store in the AWS Systems Manager User Guide.

To create the Systems Manager parameter

  1. In the Systems Manager console, from the navigation pane, choose Parameter Store, and then choose Create Parameter.
  2. For Name, enter AmazonCloudWatch-Config.json.
  3. For Value, use the configuration JSON:
{
    "agent": {
        "logfile": "/opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log",
        "debug": false,
        "run_as_user": "cwagent"
    },
    "metrics": {
        "namespace": "FlinkSystem",
        "metrics_collected": {
            "statsd": {
                "service_address": ":8125",
                "metrics_collection_interval": 60,
                "metrics_aggregation_interval": 300
            }
        },
        "aggregation_dimensions": [["InstanceId"],[]]
    }
}
  1. Choose Create Parameter.

Step 3: Create and upload the bootstrap script to an S3 bucket

For more information, see Uploading objects in the Amazon S3 User Guide and Installing and Running the CloudWatch Agent on Your Servers in the Amazon CloudWatch User Guide.

To create and the upload the bootstrap script

  1. Create a local file named bootstrap_cloudwatch_agent.sh with the following content:
#!/bin/bash

echo -e 'Installing CloudWatch Agent... \n'
sudo rpm -Uvh --force https://s3.amazonaws.com/amazoncloudwatch-agent/amazon_linux/amd64/latest/amazon-cloudwatch-agent.rpm

echo -e 'Starting CloudWatch Agent... \n'
sudo amazon-cloudwatch-agent-ctl -a fetch-config -m ec2 -c ssm:AmazonCloudWatch-Config.json -s
  1. In the Amazon S3 console, choose your S3 bucket.
  2. On the Objects tab, choose Upload.
  3. Choose Add files, and then choose the bootstrap script.
  4. Choose Upload, and then choose Exit.

Step 4: Create the EMR cluster

For more information about EMR, see Create Bootstrap Actions to Install Additional Software and Work with Steps Using the AWS CLI and Console in the Amazon EMR Management Guide. For more information about Flink, see Creating a Cluster with Flink and Configuring Flink in the Amazon EMR Release Guide.

Configure your cluster with the following properties:

Classification Property Value Reason
hadoop-env JAVA_HOME /usr/lib/jvm/java-11-amazon-corretto.x86_64 Set the Java version
flink-conf jobmanager.web.address 0.0.0.0 Configure web servers
flink-conf rest.address 0.0.0.0
flink-conf jobmanager.heap.size 1024m Set memory allocation
flink-conf taskmanager.memory.process.size 1728m
flink-conf metrics.reporter.stsd.port 8125 Set up the StatsD reporter
flink-conf metrics.reporter.stsd.host localhost
flink-conf metrics.reporters stsd
flink-conf metrics.reporter.stsd.class org.apache.flink.metrics.statsd.StatsDReporter
flink-conf metrics.scope.jm jobmanager Simplify system scope metric names
flink-conf metrics.scope.jm.job jobmanager.<job_id>
flink-conf metrics.scope.tm taskmanager.<tm_id>
flink-conf metrics.scope.tm.job taskmanager.<tm_id>.<job_id>
flink-conf metrics.scope.task taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
flink-conf metrics.scope.operator taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>

I recommend overriding the metrics scope properties because, by default, the reporter prefixes metric names with the internal host (for example, ip-10-0-34-0_us-west-2_compute_internal). These properties drop the host part, making the metric name more readable.

What if you need to filter your CloudWatch metrics by the host they came from? The host is included in the CloudWatch metric as a dimension, so you can filter or search on it. For more information, see Dimensions in the Amazon CloudWatch User Guide and System Metrics in the Flink documentation.

You will need to add a bootstrap action that installs and configures the CloudWatch agent and a cluster step to download the Flink StatsD reporter, which I demonstrate using Maven repository.

Finally, you will need a cluster step that starts the Flink cluster, using a command like:

flink-yarn-session -d -n 2

This command uses the flink-yarn-session wrapper over yarn-session.sh to start a Flink cluster in a detached state with two task managers. Change these values as appropriate for your cluster configuration.

For more information, see Working with Flink Jobs in Amazon EMR in the Amazon EMR Release Guide.

To create the cluster

  1. In the Amazon EMR console, choose Create cluster, and then choose Go to advanced options.
  2. For Software Configuration, choose release 5.32.0.
  3. Install the Flink 1.11.2 application.

To configure the cluster properties

  1. In Edit software settings, choose Enter configuration, and then enter the configuration in JSON format:
[
  {
    "classification": "hadoop-env",
    "properties": {},
    "configurations": [
      {
        "classification": "export",
        "properties": {
          "JAVA_HOME": "/usr/lib/jvm/java-11-amazon-corretto.x86_64"
        },
        "configurations": []
      }
    ]
  },
  {
    "classification": "flink-conf",
    "properties": {
      "jobmanager.web.address": "0.0.0.0",
      "rest.address": "0.0.0.0",
      "jobmanager.heap.size": "1024m",
      "taskmanager.memory.process.size": "1728m",
      "metrics.reporter.stsd.port": "8125",
      "metrics.reporter.stsd.host": "localhost",
      "metrics.reporters": "stsd",
      "metrics.reporter.stsd.class": "org.apache.flink.metrics.statsd.StatsDReporter",
      "metrics.scope.jm": "jobmanager",
      "metrics.scope.jm.job": "jobmanager.<job_id>",
      "metrics.scope.tm": "taskmanager.<tm_id>",
      "metrics.scope.tm.job": "taskmanager.<tm_id>.<job_id>",
      "metrics.scope.task": "taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>",
      "metrics.scope.operator": "taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"
    },
    "configurations": []
  }
]

To add a step to install the metrics reporter library

  1. Under Steps, change the value of Step type to Custom JAR, and then choose Add Step.
  2. For Name, enter AddLibraries.
  3. For JAR location, enter command-runner.jar.
  4. For Arguments, use:
sudo
wget
https://repo1.maven.org/maven2/org/apache/flink/flink-metrics-statsd/1.11.2/flink-metrics-statsd-1.11.2.jar
-P
/usr/lib/flink/lib/
  1. Choose Add.

To add a step to install the metrics reporter library

  1. Under Steps, choose Add Step.
  2. For Name, enter FlinkStart.
  3. For JAR location, enter command-runner.jar.
  4. For Arguments, use:
bash
-c
"flink-yarn-session -d -n 2"
  1. Choose Next.

To choose cluster hardware settings

  1. Choose the network and EC2 subnet to launch your cluster in.
  2. Choose your desired cluster node instance types and number of instances.
  3. Choose Next.

To choose general settings for the cluster

  1. For Cluster name, enter Demo Flink Metrics.
  2. In Tag, enter DemoFlinkMetrics.
  3. For bootstrap action type, choose Custom, and then choose Configure.
  4. For Script location, enter the S3 path of your uploaded bootstrap script.
  5. Choose Next.

To choose cluster security settings and launch

  1. If you plan to connect to your cluster through SSH rather than Systems Manager Session Manager, for EC2 key pair, choose your key pair.
  2. For Permissions, choose Custom.
  3. For EC2 instance profile, use the custom IAM role you created earlier.
  4. Choose Create cluster.

Step 5: Connect to the cluster and run a Flink job

You have several options for connecting to your cluster to run commands. If your cluster is configured with an EC2 key, you can connect with SSH. If your cluster’s IAM instance profile role has the AmazonSSMManagedInstanceCore policy attached, you can connect through the AWS CLI Session Manager plugin or through the Systems Manager console. For more information, see AWS Systems Manager Session Manager in the AWS Systems Manager User Guide.

To find the master node instance ID

  1. In the Amazon EMR console, from the navigation pane, choose Clusters.
  2. Choose the Demo Flink Metrics cluster.
  3. On the Hardware tab, choose the ID of the instance group where Node type & name is MASTER Management Group.
  4. Make a note of the EC2 instance ID (for example, i-00123456789abcdef).

To connect to the cluster and run a Flink job

  1. In the Systems Manager console, from the navigation pane, choose Session Manager.
  2. On the Sessions tab, choose Start session.
  3. Choose the instance whose instance ID matches the EC2 instance ID of your master node.
  4. Choose Start session.
  5. On the Session Manager remote shell, run the following command:
sudo flink run --jobmanager yarn-cluster --detached /usr/lib/flink/examples/streaming/StateMachineExample.jar

Remote shell output of starting the Flink example job, StateMachineExample. The output shows the job was submitted with job ID 93f3d3bf6dbe6d4caa792a07c379ff9b.

Figure 2: Remote shell output of starting the Flink example job, StateMachineExample

Step 6: Monitor the metrics in CloudWatch

To monitor the metrics in CloudWatch

  1. In the CloudWatch console, from the navigation pane, choose Metrics.
  2. Choose the FlinkSystem custom namespace.
  3. Choose host, and then choose metric_type.
  4. If needed, search for your job ID, which is part of the metric name. It might take several minutes after your Flink job has started for the metrics to start showing up.
  5. Choose and graph metrics of interest.

The following example shows a CloudWatch graph of Flink uptime versus downtime.

CloudWatch graph of job manager uptime versus downtime for job ID 93f3d3bf6dbe6d4caa792a07c379ff9b. The uptime is steadily increasing. The downtime remains flat at zero.

Figure 3: CloudWatch graph of job manager uptime vs. downtime

The following example shows a CloudWatch graph of Flink checkpoint duration.

CloudWatch graph of Flink checkpoint duration for job ID 93f3d3bf6dbe6d4caa792a07c379ff9b. There are five spikes in an otherwise flat signal.

Figure 4: CloudWatch graph of Flink checkpoint duration

Cleaning up

To avoid future charges in your account, delete the resources you created in this walkthrough. The EMR cluster will incur charges as long as the cluster is active, so terminate it after you’re done.

If you launched the example CloudFormation template

  1. In the CloudFormation console, from the navigation pane, choose Stacks.
  2. Choose the stack you launched (EMR-CloudWatch-StatsD-Demo), and then choose Delete.

If you manually created the resources

  1. In the Amazon EMR console, from the navigation pane, choose Clusters. Choose the cluster you created, and then choose Terminate.
  2. In the Systems Manager console, from the navigation pane, choose Parameter Store. Choose AmazonCloudWatch-Config.json, and then choose Delete.
  3. In the IAM console, from the navigation pane, choose Roles. Choose the role you created, and then choose Delete role.

Conclusion

Now that you have completed the steps in this walkthrough, you have the CloudWatch agent running on your cluster hosts and configured to push metrics to CloudWatch through local StatsD interfaces. Your Flink application is configured to report metrics to those local interfaces. You can view and use those Flink metrics in CloudWatch.

You can package and deploy this solution through a CloudFormation template like this example template, which creates the IAM instance profile role, Systems Manager parameter, and EMR cluster.

If you have difficulties getting your metrics to show up in CloudWatch, start your troubleshooting with the Flink application logs (aggregated with yarn logs -applicationId <APPLICATION_ID> command) and the CloudWatch agent logs (located in /opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log on each host).

To take this further, consider using these metrics in CloudWatch Alarms for alerts on threshold breaches or as part of an anomaly detection. You could collect them with other alarms into a composite alarm or configure alarm actions such as sending Amazon Simple Notification Service (Amazon SNS) notifications to trigger event-driven processes such as AWS Lambda functions.

When you know how you want to use these metrics and alarms, use the information in Manually Create or Edit the CloudWatch Agent Configuration File and the Flink metrics documentation to fine-tune this solution to capture exactly what you need. Finally, before you process real workloads, secure your cluster with these best practices for securing Amazon EMR.

About the author

Josh Haycraft

Josh Haycraft

Josh Haycraft is a Senior Software Engineer at Amazon. He builds “batteries-included” security and governance tools for developers to deliver services that are secure by default. Outside of work, he enjoys cooperative board gaming, superhero fiction, and discovering the great National Parks.