Блог Amazon Web Services

Построение архитектур на основе событий с помощью Amazon SNS FIFO

Оригинал статьи: ссылка (James Beswick,  Senior Developer Advocate, AWS Serverless Team; Christian Mueller, Principal Solutions Architect). часть 1, часть 2, часть 3 (текущая)

Разработчики всё чаще применяют архитектуры на основе событий для построения распределённых приложений с низкой связностью друг с другом. Бывает, что события должны передаваться всем приложениям-адресатам в строгом порядке. При помощи Amazon SNS FIFO топиков и Amazon SQS FIFO очередей, вы можете построить приложение, с требованиями по строгой очередности доставки сообщений, фильтрации, дедупликации и шифрованию.

В этом посте, я приведу пример архитектуры на основе событий, и покажу как её построить на основе Amazon SNS FIFO топиков и Amazon SQS FIFO очередей.

Типовые требования для архитектур на основе событий

Консистентность данных является типовым бизнес требованием в архитектурах на основе событий, что можно перевести в технические требования о нулевой потере данных и строгой очередности обработки сообщений. Например, если вы быстро обновляете доменный объект, вам необходимо быть уверенным, что все события будут получены подписчиком именно в том порядке, в каком они были обновлены. Таким образом, текущее состояние доменного объекта будет представлено последним сообщением, которое получили подписчики. Аналогичным образом, все события обновления объекта должны быть получены после первого события создания объекта.

До появления функционала Amazon SNS FIFO, архитекторы должны были разрабатывать приложения с учётом возможности получения сообщений в неправильном порядке, и обрабатывать такую ситуацию перед обработкой сообщений.

Другой типовой сложностью является предотвращение дубликатов, при отправке событий в сервис сообщений. Если отправитель события получает ошибку, например, сетевую задержку, то он не может определить, было ли отправленное сообщение получено и обработано сервисом сообщений, или нет.

Клиент может повторить отправку, так как это является поведением по-умолчанию для некоторых HTTP кодов ответов в AWS SDKs, что может привести к дубликатам сообщений.

До появления Amazon SNS FIFO, разработчики должны были проектировать приложения с учётом идемпотентности. В некоторых случаях, когда сообщение не может быть идемпотентным, было необходимо обеспечить идемпотентность в логике получателя сообщения. Например, путём добавления хранилища типа ключ-значение, такого как Amazon DynamoDB или Amazon ElastiCache (Redis) к сервису-получателю. Используя такой подход, получатель может отслеживать факт повторного получения события с тем же идентификатором.

Исследование на примере кадрового агенства

Данное тестовое приложение моделирует веб-сайт кадрового агенства со списком доступных вакансий. Приложение состоит из нескольких сервисов, три из которых я рассмотрю подробно.

Пользовательский сервис, реализующий антикоррупционную логику, получает change data capture (CDC) поток сообщений из базы данных. Он преобразует низкоуровневые сообщения полученные из базы данных в понятные сообщения бизнес-уровня для приложений, реализующих доменную логику. Эти бизнес-сообщения отправляются в SNS FIFO “JobEvents.fifo“ топик. Таким образом, сервисы, которым нужна эта информация, могут подписаться на данные сообщения и обрабатывать их в асинхронном режиме.

В аналитическом домене, сервису нужно получать все сообщения. У него есть SQS FIFO “AnalyticsJobEvents.fifo” очередь, подписанная на SNS FIFO “JobEvents.fifo“ топик. Сервис использует SQS FIFO как источник данных для функции AWS Lambda, которая обрабатывает и сохраняет полученные сообщения в Amazon S3. S3 – это объектное хранилище, которое обеспечивает высокую доступность, масштабируемость, безопасность и производительность при работе с объектами. Интеграция S3 с сервисами Amazon EMRAWS Glue или Amazon Athena для получения инсайтов из ваших данных.

Сервис инвентаризации, использует SQS FIFO “InventoryJobEvents.fifo” очередь, которая подписана на SNS FIFO “JobEvents.fifo“ топик. Этому сервису нужны только сообщения типа “JobCreated” и “JobDeleted”, так как он отслеживает какие предложения о работе доступны в настоящий момент, и сохраняет эту информацию в DynamoDB таблице. Для обеспечения этой логики, он использует SNS политику фильтрации для получения не всех, а только нужных сообщений.

Данное тестовое приложение рассматривает возможности SNS FIFO, поэтому мы не рассматриваем другие сервисы. Наш пример следует лучшим практикам по работе с SQS и рекоммендациям SNS redrive policy, а также настраивает очередь для недоставленных сообщений (dead-letter queues, DLQ). Это может быть полезно в случаях, когда SNS не может доставить сообщение адресату – SQS очереди. Это также помогает, если Lambda-функция несколько раз не может корректно обработать сообщение из SQS FIFO очереди-источника. В обоих случаях, обязательным требованием является, чтобы SQS DLQ была SQS FIFO очередью.

