Практическое руководство. Реализация различных шаблонов "источник-приемник"

В этом разделе описывается, как реализовать шаблон производителя-потребителя в приложении. В этом шаблоне производитель отправляет сообщения в блок сообщений, а потребитель считывает сообщения из этого блока.

В этом разделе демонстрируется два сценария. В первом сценарии потребитель должен получать каждое сообщение, которое отправляет производитель. Во втором сценарии потребитель периодически опрашивает данные и поэтому не должен получать каждое сообщение.

Оба примера в этом разделе используют агенты, блоки сообщений и функции передачи сообщений для передачи сообщений от производителя потребителю. Агент производителя использует функцию параллелизма::send для записи сообщений в объект concurrency::ITarget . Агент потребителя использует функцию параллелизма::receive для чтения сообщений из объекта concurrency::ISource . Оба агента содержат значение sentinel для координации окончания обработки.

Дополнительные сведения об асинхронных агентах см. в разделе "Асинхронные агенты". Дополнительные сведения о блоках сообщений и функциях передачи сообщений см. в разделе "Асинхронные блоки сообщений" и функции передачи сообщений.

Пример. Отправка ряда чисел в агент потребителя

В этом примере агент производителя отправляет ряд чисел агенту-получателю. Потребитель получает каждое из этих чисел и вычисляет их среднее значение. Приложение записывает среднее значение в консоль.

В этом примере используется объект параллелизма::unbounded_buffer для включения производителя в очередь сообщений. Класс unbounded_buffer реализует ITarget и ISource позволяет производителю и потребителю отправлять и получать сообщения в общий буфер и из нее. receive Функции send координирует задачу распространения данных от производителя к потребителю.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }
      
      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

В этом примере формируются следующие данные:

The average is 50.

Пример. Отправка рядов фондовых котировок агенту-потребителю

В этом примере агент производителя отправляет ряд котировок акций агенту-потребителю. Агент-потребитель периодически считывает текущую цитату и выводит его в консоль.

Этот пример похож на предыдущий , за исключением того, что он использует объект параллелизма::overwrite_buffer для предоставления продюсеру общего доступа к одному сообщению с потребителем. Как и в предыдущем примере, overwrite_buffer класс реализует ITarget и ISource позволяет производителю и потребителю действовать в буфере общего сообщения.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (begin(quotes), end(quotes), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

В этом примере создается следующий пример выходных данных.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

В отличие от unbounded_buffer объекта, receive функция не удаляет сообщение из overwrite_buffer объекта. Если потребитель считывает из буфера сообщений несколько раз, прежде чем производитель перезаписывает это сообщение, получатель получает одно и то же сообщение каждый раз.

Компиляция кода

Скопируйте пример кода и вставьте его в проект Visual Studio или вставьте его в файл с именем producer-consumer.cpp , а затем выполните следующую команду в окне командной строки Visual Studio.

cl.exe /EHsc producer-consumer.cpp

См. также

Библиотека асинхронных агентов
Асинхронные агенты
Асинхронные блоки сообщений
Функции передачи сообщений