Tag: Amazon EMR


Turbocharge your Apache Hive Queries on Amazon EMR using LLAP

by Jigar Mistry

Apache Hive is one of the most popular tools for analyzing large datasets stored in a Hadoop cluster using SQL. Data analysts and scientists use Hive to query, summarize, explore, and analyze big data.

With the introduction of Hive LLAP (Low Latency Analytical Processing), the notion of Hive being just a batch processing tool has changed. LLAP uses long-lived daemons with intelligent in-memory caching to circumvent batch-oriented latency and provide sub-second query response times.

This post provides an overview of Hive LLAP, including its architecture and common use cases for boosting query performance. You will learn how to install and configure Hive LLAP on an Amazon EMR cluster and run queries on LLAP daemons.

What is Hive LLAP?

Hive LLAP, introduced in Apache Hive 2.0, provides very fast query processing. It uses persistent daemons that are deployed on a Hadoop YARN cluster using Apache Slider. These daemons are long-running and provide functionality such as I/O with the DataNode, in-memory caching, query processing, and fine-grained access control. Because the daemons are always running in the cluster, they save the substantial overhead of launching new YARN containers for every new Hive session, thereby avoiding long startup times.

When Hive is configured in hybrid execution mode, small and short queries execute directly on LLAP daemons. Heavy lifting (like large shuffles in the reduce stage) is performed in YARN containers that belong to the application. Resources (CPU, memory, etc.) are obtained in a traditional fashion using YARN. After the resources are obtained, the execution engine can decide which resources are to be allocated to LLAP, or it can launch Apache Tez processors in separate YARN containers. You can also configure Hive to run all the processing workloads on LLAP daemons for querying small datasets at lightning fast speeds.
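For example, the execution mode can be set per session. The following is a minimal sketch using the Hive CLI; hive.execution.mode and hive.llap.execution.mode are standard Hive 2.x properties, while the table name is hypothetical:

$ hive --hiveconf hive.execution.mode=llap \
       --hiveconf hive.llap.execution.mode=all \
       -e "SELECT category, COUNT(*) FROM sample_table GROUP BY category;"
# hive.llap.execution.mode=all pushes all query processing to the LLAP daemons;
# a value such as "auto" lets Hive decide per stage and fall back to Tez containers.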

LLAP daemons are launched under YARN management to ensure that the nodes don’t get overloaded with the compute resources of these daemons. You can use scheduling queues to make sure that there is enough compute capacity for other YARN applications to run.

Why use Hive LLAP?

With many options available in the market (Presto, Spark SQL, etc.) for running interactive SQL over data stored in Amazon S3 and HDFS, there are several reasons why Hive with LLAP might be a good choice:

  • For those who are heavily invested in the Hive ecosystem and have external BI tools that connect to Hive over JDBC/ODBC connections, LLAP plugs in to their existing architecture without a steep learning curve.
  • It’s compatible with existing Hive SQL and other Hive tools, like HiveServer2, and JDBC drivers for Hive.
  • It has native support for security features with authentication and authorization (SQL standards-based authorization) using HiveServer2.
  • LLAP daemons are aware of the columns and records that are being processed, which enables you to enforce fine-grained access control.
  • It can use Hive’s vectorization capabilities to speed up queries, and Hive has better support for the Parquet file format when vectorization is enabled.
  • It can take advantage of a number of Hive optimizations like merging multiple small files for query results, automatically determining the number of reducers for joins and groupbys, etc.
  • It’s optional and modular, so it can be turned on or off depending on the compute and resource requirements of the cluster. This lets you run other YARN applications concurrently without reserving a cluster specifically for LLAP.

How do you install Hive LLAP in Amazon EMR?

To install and configure LLAP on an EMR cluster, use the following bootstrap action (BA):

s3://aws-bigdata-blog/artifacts/Turbocharge_Apache_Hive_on_EMR/configure-Hive-LLAP.sh

This BA downloads and installs Apache Slider on the cluster and configures LLAP so that it works with EMR Hive. For LLAP to work, the EMR cluster must have Hive, Tez, and Apache Zookeeper installed.

You can pass the following arguments to the BA.

  • --instances: Number of LLAP daemon instances. Default: number of core/task nodes in the cluster.
  • --cache: Cache size per instance. Default: 20% of the node’s physical memory.
  • --executors: Number of executors per instance. Default: number of CPU cores on the node.
  • --iothreads: Number of I/O threads per instance. Default: number of CPU cores on the node.
  • --size: Container size per instance. Default: 50% of the node’s physical memory.
  • --xmx: Working memory size. Default: 50% of the container size.
  • --log-level: Log level for the LLAP instances. Default: INFO.
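For example, the BA can be attached when the cluster is created. The following AWS CLI call is an illustrative sketch: the release label, instance counts, key pair name, and BA argument values are assumptions, and Hive, Tez, and ZooKeeper are included because LLAP requires them.

$ aws emr create-cluster --name "hive-llap-cluster" \
    --release-label emr-5.9.0 \
    --applications Name=Hadoop Name=Hive Name=Tez Name=ZooKeeper \
    --instance-type m4.xlarge --instance-count 3 \
    --ec2-attributes KeyName=my-key-pair \
    --use-default-roles \
    --bootstrap-actions '[{"Name":"Configure Hive LLAP","Path":"s3://aws-bigdata-blog/artifacts/Turbocharge_Apache_Hive_on_EMR/configure-Hive-LLAP.sh","Args":["--instances","2","--executors","4"]}]'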

(more…)

Setting up Read Replica Clusters with HBase on Amazon S3

by Tony Nguyen and Zach York

Many customers have taken advantage of the numerous benefits of running Apache HBase on Amazon S3 for data storage, including lower costs, data durability, and easier scalability. Customers such as FINRA have lowered their costs by 60% by moving to an HBase on S3 architecture along with the numerous operational benefits that come with decoupling storage from compute and using S3 as the storage layer. HBase on S3 allows you to turn on a cluster and immediately start querying against data within S3 rather than having to go through a lengthy snapshot restore process.

With the launch of Amazon EMR 5.7.0, you can now take the high availability and durability of HBase on S3 one step further to the cluster level: you can now start multiple HBase read-only clusters that connect to the same HBase root directory in S3. This allows you to ensure that your data is always reachable through read replica clusters and lets you run your clusters across multiple Availability Zones.

In this post, I guide you through setting up read replica clusters with HBase on S3.

HBase Overview

Apache HBase is a massively scalable, distributed big data store in the Apache Hadoop ecosystem. It is an open-source, non-relational, versioned database that runs on top of the Hadoop Distributed File System (HDFS). It is built for random, strictly consistent, real-time access to tables with billions of rows and millions of columns. It has tight integration with Apache Hadoop, Apache Hive, and Apache Pig, so you can easily combine massively parallel analytics with fast data access. HBase’s data model, throughput, and fault tolerance are a good match for workloads in ad tech, web analytics, financial services, applications using time-series data, and many more.

Table structure in HBase, like many NoSQL technologies, should be directly influenced by the queries and access patterns of the data. Query performance varies drastically based on the way the cluster has to process and return the data.

HBase on S3

To use HBase on S3 read replicas, you must first be using HBase on S3. For those unfamiliar with HBase on S3 architecture, this section conveys some of the basics.

By using S3 as a data store for HBase, you can separate your cluster’s storage and compute nodes. This enables you to cut costs by sizing your cluster for your compute requirements. You don’t have to pay to store your entire dataset with 3x replication in the on-cluster Hadoop Distributed File System (HDFS).

EMR configures HBase on Amazon S3 to cache data in-memory and on-disk in your cluster to improve read performance from S3. You can quickly and easily scale up or scale down compute nodes without impacting your underlying storage. Or you can terminate your cluster to cut costs and quickly restore it in another Availability Zone.

HBase with support for S3 is available on EMR releases from 5.2.0 onward. To use S3 as a data store, configure the storage mode and specify a root directory in your HBase configuration. Also, it’s recommended to enable EMRFS consistent view. For more information, see Apache HBase on Amazon S3.
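As a minimal sketch of that configuration, the following create-cluster call sets the storage mode and root directory through the hbase and hbase-site classifications; the bucket name and instance sizing are hypothetical:

$ aws emr create-cluster --name "hbase-on-s3" \
    --release-label emr-5.7.0 \
    --applications Name=HBase \
    --instance-type m4.xlarge --instance-count 3 \
    --use-default-roles \
    --configurations '[
      {"Classification":"hbase","Properties":{"hbase.emr.storageMode":"s3"}},
      {"Classification":"hbase-site","Properties":{"hbase.rootdir":"s3://my-hbase-bucket/hbase-root"}}
    ]'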

Use cases for HBase on S3 read replica clusters

Using HBase on S3 allows your data to be stored safely and durably. It persists the data off-cluster, which eliminates the dangers of data loss for persisted writes when the cluster is terminated. However, there can be situations where you want to make sure that your data in HBase is highly available, even in the rare event of a cluster or Availability Zone failure. Another case could be when you want the ability to have multiple clusters access the same root directory in S3. If you have a primary cluster that goes under heavy load during bulk loads, writes, and compactions, this feature allows you to create secondary clusters that off-load and separate the read load from the write load, ensuring that you meet your read SLAs while optimizing around cost and performance.

The following diagram shows HBase on S3 without read replicas. In this scenario, events such as cluster failure or Availability Zone failure render users unable to access data on HBase.

The HBase root directory, including HFiles and metadata, resides in S3:

Prior to EMR 5.7.0, multiple clusters could not be pointed to the same root directory. For architectures requiring high availability, you needed to create duplicate data on S3.
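With EMR 5.7.0 and later, a read replica cluster can point at that same root directory. The following is a hedged sketch, assuming the hbase.emr.readreplica.enabled property described in the EMR documentation and the same hypothetical bucket as above:

$ aws emr create-cluster --name "hbase-read-replica" \
    --release-label emr-5.7.0 \
    --applications Name=HBase \
    --instance-type m4.xlarge --instance-count 3 \
    --use-default-roles \
    --configurations '[
      {"Classification":"hbase","Properties":{"hbase.emr.storageMode":"s3","hbase.emr.readreplica.enabled":"true"}},
      {"Classification":"hbase-site","Properties":{"hbase.rootdir":"s3://my-hbase-bucket/hbase-root"}}
    ]'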

(more…)

Seven Tips for Using S3DistCp on Amazon EMR to Move Data Efficiently Between HDFS and Amazon S3

by Illya Yalovyy

Although it’s common for Amazon EMR customers to process data directly in Amazon S3, there are occasions where you might want to copy data from S3 to the Hadoop Distributed File System (HDFS) on your Amazon EMR cluster. Additionally, you might have a use case that requires moving large amounts of data between buckets or regions. In these use cases, large datasets are too big for a simple copy operation. Amazon EMR can help with this, and offers a utility – S3DistCp – to help with moving data from S3 to other S3 locations or on-cluster HDFS.

In the Hadoop ecosystem, DistCp is often used to move data. DistCp provides a distributed copy capability built on top of a MapReduce framework. S3DistCp is an extension to DistCp that is optimized to work with S3 and that adds several useful features. In addition to moving data between HDFS and S3, S3DistCp is also a Swiss Army knife of file manipulations. In this post we’ll cover the following tips for using S3DistCp, starting with basic use cases and then moving to more advanced scenarios:

1. Copy or move files without transformation
2. Copy and change file compression on the fly
3. Copy files incrementally
4. Copy multiple folders in one job
5. Aggregate files based on a pattern
6. Upload files larger than 1 TB in size
7. Submit an S3DistCp step to an EMR cluster

1. Copy or move files without transformation

We’ve observed that customers often use S3DistCp to copy data from one storage location to another, whether S3 or HDFS. Syntax for this operation is simple and straightforward:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/incoming/hourly_table

The source location may contain extra files that we don’t necessarily want to copy. Here, we can use filters based on regular expressions to do things such as copying files with the .log extension only.
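For instance, a copy restricted to .log files might look like the following; --srcPattern takes a regular expression that is matched against the full file path, and the locations are the same hypothetical ones used above:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/incoming/hourly_table --srcPattern ".*\.log"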

(more…)

Build a Healthcare Data Warehouse Using Amazon EMR, Amazon Redshift, AWS Lambda, and OMOP

by Ryan Hood

In the healthcare field, data comes in all shapes and sizes. Despite efforts to standardize terminology, some concepts (e.g., blood glucose) are still often depicted in different ways. This post demonstrates how to convert an openly available dataset called MIMIC-III, which consists of de-identified medical data for about 40,000 patients, into an open source data model known as the Observational Medical Outcomes Partnership (OMOP) Common Data Model (CDM). It describes the architecture and steps for analyzing data across various disconnected sources of health datasets so you can start applying Big Data methods to health research.

Before designing and deploying a healthcare application on AWS, make sure that you read through the AWS HIPAA Compliance whitepaper. This covers the information necessary for processing and storing patient health information (PHI).

Note: If you arrived at this page looking for more info on the movie Mimic 3: Sentinel, you might not enjoy this post.

OMOP overview

The OMOP CDM helps standardize healthcare data and makes it easier to analyze outcomes at a large scale. The CDM is gaining a lot of traction in the health research community, which is deeply involved in developing and adopting a common data model. Community resources are available for converting datasets, and there are software tools to help unlock your data after it’s in the OMOP format. The great advantage of converting data sources into a standard data model like OMOP is that it allows for streamlined, comprehensive analytics and helps remove the variability associated with analyzing health records from different sources.

OMOP ETL with Apache Spark

Observational Health Data Sciences and Informatics (OHDSI) provides the OMOP CDM in a variety of formats, including Apache Impala, Oracle, PostgreSQL, and SQL Server. (See the OHDSI Common Data Model repo in GitHub.) In this scenario, the data is moved to AWS to take advantage of the unbounded scale of Amazon EMR and serverless technologies, and the variety of AWS services that can help make sense of the data in a cost-effective way—including Amazon Machine Learning, Amazon QuickSight, and Amazon Redshift.

This example demonstrates an architecture that can be used to run SQL-based extract, transform, load (ETL) jobs to map any data source to the OMOP CDM. It uses MIMIC ETL code provided by Md. Shamsuzzoha Bayzid. The code was modified to run in Amazon Redshift.

Getting access to the MIMIC-III data

Before you can retrieve the MIMIC-III data, you must request access on the PhysioNet website, which is hosted on Amazon S3 as part of the Amazon Web Services (AWS) Public Dataset Program. However, you don’t need access to the MIMIC-III data to follow along with this post.

Solution architecture and loading process

The following diagram shows the architecture that is used to convert the MIMIC-III dataset to the OMOP CDM.

(more…)

Tips for Migrating to Apache HBase on Amazon S3 from HDFS

by Bruno Faria

Starting with Amazon EMR 5.2.0, you have the option to run Apache HBase on Amazon S3. Running HBase on S3 gives you several added benefits, including lower costs, data durability, and easier scalability.

HBase provides several options that you can use to migrate and back up HBase tables. The steps to migrate to HBase on S3 are similar to the steps for HBase on the Apache Hadoop Distributed File System (HDFS). However, the migration can be easier if you are aware of some minor differences and a few “gotchas.”

In this post, I describe how to use some of the common HBase migration options to get started with HBase on S3.

HBase migration options

Selecting the right migration method and tools is an important step in ensuring a successful HBase table migration. However, choosing the right ones is not always an easy task.

The following HBase options help you migrate to HBase on S3:

  • Snapshots
  • Export and Import
  • CopyTable

The following diagram summarizes the steps for each option.
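As a quick illustration of the snapshot option, the usual flow is to take a snapshot in the HBase shell and then export it with the ExportSnapshot tool to the S3 location that will serve as the new HBase root directory. The table, snapshot, and bucket names below are hypothetical:

$ echo "snapshot 'my_table', 'my_table_snapshot'" | hbase shell
$ hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot \
    -snapshot my_table_snapshot \
    -copy-to s3://my-hbase-bucket/hbase-root \
    -mappers 4
# On the destination HBase on S3 cluster, the table can then be recreated from the
# exported snapshot (for example, with restore_snapshot in the HBase shell).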

(more…)

Visualize Big Data with Amazon QuickSight, Presto, and Apache Spark on Amazon EMR

by Luis Wang

Last December, we introduced the Amazon Athena connector in Amazon QuickSight, in the Derive Insights from IoT in Minutes using AWS IoT, Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight post.

The connector allows you to visualize your big data easily in Amazon S3 using Athena’s interactive query engine in a serverless fashion. This turned out to be a very popular combination, as customers benefit from the speed, agility, and cost benefit that serverless business intelligence (BI) and analytics architecture brings.

Today, we’re excited to announce two new native connectors in QuickSight for big data analytics: Presto and Spark. With the Presto and SparkSQL connector in QuickSight, you can easily create interactive visualizations over large datasets using Amazon EMR.

EMR provides a simple and cost-effective way to run highly distributed processing frameworks such as Presto and Spark when compared to on-premises deployments. EMR gives you the flexibility to define specific compute, memory, storage, and application parameters, and to optimize for your analytic requirements.

In this post, I walk you through connecting QuickSight to an EMR cluster running Presto. If you’d like a walkthrough with Spark, let us know in the comments section!

Presto overview

Presto is an open source, distributed SQL query engine for running interactive analytic queries against data sources ranging from gigabytes to petabytes. It supports the ANSI SQL standard, including complex queries, aggregations, joins, and window functions. Presto can run on multiple data sources, including Amazon S3.

Presto’s execution framework is fundamentally different from that of Hive/MapReduce. Presto has a custom query and execution engine where the stages of execution are pipelined, similar to a directed acyclic graph (DAG), and all processing occurs in memory to reduce disk I/O. This pipelined execution model can run multiple stages in parallel and streams data from one stage to another as the data becomes available. This reduces end-to-end latency and makes Presto a great tool for ad hoc data exploration over large data sets.

Walkthrough

Use the following steps to connect QuickSight to an EMR cluster running Presto:

  1. Create an EMR cluster with the latest 5.5.0 release.
  2. Configure LDAP for user authentication in QuickSight.
  3. Configure SSL using a QuickSight supported certificate authority (CA).
  4. Create tables for Presto in the Hive metastore.
  5. Whitelist the QuickSight IP address range in your EMR master security group rules.
  6. Connect QuickSight to Presto and create some visualizations.

Prerequisites

You need to run Presto version 0.167 at a minimum, which is the first release that supports LDAP authentication. LDAP authentication is a requirement for the Presto and Spark connectors, and QuickSight refuses to connect if LDAP is not configured on your cluster.

Create an EMR cluster with release version 5.5.0

In the EMR console, use the Quick Create option to create a cluster.  For this post, use most of the default settings with a few exceptions. To install both Presto and Spark on your cluster (and customize other settings), create your cluster from the Advanced Options wizard instead.

Make sure that EMR release 5.5.0 is selected and under Applications, choose Presto. If you have an EC2 key pair, you can use it. Otherwise, create a key pair (.PEM file) and then return to this page to create the cluster. 
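If you prefer the AWS CLI, a roughly equivalent cluster can be created as follows; the key pair name is hypothetical, and you can add Name=Spark to the application list if you also want to try the Spark connector:

$ aws emr create-cluster --name "quicksight-presto" \
    --release-label emr-5.5.0 \
    --applications Name=Hadoop Name=Hive Name=Presto \
    --instance-type m4.xlarge --instance-count 3 \
    --ec2-attributes KeyName=my-key-pair \
    --use-default-roles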

(more…)

Build a Real-time Stream Processing Pipeline with Apache Flink on AWS

by Steffen Hausmann

This post has been translated into Japanese.

In today’s business environments, data is generated in a continuous fashion by a steadily increasing number of diverse data sources. Therefore, the ability to continuously capture, store, and process this data to quickly turn high-volume streams of raw data into actionable insights has become a substantial competitive advantage for organizations.

Apache Flink is an open source project that is well-suited to form the basis of such a stream processing pipeline. It offers unique capabilities that are tailored to the continuous analysis of streaming data. However, building and maintaining a pipeline based on Flink often requires considerable expertise, in addition to physical resources and operational efforts.

This post outlines a reference architecture for a consistent, scalable, and reliable stream processing pipeline that is based on Apache Flink using Amazon EMR, Amazon Kinesis, and Amazon Elasticsearch Service. An AWSLabs GitHub repository provides the artifacts that are required to explore the reference architecture in action. Resources include a producer application that ingests sample data into an Amazon Kinesis stream and a Flink program that analyzes the data in real time and sends the result to Amazon ES for visualization.

Analyzing geospatial taxi data in real time

Consider a scenario related to optimizing taxi fleet operations. You obtain information continuously from a fleet of taxis currently operating in New York City. Using this data, you want to optimize the operations by analyzing the gathered data in real time and making data-based decisions.

You would like, for instance, to identify hot spots—areas that are currently in high demand for taxis—so that you can direct unoccupied taxis there. You also want to track current traffic conditions so that you can give approximate trip durations to customers, for example, for rides to the nearby airports. Naturally, your decisions should be based on information that closely reflects the current demand and traffic conditions. The incoming data needs to be analyzed in a continuous and timely fashion. Relevant KPIs and derived insights should be accessible to real-time dashboards.

For the purpose of this post, you emulate a stream of trip events by replaying a dataset of historic taxi trips collected in New York City into Amazon Kinesis Streams. The dataset is available from the New York City Taxi & Limousine Commission website. It contains information on the geolocation and collected fares of individual taxi trips.
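The Kinesis stream that receives the replayed trip events can be created ahead of time. A minimal sketch follows; the stream name and shard count are hypothetical and should be sized to the replay throughput:

$ aws kinesis create-stream --stream-name taxi-trip-events --shard-count 2
$ aws kinesis describe-stream --stream-name taxi-trip-events \
    --query "StreamDescription.StreamStatus"   # wait until this returns ACTIVE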

In more realistic scenarios, you could leverage AWS IoT to collect the data from telemetry units installed in the taxis and then ingest the data into an Amazon Kinesis stream.

Architecture of a reliable and scalable stream processing pipeline

Because the pipeline serves as the central tool to operate and optimize the taxi fleet, it’s crucial to build an architecture that is tolerant of the failure of single nodes. The pipeline should adapt to changing rates of incoming events. Therefore, you should separate the ingestion of events, their actual processing, and the visualization of the gathered insights into different components. By loosely coupling these components of the infrastructure and using managed services, you can increase the robustness of the pipeline in case of failures. You can also scale the different parts of your infrastructure individually and reduce the effort required to build and operate the entire pipeline.

(more…)

Securely Analyze Data from Another AWS Account with EMRFS

by Jigar Mistry

Sometimes, data to be analyzed is spread across buckets owned by different accounts. In order to ensure data security, appropriate credentials management needs to be in place. This is especially true for large enterprises storing data in different Amazon S3 buckets for different departments. For example, a customer service department may need access to data owned by the research department, but the research department needs to provide that access in a secure manner.

This aspect of securing the data can become quite complicated. Amazon EMR uses an integrated mechanism to supply user credentials for access to data stored in S3. When you use an application (Hive, Spark, etc.) on EMR to read or write a file to or from an S3 bucket, the S3 API call needs to be signed by proper credentials to be authenticated.

Usually, these credentials are provided by the EC2 instance profile that you specify during cluster launch. What if the EC2 instance profile credentials are not enough to access an S3 object, because that object requires a different set of credentials?

This post shows how you can use a custom credentials provider to access S3 objects that cannot be accessed by the default credentials provider of EMRFS.

EMRFS and EC2 instance profiles

When an EMR cluster is launched, it needs an IAM role to be specified as the Amazon EC2 instance profile. An instance profile is a container that is used to pass the permissions contained in an IAM role when the EC2 instance is starting up. The IAM role essentially defines the permissions for anyone who assumes the role.

In the case of EMR, the IAM role contained in the instance profile has permissions to access other AWS services such as Amazon S3, Amazon CloudWatch, Amazon Kinesis, etc. This role obtains temporary credentials via the EC2 instance metadata service and provides them to the application that needs to access other AWS services.

For example, when a Hive application on EMR needs to read input data from an S3 bucket (where the S3 bucket path is specified by the s3:// URI), it invokes a default credentials provider function of EMRFS. The provider in turn obtains the temporary credentials from the EC2 instance profile and uses those credentials to sign the S3 GET request.

Custom credentials providers

In certain cases, the credentials obtained by the default credentials provider might not be enough to sign requests to an S3 bucket to which your IAM user does not have permissions to access. Maybe the bucket has a different owner, or restrictive bucket policies that allow access only to a specific IAM user or role.

In situations like this, you have other options that allow your IAM user to access the data. You could modify the S3 bucket policy to allow access to your IAM user, but this might be a security risk. A better option is to implement a custom credentials provider for EMRFS to ensure that your S3 requests are signed by the correct credentials. A custom credentials provider ensures that only a configured EMR cluster has access to the data in S3. It provides much better control over who can access the data.

Configuring a custom credentials provider for EMRFS

Create a credentials provider by implementing both the AWSCredentialsProvider (from the AWS Java SDK) and the Hadoop Configurable classes for use with EMRFS when it makes calls to Amazon S3.

Each implementation of AWSCredentialsProvider can choose its own strategy for loading credentials depending on the use case. You can either load credentials using the AWS STS AssumeRole API action or from a Java properties file if you would like to make API calls using the credentials of a specific IAM user. Then, package your custom credentials provider in a JAR file, upload the JAR file to your EMR cluster, and specify the class name by setting fs.s3.customAWSCredentialsProvider in the emrfs-site configuration classification.
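As a sketch of that cluster-side wiring, the following create-cluster call registers a hypothetical provider class through the emrfs-site classification and uses a hypothetical bootstrap script to copy the JAR onto the cluster (commonly into the EMRFS auxlib directory; verify the location for your EMR release):

$ aws emr create-cluster --name "cross-account-emrfs" \
    --release-label emr-5.7.0 \
    --applications Name=Hadoop Name=Hive \
    --instance-type m4.xlarge --instance-count 3 \
    --use-default-roles \
    --bootstrap-actions '[{"Name":"Copy custom credentials provider JAR","Path":"s3://my-artifacts/copy-credentials-provider.sh"}]' \
    --configurations '[{"Classification":"emrfs-site","Properties":{"fs.s3.customAWSCredentialsProvider":"com.mycompany.emrfs.CustomCredentialsProvider"}}]'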

Walkthrough

Suppose you would like to analyze data stored in an S3 bucket owned by the research department of your company, which has its own AWS account. You can launch an EMR cluster in your account and leverage EMRFS to access data stored in the bucket owned by the research department.

For this example, the two accounts of your company are:

  • Research: research@yourcompany.com (Account ID: 123456789012)
  • Your department: aws@yourcompany.com (Account ID: 111222333444)

(more…)

Harmonize, Search, and Analyze Loosely Coupled Datasets on AWS

by Ryan Jancaitis, Oliver Atoa and Bob Strahan

You have come up with an exciting hypothesis, and now you are keen to find and analyze as much data as possible to prove (or refute) it. There are many datasets that might be applicable, but they have been created at different times by different people and don’t conform to any common standard. They use different names for variables that mean the same thing and the same names for variables that mean different things. They use different units of measurement and different categories. Some have more variables than others. And they all have data quality issues (for example, badly formed dates and times, invalid geographic coordinates, and so on).

You first need a way to harmonize these datasets, to identify the variables that mean the same thing and make sure that these variables have the same names and units. You also need to clean up or remove records with invalid data.

After the datasets are harmonized, you need to search through the data to find the datasets you’re interested in. Not all of them have records that are relevant to your hypothesis, so you want to filter on a number of important variables to narrow down the datasets and verify they contain enough matching records to be significant.

Having identified the datasets of interest, you are ready to run your custom analyses on the data they contain so that you can prove your hypothesis and create beautiful visualizations to share with the world!

In this blog post, we will describe a sample application that illustrates how to solve these problems. You can install our sample app, which will:

  • Harmonize and index three disparate datasets to make them searchable.
  • Present a data-driven, customizable UI for searching the datasets to do preliminary analysis and to locate relevant datasets.
  • Integrate with Amazon Athena and Amazon QuickSight for custom analysis and visualization.

Example data

The Police Data Initiative seeks to improve community and law enforcement relations through the public availability of data related to police activity. Datasets from participating cities, available through the Public Safety Open Data Portal, have many of the problems just outlined. Despite the commonality of crime and location metadata, there is no standard naming or value scheme. Datasets are stored in various locations and in various formats. There is no central search and discovery engine. To gain insights and value from this data, you have to analyze datasets city by city.

Although the focus of this post is police incident data, the same approach can be used for datasets in other domains, such as IoT, personalized medicine, news, weather, finance, and much more.

Architecture

Our architecture uses the following AWS services:

The diagram below illustrates the solution architecture:

(more…)

Secure Amazon EMR with Encryption

by Sai Sriparasa

In the last few years, there has been a rapid rise in enterprises adopting the Apache Hadoop ecosystem for critical workloads that process sensitive or highly confidential data. Because these workloads are so critical, enterprises implement organization- or industry-wide policies along with regulatory and compliance policies. Such policy requirements are designed to protect sensitive data from unauthorized access.

A common requirement within such policies is encrypting data at rest and in flight. Amazon EMR uses “security configurations” to make it easy to specify encryption keys and certificates, ranging from AWS Key Management Service to supplying your own custom encryption materials provider.

You create a security configuration that specifies encryption settings and then use the configuration when you create a cluster. This makes it easy to build the security configuration one time and use it for any number of clusters.
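For example, a security configuration that enables at-rest encryption (SSE-S3 for EMRFS data in S3, AWS KMS for local disks) and in-transit encryption with PEM certificates can be created once from the AWS CLI; the KMS key ARN and certificate location below are hypothetical:

$ aws emr create-security-configuration --name "emr-encryption-demo" \
    --security-configuration '{
      "EncryptionConfiguration": {
        "EnableAtRestEncryption": true,
        "AtRestEncryptionConfiguration": {
          "S3EncryptionConfiguration": {"EncryptionMode": "SSE-S3"},
          "LocalDiskEncryptionConfiguration": {
            "EncryptionKeyProviderType": "AwsKms",
            "AwsKmsKey": "arn:aws:kms:us-east-1:111222333444:key/EXAMPLE-KEY-ID"
          }
        },
        "EnableInTransitEncryption": true,
        "InTransitEncryptionConfiguration": {
          "TLSCertificateConfiguration": {
            "CertificateProviderType": "PEM",
            "S3Object": "s3://my-certs/emr-certs.zip"
          }
        }
      }
    }'

The configuration can then be referenced at cluster launch with aws emr create-cluster ... --security-configuration emr-encryption-demo.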


In this post, I go through the process of setting up the encryption of data at multiple levels using security configurations with EMR. Before I dive deep into encryption, here are the different phases where data needs to be encrypted.

(more…)