How to: Implement Various Producer-Consumer Patterns
This topic describes how to implement the producer-consumer pattern in your application. In this pattern, the producer sends messages to a message block, and the consumer reads messages from that block.
The topic demonstrates two scenarios. In the first scenario, the consumer must receive each message that the producer sends. In the second scenario, the consumer periodically polls for data, and therefore does not have to receive each message.
Both examples in this topic use agents, message blocks, and message-passing functions to transmit messages from the producer to the consumer. The producer agent uses the concurrency::send function to write messages to a concurrency::ITarget object. The consumer agent uses the concurrency::receive function to read messages from a concurrency::ISource object. Both agents use a sentinel value to coordinate the end of processing: the producer sends it last, and the consumer stops when it receives it.
For more information about asynchronous agents, see Asynchronous Agents. For more information about message blocks and message-passing functions, see Asynchronous Message Blocks and Message Passing Functions.
Example
In this example, the producer agent sends a series of numbers to the consumer agent. The consumer receives each of these numbers and computes their average. The application writes the average to the console.
This example uses a concurrency::unbounded_buffer object to enable the producer to queue messages. The unbounded_buffer class implements ITarget and ISource so that the producer and the consumer can send and receive messages to and from a shared buffer. The send and receive functions coordinate the task of propagating the data from the producer to the consumer.
// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
    explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
        : _target(target)
        , _count(count)
        , _sentinel(sentinel)
    {
    }

protected:
    void run()
    {
        // Send the value of each loop iteration to the target buffer.
        while (_count > 0)
        {
            send(_target, static_cast<int>(_count));
            --_count;
        }

        // Send the sentinel value.
        send(_target, _sentinel);

        // Set the agent to the finished state.
        done();
    }

private:
    // The target buffer to write to.
    ITarget<int>& _target;
    // The number of values to send.
    unsigned int _count;
    // The sentinel value, which informs the consumer agent to stop processing.
    int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
    explicit consumer_agent(ISource<int>& source, int sentinel)
        : _source(source)
        , _sentinel(sentinel)
    {
    }

    // Retrieves the average of all received values.
    int average()
    {
        return receive(_average);
    }

protected:
    void run()
    {
        // The sum of all values.
        int sum = 0;
        // The count of values received.
        int count = 0;

        // Read from the source block until we receive the
        // sentinel value.
        int n;
        while ((n = receive(_source)) != _sentinel)
        {
            sum += n;
            ++count;
        }

        // Write the average to the message buffer. Guard against division
        // by zero in case the sentinel arrives before any values.
        send(_average, count > 0 ? sum / count : 0);

        // Set the agent to the finished state.
        done();
    }

private:
    // The source buffer to read from.
    ISource<int>& _source;
    // The sentinel value, which informs the agent to stop processing.
    int _sentinel;
    // Holds the average of all received values.
    single_assignment<int> _average;
};

int wmain()
{
    // Informs the consumer agent to stop processing.
    const int sentinel = 0;
    // The number of values for the producer agent to send.
    const unsigned int count = 100;

    // A message buffer that is shared by the agents.
    unbounded_buffer<int> buffer;

    // Create and start the producer and consumer agents.
    producer_agent producer(buffer, count, sentinel);
    consumer_agent consumer(buffer, sentinel);
    producer.start();
    consumer.start();

    // Wait for the agents to finish.
    agent::wait(&producer);
    agent::wait(&consumer);

    // Print the average.
    wcout << L"The average is " << consumer.average() << L'.' << endl;
}
This example produces the following output.
The average is 50.
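As a rough illustration of the queueing behavior described above, the following sketch mimics the first example in standard C++ rather than with the Concurrency Runtime. The int_queue class and the average_of_countdown function are hypothetical names introduced here. int_queue is only an analogy for unbounded_buffer<int>, with push and pop standing in for send and receive. The sketch assumes, as the example does, that the sentinel never collides with a data value.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical stand-in for unbounded_buffer<int>: a thread-safe FIFO queue.
// This is NOT the Concurrency Runtime class, only an analogy in standard C++.
class int_queue
{
public:
    // Analogous to send: enqueue a value and wake a waiting consumer.
    void push(int value)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _items.push(value);
        }
        _ready.notify_one();
    }

    // Analogous to receive: block until a value is available, then remove it.
    int pop()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _ready.wait(lock, [this] { return !_items.empty(); });
        int value = _items.front();
        _items.pop();
        return value;
    }

private:
    std::mutex _mutex;
    std::condition_variable _ready;
    std::queue<int> _items;
};

// Sends count, count - 1, ..., 1 followed by the sentinel, and returns the
// integer average that the consumer computes (0 if no values were sent).
int average_of_countdown(unsigned int count, int sentinel)
{
    // Assumption: the sentinel must not be one of the values 1..count.
    assert(sentinel < 1 || static_cast<unsigned int>(sentinel) > count);

    int_queue buffer;
    int sum = 0;
    int received = 0;

    std::thread producer([&] {
        for (unsigned int i = count; i > 0; --i)
        {
            buffer.push(static_cast<int>(i));
        }
        buffer.push(sentinel);
    });

    std::thread consumer([&] {
        int n;
        while ((n = buffer.pop()) != sentinel)
        {
            sum += n;
            ++received;
        }
    });

    producer.join();
    consumer.join();

    // Integer division truncates; guard against an empty series.
    return received > 0 ? sum / received : 0;
}
```

Under these assumptions, average_of_countdown(100, 0) returns 50: the values 100 down to 1 sum to 5050, and integer division by 100 truncates 50.5 to 50, which matches the output above. Because every pushed value stays queued until it is popped, the consumer sees each value exactly once, regardless of how the two threads are scheduled.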
Example
In this example, the producer agent sends a series of stock quotes to the consumer agent. The consumer agent periodically reads the current quote and prints it to the console.
This example resembles the previous one, except that it uses a concurrency::overwrite_buffer object to enable the producer to share one message with the consumer. As in the previous example, the overwrite_buffer class implements ITarget and ISource so that the producer and the consumer can act on a shared message buffer.
// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
    explicit producer_agent(ITarget<double>& target)
        : _target(target)
    {
    }

protected:
    void run()
    {
        // For illustration, create a predefined array of stock quotes.
        // A real-world application would read these from an external source,
        // such as a network connection or a database.
        array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

        // Send each quote to the target buffer.
        for_each (begin(quotes), end(quotes), [&] (double quote) {
            send(_target, quote);

            // Pause before sending the next quote.
            concurrency::wait(20);
        });

        // Send a negative value to indicate the end of processing.
        send(_target, -1.0);

        // Set the agent to the finished state.
        done();
    }

private:
    // The target buffer to write to.
    ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
    explicit consumer_agent(ISource<double>& source)
        : _source(source)
    {
    }

protected:
    void run()
    {
        // Read quotes from the source buffer until we receive
        // a negative value.
        double quote;
        while ((quote = receive(_source)) >= 0.0)
        {
            // Print the quote.
            wcout.setf(ios::fixed);
            wcout.precision(2);
            wcout << L"Current quote is " << quote << L'.' << endl;

            // Pause before reading the next quote.
            concurrency::wait(10);
        }

        // Set the agent to the finished state.
        done();
    }

private:
    // The source buffer to read from.
    ISource<double>& _source;
};

int wmain()
{
    // A message buffer that is shared by the agents.
    overwrite_buffer<double> buffer;

    // Create and start the producer and consumer agents.
    producer_agent producer(buffer);
    consumer_agent consumer(buffer);
    producer.start();
    consumer.start();

    // Wait for the agents to finish.
    agent::wait(&producer);
    agent::wait(&consumer);
}
This example produces the following sample output.
Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.
Unlike with an unbounded_buffer object, the receive function does not remove the message from the overwrite_buffer object. If the consumer reads from the message buffer more than one time before the producer overwrites that message, the consumer obtains the same message every time. This is why the first quote appears twice in the sample output.
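The difference in read semantics can be sketched in standard C++ with a single-slot cell. The latest_value class below is a hypothetical analogy for overwrite_buffer<double>, not the Concurrency Runtime class itself. One deliberate simplification: its try_read member returns immediately when nothing has been written yet, whereas the receive call in the example blocks until the first message arrives.

```cpp
#include <mutex>

// Hypothetical stand-in for overwrite_buffer<double>: a cell that holds only
// the most recently written value. Reading copies the value and leaves it in
// place, so repeated reads return the same value until the next write.
class latest_value
{
public:
    // Analogous to send: replace whatever value is currently stored.
    void write(double value)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _value = value;
        _has_value = true;
    }

    // Non-blocking read (a simplification of receive): copies the current
    // value into out without removing it. Returns false if nothing has been
    // written yet.
    bool try_read(double& out) const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (!_has_value)
        {
            return false;
        }
        out = _value;
        return true;
    }

private:
    mutable std::mutex _mutex;
    double _value = 0.0;
    bool _has_value = false;
};
```

With this sketch, writing 24.44 and then reading twice yields 24.44 both times, while writing 24.65 and 24.99 back to back before the next read yields only 24.99. A polling consumer can therefore see a value more than once and can also miss values entirely, which is acceptable when only the current quote matters.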
Compiling the Code
Copy the example code and paste it into a Visual Studio project, or paste it into a file named producer-consumer.cpp, and then run the following command in a Visual Studio Command Prompt window.
cl.exe /EHsc producer-consumer.cpp