Tag: Amazon EMR

Seven Tips for Using S3DistCp on Amazon EMR to Move Data Efficiently Between HDFS and Amazon S3

by Illya Yalovyy

Although it’s common for Amazon EMR customers to process data directly in Amazon S3, there are occasions where you might want to copy data from S3 to the Hadoop Distributed File System (HDFS) on your Amazon EMR cluster. Additionally, you might have a use case that requires moving large amounts of data between buckets or regions. In these use cases, large datasets are too big for a simple copy operation. Amazon EMR can help with this, and offers a utility, S3DistCp, to help with moving data from S3 to other S3 locations or on-cluster HDFS.

In the Hadoop ecosystem, DistCp is often used to move data. DistCp provides a distributed copy capability built on top of a MapReduce framework. S3DistCp is an extension to DistCp that is optimized to work with S3 and that adds several useful features. In addition to moving data between HDFS and S3, S3DistCp is also a Swiss Army knife of file manipulations. In this post we’ll cover the following tips for using S3DistCp, starting with basic use cases and then moving to more advanced scenarios:

1. Copy or move files without transformation
2. Copy and change file compression on the fly
3. Copy files incrementally
4. Copy multiple folders in one job
5. Aggregate files based on a pattern
6. Upload files larger than 1 TB in size
7. Submit an S3DistCp step to an EMR cluster

1. Copy or move files without transformation

We’ve observed that customers often use S3DistCp to copy data from one storage location to another, whether S3 or HDFS. Syntax for this operation is simple and straightforward:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/incoming/hourly_table

The source location may contain extra files that we don’t necessarily want to copy. Here, we can use filters based on regular expressions to do things such as copying files with the .log extension only.
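As a rough illustration of how such a filter narrows the file list, the sketch below applies a regular expression to a handful of made-up paths. It is a local simulation in Python, not S3DistCp itself: the paths are hypothetical, and the choice to match the pattern against the entire path is an assumption made for the example.

```python
import re

# Hypothetical listing of a source directory (these paths are made up).
source_paths = [
    "/data/incoming/hourly_table/2017-02-01-01/service.log",
    "/data/incoming/hourly_table/2017-02-01-01/service.log.tmp",
    "/data/incoming/hourly_table/2017-02-01-02/_SUCCESS",
    "/data/incoming/hourly_table/2017-02-01-02/service.log",
]

# Pattern for "files with the .log extension only".
# Assumption: the pattern has to match the entire path, hence fullmatch().
log_only = re.compile(r".*\.log")


def select_paths(paths, pattern):
    """Return the paths that the compiled pattern matches in full."""
    return [p for p in paths if pattern.fullmatch(p)]


selected = select_paths(source_paths, log_only)
for path in selected:
    print(path)
```

Testing a pattern against a small sample of paths like this before starting a large copy is a cheap way to catch a filter that is too broad or too narrow.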


Build a Healthcare Data Warehouse Using Amazon EMR, Amazon Redshift, AWS Lambda, and OMOP

by Ryan Hood

In the healthcare field, data comes in all shapes and sizes. Despite efforts to standardize terminology, some concepts (e.g., blood glucose) are still often depicted in different ways. This post demonstrates how to convert an openly available dataset called MIMIC-III, which consists of de-identified medical data for about 40,000 patients, into an open source data model known as the Observational Medical Outcomes Partnership (OMOP) Common Data Model (CDM). It describes the architecture and steps for analyzing data across various disconnected sources of health datasets so you can start applying Big Data methods to health research.

Before designing and deploying a healthcare application on AWS, make sure that you read through the AWS HIPAA Compliance whitepaper. This covers the information necessary for processing and storing patient health information (PHI).

Note: If you arrived at this page looking for more info on the movie Mimic 3: Sentinel, you might not enjoy this post.

OMOP overview

The OMOP CDM helps standardize healthcare data and makes it easier to analyze outcomes at a large scale. The CDM is gaining a lot of traction in the health research community, which is deeply involved in developing and adopting a common data model. Community resources are available for converting datasets, and there are software tools to help unlock your data after it’s in the OMOP format. The great advantage of converting data sources into a standard data model like OMOP is that it allows for streamlined, comprehensive analytics and helps remove the variability associated with analyzing health records from different sources.

OMOP ETL with Apache Spark

Observational Health Data Sciences and Informatics (OHDSI) provides the OMOP CDM in a variety of formats, including Apache Impala, Oracle, PostgreSQL, and SQL Server. (See the OHDSI Common Data Model repo in GitHub.) In this scenario, the data is moved to AWS to take advantage of the unbounded scale of Amazon EMR and serverless technologies, and the variety of AWS services that can help make sense of the data in a cost-effective way—including Amazon Machine Learning, Amazon QuickSight, and Amazon Redshift.

This example demonstrates an architecture that can be used to run SQL-based extract, transform, load (ETL) jobs to map any data source to the OMOP CDM. It uses MIMIC ETL code provided by Md. Shamsuzzoha Bayzid. The code was modified to run in Amazon Redshift.

Getting access to the MIMIC-III data

Before you can retrieve the MIMIC-III data, which is hosted on Amazon S3 as part of the Amazon Web Services (AWS) Public Dataset Program, you must request access on the PhysioNet website. However, you don’t need access to the MIMIC-III data to follow along with this post.

Solution architecture and loading process

The following diagram shows the architecture that is used to convert the MIMIC-III dataset to the OMOP CDM.


Tips for Migrating to Apache HBase on Amazon S3 from HDFS

by Bruno Faria

Starting with Amazon EMR 5.2.0, you have the option to run Apache HBase on Amazon S3. Running HBase on S3 gives you several added benefits, including lower costs, data durability, and easier scalability.

HBase provides several options that you can use to migrate and back up HBase tables. The steps to migrate to HBase on S3 are similar to the steps for HBase on the Apache Hadoop Distributed File System (HDFS). However, the migration can be easier if you are aware of some minor differences and a few “gotchas.”

In this post, I describe how to use some of the common HBase migration options to get started with HBase on S3.

HBase migration options

Selecting the right migration method and tools is an important step in ensuring a successful HBase table migration. However, choosing the right ones is not always an easy task.

The following HBase options help you migrate to HBase on S3:

  • Snapshots
  • Export and Import
  • CopyTable

The following diagram summarizes the steps for each option.


Visualize Big Data with Amazon QuickSight, Presto, and Apache Spark on Amazon EMR

by Luis Wang

Last December, we introduced the Amazon Athena connector in Amazon QuickSight, in the Derive Insights from IoT in Minutes using AWS IoT, Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight post.

The connector allows you to visualize your big data easily in Amazon S3 using Athena’s interactive query engine in a serverless fashion. This turned out to be a very popular combination, as customers benefit from the speed, agility, and cost benefit that serverless business intelligence (BI) and analytics architecture brings.

Today, we’re excited to announce two new native connectors in QuickSight for big data analytics: Presto and Spark. With the Presto and SparkSQL connectors in QuickSight, you can easily create interactive visualizations over large datasets using Amazon EMR.

EMR provides a simple and cost-effective way to run highly distributed processing frameworks such as Presto and Spark when compared to on-premises deployments. EMR provides you with the flexibility to define specific compute, memory, storage, and application parameters and optimize them for your analytic requirements.

In this post, I walk you through connecting QuickSight to an EMR cluster running Presto. If you’d like a walkthrough with Spark, let us know in the comments section!

Presto overview

Presto is an open source, distributed SQL query engine for running interactive analytic queries against data sources ranging from gigabytes to petabytes. It supports the ANSI SQL standard, including complex queries, aggregations, joins, and window functions. Presto can run on multiple data sources, including Amazon S3.

Presto’s execution framework is fundamentally different from that of Hive/MapReduce. Presto has a custom query and execution engine where the stages of execution are pipelined, similar to a directed acyclic graph (DAG), and all processing occurs in memory to reduce disk I/O. This pipelined execution model can run multiple stages in parallel and streams data from one stage to another as the data becomes available. This reduces end-to-end latency and makes Presto a great tool for ad hoc data exploration over large data sets.


Use the following steps to connect QuickSight to an EMR cluster running Presto:

  1. Create an EMR cluster with the latest 5.5.0 release.
  2. Configure LDAP for user authentication in QuickSight.
  3. Configure SSL using a QuickSight supported certificate authority (CA).
  4. Create tables for Presto in the Hive metastore.
  5. Whitelist the QuickSight IP address range in your EMR master security group rules.
  6. Connect QuickSight to Presto and create some visualizations.


You need to run Presto version 0.167, at a minimum, which is the first release that supports LDAP authentication. LDAP authentication is a requirement for the Presto and Spark connectors, and QuickSight refuses to connect if LDAP is not configured on your cluster.

Create an EMR cluster with release version 5.5.0

In the EMR console, use the Quick Create option to create a cluster. For this post, use most of the default settings with a few exceptions. To install both Presto and Spark on your cluster (and customize other settings), create your cluster from the Advanced Options wizard instead.

Make sure that EMR release 5.5.0 is selected and under Applications, choose Presto. If you have an EC2 key pair, you can use it. Otherwise, create a key pair (.PEM file) and then return to this page to create the cluster. 


Build a Real-time Stream Processing Pipeline with Apache Flink on AWS

by Steffen Hausmann

This post has been translated into Japanese.

In today’s business environments, data is generated in a continuous fashion by a steadily increasing number of diverse data sources. Therefore, the ability to continuously capture, store, and process this data to quickly turn high-volume streams of raw data into actionable insights has become a substantial competitive advantage for organizations.

Apache Flink is an open source project that is well-suited to form the basis of such a stream processing pipeline. It offers unique capabilities that are tailored to the continuous analysis of streaming data. However, building and maintaining a pipeline based on Flink often requires considerable expertise, in addition to physical resources and operational efforts.

This post outlines a reference architecture for a consistent, scalable, and reliable stream processing pipeline that is based on Apache Flink using Amazon EMR, Amazon Kinesis, and Amazon Elasticsearch Service. An AWSLabs GitHub repository provides the artifacts that are required to explore the reference architecture in action. Resources include a producer application that ingests sample data into an Amazon Kinesis stream and a Flink program that analyzes the data in real time and sends the result to Amazon ES for visualization.

Analyzing geospatial taxi data in real time

Consider a scenario related to optimizing taxi fleet operations. You obtain information continuously from a fleet of taxis currently operating in New York City. Using this data, you want to optimize the operations by analyzing the gathered data in real time and making data-based decisions.

You would like, for instance, to identify hot spots—areas that are currently in high demand for taxis—so that you can direct unoccupied taxis there. You also want to track current traffic conditions so that you can give approximate trip durations to customers, for example, for rides to the nearby airports. Naturally, your decisions should be based on information that closely reflects the current demand and traffic conditions. The incoming data needs to be analyzed in a continuous and timely fashion. Relevant KPIs and derived insights should be accessible to real-time dashboards.

For the purpose of this post, you emulate a stream of trip events by replaying a dataset of historic taxi trips collected in New York City into Amazon Kinesis Streams. The dataset is available from the New York City Taxi & Limousine Commission website. It contains information on the geolocation and collected fares of individual taxi trips.

In more realistic scenarios, you could leverage AWS IoT to collect the data from telemetry units installed in the taxis and then ingest the data into an Amazon Kinesis stream.

Architecture of a reliable and scalable stream processing pipeline

Because the pipeline serves as the central tool to operate and optimize the taxi fleet, it’s crucial to build an architecture that is tolerant against the failure of single nodes. The pipeline should adapt to changing rates of incoming events. Therefore, you should separate the ingestion of events, their actual processing, and the visualization of the gathered insights into different components. By loosely coupling these components of the infrastructure and using managed services, you can increase the robustness of the pipeline in case of failures. You can also scale the different parts of your infrastructure individually and reduce the efforts that are required to build and operate the entire pipeline.


Securely Analyze Data from Another AWS Account with EMRFS

by Jigar Mistry

Sometimes, data to be analyzed is spread across buckets owned by different accounts. In order to ensure data security, appropriate credentials management needs to be in place. This is especially true for large enterprises storing data in different Amazon S3 buckets for different departments. For example, a customer service department may need access to data owned by the research department, but the research department needs to provide that access in a secure manner.

This aspect of securing the data can become quite complicated. Amazon EMR uses an integrated mechanism to supply user credentials for access to data stored in S3. When you use an application (Hive, Spark, etc.) on EMR to read a file from or write a file to an S3 bucket, the S3 API call needs to be signed with proper credentials to be authenticated.

Usually, these credentials are provided by the EC2 instance profile that you specify during cluster launch. What if the EC2 instance profile credentials are not enough to access an S3 object, because that object requires a different set of credentials?

This post shows how you can use a custom credentials provider to access S3 objects that cannot be accessed by the default credentials provider of EMRFS.

EMRFS and EC2 instance profiles

When an EMR cluster is launched, it needs an IAM role to be specified as the Amazon EC2 instance profile. An instance profile is a container that is used to pass the permissions contained in an IAM role when the EC2 instance is starting up. The IAM role essentially defines the permissions for anyone who assumes the role.

In the case of EMR, the IAM role contained in the instance profile has permissions to access other AWS services such as Amazon S3, Amazon CloudWatch, Amazon Kinesis, etc. This role obtains temporary credentials via the EC2 instance metadata service and provides them to the application that needs to access other AWS services.

For example, when a Hive application on EMR needs to read input data from an S3 bucket (where the S3 bucket path is specified by the s3:// URI), it invokes a default credentials provider function of EMRFS. The provider in turn obtains the temporary credentials from the EC2 instance profile and uses those credentials to sign the S3 GET request.

Custom credentials providers

In certain cases, the credentials obtained by the default credentials provider might not be enough to sign requests to an S3 bucket that your IAM user does not have permission to access. Maybe the bucket has a different owner, or restrictive bucket policies that allow access only to a specific IAM user or role.

In situations like this, you have other options that allow your IAM user to access the data. You could modify the S3 bucket policy to allow access to your IAM user, but this might be a security risk. A better option is to implement a custom credentials provider for EMRFS to ensure that your S3 requests are signed with the correct credentials. A custom credentials provider ensures that only a configured EMR cluster has access to the data in S3. It provides much better control over who can access the data.

Configuring a custom credentials provider for EMRFS

Create a credentials provider by implementing both the AWSCredentialsProvider interface (from the AWS Java SDK) and the Hadoop Configurable interface for use with EMRFS when it makes calls to Amazon S3.

Each implementation of AWSCredentialsProvider can choose its own strategy for loading credentials depending on the use case. You can either load credentials using the AWS STS AssumeRole API action or from a Java properties file if you would like to make API calls using the credentials of a specific IAM user. Then, package your custom credentials provider in a JAR file, upload the JAR file to your EMR cluster, and specify the class name by setting fs.s3.customAWSCredentialsProvider in the emrfs-site configuration classification.
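To make the last step more concrete, here is a minimal sketch of what the emrfs-site configuration classification could look like, built in Python and printed as JSON. The class name com.example.emr.MyCustomCredentialsProvider is a hypothetical placeholder for your own provider; only the classification name and the fs.s3.customAWSCredentialsProvider property come from the text above.

```python
import json

# Hypothetical class name; replace it with the fully qualified name
# of the provider class packaged in your JAR file.
PROVIDER_CLASS = "com.example.emr.MyCustomCredentialsProvider"


def emrfs_site_classification(provider_class):
    """Build the emrfs-site classification that names the custom provider."""
    return [
        {
            "Classification": "emrfs-site",
            "Properties": {
                "fs.s3.customAWSCredentialsProvider": provider_class,
            },
        }
    ]


configurations = emrfs_site_classification(PROVIDER_CLASS)
print(json.dumps(configurations, indent=2))
```

The printed JSON is the kind of document you would supply as the cluster's configuration when you create it; the JAR file itself still has to be placed on the cluster separately.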


Suppose you would like to analyze data stored in an S3 bucket owned by the research department of your company which has its own AWS account. You can launch an EMR cluster in your account and leverage EMRFS to access data stored in the bucket owned by the research department.

For this example, the two accounts of your company are:

  • Research: research@yourcompany.com (Account ID: 123456789012)
  • Your department: aws@yourcompany.com (Account ID: 111222333444)


Harmonize, Search, and Analyze Loosely Coupled Datasets on AWS

by Ryan Jancaitis, Oliver Atoa, and Bob Strahan

You have come up with an exciting hypothesis, and now you are keen to find and analyze as much data as possible to prove (or refute) it. There are many datasets that might be applicable, but they have been created at different times by different people and don’t conform to any common standard. They use different names for variables that mean the same thing and the same names for variables that mean different things. They use different units of measurement and different categories. Some have more variables than others. And they all have data quality issues (for example, badly formed dates and times, invalid geographic coordinates, and so on).

You first need a way to harmonize these datasets, to identify the variables that mean the same thing and make sure that these variables have the same names and units. You also need to clean up or remove records with invalid data.
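A minimal sketch of this harmonization step, assuming two made-up city datasets with different column names, might look like the following. The dataset names, column aliases, and the expected timestamp format are all assumptions for illustration; the sample application may do this differently, and unit conversion is left out to keep the example short.

```python
from datetime import datetime

# Hypothetical mapping from each dataset's column names to shared names.
COLUMN_ALIASES = {
    "city_a": {"incident_dt": "datetime", "lat": "latitude", "lon": "longitude"},
    "city_b": {"occurred_on": "datetime", "y": "latitude", "x": "longitude"},
}


def harmonize(record, dataset):
    """Rename columns to the shared names; return None for invalid records."""
    aliases = COLUMN_ALIASES[dataset]
    renamed = {aliases.get(key, key): value for key, value in record.items()}
    try:
        # Assumed input format; badly formed dates raise ValueError.
        when = datetime.strptime(renamed["datetime"], "%Y-%m-%d %H:%M:%S")
        lat = float(renamed["latitude"])
        lon = float(renamed["longitude"])
    except (KeyError, TypeError, ValueError):
        return None
    # Drop records whose coordinates are outside the valid ranges.
    if not (-90 <= lat <= 90 and -180 <= lon <= 180):
        return None
    return {**renamed, "datetime": when.isoformat(), "latitude": lat, "longitude": lon}


good = harmonize(
    {"incident_dt": "2017-03-01 14:30:00", "lat": "47.61", "lon": "-122.33", "offense": "theft"},
    "city_a",
)
bad_date = harmonize({"occurred_on": "03/01/2017", "y": "47.6", "x": "-122.3"}, "city_b")
bad_coords = harmonize({"occurred_on": "2017-03-02 09:00:00", "y": "123.0", "x": "-122.3"}, "city_b")
print(good, bad_date, bad_coords)
```

Records that come back as None are the ones you would clean up or remove before indexing.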

After the datasets are harmonized, you need to search through the data to find the datasets you’re interested in. Not all of them have records that are relevant to your hypothesis, so you want to filter on a number of important variables to narrow down the datasets and verify they contain enough matching records to be significant.

Having identified the datasets of interest, you are ready to run your custom analyses on the data they contain so that you can prove your hypothesis and create beautiful visualizations to share with the world!

In this blog post, we will describe a sample application that illustrates how to solve these problems. You can install our sample app, which will:

  • Harmonize and index three disparate datasets to make them searchable.
  • Present a data-driven, customizable UI for searching the datasets to do preliminary analysis and to locate relevant datasets.
  • Integrate with Amazon Athena and Amazon QuickSight for custom analysis and visualization.

Example data

The Police Data Initiative seeks to improve community and law enforcement relations through the public availability of data related to police activity. Datasets from participating cities, available through the Public Safety Open Data Portal, have many of the problems just outlined. Despite the commonality of crime and location metadata, there is no standard naming or value scheme. Datasets are stored in various locations and in various formats. There is no central search and discovery engine. To gain insights and value from this data, you have to analyze datasets city by city.

Although the focus of this post is police incident data, the same approach can be used for datasets in other domains, such as IoT, personalized medicine, news, weather, finance, and much more.


Our architecture uses the following AWS services:

The diagram below illustrates the solution architecture:


Secure Amazon EMR with Encryption

by Sai Sriparasa

In the last few years, there has been a rapid rise in enterprises adopting the Apache Hadoop ecosystem for critical workloads that process sensitive or highly confidential data. Due to the highly critical nature of the workloads, the enterprises implement certain organization-wide or industry-wide policies and certain regulatory or compliance policies. Such policy requirements are designed to protect sensitive data from unauthorized access.

A common requirement within such policies is encrypting data at rest and in flight. Amazon EMR uses “security configurations” to make it easy to specify the encryption keys and certificates, with options ranging from AWS Key Management Service keys to your own custom encryption materials provider.

You create a security configuration that specifies encryption settings and then use the configuration when you create a cluster. This makes it easy to build the security configuration one time and use it for any number of clusters.
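As a hedged illustration, a very small security configuration that only turns on at-rest encryption for S3 with SSE-S3 could be expressed as the JSON document built below. The key names reflect the EMR security configuration layout as I understand it and should be checked against the current EMR documentation before use; in-transit and local disk encryption are deliberately left out of this sketch.

```python
import json

# Assumed layout of an EMR security configuration document.
# Only S3 at-rest encryption (SSE-S3) is enabled in this sketch.
security_configuration = {
    "EncryptionConfiguration": {
        "EnableInTransitEncryption": False,
        "EnableAtRestEncryption": True,
        "AtRestEncryptionConfiguration": {
            "S3EncryptionConfiguration": {
                "EncryptionMode": "SSE-S3",
            },
        },
    }
}

# Serialize once; the same document can then be reused for any number of clusters.
security_configuration_json = json.dumps(security_configuration, indent=2)
print(security_configuration_json)
```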


In this post, I go through the process of setting up the encryption of data at multiple levels using security configurations with EMR. Before I dive deep into encryption, here are the different phases where data needs to be encrypted.


Create a Healthcare Data Hub with AWS and Mirth Connect

by Joseph Fontes

As anyone visiting their doctor may have noticed, gone are the days of physicians recording their notes on paper. Physicians are more likely to enter the exam room with a laptop than with paper and pen. This change is the byproduct of efforts to improve patient outcomes, increase efficiency, and drive population health. Pushing for these improvements has created many new data opportunities as well as challenges. Using a combination of AWS services and open source software, we can use these new datasets to work towards these goals and beyond.

When you get a physical examination, your doctor’s office has an electronic chart with information about your demographics (name, date of birth, address, etc.), healthcare history, and current visit. When you go to the hospital for an emergency, a whole new record is created that may contain duplicate or conflicting information. A simple example would be that my primary care doctor lists me as Joe whereas the hospital lists me as Joseph.

Providers record patient information across different software platforms. Each of these platforms can have varying implementations of complex healthcare data standards. Also, each system needs to communicate with a central repository called a health information exchange (HIE) to build a central, complete clinical record for each patient.

In this post, I demonstrate the capability to consume different data types as messages, transform the information within the messages, and then use AWS services to take action depending on the message type.

Overview of Mirth Connect

Using open source technologies on AWS, you can build a system that transforms, stores, and processes this data as needed. The system can scale to meet the ever-increasing demands of modern medicine. The project that ingests and processes this data is called Mirth Connect.

Mirth Connect is an open source, cross-platform, bidirectional, healthcare integration engine. This project is a standalone server that functions as a central point for routing and processing healthcare information.

Running Mirth Connect on AWS provides the necessary scalability and elasticity to meet the current and future needs of healthcare organizations.

Healthcare data hub walkthrough

Healthcare information comes from various sources and can generate large amounts of data:

  • Health information exchange (HIE)
  • Electronic health records system (EHR)
  • Practice management system (PMS)
  • Insurance company systems
  • Pharmacy systems
  • Other source systems that can make data accessible

Messages typically require some form of modification (transformation) to accommodate ingestion and processing in other systems. Using another project, Blue Button, you can dissect large healthcare messages and locate the sections/items of interest. You can also convert those messages into other formats for storage and analysis.

Data types

The examples in this post focus on the following data types representing information made available from a typical healthcare organization:

HL7 version 2 messages define both a message format and communication protocol for health information. They are broken into different message types depending on the information that they transmit.

There are many message types available, such as ordering labs, prescription dispensing, billing, and more. During a routine doctor visit, numerous messages are created for each patient. This provides a lot of information but also a challenge in storage and processing. For a full list of message types, see Data Definition Tables, section A6. The two types used for this post are:

  • ADT A01 (patient admission and visit notification)

View a sample HL7 ADT A01 message

  • SIU S12 (new appointment booking)

View a sample SIU S12 message

As you can see, this text is formatted as delimited data, where the delimiters are defined in the top line of the message, called the MSH (message header) segment. Mirth Connect can parse these messages and communicate using the standard HL7 network protocol.
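To show how the delimiters drive parsing, here is a minimal Python sketch that reads the field and component separators from the header segment (named MSH in HL7 v2) and splits a message into segments. The sample message is invented for this example and is much shorter than a real ADT A01 message; this is not how Mirth Connect is implemented, only an illustration of the format.

```python
# Made-up, heavily shortened ADT A01 message. Segments are separated by
# carriage returns, as is conventional for HL7 v2.
SAMPLE = "\r".join([
    r"MSH|^~\&|SENDING_APP|SENDING_FAC|RECEIVING_APP|RECEIVING_FAC|20170301120000||ADT^A01|MSG00001|P|2.3",
    r"PID|1||12345^^^HOSP^MR||DOE^JOHN||19800101|M",
])


def parse_hl7(message):
    """Split an HL7 v2 message into segments using the delimiters in MSH."""
    if not message.startswith("MSH"):
        raise ValueError("an HL7 v2 message starts with an MSH segment")
    field_sep = message[3]       # character right after "MSH", usually "|"
    component_sep = message[4]   # first encoding character, usually "^"
    segments = {}
    for line in message.strip().split("\r"):
        if not line:
            continue
        fields = line.split(field_sep)
        # Group segments by their three-letter name, since names can repeat.
        segments.setdefault(fields[0], []).append(fields)
    return segments, component_sep


segments, component_sep = parse_hl7(SAMPLE)
msh = segments["MSH"][0]
# After splitting, index 8 of the MSH segment holds the message type (MSH-9).
message_code, trigger_event = msh[8].split(component_sep)[:2]
print(message_code, trigger_event)
```

Routing on the message type and trigger event in this way is what lets a pipeline treat an admission message differently from an appointment booking.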



Serving Real-Time Machine Learning Predictions on Amazon EMR

by Derek Graeber and Guy Ernest

The typical progression for creating and using a trained model for recommendations falls into two general areas: training the model and hosting the model. Model training has become a well-known standard practice. We want to highlight one of many ways to host those recommendations (for example, see the Analyzing Genomics Data at Scale using R, AWS Lambda, and Amazon API Gateway post).

In this post, we look at one possible way to host a trained ALS model on Amazon EMR using Apache Spark to serve movie predictions in real time. It is a continuation of two recent posts that are prerequisites:

In future posts we will cover other alternatives for serving real-time machine-learning predictions, namely AWS Lambda and Amazon EC2 Container Service, by running the prediction functions locally and loading the saved models from S3 to the local execution environments.

Walkthrough: Trained ALS model

For this walkthrough, you use the MovieLens dataset as set forth in the Building a Recommendation Engine post; the data model should have already been generated and persisted to Amazon S3. The model is trained on this data with the Alternating Least Squares (ALS) algorithm.

Using JobServer, you take that model and persist it in memory in JobServer on Amazon EMR. After it’s persisted, you can expose RESTful endpoints to AWS Lambda, which in turn can be invoked from a static UI page hosted on S3, securing access with Amazon Cognito.

Here are the steps that you follow:

  1. Create the infrastructure, including EMR with JobServer and Lambda.
  2. Load the trained model into Spark on EMR via JobServer.
  3. Stage a static HTML page on S3.
  4. Access the AWS Lambda endpoints via the static HTML page authenticated with Amazon Cognito.

The following diagram shows the infrastructure architecture.