Блог Amazon Web Services

Использование самостоятельно установленной Apache Kafka в качестве источника событий для AWS Lambda

Оригинал статьи: ссылка (James Beswick, Senior Developer Advocate)

Apache Kafka – это платформа с открытым исходным кодом для потоковой передачи событий, используемая для поддержки таких рабочих нагрузок, как конвейеры обработки данных и потоковая аналитика. Apache Kafka является распределенной потоковой платформой, которая концептуально похожа на Amazon Kinesis.

Теперь, когда самостоятельно установленная Kafka поддерживается в качестве источника событий для AWS Lambda, вы можете обрабатывать сообщения из топиков (topics) в функциях Lambda. Это облегчает интеграцию ваших самостоятельно установленных кластеров Kafka с последующими бессерверными процессами обработки данных.

В этой статье я объясню, как самостоятельно установить кластер Apache Kafka на Amazon EC2 и настроить ключевые элементы сетевой конфигурации. Я также покажу, как создать функцию Lambda для обработки сообщений из топика Kafka. Несмотря на то, что этот процесс похож на использование Amazon Managed Streaming for Apache Kafka (Amazon MSK) в качестве источника событий, есть некоторые важные различия.

Введение

Использование Kafka в качестве источника событий работает аналогично использованию Amazon SQS или Amazon Kinesis. Во всех этих случаях сервис Lambda запрашивает новые записи или сообщения из источника событий, после чего синхронно вызывает нужную функцию Lambda. Сервис Lambda считывает сообщения партиями и передает их в вашу функцию в виде параметров (payload) события вызова функции.

Сервис Lambda выступает как приложение-потребитель сообщений (consumer application) для вашего топика Kafka. Он запрашивает сообщения из одной или нескольких партиций и отправляет эти записи в нужную функцию. Сервис Lambda продолжает запрашивать сообщения до тех пор, пока в топике не остаётся необработанных сообщений.

Настройка сети для самостоятельно установленной Kafka

Инстансы Amazon EC2 с запущенной Kafka лучше всего разворачивать в приватных подсетях. Для того чтобы функция Lambda могла опрашивать инстансы Kafka, вам необходимо убедиться в том, что в публичной подсети каждой зоны доступности (AZ) запущен NAT Gateway.

В окружениях, предназначенных для тестирования и разработки, можно настроить маршрутизацию трафика через один NAT Gateway в одной AZ. Для производственных рабочих нагрузок в целях резервного дублирования рекомендуется, чтобы в каждой зоне доступности было развёрнуто по одному NAT Gateway. Шаги ниже помогут вам развернуть следующую архитектуру:

Архитектура самостоятельно установленной Kafka

  1. Создайте VPC с публичными и приватными подсетями и NAT Gateway, обеспечивающим доступ в Интернет. Чтобы развернуть такую инфраструктуру с помощью AWS CloudFormation, используйте данный шаблон.
  2. Из консоли управления VPC отредактируйте группу безопасности по умолчанию, созданную этим шаблоном, чтобы разрешить входящий трафик на следующие порты:
    • Custom TCP: порты 2888-3888 со всех IP-адресов.
    • SSH (порт 22), только с вашего IP-адреса.
    • Custom TCP: порт 2181 со всех IP-адресов.
    • Custom TCP: порт 9092 со всех IP-адресов.
    • Весь трафик от ресурсов в той же самой группе безопасности.

Конфигурация группы безопасности

Развертывание инстансов EC2 и установка Kafka

Теперь вы можете развернуть инстансы EC2, используя только что созданную сетевую конфигурацию, и установить Kafka:

  1. Из консоли управления EC2 запустите инстанс с Ubuntu Server 18.04 LTS. Убедитесь, что в каждой приватной подсети в разных зонах доступности запущено по одному инстансу. Присвойте всем запущенным инстансам EC2 группу безопасности, созданную ранее шаблоном CloudFormation.
  2. Затем запустите еще один инстанс EC2 в любой из публичных подсетей. Это – бастион-хост, используемый для доступа к инстансам в приватных подсетях. Также присвойте бастион-хосту группу безопасности, созданную ранее шаблоном CloudFormation.
    Инстансы EC2
  3. Подключитесь по SSH к бастион-хосту и затем к первому инстансу EC2 в приватной подсети, используя подходящий метод для вашей операционной системы. В этой статье объясняются различные методы, которые вы можете использовать. Повторите процесс в другом терминале для второго инстанса EC2 в приватной подсети.
    Терминалы, подключенные к EC2 инстансам
  4. Установите Java на обоих инстансах:
    sudo add-apt-repository ppa:webupd8team/java
    sudo apt update
    sudo apt install openjdk-8-jdk
    java –version
  5. Установите Kafka на обоих инстансах:
    wget http://www-us.apache.org/dist/kafka/2.3.1/kafka_2.12-2.3.1.tgz
    tar xzf kafka_2.12-2.3.1.tgz
    ln -s kafka_2.12-2.3.1 kafka

