Aracılığıyla paylaş


Nasıl yapılır: Çeşitli Üretici-Tüketici Desenlerini Uygulama

Bu konuda, uygulamanızda üretici-tüketici deseninin nasıl uygulandığı açıklanmaktadır. Bu düzende , üretici bir ileti bloğuna ileti gönderir ve tüketici bu bloktaki iletileri okur.

Bu konuda iki senaryo gösterilmektedir. İlk senaryoda tüketicinin üreticinin gönderdiği her iletiyi alması gerekir. İkinci senaryoda, tüketici verileri düzenli aralıklarla yoklar ve bu nedenle her iletiyi almak zorunda değildir.

Bu konudaki her iki örnek de, üreticiden tüketiciye ileti iletmek için aracılar, ileti blokları ve ileti geçirme işlevlerini kullanır. Üretici aracısı, bir eşzamanlılık::ITarget nesnesine ileti yazmak için eşzamanlılık::send işlevini kullanır. Tüketici aracısı, eşzamanlılık::ISource nesnesinden gelen iletileri okumak için eşzamanlılık::alma işlevini kullanır. her iki aracı da işlem sonunu koordine etmek için bir sentinel değeri tutar.

Zaman uyumsuz aracılar hakkında daha fazla bilgi için bkz . Zaman Uyumsuz Aracılar. İleti blokları ve ileti geçirme işlevleri hakkında daha fazla bilgi için bkz . Zaman Uyumsuz İleti Blokları ve İleti Geçirme İşlevleri.

Örnek: Tüketici aracısına sayı serisi gönderme

Bu örnekte, üretici aracısı tüketici aracısına bir dizi sayı gönderir. Tüketici bu sayıların her birini alır ve ortalamasını hesaplar. Uygulama ortalamayı konsola yazar.

Bu örnekte üreticinin iletileri kuyruğa almasını sağlamak için eşzamanlılık::unbounded_buffer nesnesi kullanılır. unbounded_buffer sınıfı, ITargetISource üreticinin ve tüketicinin paylaşılan bir arabelleğe ileti gönderip alabilmesi için ve uygular. ve receive işlevleri, send verileri üreticiden tüketiciye yayma görevini koordine edin.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }
      
      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

Bu örnek aşağıdaki çıkışı oluşturur.

The average is 50.

Örnek: Tüketici temsilcisine bir dizi hisse senedi teklifi gönderme

Bu örnekte, üretici aracısı tüketici temsilcisine bir dizi hisse senedi teklifi gönderir. Tüketici aracısı, geçerli teklifi düzenli aralıklarla okur ve konsola yazdırır.

Bu örnek, üreticinin bir iletiyi tüketiciyle paylaşmasını sağlamak için eşzamanlılık::overwrite_buffer nesnesi kullanması dışında öncekine benzer. Önceki örnekte olduğu gibi sınıfıITarget, overwrite_buffer üreticinin ve ISource tüketicinin paylaşılan bir ileti arabelleği üzerinde hareket edebilmesi için ve uygular.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (begin(quotes), end(quotes), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

Bu örnek aşağıdaki örnek çıktıyı oluşturur.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

Bir unbounded_buffer nesnenin receive aksine, işlevi iletiyi nesneden overwrite_buffer kaldırmaz. Tüketici, üretici bu iletinin üzerine yazmadan önce ileti arabelleğinden birden fazla kez okursa, alıcı her seferinde aynı iletiyi alır.

Kod Derleniyor

Örnek kodu kopyalayıp bir Visual Studio projesine yapıştırın veya adlı producer-consumer.cpp bir dosyaya yapıştırın ve ardından bir Visual Studio Komut İstemi penceresinde aşağıdaki komutu çalıştırın.

cl.exe /EHsc producer-consumer.cpp

Ayrıca bkz.

Zaman Uyumsuz Aracılar Kitaplığı
Zaman Uyumsuz Aracılar
Zaman Uyumsuz İleti Blokları
İleti Geçirme İşlevleri