HOW TO:實作各種生產者-消費者模式
本主題說明如何在您的應用程式中實作生產者-消費者模式。 在這個模式中,「生產者」(Producer) 會傳送訊息至訊息區塊,而「消費者」(Consumer) 則會從該區塊讀取訊息。
本主題會示範兩個情境。 在第一個情境中,消費者必須收到產生者所傳送的每個訊息。 在第二個情境中,消費者會定期輪詢資料,因此不需要收到每個訊息。
本主題中的兩個範例都會使用代理程式、訊息區塊和訊息傳遞函式,將訊息從生產者傳輸給消費者。 產生者代理程式會使用 concurrency::send 函式來寫入訊息至 concurrency::ITarget 物件。 取用者的代理程式 」 使用 concurrency::receive 函式來讀取郵件的來源 concurrency::ISource 物件。 這兩個代理程式都保有 Sentinel 值,以協調處理結束。
如需非同步代理程式的詳細資訊,請參閱非同步代理程式。 如需訊息區塊和訊息傳遞函式的詳細資訊,請參閱非同步訊息區和訊息傳遞函式。
範例
在這個範例中,生產者代理程式會將一系列的數字傳送給消費者代理程式。 消費者在收到所有這些數字之後,會計算它們的平均值。 應用程式會將平均值寫入主控台。
這個範例會使用 concurrency::unbounded_buffer 物件,才能啟用佇列的訊息產生者。 unbounded_buffer 類別會實作 ITarget 和 ISource,讓生產者和消費者可以在共用緩衝區中傳送與接收訊息。 send 和 receive 函式會協調將資料從生產者傳播給消費者的工作。
// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
: _target(target)
, _count(count)
, _sentinel(sentinel)
{
}
protected:
void run()
{
// Send the value of each loop iteration to the target buffer.
while (_count > 0)
{
send(_target, static_cast<int>(_count));
--_count;
}
// Send the sentinel value.
send(_target, _sentinel);
// Set the agent to the finished state.
done();
}
private:
// The target buffer to write to.
ITarget<int>& _target;
// The number of values to send.
unsigned int _count;
// The sentinel value, which informs the consumer agent to stop processing.
int _sentinel;
};
// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
explicit consumer_agent(ISource<int>& source, int sentinel)
: _source(source)
, _sentinel(sentinel)
{
}
// Retrieves the average of all received values.
int average()
{
return receive(_average);
}
protected:
void run()
{
// The sum of all values.
int sum = 0;
// The count of values received.
int count = 0;
// Read from the source block until we receive the
// sentinel value.
int n;
while ((n = receive(_source)) != _sentinel)
{
sum += n;
++count;
}
// Write the average to the message buffer.
send(_average, sum / count);
// Set the agent to the finished state.
done();
}
private:
// The source buffer to read from.
ISource<int>& _source;
// The sentinel value, which informs the agent to stop processing.
int _sentinel;
// Holds the average of all received values.
single_assignment<int> _average;
};
int wmain()
{
// Informs the consumer agent to stop processing.
const int sentinel = 0;
// The number of values for the producer agent to send.
const unsigned int count = 100;
// A message buffer that is shared by the agents.
unbounded_buffer<int> buffer;
// Create and start the producer and consumer agents.
producer_agent producer(buffer, count, sentinel);
consumer_agent consumer(buffer, sentinel);
producer.start();
consumer.start();
// Wait for the agents to finish.
agent::wait(&producer);
agent::wait(&consumer);
// Print the average.
wcout << L"The average is " << consumer.average() << L'.' << endl;
}
這個範例產生下列輸出。
The average is 50.
在這個範例中,生產者代理程式會將一系列的股價傳送給消費者代理程式。 消費者代理程式會定期讀取目前的報價,並將它列印至主控台。
本範例類似於前一個,不同之處在於,它會使用 concurrency::overwrite_buffer 物件,才能啟用產生者和消費者共用一封郵件。 與前一個範例相同,overwrite_buffer 類別會實作 ITarget 和 ISource,讓生產者和消費者可以對共用訊息緩衝區執行動作。
// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>
using namespace concurrency;
using namespace std;
// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
explicit producer_agent(ITarget<double>& target)
: _target(target)
{
}
protected:
void run()
{
// For illustration, create a predefined array of stock quotes.
// A real-world application would read these from an external source,
// such as a network connection or a database.
array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };
// Send each quote to the target buffer.
for_each (begin(quotes), end(quotes), [&] (double quote) {
send(_target, quote);
// Pause before sending the next quote.
concurrency::wait(20);
});
// Send a negative value to indicate the end of processing.
send(_target, -1.0);
// Set the agent to the finished state.
done();
}
private:
// The target buffer to write to.
ITarget<double>& _target;
};
// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
explicit consumer_agent(ISource<double>& source)
: _source(source)
{
}
protected:
void run()
{
// Read quotes from the source buffer until we receive
// a negative value.
double quote;
while ((quote = receive(_source)) >= 0.0)
{
// Print the quote.
wcout.setf(ios::fixed);
wcout.precision(2);
wcout << L"Current quote is " << quote << L'.' << endl;
// Pause before reading the next quote.
concurrency::wait(10);
}
// Set the agent to the finished state.
done();
}
private:
// The source buffer to read from.
ISource<double>& _source;
};
int wmain()
{
// A message buffer that is shared by the agents.
overwrite_buffer<double> buffer;
// Create and start the producer and consumer agents.
producer_agent producer(buffer);
consumer_agent consumer(buffer);
producer.start();
consumer.start();
// Wait for the agents to finish.
agent::wait(&producer);
agent::wait(&consumer);
}
這個範例 (Example) 產生下列範例 (Sample) 輸出。
Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.
與使用 unbounded_buffer 物件時不同,receive 函式並不會從 overwrite_buffer 物件移除訊息。 如果消費者在生產者覆寫該訊息之前多次讀取訊息緩衝區,則消費者每次都會得到相同的訊息。
編譯程式碼
將範例程式碼複製並貼上它在 Visual Studio 專案中,或將它貼在檔名為產生者 consumer.cpp ,然後執行下列命令,Visual Studio 的命令提示字元] 視窗中。
cl.exe /EHsc producer-consumer.cpp