Планирование заданий и задач

Завершено

Расписания MapReduce работают на уровне заданий и задач. Клиенты отправляют задания, а JobTracker (JT) разделяет их на задачи map и reduce. Планирование заданий определяет, какое задание будет выполняться следующим, а планирование задач упорядочивает задачи в задании. В Hadoop MapReduce JT планирует как задания, так и задачи, хотя планировщики заданий являются подключаемыми, то есть не являются частью кода JT. Планировщики задач при этом интегрированы в код JT. Внедрение подключаемых планировщиков заданий в платформу Hadoop рассматривается как эволюция кластерных вычислений4. Подключаемые планировщики позволяют настроить Hadoop для конкретных рабочих нагрузок и приложений. Эта возможность позволяет использовать планировщики заданий, оптимизированные для постоянно увеличивающегося списка приложений MapReduce. Кроме того, подключаемые планировщики увеличивают удобочитаемость кода и упрощают эксперименты и тестирование, которые необходимы для исследований.

В Hadoop MapReduce используется несколько альтернативных планировщиков заданий, в том числе схема по умолчанию "первым поступил — первым обслужен" (FIFO) и планировщики Fair и Capacity4, 5, 6, 7, 13. FIFO извлекает задания из рабочей очереди в порядке поступления, сначала самые старые, и запускает их одно за другим. Эта стратегия может показаться простой и удобной в использовании, но у нее есть свои недостатки:

  • Планирование FIFO не поддерживает вытеснение заданий. Выполнение заданий не может быть прервано, чтобы ожидающие задания продолжили работу и выполнили свои цели по производительности, такие как предотвращение нехватки ресурсов для заданий и (или) эффективное совместное использование ресурсов. В результате одновременное совместное использование кластерных ресурсов невозможно. Пока одно задание задействует все ресурсы кластера (слоты), другие операции не могут быть продолжены. Поэтому следующее задание в очереди должно ожидать, пока слоты задач освободятся и у текущего задания больше не будет задач для выполнения. Это ограничение может легко привести к конфликту справедливости, при котором долго выполняемое задание блокирует весь кластер, лишая ресурсов все небольшие задания.
  • Планировщик FIFO не учитывает приоритет или размер задания, а время отправки задания полностью определяет его важность. Поэтому задания могут долго ожидать в очереди, какими бы срочными они ни были. Это ограничение создает дополнительные проблемы с равнодоступностью и откликом.

Чтобы устранить недостатки планировщика FIFO, Facebook разработал более сложный планировщик — Fair Scheduler5, который теперь является частью дистрибутива Apache Hadoop. Fair Scheduler представляет ресурсы кластера в виде слотов map и reduce и предлагает способ совместного использования кластеров таким образом, что все задания получают, в среднем, одинаковое число слотов с течением времени. Планировщик намечает набор пулов, в который помещаются задания. Каждому пулу назначается набор общих ресурсов, отражающих слоты map и reduce, которые могут занимать задания. Чем больше пулу назначается общих ресурсов, тем больше слотов map и reduce могут использовать задания. Задания в пуле можно запланировать с помощью планировщика FIFO или Fair Scheduler. Задания в пулах всегда планируются с помощью Fair Scheduler.

Когда отправляется только одно задание, Fair Scheduler предоставляет ему все доступные слоты map и reduce кластера. При отправке новых заданий планировщик назначает их освобождающимся слотам. При условии равномерного распределения слотов каждое задание получает примерно одинаковый объем времени ЦП. Эта стратегия может, очевидно, допускать одновременное выполнение нескольких заданий в одном кластере Hadoop благодаря выделению слотов кластера. Это означает, что каждое задание имеет эксклюзивный доступ к определенному количеству слотов в кластере Hadoop. Такое расположение аналогично совместному использованию памяти в операционных системах, при котором система выделяет каждому процессу независимую часть основной памяти. В результате может сосуществовать несколько процессов. Совместное использование пространства, предлагаемое Fair Scheduler, позволяет выполнять короткие задания в приемлемое время, не лишая длительные задания ресурсов. Задания, требующие меньше времени, могут быстро выполняться, а задания, требующие больше времени, продолжают работать. Чтобы избежать перегрузки из-за совместного использования и завершать работу вовремя, Fair Scheduler может ограничить количество одновременных заданий.

