Design Pattern for Highly Parallel Compute: Recursive Scaling with Amazon SQS
Scaling based on Amazon Simple Queue Service (SQS) is a commonly used design pattern. At AWS Professional Services, we have recently used a variant of this pattern to achieve highly parallel computation for larger customers. In fact, any use case with a tree-like set of entities can use this pattern. It’s useful in a workflow where all nodes between the root and leaves must be processed rapidly and in parallel. Some use cases would be database parent-child records or graph relationships. Let us explore a couple of example scenarios, both hypothetical (COVID contact tracing) and real (processing of existing Amazon Simple Storage Service (S3) content).
Imagine that an application has collected the following tree-like COVID contact data, where ‘A’ has come in contact with ‘B’, ‘C’ and ‘D’. Then these people came in contact with a different (or the same) set of people. This continues several layers deep. In this scenario, we need to write an application to rapidly communicate with all persons in the tree.
A sequential solution would perform the following actions for each person (or node) in Figure 1, one by one, likely using a fixed number of compute instances:
- Send emails out to each entity in the chain
- Calculate risk based on medical history
- Update a central database
Our parallelized solution would perform the same actions faster, thus acting quickly to mitigate spread. It will:
- Take ‘A’ as the input and spin up (or reuse) a container instance on Amazon Elastic Container Service (Amazon ECS) for each entity in the chain. This will result in the simultaneous processing of entities, regardless of whether an entity is a leaf node or parent to some other node.
An efficient algorithm to traverse an inverted tree and process nodes in parallel can be applied to many common needs. This is true whether the nodes are rows in a relational database, nodes in a graph database, or S3 prefixes. In each such scenario, a compute operation is performed for each node in the tree. Recursion is the ideal solution for discovery and processing of such nodes.
In a real-life scenario for an AWS customer, hundreds of petabytes of existing S3 content in named buckets were copied and processed. The data lake content spanned thousands of “subfolders” (prefixes) nested dozens of layers deep. The leaf directories had thousands of objects each. The goal was to achieve massive parallelization across these prefixes during copying and processing. The situation demanded running many instances of Python or Java AWS SDK thread pools in parallel, to rapidly process non-overlapping prefixes. It was important to achieve this while retaining control over the number of parallel nodes, the ability to report progress, and the ability to retry failures.
The Design Pattern: Recursive Amazon ECS Scaling Using SQS and Custom Metrics
Processing starts when the first message (representing the root, or entity ‘A’ in Figure 1) is posted to the SQS queue (see Figure 2).
Each ECS task processes one named node at a time. It enumerates related nodes, in particular the children of the current node. It then recursively posts more messages back to the SQS queue, one for each such child node. This prompts the automatic scaling mechanism to spin up more ECS tasks because the queue size has changed. A recalculated value of the custom metric, “Backlog per Task”, which is the number of unread messages divided by the number of ECS tasks, is posted to Amazon CloudWatch by either an AWS Lambda function or a command-line interface (CLI) command running every minute. Each new task instance helps drain the queue and continues posting more messages upon finding child nodes in the tree. This continues until there are no nodes left.
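The fan-out described above can be illustrated with a small in-memory simulation. This is only a sketch: the contact tree is hypothetical, a deque stands in for the SQS queue, and each round assumes one task is available for every queued message. It also assumes the tree has no cycles; real contact data, where the same person can appear more than once, would need a record of visited nodes.

```python
from collections import deque

# Hypothetical contact tree in the spirit of Figure 1: parent -> children.
CONTACTS = {
    "A": ["B", "C", "D"],
    "B": ["E", "F"],
    "D": ["G"],
}


def simulate_fan_out(tree, root):
    """Return the queue depth observed at the start of each processing round.

    Each round models one task per queued message: every task handles its
    node and posts one new message per child back to the queue.
    """
    queue = deque([root])  # stands in for the SQS queue
    depth_per_round = []
    while queue:
        depth_per_round.append(len(queue))
        for _ in range(len(queue)):           # every message of this round
            node = queue.popleft()            # a task reads one message
            queue.extend(tree.get(node, []))  # and posts its children back
    return depth_per_round


print(simulate_fan_out(CONTACTS, "A"))  # prints [1, 3, 3]
```

The growing queue depth in each round is the signal that the scaling mechanism reacts to; the simulation ends when a round posts no further messages.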
There are several ways to publish custom metrics and auto-scale; two are shown in Figure 2.
Considerations for Design Pattern Usage
1. Defining a format and schema for the SQS messages. JSON is a good portable option.
A message should identify a single node for processing. For S3 content processing, this can be the prefix. For a graph or relational database, this can be the id or primary key.
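As a minimal sketch of such a message, the snippet below serializes and validates a JSON body that names one node. The field names "node_type" and "node_id" are assumptions chosen for illustration, not a schema prescribed by the pattern.

```python
import json


def build_message(node_id, node_type="s3_prefix"):
    """Serialize one node into a JSON message body (hypothetical schema)."""
    return json.dumps({"node_type": node_type, "node_id": node_id})


def parse_message(body):
    """Parse a message body and check that it names exactly one node."""
    message = json.loads(body)
    if not isinstance(message, dict):
        raise ValueError("message body must be a JSON object")
    node_id = message.get("node_id")
    if not isinstance(node_id, str) or not node_id:
        raise ValueError("message must carry a non-empty 'node_id' string")
    return message


body = build_message("datalake/2020/01/")
print(parse_message(body)["node_id"])  # prints datalake/2020/01/
```

Validating on read keeps a malformed message from being silently treated as a node with no children.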
2. Development of Docker Container Image and Deployment to ECS.
a. The process running inside the container can, in a nearly continuous loop with sleeps, read the SQS queue. We can pass parameters like the queue URL as task environment variables.
b. The ECS Task should be created with a Task IAM Role (not to be confused with Execution Role) that permits accessing the queue and other needed resources.
c. On reading a message from the queue, the ECS task can discover whether the node it is processing is a parent node with more child nodes, or a leaf node. A variant pattern may involve feeding a manifest along with the message, thus decreasing the need for inline discovery.
If the task determines that the node in question is a parent, it must enumerate all child nodes, and create an SQS message for each. It will then post all of them back to the SQS queue. It can also wrap up any other processing, such as updating a tracker database. Note that an S3 prefix can have objects directly residing in it, in addition to other nested prefixes; hence it can have the characteristics of both parent and leaf nodes.
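For the S3 case, child discovery and re-posting might look like the following sketch. It assumes `s3` and `sqs` are boto3 clients created elsewhere (for example with `boto3.client("s3")`), and the `{"node_id": ...}` message body is a hypothetical schema.

```python
import json


def list_child_prefixes(s3, bucket, prefix):
    """Return the prefixes nested directly under `prefix`.

    With Delimiter="/", S3 reports nested "subfolders" as CommonPrefixes,
    while objects residing directly in the prefix appear under Contents.
    """
    children = []
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/"):
        children.extend(p["Prefix"] for p in page.get("CommonPrefixes", []))
    return children


def post_children(sqs, queue_url, child_prefixes):
    """Post one message per child prefix, 10 per call (the SQS batch limit)."""
    for start in range(0, len(child_prefixes), 10):
        batch = child_prefixes[start:start + 10]
        entries = [
            {"Id": str(i), "MessageBody": json.dumps({"node_id": p})}
            for i, p in enumerate(batch)
        ]
        response = sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)
        if response.get("Failed"):
            # Surface partial failures so the current message can be retried.
            raise RuntimeError(f"{len(response['Failed'])} messages were not queued")
```

A node is a leaf for recursion purposes when `list_child_prefixes` returns an empty list, although it may still have objects of its own to process.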
3. Choosing an auto-scaling design.
a. AWS Fargate is recommended as the launch type.
b. The ECS cluster can be created in one of two ways:
i. With both Task Definition and Service Definition: this causes ECS to ensure that at least one instance of the task is always running. If we want the compute instances running only when there are messages in the queue, this might be unnecessary.
ii. With a Task Definition but no Service Definition: this is preferred for on-demand tasks where the number of running tasks can go down to zero when there are no messages in the queue.
In both (i) and (ii), the container could execute a loop to drain and process the SQS queue.
If we choose (i), the loop could be infinite with sleeps in between, as a stopped task will cause the ECS Service to spin up another. For example, if the process spawned by the CMD directive in the Dockerfile exits, a new instance will be created.
If we choose (ii), the loop can be designed to exit if the queue is empty. When a new message is posted, we can rely on the auto-scaling mechanism to spin up additional ECS tasks.
c. Consideration of Latency: The design should consider the latency involved in sensing the change in queue depth and updating metric values, in addition to auto-scaling response times.
Custom metrics cannot be updated more frequently than 1 minute apart. Automatic scaling needs time to react and spin up instances as well. This results in at least 2–3 minutes of latency in processing a new message, when all existing tasks are busy, or none exist yet. The computation of the custom metric would rely on the SQS attribute ApproximateNumberOfMessages, which is not continuously updated.
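The exit-on-empty loop from option (ii) could be sketched as follows. It assumes `sqs` is a boto3 SQS client and `handle` is a caller-supplied function that processes one message body; `max_empty_polls` is a hypothetical tuning knob, useful because the queue can look empty for a moment while sibling tasks are still posting children.

```python
def drain_queue(sqs, queue_url, handle, max_empty_polls=3, wait_seconds=20):
    """Process messages until the queue looks empty, then return.

    Returns the number of messages handled so the caller can report progress.
    """
    handled = 0
    empty_polls = 0
    while empty_polls < max_empty_polls:
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=1,         # one named node at a time
            WaitTimeSeconds=wait_seconds,  # long polling instead of sleeping
        )
        messages = response.get("Messages", [])
        if not messages:
            empty_polls += 1
            continue
        empty_polls = 0
        for message in messages:
            handle(message["Body"])
            # Delete only after successful processing, so that a failed
            # message becomes visible again and can be retried.
            sqs.delete_message(
                QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
            )
            handled += 1
    return handled
```

When the function returns, the container process can exit and the task stops; under option (i) the same loop could simply be wrapped in an outer infinite loop.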
4. Design Consideration: An optional “depth-specifier” in the JSON message. When a specific message is processed, the task can decrement this integer when recursively posting messages for the next layer of children. On reaching zero, recursion can stop, switching from parallel to sequential processing. This technique is useful if we want to control the extent of parallelization, for example, to throttle. See Figure 3.
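One way to express the depth-specifier logic is shown below. This is a sketch under assumptions: the field names "depth" and "node_id" are hypothetical, and a message without a "depth" field is treated as unlimited fan-out.

```python
def plan_children(message, children):
    """Split child nodes into messages to post and nodes to process inline.

    Returns (messages_to_post, nodes_to_process_sequentially).
    """
    depth = message.get("depth")
    if depth is None:
        # No specifier: keep fanning out without a limit.
        return [{"node_id": c} for c in children], []
    if depth <= 0:
        # Budget exhausted: stop recursing, handle the subtree in this task.
        return [], list(children)
    # Pass a decremented budget on to the next layer.
    return [{"node_id": c, "depth": depth - 1} for c in children], []


to_post, inline = plan_children({"node_id": "A", "depth": 1}, ["B", "C"])
print(to_post)  # prints [{'node_id': 'B', 'depth': 0}, {'node_id': 'C', 'depth': 0}]
print(inline)   # prints []
```

With an initial depth of 1, the root fans out once, and each child then walks its own subtree sequentially inside a single task.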
To understand how a target tracking scaling policy can be implemented using a custom CloudWatch metric, see the following documentation (let us call it Exhibit A), though not all steps can be followed verbatim: Scaling based on Amazon SQS. Whereas Exhibit A applies a target tracking scaling policy to Amazon EC2 instances, we must apply the same to an ECS Fargate cluster. However, it does demonstrate the usage of the custom metric, “Backlog per Task (Instance),” which is published to CloudWatch. In Alternate Design 2 in Figure 2, this is done using the CLI or SDK running every minute as a cron job or scheduled task, which is the approach shown in Exhibit A as well.
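For the variant with a Service Definition, a target tracking policy on the custom metric could be registered through Application Auto Scaling roughly as sketched here. The policy name, the metric namespace "RecursiveScaling", the metric name "BacklogPerTask", the "QueueName" dimension, the cooldowns, and the capacity limits are all assumptions for illustration; `autoscaling` is assumed to be a client from `boto3.client("application-autoscaling")`.

```python
def build_target_tracking_policy(cluster, service, queue_name, target_backlog):
    """Build keyword arguments for Application Auto Scaling put_scaling_policy."""
    return {
        "PolicyName": "backlog-per-task-target-tracking",  # hypothetical name
        "ServiceNamespace": "ecs",
        "ResourceId": f"service/{cluster}/{service}",
        "ScalableDimension": "ecs:service:DesiredCount",
        "PolicyType": "TargetTrackingScaling",
        "TargetTrackingScalingPolicyConfiguration": {
            "TargetValue": float(target_backlog),
            "CustomizedMetricSpecification": {
                # Must match the namespace, name, and dimensions that the
                # metric publisher uses (assumed values here).
                "Namespace": "RecursiveScaling",
                "MetricName": "BacklogPerTask",
                "Dimensions": [{"Name": "QueueName", "Value": queue_name}],
                "Statistic": "Average",
            },
            "ScaleOutCooldown": 60,   # seconds, illustrative
            "ScaleInCooldown": 300,   # seconds, illustrative
        },
    }


def apply_policy(autoscaling, policy, min_tasks, max_tasks):
    """Register the ECS service as a scalable target and attach the policy."""
    autoscaling.register_scalable_target(
        ServiceNamespace=policy["ServiceNamespace"],
        ResourceId=policy["ResourceId"],
        ScalableDimension=policy["ScalableDimension"],
        MinCapacity=min_tasks,
        MaxCapacity=max_tasks,
    )
    autoscaling.put_scaling_policy(**policy)
```

The maximum capacity passed to `apply_policy` is what retains control over the number of parallel tasks.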
In Alternate Design 1 in Figure 2, a scheduled Lambda function is used to publish the custom metric to CloudWatch. This is preferable to the CLI/SDK approach, as it is a completely serverless solution.
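A scheduled Lambda function for publishing the metric might look like the following sketch. The metric namespace and name, the "QueueName" dimension, and the environment variable names are assumptions. It also assumes the ECS cluster runs only these worker tasks, so the cluster-wide running task count can serve as the divisor, and it reports the raw backlog when no task is running in order to avoid dividing by zero.

```python
import os


def publish_backlog_per_task(sqs, ecs, cloudwatch, queue_url, queue_name, cluster):
    """Compute "Backlog per Task" and publish it as a custom metric."""
    attributes = sqs.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=["ApproximateNumberOfMessages"]
    )["Attributes"]
    visible = int(attributes["ApproximateNumberOfMessages"])
    running = ecs.describe_clusters(clusters=[cluster])["clusters"][0][
        "runningTasksCount"
    ]
    # Assumption: with zero running tasks, publish the raw backlog.
    backlog = visible / max(running, 1)
    cloudwatch.put_metric_data(
        Namespace="RecursiveScaling",  # hypothetical namespace
        MetricData=[{
            "MetricName": "BacklogPerTask",
            "Dimensions": [{"Name": "QueueName", "Value": queue_name}],
            "Value": backlog,
            "Unit": "Count",
        }],
    )
    return backlog


def lambda_handler(event, context):
    """Entry point, assumed to be invoked by a one-minute schedule."""
    import boto3  # bundled with the AWS Lambda Python runtime

    return publish_backlog_per_task(
        boto3.client("sqs"),
        boto3.client("ecs"),
        boto3.client("cloudwatch"),
        os.environ["QUEUE_URL"],      # hypothetical variable names
        os.environ["QUEUE_NAME"],
        os.environ["CLUSTER_NAME"],
    )
```

Passing the clients into `publish_backlog_per_task` keeps the calculation testable without AWS access; the same function could be called from a cron job in Alternate Design 2.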
We have seen how we can extend the commonly used design pattern of automatically scaling computation as a response to system load, just by monitoring queue depth. In this modified design pattern, the compute units leverage this response to increase parallelism.