Миграция дорого для инициализации java.util.параллельный.Вызываемые устройства для Apache Spark

Мне нужно перенести программу Java на Apache Spark. Текущая Java активно использует функциональность, предоставляемую java.util.параллельный и работает на одной машине. Так как инициализация рабочего (вызываемого) является дорогостоящей, рабочие используются снова и снова — т. е. рабочий повторно включается в пул, как только он завершается и возвращает свой результат.

Более точный:

  • Текущая реализация работает на небольших наборах данных в диапазоне 10e06 записей / несколько GBs.
  • Данные содержат записи, которые могут быть обработаны независимо. То есть, можно запустить одного работника на задачу и отправить его в пул потоков java.
  • Однако настройка работника для обработки записи включает загрузку дополнительных данных и построение графиков… все вместе некоторое время ГБ и К. П. У. в границах минут.
  • Некоторые данные действительно могут быть переданы работникам, например, некоторые таблицы поиска, но в этом нет необходимости. Некоторые данные являются частными для работника и поэтому не являются общими. Работник может изменить данные во время обработки записи и только позже быстро сбросить их, например кэши, специфичные для обрабатываемой в данный момент записи. Таким образом, работник может повторно ввести себя в пул и начать работу над следующей записью, не переходя через дорогостоящую инициализацию.
  • Время выполнения на одного работника и запись находится в диапазоне секунд.
  • Рабочие возвращают свои результаты через ExecutorCompletionService, т. е. Результаты позже извлекаются вызывающим пулом.брать.)(get() в центральной части программы.

Знакомство с Apache Spark я нахожу большинство примеров использования стандартных преобразований и действий. Я также нахожу примеры, которые добавляют свои собственные функции в DAG путем расширения API . Тем не менее, все эти примеры придерживаются простых легких вычислений и приходят без стоимости инициализации.

Теперь я задаюсь вопросом, Какой лучший подход к разработке приложения Spark, которое повторно использует какой-то «тяжелый рабочий». Исполнители, по-видимому, являются единственными постоянными сущностями, которые могут содержать пул таких работников. Однако, будучи новичком в мире Spark, я, скорее всего, упущу какой-то момент…

отредактировано 20161007

Найден ответ, указывающий на (возможное) решение с помощью функций. Поэтому вопрос в том, Могу ли я

  1. Раздел Мой раздел в соответствии с количеством исполнителей,
  2. Каждый исполнитель получает ровно один раздел для работы
  3. Моя функция (называемая setup в связанном решении ) создает пул потоков и повторно использует работников
  4. Отдельная функция объединить позже объединяет результаты

1 ответ

  1. Текущая архитектура представляет собой монолитную многопоточную архитектуру с общим состоянием между потоками. Учитывая, что размер вашего набора данных относительно невелик для современного оборудования, вы можете легко распараллелить его с Spark, где вы замените потоки исполнителями в узлах кластера.

    Из вашего вопроса я понимаю, что ваши две основные проблемы заключаются в том, может ли Spark обрабатывать сложные параллельные вычисления и как поделиться необходимыми битами состояния в распределенной среде.

    Сложная бизнес-логика: что касается первой части, вы можете запустить произвольно сложную бизнес-логику в исполнителях Spark, которые эквивалентны рабочим потокам в вашей текущей архитектуре.

    Это сообщение в блоге от cloudera хорошо объясняет концепцию наряду с другими важными концепциями модели выполнения:

    http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/

    Одним из аспектов, на который вам нужно будет обратить внимание, является конфигурация вашего задания Spark, чтобы избежать тайм-аутов из-за того, что исполнители слишком долго заканчивают, что может быть ожидаемо для приложения со сложной бизнес-логикой, как у вас.

    Дополнительные сведения см. На странице excellent из DataBricks, а более конкретно-поведение выполнения:

    http://spark.apache.org/docs/latest/configuration.html#execution-behavior

    Общее состояние: вы можете совместно использовать сложные структуры данных, такие как графики и конфигурации приложений в Spark между узлами. Один из подходов, который хорошо работает, — широковещательные переменные, где копия состояния, которое должно быть распределено, распределена на каждый узел. Ниже приведены некоторые очень хорошие объяснения концепции:

    https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-broadcast.html

    http://g-chi.github.io/2015/10/21/Spark-why-use-broadcast-variables/

    Это сократит задержку от вашего приложения, обеспечивая при этом локальность данных.

    Обработка данных может быть выполнена на основе раздела (подробнее здесь: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-partitions.html ), с суммированием результатов на драйвере или с использованием аккумуляторов (подробнее здесь: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-accumulators.html ). В случае, если результирующие данные сложны, подход секционирования может работать лучше, а также дает вам более точный контроль над выполнением приложений.

    Что касается требований к аппаратным ресурсам, кажется, что вашему приложению нужно несколько гигабайт для общего состояния, которое должно остаться в памяти, и дополнительно еще несколько гигабайт для данных в каждом узле. Вы можете установить модель персистентности в MEMORY_AND_DISK, чтобы гарантировать, что вы не будете запускать из памяти, больше деталей на

    http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence