Megosztás a következőn keresztül:


Útmutató: Különböző Producer-Consumer minták implementálása

Ez a témakör azt ismerteti, hogyan implementálhatja a gyártó-fogyasztó mintát az alkalmazásban. Ebben a mintában a gyártó üzeneteket küld egy üzenetblokkba, és a fogyasztó ebből a blokkból olvas üzeneteket.

A témakör két forgatókönyvet mutat be. Az első forgatókönyvben a fogyasztónak minden egyes üzenetet meg kell kapnia, amelyet a gyártó küld. A második forgatókönyvben a fogyasztó rendszeresen lekérdezi az adatokat, ezért nem kell minden üzenetet fogadnia.

A jelen témakör mindkét példája ügynököket, üzenetblokkokat és üzenetátadási függvényeket használ az üzenetek gyártótól a fogyasztónak való továbbításához. A gyártóügynök az egyidejűség::send függvény használatával ír üzeneteket egy egyidejűség::ITarget objektumba. A fogyasztói ügynök az egyidejűség::receive függvény használatával olvas üzeneteket egy egyidejűség::ISource objektumból. Mindkét ügynök rendelkezik egy sentinel értékkel a feldolgozás befejezésének koordinálásához.

Az aszinkron ügynökökről további információt az Aszinkron ügynökök című témakörben talál. További információ az üzenetblokkokról és az üzenetátadási függvényekről: Aszinkron üzenetblokkok és üzenetátadási függvények.

Példa: Számok sorozatának küldése a fogyasztói ügynöknek

Ebben a példában a gyártóügynök számsorozatot küld a fogyasztói ügynöknek. A fogyasztó megkapja ezeket a számokat, és kiszámítja az átlagukat. Az alkalmazás az átlagot a konzolra írja.

Ez a példa egy egyidejűség::unbounded_buffer objektumot használ, amely lehetővé teszi az előállító számára az üzenetek sorba állítását. Az unbounded_buffer osztály megvalósítja ITarget és ISource úgy, hogy biztosítja a gyártó és a fogyasztó számára, hogy üzeneteket küldhessenek a megosztott pufferbe és fogadhassanak onnan. Az send és receive a funkciók koordinálják az adatok gyártótól a fogyasztó felé történő propagálásának feladatát.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }
      
      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

Ez a példa a következő kimenetet hozza létre.

The average is 50.

Példa: Tőzsdei árfolyamok sorozatának küldése a fogyasztói ügynöknek

Ebben a példában a termelőügynök részvényárfolyamokat küld a fogyasztói ügynöknek. A fogyasztói ügynök rendszeresen felolvassa az aktuális árajánlatot, és kinyomtatja azt a konzolon.

Ez a példa az előzőhöz hasonló, azzal a kivétellel, hogy egy concurrency::overwrite_buffer objektumot használ, amely lehetővé teszi a gyártó számára, hogy egy üzenetet megosszon a fogyasztóval. Az előző példához hasonlóan a overwrite_buffer osztály implementálja a ITarget és ISource interfészeket, így a gyártó és a fogyasztó egy megosztott üzenetpufferen tud működni.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (begin(quotes), end(quotes), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

Ez a példa a következő mintakimenetet hozza létre.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

Az objektumtól unbounded_buffer eltérően a receive függvény nem távolítja el az üzenetet az overwrite_buffer objektumból. Ha a fogyasztó többször olvas az üzenetpufferből, mielőtt a gyártó felülírja az üzenetet, a fogadó minden alkalommal ugyanazt az üzenetet kapja.

A kód összeállítása

Másolja ki a példakódot, és illessze be egy Visual Studio-projektbe, vagy illessze be egy elnevezett producer-consumer.cpp fájlba, majd futtassa a következő parancsot egy Visual Studio parancssori ablakban.

cl.exe /EHsc producer-consumer.cpp

Lásd még

Aszinkron ügynökök könyvtára
Aszinkron ügynökök
Aszinkron üzenetblokkok
Üzenetátadási függvények