AWS Database Blog

Rate-limiting calls to Amazon DynamoDB using Python Boto3, Part 2: Distributed Coordination

Part 1 of this series showed how to rate-limit calls to Amazon DynamoDB by using Python Boto3 event hooks. In this post, I expand on the concept and show how to rate-limit calls in a distributed environment, where you want a maximum allowed rate across the full set of clients but can’t use direct client-to-client communication.

Rate limiting in a distributed environment can be useful when you have multiple clients and don’t want to give a predetermined fixed capacity to each client, because the number of clients might fluctuate or the consumption per client might vary over time. The technique discussed in this post uses an Amazon Simple Storage Service (Amazon S3) bucket for distributed coordination of the shared system state. It’s a proven solution used in Bulk Executor for Amazon DynamoDB, and its logic is encapsulated in the DistributedDynamoDBMonitorWorker and DistributedDynamoDBMonitorAggregator classes, which are available on GitHub.

Solution overview

When deploying a large set of distributed clients that come and go and don’t always know about each other—such as with AWS Glue executors—you generally can’t rely on direct client-to-client communication. As a result, it’s necessary to have a shared resource as a coordination point such as a DynamoDB table, an Amazon ElastiCache cluster, or an Amazon S3 bucket prefix.

This post uses Amazon S3 as the coordination point. When you use AWS Glue, an associated Amazon S3 bucket must be present to hold its scripts, so using that Amazon S3 resource minimizes dependencies.

The following are some key design points for this post’s solution:

  • Each client can rate-limit itself by using the DynamoDBMonitor code base, as discussed in Part 1.
  • Each client can access a shared Amazon S3 bucket and prefix. It’s common to have a shared Amazon S3 bucket for distributed execution contexts such as AWS Glue.
  • Each client can write (and repeatedly overwrite) its read/write consumption status and current timestamp to a unique Amazon S3 object within that prefix (see the example objects after this list).
  • A background thread can summarize the objects written by these clients and create a single summary object. The thread can ignore files with timestamps too far in the past.
  • Each client can regularly read the summary object and calculate if the shared consumption is trending higher than the desired aggregate limits.
  • If shared consumption is too high, each client can lower its local rate limit accordingly.
  • If shared consumption is too low, each client can raise its local rate limit accordingly, up to the maximum local rate.
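
To make this scheme concrete, the following sketch shows the general shape of the per-worker status objects and the summary object stored under the shared prefix. The field names match the code shown later in this post; the values and example keys are illustrative only.

    import json, time

    # Illustrative per-worker status object, written (and overwritten) by each client,
    # for example at s3://<bucket>/<prefix>worker-<worker_id>.json
    worker_status = {
        "worker_id": "1234",        # example ID; the default in the worker class is a UUID
        "timestamp": time.time(),   # lets the aggregator discard stale files
        "read_rate": 212.5,         # read units per second consumed by this worker
        "write_rate": 87.0,         # write units per second consumed by this worker
    }

    # Illustrative summary object, written by the aggregator thread,
    # for example at s3://<bucket>/<prefix>summary.json
    summary = {
        "timestamp": time.time(),
        "aggregated_read_rate": 8450.0,     # sum across all non-stale workers
        "aggregated_write_rate": 3120.0,
        "active_workers": 40,
        "aggregation_time_utc": "2025-01-01T00:00:00",
    }

    print(json.dumps(worker_status), json.dumps(summary), sep="\n")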

Limits and costs

When using Amazon S3, it’s important to consider the GetObject and PutObject call rates to ensure they are within limits. If you assume each thread reads/writes every 5 seconds and there are 1,000 distributed client threads:

  • Each per-client object is updated one time every 5 seconds and read every 5 seconds.
  • The summary object is updated one time every 5 seconds.
  • The summary object is read 1,000 times every 5 seconds (200 times per second).

Reading the summary object is the only high-activity event. Because Amazon S3 supports at least 5,500 GET requests per second per partitioned prefix, this consumption is well within the Amazon S3 limits. For efficiency, ensure that the bucket does not have S3 Versioning enabled.

It’s also important to consider costs. Assuming 1,000 clients, every 5 seconds there will be approximately 2,000 reads and 1,000 writes. With today’s us-east-1 pricing of $0.005 per thousand PUT operations and $0.0004 per thousand GET operations, that’s $0.0058 every 5 seconds or $0.07 every minute (if coordinating across 1,000 executors).

By contrast, if you were using on-demand capacity mode in DynamoDB, it would cost $0.625 per million writes and $0.125 per million reads, which is the equivalent of $0.000625 per thousand writes and $0.000125 per thousand reads. For the same PUT and GET operations as above, that’s $0.000875 every 5 seconds or $0.01 every minute.
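
As a quick sanity check, the following sketch reproduces the request-rate and cost arithmetic above. The prices are the us-east-1 list prices quoted in this post; verify current pricing for your Region.

    # Back-of-the-envelope check of the request rates and costs discussed above,
    # assuming 1,000 clients that each read and write every 5 seconds
    clients = 1000
    interval = 5  # seconds

    puts_per_interval = clients        # each worker overwrites its status object (the single summary PUT is negligible)
    gets_per_interval = 2 * clients    # the aggregator reads every status object; every worker reads the summary

    print(f"Summary object GETs per second: {clients / interval:.0f}")  # 200

    # Amazon S3 request pricing quoted above (us-east-1)
    s3_put, s3_get = 0.005 / 1000, 0.0004 / 1000
    s3_cost = puts_per_interval * s3_put + gets_per_interval * s3_get
    print(f"S3: ${s3_cost:.4f} per {interval}s, ${s3_cost * 60 / interval:.2f} per minute")  # $0.0058, $0.07

    # DynamoDB on-demand pricing quoted above (us-east-1)
    ddb_write, ddb_read = 0.625 / 1_000_000, 0.125 / 1_000_000
    ddb_cost = puts_per_interval * ddb_write + gets_per_interval * ddb_read
    print(f"DynamoDB: ${ddb_cost:.6f} per {interval}s, ${ddb_cost * 60 / interval:.2f} per minute")  # $0.000875, $0.01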

Pre-built classes for distributed coordination

The distributed rate-limiting logic is encapsulated in two Python classes. DistributedDynamoDBMonitorAggregator runs the summary thread, and DistributedDynamoDBMonitorWorker is used by each client. The following code shows sample usage of these two classes:

    # THIS CODE APPEARS IN THE AWS GLUE DRIVER

    # Create a session
    session = boto3.Session()

    # In a single driver context, start the aggregator thread by specifying a bucket and prefix
    # The session is used here to access Amazon S3
    aggregator = DistributedDynamoDBMonitorAggregator(
        session=session,
        bucket=bucket,
        prefix=prefix
    )


    # THIS CODE APPEARS IN EACH AWS GLUE TASK

    # Create a session in this context
    session = boto3.Session()

    # Create the distributed monitor with the same bucket and prefix
    # The session is used here to access Amazon S3 and to manage the Boto3 event hooks
    worker = DistributedDynamoDBMonitorWorker(
        session=session,
        bucket=bucket,
        prefix=prefix,
        aggregate_max_read_rate=aggregate_max_read_rate,
        aggregate_max_write_rate=aggregate_max_write_rate,
        worker_max_read_rate=worker_max_read_rate,
        worker_max_write_rate=worker_max_write_rate
    )

    # In each worker client context
    client = session.client("dynamodb")
    ... perform read and write calls 

Any code using this client benefits from the distributed rate limiting logic because the session has been modified to use event hooks, as first discussed in Part 1.
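
As a reminder of the Part 1 technique, the worker manages Boto3 event hooks on the shared session. The following is a minimal, illustrative sketch of that style of hook; it isn’t the actual DynamoDBMonitor implementation, and the handler names are hypothetical.

    import boto3

    # Illustrative only: the real hook logic lives in the DynamoDBMonitor class from Part 1
    session = boto3.Session()

    def add_return_consumed_capacity(params, model, **kwargs):
        # Ask DynamoDB to report consumed capacity, but only for operations that accept the parameter
        if model.input_shape and "ReturnConsumedCapacity" in model.input_shape.members:
            params.setdefault("ReturnConsumedCapacity", "TOTAL")

    def record_consumed_capacity(parsed, **kwargs):
        # A real monitor would update its read/write counters from the
        # ConsumedCapacity data here and pause if the local limit is exceeded
        consumed = parsed.get("ConsumedCapacity")
        if consumed:
            pass  # see the DynamoDBMonitor class from Part 1

    session.events.register("provide-client-params.dynamodb.*", add_return_consumed_capacity)
    session.events.register("after-call.dynamodb.*", record_consumed_capacity)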

Code walkthrough of the aggregator class

This section reviews the essential code blocks of the DistributedDynamoDBMonitorAggregator Python class. The class constructor takes the following parameters:

  • Self, always the first argument in Python constructors.
  • The session that will be used for connecting to Amazon S3.
  • The bucket name.
  • The prefix. This can be, for example, an AWS Glue job ID.
  • A staleness cutoff, in seconds. Worker files older than this are effectively ignored as out-of-date leftovers.
  • The minimum interval in seconds between running aggregation loops.
  • The name of the summary file.
  • Whether to automatically start the background thread.
class DistributedDynamoDBMonitorAggregator:
    def __init__(self, session, bucket, prefix, 
                 staleness_cutoff=15, interval=5, 
                 output_key='summary.json', autostart=True):

The full DistributedDynamoDBMonitorAggregator code is too long to include here. The key aspect is that its background thread repeatedly calls the following aggregate_once function.

First, the code gets the list of Amazon S3 objects written by clients, excluding the summary file:

    def aggregate_once(self, max_workers: int = 16):
        aggregated_read_rate = 0.0
        aggregated_write_rate = 0.0
        active_workers = 0

        # 1) Collect keys (paginated)
        keys = []
        paginator = self.s3_client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=self.bucket, Prefix=self.prefix):
            for obj in page.get("Contents", []) or []:
                k = obj["Key"]
                if k.endswith(".json") and not k.endswith(self.output_key):
                    keys.append(k)

Then it fetches the Amazon S3 objects in parallel:

        # 2) Concurrently fetch + parse (skip if no keys; aggregates stay zero)
        def fetch(key: str):
            try:
                now = time.time()
                resp = self.s3_client.get_object(Bucket=self.bucket, Key=key)
                data = json.loads(resp["Body"].read().decode("utf-8"))

                timestamp = data.get("timestamp")
                if timestamp is None or (now - timestamp) > self.staleness_cutoff:
                    log.debug(f"[Aggregator] Skipping stale worker {key} (age {(now-timestamp):.1f}s)")
                    return None

                return float(data.get("read_rate", 0) or 0), float(data.get("write_rate", 0) or 0)
            except (BotoCoreError, ClientError, json.JSONDecodeError) as e:
                log.debug(f"[Aggregator] Failed to read {key}: {e}")
                return None

        if keys:
            with ThreadPoolExecutor(max_workers=max_workers) as ex:
                for result in ex.map(fetch, keys):
                    if result is None:
                        continue
                    r, w = result
                    aggregated_read_rate  += r
                    aggregated_write_rate += w
                    active_workers        += 1

Then it writes the aggregated summary to the summary file under the same bucket and prefix:

        # 3) Write the summary out
        summary = {
            "timestamp": time.time(),
            "aggregated_read_rate": aggregated_read_rate,
            "aggregated_write_rate": aggregated_write_rate,
            "active_workers": active_workers,
            "aggregation_time_utc": datetime.utcnow().replace(microsecond=0).isoformat()
        }

        self.s3_client.put_object(
            Bucket=self.bucket,
            Key=f"{self.prefix}{self.output_key}",
            Body=json.dumps(summary).encode("utf-8"),
        )
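
For context, the background thread that drives aggregate_once can be sketched roughly as follows. This is a simplified illustration based on the interval and autostart constructor parameters described earlier, not the exact repository code.

    import logging
    import threading

    log = logging.getLogger(__name__)

    class AggregatorLoopSketch:
        """Hypothetical stand-in showing how a background thread can drive aggregate_once()."""

        def __init__(self, interval=5, autostart=True):
            self.interval = interval
            self.stop_event = threading.Event()
            self.thread = threading.Thread(target=self._run, daemon=True)
            if autostart:
                self.thread.start()

        def aggregate_once(self):
            pass  # the real implementation is the function walked through above

        def _run(self):
            # Run one aggregation pass roughly every `interval` seconds until stopped
            while not self.stop_event.wait(self.interval):
                try:
                    self.aggregate_once()
                except Exception as e:
                    log.debug(f"[Aggregator] aggregation pass failed: {e}")

        def stop(self):
            self.stop_event.set()
            self.thread.join()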

Code walkthrough of the worker class

This section reviews the essential code blocks of the DistributedDynamoDBMonitorWorker class.

The worker class constructor takes a different set of parameters:

  • Self, as usual
  • The session that will be used for connecting to Amazon S3 and also adding event hooks.
  • The bucket name.
  • The prefix. Same as passed to DistributedDynamoDBMonitorAggregator.
  • The maximum allowed reads and writes in aggregate.
  • The maximum allowed reads and writes per worker.
  • The initial read and write rates of the worker. None means use a default computed by an algorithm within the constructor.
  • The sync interval in seconds for how often to sync the data to Amazon S3.
  • The ID of this worker. This ID will be used in the Amazon S3 object name. None means use a UUID.
  • The name of the summary file.
  • Whether to automatically start the background thread.
class DistributedDynamoDBMonitorWorker:
    def __init__(self, session, bucket, prefix,
                 aggregate_max_read_rate=100000,
                 aggregate_max_write_rate=50000,
                 worker_max_read_rate=1500,
                 worker_max_write_rate=500,
                 worker_initial_read_rate=None,
                 worker_initial_write_rate=None,
                 sync_interval=5,
                 worker_id=None,
                 summary_key='summary.json',
                 autostart=True):

The full DistributedDynamoDBMonitorWorker code is too long to include here. The key aspect is the following sync loop, which runs in a background thread.

First, the loop calculates the worker’s read and write rates and writes, or overwrites, the results to its worker-specific object in Amazon S3.

def _sync_loop(self):
      while not self.stop_event.is_set():
          try:
              if self.stop_event.wait(self.sync_interval * random.uniform(0.9, 1.1)): # +/- 10% for jitter 
                  break

              # Upload own metrics to the worker-specific Amazon S3 file
              upload_key = f"{self.prefix}worker-{self.worker_id}.json"
              wall_ts = time.time()
              mono_now = time.monotonic()

              with self.monitor.metrics_lock:
                  total_read  = float(self.monitor.metrics['read_capacity'])
                  total_write = float(self.monitor.metrics['write_capacity'])

              if self._last_metrics_snapshot is None:
                  read_rate = 0.0
                  write_rate = 0.0
              else:
                  last_t, last_r, last_w = self._last_metrics_snapshot
                  dt = max(mono_now - last_t, 1e-6)
                  read_rate  = max(0.0, (total_read  - last_r) / dt)
                  write_rate = max(0.0, (total_write - last_w) / dt)

              self._last_metrics_snapshot = (mono_now, total_read, total_write)

              payload = json.dumps({
                  "worker_id": self.worker_id,
                  "timestamp": wall_ts,
                  "read_rate": read_rate,
                  "write_rate": write_rate,
              })

              self.s3_client.put_object(Bucket=self.bucket, Key=upload_key, Body=payload.encode('utf-8'))

Then it reads the aggregated summary data:

              # Read aggregator summary (if exists)
              try:
                  resp = self.s3_client.get_object(Bucket=self.bucket, Key=f"{self.prefix}{self.summary_key}")
                  summary_data = json.loads(resp['Body'].read().decode('utf-8'))

                  current_agg_read_rate = summary_data.get("aggregated_read_rate", 0.0)
                  current_agg_write_rate = summary_data.get("aggregated_write_rate", 0.0)

Finally, it performs any local adjustments (the algorithm is explained afterward):

                  # Compute aggregate scaling factor
                  read_scale = self.aggregate_max_read_rate / current_agg_read_rate if current_agg_read_rate > 0 else 1.0
                  write_scale = self.aggregate_max_write_rate / current_agg_write_rate if current_agg_write_rate > 0 else 1.0

                  # Apply that scaling to local limits
                  worker_allowed_read_rate = read_scale * self.monitor.max_read_rate
                  worker_allowed_write_rate = write_scale * self.monitor.max_write_rate

                  # Choose a new local target rate capped at a per-worker maximum
                  new_read_target = min(worker_allowed_read_rate, self.worker_max_read_rate)
                  new_write_target = min(worker_allowed_write_rate, self.worker_max_write_rate)

                  # Apply a smooth adjustment to avoid oscillation
                  smoothing_factor = 0.4  # 0=no change, 1=jump immediately
                  self.monitor.max_read_rate = (1 - smoothing_factor) * self.monitor.max_read_rate + smoothing_factor * new_read_target
                  self.monitor.max_write_rate = (1 - smoothing_factor) * self.monitor.max_write_rate + smoothing_factor * new_write_target

              except self.s3_client.exceptions.NoSuchKey:
                  # If no summary file is available yet, continue using the current rate limits
                  print(f"[{self.worker_id}] No summary file found; using current rate limits")

          except Exception as e:
              print(f"[{self.worker_id}] Error in sync loop: {e}")

A scaling factor tells us what we would multiply our current rate by to achieve the goal rate across the entire distributed system. It is computed by taking the allowed maximum rate and dividing it by the current rate. A value above 1.0 means that, in aggregate, the maximum allowed rate is higher than the current rate. A value below 1.0 means the maximum allowed rate is below the current rate. For example, if the aggregate_max_write_rate is 10,000 and the current_agg_write_rate is 11,000, the write_scale is 0.91 (or 91%), which means a 9% slowdown would be in order.

The next section multiplies the current local limits by the scaling factor to calculate potential new local worker limits that bring the worker more in line with the aggregate goal. If things are running too fast and the write_scale is 0.91, then this worker calculates its worker_allowed_write_rate as 91% of its current write maximum. The min() calls make sure the new value doesn’t exceed the per-worker maximums.

Then it’s time to update. You could imagine assigning the new target immediately, but experimentally that tends to produce overly reactive behavior, with values jumping up and down rather than settling. In a distributed system, where it takes time to gather data and time to change behavior, it works better to also take time to change the goals. The smoothing factor dampens the movement on each iteration; a value of 0.4 moves each limit 40% of the way toward the new target each time.
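
To make the adjustment concrete, the following sketch works through one iteration with illustrative numbers. The variable names mirror the code above; the starting local limit of 400 is hypothetical.

    # One illustrative adjustment step, matching the example above
    aggregate_max_write_rate = 10_000   # desired aggregate ceiling
    current_agg_write_rate = 11_000     # observed aggregate consumption
    worker_max_write_rate = 500         # per-worker hard cap
    local_max_write_rate = 400.0        # this worker's current local limit (hypothetical)

    write_scale = aggregate_max_write_rate / current_agg_write_rate       # ~0.909, so roughly a 9% slowdown
    worker_allowed_write_rate = write_scale * local_max_write_rate        # ~363.6
    new_write_target = min(worker_allowed_write_rate, worker_max_write_rate)

    smoothing_factor = 0.4
    local_max_write_rate = ((1 - smoothing_factor) * local_max_write_rate
                            + smoothing_factor * new_write_target)        # ~385.5, moving 40% of the way
    print(round(write_scale, 3), round(local_max_write_rate, 1))          # 0.909 385.5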

You can find the code for DistributedDynamoDBMonitorAggregator and DistributedDynamoDBMonitorWorker in the GitHub repo that is part of Bulk Executor for Amazon DynamoDB, where the code is used to implement maximum read and write limits for distributed AWS Glue jobs.

Conclusion

Distributed clients can rate-limit their DynamoDB read and write consumption by using a shared resource such as Amazon S3 or DynamoDB for coordination. Each client reports its own statistics every few seconds. A separate thread reads the individual client statistics and generates an aggregate summary every few seconds. After writing, each client reads the aggregate statistics and adjusts its own local rate limits.

If the aggregate rates are running high, each client attempts to lower its own consumption proportionally, dampened by a smoothing factor to avoid overreaction. If the aggregate rates are low, each client attempts to raise its own limits proportionally up to the per-worker maximum. Note that the DynamoDBMonitor class described in Part 1 encapsulates the per-worker rate limiting logic.

Try this solution for your own use case, and share your feedback in the comments.


About the author

Jason Hunter

Jason is a California-based Principal Solutions Architect specializing in Amazon DynamoDB. He’s been working with NoSQL databases since 2003. He’s known for his contributions to Java, open source, and XML. You can find more DynamoDB posts and other posts written by Jason Hunter in the AWS Database Blog.