Поделиться через


Практическое руководство. Использование групп расписаний для определения порядка выполнения

В среде выполнения с параллелизмом порядок планирования задач не является определяющим. Однако для изменения порядка выполнения задач можно использовать политики планирования. В этом разделе показано, как использовать группы расписаний совместно с политикой планировщика Concurrency::SchedulingProtocol для определения порядка выполнения задач.

В этом примере набор задач выполняется дважды с двумя разными политиками планирования. Обе политики ограничивают максимальное число ресурсов обработки двумя. При первом выполнении используется политика EnhanceScheduleGroupLocality (по умолчанию), а при втором — политика EnhanceForwardProgress. В условиях политики EnhanceScheduleGroupLocality планировщик выполняет все задачи в одной группе расписаний, пока не будут завершены или переданы все задачи. В условиях политики EnhanceForwardProgress планировщик переходит к следующей группе расписаний методом циклического перебора после завершения или передачи каждой задачи.

Если каждая группа расписаний содержит связанные задачи, политика EnhanceScheduleGroupLocality, как правило, позволяет повысить производительность, так как расположение кэша при выполнении всех задач сохраняется. Политика EnhanceForwardProgress позволяет продвигаться вперед с выполнением задач, ее полезно использовать, если необходимо равноправное планирование в разных группах расписаний.

Пример

В этом примере определяется класс work_yield_agent, наследуемый от Concurrency::agent. Класс work_yield_agent выполняет блок работы, возвращает текущий контекст, затем выполняет еще один блок работы. Агент использует функцию Concurrency::wait для совместной передачи текущего контекста, чтобы обеспечить возможность выполнения других контекстов.

В этом примере создается четыре объекта work_yield_agent. Чтобы проиллюстрировать, как задавать политики планировщика для определения порядка выполнения агентов, в этом примере первые два агента связываются с одной группой расписаний, а другие два — с другой. В этом примере метод Concurrency::CurrentScheduler::CreateScheduleGroup используется для создания объектов Concurrency::ScheduleGroup. В этом примере все четыре агента выполняются дважды с двумя разными политиками планирования.

// scheduling-protocol.cpp
// compile with: /EHsc
#include <agents.h>
#include <vector>
#include <algorithm>
#include <iostream>
#include <sstream>

using namespace Concurrency;
using namespace std;

#pragma optimize( "", off )
// Simulates work by performing a long spin loop.
void spin_loop()
{
   for (int i = 0; i < 500000000; ++i)
   {
   }
}
#pragma optimize( "", on )

// Agent that performs some work and then yields the current context.
class work_yield_agent : public agent
{
public:
   explicit work_yield_agent(
      unsigned int group_number, unsigned int task_number)
      : _group_number(group_number)
      , _task_number(task_number)
   {
   }

   explicit work_yield_agent(Scheduler& scheduler,
      unsigned int group_number, unsigned int task_number)
      : agent(scheduler)
      , _group_number(group_number)
      , _task_number(task_number)
   {
   }

   explicit work_yield_agent(ScheduleGroup& group,
      unsigned int group_number, unsigned int task_number)
      : agent(group)       
      , _group_number(group_number)
      , _task_number(task_number)
   {
   }

protected:
   // Performs the work of the agent.   
   void run()
   {
      wstringstream header, ss;

      // Create a string that is prepended to each message.
      header << L"group " << _group_number 
             << L",task " << _task_number << L": ";

      // Perform work.
      ss << header.str() << L"first loop..." << endl;
      wcout << ss.str();
      spin_loop();

      // Cooperatively yield the current context. 
      // The task scheduler will then run all blocked contexts.
      ss = wstringstream();
      ss << header.str() << L"waiting..." << endl;
      wcout << ss.str();
      Concurrency::wait(0);

      // Perform more work.
      ss = wstringstream();
      ss << header.str() << L"second loop..." << endl;
      wcout << ss.str();
      spin_loop();

      // Print a final message and then set the agent to the 
      // finished state.
      ss = wstringstream();
      ss << header.str() << L"finished..." << endl;
      wcout << ss.str();

      done();
   }  

private:
   // The group number that the agent belongs to.
   unsigned int _group_number;
   // A task number that is associated with the agent.
   unsigned int _task_number;
};

