Поделиться через


Рекомендации по работе с библиотекой асинхронных агентов

В этом документе описано, как эффективно использовать библиотеку асинхронных агентов. Библиотека агентов поддерживает модель программирования на основе субъектов и внутрипроцессную передачу сообщений для недетализированного потока данных и задач по конвейеризации.

Дополнительные сведения о библиотеке агентов см. в разделе Библиотека асинхронных агентов.

Подразделы

Этот документ содержит следующие разделы.

  • Используйте агенты для изолирования состояния

  • Используйте механизм регулирования для ограничения количества сообщений в конвейере данных

  • Не выполняйте детализированную работу в конвейере данных

  • Не передавайте большие полезные нагрузки сообщений по значению

  • Если владение не определено, используйте в сети данных shared_ptr

Используйте агенты для изолирования состояния

Библиотека агентов предоставляет альтернативы общему состоянию, позволяя подключать изолированные компоненты через механизм асинхронной передачи сообщений. Асинхронные агенты наиболее эффективны, когда их внутреннее состояние изолировано от других компонентов. При изоляции состояний несколько компонентов обычно не работают с общими данными одновременно. Изоляция состояния позволяет масштабировать приложение, так как оно уменьшает борьбу за общую память. Изоляция состояния также уменьшает вероятность взаимоблокировок и состояния гонки, так как компонентам не нужно синхронизировать доступ к общим данным.

Обычно, чтобы изолировать состояние агента, нужно задержать члены данных в разделах private или protected класса агента и использовать буферы сообщений, чтобы сообщать об изменениях состояния. В следующем примере показан класс basic_agent, наследуемый от Concurrency::agent. Класс basic_agent использует два буфера сообщений, чтобы связываться с внешними компонентами. Один буфер сообщений содержит входящие сообщения; второй буфер сообщений — исходящие сообщения.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public Concurrency::agent
{
public:
   basic_agent(Concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }

   // Retrives the message buffer that holds output messages.
   Concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = Concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;

         // Write the result to the output message buffer.
         Concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   Concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   Concurrency::unbounded_buffer<int> _output;
};

Полные примеры определения и использования агентов см. в разделах Пошаговое руководство. Создание приложения на основе агента и Пошаговое руководство. Создание агента потоков данных.

[в начало]

Используйте механизм регулирования для ограничения количества сообщений в конвейере данных

Многие типы буферов сообщений, например Concurrency::unbounded_buffer, могут содержать неограниченное количество сообщений. Если производитель отправляет сообщения в конвейер данных быстрее, чем получатель может их обработать, приложение может оказаться в состоянии нехватки или отсутствия памяти. Чтобы ограничить количество сообщений, параллельно обрабатываемых в конвейере данных, можно использовать регулирующий механизм, например семафор.

В следующем основном примере показано, как использовать семафор для ограничения числа сообщений в конвейере данных. Конвейер данных использует функцию Concurrency::wait, чтобы имитировать операцию, занимающую не менее 100 миллисекунд. Так как отправитель создает сообщения быстрее, чем получатель может их обработать, пример определяет класс semaphore, позволяющий приложению ограничить количество активных сообщений.

// message-throttling.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace Concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(LONG capacity);

   // Acquires access to the semaphore.
   void acquire();

   // Releases access to the semaphore.
   void release();

private:
   // The semaphore count.
   LONG _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(unsigned int count = 0L);

   // Decrements the event counter.
   void signal();

   // Increments the event counter.
   void add_count();

   // Blocks the current context until the event is set.
   void wait();

private:
   // The current count.
   volatile long _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};


int wmain()
{
   // The number of messages to send to the consumer.
   const int MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };

   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(int i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();   
}

//
// semaphore class implementation.
//

semaphore::semaphore(LONG capacity)
   : _semaphore_count(capacity)
{
}

// Acquires access to the semaphore.
void semaphore::acquire()
{
   // The capacity of the semaphore is exceeded when the semaphore count 
   // falls below zero. When this happens, add the current context to the 
   // back of the wait queue and block the current context.
   if (InterlockedDecrement(&_semaphore_count) < 0)
   {
      _waiting_contexts.push(Context::CurrentContext());
      Context::Block();
   }
}

// Releases access to the semaphore.
void semaphore::release()
{
   // If the semaphore count is negative, unblock the first waiting context.
   if (InterlockedIncrement(&_semaphore_count) <= 0)
   {
      // A call to acquire might have decremented the counter, but has not
      // yet finished adding the context to the queue. 
      // Create a spin loop that waits for the context to become available.
      Context* waiting = NULL;
      if (!_waiting_contexts.try_pop(waiting))
      {
         Context::Yield();
      }

      // Unblock the context.
      waiting->Unblock();
   }
}

//
// countdown_event class implementation.
//

countdown_event::countdown_event(unsigned int count)
   : _current(static_cast<long>(count)) 
{
   // Set the event if the initial count is zero.
   if (_current == 0L)
      _event.set();
}

// Decrements the event counter.
void countdown_event::signal() {
   if(InterlockedDecrement(&_current) == 0L) {
      _event.set();
   }
}

