AWS HPC Blog

Data Science workflows at insitro: using redun on AWS Batch

This post was contributed by Matt Rasmussen, VP of Software Engineering at insitro.

At insitro, we seek to improve drug discovery by integrating Machine Learning (ML), high-throughput biology and real-world clinical data to build predictive models that can help bring better medicines to patients faster.

We recently open-sourced a new data science tool called redun, which allows data scientists to define complex scientific workflows that scale from their laptop to large-scale distributed runs on serverless platforms like AWS Batch and AWS Glue. redun builds on insights from build systems and functional programming languages to provide abstractions for writing dynamic and complex dataflows that are often difficult or impossible to express in other workflow engines. Over the last two years, we've used redun to power our drug discovery work by implementing workflows across multiple domains, such as bioinformatics, image processing, cheminformatics, and general data science. Redun is implemented as a Python library and thus can be easily combined with the rich set of tools and libraries in each of these fields. Using some key features of AWS's compute offerings, redun is able to naturally scale scientific analyses across heterogeneous cloud infrastructure.

In this blog post series, we illustrate these benefits with an example bioinformatics workflow and highlight some of the techniques redun and AWS Batch use to work together.

Example redun Bioinformatics workflow

We'll walk through a common task in bioinformatics called sequence read alignment, for which we have implemented an example workflow (06_bioinfo_batch). At a high level, we will be performing the following computational steps:

  • Downloading a reference genome and reference variant calls from public sources to our own S3 bucket.
  • Indexing the reference genome for quick random access.
  • Reading a manifest file of sequencing read files (FASTQs).
  • Merging FASTQs across sequencing lanes to make sample-specific FASTQs.
  • Preparing reads (trimming adapter sequences).
  • Aligning reads to the reference genome using bwa-mem2.
  • Post-processing the alignment (marking duplicate reads, recalibrating base calls).
  • Collecting alignment metrics.

Preparing our environment

To run this example, we can use the following steps to prepare our development environment. First, we can install redun from the Python Package Index using the usual pip command:

pip install redun

Second, we can obtain the example workflow by cloning the redun GitHub repo:

git clone https://github.com/insitro/redun.git
cd redun/examples/06_bioinfo_batch/

Our workflow will use several Bioinformatics tools installed within a Docker image. An example Makefile is provided to automate building the Docker image and pushing it to an ECR Docker repository.

# From the directory: redun/examples/06_bioinfo_batch/
cd docker

# Log in to AWS ECR
make login

# Build the docker image, tag it, and push it to a new ECR Docker repository.
make setup build push

# Return to the workflow example
cd ..

If everything is successful, you should be able to see a new image, redun_example_bioinfo_batch, in your ECR registry.

Configuring our workflow

redun can run workflows across several different kinds of execution backends, which redun calls Executors. For this example, we will configure a redun Executor for running Docker-based jobs on AWS Batch. redun uses an INI file (typically .redun/redun.ini) to define the workflow configuration. Here is an example of what that looks like:

[executors.batch]
type = aws_batch
image = YOUR_ACCOUNT_ID.dkr.ecr.us-west-2.amazonaws.com/redun_example_bioinfo_batch:latest
queue = YOUR_QUEUE_NAME
s3_scratch = s3://YOUR_BUCKET/redun/
role = arn:aws:iam::YOUR_ACCOUNT_ID:role/YOUR_ROLE

As you can see, we have a section called [executors.batch], which defines a new executor named "batch" that we will use in our workflow. The type parameter specifies that this executor is for AWS Batch. Beyond that, we only need four pieces of high-level information: a Docker image that we would like to use for our jobs (image), an AWS Batch queue for job submission (queue), a scratch space for transferring temporary files to and from the Batch containers (s3_scratch), and lastly an IAM role (role) that our jobs will use so that they have proper access to the necessary resources (AWS Batch, S3, etc.).

If desired, multiple redun Executors can be defined in case some jobs in the workflow have different requirements (e.g. GPUs, high memory, high storage).

[executors.batch-gpu]
type = aws_batch
queue = my-gpu-queue
# Other config...
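A task can then opt into a particular executor by name through its task options. Here is a minimal sketch with a hypothetical GPU task; the gpus and memory values are illustrative and the available task options may vary by redun version:

from redun import File, task

# Hypothetical task that will be submitted to the queue configured for the
# "batch-gpu" executor defined above.
@task(executor="batch-gpu", gpus=1, memory=16)
def run_inference(model_file: File, data_file: File, output_path: str) -> File:
    # ... load the model and data, run GPU inference, write results ...
    return File(output_path)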

Running our workflow

We are now ready to run our example workflow. The redun Python package installs a new command-line program called redun, which provides several subcommands for running and inspecting workflows. The first subcommand we will use is redun run. For arguments, it takes a Python script filename, a task name, and additional arguments to pass to the task. The first command we will run prepares some example input data for us:

redun run workflow.py prepare_samples --output-path s3://YOUR_BUCKET/bioinfo_batch/

Note that YOUR_BUCKET should be an S3 bucket to which you have read and write access. Now, we are ready to run the main workflow:

redun run workflow.py main \
    --samples-file s3://YOUR_BUCKET/bioinfo_batch/samples.txt \
    --output-path s3://YOUR_BUCKET/bioinfo_batch/

This will process all the sequencing samples listed in the manifest file s3://YOUR_BUCKET/bioinfo_batch/samples.txt and store sequencing alignment outputs in s3://YOUR_BUCKET/bioinfo_batch/. Overall, you should see output on your command line similar to the following:

[redun] | JOB STATUS 2021/07/06 17:07:05
[redun] | TASK                                                PENDING RUNNING  FAILED  CACHED    DONE   TOTAL
[redun] |
[redun] | ALL                                                       0       0       0       0      46      46
[redun] | redun.const                                               0       0       0       0       1       1
[redun] | redun.copy_file                                           0       0       0       0       4       4
[redun] | redun.eval_                                               0       0       0       0       2       2
[redun] | redun.examples.bioinfo_docker.align_sample                0       0       0       0       2       2
[redun] | redun.examples.bioinfo_docker.align_samples               0       0       0       0       1       1
[redun] | redun.examples.bioinfo_docker.base_recalib                0       0       0       0       2       2
[redun] | redun.examples.bioinfo_docker.collect_align_metrics       0       0       0       0       2       2
[redun] | redun.examples.bioinfo_docker.collect_depth_metrics       0       0       0       0       2       2
[redun] | redun.examples.bioinfo_docker.concat_files                0       0       0       0       4       4
[redun] | redun.examples.bioinfo_docker.cut_adapters                0       0       0       0       2       2
[redun] | redun.examples.bioinfo_docker.download_genome_ref         0       0       0       0       1       1
[redun] | redun.examples.bioinfo_docker.download_sites_files        0       0       0       0       1       1
[redun] | redun.examples.bioinfo_docker.get_sample_read_pairs       0       0       0       0       1       1
[redun] | redun.examples.bioinfo_docker.index_genome_ref            0       0       0       0       1       1
[redun] | redun.examples.bioinfo_docker.main                        0       0       0       0       1       1
[redun] | redun.examples.bioinfo_docker.mark_dups                   0       0       0       0       2       2
[redun] | redun.examples.bioinfo_docker.merge_fastq                 0       0       0       0       2       2
[redun] | redun.examples.bioinfo_docker.run_bwa                     0       0       0       0       2       2
[redun] | redun.script                                              0       0       0       0      13      13
[redun]
[redun] Execution duration: 8.82 seconds
{'expt': {'align_metrics': {'align_metrics': File(path=s3://YOUR_BUCKET/bioinfo_batch/expt/expt.alignment_metrics.txt, hash=fb23d375),
                            'has_insert_metrics': True,
                            'insert_metrics': File(path=s3://YOUR_BUCKET/bioinfo_batch/expt/expt.insert_metrics.txt, hash=ad4cf78e),
                            'insert_metrics.status': File(path=s3://YOUR_BUCKET/bioinfo_batch/expt/expt.insert_metrics.status, hash=daaf8d5e),
                            'insert_size_hist': File(path=s3://YOUR_BUCKET/bioinfo_batch/expt/expt.insert_size_histogram.pdf, hash=828a55f7)},
          'bai': File(path=s3://YOUR_BUCKET/bioinfo_batch/expt/expt.recalib.bam.bai, hash=8c83723a),
          'bam': File(path=s3://YOUR_BUCKET/bioinfo_batch/expt/expt.recalib.bam, hash=0f74f997),
          'depth_metrics': {'depth': File(path=s3://YOUR_BUCKET/bioinfo_batch/expt/expt.depth.txt, hash=4f823939)},
          'dup_metrics': File(path=s3://YOUR_BUCKET/bioinfo_batch/expt/expt.mark_dup.markdup.metrics.txt, hash=5d8d233d)},
 'toy': {'align_metrics': {'align_metrics': File(path=s3://YOUR_BUCKET/bioinfo_batch/toy/toy.alignment_metrics.txt, hash=ebb9027f),
                           'has_insert_metrics': True,
                           'insert_metrics': File(path=s3://YOUR_BUCKET/bioinfo_batch/toy/toy.insert_metrics.txt, hash=a260420b),
                           'insert_metrics.status': File(path=s3://YOUR_BUCKET/bioinfo_batch/toy/toy.insert_metrics.status, hash=e67f69c9),
                           'insert_size_hist': File(path=s3://YOUR_BUCKET/bioinfo_batch/toy/toy.insert_size_histogram.pdf, hash=b844442d)},
         'bai': File(path=s3://YOUR_BUCKET/bioinfo_batch/toy/toy.recalib.bam.bai, hash=a0a99f12),
         'bam': File(path=s3://YOUR_BUCKET/bioinfo_batch/toy/toy.recalib.bam, hash=15bdb8ab),
         'depth_metrics': {'depth': File(path=s3://YOUR_BUCKET/bioinfo_batch/toy/toy.depth.txt, hash=37f830e6)},
         'dup_metrics': File(path=s3://YOUR_BUCKET/bioinfo_batch/toy/toy.mark_dup.markdup.metrics.txt, hash=2a4169e6)}}

Understanding the features

Next, we'll look at some example tasks to learn how redun makes use of AWS Batch.

Calling bioinformatics scripts in Docker containers

One of the main tasks defined in the workflow is run_bwa, which uses the bwa-mem2 program to align reads in the FASTQ files (reads1.fastq.gz and reads2.fastq.gz) to a reference human genome. The output will be an alignment file in the BAM format. Here is what the task code looks like:

@task(version="2")
def run_bwa(
    sample_id: str,
    reads1: File,
    reads2: File,
    output_bam_path: str,
    genome_ref_file: File,
    genome_ref_index: Dict[str, File],
    platform: str = "illumina",
) -> Dict[str, File]:
    """
    Align FASTQ reads to a reference genome to produce a BAM alignment.
    """
    local_genome_ref = genome_ref_file.basename()

    return script(

        # Shell script for performing sequence alignment.
        f"""
        bwa-mem2 mem \
            -R "@RG\\tID:{sample_id}\\tSM:{sample_id}\\tPL:{platform}" \
            -o sample.aln.sam \
            {local_genome_ref} \
            reads1.fastq.gz \
            reads2.fastq.gz
        samtools view -o sample.aln.bam sample.aln.sam
        gatk \
            --java-options "-Xmx14G" \
            SortSam \
            --INPUT sample.aln.bam \
            --OUTPUT /dev/stdout \
            --SORT_ORDER "coordinate" \
            --CREATE_INDEX false \
            --CREATE_MD5_FILE false \
            | \
        gatk \
            --java-options "-Xmx14G" \
             SetNmMdAndUqTags \
            --INPUT /dev/stdin \
            --OUTPUT sample.aln.sorted.settag.bam \
            --CREATE_INDEX true \
            --CREATE_MD5_FILE true \
            --REFERENCE_SEQUENCE {local_genome_ref}
        """,

        # AWS Batch job configuration.
        executor="batch",
        memory=30,

        # Input and output file staging from S3 to local container.
        inputs=[
            reads1.stage("reads1.fastq.gz"),
            reads2.stage("reads2.fastq.gz"),
            genome_ref_file.stage(),
            [file.stage() for file in genome_ref_index.values()],
        ],
        outputs={
            "bam": File(output_bam_path).stage("sample.aln.sorted.settag.bam"),
            "stdout": File("-"),
        },
    )

All tasks in redun are defined using the @task decorator. Here, we've defined run_bwa(), which can be called from higher-level tasks to define a pipeline. Within the task, we make a call to redun's script() task, where we define a shell script we would like to execute. We make use of Python's f-strings to substitute a few variables. Next, we specify that this script should run on AWS Batch, using the keyword argument executor="batch", and that it should use 30 GB of memory per job. Other options can also be defined, such as the number of vCPUs, shared memory size, etc.
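Plain Python tasks can be routed to AWS Batch in the same way, without script(). Here is a minimal sketch, with a hypothetical task and illustrative resource values:

from redun import File, task

# Hypothetical task: the function body itself runs inside the Batch container.
# memory is specified in GB; the values here are illustrative.
@task(executor="batch", memory=4, vcpus=2)
def count_records(manifest: File) -> int:
    # Count the non-empty lines of a manifest file stored in S3.
    with manifest.open() as f:
        return sum(1 for line in f if line.strip())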

The last thing to point out about this task is the inputs and outputs arguments, which perform a step we call File Staging. Many bioinformatics programs and data science tools expect their data to be accessible from the local file system. Therefore, for data in object storage, we often have to copy datasets into the container before our script runs, and then copy files out of the container afterwards. We are free to write such copy commands ourselves using something like aws s3 cp ...; however, doing that for every script would be tedious and error-prone. Instead, we use the File.stage() method to define a pairing between a remote (S3) file and a local file, from which redun automatically generates the appropriate copy commands.

staged_file: StagingFile = File(remote_path).stage(local_path)
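Putting this together, here is a minimal sketch of a script() call that stages one input and one output (the compress_file task, paths, and command are hypothetical):

from typing import Dict

from redun import File, script, task

@task()
def compress_file(input: File, output_path: str) -> Dict[str, File]:
    return script(
        # The command runs inside the container against locally staged files.
        "gzip -c data.txt > data.txt.gz",
        executor="batch",
        # Copy the S3 object into the container before the command runs...
        inputs=[input.stage("data.txt")],
        # ...and copy the result back out to S3 afterwards.
        outputs={"gz": File(output_path).stage("data.txt.gz")},
    )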

Lastly, as a convenience, the final return value of script() takes the shape of outputs, but with each StagingFile replaced by File(remote_path). Therefore, in the example above, the final return value will be something like

{
    "bam": File(output_bam_path),
    "stdout": string_containing_script_stdout,
}

Combining tasks into pipelines

Next, let’s look at how we can combine tasks like run_bwa() into larger pipelines. Specifically, the task align_sample combines several steps for processing the DNA sequence reads of a single sample.

@task()
def align_sample(
    sample_id: str,
    read_pairs: Dict[str, List[str]],
    genome_ref_file: File,
    genome_ref_index: Dict[str, File],
    sites_files: List[File],
    output_path: str,
) -> Dict[str, Any]:
    """
    Align a single sample's paired FASTQs.
    """
    # Merge fastqs across lanes.
    read1_files = list(map(File, read_pairs["1"]))
    read2_files = list(map(File, read_pairs["2"]))

    output_fastq_path = os.path.join(output_path, sample_id)
    reads = merge_fastq(read1_files, read2_files, output_fastq_path)

    # Cut adapters off of reads.
    output_cutadapt_path = os.path.join(output_path, f"{sample_id}.cutadapt")
    reads = cut_adapters(reads["reads1"], reads["reads2"], output_cutadapt_path)

    # Run alignment.
    output_bam_path = os.path.join(output_path, f"{sample_id}.aln.sorted.settag.bam")
    genome_ref_bwa_index = {ext: genome_ref_index[ext] for ext in BWA_INDEX_EXTS}
    align = run_bwa(
        sample_id,
        reads["reads1"],
        reads["reads2"],
        output_bam_path,
        genome_ref_file=genome_ref_file,
        genome_ref_index=genome_ref_bwa_index,
    )

    # Postprocess alignment.
    align2 = mark_dups(align["bam"], os.path.join(output_path, f"{sample_id}.mark_dup"))
    align3 = base_recalib(
        align2["bam"],
        os.path.join(output_path, f"{sample_id}.recalib"),
        genome_ref_file=genome_ref_file,
        genome_ref_fai_file=genome_ref_index["fai"],
        genome_ref_dict_file=genome_ref_index["dict"],
        sites_files=sites_files,
    )
    align_metrics = collect_align_metrics(
        align3["bam"], os.path.join(output_path, sample_id), genome_ref_file=genome_ref_file
    )
    depth_metrics = collect_depth_metrics(align3["bam"], os.path.join(output_path, sample_id))

    return {
        "bam": align3["bam"],
        "bai": align3["bai"],
        "dup_metrics": align2["dup_metrics"],
        "align_metrics": align_metrics,
        "depth_metrics": depth_metrics,
    }

When a task is called, such as merge_fastq(...), it is treated as a lazy function call and returns immediately with a special value called an Expression.

In:  reads = merge_fastq(read1_files, read2_files, output_fastq_path)
Out: TaskExpression('merge_fastq', (read1_files, read2_files, output_fastq_path), {})

The Expression above (TaskExpression is one kind of Expression) represents a lazy value that will eventually be computed by calling the function merge_fastq with the given positional arguments, (read1_files, read2_files, output_fastq_path), and no keyword arguments, {}.

We are free to pass an Expression as an argument to any other task, and thus build up a larger chain of task executions. Expression objects also implement common operators, such as __getitem__ and math operators (+, -, etc.), to provide a convenient way to chain additional small lazy value manipulations. You can see one example of that here, where we pass the Expression reads["reads1"] as an argument to cut_adapters(...). Overall, we can visualize these expressions as forming a tree, or more generally a Directed Acyclic Graph (DAG).
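To make the lazy-evaluation model concrete, here is a small self-contained sketch (the add and main tasks are purely illustrative):

from redun import task

@task()
def add(a: int, b: int) -> int:
    return a + b

@task()
def main() -> int:
    x = add(1, 2)    # Returns a TaskExpression immediately, not 3.
    y = add(x, 10)   # Expressions can be passed as arguments to other tasks.
    return y + 100   # Operators on Expressions build further lazy values.

Evaluating main with redun run would reduce this small Expression Graph down to the concrete value 113.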

Figure 1: Evaluating a sequence alignment workflow using graph reduction. In redun, workflows are represented as an Expression Graph (left), which contains concrete value nodes (grey) and Expression nodes (blue). The redun scheduler identifies tasks that are ready to execute by finding subtrees that can be reduced (red boxes), substituting task results back into the Expression Graph (red arrows). The scheduler continues to find reductions until the Expression Graph reduces to a single concrete value (grey box, far right). If any reduction has been done before (determined by comparing an Expression's hash), the redun scheduler can replay the reduction from a central cache and skip task re-execution.

The redun scheduler uses the Expression Graph to schedule task executions using a process called graph reduction, a technique commonly used in interpreters and compilers (Figure 1). In this example, the redun scheduler would identify the merge_fastq TaskExpression as the first task to execute, because all of its arguments are concrete (read1_files, read2_files, output_fastq_path). When the task finishes execution, its result is substituted into the graph, which is called a reduction. As reductions are performed, additional reductions and executions become possible. Eventually, the entire graph reduces to a single concrete value, a BAM alignment.

Expressive workflows

Redun's use of Expression Graphs and graph reduction enables a very expressive approach to defining workflows. For example, there are patterns expressible in redun that are difficult or impossible to express in other workflow systems: fanouts of dynamic degree (i.e. parallel map), sequential tasks of dynamic depth (e.g. loops), recursion, higher-order tasks (i.e. tasks that take tasks as arguments), and error handling (e.g. throwing and catching errors across a workflow). The key reason this is possible is that the graph the user writes to represent their program, the Expression Graph, is distinct from the graph used for scheduling, the Call Graph (also called a trace or data flow), and the reduction process shown above allows the data flow to be derived automatically from the Expression Graph. Workflow frameworks that do not distinguish between a program and its data flow trace can unintentionally restrict users to overly static workflows. Early versions of workflow engines ran into this restriction when trying to represent the workflow as a single static data flow DAG. More recent workflow engines that aim to be more expressive and dynamic have each taken a different approach towards how they represent the program graph, with varying degrees of success.
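As a concrete illustration, a dynamic fanout and a recursive task are both just ordinary Python in redun. Here is a minimal sketch (the process, fanout, and countdown tasks are illustrative):

from typing import List

from redun import task

@task()
def process(item: str) -> str:
    return item.upper()

@task()
def fanout(items: List[str]) -> List[str]:
    # Dynamic fanout: the number of parallel process() calls is determined by
    # the runtime length of items, not by a statically declared DAG.
    return [process(item) for item in items]

@task()
def countdown(n: int) -> int:
    # Recursion: a task may call itself, so the workflow depth is data-dependent.
    if n <= 0:
        return 0
    return countdown(n - 1)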

Conclusion

In this blog post, we reviewed a new data science tool, redun, that allows easy development of scientific workflows in the cloud using serverless compute platforms like AWS Batch and AWS Glue. Specifically, we showed how redun lends itself to bioinformatics workflows, which typically involve wrapping Unix-based programs that require file staging to and from object storage. In the next blog post, we will see how redun scales to large and heterogeneous workflows by leveraging AWS Batch features such as Array Jobs and AWS Glue features such as Glue DynamicFrame.

The content and opinions in this blog are those of the third-party author and AWS is not responsible for the content or accuracy of this blog.

Matt Rasmussen

As VP of Software Engineering at insitro, Matt is responsible for leading the development of data pipelines, data storage systems, provenance tracking, and engineering infrastructure to support the high-throughput biology and Machine Learning teams at insitro.

Previously, as VP of Engineering for Myriad Genetics, Matt led engineering teams focused on software automation and genomic data pipelines to make high-complexity genetic testing routine in clinical practice. During his time at Counsyl, Matt developed and scaled the software behind several successful prenatal genetic testing products. Matt holds a Ph.D. in Computer Science from MIT, where he developed efficient bioinformatic algorithms with applications in evolutionary genomics and population genetics. In his spare time, Matt enjoys running, drawing, programming for fun, and playing with his kids.