// Creates and runs several groups of agents. Each group of agents is associated 
// with a different schedule group.
void run_agents()
{
   // The number of schedule groups to create.
   const unsigned int group_count = 2;
   // The number of agent to create per schedule group.
   const unsigned int tasks_per_group = 2;

   // A collection of schedule groups.
   vector<ScheduleGroup*> groups;
   // A collection of agents.
   vector<agent*> agents;

   // Create a series of schedule groups. 
   for (unsigned int group = 0; group < group_count; ++group)
   {
      groups.push_back(CurrentScheduler::CreateScheduleGroup());

      // For each schedule group, create a series of agents.
      for (unsigned int task = 0; task < tasks_per_group; ++task)
      {
         // Add an agent to the collection. Pass the current schedule 
         // group to the work_yield_agent constructor to schedule the agent
         // in this group.
         agents.push_back(new work_yield_agent(*groups.back(), group, task));
      }
   }

   // Start each agent.
   for_each(agents.begin(), agents.end(), [](agent* a) {
      a->start();
   });

   // Wait for all agents to finsih.
   agent::wait_for_all(agents.size(), &agents[0]);

   // Free the memory that was allocated for each agent.
   for_each(agents.begin(), agents.end(), [](agent* a) {
      delete a;
   });

   // Release each schedule group.
   for_each(groups.begin(), groups.end(), [](ScheduleGroup* group) {
      group->Release();
   });
}

int wmain()
{
   // Run the agents two times. Each run uses a scheduler
   // policy that limits the maximum number of processing resources to two.

   // The first run uses the EnhanceScheduleGroupLocality 
   // scheduling protocol. 
   wcout << L"Using EnhanceScheduleGroupLocality..." << endl;
   CurrentScheduler::Create(SchedulerPolicy(3, 
      MinConcurrency, 1,
      MaxConcurrency, 2,
      SchedulingProtocol, EnhanceScheduleGroupLocality));

   run_agents();
   CurrentScheduler::Detach();

   wcout << endl << endl;

   // The second run uses the EnhanceForwardProgress 
   // scheduling protocol. 
   wcout << L"Using EnhanceForwardProgress..." << endl;
   CurrentScheduler::Create(SchedulerPolicy(3, 
      MinConcurrency, 1,
      MaxConcurrency, 2,
      SchedulingProtocol, EnhanceForwardProgress));

   run_agents();
   CurrentScheduler::Detach();
}

После выполнения примера получается следующий результат.

Using EnhanceScheduleGroupLocality...
group 0,task 0: first loop...
group 0,task 1: first loop...
group 0,task 0: waiting...
group 1,task 0: first loop...
group 0,task 1: waiting...
group 1,task 1: first loop...
group 1,task 0: waiting...
group 0,task 0: second loop...
group 1,task 1: waiting...
group 0,task 1: second loop...
group 0,task 0: finished...
group 1,task 0: second loop...
group 0,task 1: finished...
group 1,task 1: second loop...
group 1,task 0: finished...
group 1,task 1: finished...


Using EnhanceForwardProgress...
group 0,task 0: first loop...
group 1,task 0: first loop...
group 0,task 0: waiting...
group 0,task 1: first loop...
group 1,task 0: waiting...
group 1,task 1: first loop...
group 0,task 1: waiting...
group 0,task 0: second loop...
group 1,task 1: waiting...
group 1,task 0: second loop...
group 0,task 0: finished...
group 0,task 1: second loop...
group 1,task 0: finished...
group 1,task 1: second loop...
group 0,task 1: finished...
group 1,task 1: finished...

Обе политики создают одну и ту же последовательность событий. Однако политика, которая использует EnhanceScheduleGroupLocality, запускает оба агента первой группы планирования, прежде чем будут запущены агенты второй группы. Политика, использующая EnhanceForwardProgress, запускает один агент из первой группы, затем запускает первый агент во второй группе.

Компиляция кода

Скопируйте код примера и вставьте его в проект Visual Studio или файл с именем scheduling-protocol.cpp, затем выполните в окне командной строки Visual Studio 2010 следующую команду.

cl.exe /EHsc scheduling-protocol.cpp

См. также

Основные понятия

Асинхронные агенты

Другие ресурсы

Группы расписаний