AWS Big Data Blog

Monitor and Optimize Analytic Workloads on Amazon EMR with Prometheus and Grafana

Amazon EMR is a big data service offered by AWS to run Apache Spark and other open-source applications on AWS in a cost-effective manner. Monitoring Amazon EMR clusters is essential to help detect critical issues with the applications or infrastructure in real time and identify root causes quickly. Viewing how the clusters are being used over time helps operations and engineering teams find potential performance bottlenecks and optimization opportunities to scale out or scale in their clusters and plan for capacity. In this post, we show how to integrate Prometheus, an open-source systems monitoring and alerting tool, and Grafana, an open-source visualization and analytics tool, to provide an end-to-end monitoring system for EMR clusters. Furthermore, we share an example that demonstrates how you can use Prometheus and Grafana to easily identify opportunities to optimize your EMR jobs to improve performance and reduce cost.

This post discusses the following:

  • Installing and configuring Prometheus and Grafana on an Amazon Elastic Compute Cloud (Amazon EC2) instance.
  • Configuring an EMR cluster to emit metrics that Prometheus can scrape from the cluster.
  • Using the Grafana dashboards to analyze the metrics for a workload on the EMR cluster and optimize it.
  • How Prometheus can push alerts to the Alertmanager, which is a component in Prometheus. The Alertmanager sends notifications to the alert_sns_forwarder component, which forwards the notifications to Amazon Simple Notification Service (Amazon SNS).
  • Configuring Amazon SNS to send email notifications.
  • A few considerations when bringing this monitoring system to production.

The following diagram illustrates the solution architecture.

Exporters are agents that gather metrics from the systems being monitored and provide endpoints for Prometheus to poll the metrics. On each node in the EMR cluster, node_exporter collects metrics from the operating system, and jmx_exporter collects metrics from applications such as YARN and HDFS.

The Prometheus server polls the endpoints exposed by these exporters on each node to gather the metrics. These metrics are then stored locally on the Prometheus server. When a user opens the Grafana dashboards in their browser, the Grafana server queries the Prometheus server to generate the dashboards that are displayed in the browser.

You can set up alerts in Prometheus; when the alert thresholds are breached, the Prometheus server pushes alerts to the Alertmanager. The Alertmanager takes care of deduplicating, grouping, and routing the notifications to the receivers. Receivers that send events to PagerDuty, Slack, Opsgenie, and others, are natively supported by the Alertmanager.

Because there is no native integration of the Alertmanager with Amazon SNS, we use an implementation of the generic webhook receiver, alert_sns_forwarder, to transform and route the notification message to a pre-configured topic in Amazon SNS. You can subscribe to the topic to receive the alerts via email, SMS, HTTP/S, Amazon Simple Queue Service (Amazon SQS), or AWS Lambda.

Prerequisites

Before getting started, you must have the following prerequisites:

Deploying the solution

We provide the CloudFormation template in this post as a general guide. Please review and customize it as needed. You should also be aware that some of the resources deployed by this stack incur costs when they remain in use.

For purposes of this post, the resources are installed in a VPC with a public subnet. We recommend installing the resources in a private subnet where possible for production. In addition, we recommend enabling TLS connections and password authentication from Prometheus to node_exporter and jmx_exporter and for Grafana. To make it easier to troubleshoot the setup, the CloudFormation template includes setting up network ingress access to port 9090 so you can access the Prometheus UI remotely. You can remove this access if not needed.

The CloudFormation template contains several nested templates. Together, they do the following:

  1. Choose the VPC and subnet to deploy this solution.
  2. Create an EC2 instance with the instance type of your choosing.
  3. Download, install, and configure Prometheus as a service with the right scrape configuration to connect to the EMR cluster being monitored.
  4. Download, install, and configure alert_sns_forwarder as a service that transforms alert notifications from the Alertmanager to Amazon SNS messages and publishes those messages to Amazon SNS.
  5. Download, install, and configure the Alertmanager as a service that forwards alert notifications from Prometheus server to alert_sns_forwarder.
  6. Set up a sample alert to send notification messages to Amazon SNS when disk space usage is over 90% on any of the nodes in the EMR cluster being monitored.
  7. Download, install, and configure Grafana as a service that connects to the Prometheus data source. The following dashboards are pre-installed to visualize various metrics on the EMR cluster being monitored:
    • OS Level Metrics – Select CPU, memory, and disk metrics exposed by the Amazon Linux operating system.
    • HDFS – DataNode Metrics – Select storage and network metrics exposed by the HDFS data node process.
    • HDFS – NameNode Metrics – Select storage and replication metrics exposed by the HDFS name node process.
    • YARN – Resource Manager – Select resource, application, and container metrics exposed by the YARN resource manager process.
    • YARN – Node Manager – Select resource and container metrics exposed by the YARN node manager process.
    • YARN – Queues – Select resource, application, and container metrics filtered by YARN queues.
    • JVM Metrics – Select memory and garbage collection metrics exposed by JVM of HDFS and YARN processes.
    • Log Metrics – Log fatals, errors, and warnings by the logger of HDFS and YARN processes.
    • RPC Metrics – Select RPC metrics exposed by HDFS and YARN processes.
  8. Create an Amazon EC2 security group. You can configure network access to inbound TCP ports 22 (SSH), 3000 (Grafana), and 9090 (Prometheus UI) with parameters in the CloudFormation template. This allows you to lock down access to the EC2 instance that hosts Prometheus and Grafana to known CIDR ranges and ports.
  9. Create an IAM instance profile to associate with the EC2 instance on which Prometheus and Grafana are installed.
  10. Create an SNS topic and subscribe the email address provided as a parameter in the template to receive the notifications from Prometheus Alertmanager.
  11. Launch an EMR cluster with a bootstrap action script that does the following:
    • Download and set up node_exporter, which exposes OS metrics to Prometheus, as a service, on all nodes.
    • Download jmx_exporter, which is used by HDFS Name Node, HDFS Data Node, YARN Resource Manager, and YARN Node Manager processes on all nodes to expose application metrics to Prometheus.
    • Configure HDFS Name Node, HDFS Data Node, YARN Resource Manager, and YARN Node Manager processes on the cluster to launch with jmx_exporter as a Java agent.
  12. Create additional master and slave security groups for the EMR cluster to allow network ingress to ports 7001, 7005, and 9100 from the Prometheus server.
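
As a rough sketch of what the Prometheus server polls, the following shell function prints the metrics URLs for a single node. The ports are the ones opened by the security groups in the last step. The /metrics path is the exporters' usual default and is an assumption here, as is the function name exporter_urls; neither is taken from the CloudFormation template.

```shell
# Print the metrics URLs that a Prometheus server would poll on one node.
# Ports 9100, 7001, and 7005 come from the security group rule in this post.
# The /metrics path is assumed (it is the common exporter default).
exporter_urls() {
  host="$1"
  for port in 9100 7001 7005; do
    printf 'http://%s:%s/metrics\n' "$host" "$port"
  done
}

# Example (requires network access to the node from the Prometheus server):
#   exporter_urls "<node private DNS name>" | while read -r url; do
#     curl -s "$url" | head -n 5
#   done
```

Running the loop from the Prometheus EC2 instance is one way to confirm that the security group rules allow the scrape traffic before looking at the Prometheus UI.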

Launching the CloudFormation stack

To launch your stack and provision your resources, complete the following steps:

  1. Choose the following Launch Stack link:

This automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the template on the AWS CloudFormation console as required. Make sure that you create the stack in your intended Region.

The CloudFormation stack requires a few parameters, as shown in the following screenshot.

The following table describes the parameters:

Parameter | Description | Default Value
Stack name | A meaningful name for the stack; for example, emrPrometheusGrafana. | None
Network Configuration
VPC | The VPC where the EC2 instance and EMR cluster should be launched. | None
Subnet | The subnet where the EC2 instance and EMR cluster should be launched. | None
Amazon EMR Configuration
EMRClusterName | Name of the EMR cluster. | emrPrometheusBlog
MasterInstanceType | Instance type of the master node. | m5.xlarge
CoreInstanceType | Instance type of the core node. | m5.xlarge
CoreInstanceCount | Number of core instances. | 2
EMRSSHIPRange | The IP address range in CIDR notation (for example, <your ip address>/32) for SSHing to the master node of the EMR cluster. If you want to grant access to your local computer’s public IPv4 address, you can go to https://checkip.amazonaws.com/ or run curl https://checkip.amazonaws.com/ in the terminal. Rules with source of 0.0.0.0/0 allow all IP addresses to access your instance. We recommend setting security group rules to allow access from known IP addresses only. | None
EMRKeyName | An existing EC2 key pair to enable SSH access to the master node of the EMR cluster. | None
EMRReleaseLabel | The Amazon EMR release version. | emr-6.0.0
Amazon EC2 Configuration
InstanceType | The EC2 instance type to install Prometheus, Alertmanager, alert_sns_forwarder, and Grafana services. Because the instance needs to host Prometheus server, which works as a time series database storing metrics data, an instance with at least 50 GB disk space would be advisable, depending on the usage. For this post, we choose t3.small. | t3.small
KeyName | An existing EC2 key pair to enable SSH access to the instance. | None
SSHIPRange | The IP address range in CIDR notation (for example, <your ip address>/32) for SSHing to the EC2 instances. To grant access to your local computer’s public IPv4 address, go to http://checkip.amazonaws.com/ or run curl http://checkip.amazonaws.com/ in the terminal. | None
User Interfaces Network Ingress Access
GrafanaIPRange | The IP address range in CIDR notation (for example, <your ip address>/32) for accessing the Grafana dashboards on port 3000. To grant access to your local computer’s public IPv4 address, go to http://checkip.amazonaws.com/ or run curl http://checkip.amazonaws.com/ in the terminal. | None
PrometheusUIIPRange | The IP address range in CIDR notation (for example, <your ip address>/32) for accessing the Prometheus UI on port 9090. To grant access to your local computer’s public IPv4 address, go to http://checkip.amazonaws.com/ or run curl http://checkip.amazonaws.com/ in the terminal. | None
Alerts from Prometheus
EmailAddress | The email address that you use to subscribe to alerts from Prometheus. You must confirm this subscription via an email message from Amazon SNS. | None

  2. Enter the parameter values from the preceding table.
  3. Choose Next.
  4. On the next screen, enter any required tags, an IAM role, or any advanced options.
  5. Choose Next.
  6. Review the details on the final screen and select the check boxes acknowledging that AWS CloudFormation might create IAM resources with custom names and might require the CAPABILITY_AUTO_EXPAND capability.
  7. Choose Create.

Stack creation takes a few minutes. After the CloudFormation stack is created, on the Outputs tab, you can find the following three key-value pairs:

    • ClusterId – The ID of the EMR cluster created.
    • MasterPublicDnsName – The public DNS name of the master node of the EMR cluster.
    • WebsiteURL – The URL for the newly created Grafana dashboard. The default login and password are both admin. You are prompted to change the password the first time you log in.

You should also receive an email from no-reply@sns.amazonaws.com asking you to confirm the subscription from Amazon SNS.

  8. Choose the Confirm subscription link in your email.

You have now subscribed your email to the SNS topic that alerts from Prometheus are published to.

Workload example

The following use case demonstrates how you can use the insights of Amazon EMR metrics from the Grafana dashboards to tune the performance of a Hadoop job running on the cluster.

The job for this use case is a simple WordCount that counts the number of words in the input files. You can download the source code for the WordCount program from sourcecode.zip. It is a basic MapReduce program. You use the Yelp business review dataset from Yelp Open Dataset. The original data is in JSON format. For this post, convert the same dataset to GZIP (2.4 GB in size) and BZIP2 (1.8 GB in size) formats. You run this WordCount job on the EMR cluster you launched earlier.

Start the job by adding a step to the cluster with the following code (replace <j-*************> with the cluster ID specified on the Outputs tab of the CloudFormation stack you created and also <s3://bucket-name/outputs-folder> with the Amazon Simple Storage Service (Amazon S3) location for the job output):

$ aws emr add-steps --cluster-id <j-*************> \
--steps Type=CUSTOM_JAR,Name=WordCountJarGZIP,ActionOnFailure=CONTINUE,Jar=s3://aws-bigdata-blog/artifacts/aws-blog-emr-prometheus-grafana/demo/jars/Wordcount-1.0-SNAPSHOT-job.jar,Args=com.amazonaws.support.training.emr.Wordcount,s3://aws-bigdata-blog/artifacts/aws-blog-emr-prometheus-grafana/demo/datasets/YelpDataGzip/,<s3://bucket-name/outputs-folder>/outputGzip

You can cancel the step at any time by entering the following code (replace <j-*************> with the cluster ID and <s-*************> with the step ID):

$ aws emr cancel-steps --cluster-id <j-*************> \
--step-ids <s-*************>
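
To find out when a step has reached RUNNING status without watching the console, you could poll its state from the command line. The following is a minimal sketch that wraps aws emr describe-step; the function names step_state and wait_until_started and the 30-second polling interval are illustrative choices, not part of the original setup.

```shell
# Print the current state of an EMR step (for example PENDING, RUNNING, COMPLETED).
# $1 is the cluster ID and $2 is the step ID.
step_state() {
  aws emr describe-step --cluster-id "$1" --step-id "$2" \
    --query 'Step.Status.State' --output text
}

# Poll every 30 seconds (an arbitrary interval) while the step is still PENDING,
# then print the first state observed after PENDING.
wait_until_started() {
  while [ "$(step_state "$1" "$2")" = "PENDING" ]; do
    sleep 30
  done
  step_state "$1" "$2"
}

# Example:
#   wait_until_started "<j-*************>" "<s-*************>"
```

The loop exits on any state other than PENDING, so it also returns promptly if the step fails or is cancelled before it starts.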

When the step is in RUNNING status, go to the Grafana dashboards for metrics insights. The following screenshot shows the YARN – Resource Manager dashboard.

For most of the job's execution, the Container Stats metrics show that only two containers (one ApplicationMaster container and one mapper container) were allocated, while no containers were pending. This is expected, because the input format for this job is GZIP, which isn’t splittable. Therefore, no matter how big the input file is, the job starts only one mapper container because there is only one InputSplit. Container allocation doesn’t increase until the mapper stage finishes and the reducer stage starts.

VCores Utilization shows the same insight: it stayed below 50% for a long time until multiple reducer containers kicked in after the completion of the single mapper container.

The OS Level Metrics dashboard in the following screenshot also shows that CPU and memory utilization was quite low during the mapper stage of this job, which took most of the processing time.

For a cluster with one m5.xlarge master and two m5.xlarge core nodes, the job took 30 minutes to finish. Out of the 30 minutes, 28 minutes were used by the mapper stage.

Based on these insights from the dashboards, you can do some performance tuning.

For the same dataset, instead of using GZIP, use BZIP2 format for the input file. Submit the step with the following code (replace <j-*************> with the cluster ID specified on the Outputs tab of the CloudFormation stack and <s3://bucket-name/outputs-folder> with Amazon S3 location for the job output):

$ aws emr add-steps --cluster-id <j-*************> \
--steps Type=CUSTOM_JAR,Name=WordCountJarGBZ2,ActionOnFailure=CONTINUE,Jar=s3://aws-bigdata-blog/artifacts/aws-blog-emr-prometheus-grafana/demo/jars/Wordcount-1.0-SNAPSHOT-job.jar,Args=com.amazonaws.support.training.emr.Wordcount,s3://aws-bigdata-blog/artifacts/aws-blog-emr-prometheus-grafana/demo/datasets/YelpDataBZ2/,<s3://bucket-name/outputs-folder>/outputBZ2_1

When the step is in RUNNING status, check the same dashboards.

The following screenshot shows the YARN – Resource Manager dashboard.

Container Stats shows that seven containers got allocated immediately when the job started. Additionally, the number of pending containers is significantly higher, at 46. You changed the input format to BZIP2 this time, which is a splittable compression format. As a result, multiple mapper containers were launched, each processing one InputSplit. This improved the parallelism at the mapper stage.
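
The difference between the two runs comes down to whether the input compression format can be split. As a small illustrative sketch, the following shell function encodes only the two cases discussed in this post (GZIP is not splittable, BZIP2 is) and reports unknown for anything else. The function name is_splittable and the file names in the example are hypothetical.

```shell
# Report whether an input file's compression format is splittable,
# based only on the two formats compared in this post.
is_splittable() {
  case "$1" in
    *.gz)  echo no ;;       # GZIP: a single InputSplit, so a single mapper
    *.bz2) echo yes ;;      # BZIP2: multiple InputSplits, so multiple mappers
    *)     echo unknown ;;  # other formats are not covered here
  esac
}

# Example (hypothetical file names):
#   is_splittable reviews.json.gz
#   is_splittable reviews.json.bz2
```

A check like this can be run over the object keys of an input prefix before submitting a job, to spot inputs that would limit the mapper stage to one container per file.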

VCores Utilization also shows that 100% of the VCores were used during the peak of the job process.

The OS Level Metrics dashboard in the following screenshot shows the resource utilization also increased during this run of the job.

The job took 11 minutes to finish, which is a 63% performance improvement compared to the previous run.

Can the job run even faster? Based on what you have found from your dashboards, yes. At some point during the second run, 46 pending containers were waiting to be allocated, and VCores Utilization was 100%, which means the cluster had allocated all of its container capacity. You can resize the cluster by adding a task instance group with 10 m5.xlarge task nodes with the following code (replace <j-*************> with the cluster ID):

$ aws emr add-instance-groups --cluster-id <j-*************> \
--instance-groups InstanceCount=10,InstanceGroupType=task,InstanceType=m5.xlarge

The add-instance-groups command returns an output similar to the following code:

{
    "ClusterId": "j-*************",
    "InstanceGroupIds": [
        "ig-************"
    ]
}

Record the value of InstanceGroupIds; you use it later to set the instance count of the task instance group to 0.
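
Instead of copying the instance group ID by hand, you could capture it in a shell variable when you add the group. The following sketch uses the AWS CLI's --query and --output options to print only the first ID; the function name add_task_group and the variable TASK_GROUP_ID are illustrative.

```shell
# Add a task instance group and print only its instance group ID.
# $1 is the cluster ID, $2 is the node count, $3 is the instance type.
add_task_group() {
  aws emr add-instance-groups --cluster-id "$1" \
    --instance-groups "InstanceCount=$2,InstanceGroupType=task,InstanceType=$3" \
    --query 'InstanceGroupIds[0]' --output text
}

# Example:
#   TASK_GROUP_ID="$(add_task_group "<j-*************>" 10 m5.xlarge)"
#   echo "$TASK_GROUP_ID"
```

The captured ID can then be passed to aws emr modify-instance-groups when you scale the group back down.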

The resizing is reflected on the YARN – Resource Manager dashboard in the following screenshot.

You run the same job with the same BZIP2 formatted input dataset, using the following code:

$ aws emr add-steps --cluster-id <j-*************> \
--steps Type=CUSTOM_JAR,Name=WordCountJarGBZ2,ActionOnFailure=CONTINUE,Jar=s3://aws-bigdata-blog/artifacts/aws-blog-emr-prometheus-grafana/demo/jars/Wordcount-1.0-SNAPSHOT-job.jar,Args=com.amazonaws.support.training.emr.Wordcount,s3://aws-bigdata-blog/artifacts/aws-blog-emr-prometheus-grafana/demo/datasets/YelpDataBZ2/,<s3://bucket-name/outputs-folder>/outputBZ2_2

This time, all the pending containers were allocated immediately when the job started, because the added task nodes gave the cluster more containers in total.

The job took 4 minutes to complete. This is a 64% performance improvement compared to the second run and 87% performance improvement compared to the first run.
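
The percentages quoted for the three runs follow from the run times (30, 11, and 4 minutes) as (before - after) / before. As a quick check, the following sketch computes them with awk; the function name improvement_pct is illustrative.

```shell
# Print the percentage reduction in run time, rounded to the nearest integer.
# $1 is the earlier run time and $2 is the later run time, in the same unit.
improvement_pct() {
  awk -v before="$1" -v after="$2" \
    'BEGIN { printf "%.0f\n", (before - after) / before * 100 }'
}

improvement_pct 30 11   # GZIP run vs. first BZIP2 run
improvement_pct 11 4    # first BZIP2 run vs. run with task nodes
improvement_pct 30 4    # GZIP run vs. run with task nodes
```

The three calls print 63, 64, and 87, matching the figures above.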

To save cost, after the job is complete, you can scale down the cluster by reducing the task node number in the task instance group that you resized earlier to 0 with the following code (replace <ig-************> with the instance group ID you recorded earlier):

$ aws emr modify-instance-groups \
--instance-groups InstanceGroupId=<ig-************>,InstanceCount=0

Alert example

The CloudFormation stack you created set up an alert to monitor any nodes in the EMR cluster such that if the disk partitions are more than 90% full, an email alert is sent to the email address that you specified.

Follow these steps to write a large file to a disk partition on the master node in the cluster and wait for the system to fire off the email alert:

  1. Connect to the master node in the EMR cluster via SSH with the key:

$ ssh -i <key-name>.pem hadoop@<ec2 instance public DNS name>

The public DNS name of the master node is specified on the Outputs tab of the CloudFormation stack you created.

  2. Show the current disk usage by entering the following code:
$ df -h /emr
Filesystem      Size  Used Avail Use% Mounted on
/dev/xvdb1      5.0G   56M  5.0G   2% /emr
  3. Create a file sized 4.5 GB so that the disk usage goes above 90%:
$ fallocate -l 4.5G /emr/test.img
  4. Check the disk usage again to confirm that the disk usage is now above 90%. See the following code:
$ df -h /emr
Filesystem      Size  Used Avail Use% Mounted on
/dev/xvdb1      5.0G  4.6G  459M  92% /emr

You can also view the disk usage information from the Grafana OS Level Metrics dashboard. In the following screenshot, the /emr partition shows 91.1% used on the Disk Space Used panel.
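
To check the same 90% threshold from the command line, you could filter the df output. The following sketch reads df output on standard input and prints every mount point whose usage exceeds the threshold. The function name over_threshold is illustrative, and this is a local check only; it is not how the Prometheus alert in the stack is evaluated.

```shell
# Read df output on stdin and print "<mount point> <use%>" for every
# file system whose usage is above the threshold ($1, default 90).
# Assumes the standard six-column layout with Use% in column 5.
over_threshold() {
  threshold="${1:-90}"
  awk -v t="$threshold" 'NR > 1 {
    use = $5
    sub(/%/, "", use)            # strip the percent sign
    if (use + 0 > t) print $6, use "%"
  }'
}

# Example:
#   df -P /emr | over_threshold 90
```

With the 92% usage shown in the df output above, the example prints the /emr mount point; before the test file is created, it prints nothing.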

You should now expect an email with the subject “Prometheus Alert” from no-reply@sns.amazonaws.com in your inbox.

Considerations for production

Prometheus stores the metrics locally in a time series database that is included in the installation. To improve durability, we recommend planning for disk capacity and disk availability and taking snapshots for backup.

By default, the metrics are stored for 15 days. You can configure this retention period with the --storage.tsdb.retention.time command line flag. For solutions that provide remote long-term storage and in some cases high availability, see Remote Endpoints and Storage. The Alertmanager supports creating a cluster for high availability. For more information, see High Availability on the GitHub repo.
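
For disk capacity planning, the Prometheus documentation gives a rough formula: needed disk space = retention time in seconds × ingested samples per second × bytes per sample, with roughly 1 to 2 bytes per sample on average. The following sketch applies that formula; the function name tsdb_disk_gib is illustrative, and the ingestion rate in the example is a made-up figure that you would replace with the rate observed on your own server.

```shell
# Estimate Prometheus local storage in GiB.
# $1 = retention in days, $2 = ingested samples per second,
# $3 = bytes per sample (defaults to 2, the upper end of the usual 1-2 range).
tsdb_disk_gib() {
  awk -v days="$1" -v sps="$2" -v bps="${3:-2}" \
    'BEGIN { printf "%.1f\n", days * 86400 * sps * bps / (1024 * 1024 * 1024) }'
}

# Example: default 15-day retention at a hypothetical 10,000 samples per second.
#   tsdb_disk_gib 15 10000
#
# A longer retention period would be set when starting the server, for example:
#   prometheus --storage.tsdb.retention.time=30d ...
```

The estimate scales linearly with the retention period, so doubling the value of --storage.tsdb.retention.time roughly doubles the disk space to plan for.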

Another consideration is how to get notified when the Prometheus monitoring system stops running. The Prometheus server and Alertmanager expose their own metrics by default through localhost:9090/metrics and localhost:9093/metrics endpoints, respectively. These metrics can be scraped and published to Amazon CloudWatch, and you can set up CloudWatch alarms to trigger on missing data points for metrics. For more information, see Using Amazon CloudWatch Alarms.
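
One possible way to implement this is a small heartbeat script run on a schedule (for example, from cron) on the Prometheus instance. The following sketch publishes a data point to CloudWatch only when the Prometheus metrics endpoint responds, so that a CloudWatch alarm configured to treat missing data as breaching would fire when Prometheus stops. The namespace EMRMonitoring, the metric name PrometheusUp, and the function name publish_heartbeat are hypothetical, and the instance profile would need permission to call cloudwatch:PutMetricData.

```shell
# Publish a heartbeat data point to CloudWatch if Prometheus answers.
# $1 is the URL to probe (defaults to the Prometheus metrics endpoint).
# Namespace and metric name are placeholders chosen for this sketch.
publish_heartbeat() {
  url="${1:-http://localhost:9090/metrics}"
  if curl -fsS -o /dev/null "$url"; then
    aws cloudwatch put-metric-data \
      --namespace "EMRMonitoring" \
      --metric-name "PrometheusUp" \
      --value 1
  fi
}

# Example: probe the Alertmanager endpoint instead.
#   publish_heartbeat http://localhost:9093/metrics
```

Publishing nothing on failure keeps the script simple: the alarm reacts to the absence of data points rather than to a specific value.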

Cleaning up

To avoid ongoing charges, delete the CloudFormation stack and output files in Amazon S3 that you created during the workload use case.

Conclusion

This post showed how you can set up a monitoring system based on Prometheus and Grafana to monitor an EMR cluster and use Grafana dashboards to view metrics to troubleshoot a performance issue. You can also set up alerts in Prometheus to notify you when critical issues arise, and you can view the dashboards to narrow down the root causes. You can extend this monitoring system to monitor multiple EMR clusters and other applications, which makes it a one-stop system for monitoring metrics across your infrastructure and applications.


About the authors

Derek Tan is a principal big data architect, covering Amazon EMR and Athena. Prior to working as a Big Data Architect, he was leading engineering teams in Amazon EMR and Amazon Redshift. During his free time, he enjoys traveling, reading, and spending time with his family.

Fei Lang is a senior big data architect at AWS. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.