Блог Amazon Web Services
Apache Spark — за рамками MapReduce
Виталий Федоренко (Vitalii Fedorenko), AWS Big Data Cloud Architect
Данная статья это вольный перевод одного из наиболее популярных AWS постов об Apache Spark Джона Фрица.
Как многим из Вас уже известно, веб-сервис Amazon EMR упрощает обработку и анализ больших объемов данных используя такие библиотеки как Hadoop, Hive, HBase, Presto, Impala и Spark. В то время как Hadoop MapReduce все еще популярен для обработки больших объёмов данных, анализа неструктурированных данных и машинного обучения, Apache Spark стал вытеснять его, обеспечивая лучшую производительность кластера и скорость разработки. Используя движок на основе направленного ациклического графа (DAG), Spark создает более эффективный план выполнения запросов преобразования данных. Кроме того, Spark использует отказоустойчивые распределенные наборы данных (RDD), сохраняя промежуточные, входные и выходные данные в памяти, а не на диске. Эти элементы функциональности повышают производительность определенных программ по сравнению с Hadoop (к примеру, машинное обучение), так как MapReduce выполняет задачи по последовательной схеме и затрачивает большее количество ресурсов на ввод-вывод промежуточных данных на диск.
Spark поддерживает такие языки программирования как Scala, Python, Java, и SQL API, популярные алгоритмы машинного обучения, обработку графов и потоков данных. В Spark есть множество параметров которые упрощают разработку по сравнению с различными абстракциями, обернутыми вокруг Hadoop MapReduce API.
Spark на Amazon EMR
Вы можете создавать масштабируемые Spark кластера с различными типами EC2 инстансов непосредственно из консоли Amazon EMR, CLI или API. Работая в контейнере EMR, Spark может читать данные через EMRFS напрямую из S3, пересылать логи в хранилище данных S3, использовать Spot EC2 для снижения затрат, а также интегрироваться с такими функциями безопасности AWS как IAM роли, группы безопасности EC2 и шифрование S3 в состоянии покоя (на стороне сервера или клиента). Одним из основных достоинств является то, что нет никакой дополнительной платы за Spark на AWS EMR.
Spark включает в себя интерфейс SQL для интерактивных SQL-запросов, MLlib для масштабируемых распределенных алгоритмов машинного обучения, Spark Streaming для создания приложений обработки потоков и GraphX для работы с графиками. Вы также можете установить на EMR кластер Ganglia для дополнительного мониторинга Spark. Приложения на EMR Spark запускаются через EMR Step API, напрямую через Spark API или Spark Shell на главном узле кластера.
Примеры использования Spark клиентами AWS
Несколько примеров использования Spark клиентами AWS:
- Система рекомендации контента читателям Washington Post работает на основе Spark: https://conferences.oreilly.com/strata/strata-ny-2016/public/schedule/detail/51468
- Yelp использует библиотеки машинного обучения Spark MLlib, чтобы увеличить количество просмотров рекламных объявлений: https://spark-summit.org/2017/events/yelp-ad-targeting-at-scale-with-apache-spark/
- Корпорация Hearst использует Spark Streaming для обработки данных о кликах на сайте Cosmopolitan. Это позволяет им создавать отчеты о читаемости статей и трендах в режиме реального времени: https://www.youtube.com/watch?v=6cwbbqi36k8
- Krux использует Spark и EMRFS для обработки логов, хранящихся в Amazon S3: https://aws.amazon.com/solutions/case-studies/krux/
Пример анализа данных с помощью Apache Spark
В этой части мы покажем пример того как можно в течение короткого времени начать обработку данных с помощью Spark на Amazon EMR ответив на несколько вопросов о задержке рейсов и аннулировании внутренних рейсов в Соединенных Штатах Америки. Министерство транспорта США публикует общедоступные данные о перелетах с 1987 года. Мы преобразовали формат исходных файлов из CSV в Parquet (для лучшей производительности) и загрузили его в общедоступную корзину S3 s3://us-east-1.elasticmapreduce.samples/flightdata/input
. Этот набор данных составляет около 4 ГБ (79 ГБ без сжатия) и содержит более 160 миллионов строк. Набор данных довольно большой и оптимальным средством для его обработки будет Spark. Мы вычислим 10 аэропортов с наибольшим количеством вылетов, рейсы с задержкой более 15 минут, рейсы с задержкой более 60 минут, и отмененные рейсы. Мы также узнаем количество отмененных рейсов по 10 самым популярным маршрутам. Все эти запросы будут реализованы на SQL, а само приложение написано на Scala. Вот, к примеру, несколько строк из исходного кода:
// Файлы parquet могут быть зарегистрированы как таблицы для дальнейшего использования в SQL запросах
val df = session.read.parquet ("s3://us-east-1.elasticmapreduce.samples/flightdata/input/")
// Топ 10 аэропортов с наибольшим количеством вылетов с 2000 года
df.createOrReplaceTempView("flights")
val topDepartures = session.sql("SELECT origin, count (*) AS total_departures
FROM flights WHERE year >= '2000' GROUP BY origin ORDER BY total_departures DESC LIMIT 10")
Полный исходный код доступен по адресу: https://s3.amazonaws.com/us-east-1.elasticmapreduce.samples/flightdata/sparkapp/FlightSample-1.0.scala
Jar файл со скомпилированным кодом: https://s3.amazonaws.com/us-east-1.elasticmapreduce.samples/flightdata/sparkapp/flightsample-1.0.jar
Обратите внимание, что приложение создает таблицу «flights», которая является фреймом в памяти (DataFrame), и SQL-запрос считывает эту таблицу из памяти для сокращения затрат ввода-вывода на диск. Кроме того, EMR Spark использует EMRFS для доступа к данным в S3 без необходимости сначала копировать их в HDFS.
Теперь давайте запустим EMR Spark кластер из трех m3.xlarge инстансов EC2 для выполнения нашего приложения. Данные для этого примера расположены в us-east-1 и скорость чтения будет наиболее оптимальна если кластер находится в этом же регионе. Для начала откройте страницу EMR «Create Cluster», а затем «Go to Advanced Options» в консоли Amazon EMR. Затем выберите Spark в качестве приложения.
По умолчанию Amazon EMR конфигурирует Spark для использования динамического распределения ресурсов и устанавливает количество ядер процессора и RAM для каждого Spark executor на основе типа EC2 инстанса. Вы всегда можете переопределить эти параметры во время выполнения задач, передав дополнительные аргументы в команду spark-submit. Теперь перейдите к разделу Add Steps в нижней части страницы, и выберите Spark Application, затем нажмите Configuration чтобы добавить шаг приложения Spark:
Выберите «Cluster» для «Deploy mode». В качестве ссылки на приложение введите s3://us-east-1.elasticmapreduce.samples/flightdata/sparkapp/flightsample-1.0.jar
. В поле «Аrguments» укажите путь S3, куда бы вы хотели чтобы Spark записал результат. Нажмите «Add» и установите флажок «Auto-terminate cluster after the last step is completed» в нижней части страницы, чтобы кластер автоматически удалился после завершения работы приложения. Нажмите «Continue», чтобы перейти ко второму этапу создания кластера. Просмотрите EC2 конфигурацию вашего кластера. Для этого приложения мы выберем один master-узел и два core-узла типа m4.xlarge. Нажмите «Continue» и снимите флажки параметров «Logging» и «Termination protection» в разделе «General Options». Нажмите «Continue», чтобы перейти к последнему этапу (все параметры остаются без изменения). В заключение, нажмите «Create cluster». Amazon EMR запустит кластер и ваше Spark приложение. По завершении вы можете просмотреть результат по указанному вами ранее пути в S3. Мы не будем раскрывать здесь результатов нашего отчета, но обязательно захватите с собой что-нибудь почитать, если вы летите из Чикаго!
Дополнительную информацию о Spark на Amazon EMR вы можете найти на странице https://aws.amazon.com/elasticmapreduce/spark/