Tag: Amazon EMR
You have come up with an exciting hypothesis, and now you are keen to find and analyze as much data as possible to prove (or refute) it. There are many datasets that might be applicable, but they have been created at different times by different people and don’t conform to any common standard. They use different names for variables that mean the same thing and the same names for variables that mean different things. They use different units of measurement and different categories. Some have more variables than others. And they all have data quality issues (for example, badly formed dates and times, invalid geographic coordinates, and so on).
You first need a way to harmonize these datasets, to identify the variables that mean the same thing and make sure that these variables have the same names and units. You also need to clean up or remove records with invalid data.
After the datasets are harmonized, you need to search through the data to find the datasets you’re interested in. Not all of them have records that are relevant to your hypothesis, so you want to filter on a number of important variables to narrow down the datasets and verify they contain enough matching records to be significant.
Having identified the datasets of interest, you are ready to run your custom analyses on the data they contain so that you can prove your hypothesis and create beautiful visualizations to share with the world!
In this blog post, we will describe a sample application that illustrates how to solve these problems. You can install our sample app, which will:
- Harmonize and index three disparate datasets to make them searchable.
- Present a data-driven, customizable UI for searching the datasets to do preliminary analysis and to locate relevant datasets.
- Integrate with Amazon Athena and Amazon QuickSight for custom analysis and visualization.
The Police Data Initiative seeks to improve community and law enforcement relations through the public availability of data related to police activity. Datasets from participating cities, available through the Public Safety Open Data Portal, have many of the problems just outlined. Despite the commonality of crime and location metadata, there is no standard naming or value scheme. Datasets are stored in various locations and in various formats. There is no central search and discovery engine. To gain insights and value from this data, you have to analyze datasets city by city.
Although the focus of this post is police incident data, the same approach can be used for datasets in other domains, such as IoT, personalized medicine, news, weather, finance, and much more.
Our architecture uses the following AWS services:
- Amazon EMR (with Apache Spark and Jupyter notebooks) to explore, clean, harmonize (transform), describe, and save multiple, loosely coupled datasets.
- Amazon S3 to store both raw and harmonized datasets.
- Amazon Elasticsearch Service (Amazon ES) to host secure, searchable indexes of selected dataset variables and the associated dictionary/metadata used to power the search web page.
- Amazon EC2 Container Service (ECS) to host a web-based search UI.
- Amazon Athena and Amazon QuickSight to provide analysis and reporting.
- AWS CodeBuild and AWS CodePipeline to build and deliver the search UI application on ECS.
- AWS Identity and Access Management (IAM) policies and instance roles allow least-privilege access to Amazon ES from the UI containers and from the EMR cluster.
- AWS CloudFormation to orchestrate the provisioning of the environment.
The diagram below illustrates the solution architecture:
In the last few years, there has been a rapid rise in enterprises adopting the Apache Hadoop ecosystem for critical workloads that process sensitive or highly confidential data. Due to the highly critical nature of the workloads, the enterprises implement certain organization/industry wide policies and certain regulatory or compliance policies. Such policy requirements are designed to protect sensitive data from unauthorized access.
A common requirement within such policies is about encrypting data at-rest and in-flight. Amazon EMR uses “security configurations” to make it easy to specify the encryption keys and certificates, ranging from AWS Key Management Service to supplying your own custom encryption materials provider.
You create a security configuration that specifies encryption settings and then use the configuration when you create a cluster. This makes it easy to build the security configuration one time and use it for any number of clusters.
In this post, I go through the process of setting up the encryption of data at multiple levels using security configurations with EMR. Before I dive deep into encryption, here are the different phases where data needs to be encrypted.
As anyone visiting their doctor may have noticed, gone are the days of physicians recording their notes on paper. Physicians are more likely to enter the exam room with a laptop than with paper and pen. This change is the byproduct of efforts to improve patient outcomes, increase efficiency, and drive population health. Pushing for these improvements has created many new data opportunities as well as challenges. Using a combination of AWS services and open source software, we can use these new datasets to work towards these goals and beyond.
When you get a physical examination, your doctor’s office has an electronic chart with information about your demographics (name, date of birth, address, etc.), healthcare history, and current visit. When you go to the hospital for an emergency, a whole new record is created that may contain duplicate or conflicting information. A simple example would be that my primary care doctor lists me as Joe whereas the hospital lists me as Joseph.
Providers record patient information across different software platforms. Each of these platforms can have varying implementations of complex healthcare data standards. Also, each system needs to communicate with a central repository called a health information exchange (HIE) to build a central, complete clinical record for each patient.
In this post, I demonstrate the capability to consume different data types as messages, transform the information within the messages, and then use AWS service to take action depending on the message type.
Overview of Mirth Connect
Using open source technologies on AWS, you can build a system that transforms, stores, and processes this data as needed. The system can scale to meet the ever-increasing demands of modern medicine. The project that ingests and processes this data is called Mirth Connect.
Mirth Connect is an open source, cross-platform, bidirectional, healthcare integration engine. This project is a standalone server that functions as a central point for routing and processing healthcare information.
Running Mirth Connect on AWS provides the necessary scalability and elasticity to meet the current and future needs of healthcare organizations.
Healthcare data hub walkthrough
Healthcare information comes from various sources and can generate large amounts of data:
- Health information exchange (HIE)
- Electronic health records system (EHR)
- Practice management system (PMS)
- Insurance company systems
- Pharmacy systems
- Other source systems that can make data accessible
Messages typically require some form of modification (transformation) to accommodate ingestion and processing in other systems. Using another project, Blue Button, you can dissect large healthcare messages and locate the sections/items of interest. You can also convert those messages into other formats for storage and analysis.
The examples in this post focus on the following data types representing information made available from a typical healthcare organization:
- HL7 (Health Level Seven) version 2 messages
- CDA (Clinical Data Architecture)/CDD (Continuity of Care Document)
- DICOM (Digital Imaging and Communications in Medicine)
- CSV (comma-separated variable)
HL7 version 2 messages define both a message format and communication protocol for health information. They are broken into different message types depending on the information that they transmit.
There are many message types available, such as ordering labs, prescription dispensing, billing, and more. During a routine doctor visit, numerous messages are created for each patient. This provides a lot of information but also a challenge in storage and processing. For a full list of message types, see Data Definition Tables, section A6. The two types used for this post are:
- ADT A01 (patient admission and visit notification)
- SIU S12 (new appointment booking)
As you can see, this text is formatted as delimited data, where the delimiters are defined in the top line message called the MSG segment. Mirth Connect can parse these messages and communicate using the standard HL7 network protocol.
The typical progression for creating and using a trained model for recommendations falls into two general areas: training the model and hosting the model. Model training has become a well-known standard practice. We want to highlight one of many ways to host those recommendations (for example, see the Analyzing Genomics Data at Scale using R, AWS Lambda, and Amazon API Gateway post).
In this post, we look at one possible way to host a trained ALS model on Amazon EMR using Apache Spark to serve movie predictions in real time. It is a continuation of two recent posts that are prerequisite:
- Building a Recommendation Engine with Spark ML on Amazon EMR using Zeppelin
- Installing and Running JobServer for Apache Spark on Amazon EMR
In future posts we will cover other alternatives for serving real-time machine-learning predictions, namely AWS Lambda and Amazon EC2 Container Service, by running the prediction functions locally and loading the saved models from S3 to the local execution environments.
Walkthrough: Trained ALS model
For this walkthrough, you use the MovieLens dataset as set forth in the Building a Recommendation Engine post; the data model should have already been generated and persisted to Amazon S3. It uses the Alternating Least Squares (ALS) algorithm to train the data for generating the proper model.
Using JobServer, you take that model and persist it in memory in JobServer on Amazon EMR. After it’s persisted, you can expose RESTful endpoints to AWS Lambda, which in turn can be invoked from a static UI page hosted on S3, securing access with Amazon Cognito.
Here are the steps that you follow:
- Create the infrastructure, including EMR with JobServer and Lambda.
- Load the trained model into Spark on EMR via JobServer.
- Stage a static HTML page on S3.
- Access the AWS Lambda endpoints via the static HTML page authenticated with Amazon Cognito.
The following diagram shows the infrastructure architecture.
Jonathan Fritz is a Senior Product Manager for Amazon EMR
Customers can take advantage of the Amazon EMR API to create and terminate EMR clusters, scale clusters using Auto Scaling or manual resizing, and submit and run Apache Spark, Apache Hive, or Apache Pig workloads. These decisions are often triggered from cluster state-related information.
Previously, you could use the “describe” and “list” set of API operations to find the relevant information about your EMR clusters and associated instance groups, steps, and Auto Scaling policies. However, programmatic applications that check resource state changes and post notifications or take actions are forced to poll these API operations, which provides a slower end-to-end reaction time and additional management overhead than if you were able to use an event-driven architecture.
With new support for Amazon EMR in Amazon CloudWatch Events, you can be notified quickly and programmatically respond to state changes in your EMR clusters. Additionally, these events are also displayed in the Amazon EMR console, on the Cluster Details page in the Events section.
There are four new EMR event types:
- Cluster State Change
- Instance Group State Change
- Step State Change
- Auto Scaling State Change
CloudWatch Events allows you to create filters and rules to match these events and route them to Amazon SNS topics, AWS Lambda functions, Amazon SQS queues, streams in Amazon Kinesis Streams, or built-in targets. You then have the ability to programmatically act on these events, including sending emails and SMS messages, running retry logic in Lambda, or tracking the state of running steps. For more information about the sample events generated for each event type, see the CloudWatch Events documentation.
The following is an example using the CloudWatch Events console to route EMR step failure events to Lambda for automated retry logic and to SNS to push a notification to an email alias:
Miguel Tormo is a Big Data Support Engineer in AWS Premium Support
Amazon EMR provides a managed Hadoop framework that makes it easy, fast, and cost-effective to process vast amounts of data across dynamically scalable Amazon EC2 instances. Amazon EMR defines three types of nodes: master node, core nodes, and task nodes.
It’s common to run commands on each node by using SSH agent forwarding and running a loop on the master node to connect through SSH to every core or task node. However, there are cases in which you might want to run commands on select nodes only (for example, to generate a report on a particular instance type). For this reason, it helps to have an alternative approach for automating command execution on Amazon EMR clusters.
SaltStack is an open source project for automation and configuration management. It started as a remote execution engine designed to scale to many machines while delivering high-speed execution. Saltstack uses its own protocol, which is based on the ZeroMQ library.
SaltStack bootstrap action
You can use the new bootstrap action that installs SaltStack on Amazon EMR. It provides a basic configuration that enables selective targeting of the nodes based on instance roles, instance groups, and other parameters. Even if an instance group gets resized, each new node will execute the bootstrap action that installs SaltStack and registers the node with the master.
After your Amazon EMR cluster is up and running, and SaltStack is successfully deployed, you can now use the SaltStack CLI to configure and run commands on your cluster nodes.
Here are some examples of salt commands:
To check connectivity to all registered nodes
sudo salt '*' test.ping
Varun Rao is a Big Data Architect for AWS Professional Services
Role-based access control (RBAC) is an important security requirement for multi-tenant Hadoop clusters. Enforcing this across always-on and transient clusters can be hard to set up and maintain.
Imagine an organization that has an RBAC matrix using Active Directory users and groups. They would like to manage it on a central security policy server and enforce it on all Hadoop clusters that are spun up on AWS. This policy server should also store access and audit information for compliance needs.
In this post, I provide the steps to enable authorization and audit for Amazon EMR clusters using Apache Ranger.
Apache Ranger is a framework to enable, monitor, and manage comprehensive data security across the Hadoop platform. Features include centralized security administration, fine-grained authorization across many Hadoop components (Hadoop, Hive, HBase, Storm, Knox, Solr, Kafka, and YARN) and central auditing. It uses agents to sync policies and users, and plugins that run within the same process as the Hadoop component, like NameNode and HiveServer2.
Using the setup in the following diagram, multiple EMR clusters can sync policies with a standalone security policy server. The idea is similar to a shared Hive metastore that can be used across EMR clusters.
In this walkthrough, three users—analyst1, analyst2, and admin1—are set up for the initial authorization, as shown in the following diagram. Using the Ranger Admin UI, I show how to modify these access permissions. These changes are propagated to the EMR cluster and validated through Hue. (more…)
Low-Latency Access on Trillions of Records: FINRA’s Architecture Using Apache HBase on Amazon EMR with Amazon S3
John Hitchingham is Director of Performance Engineering at FINRA
The Financial Industry Regulatory Authority (FINRA) is a private sector regulator responsible for analyzing 99% of the equities and 65% of the option activity in the US. In order to look for fraud, market manipulation, insider trading, and abuse, FINRA’s technology group has developed a robust set of big data tools in the AWS Cloud to support these activities.
One particular application, which requires low-latency retrieval of items from a data set that contains trillions of records, enables FINRA analysts to investigate particular sets of related trade activity. FINRA’s new architecture for this application, Apache HBase on Amazon EMR using Amazon S3 for data storage, has resulted in cost savings of over 60%, drastically reduced the time for recovery or upgrades, and alleviated resource contention.
Original application architecture
Early in the 2 ½ year migration of FINRA’s Market Regulation Portfolio to the AWS Cloud, FINRA developed a system on AWS to replace an on-premises solution that allowed analysts to query this trade activity. This solution provided fast random access across trillions of trade records, which would quickly grow to over 700 TB of data.
FINRA selected Apache HBase, which is optimized for random access over large data sets, to store and serve this data. Our initial Apache HBase cluster used a commercial Hadoop distribution running on Amazon EC2 with data stored in on-cluster HDFS. To hold 700 TB of compressed data with 3x replication for HDFS, we required over 2 PB of storage on the cluster with 60 hs1.8xlarge instances. We updated our HBase table after the market close each day using the HBase bulk load API operation, which leverages Apache Hadoop MapReduce. This provided a simple, performant way to load billions of records each night.
FINRA’s analysts were thrilled with the performance–queries that took minutes and hours to run on the old on-premises system now returned in sub-seconds to minutes with Apache HBase. However, there were several operational challenges with our new system:
- Disaster recovery: Because of the data size (700+ TB), it would take us days to move this data and restore our cluster in the event of a failure. This would also apply if we needed to restore our cluster in another Availability Zone, in the event of problems in a single zone.
- Resource contention: Sometimes the batch load processing started late due to upstream data availability, which runs up against the window where users are executing their queries. This impacted query performance, and balancing these workflows on the same cluster proved challenging.
- Cluster maintenance: Upgrading Apache HBase and other components on the cluster was difficult. Creating new, parallel clusters sized for the data volume was cost prohibitive and doing rolling updates were operationally risky and time-consuming from rebalancing over 2 PB of HDFS blocks.
- Cost: Because we had to combine storage and compute by using on-cluster HDFS for our storage, we were paying for compute capacity that was nearly idle most of the time. It was just being used to store our data.
Decoupling storage and compute for HBase
Elsewhere in FINRA’s portfolio of analytic applications in AWS, we increasingly used an architecture that separated storage from compute. FINRA stores data on Amazon S3 for low cost, durable, scalable storage and uses Amazon EMR for scalable compute workloads using Hive, Presto, and Spark. With EMR and EMRFS, these engines can directly query data in S3 as if it were stored in HDFS. This has several advantages, including eliminating the need to load data into on-cluster HDFS, checkpointing processing to keep job state during possible Amazon EC2 Spot Instance loss, and having our data durable and available across zones with no extra management.
We wondered if it would be possible to leverage EMR to run HBase with storage on S3 instead of in HDFS. With the new support for using S3 as a storage layer with HBase on EMR, we were excited to work with AWS on evaluating this new architecture for our cluster. With this new configuration, HBase uses S3 to store table data and metadata, and still uses a small footprint in HDFS to store the HBase write-ahead log. In addition to the HBase Region Server in-memory cache, EMR configures the HBase bucket cache to cache data on the local disks of each node, giving faster read performance than directly accessing S3 for each request.
Jonathan Fritz is a Senior Product Manager for Amazon EMR
Customers running Apache Spark, Presto, and the Apache Hadoop ecosystem take advantage of Amazon EMR’s elasticity to save costs by terminating clusters after workflows are complete and resizing clusters with low-cost Amazon EC2 Spot Instances. For instance, customers can create clusters for daily ETL or machine learning jobs and shut them down when they complete, or scale out a Presto cluster serving BI analysts during business hours for ad hoc, low-latency SQL on Amazon S3.
With new support for Auto Scaling in Amazon EMR releases 4.x and 5.x, customers can now add (scale out) and remove (scale in) nodes on a cluster more easily. Scaling actions are triggered automatically by Amazon CloudWatch metrics provided by EMR at 5 minute intervals, including several YARN metrics related to memory utilization, applications pending, and HDFS utilization.
In EMR release 5.1.0, we introduced two new metrics, YARNMemoryAvailablePercentage and ContainerPendingRatio, which serve as useful cluster utilization metrics for scalable, YARN-based frameworks like Apache Spark, Apache Tez, and Apache Hadoop MapReduce. Additionally, customers can use custom CloudWatch metrics in their Auto Scaling policies.
The following is an example Auto Scaling policy on an instance group that scales 1 instance at a time to a maximum of 40 instances or a minimum of 10 instances. The instance group scales out when the memory available in YARN is less than 15%, and scales in when this metric is greater than 75%: Also, the instance group scales out when the ratio of pending YARN containers over allocated YARN containers is 0.75.
Craig Foster is a Big Data Engineer with Amazon EMR
Apache Flink is a parallel data processing engine that customers are using to build real time, big data applications. Flink enables you to perform transformations on many different data sources, such as Amazon Kinesis Streams or the Apache Cassandra database. It provides both batch and streaming APIs. Also, Flink has some SQL support for these stream and batch datasets. Most of Flink’s API actions are very similar to the transformations on distributed object collections found in Apache Hadoop or Apache Spark. Flink’s API is categorized into DataSets and DataStreams. DataSets are transformations on sets or collections of distributed data, while DataStreams are transformations on streaming data like those found in Amazon Kinesis.
Flink is a pure data streaming runtime engine, which means that it employs pipeline parallelism to perform operations on results of previous data transforms in real time. This means that multiple operations are performed concurrently. The Flink runtime handles exchanging data between these transformation pipelines. Also, while you may write a batch application, the same Flink streaming dataflow runtime implements it.
The Flink runtime consists of two different types of daemons: JobManagers, which are responsible for coordinating scheduling, checkpoint, and recovery functions, and TaskManagers, which are the worker processes that execute tasks and transfer data between streams in an application. Each application has one JobManager and at least one TaskManager.
You can scale the number of TaskManagers but also control parallelism further by using something called a “task slot.” In Flink-on-YARN, the JobManagers are co-located with the YARN ApplicationMaster, while each TaskManager is located in separate YARN containers allocated for the application.
Today we are making it even easier to run Flink on AWS as it is now natively supported in Amazon EMR 5.1.0. EMR supports running Flink-on-YARN so you can create either a long-running cluster that accepts multiple jobs or a short-running Flink session in a transient cluster that helps reduce your costs by only charging you for the time that you use.
You can also configure a cluster with Flink installed using the EMR configuration API with configuration classifications for logging and configuration parameters.