Fair Scheduler также может учитывать приоритеты заданий, назначая весовые коэффициенты, которые влияют на долю общего времени ЦП, выделенного заданию. Кроме того, планировщик может гарантировать наличие минимальных долей для пулов, тем самым обеспечивая достаточное количество слотов map и reduce для всех заданий в пуле. Если пул содержит задания, он получает хотя бы минимальную долю ресурсов. Когда пул станет пустым (дальнейшие задания не запланированы), Fair Scheduler равномерно распределяет назначенные слоты по активным пулам. Если пул не использует всю гарантированную долю, Fair Scheduler распределяет избыточные слоты map и reduce равномерно между другими пулами.

Чтобы предоставить каждому пулу гарантированную минимальную долю, Fair Scheduler также поддерживает вытеснение заданий в других пулах. Эта процедура приводит к вытеснению некоторых или всех внешних задач map и reduce довольно грубым образом. Поскольку Hadoop MapReduce пока не поддерживает приостановку выполняющихся задач, Fair Scheduler просто прерывает задачи в других пулах, если они превышают гарантированную минимальную долю. Hadoop может допускать потерю задач, поэтому эта стратегия не приводит к сбою вытесненных заданий. Но это может повлиять на эффективность, так как завершенные задачи должны быть выполнены снова, и проделанная работа тратится впустую. Чтобы максимально сократить такие избыточные вычисления, Fair Scheduler выбирает недавно запущенные задачи из заданий, превысивших норму, в качестве кандидатов на завершение.

Третий планировщик, разработанный компанией Yahoo!, Capacity Scheduler, в чем-то похож на Fair Scheduler. Он также делит ресурсы (слоты) в пространстве, но создает несколько очередей, а не пулов. Каждая очередь имеет настраиваемое число (емкость) слотов map и reduce и может содержать несколько заданий. Все задания в очереди имеют доступ к выделенной емкости очереди. В очереди планирование выполняется на основе приоритета с конкретными настраиваемыми строгими и нестрогими ограничениями, а планировщик дополнительно корректирует приоритеты на основе времени отправки задания. Когда слот освобождается, Capacity Scheduler назначает его наименее загруженной очереди и выбирает самое старое отправленное задание. Избыточные емкости между очередями (неиспользуемые слоты) временно назначаются другим нуждающимся очередям, даже если они превышают первоначальные выделенные емкости. Если исходной очереди позже понадобятся эти переназначенные слоты, Capacity Scheduler позволит всем выполняющимся там задачам успешно завершиться. И только тогда планировщик возвращает базовые слоты обратно в исходные очереди или задания (т. е. задачи не будут завершены принудительно). Хотя переназначенные слоты работают в других очередях и исходные очереди откладываются, отказ от вытеснения заданий упрощает структуру Capacity Scheduler и устраняет лишние вычисления. Наконец, как и Fair Scheduler, Capacity Scheduler предоставляет каждой очереди минимальную гарантированную емкость. Каждой очереди назначается гарантированная емкость, поэтому общая емкость кластера равна сумме емкости всех очередей (без чрезмерного выделения емкости).

Планирование задания с помощью планировщиков FIFO, Fair или Capacity подразумевает планирование всех составляющих их задач. Для последней процедуры Hadoop MapReduce использует стратегию извлечения. То есть после планирования задания (J) JobTracker не сразу передает задачи map и reduce задания J в TaskTracker, а ждет от TT запросов через пакеты пульса. При получении запросов для задач map JT следует основному принципу планирования, согласно которому перемещать вычисления в сторону данных дешевле, чем перемещать данные в сторону вычислений. В результате, чтобы сократить сетевой трафик, JT пытается запланировать задачи map рядом с соответствующими входными блоками HDFS. Эту цель легко выполнить, так как входные данные задачи map обычно размещаются в одном TT.

Однако при планировании задач reduce JT не учитывает этот принцип, в основном потому, что входные данные задачи reduce (секции) обычно включают в себя выходные данные многих задач map, созданных в нескольких TT. Когда TT отправляет запрос, JT назначает задачу reduce, $R$, независимо от расстояния в сети TT, который передает данные $R$7. В связи с этой стратегией планировщик задач reduce в Hadoop не учитывает расположение.

