Aracılığıyla paylaş


Nasıl yapılır: çeşitli Producer tüketici desenleri uygulamak

Bu konuda tüketici producer desen uygulamanızda nasıl açıklar. Bu desende producer ileti bloğu iletileri gönderir ve tüketici engelleyen iletileri okur.

Konuyu iki senaryonun gösterilmektedir. Bu senaryoda, tüketici producer gönderen her ileti alması gerekir. İkinci senaryoda tüketici verilerini düzenli olarak yoklar ve bu nedenle her ileti almak zorunda değildir.

Bu konuda iki örnek tüketiciye producer gelen iletileri iletmek için aracıları, ileti blokları ve ileti iletme işlevlerini kullanın. Producer Aracısı kullanan concurrency::send işlevi ileti yazmak için bir concurrency::ITarget nesnesi. Tüketici Aracısı kullanan concurrency::receive işlevi, gelen iletileri okumak için bir concurrency::ISource nesnesi. Her iki aracıları işleme sonuna koordine etmek için sentinel değer tutun.

Zaman uyumsuz aracıları hakkında daha fazla bilgi için bkz: Zaman uyumsuz aracıları. İleti blokları ve ileti geçirme işlevleri hakkında daha fazla bilgi için bkz: Zaman uyumsuz ileti blokları ve İleti gönderme fonksiyonları.

Örnek

Bu örnekte, producer Aracısı bir numara serisi tüketici aracısına gönderir. Tüketici, bu numaraları alır ve bunların ortalamasını hesaplar. Uygulama ortalama konsola yazacak.

Bu örnek bir concurrency::unbounded_buffer producer sıra iletileri etkinleştirmek için nesne. unbounded_buffer Class Implements ITarget ve ISource producer ve tüketici göndermek ve paylaşılan bir arabellek iletileri almak olduğunu. send Ve receive işlevleri tüketiciye producer verilerini yayma görevini koordine etmek.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }

      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

Bu örnek aşağıdaki çıktıyı üretir.

The average is 50.

Bu örnekte, producer Aracısı bir dizi hisse tüketici aracısına gönderir. Tüketici aracı düzenli olarak geçerli teklif okur ve konsol yazdırır.

Kullandığı dışında bu örnek önceki bir benzer bir concurrency::overwrite_buffer bir ileti tüketici ile paylaşmak producer etkinleştirmek için nesne. Önceki örnekte olduğu gibi overwrite_buffer Implements sınıfı ITarget ve ISource böylece producer ve tüketici paylaşılan ileti arabelleği davranabilir.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (begin(quotes), end(quotes), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);

      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

Bu örnek, aşağıdaki örnek çıktı oluşturur.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

Aksine olan bir unbounded_buffer nesnesi, receive işlevi iletiden kaldırmak değil overwrite_buffer nesnesi. Tüketici producer ileti üzerine yazılmadan önce birden fazla kez ileti arabelleğinden yazıyorsa, alıcının her zaman aynı iletiyi alır.

Kod Derleniyor

Örnek kodu kopyalayın ve Visual Studio Project'te yapıştırın veya adlı bir dosyaya yapıştırın producer consumer.cpp ve Visual Studio komut istemi penceresinde aşağıdaki komutu çalıştırın.

cl.exe /EHsc producer-consumer.cpp

Ayrıca bkz.

Kavramlar

Zaman uyumsuz aracıları kitaplığı

Zaman uyumsuz aracıları

Zaman uyumsuz ileti blokları

İleti gönderme fonksiyonları