Развёртывание приложения

Для развёртывания приложения с использованием подхода «инфраструктура-как-код», мы используем AWS SAM. SAM предоставляет сокращённый синтаксис для описания Lambda-функций, API, баз данных и сопоставления источников событий. Во время развёртывания, AWS SAM темплейт преобразуется в синтаксис AWS CloudFormation.

Чтобы начать, склонируйте “event-driven-architecture-with-sns-fifo” репозиторий отсюда. Или, скачайте архив здесь и распакуйте в выбранную директорию.

В качестве подготовки, нужно установить SAM CLIPython 3, и PIP. Также нужно настроить AWS CLI.

Перейдите в корневую директорию проекта и соберите приложение с помощью SAM. SAM скачивает необходимые зависимости и сохраняет их локально. Для этого, выполните следующие команды в терминале:

git clone https://github.com/aws-samples/event-driven-architecture-with-amazon-sns-fifo.git
cd event-driven-architecture-with-amazon-sns-fifo
sam build

Вы должны увидеть вывод в консоли:

Теперь, развернём приложение следующей командой:

sam deploy --guided

Введите необходимые параметры для развёртывания, такие как название стэка и выбранный AWS Region:

После успешного развёртывания, вы должны увидеть следующее сообщение в консоли:

Детали реализации

Я рассмотрю три сервиса, которые используются в тестовом приложении и как они используют функционал SNS FIFO.

Антикоррупционный сервис

Сервису принадлежит SNS FIFO “JobEvents.fifo” топик, куда он публикует бизнес сообщения, связанные с размещением вакансий. Он использует SNS FIFO топик из-за необходимости соблюдения порядка в рамках каждого ID вакансии. SNS FIFO не настроен на дедупликацию на основе контента сообщения, так как для этого необходим уникальный ID сообщения дедупликации для каждого сообщения. Соответствующий SAM темплейт выглядит следующим образом:

  JobEventsTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: JobEvents.fifo
      FifoTopic: true
      ContentBasedDeduplication: false

Для упрощения, антикоррупционная функция в тестовом приложении не запрашивает сообщения из CDC потока внешей базы данных. Она использует Amazon CloudWatch Events в качестве источника данных при запуске функции каждую минуту.

Я передаю SNS FIFO топик Amazon Resource Name (ARN) в функцию через переменную окружения. Это делает функцию более переносимой для развёртывания в разных окружениях и стадиях. IAM политика функции предоставляет права для отправки сообщений только в определённый SNS топик:

  AntiCorruptionFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: anti-corruption-service/
      Handler: app.lambda_handler
      Runtime: python3.7
      MemorySize: 256
      Environment:
        Variables:
          TOPIC_ARN: !Ref JobEventsTopic
      Policies:
        - SNSPublishMessagePolicy
            TopicName: !GetAtt JobEventsTopic.TopicName
      Events:
        Trigger:
          Type: 
          Properties:
            Schedule: 'rate(1 minute)'

Антикоррупционная функция использует функционал SNS publish API, который позволяет определить “MessageDeduplicationId” и “MessageGroupId”. Свойство “MessageDeduplicationId” используется для фильтрации дубликатов сообщений, которые отправляются в SNS FIFO в рамках 5-минутного интервала дедупликации. Свойство “MessageGroupId” необходимо, так как SNS FIFO обрабатывает все события вакансий в рамках одной группы сообщений строго по порядку, независимо от других групп сообщений в рамках одного топика.

Другим важным аспектом в данном примере является использование “MessageAttributes”. Для определения атрибута сообщения, мы назначаем имя eventType” и значения: “JobCreated”, “JobSalaryUpdated”, и “JobDeleted”. Это позволяет подписчикам определять SNS политики фильтрации для получения только определённых типов сообщений:

import boto3
from datetime import datetime
import json
import os
import random
import uuid

TOPIC_ARN = os.environ['TOPIC_ARN']

sns = boto3.client('sns')

def lambda_handler(event, context):
    jobId = str(random.randrange(0, 1000))

    send_job_created_event(jobId)
    send_job_updated_event(jobId)
    send_job_deleted_event(jobId)
    return

def send_job_created_event(jobId):
    messageId = str(uuid.uuid4())

    response = sns.publish(
        TopicArn=TOPIC_ARN,
        Subject=f'Job {jobId} created',
        MessageDeduplicationId=messageId,
        MessageGroupId=f'JOB-{jobId}',
        Message={...},
        MessageAttributes = {
            'eventType': {
                'DataType': 'String',
                'StringValue': 'JobCreated'
            }
        }
    )
    print('sent message and received response: {}'.format(response))
    return