The nodes at which native Hadoop scheduled each map task and reduce task of the WordCount benchmark.

Рис. 5. Узлы, на которых собственный Hadoop планировал каждую задачу карты и уменьшал задачу теста WordCount

Чтобы продемонстрировать игнорирование расположения и его последствия, мы определяем общее сетевое расстояние задачи reduce $R$, $(TNDR)$, как $\Sigma_{i=0}^n ND_{iR}$, где n — число секций, передаваемых в $R$ с n узлов, а $ND$ — это расстояние в сети, необходимое для перетасовывания секции $i$ в $R$. Очевидно, по мере возрастания TNDR требуется больше времени на перетасовывание секций $R$, а также дополнительная пропускная способность сети. На рис. 5 перечислены узлы, в которых каждая задача map, $M_{i} $ и reduce, $R_{i} $, в тесте производительности WordCount были запланированы Hadoop. В этом случае каждая задача map передает данные каждой задаче reduce, и каждая задача map планируется на отдельном узле. Узлы с 1 по 7 находятся в одной стойке, а остальные — в другой. Hadoop планирует задачи reduce $R_{0}$, $R_{1}$ и $R_{2}$ на узлах 13, 12 и 3 соответственно. В результате $TND_{R_{0}}$ = 30, $TND_{R_{1}}$ = 32 и $TND_{R}$ = 34. Однако если $R_{1}$ и $R_{2}$ запланированы на узлах 11 и 8 соответственно, это приведет к тому, что $TND_{R_{1}}$ = 30 и $TND_{R_{2}}$ = 30. Hadoop в существующем виде не может принимать такие контролируемые решения по планированию.

Текущий планировщик задач reduce в Hadoop не учитывает не только расположение, но также отклонения секционирования. Отклонение секционирования означает значительную разницу в частотности промежуточных ключей и их распределении по разным узлам данных1, 3. На рис. 6 показано отклонение секционирования. На нам изображены размеры секций, которые каждая задача map доставляет каждой задаче reduce в двух разновидностях теста Sort, Sort1 и Sort2 (каждый со своим набором данных), в WordCount и в K-средних8. Отклонения секционирования приводят к неравномерному перетасовыванию, при котором некоторые задачи reduce получают больше данных, чем другие. Неравномерное перетасовывание может привести к снижению производительности, так как задание может выполняться долго, если задача reduce получила большой объем входных данных, но узел, на котором запланирована задача reduce, может ослабить влияние неравномерного перетасовывания. Как правило, планировщик заданий reduce может определять модель сетевой связи, влияя на количество перетасованных данных и время выполнения заданий MapReduce.

The sizes of partitions produced by each feeding map task to each reduce task in Sort1, Sort2, WordCount, and K-means.

Рис. 6. Размеры секций, созданных каждой задачей карты корма, для каждой задачи уменьшения в Sort1, Sort2, WordCount и K-средних

Чтобы повысить эффективность планировщика задач reduce в Hadoop MapReduce, он должен решать вопрос расположения данных и отклонений секционирования вместе. В качестве конкретного примера рис. 7 демонстрирует кластер Hadoop с двумя стойками, каждая из которых включает три узла. Возьмем задачу reduce $R$ с двумя узлами, поставляющими данные, — TT1 и TT2. Нужно запланировать $R$ на запрашивающем TT, предполагая, что TT 1, 2 и 4 запрашивают у JT задачи reduce. С помощью собственного планировщика Hadoop JT может назначить $R$ любому из запрашивающих TT. Если задача $R$ назначена TT4, $TNDR$ будет иметь значение 8. С другой стороны, если $R$ назначена TT1 или TT2, $TNDR$ будет иметь значение 2. Как мы уже говорили, чем меньше значение TND, тем меньше объем сетевого трафика и, соответственно, тем выше производительность.

