AWS Big Data Blog

How the Allen Institute uses Amazon EMR and AWS Step Functions to process extremely wide transcriptomic datasets

This is a guest post by Gautham Acharya, Software Engineer III at the Allen Institute for Brain Science, in partnership with AWS Data Lab Solutions Architect Ranjit Rajan, and AWS Sr. Enterprise Account Executive Arif Khan.

The human brain is one of the most complex structures in the universe. Billions of neurons and trillions of connections come together to form a labyrinthine network of activity. Understanding the mechanisms that guide our minds is one of the most challenging problems in modern scientific research.

The Allen Institute for Brain Science is dedicated to solving large-scale, fundamental problems in neuroscience. Our mission is to accelerate the rate at which the world understands the inner workings of the human brain and to uncover the essence of what makes us human.

Processing extremely wide datasets

As part of “big science,” one of our core principles, we tackle scientific challenges at scales no one else has attempted before. One of these challenges is processing large-scale transcriptomic datasets. Transcriptomics is the study of RNA. In particular, we’re interested in the genes that are expressed in individual neurons. The human brain contains almost 100 billion neurons. How do they differ from each other, and what genes do they express? After a series of complex analyses using cutting-edge techniques such as Smart-Seq and 10x Genomics Chromium sequencing, we produce extremely large matrices of numeric values.

Such matrices are called feature matrices. Each row represents a cell, and each column represents a feature of that cell, which in this case is a gene. A genome contains over 50,000 genes, so a single matrix can have over 50,000 columns! We expect the number of rows in our matrices to increase over time, reaching tens of millions, if not more. These matrices can reach 500 GB or more in size. Over the next few years, we want to be able to ingest tens or hundreds of such matrices.

Our goal is to provide low-latency visualizations on such matrices, allowing researchers to aggregate, slice, and dissect our data in real time. To do this, we run a series of precomputations that store expensive calculations in a database for future retrieval.

We wanted to create a flexible, scalable pipeline to run computations on these matrices and store the results for visualizations.

The pipeline

We wanted to build a pipeline that takes these large matrices as inputs, runs various Spark jobs, and stores the outputs in an Apache HBase cluster. We wanted to create something flexible so that we could easily add additional Spark transformations.

We decided on AWS Step Functions as our workflow-orchestration tool of choice. Step Functions allows us to create a state machine that orchestrates the dataflow from payload submission to database loading.

After close collaboration with the engineers at the AWS Data Lab, we came up with the following pipeline architecture.

At a high level, our pipeline has the following workflow:

  1. Trigger a state machine from an upload event to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. Copy and unzip the input ZIP file containing a feature matrix into an Amazon S3 working directory.
  3. Run Spark jobs on Amazon EMR to transform input feature matrices into various pre-computed datasets. Store all intermediate results in a working directory on Amazon S3 and output the results of the Spark jobs as HFiles.
  4. Bulk load the results of our Spark jobs into Apache HBase.
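
To make the workflow above more concrete, the following is a minimal sketch of how steps 2 through 4 could be expressed as an Amazon States Language definition built in Python. It is not our production state machine: the Lambda ARNs, state names, and input fields (cluster_id, spark_submit_args) are hypothetical placeholders, and the S3 upload trigger from step 1 sits outside the state machine and is not shown.

```python
import json

# Hypothetical placeholder ARNs; substitute real resources.
COPY_UNZIP_LAMBDA_ARN = "arn:aws:lambda:us-west-2:123456789012:function:copy-and-unzip"
BULK_LOAD_LAMBDA_ARN = "arn:aws:lambda:us-west-2:123456789012:function:bulk-load-hbase"


def build_state_machine_definition():
    """Return an Amazon States Language definition as a plain dict."""
    return {
        "Comment": "Sketch: feature matrix -> Spark jobs -> HBase bulk load",
        "StartAt": "CopyAndUnzip",
        "States": {
            "CopyAndUnzip": {
                "Type": "Task",
                "Resource": COPY_UNZIP_LAMBDA_ARN,
                "Next": "RunSparkJobs",
            },
            "RunSparkJobs": {
                "Type": "Task",
                # Step Functions service integration that submits an EMR step
                # and waits for it to finish.
                "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
                "Parameters": {
                    "ClusterId.$": "$.cluster_id",
                    "Step": {
                        "Name": "precompute-views",
                        "ActionOnFailure": "CONTINUE",
                        "HadoopJarStep": {
                            "Jar": "command-runner.jar",
                            "Args.$": "$.spark_submit_args",
                        },
                    },
                },
                # Keep the original input and attach the step result to it.
                "ResultPath": "$.spark_result",
                "Next": "BulkLoadToHBase",
            },
            "BulkLoadToHBase": {
                "Type": "Task",
                "Resource": BULK_LOAD_LAMBDA_ARN,
                "End": True,
            },
        },
    }


def undefined_transitions(definition):
    """Return every StartAt/Next target that does not name a defined state."""
    states = definition["States"]
    targets = [definition["StartAt"]]
    targets += [state["Next"] for state in states.values() if "Next" in state]
    return [target for target in targets if target not in states]


definition_json = json.dumps(build_state_machine_definition(), indent=2)
```

Because new Spark transformations are just additional Task states, adding one means inserting a state and repointing a single Next field, which is the flexibility we were after.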

The preceding architecture diagram is deceptively simple. We found a number of challenges during our initial implementation, which we discuss in the following sections.

Lack of transaction support and rollbacks across tables in Apache HBase

The results of our Spark jobs are a number of precomputed views of our original input dataset. Each view is stored as a separate table in Apache HBase. A major drawback of Apache HBase is the lack of a native transactional system. HBase only provides row-level atomicity. Our worst-case scenario is writing partial data—cases where some views are updated, but not others, showing different results for different visualizations and resulting in scientifically incorrect data!

We worked around this by rolling our own blue/green system on top of Apache HBase. We suffix each set of tables related to a dataset with a universally unique identifier (UUID). We use Amazon DynamoDB to track the UUID associated with each individual dataset. When an update to a dataset is being written, the UUID is not switched in DynamoDB until we verify that all the new tables have been successfully written to Apache HBase. We have an API on top of HBase to facilitate reads. This API checks DynamoDB for the dataset UUID before querying HBase, so user traffic is never redirected toward a new view until we confirm a successful write. Our API involves an AWS Lambda function using HappyBase to connect to our HBase cluster, wrapped in an Amazon API Gateway layer to provide a REST interface. The following diagram illustrates this architecture.

The read path has the following steps:

  • R1 – API Gateway invokes a Lambda function to fetch data from a dataset
  • R2 – The Lambda function requests and receives the dataset UUID from DynamoDB
  • R3 – Lambda queries the Apache HBase cluster with the UUID

The write path has the following steps:

  • W1 – The state machine bulk loads new dataset tables to the Apache HBase cluster suffixed with the new UUID
  • W2 – After validation, the state machine updates DynamoDB so user traffic is directed towards those changes
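
The read and write paths above can be sketched with in-memory stand-ins. This is an illustration of the pointer-switch idea only, not our actual API: plain dictionaries play the roles of the DynamoDB table and the HBase cluster, and the view names and table naming scheme are assumptions made for the example.

```python
import uuid

# In-memory stand-ins (assumptions for illustration only):
#   pointer_table plays the DynamoDB table: dataset name -> live UUID
#   hbase_tables plays the HBase cluster: table name -> {row_key: value}
pointer_table = {}
hbase_tables = {}

VIEWS = ("medians", "means")  # hypothetical view names


def table_name(dataset, view, dataset_uuid):
    """Hypothetical naming scheme: every table is suffixed with the UUID."""
    return f"{dataset}_{view}_{dataset_uuid}"


def write_dataset(dataset, views):
    """W1 + W2: load every view under a fresh UUID, then switch the pointer."""
    new_uuid = uuid.uuid4().hex
    for view, rows in views.items():
        hbase_tables[table_name(dataset, view, new_uuid)] = dict(rows)

    # Validate before switching: every expected view table must exist.
    missing = [view for view in VIEWS
               if table_name(dataset, view, new_uuid) not in hbase_tables]
    if missing:
        raise RuntimeError(f"Incomplete write, pointer not switched: {missing}")

    # A single-item update acts as the commit for the whole set of tables.
    pointer_table[dataset] = new_uuid
    return new_uuid


def read_value(dataset, view, row_key):
    """R2 + R3: resolve the live UUID first, then query the suffixed table."""
    live_uuid = pointer_table[dataset]
    return hbase_tables[table_name(dataset, view, live_uuid)][row_key]
```

If a write fails validation, readers keep seeing the previous UUID, so they never observe a mix of old and new views. Cleaning up the orphaned tables from a failed or superseded write is left out of this sketch.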

Stalled Spark jobs on extremely wide datasets

Although Apache Spark is a fantastic engine for running distributed compute operations, it struggles when scaling to extremely wide datasets. We routinely operate on data that surpasses 50,000 columns, which often causes issues such as a stalled JavaToPython step in our PySpark job. We still need to investigate why our Spark jobs hang on these wide datasets, but we found a simple short-term workaround: batching!

A number of our jobs involve computing simple columnar aggregations on our data. This means that each calculation on a column is completely independent of all the other columns. This lends itself quite well to batching our compute. We can break our input columns into chunks and run our compute on each chunk.

The following code chunks Apache Spark aggregation functions into groups of columns:

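import pyspark.sql.functions as pyspark_functions
import pyspark.sql.types as pyspark_datatype

import src.spark_transforms.pyspark_jobs.pyspark_utilities as pyspark_utilities


def get_aggregation_for_matrix_and_metadata(matrix, metadata, group_by_arg, agg_func, cols_per_write):
   '''
   Performs an aggregation on the joined matrix, aggregating the desired column by the given function.
   agg_func must be a valid Pandas UDF function. Runs in batches so we don't overload the Task Scheduler with 50,000
   columns at once.
   '''
   row_key = matrix.columns[0]

   # The UDF wrapper is identical for every batch, so build it once
   cast_as_udf = pyspark_functions.pandas_udf(
                   agg_func,
                   pyspark_datatype.FloatType(),
                   pyspark_functions.PandasUDFType.GROUPED_AGG)

   # Chunk the feature columns; the row key is added to each group below
   for col_group in pyspark_utilities.chunks(matrix.columns[1:], cols_per_write):

       # Add the row key to the column group
       selected_cols = [row_key] + list(col_group)
       selected_matrix = matrix.select(pyspark_utilities.escape_column_list(selected_cols))

       # Assumption: the original listing used `joined` without defining it.
       # We assume metadata shares the row key column and holds group_by_arg.
       joined = selected_matrix.join(metadata, on=row_key)

       # One aggregation per feature column, skipping the row key and grouping column
       udf_input = [cast_as_udf(joined[column_name]).alias(column_name)
                    for column_name in selected_matrix.columns
                    if column_name not in (row_key, group_by_arg)]

       yield joined.groupby(group_by_arg).agg(*udf_input)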
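
The pyspark_utilities.chunks and escape_column_list helpers called in the aggregation code are not shown in this post. The following is a minimal sketch of what they might look like. Both bodies are assumptions based on how the helpers are used, not the actual implementations.

```python
def chunks(items, chunk_size):
    """Yield successive lists of at most chunk_size items."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")
    items = list(items)
    for start in range(0, len(items), chunk_size):
        # Slicing returns a new list, so callers can modify a chunk safely
        yield items[start:start + chunk_size]


def escape_column_list(columns):
    """Wrap each column name in backticks so Spark reads characters such as dots literally."""
    # Assumption: column names do not themselves contain backticks
    return [f"`{name}`" for name in columns]


gene_columns = ["gene_a", "gene_b", "gene_c", "gene_d", "gene_e"]
batches = list(chunks(gene_columns, 2))
```

With five columns and a batch size of two, this produces three batches, the last holding the single leftover column.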

We then write the results of each batch to an HFile, which is later bulk loaded into HBase.

Because the post-aggregation DataFrame was very small, we found a significant performance increase in coalescing the DataFrame post-aggregation and checkpointing the results before writing the HFiles. This forces Spark to compute the aggregation before writing the HFiles. HFiles need to be sorted by row key, so it’s easier to pass a smaller DataFrame to our HFile converter.

Using Apache Spark to write DataFrames as HFiles

Apache Spark supports writing DataFrames in multiple formats, including as HFiles. However, the documentation for doing so leaves a lot to be desired. To write out our Spark DataFrames as HFiles, we had to take the following steps:

  1. Convert a DataFrame into an HFile-compatible format, (row_key, column_family, col, value), assuming that the first column is the HBase row key.
  2. Create a JAR file containing a converter that turns input Python objects into Java key-value byte classes. This step took a lot of trial and error because we couldn’t find clear documentation on how the Python object was serialized and passed into the Java function.
  3. Call the saveAsNewAPIHadoopFile function, passing in the relevant information: the ZooKeeper Quorum IP, port, and cluster DNS of our Apache HBase on the Amazon EMR cluster; the HBase table name; the class name of our Java converter function; and more.
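
Step 1 is easier to picture on a tiny example. The following sketch uses plain Python lists instead of Spark, and the function name, column names, and the "cf" column family are made up for illustration. It flattens each row into key-value tuples, drops zeroes to keep the representation sparse, and orders the cells by row key and then by column qualifier.

```python
def to_sorted_cells(columns, rows, column_family):
    """Flatten rows into (row_key, (row_key, family, qualifier, value)) tuples."""
    cells = []
    for row in rows:
        row_key = str(row[0])  # the first column is the row key
        for qualifier, value in zip(columns[1:], row[1:]):
            if value != 0:  # sparse representation: zeroes are not stored
                cells.append((row_key, (row_key, column_family, qualifier, value)))

    # Order by row key, then by column qualifier within each row
    cells.sort(key=lambda cell: (cell[0], cell[1][2]))
    return cells


columns = ["cell_id", "gene_b", "gene_a"]
rows = [("cell_2", 0.0, 3.5), ("cell_1", 2.0, 1.0)]
cells = to_sorted_cells(columns, rows, "cf")
```

Here cells holds three tuples: gene_a and gene_b for cell_1, followed by gene_a for cell_2. The zero-valued gene_b entry for cell_2 is dropped.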

The following code writes HFiles:

import src.spark_transforms.pyspark_jobs.pyspark_utilities as pyspark_utilities
import src.spark_transforms.pyspark_jobs.output_handler.emr_constants as constants


def csv_to_key_value(row, sorted_cols, column_family):
   '''
   This method is an RDD mapping function that will map each
   row in an RDD to an hfile-formatted tuple for hfile creation
   (rowkey, (rowkey, columnFamily, columnQualifier, value))
   '''
   result = []
   # The row key is the same for every cell in the row, so compute it once
   row_key = str(row[constants.ROW_KEY_INDEX])
   first_value_index = constants.ROW_KEY_INDEX + 1
   for index, col in enumerate(sorted_cols[first_value_index:], first_value_index):
       value = row[index]

       if value is None:
           raise ValueError(f'Null value found at {row_key}, {col}')

       # We store sparse representations, dropping all zeroes.
       if value != 0:
           result.append((row_key, (row_key, column_family, col, value)))

   return tuple(result)


def get_sorted_df_by_cols(df):
   '''
   Sorts the matrix by column. Retains the row key as the initial column.
   '''
   cols = [df.columns[0]] + sorted(df.columns[1:])
   escaped_cols = pyspark_utilities.escape_column_list(cols)
   return df.select(escaped_cols)


def flat_map_to_hfile_format(df, column_family):
   '''
   Flat maps the matrix DataFrame into an RDD formatted for conversion into HFiles.
   '''
   sorted_df = get_sorted_df_by_cols(df)
   columns = sorted_df.columns
   return sorted_df.rdd.flatMap(lambda row: csv_to_key_value(row, columns, column_family)).sortByKey(True)


def write_hfiles(df, output_path, zookeeper_quorum_ip, table_name, column_family):
   '''
   Sorts and maps the given PySpark DataFrame into HFile format, then
   writes HFiles to the output directory using the supplied
   HBase configuration.
   '''
   # Sort columns other than the row key (first column), then flat map to key-value tuples
   rdd = flat_map_to_hfile_format(df, column_family)

   conf = {
           constants.HBASE_ZOOKEEPER_QUORUM: zookeeper_quorum_ip,
           constants.HBASE_ZOOKEEPER_CLIENTPORT: constants.ZOOKEEPER_CLIENTPORT,
           constants.ZOOKEEPER_ZNODE_PARENT: constants.ZOOKEEPER_PARENT,
           constants.HBASE_TABLE_NAME: table_name
           }

   rdd.saveAsNewAPIHadoopFile(output_path,
                              constants.OUTPUT_FORMAT_CLASS,
                              keyClass=constants.KEY_CLASS,
                              valueClass=constants.VALUE_CLASS,
                              keyConverter=constants.KEY_CONVERTER,
                              valueConverter=constants.VALUE_CONVERTER,
                              conf=conf)

The following code defines the constants used in the preceding configuration:

HBASE_ZOOKEEPER_QUORUM="hbase.zookeeper.quorum"
HBASE_ZOOKEEPER_CLIENTPORT="hbase.zookeeper.property.clientPort"
ZOOKEEPER_ZNODE_PARENT="zookeeper.znode.parent"
HBASE_TABLE_NAME="hbase.mapreduce.hfileoutputformat.table.name"

OUTPUT_FORMAT_CLASS='org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2'
KEY_CLASS='org.apache.hadoop.hbase.io.ImmutableBytesWritable'
VALUE_CLASS='org.apache.hadoop.hbase.KeyValue'
KEY_CONVERTER="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
VALUE_CONVERTER="KeyValueConverter"

ZOOKEEPER_CLIENTPORT='2181'
ZOOKEEPER_PARENT='/hbase'

ROW_KEY_INDEX = 0

The following code is a Java class to serialize input PySpark RDDs:

import org.apache.spark.api.python.Converter;
import org.apache.hadoop.hbase.KeyValue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/**
* This class is used to convert a tuple
* supplied by a spark job created in Python
* to the corresponding hbase keyValue type
* which is needed for hfile creation.
*
*/
@SuppressWarnings("rawtypes")
public class KeyValueConverter implements Converter {

  private static final long serialVersionUID = 1L;

  /**
   * this method will take a tuple object supplied
   * by Python spark job and convert and
   * return the corresponding hbase KeyValue object.
   */
  public Object convert(Object obj) {
     List<?> list;
     if (obj.getClass().isArray()) {
        list = Arrays.asList((Object[]) obj);
     } else if (obj instanceof Collection) {
        list = new ArrayList<>((Collection<?>) obj);
     } else {
        // Fail with a clear message instead of an index error on an empty list
        throw new IllegalArgumentException(
              "Expected an array or collection, got " + obj.getClass().getName());
     }

     // Tuple layout: (rowkey, columnFamily, columnQualifier, value)
     return new KeyValue(
           list.get(0).toString().getBytes(),
           list.get(1).toString().getBytes(),
           list.get(2).toString().getBytes(),
           list.get(3).toString().getBytes());
  }
}
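
One consequence of this converter is that every value is stored as the bytes of its string form, and because zeroes are dropped before writing, a stored row holds only its nonzero cells. The following sketch shows how a reader might rebuild a dense row. It assumes a client such as HappyBase that returns a row as a dict with bytes keys in family:qualifier form. The function name and the "cf" column family are hypothetical.

```python
def decode_row(raw_row, column_family, all_qualifiers):
    """Rebuild a dense {qualifier: float} row from sparse, string-encoded cells."""
    prefix = f"{column_family}:".encode("utf-8")

    # Start from all zeroes, because zero-valued cells were never written
    dense = {qualifier: 0.0 for qualifier in all_qualifiers}

    for key, value in raw_row.items():
        if key.startswith(prefix):
            qualifier = key[len(prefix):].decode("utf-8")
            # Values were written as the bytes of their string form
            dense[qualifier] = float(value.decode("utf-8"))
    return dense


raw_row = {b"cf:gene_a": b"3.5", b"cf:gene_c": b"2"}
dense_row = decode_row(raw_row, "cf", ["gene_a", "gene_b", "gene_c"])
```

In this example gene_b was never stored, so it comes back as 0.0 alongside the two decoded values.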

Looking ahead

Our computation pipeline was a success, and you can see the resulting visualizations on https://transcriptomics.brain-map.org/.

We’ve been thrilled with AWS’s reliable and feature-rich ecosystem. We used Amazon EMR, Step Functions, and Amazon S3 to build a robust, large-scale data processing pipeline.

Since writing this post, we’ve built much more, including a cross-database transaction system and wide-matrix transposes in Spark. Big data problems in neuroscience never end, and we’re excited to share more with you in the future!


About the Authors

Gautham Acharya is a Software Engineer at the Allen Institute for Brain Science. He works on the backend data platform team responsible for integrating multimodal neuroscience data into a single cohesive system.

Ranjit Rajan is a Data Lab Solutions Architect with AWS. Ranjit works with AWS customers to help them design and build data and analytics applications in the cloud.

Arif Khan is a Senior Account Executive with Amazon Web Services. He works with nonprofit research customers to help shape and deliver on a strategy that focuses on customer success, building mind share and driving broad use of Amazon’s utility computing services to support their mission.