Настройка и запуск Zookeeper

Настройте и запустите сервис Zookeeper, который управляет брокерами Kafka:

  1. Настройте Zookeeper ID на первом инстансе:
    cd kafka
    mkdir /tmp/zookeeper
    touch /tmp/zookeeper/myid
    echo "1" >> /tmp/zookeeper/myid
  2. Повторите процесс на втором инстансе, используя другое значение Zookeeper ID:
    cd kafka
    mkdir /tmp/zookeeper
    touch /tmp/zookeeper/myid
    echo "2" >> /tmp/zookeeper/myid
  3. На первом инстансе отредактируйте файл config/zookeeper.properties, добавив приватный IP-адрес второго инстанса:
    initLimit=5
    syncLimit=2
    tickTime=2000
    # list of servers: <ip>:2888:3888
    server.1=0.0.0.0:2888:3888 
    server.2=<<IP address of second instance>>:2888:3888
  4. На втором инстансе отредактируйте файл config/zookeeper.properties, добавив приватный IP-адрес первого инстанса:
    initLimit=5
    syncLimit=2
    tickTime=2000
    # list of servers: <ip>:2888:3888
    server.1=<<IP address of first instance>>:2888:3888 
    server.2=0.0.0.0:2888:3888
  5. Запустите Zookeeper на обоих инстансах: bin/zookeeper-server-start.sh config/zookeeper.properties.

Настройка и запуск Kafka

Настройте и запустите брокеры Kafka:

  1. На первом инстансе отредактируйте файл config/server.properties следующим образом:
    broker.id=1
    zookeeper.connect=0.0.0.0:2181, =<<IP address of second instance>>:2181
  2. На первом инстансе отредактируйте файл config/server.properties следующим образом:
    broker.id=2
    zookeeper.connect=0.0.0.0:2181, =<<IP address of first instance>>:2181
  3. Запустите Kafka на обоих инстансах:
    bin/kafka-server-start.sh config/server.properties

После выполнения этих шагов Zookeeper и Kafka будут запущены на обоих инстансах. Если вы используете отдельные терминалы, это выглядит так:

Терминалы с запущенными Zookeeper и Kafka

Создание топика и отправка в него сообщений

Kafka организует каналы доставки сообщений в топики (topics), которые представляют собой виртуальные группы из одной или нескольких партиций, расположенных на брокерах в кластере Kafka. Несколько приложений могут отправлять сообщения в топики Kafka. Эти сообщения впоследствии могут быть получены и обработаны несколькими потребителями. Сообщения добавляются в конец топика, а потребители сообщений получают и обрабатывают их в своем темпе.

Выполните следующие шаги на любом из двух инстансов:

  1. Создайте топик с названием test:
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 2 --topic test
  2. Запустите скрипт для отправки сообщений:
    bin/kafka-console-producer.sh --broker-list localhost:9092 –topic
  3. Введите тестовые сообщения для проверки успешной отправки:
    Отправка сообщений в топик Kafka

На этом этапе вы можете успешно отправлять сообщения в свой самостоятельно установленный кластер Kafka. Далее вы настроите функцию Lambda в качестве потребителя сообщений из тестового топика в этом кластере.

Настройка функции Lambda и привязка источника событий

Вы можете создать привязку источника событий для функции Lambda с помощью интерфейса командной строки AWS или AWS SDK, который предоставляет API CreateEventSourceMapping. В этом руководстве вы будете использовать консоль управления AWS для создания привязки источника событий.