def send_job_updated_event(jobId):
    messageId = str(uuid.uuid4())

    response = sns.publish(...)
    print('sent message and received response: {}'.format(response))
    return

def send_job_deleted_event(jobId):
    messageId = str(uuid.uuid4())

    response = sns.publish(...)
    print('sent message and received response: {}'.format(response))
    return

Аналитический сервис

Этот сервис использует SQS FIFO “AnalyticsJobEvents.fifo” очередь, которая подписана на SNS FIFO “JobEvents.fifo” топик. Следуя лучшим практикам, мы определяем redrive политики для SQS FIFO очереди и SNS FIFO подписки в темплейте SAM:

AnalyticsJobEventsQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: AnalyticsJobEvents.fifo
      FifoQueue: true
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt AnalyticsJobEventsQueueDLQ.Arn
        maxReceiveCount: 3

  AnalyticsJobEventsQueueToJobEventsTopicSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Endpoint: !GetAtt AnalyticsJobEventsQueue.Arn
      Protocol: sqs
      RawMessageDelivery: true
      TopicArn: !Ref JobEventsTopic
      RedrivePolicy: !Sub '{"deadLetterTargetArn": "${AnalyticsJobEventsSubscriptionDLQ.Arn}"}'

Аналитическая Lambda-функция использует SQS FIFO в качестве источника данных. Имя S3 бакета передаётся в функцию через переменную окружения, обеспечивая переносимость кода между окружениями и стадиями. IAM политика для функции позволяет только операции записи объектов для указанного S3 бакета:

 AnalyticsFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: analytics-service/
      Handler: app.lambda_handler
      Runtime: python3.7
      MemorySize: 256
      Environment:
        Variables:
          BUCKET_NAME: !Ref AnalyticsBucket
      Policies:
        - S3WritePolicy:
            BucketName: !Ref AnalyticsBucket
      Events:
        Trigger:
          Type: SQS
          Properties:
            Queue: !GetAtt AnalyticsJobEventsQueue.Arn
            BatchSize: 10

Вы можете ознакомиться с кодом функции в GitHub репозитории.

Сервис инвентаризации

Этот сервис также использует SQS FIFO “InventoryJobEvents.fifo” очередь, которая подписана на SNS FIFO “JobEvents.fifo” топик. Он использует redrive политику для SQS FIFO очереди и SNS FIFO подписки. Данному сервису нужны только определённые сообщения, поэтому он использует SNS политику фильтрации для указания типов сообщений:

InventoryJobEventsQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: InventoryJobEvents.fifo
      FifoQueue: true
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt InventoryJobEventsQueueDLQ.Arn
        maxReceiveCount: 3

  InventoryJobEventsQueueToJobEventsTopicSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Endpoint: !GetAtt InventoryJobEventsQueue.Arn
      Protocol: sqs
      RawMessageDelivery: true
      TopicArn: !Ref JobEventsTopic
      FilterPolicy: '{"eventType":["JobCreated", "JobDeleted"]}'
      RedrivePolicy: !Sub '{"deadLetterTargetArn": "${InventoryJobEventsQueueSubscriptionDLQ.Arn}"}'

Lambda-функция инвентаризации также использует SQS FIFO в качестве источника данных. Имя DynamoDB таблицы передаётся через переменную окружения. IAM политика позволяет только операции чтения и записи в указанную таблицу:

InventoryFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: inventory-service/
      Handler: app.lambda_handler
      Runtime: python3.7
      MemorySize: 256
      Environment:
        Variables:
          TABLE_NAME: !Ref InventoryTable
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref InventoryTable
      Events:
        Trigger:
          Type: SQS
          Properties:
            Queue: !GetAtt InventoryJobEventsQueue.Arn
            BatchSize: 10

Вы можете ознакомиться с кодом функции в GitHub репозитории.

Выводы

Amazon SNS FIFO топики могут облегчить проектирование архитектур на основе событий и уменьшить объём пользовательского кода, необходимого для создания подобных приложений.

Использую нативную интеграцию с Amazon SQS FIFO очередями, вы можете создавать приложения, отправляющие сообщения тысячам подписчиков. Этот паттерн помогает достигать цели по консистентности данных, дедупликации, фильтрации и шифрованию в режиме почти реального времени, путём использования управляемых сервисов AWS.

Для получения информации о доступности в AWS регионах и сервисных ограничениях, см. SNS endpoints and quotas и SQS endpoints and quotas. Для получения детальной информации о функционале FIFO, см. SNS FIFO и SQS FIFO.