Udostępnij za pośrednictwem


Porady: implementowanie różnych wzorców producent — konsument

W tym temacie opisano sposób implementowania wzorca producenta-konsumenta w aplikacji. W tym wzorcu producent wysyła komunikaty do bloku komunikatów, a użytkownik odczytuje komunikaty z tego bloku.

W tym temacie przedstawiono dwa scenariusze. W pierwszym scenariuszu konsument musi otrzymać każdą wiadomość, którą wysyła producent. W drugim scenariuszu użytkownik okresowo sonduje dane i w związku z tym nie musi odbierać każdego komunikatu.

Oba przykłady w tym temacie używają agentów, bloków komunikatów i funkcji przekazywania komunikatów w celu przesyłania komunikatów z producenta do odbiorcy. Agent producenta używa funkcji concurrency::send do zapisywania komunikatów w obiekcie concurrency::ITarget . Agent konsumenta używa funkcji concurrency::receive do odczytywania komunikatów z obiektu concurrency::ISource . Obaj agenci przechowują wartość sentinel, aby koordynować koniec przetwarzania.

Aby uzyskać więcej informacji na temat agentów asynchronicznych, zobacz Asynchronous Agents (Agenci asynchroniczne). Aby uzyskać więcej informacji na temat bloków komunikatów i funkcji przekazywania komunikatów, zobacz Asynchroniczne bloki komunikatów i funkcje przekazywania komunikatów.

Przykład: wysyłanie serii liczb do agenta odbiorcy

W tym przykładzie agent producenta wysyła serię liczb do agenta konsumenta. Odbiorca otrzymuje każdą z tych liczb i oblicza średnią. Aplikacja zapisuje średnią w konsoli.

W tym przykładzie użyto obiektu współbieżności::unbounded_buffer , aby umożliwić producentowi kolejkę komunikatów. Klasa unbounded_buffer implementuje ITarget i ISource tak, aby producent i odbiorca mogli wysyłać i odbierać komunikaty do i z udostępnionego buforu. Funkcje send i receive koordynują zadanie propagowania danych od producenta do konsumenta.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }
      
      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

W tym przykładzie są generowane następujące dane wyjściowe.

The average is 50.

Przykład: Wysyłanie serii ofert giełdowych do agenta konsumenta

W tym przykładzie agent producenta wysyła serię ofert giełdowych do agenta konsumenta. Agent konsumenta okresowo odczytuje bieżący cudzysłów i drukuje go w konsoli.

Ten przykład przypomina poprzedni, z tą różnicą , że używa obiektu współbieżności::overwrite_buffer , aby umożliwić producentowi udostępnianie jednego komunikatu użytkownikowi. Podobnie jak w poprzednim przykładzie, overwrite_buffer klasa implementuje ITarget i ISource tak, aby producent i odbiorca mogli działać na udostępnionym buforze komunikatów.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (begin(quotes), end(quotes), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

W tym przykładzie są generowane następujące przykładowe dane wyjściowe.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

unbounded_buffer W przeciwieństwie do obiektu receive funkcja nie usuwa komunikatu overwrite_buffer z obiektu. Jeśli odbiorca odczytuje z buforu komunikatu więcej niż jeden raz przed zastąpieniem tego komunikatu przez producenta, odbiorca uzyskuje ten sam komunikat za każdym razem.

Kompilowanie kodu

Skopiuj przykładowy kod i wklej go w projekcie programu Visual Studio lub wklej go w pliku o nazwie producer-consumer.cpp , a następnie uruchom następujące polecenie w oknie wiersza polecenia programu Visual Studio.

cl.exe /EHsc producer-consumer.cpp

Zobacz też

Biblioteki agentów asynchronicznych
Agenci asynchroniczni
Bloki komunikatów asynchronicznych
Funkcje przekazywania komunikatów