Tag: Amazon EMR
This post has been translated into Japanese.
In today’s business environments, data is generated in a continuous fashion by a steadily increasing number of diverse data sources. Therefore, the ability to continuously capture, store, and process this data to quickly turn high-volume streams of raw data into actionable insights has become a substantial competitive advantage for organizations.
Apache Flink is an open source project that is well-suited to form the basis of such a stream processing pipeline. It offers unique capabilities that are tailored to the continuous analysis of streaming data. However, building and maintaining a pipeline based on Flink often requires considerable expertise, in addition to physical resources and operational efforts.
This post outlines a reference architecture for a consistent, scalable, and reliable stream processing pipeline that is based on Apache Flink using Amazon EMR, Amazon Kinesis, and Amazon Elasticsearch Service. An AWSLabs GitHub repository provides the artifacts that are required to explore the reference architecture in action. Resources include a producer application that ingests sample data into an Amazon Kinesis stream and a Flink program that analyses the data in real time and sends the result to Amazon ES for visualization.
Analyzing geospatial taxi data in real time
Consider a scenario related to optimizing taxi fleet operations. You obtain information continuously from a fleet of taxis currently operating in New York City. Using this data, you want to optimize the operations by analyzing the gathered data in real time and making data-based decisions.
You would like, for instance, to identify hot spots—areas that are currently in high demand for taxis—so that you can direct unoccupied taxis there. You also want to track current traffic conditions so that you can give approximate trip durations to customers, for example, for rides to the nearby airports. Naturally, your decisions should be based on information that closely reflects the current demand and traffic conditions. The incoming data needs to be analyzed in a continuous and timely fashion. Relevant KPIs and derived insights should be accessible to real-time dashboards.
For the purpose of this post, you emulate a stream of trip events by replaying a dataset of historic taxi trips collected in New York City into Amazon Kinesis Streams. The dataset is available from the New York City Taxi & Limousine Commission website. It contains information on the geolocation and collected fares of individual taxi trips.
In more realistic scenarios, you could leverage AWS IoT to collect the data from telemetry units installed in the taxis and then ingest the data into an Amazon Kinesis stream.
Architecture of a reliable and scalable stream processing pipeline
Because the pipeline serves as the central tool to operate and optimize the taxi fleet, it’s crucial to build an architecture that is tolerant against the failure of single nodes. The pipeline should adapt to changing rates of incoming events. Therefore, you should separate the ingestion of events, their actual processing, and the visualization of the gathered insights into different components. By loosely coupling these components of the infrastructure and using managed services, you can increase the robustness of the pipeline in case of failures. You can also scale the different parts of your infrastructure individually and reduce the efforts that are required to build and operate the entire pipeline.
Sometimes, data to be analyzed is spread across buckets owned by different accounts. In order to ensure data security, appropriate credentials management needs to be in place. This is especially true for large enterprises storing data in different Amazon S3 buckets for different departments. For example, a customer service department may need access to data owned by the research department, but the research department needs to provide that access in a secure manner.
This aspect of securing the data can become quite complicated. Amazon EMR uses an integrated mechanism to supply user credentials for access to data stored in S3. When you use an application (Hive, Spark, etc.) on EMR to read or write a file to or from an S3 bucket, the S3 API call needs to be signed by proper credentials to be authenticated.
Usually, these credentials are provided by the EC2 instance profile that you specify during cluster launch. What if the EC2 instance profile credentials are not enough to access an S3 object, because that object requires a different set of credentials?
This post shows how you can use a custom credentials provider to access S3 objects that cannot be accessed by the default credentials provider of EMRFS.
EMRFS and EC2 instance profiles
When an EMR cluster is launched, it needs an IAM role to be specified as the Amazon EC2 instance profile. An instance profile is a container that is used to pass the permissions contained in an IAM role when the EC2 instance is starting up. The IAM role essentially defines the permissions for anyone who assumes the role.
In the case of EMR, the IAM role contained in the instance profile has permissions to access other AWS services such as Amazon S3, Amazon CloudWatch, Amazon Kinesis, etc. This role obtains temporary credentials via the EC2 instance metadata service and provides them to the application that needs to access other AWS services.
For example, when a Hive application on EMR needs to read input data from an S3 bucket (where the S3 bucket path is specified by the s3:// URI), it invokes a default credentials provider function of EMRFS. The provider in turn obtains the temporary credentials from the EC2 instance profile and uses those credentials to sign the S3 GET request.
Custom credentials providers
In certain cases, the credentials obtained by the default credentials provider might not be enough to sign requests to an S3 bucket to which your IAM user does not have permissions to access. Maybe the bucket has a different owner, or restrictive bucket policies that allow access only to a specific IAM user or role.
In situations like this, you have other options that allow your IAM user to access the data. You could modify the S3 bucket policy to allow access to your IAM user but this might be a security risk. A better option is to implement a custom credentials provider for EMRFS to ensure that your S3 requests are signed by the correct credentials. A custom credentials provider ensures that only a configured EMR cluster has access to the data in S3. It provides much better control over who can access the data.
Configuring a custom credentials provider for EMRFS
Create a credentials provider by implementing both the AWSCredentialsProvider (from the AWS Java SDK) and the Hadoop Configurable classes for use with EMRFS when it makes calls to Amazon S3.
Each implementation of AWSCredentialsProvider can choose its own strategy for loading credentials depending on the use case. You can either load credentials using the AWS STS AssumeRole API action or from a Java properties file if you would like to make API calls using the credentials of a specific IAM user. Then, package your custom credentials provider in a JAR file, upload the JAR file to your EMR cluster, and specify the class name by setting fs.s3.customAWSCredentialsProvider in the emrfs-site configuration classification.
Suppose you would like to analyze data stored in an S3 bucket owned by the research department of your company which has its own AWS account. You can launch an EMR cluster in your account and leverage EMRFS to access data stored in the bucket owned by the research department.
For this example, the two accounts of your company are:
- Research: email@example.com (Account ID: 123456789012)
- Your department: firstname.lastname@example.org (Account ID: 111222333444)
You have come up with an exciting hypothesis, and now you are keen to find and analyze as much data as possible to prove (or refute) it. There are many datasets that might be applicable, but they have been created at different times by different people and don’t conform to any common standard. They use different names for variables that mean the same thing and the same names for variables that mean different things. They use different units of measurement and different categories. Some have more variables than others. And they all have data quality issues (for example, badly formed dates and times, invalid geographic coordinates, and so on).
You first need a way to harmonize these datasets, to identify the variables that mean the same thing and make sure that these variables have the same names and units. You also need to clean up or remove records with invalid data.
After the datasets are harmonized, you need to search through the data to find the datasets you’re interested in. Not all of them have records that are relevant to your hypothesis, so you want to filter on a number of important variables to narrow down the datasets and verify they contain enough matching records to be significant.
Having identified the datasets of interest, you are ready to run your custom analyses on the data they contain so that you can prove your hypothesis and create beautiful visualizations to share with the world!
In this blog post, we will describe a sample application that illustrates how to solve these problems. You can install our sample app, which will:
- Harmonize and index three disparate datasets to make them searchable.
- Present a data-driven, customizable UI for searching the datasets to do preliminary analysis and to locate relevant datasets.
- Integrate with Amazon Athena and Amazon QuickSight for custom analysis and visualization.
The Police Data Initiative seeks to improve community and law enforcement relations through the public availability of data related to police activity. Datasets from participating cities, available through the Public Safety Open Data Portal, have many of the problems just outlined. Despite the commonality of crime and location metadata, there is no standard naming or value scheme. Datasets are stored in various locations and in various formats. There is no central search and discovery engine. To gain insights and value from this data, you have to analyze datasets city by city.
Although the focus of this post is police incident data, the same approach can be used for datasets in other domains, such as IoT, personalized medicine, news, weather, finance, and much more.
Our architecture uses the following AWS services:
- Amazon EMR (with Apache Spark and Jupyter notebooks) to explore, clean, harmonize (transform), describe, and save multiple, loosely coupled datasets.
- Amazon S3 to store both raw and harmonized datasets.
- Amazon Elasticsearch Service (Amazon ES) to host secure, searchable indexes of selected dataset variables and the associated dictionary/metadata used to power the search web page.
- Amazon EC2 Container Service (ECS) to host a web-based search UI.
- Amazon Athena and Amazon QuickSight to provide analysis and reporting.
- AWS CodeBuild and AWS CodePipeline to build and deliver the search UI application on ECS.
- AWS Identity and Access Management (IAM) policies and instance roles allow least-privilege access to Amazon ES from the UI containers and from the EMR cluster.
- AWS CloudFormation to orchestrate the provisioning of the environment.
The diagram below illustrates the solution architecture:
In the last few years, there has been a rapid rise in enterprises adopting the Apache Hadoop ecosystem for critical workloads that process sensitive or highly confidential data. Due to the highly critical nature of the workloads, the enterprises implement certain organization/industry wide policies and certain regulatory or compliance policies. Such policy requirements are designed to protect sensitive data from unauthorized access.
A common requirement within such policies is about encrypting data at-rest and in-flight. Amazon EMR uses “security configurations” to make it easy to specify the encryption keys and certificates, ranging from AWS Key Management Service to supplying your own custom encryption materials provider.
You create a security configuration that specifies encryption settings and then use the configuration when you create a cluster. This makes it easy to build the security configuration one time and use it for any number of clusters.
In this post, I go through the process of setting up the encryption of data at multiple levels using security configurations with EMR. Before I dive deep into encryption, here are the different phases where data needs to be encrypted.
As anyone visiting their doctor may have noticed, gone are the days of physicians recording their notes on paper. Physicians are more likely to enter the exam room with a laptop than with paper and pen. This change is the byproduct of efforts to improve patient outcomes, increase efficiency, and drive population health. Pushing for these improvements has created many new data opportunities as well as challenges. Using a combination of AWS services and open source software, we can use these new datasets to work towards these goals and beyond.
When you get a physical examination, your doctor’s office has an electronic chart with information about your demographics (name, date of birth, address, etc.), healthcare history, and current visit. When you go to the hospital for an emergency, a whole new record is created that may contain duplicate or conflicting information. A simple example would be that my primary care doctor lists me as Joe whereas the hospital lists me as Joseph.
Providers record patient information across different software platforms. Each of these platforms can have varying implementations of complex healthcare data standards. Also, each system needs to communicate with a central repository called a health information exchange (HIE) to build a central, complete clinical record for each patient.
In this post, I demonstrate the capability to consume different data types as messages, transform the information within the messages, and then use AWS service to take action depending on the message type.
Overview of Mirth Connect
Using open source technologies on AWS, you can build a system that transforms, stores, and processes this data as needed. The system can scale to meet the ever-increasing demands of modern medicine. The project that ingests and processes this data is called Mirth Connect.
Mirth Connect is an open source, cross-platform, bidirectional, healthcare integration engine. This project is a standalone server that functions as a central point for routing and processing healthcare information.
Running Mirth Connect on AWS provides the necessary scalability and elasticity to meet the current and future needs of healthcare organizations.
Healthcare data hub walkthrough
Healthcare information comes from various sources and can generate large amounts of data:
- Health information exchange (HIE)
- Electronic health records system (EHR)
- Practice management system (PMS)
- Insurance company systems
- Pharmacy systems
- Other source systems that can make data accessible
Messages typically require some form of modification (transformation) to accommodate ingestion and processing in other systems. Using another project, Blue Button, you can dissect large healthcare messages and locate the sections/items of interest. You can also convert those messages into other formats for storage and analysis.
The examples in this post focus on the following data types representing information made available from a typical healthcare organization:
- HL7 (Health Level Seven) version 2 messages
- CDA (Clinical Data Architecture)/CDD (Continuity of Care Document)
- DICOM (Digital Imaging and Communications in Medicine)
- CSV (comma-separated variable)
HL7 version 2 messages define both a message format and communication protocol for health information. They are broken into different message types depending on the information that they transmit.
There are many message types available, such as ordering labs, prescription dispensing, billing, and more. During a routine doctor visit, numerous messages are created for each patient. This provides a lot of information but also a challenge in storage and processing. For a full list of message types, see Data Definition Tables, section A6. The two types used for this post are:
- ADT A01 (patient admission and visit notification)
- SIU S12 (new appointment booking)
As you can see, this text is formatted as delimited data, where the delimiters are defined in the top line message called the MSG segment. Mirth Connect can parse these messages and communicate using the standard HL7 network protocol.
The typical progression for creating and using a trained model for recommendations falls into two general areas: training the model and hosting the model. Model training has become a well-known standard practice. We want to highlight one of many ways to host those recommendations (for example, see the Analyzing Genomics Data at Scale using R, AWS Lambda, and Amazon API Gateway post).
In this post, we look at one possible way to host a trained ALS model on Amazon EMR using Apache Spark to serve movie predictions in real time. It is a continuation of two recent posts that are prerequisite:
- Building a Recommendation Engine with Spark ML on Amazon EMR using Zeppelin
- Installing and Running JobServer for Apache Spark on Amazon EMR
In future posts we will cover other alternatives for serving real-time machine-learning predictions, namely AWS Lambda and Amazon EC2 Container Service, by running the prediction functions locally and loading the saved models from S3 to the local execution environments.
Walkthrough: Trained ALS model
For this walkthrough, you use the MovieLens dataset as set forth in the Building a Recommendation Engine post; the data model should have already been generated and persisted to Amazon S3. It uses the Alternating Least Squares (ALS) algorithm to train the data for generating the proper model.
Using JobServer, you take that model and persist it in memory in JobServer on Amazon EMR. After it’s persisted, you can expose RESTful endpoints to AWS Lambda, which in turn can be invoked from a static UI page hosted on S3, securing access with Amazon Cognito.
Here are the steps that you follow:
- Create the infrastructure, including EMR with JobServer and Lambda.
- Load the trained model into Spark on EMR via JobServer.
- Stage a static HTML page on S3.
- Access the AWS Lambda endpoints via the static HTML page authenticated with Amazon Cognito.
The following diagram shows the infrastructure architecture.
Jonathan Fritz is a Senior Product Manager for Amazon EMR
Customers can take advantage of the Amazon EMR API to create and terminate EMR clusters, scale clusters using Auto Scaling or manual resizing, and submit and run Apache Spark, Apache Hive, or Apache Pig workloads. These decisions are often triggered from cluster state-related information.
Previously, you could use the “describe” and “list” set of API operations to find the relevant information about your EMR clusters and associated instance groups, steps, and Auto Scaling policies. However, programmatic applications that check resource state changes and post notifications or take actions are forced to poll these API operations, which provides a slower end-to-end reaction time and additional management overhead than if you were able to use an event-driven architecture.
With new support for Amazon EMR in Amazon CloudWatch Events, you can be notified quickly and programmatically respond to state changes in your EMR clusters. Additionally, these events are also displayed in the Amazon EMR console, on the Cluster Details page in the Events section.
There are four new EMR event types:
- Cluster State Change
- Instance Group State Change
- Step State Change
- Auto Scaling State Change
CloudWatch Events allows you to create filters and rules to match these events and route them to Amazon SNS topics, AWS Lambda functions, Amazon SQS queues, streams in Amazon Kinesis Streams, or built-in targets. You then have the ability to programmatically act on these events, including sending emails and SMS messages, running retry logic in Lambda, or tracking the state of running steps. For more information about the sample events generated for each event type, see the CloudWatch Events documentation.
The following is an example using the CloudWatch Events console to route EMR step failure events to Lambda for automated retry logic and to SNS to push a notification to an email alias:
Miguel Tormo is a Big Data Support Engineer in AWS Premium Support
Amazon EMR provides a managed Hadoop framework that makes it easy, fast, and cost-effective to process vast amounts of data across dynamically scalable Amazon EC2 instances. Amazon EMR defines three types of nodes: master node, core nodes, and task nodes.
It’s common to run commands on each node by using SSH agent forwarding and running a loop on the master node to connect through SSH to every core or task node. However, there are cases in which you might want to run commands on select nodes only (for example, to generate a report on a particular instance type). For this reason, it helps to have an alternative approach for automating command execution on Amazon EMR clusters.
SaltStack is an open source project for automation and configuration management. It started as a remote execution engine designed to scale to many machines while delivering high-speed execution. Saltstack uses its own protocol, which is based on the ZeroMQ library.
SaltStack bootstrap action
You can use the new bootstrap action that installs SaltStack on Amazon EMR. It provides a basic configuration that enables selective targeting of the nodes based on instance roles, instance groups, and other parameters. Even if an instance group gets resized, each new node will execute the bootstrap action that installs SaltStack and registers the node with the master.
After your Amazon EMR cluster is up and running, and SaltStack is successfully deployed, you can now use the SaltStack CLI to configure and run commands on your cluster nodes.
Here are some examples of salt commands:
To check connectivity to all registered nodes
sudo salt '*' test.ping
Varun Rao is a Big Data Architect for AWS Professional Services
Role-based access control (RBAC) is an important security requirement for multi-tenant Hadoop clusters. Enforcing this across always-on and transient clusters can be hard to set up and maintain.
Imagine an organization that has an RBAC matrix using Active Directory users and groups. They would like to manage it on a central security policy server and enforce it on all Hadoop clusters that are spun up on AWS. This policy server should also store access and audit information for compliance needs.
In this post, I provide the steps to enable authorization and audit for Amazon EMR clusters using Apache Ranger.
Apache Ranger is a framework to enable, monitor, and manage comprehensive data security across the Hadoop platform. Features include centralized security administration, fine-grained authorization across many Hadoop components (Hadoop, Hive, HBase, Storm, Knox, Solr, Kafka, and YARN) and central auditing. It uses agents to sync policies and users, and plugins that run within the same process as the Hadoop component, like NameNode and HiveServer2.
Using the setup in the following diagram, multiple EMR clusters can sync policies with a standalone security policy server. The idea is similar to a shared Hive metastore that can be used across EMR clusters.
In this walkthrough, three users—analyst1, analyst2, and admin1—are set up for the initial authorization, as shown in the following diagram. Using the Ranger Admin UI, I show how to modify these access permissions. These changes are propagated to the EMR cluster and validated through Hue. (more…)
Low-Latency Access on Trillions of Records: FINRA’s Architecture Using Apache HBase on Amazon EMR with Amazon S3
John Hitchingham is Director of Performance Engineering at FINRA
The Financial Industry Regulatory Authority (FINRA) is a private sector regulator responsible for analyzing 99% of the equities and 65% of the option activity in the US. In order to look for fraud, market manipulation, insider trading, and abuse, FINRA’s technology group has developed a robust set of big data tools in the AWS Cloud to support these activities.
One particular application, which requires low-latency retrieval of items from a data set that contains trillions of records, enables FINRA analysts to investigate particular sets of related trade activity. FINRA’s new architecture for this application, Apache HBase on Amazon EMR using Amazon S3 for data storage, has resulted in cost savings of over 60%, drastically reduced the time for recovery or upgrades, and alleviated resource contention.
Original application architecture
Early in the 2 ½ year migration of FINRA’s Market Regulation Portfolio to the AWS Cloud, FINRA developed a system on AWS to replace an on-premises solution that allowed analysts to query this trade activity. This solution provided fast random access across trillions of trade records, which would quickly grow to over 700 TB of data.
FINRA selected Apache HBase, which is optimized for random access over large data sets, to store and serve this data. Our initial Apache HBase cluster used a commercial Hadoop distribution running on Amazon EC2 with data stored in on-cluster HDFS. To hold 700 TB of compressed data with 3x replication for HDFS, we required over 2 PB of storage on the cluster with 60 hs1.8xlarge instances. We updated our HBase table after the market close each day using the HBase bulk load API operation, which leverages Apache Hadoop MapReduce. This provided a simple, performant way to load billions of records each night.
FINRA’s analysts were thrilled with the performance–queries that took minutes and hours to run on the old on-premises system now returned in sub-seconds to minutes with Apache HBase. However, there were several operational challenges with our new system:
- Disaster recovery: Because of the data size (700+ TB), it would take us days to move this data and restore our cluster in the event of a failure. This would also apply if we needed to restore our cluster in another Availability Zone, in the event of problems in a single zone.
- Resource contention: Sometimes the batch load processing started late due to upstream data availability, which runs up against the window where users are executing their queries. This impacted query performance, and balancing these workflows on the same cluster proved challenging.
- Cluster maintenance: Upgrading Apache HBase and other components on the cluster was difficult. Creating new, parallel clusters sized for the data volume was cost prohibitive and doing rolling updates were operationally risky and time-consuming from rebalancing over 2 PB of HDFS blocks.
- Cost: Because we had to combine storage and compute by using on-cluster HDFS for our storage, we were paying for compute capacity that was nearly idle most of the time. It was just being used to store our data.
Decoupling storage and compute for HBase
Elsewhere in FINRA’s portfolio of analytic applications in AWS, we increasingly used an architecture that separated storage from compute. FINRA stores data on Amazon S3 for low cost, durable, scalable storage and uses Amazon EMR for scalable compute workloads using Hive, Presto, and Spark. With EMR and EMRFS, these engines can directly query data in S3 as if it were stored in HDFS. This has several advantages, including eliminating the need to load data into on-cluster HDFS, checkpointing processing to keep job state during possible Amazon EC2 Spot Instance loss, and having our data durable and available across zones with no extra management.
We wondered if it would be possible to leverage EMR to run HBase with storage on S3 instead of in HDFS. With the new support for using S3 as a storage layer with HBase on EMR, we were excited to work with AWS on evaluating this new architecture for our cluster. With this new configuration, HBase uses S3 to store table data and metadata, and still uses a small footprint in HDFS to store the HBase write-ahead log. In addition to the HBase Region Server in-memory cache, EMR configures the HBase bucket cache to cache data on the local disks of each node, giving faster read performance than directly accessing S3 for each request.