Проводились многочисленные исследования о том, как обеспечить учет расположения данных и отклонения секционирования в планировщике задач1, 2, 3, 10, 11, 12. Планировщик Center-of-Gravity Reduce Scheduler (CoGRS)3, например, учитывает эти факторы. Чтобы свести к минимуму сетевой трафик, он пытается запланировать каждую задачу reduce $R$, на узле "центра тяжести", который определяется сетевыми расположениями узлов, поставляющих данные $R$, и отклонениями в размерах секций $R$. В частности, CoGRS использует новую метрику, называемую "взвешенным общим расстоянием в сети" (weighted total network distance, $WTND$) и определяет каждую задачу $R$* как $WTND_{R}$ = $\Sigma_{i=0}^n ND_{iR} \times w_{i}$, где $n$ — это число секций, необходимых $R$, $ND$ — расстояние в сети для перетасовывания секции $i$ в $R$, а $w_{i}$ — весовой коэффициент секции $i$. В принципе, "центром тяжести" $R$ всегда является один из узлов, поставляющих данные для $R$, поскольку он менее затратный для локального доступа к данным, чем в случае перетасовывания данных по сети. Таким образом, CoGRS выбирает "центром тяжести" $R $ узел, поставляющий данные $R$, с минимальным значением $WTND$.

Options for scheduling a reduce task, R, with feeding nodes TT1 and TT2 in a cluster with two racks (CS = core switch, RS = rack switch, TT = TaskTracker, and JT = JobTracker).

Рис. 7. Параметры планирования задачи уменьшения, $R$, с узлами TT1 и TT2 в кластере с двумя стойками (CS = основной коммутатор, RS = переключатель стойки, TT = TaskTracker и JT = JobTracker)


7 TT, поставляющий данные для задачи reduce, $R$, содержит, по крайней мере, одну из задач map, поставляющую данные для $R$.

8 Это программа кластеризации Apache Mahout на основе K-средних8. K-средние — это хорошо известный алгоритм кластеризации для обнаружения знаний и интеллектуального анализа данных9.


Ссылки

  1. S. Ibrahim, H. Jin, L. Lu, S. Wu, B. He, and L. Qi (Dec. 2010). LEEN: Locality/Fairness-Aware Key Partitioning for MapReduce in the Cloud CloudCom
  2. M. Hammoud and M. F. Sakr (2011). Locality-Aware Reduce Task Scheduling for MapReduce CloudCom
  3. M. Hammoud, M. S. Rehman, and M. F. Sakr (2012). Center-of-Gravity Reduce Task Scheduling to Lower MapReduce Network Traffic CLOUD
  4. Hadoop scheduling IBM
  5. Hadoop Fair Scheduler Hadoop
  6. B. Thirumala Rao and L. S. S. Reddy (November 2011). Survey on Improved Scheduling in Hadoop MapReduce in Cloud Environments International Journal of Computer Applications
  7. M. Zaharia, D. Borthakur, J. S. Sarma, K. Elmeleegy, S. Shenker, and I. Stoica (April 2010). Delay Scheduling: A Simple Technique for Achieving Locality and Fairness in Cluster Scheduling EuroSys, pp. 265-278
  8. Mahout Homepage Apache Mahout
  9. S. Huang, J. Huang, J. Dai, T. Xie, and B. Huang (2010). The HiBench Benchmark Suite: Characterization of the MapReduce-Based Data Analysis ICDEW
  10. P. C. Chen, Y. L. Su, J. B. Chang, and C. K. Shieh (2010). Variable-Sized Map and Locality-Aware Reduce on Public-Resource Grids GPC
  11. S. Seo, I. Jang, K. Woo, I. Kim, J. Kim, and S. Maeng (2009). HPMR: Prefetching and Pre-Shuffling in Shared MapReduce Computation Environment CLUSTER
  12. M. Isard, V. Prabhakaran, J. Currey, U. Wieder, K. Talwar, and A. Goldberg (2009). Quincy: Fair Scheduling for Distributed Computing Clusters SOSP
  13. A. C. Murthy, C. Douglas, M. Konar, O. O'Malley, S. Radia, S. Agarwal, and K. V. Vinod (2011). Architecture of Next Generation Apache Hadoop MapReduce Framework Apache Jira

Проверьте свои знания

1.

Сколько уровней планирования имеется в Hadoop MapReduce?

2.

Какое из следующих утверждений не относится к планировщику FIFO?

3.

Каковы основные различия между Fair Scheduler и Capacity Scheduler?

4.

Какой из следующих механизмов планирования задач учитывает расположение?

5.

Как в MapReduce образуется отклонение секций?