如何:实现各种制造者-使用者模式

本主题介绍如何在应用程序中实现生产者-使用者模式。 在此模式下,制造者向消息块发送消息,使用者从该块读取消息。

本主题演示了两种方案。 在第一种方案中,使用者必须接收生产者发送的每条消息。 在第二种方案中,使用者定期轮询数据,因此不必接收每条消息。

本主题中的这两个示例都使用代理、消息块和消息传递函数将消息从生产者传输到使用者。 生产者代理使用 concurrency::send 函数将消息写入 concurrency::ITarget 对象。 使用者代理使用 concurrency::receive 函数从 concurrency::ISource 对象读取消息。 两个代理都持有一个 sentinel 值来协调处理结束。

有关异步代理的详细信息,请参阅异步代理。 有关消息块和消息传递函数的详细信息,请参阅异步消息块消息传递函数

示例:向使用者代理发送一系列数字

在此示例中,生产者代理向使用者代理发送一系列数字。 使用者将接收每一个数字并计算其平均值。 应用程序将平均值写入控制台。

此示例使用 concurrency::unbounded_buffer 对象,使生产者能够对消息进行排队。 unbounded_buffer 类实现 ITargetISource,以便生产者和使用者可以通过共享的缓冲区发送和接收消息。 sendreceive 函数协调将数据从生产者传播到使用者的任务。

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }
      
      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

本示例生成以下输出。

The average is 50.

示例:向使用者代理发送一系列股票报价

在此示例中,生产者代理向使用者代理发送一系列股票报价。 使用者代理定期读取当前报价并将其打印到控制台。

此示例与之前的示例类似,不同之处在于它使用 concurrency::overwrite_buffer 对象,使生产者能够与使用者共享一条消息。 与之前的示例中一样,overwrite_buffer 类实现 ITargetISource,以便生产者和使用者可以对共享的消息缓冲区执行操作。

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (begin(quotes), end(quotes), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

此示例生成以下示例输出。

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

unbounded_buffer 对象不同,receive 函数不会从 overwrite_buffer 对象中删除消息。 如果使用者在生产者覆盖消息之前多次从消息缓冲区读取,则接收方每次都会获得相同的消息。

编译代码

复制示例代码,并将它粘贴到 Visual Studio 项目中,或粘贴到名为 producer-consumer.cpp 的文件中,再在 Visual Studio 命令提示符窗口中运行以下命令。

cl.exe /EHsc producer-consumer.cpp

另请参阅

异步代理库
异步代理
异步消息块
消息传递函数