// Increments the event counter.
void countdown_event::add_count() {
   if(InterlockedIncrement(&_current) == 1L) {
      _event.reset();
   }
}

// Blocks the current context until the event is set.
void countdown_event::wait() {
   _event.wait();
}

В данном примере получается следующий результат.

0: sending 0...
0: received 0
0: sending 1...
0: received 1
100: sending 2...
100: received 2
200: sending 3...
200: received 3
300: sending 4...
300: received 4

Объект semaphore ограничивает конвейер так, чтобы он обрабатывал не более двух сообщений одновременно.

В этом примере производитель отправляет получателю не очень много сообщений. Поэтому этот пример не демонстрирует состояние нехватки или отсутствия памяти. Тем не менее, этот механизм полезен, если конвейер данных содержит относительно большое количество сообщений.

Дополнительные сведения о создании класса семафора, использованного в этом примере, см. в разделе Практическое руководство. Использование класса Context для реализации семафора, поддерживающего параллельный доступ.

[в начало]

Не выполняйте детализированную работу в конвейере данных

Библиотека агентов особенно полезна, если конвейер данных выполняет относительно недетализированную работу. Например, один компонент приложения может считывать данные из файла или сетевого подключения и отправлять эти данные другому компоненту. Протокол, используемый библиотекой агентов для распространения сообщений, приводит к дополнительным затратам ресурсов на механизм передачи сообщений по сравнению со структурами параллельных задач, предоставляемыми библиотекой параллельных шаблонов (PPL). Поэтому в конвейере данных следует выполнять достаточно продолжительную работу, чтобы оправдать дополнительную нагрузку.

Несмотря на то, что конвейер данных наиболее эффективен при выполнении недетализированных задач, каждый из этапов конвейера может использовать структуры PPL, например группы задач и параллельные алгоритмы, чтобы выполнять более детализированную работу. Пример недетализированной сети данных, использующей детализированный параллелизм на каждом из этапов обработки см. в разделе Пошаговое руководство. Создание сети обработки изображений.

[в начало]

Не передавайте большие полезные нагрузки сообщений по значению

В некоторых случаях среда выполнения создает копии всех сообщений, передаваемых между буферами сообщений. Например, класс Concurrency::overwrite_buffer предлагает копию каждого получаемого сообщения всем своим целевым объектам. Среда выполнения также создает копию данных сообщения при использовании для записи сообщений в буфер сообщений и чтения из него таких функций передачи сообщений, как Concurrency::send и Concurrency::receive. Этот механизм помогает исключить опасность параллельной записи в общие данные, но он может привести к падению производительности памяти, если полезная нагрузка сообщения относительно велика.

Чтобы увеличить производительность памяти при передаче сообщений с большими полезными нагрузками, можно использовать указатели или ссылки. В следующем примере сравнивается передача больших сообщений по значению и передача указателей на этот тип сообщений. В примере определяется два типа агентов, producer и consumer, действующих с объектами message_data. Пример сравнивает время, которое требуется производителю для отправки нескольких объектов message_data получателю, с временем, которое требуется агенту производителя для отправки получателю нескольких указателей на объекты message_data.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace Concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }

   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }

   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;

   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

В данном примере получается следующий результат.

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

Версия, использующая указатели, обеспечивает большую производительность, потому что среда выполнения не должна создавать полные копии всех объектов message_data, передаваемых от производителя получателю.

[в начало]

Если владение не определено, используйте в сети данных shared_ptr

При отправке сообщений по указателю через конвейер или сеть передачи сообщений обычно выделяют память для каждого сообщения в начале сети и высвобождают память в конце сети. Этот механизм часто работает хорошо, но в некоторых случаях использовать его трудно или невозможно. Например, рассмотрим ситуацию, в которой сеть данных содержит несколько конечных узлов. В этом случае нет определенного расположения для высвобождения памяти для сообщений.

Чтобы решить эту проблему, можно использовать такой механизм, как std::shared_ptr, позволяющий нескольким компонентам владеть указателем. Когда уничтожается последний объект shared_ptr, владеющий ресурсом, ресурс высвобождается.

В следующем примере показано, как с помощью объекта shared_ptr обеспечить совместное использование значений указателя несколькими буферами сообщений. В примере объект Concurrency::overwrite_buffer соединяется с тремя объектами Concurrency::call. Класс overwrite_buffer предлагает сообщения всем своим целевым объектам. Так как в конце сети данных имеется несколько владельцев данных, в этом примере используется shared_ptr, чтобы позволить всем объектам call владеть сообщениями.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace Concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;

   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

В данном примере получается следующий результат.

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

См. также

Задачи

Пошаговое руководство. Создание приложения на основе агента

Основные понятия

Рекомендации по работе со средой выполнения с параллелизмом

Библиотека асинхронных агентов

Другие ресурсы

Пошаговое руководство. Создание агента потоков данных

Пошаговое руководство. Создание сети обработки изображений

Рекомендации по работе с библиотекой параллельных шаблонов

Общие рекомендации в среде выполнения с параллелизмом