Рекомендации по работе с библиотекой асинхронных агентов

В этом документе описывается эффективное использование библиотеки асинхронных агентов. Библиотека агентов способствует модели программирования на основе субъектов и передаче сообщений внутри процесса для грубого потока данных и задач конвейера.

Дополнительные сведения о библиотеке агентов см. в разделе "Асинхронная библиотека агентов".

Разделы

Этот документ содержит следующие разделы.

Использование агентов для изоляции состояния

Библиотека агентов предоставляет альтернативы общему состоянию, позволяя подключать изолированные компоненты через асинхронный механизм передачи сообщений. Асинхронные агенты наиболее эффективны при изоляции внутреннего состояния от других компонентов. При изоляции состояния несколько компонентов обычно не действуют на общих данных. Изоляция состояний позволяет приложению масштабироваться, так как уменьшается количество разных операций в общей памяти. Изоляция состояния также снижает вероятность взаимоблокировки и состояния гонки, так как компоненты не должны синхронизировать доступ к общим данным.

Обычно состояние изолировано в агенте путем хранения элементов данных в private классах агента или protected разделах и с помощью буферов сообщений для обмена данными об изменениях состояния. В следующем примере показан класс, производный basic_agent от параллелизма::agent. Класс basic_agent использует два буфера сообщений для взаимодействия с внешними компонентами. Один буфер сообщений содержит входящие сообщения; другой буфер сообщений содержит исходящие сообщения.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }
   
   // Retrieves the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;
         
         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

Полные примеры определения и использования агентов см. в пошаговом руководстве. Создание приложения на основе агента и пошагового руководства. Создание агента потока данных.

[В начало]

Использование механизма регулирования для ограничения количества сообщений в конвейере данных

Многие типы буферов сообщений, такие как параллелизм::unbounded_buffer, могут содержать неограниченное количество сообщений. Когда производитель сообщений отправляет сообщения в конвейер данных быстрее, чем потребитель может обрабатывать эти сообщения, приложение может ввести состояние с низкой памятью или вне памяти. Можно использовать механизм регулирования, например семафор, чтобы ограничить количество сообщений, которые одновременно активны в конвейере данных.

В следующем базовом примере показано, как использовать семафор для ограничения количества сообщений в конвейере данных. Конвейер данных использует функцию параллелизма::wait для имитации операции, которая занимает не менее 100 миллисекунда. Так как отправитель создает сообщения быстрее, чем потребитель может обрабатывать эти сообщения, в этом примере определяется semaphore класс, позволяющий приложению ограничить количество активных сообщений.

// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            (Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.  
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero.
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter.
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter.
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set.
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer.
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };
  
   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

Объект semaphore ограничивает конвейер обрабатывать не более двух сообщений одновременно.

Производитель в этом примере отправляет относительно мало сообщений потребителю. Поэтому в этом примере не демонстрируется потенциальное условие нехватки памяти или нехватки памяти. Однако этот механизм полезен, если конвейер данных содержит относительно большое количество сообщений.

Дополнительные сведения о создании класса семафора, используемого в этом примере, см. в разделе "Практическое руководство. Использование класса Контекста для реализации кооперативного Семафора".

[В начало]

Не выполняйте детализированную работу в конвейере данных

Библиотека агентов наиболее полезна, если работа, выполняемая конвейером данных, является достаточно грубой. Например, один компонент приложения может считывать данные из файла или сетевого подключения и иногда отправлять эти данные другому компоненту. Протокол, используемый библиотекой агентов для распространения сообщений, приводит к тому, что механизм передачи сообщений имеет большую нагрузку, чем параллельные конструкции задач, предоставляемые библиотекой параллельных шаблонов (PPL). Поэтому убедитесь, что работа, выполняемая конвейером данных, достаточно длинна, чтобы компенсировать эти затраты.

Хотя конвейер данных наиболее эффективен при выполнении своих задач, каждый этап конвейера данных может использовать конструкции PPL, такие как группы задач и параллельные алгоритмы для более точной работы. Пример крупнозернистой сети данных, которая использует детализированный параллелизм на каждом этапе обработки, см. в пошаговом руководстве по созданию сети обработки изображений.

[В начало]

Не передавать полезные данные большого сообщения по значению

В некоторых случаях среда выполнения создает копию каждого сообщения, который он передает из одного буфера сообщений в другой буфер сообщений. Например, класс параллелизма::overwrite_buffer предлагает копию каждого сообщения, которое оно получает для каждого из целевых объектов. Среда выполнения также создает копию данных сообщения при использовании функций передачи сообщений, таких как параллелизм::send и concurrency::receive для записи сообщений и чтения сообщений из буфера сообщений. Хотя этот механизм помогает устранить риск параллельной записи в общие данные, это может привести к низкой производительности памяти, когда полезные данные сообщения относительно большие.

Указатели или ссылки можно использовать для повышения производительности памяти при передаче сообщений с большими полезными данными. В следующем примере сравнивается передача больших сообщений по значению на передачу указателей на один и тот же тип сообщения. В примере определяются два типа агента, producer и consumerони действуют над message_data объектами. В этом примере сравнивается время, необходимое для отправки производителем нескольких объектов потребителю, с тем временем, которое требуется агенту производителя для отправки нескольких message_data указателей на message_data объекты потребителю.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }
       
   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }
       
   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;
      
   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

В этом примере создаются следующие примеры выходных данных:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

Версия, использующая указатели, работает лучше, так как она устраняет требование для среды выполнения создать полную копию каждого message_data объекта, который он передает от производителя потребителю.

[В начало]

Использование shared_ptr в сети данных, если ответственность не определена

При отправке сообщений по указателю через конвейер передачи сообщений или сеть обычно выделяется память для каждого сообщения перед сетью и освобождает память в конце сети. Хотя этот механизм часто работает хорошо, существуют случаи, в которых трудно или невозможно использовать его. Например, рассмотрим случай, в котором сеть данных содержит несколько конечных узлов. В этом случае нет четкого расположения для освобождения памяти для сообщений.

Для решения этой проблемы можно использовать механизм, например std::shared_ptr, который позволяет использовать указатель на несколько компонентов. Когда окончательный shared_ptr объект, принадлежащий ресурсу, уничтожается, ресурс также освобождается.

В следующем примере показано, как использовать shared_ptr для совместного использования значений указателя между несколькими буферами сообщений. В примере выполняется подключение объекта параллелизма::overwrite_buffer к трем объектам параллелизма::call . Класс overwrite_buffer предлагает сообщения каждому из своих целевых объектов. Так как в конце сети данных есть несколько владельцев данных, в этом примере используется shared_ptr для предоставления каждому call объекту общего доступа к сообщениям.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;
      
   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

В этом примере создаются следующие примеры выходных данных:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

См. также

Рекомендации по работе со средой выполнения с параллелизмом
Библиотека асинхронных агентов
Пошаговое руководство. Создание приложения на основе агента
Пошаговое руководство. Создание агента потоков данных
Пошаговое руководство. Создание сети обработки изображений
Рекомендации по работе с библиотекой параллельных шаблонов
Общие рекомендации в среде выполнения с параллелизмом