Блог Amazon Web Services

Использование Amazon SQS очередей необрабатываемых сообщений (DLQ) для повторной обработки

Оригинал статьи: ссылка (James Beswick, Senior Developer Advocate; Alexandre Pinhel, Specialist SA Manager; Guillaume Marchand и Luke Hargreaves, Solutions Architects)

Amazon Simple Queue Service (Amazon SQS) – это полностью управляемый сервис очередей сообщений, с помощью которого можно изолировать и масштабировать микросервисы, распределенные системы и бессерверные приложения.

Распространённый функционал в Amazon SQS – это очереди необрабатываемых сообщений (DLQ). DLQ используется для хранения сообщений, которые не могут быть обработаны потребителем.

Данный пост описывает как добавить отказоустойчивости в автоматизированном режиме к вашим SQS очередям. Этот механизм мониторит DLQ и отправляет сообщения обратно в основную очередь, чтобы проверить возможность их повторной обработки. Также используется специальный алгоритм, чтобы не допустить бесконечного зацикливания обработки сообщений. После каждой попытки повторной обработки, время ожидания увеличивается, пока не будет достигнут предел для повторной обработки.

Я использую Amazon SQS DLQAWS Lambda, и специальный алгоритм для уменьшения количества попыток повторений для неуспешно обработанных сообщений. Затем я запакую и размещу данное бессерверное решение в AWS Serverless Application Repository.

DLQ и повторная обработка сообщений

Основная задача DLQ – это управление ситуацией в случае, когда сообщения не могут быть корректно обработаны потребителем. Это позволяет изолировать необработанные сообщения, чтобы определить причину ошибок, возникающих при их обработке. Часто такие ситуации возникают из-за ошибок в приложениях. Например, приложение-обработчик не может корректно разобрать сообщение и выдает необработанное сообщение об ошибке. Эта ошибка затем ведет к ответу с кодом ошибки, что приводит к отправке сообщения в DLQ.  Документация AWS содержит руководство с подробным описанием конфигурации очереди Amazon SQS для необрабатываемых сообщений.

Для обработки неуспешных сообщений, я настроил механизм повторной обработки сообщений используя алгоритм экспоненциальной отсрочки. Целью данного алгоритма является использование постоянно увеличивающихся временных интервалов между неуспешно повторяющихся попытками отправки. Большинство подобных алгоритмов используют джиттер (задержку, выбранную случайным образом) для предотвращения последовательных коллизий. Такой подход распределяет повторные попытки во времени, предоставляя больше возможностей для успешной обработки.

Описание решения

Последовательность событий, происходящих с сообщением с момента отправки до SQS:

  1. Исходное приложение отправляет сообщение в очередь SQS.
  2. Приложение-получатель не может успешно обработать сообщение из этой же SQS очереди.
  3. Сообщение перемещается из основной SQS очереди в DLQ по-умолчанию в соответствии с настройками очереди.
  4. Lambda функция настроена на получение событий из SQS DLQ. Она получает сообщение и отправляет его обратно в основную SQS очередь с добавлением временного таймера видимости сообщения.
  5. Временной таймер настроен с использованием алгоритма экспоненциальной отсрочки и джиттера.
  6. Вы можете настроить максимальное количество попыток повторной обработки. Если сообщение превысит этот лимит, то сообщение отправляется во вторую DLQ, где оператор проверяет и обрабатывает сообщения вручную.

Как работает функция повторной отправки

Каждый раз, когда SQS DLQ получает сообщение, она запускает Lambda функцию выполняющую повторную отправку. Код функции использует атрибут SQS сообщения `sqs-dlq-replay-nb` для хранения текущего количества реализованных попыток повторной отправки. Количество повторений сравнивается с лимитом, установленным в файле конфигурации приложения. Если лимит превышен, сообщение перемещается в очередь, которой управляет оператор приложения – для ручной обработки. Если лимит не достигнут, функция Lambda использует данные, полученные из события для создания нового сообщения, предназначенного для отправки в основную SQS очередь. И, наконец, функция обновляет счётчик повторений, добавляет временной таймер к сообщению, и затем отправляет сообщение обратно в основную очередь.

def handler(event, context):
    """Lambda function handler."""
    for record in event['Records']:
        nbReplay = 0
        # number of replay
        if 'sqs-dlq-replay-nb' in record['messageAttributes']:
            nbReplay = int(record['messageAttributes']['sqs-dlq-replay-nb']["stringValue"])

        nbReplay += 1
        if nbReplay > config.MAX_ATTEMPS:
            raise MaxAttempsError(replay=nbReplay, max=config.MAX_ATTEMPS)

        # SQS attributes
        attributes = record['messageAttributes']
        attributes.update({'sqs-dlq-replay-nb': {'StringValue': str(nbReplay), 'DataType': 'Number'}})

        _sqs_attributes_cleaner(attributes)

        # Backoff
        b = backoff.ExpoBackoffFullJitter(base=config.BACKOFF_RATE, cap=config.MESSAGE_RETENTION_PERIOD)
        delaySeconds = b.backoff(n=int(nbReplay))

        # SQS
        SQS.send_message(
            QueueUrl=config.SQS_MAIN_URL,
            MessageBody=record['body'],
            DelaySeconds=int(delaySeconds),
            MessageAttributes=record['messageAttributes']
        )

Как использовать приложение

Вы можете использовать это бессерверное приложение с помощью одного из следующих вариантов:

  • Lambda консоли: выберите “Browse serverless app repository” опцию для создания функции. Выберите “amazon-sqs-dlq-replay-backoff” приложение в репозитории для бессерверных приложений. Затем настройте приложение, установив параметры по-умолчанию для SQS и для функционала повторений.
  • Serverless Framework, как описано Yan Cui в данном посте.
  • AWS CloudFormation шаблона, с использованием ресурса AWS::ServerlessRepo::Application,  как описано в документации.

Пример CloudFormation шаблона, использующего AWS Serverless Application репозиторий:

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Resources:
  ReplaySqsQueue:
    Type: AWS::Serverless::Application
    Properties:
      Location: 
        ApplicationId: arn:aws:serverlessrepo:eu-west-1:1234123412:applications~sqs-dlq-replay
        SemanticVersion: 1.0.0
      Parameters:
        BackoffRate: "2"
        MaxAttempts: "3"

Выводы

Я описал как алгоритм экспоненциальной отсрочки с джиттером улучшает возможности обработки сообщений с помощью Amazon SQS очередей. Вы можете использовать приложение amazon-sqs-dlq-replay-backoff из AWS Serverless Application репозитория. Скачать код можно в репозитории GitHub.

Для ознакомления с DLQ в Amazon SQS, прочтите:

Для имплементации механизма повторений, ознакомьтесь с документацией:

Больше информации о бессерверных технологиях вы найдете по ссылке: https://serverlessland.com.