
How Affirm uses AWS Fargate and Apache Airflow to manage batch jobs

This post was contributed by Greg Sterin, Senior Staff Software Engineer, Affirm.

Affirm’s mission is to deliver honest financial products that improve lives. Affirm is reinventing credit to make it more honest and friendly, giving consumers the flexibility to buy now and pay later without any hidden fees or compounding interest.

Affirm’s Platform Engineering team is responsible for building frameworks, systems, and tools that allow the rest of engineering to produce and deploy software that is scalable, reliable, secure, correct, consistent, instrumented, and well tested. The team builds core backend frameworks on which most of our online and offline business logic is developed and executed. One such key component is the infrastructure for running our numerous online and offline batch jobs, built by the Batch Compute Infrastructure team within Platform. Our engineering teams run hundreds of batch jobs daily to support everything from our financial pipelines (accounting ledger, loan sales, repayments, disbursement, reconciliations, billing, etc.) to simple maintenance tasks like periodically removing expired prequalification offers or cleaning up old Elasticsearch indices. Our developer-friendly batch infrastructure frameworks enable our engineers to implement, test, deploy, scale, and monitor compute- and storage-intensive pipelines with very little effort.

Due to the periodic nature of our batch tasks, it is critical that we are able to acquire the necessary capacity during times when many jobs are running, while avoiding paying for capacity when jobs are not running. To address these needs, we designed a system that dynamically provisions capacity as needed, using Apache Airflow for scheduling, retries, failure messaging, and tracking of runs, and AWS Fargate to quickly acquire dynamic capacity and pay only for what we use.

How we got here: a story of growth

In the early days of Affirm, we needed to run batch jobs on periodic schedules. We chose Luigi as our pipeline framework. Luigi allows composing tasks that depend on other tasks into a DAG, tracks the completion of tasks, and ensures that only one instance of a given task runs at a time.

Luigi has no built-in mechanism for triggering jobs on a schedule, so we naturally scheduled these jobs in the crontab of nodes provisioned to run our batch jobs. Early on, we only needed two EC2 nodes (in two different Availability Zones for resiliency) running the same crontab. Luigi would make sure that even though two instances of a task got triggered, only one would get to run. However, if that task failed or became unresponsive, the run on the other node would detect this and rerun the task there, creating a system resilient to Availability Zone or node failures.

As our business grew and the number and types of batch jobs that needed to be run increased, two nodes were not enough to run all the jobs. We created more nodes and started deploying subsets of jobs on different sets of nodes, offering high availability. This ensured that:

1. We had capacity available to run all of our jobs.
2. There was isolation of CPU and memory resources between different job types.

However, as more engineering teams spun up to support Affirm’s growth and the rate of new tasks increased, managing this capacity became a burden for our Batch Infrastructure team. Additionally, we needed a way to scale capacity up when it was needed and back down once jobs completed and it was no longer necessary. Keeping these nodes up all the time would support our peak load, but it was wasteful; the nodes sat idle during periods when no tasks were running.

While using Auto Scaling groups with rules based on metrics is the traditional way to handle autoscaling, tracking metrics is more complex with batch jobs than with web servers. For example, you might autoscale your web server fleet by scaling up whenever average CPU exceeds some percentage. For batch jobs, this doesn’t work: once a job is scheduled and starts running on a node, new nodes that spin up sit idle and have no effect. The metric needs to track the total amount of requested capacity, and the scheduler needs to ensure that new jobs land on the nodes reserved explicitly for them. Since we were scheduling jobs from the crontab, such an approach was not feasible. It was time for a new design.

Separating scheduling from execution with Apache Airflow

Our first step was to decouple job scheduling from job execution. With a crontab, the node that does the scheduling also runs the actual work specified by the crontab. If your goal is to spin up capacity based on what gets scheduled, you cannot have the scheduling occur on that same capacity; it needs to happen beforehand. Apache Airflow is a widely adopted job scheduling framework, and it provided what we were looking for. Instead of specifying schedules in the crontab, we declare Airflow DAG files that specify what needs to run and when.

Airflow can natively dispatch the execution of work to a fleet of worker nodes using Celery, a task queuing framework. This native approach, however, did not fully meet our needs. While it allows you to spread tasks across a fleet of worker nodes, which you can autoscale, a few issues remain unsolved:

1. The Celery task that is scheduled contains no information about how compute- or memory-intensive the work is, and not all tasks are equal. Some may only need 1/10th of a vCPU and 1GB of memory, while others need 32 vCPUs and 30GB. So, if you have two worker nodes, each running two tasks, but the tasks on one node are much heavier than those on the other, it is not trivial to ensure that a new task gets scheduled on the node running the lighter tasks.

2. Scale-down needs to wait for all jobs to drain from a node. Even if the worker fleet as a whole is underutilized, a node with any job still running cannot be shut down until that job finishes. We would need to write code to handle this robustly: making Celery workers that are being drained continue processing existing tasks while refusing new ones, tracking when all jobs have drained from such a node, and finally shutting it down.

Dynamic job capacity with AWS Fargate on Amazon ECS

In looking for solutions to the problems with autoscaling capacity for jobs, we found a great fit in AWS Fargate on Amazon ECS. Fargate allows you to run containers without managing servers and lets you specify the memory and CPU reservations each task needs. It manages the capacity to run the job, executes the job with the resources you requested, and only bills you for the time used. This solved all our concerns with managing autoscaling:

  1. We can have jobs specify the CPU and memory they need, and that amount is allocated by Fargate (see the sketch after this list).
  2. We don’t pay for idle capacity when jobs are not running.
  3. We get new capacity dynamically at the most granular level: per job. We also don’t need to worry about resource isolation and starvation.
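
As a rough illustration of what requesting per-job capacity looks like at the API level, here is a minimal boto3 RunTask sketch with the Fargate launch type. The cluster, task definition, container name, subnets, and security group are placeholders, and the CPU/memory values are only examples, not Affirm’s actual configuration.

# Minimal sketch of requesting per-job capacity from Fargate via the ECS RunTask API.
# Cluster, task definition, container name, subnets, and security group are placeholders.
import boto3

ecs = boto3.client('ecs', region_name='us-east-1')

response = ecs.run_task(
    cluster='dynamic-jobs',
    taskDefinition='example-task-definition',
    launchType='FARGATE',
    count=1,
    # Reserve exactly what this job needs; Fargate bills for these resources
    # only while the task is running.
    overrides={
        'cpu': '1024',       # 1 vCPU, expressed in CPU units
        'memory': '4096',    # 4 GB
        'containerOverrides': [{
            'name': 'affirm-docker',
            'command': ['MyTask', '--start-date', '2020-05-01'],
        }],
    },
    networkConfiguration={
        'awsvpcConfiguration': {
            'assignPublicIp': 'DISABLED',
            'securityGroups': ['sg-12345'],
            'subnets': ['subnet-abcd', 'subnet-efgh'],
        }
    },
)
task_arn = response['tasks'][0]['taskArn']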

The dynamic jobs framework on Airflow using Fargate on ECS

All our tasks are defined as Airflow DAGs, which allow task owners to configure error email notifications, timeouts, and retries for their tasks. Airflow executes tasks via operators, each of which represents a single, idempotent task in a DAG. In our case, all our tasks use a modified version of the ECS Operator, called the AffirmECSOperator, which we import into Airflow as a plugin. Our modified operator enhances the existing ECSOperator in several ways:

  1. Adding links in the Airflow logs to the Amazon CloudWatch Logs that actually contain the logs for the tasks being run.
  2. Passing in additional API arguments not supported by the existing operator into ECS RunTask invocations to specify things like the Fargate networkConfiguration, placementConstraints, and count (number of instances of the configured task to run).
  3. Saving references to the ECS task IDs the operator triggers in an Airflow Variable, so that if the Airflow worker that executes RunTask is restarted or dies while waiting on the task to run in Fargate, the retry of that Airflow task reclaims the ECS tasks still running in Fargate rather than spinning up duplicate tasks (the idea is sketched below).
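
The reclaim behavior in item 3 can be illustrated with a short sketch. This is not the actual AffirmECSOperator code; the Variable key handling, helper name, and direct boto3 calls below are assumptions made for illustration.

# Hypothetical sketch of reclaiming in-flight ECS tasks across Airflow worker restarts.
# The Variable key and helper name are illustrative, not Affirm's actual implementation.
import boto3
from airflow.models import Variable

ecs = boto3.client('ecs', region_name='us-east-1')

def start_or_reclaim_tasks(variable_key, run_task_kwargs):
    # If a previous attempt already started ECS tasks, pick them up instead
    # of launching duplicates.
    task_arns = Variable.get(variable_key, deserialize_json=True, default_var=None)
    if task_arns:
        described = ecs.describe_tasks(cluster=run_task_kwargs['cluster'], tasks=task_arns)
        still_running = [t['taskArn'] for t in described['tasks']
                         if t['lastStatus'] != 'STOPPED']
        if still_running:
            return still_running
    # Otherwise launch fresh tasks and record their ARNs before waiting on them.
    response = ecs.run_task(**run_task_kwargs)
    task_arns = [t['taskArn'] for t in response['tasks']]
    Variable.set(variable_key, task_arns, serialize_json=True)
    return task_arns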

When task owners configure their DAGs, they also specify the number of vCPUs and the amount of memory their task needs, which are passed into the ECS RunTask API as well. Fargate then runs the tasks with the requested resources.

Once the task is started in Fargate, the AffirmECSOperator polls the tasks in Fargate until they either complete or fail. If they fail, it raises an exception so that Airflow registers the task as failed and either retries it as configured or marks it failed and notifies the owners.
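
Conceptually, that wait-and-fail loop looks something like the following sketch, which uses boto3 directly rather than the operator’s real internals; the polling interval and failure handling details are assumptions.

# Simplified sketch of polling ECS until tasks stop, then failing if any exited non-zero.
# Not the actual AffirmECSOperator code; names and intervals are illustrative.
import time
import boto3
from airflow.exceptions import AirflowException

ecs = boto3.client('ecs', region_name='us-east-1')

def wait_for_tasks(cluster, task_arns, poll_seconds=60):
    while True:
        response = ecs.describe_tasks(cluster=cluster, tasks=task_arns)
        tasks = response['tasks']
        if all(t['lastStatus'] == 'STOPPED' for t in tasks):
            break
        time.sleep(poll_seconds)
    # Treat any non-zero (or missing) container exit code as a task failure, so
    # Airflow marks the task failed and applies its retry/notification settings.
    for task in tasks:
        for container in task['containers']:
            if container.get('exitCode', 1) != 0:
                raise AirflowException(
                    'ECS task {} failed: {}'.format(
                        task['taskArn'], task.get('stoppedReason', 'unknown')))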

To execute a task in ECS, you need a Task Definition that specifies which containers run as part of the task, their entrypoint, the IAM role the task should run under, and where logs are written in CloudWatch Logs. We use SaltStack to create these task definitions at deploy time:

# Build ECS task definition
{%- for item in pillar.dynamic_jobs.ecs.cpu_memory_combinations %}
ecs-task-definition-fargate-{{ item.suffix }}:
  boto_ecs.register_definition:
    - require:
      - test: success-image-exists
    - region: {{ grains.location }}
    # This is actually the name of the task definition
    - family: {{ task_family }}{{ item.suffix }}_fargate
    # Fargate only supports awsvpc
    - networkMode: awsvpc
    - requiresCompatibilities:
      - FARGATE
    # IAM Role ARN we will grant to the ECS Agent and the Docker Daemon
    - executionRoleArn: '{{ ecs.execution_role }}'
    - taskRoleArn: '{{ ecs.task_role }}'
    - cpu: '{{ item.cpu }}'
    - memory: '{{ item.memory }}'
    - containerDefinitions:
        # Runs the main entrypoint for luigi jobs scheduled on fargate
        # Should be overridden when scheduling tasks
        - command:
            - help
          # Our tasks generally need the network
          disableNetworking: false
          # Locks in the base command to run in the container
          entryPoint:
            - "{{ entrypoint }}"
          # Should be overridden when scheduling tasks
          environment:
            - name: EXAMPLE_ENV_VAR
              value: affirm-ecs-task
          essential: true
          image: {{ docker_tag }}
          logConfiguration:
            logDriver: awslogs
            options:
              awslogs-region: {{ grains.location }}
              awslogs-stream-prefix: {{ log_stream_prefix }}
              awslogs-group: {{ pillar.dynamic_jobs.ecs.cloudwatch_logs.log_group }}
          name: {{ pillar.dynamic_jobs.ecs.container_definition_name }}
          workingDirectory: "{{ vars.dockervenv }}"
{%- endfor %}

As part of our automated deployment flow, when a new Docker image (corresponding to a new software release) is pushed to production, our deployment generates a new task definition referencing the appropriate release image and renders new Airflow DAGs to the Airflow nodes. These DAGs pass the new task definition into the AffirmECSOperator so that it launches the right image for these tasks:

"""
RENDERED BY SALT
"""
from​ airflow ​import​ DAG
from​ airflow.operators ​import​ AffirmECSOperator 
from​ datetime ​import​ datetime, timedelta 
default_args = {
​  'owner'​: ​'payments-prod'​,
​  'depends_on_past'​: ​False​,
​  'start_date'​: datetime.strptime(​'2020-05-01 18:15:00'​, ​'%Y-%m-%d %H:%M:%S'​), ​
  'email'​: [team-email@affirm.com​'],
​  'email_on_failure'​: ​True​,
​  'email_on_retry'​: ​False​,
​  'catchup'​: ​False​,
​  'retries'​: ​3​,
​  'retry_delay'​: timedelta(​seconds​=​60​),
}
dag = DAG(​'example-task'​,
​        default_args​=default_args, ​
        schedule_interval​=​'15 18 * * *'​, ​
        catchup​=​False​)
task0 = AffirmECSOperator(​task_definition​=​'release-v1.2.3'​, ​
  cluster​=​'dynamic-jobs'​,
​  overrides​={
​    'containerOverrides'​: [{
​      'name'​: ​'affirm-docker'​,
      ​'command'​: [​"MyTask"​, ​"--start-date"​, ​"{{
execution_date.strftime('%Y-%m-%d') }}"​], ​
      'cpu'​: ​int​(​1024​),
​      'memory'​: ​int​(​4096​), 
      ​'environment'​: [
          {
​              'name'​: ​'JOB_IMPORTANCE'​, ​
              'value'​: ​'normal'
           }, 
           {
              'name': 'ALERT_PRIORITY',
​              'value'​: ​'P3' 
           }
       ] 
   },
   {
      ​'name'​: ​'metrics_collector'​,
​      'command'​: [​"--resource"​, ​"example-task"​],
   }] 
 },
​ launch_type​=​'FARGATE'​, ​
 additional_runtask_api_args​={ ​
   "networkConfiguration"​: {
​      "awsvpcConfiguration"​: {
​        "assignPublicIp"​: ​"DISABLED"​, 
        ​"securityGroups"​: [​"sg-12345"​], ​
        "subnets"​: [​"subnet-abcd"​, ​"subnet-efgh"​]
      } 
   },
​   "count"​: ​1 
  },
​  region_name​=​'us-east-1'​, ​
  task_id​=​'example-task'​, ​
  execution_timeout​=timedelta(​hours​=​12​), ​
  dag​=dag)

All our tasks run through the same entrypoint command in the image: a shell script that passes the command arguments supplied via the RunTask API into the appropriate application entrypoint, so that different tasks can be scheduled through the same entrypoint and therefore the same task definition. Locking in the entrypoint also prevents commands other than task invocations from being executed in our images via Amazon ECS.
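
The pattern behind that entrypoint can be sketched as follows. Affirm’s actual entrypoint is a shell script; this Python version only illustrates the idea of dispatching RunTask command arguments to a fixed set of allowed task invocations, and the task names are made up.

# Illustrative dispatcher in the spirit of the shell entrypoint described above.
# The task names and bodies are made up; the real entrypoint is a shell script.
import sys

def my_task(args):
    print('running MyTask with', args)
    return 0

def cleanup_indices(args):
    print('running CleanupIndices with', args)
    return 0

# Only task invocations registered here may be executed through ECS.
ALLOWED_TASKS = {
    'MyTask': my_task,
    'CleanupIndices': cleanup_indices,
}

def main(argv):
    if not argv or argv[0] not in ALLOWED_TASKS:
        print('Usage: entrypoint <TaskName> [task args...]', file=sys.stderr)
        return 2
    # Hand the remaining RunTask command arguments to the selected task.
    return ALLOWED_TASKS[argv[0]](argv[1:])

if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))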

Fargate limitations

Fargate is our preferred launch type for executing tasks because we do not have to manage the underlying capacity; Fargate dynamically provisions the capacity we need to run jobs on demand. It is also more secure, as each task runs on a separate EC2 instance in a fully isolated environment.

However, Fargate has two limitations, which block some tasks from running:

  1. A maximum of 4 vCPUs per task
  2. A maximum of 10GB of local storage (Platform Version 1.3) or 20GB (Platform Version 1.4)

For the subset of our tasks that need more than 4 vCPUs or a large amount of local storage to run properly, Fargate is not an option, but our framework still needs to support these task runs.

Luckily, Amazon ECS also supports an EC2 launch type for task execution, in which you manage the EC2 capacity that containers run on yourself. These nodes run an ECS agent that connects to the appropriate cluster’s ECS control plane and executes tasks scheduled via the RunTask API with the EC2 launch type specified. We manage a small fleet of such ECS workers to support these tasks. We currently scale them manually, but plan on implementing ECS Capacity Providers to make our ECS worker fleet capacity more dynamic and on demand, like Fargate.
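
For those oversized tasks, the RunTask call differs mainly in the launch type and target cluster; a minimal sketch, with placeholder cluster, task definition, container, and command names, might look like this:

# Sketch of running an oversized job on a self-managed EC2-backed ECS cluster.
# Cluster, task definition, container, and command names are placeholders.
import boto3

ecs = boto3.client('ecs', region_name='us-east-1')

response = ecs.run_task(
    cluster='dynamic-jobs-ec2',          # cluster backed by our own EC2 ECS workers
    taskDefinition='example-big-task',   # sized beyond Fargate's per-task limits
    launchType='EC2',
    count=1,
    overrides={
        'containerOverrides': [{
            'name': 'affirm-docker',
            'command': ['MyBigTask'],
        }],
    },
)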

The end result

Since migrating to this new system, we have seen significant benefits over our previous cron-based setup:

  1. More configuration in the control of job owners, including resource reservations for their tasks (to prevent resource contention) and retry, timeout, and error notification settings, all self-service via their DAG definition files.
  2. Less capacity to manage ourselves, as most of our tasks execute on Fargate. We just need to make sure that our limit on concurrent Fargate tasks is properly tuned, and we have alerts that prompt us to request higher limits as usage grows.
  3. Reduced operational load on the infrastructure teams. Each product engineering team has the freedom to define their jobs, pipelines, and required capacity without needing any help from the infrastructure/operations team to provision more capacity.
  4. Cost savings from not paying for capacity when it is not being used.
  5. Cost allocation and charge backs can be more granular. Each Fargate task can be tagged with the owning team, helping to identify the cost of running each workload.
  6. On-demand burst capacity when needed (for example, many daily tasks all start at 00:00 UTC, and with Fargate they can all get capacity when they need it).
  7. Benefits of Airflow UI for operational actions on tasks:
    1. Killing pipelines, rerunning pipelines, and seeing which pipelines are running and their statuses
    2. In the future, we can implement RBAC in Airflow to limit users’ ability to operate on jobs their team does not own.
  8. Security: having AWS manage the infrastructure that Fargate tasks run on means we can rely on AWS to keep that infrastructure secure, rather than handling those operations ourselves.

Affirm’s Platform and Infrastructure teams are continuously evolving and improving the technologies that power Affirm. Scaling a global, highly performant, highly available, fault-tolerant, secure, observable infrastructure shared across multiple merchants, vendors, and products is hard. Consistency and security requirements make scaling even harder. Such hard problems fascinate us, and we strive to find simple solutions to them. If you’re interested in learning more about what our Platform Engineering team and other teams do, and how to join us, check out our technical blogs and our careers page.