Создайте функцию Lambda, которая использует самостоятельно установленный кластер Kafka и топик в качестве источника событий:

  1. В консоли управления Lambda нажмите Create function.
  2. Введите название функции и выберите Node.js 12.x в качестве среды выполнения.
  3. В разделе Permissions, подразделе Execution role выберите название роли, чтобы открыть консоль управления IAM.
  4. Нажмите Add inline policy и создайте новую политику с названием SelfHostedKafkaPolicy и следующими правами доступа. Замените секцию Resource на ARN ваших инстансов:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "ec2:CreateNetworkInterface",
                    "ec2:DescribeNetworkInterfaces",
                    "ec2:DescribeVpcs",
                    "ec2:DeleteNetworkInterface",
                    "ec2:DescribeSubnets",
                    "ec2:DescribeSecurityGroups",
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents"
                ],
                "Resource": " arn:aws:ec2:<REGION>:<ACCOUNT_ID>:instance/<instance-id>"
            }
        ]
    }

    Создание политики безопасности

  5. Нажмите Create policy и убедитесь, что политика появилась в списке Permissions policies.
    Информация о роли IAM
  6. Вернитесь в свойства функции Lambda, выберите раздел Configuration. В подразделе Designer нажмите Add trigger.
  7. В выпадающем списке выберите Apache Kafka:
    • В разделе Bootstrap servers добавьте IP-адреса обоих ранее созданных инстансов, добавив порт 9092.
    • В поле Topic name введите ‘test’.
    • Введите наиболее подходящие вам размер партии сообщений (batch size) и стартовый индекс сообщения (см. документацию, чтобы узнать больше).
    • В поле VPC выберите VPC, созданную ранее шаблоном CloudFormation.
    • В поле VPC subnets выберите обе приватные подсети.
    • В поле VPC security groups выберите созданную группу безопасности.
    • Нажмите Add.
      Конфигурация добавления триггера

Через несколько секунд состояние триггера в консоли Lambda изменится на Enabled. Спустя несколько минут, триггер начнёт получать сообщения из кластера Kafka.

Тестирование функции Lambda

На данный момент вы создали VPC с двумя приватными и двумя публичными подсетями и NAT Gateway. Вы создали кластер Kafka на двух инстансах EC2 в приватных подсетях. Вы создали функцию Lambda с необходимыми правами доступа IAM и настроили привязку источника событий. Далее вы отправите сообщения в тестовый топик Kafka и увидите соответствующие вызовы функции Lambda в логах.

  1. В разделе Function code замените содержимое файла index.js на следующий исходный код и нажмите Deploy:
    exports.handler = async (event) => {
        // Iterate through keys
        for (let key in event.records) {
          console.log('Key: ', key)
          // Iterate through records
          event.records[key].map((record) => {
            console.log('Record: ', record)
            // Decode base64
            const msg = Buffer.from(record.value, 'base64').toString()
            console.log('Message:', msg)
          }) 
        }
    }
  2. Вернитесь в терминал с запущенным скриптом для отправки сообщений и введите тестовое сообщение:
    Отправка тестого сообщения в Kafka
  3. В свойствах функции Lambda выберите раздел Monitoring и нажмите View logs in CloudWatch. В наиболее свежих логах найдите полученное событие в оригинальном виде и декодированное сообщение:
    Логи

Использование Lambda как источника событий

Не обязательно развёртывать функцию Lambda, использованную в привязке источника событий, внутри VPC для получения сообщений из кластера Kafka, запущенного на инстансах в приватных подсетях. Однако вы должны настроить информацию о VPC, подсетях и группе безопасности в привязке кластера Kafka как источника событий.

Функция Lambda должна иметь права доступа для запроса информации о VPC и группы безопасности, а также на управление сетевыми интерфейсами. Этими правами доступа являются:

  • ec2:CreateNetworkInterface
  • ec2:DescribeNetworkInterfaces
  • ec2:DescribeVpcs
  • ec2:DeleteNetworkInterface
  • ec2:DescribeSubnets
  • ec2:DescribeSecurityGroups

Событие, передаваемое в функцию Lambda, содержит массив записей. Каждый элемент массива содержит информацию о топике и идентификатор партиции Kafka, а также метку времени и сообщение в кодировке base64:

Пример события

Есть важное отличие в том, как сервис Lambda подключается к самостоятельно установленному кластеру Kafka по сравнению с Amazon MSK. MSK шифрует передаваемые данные по умолчанию, поэтому соединение с брокером по умолчанию использует TLS. В самостоятельно установленном кластере TLS-аутентификация не поддерживается при использовании Apache Kafka как источника событий. Вместо этого при доступе к брокерам через интернет используется SASL/SCRAM аутентификация, которую можно настроить в привязке источника событий:

Конфигурация SASL/SCRAM аутентификации

О том, как настроить SASL/SCRAM аутентификацию на вашем самостоятельно установленном кластере Kafka, читайте в этой документации. 

Заключение

Lambda теперь поддерживает самостоятельно установленные кластеры Kafka в качестве источника событий, так что вы можете запускать функции Lambda для обработки сообщений в топиках Kafka для интеграции в последующие бессерверные рабочие процессы.

Этот пост показывает, как самостоятельно установить и настроить кластер Kafka на инстансах EC2, а также как настроить сетевую конфигурацию. Я также рассказал о том, как настроить привязку источника событий Lambda и протестировать функцию декодирования сообщений, полученных из Kafka.

Чтобы узнать больше об этом функционале, прочтите документацию. Для получения дополнительных обучающих ресурсов о бессерверных вычислениях, посетите Serverless Land.