Bagikan melalui


Cara: Menerapkan Berbagai Pola Produsen-Konsumen

Topik ini menjelaskan cara menerapkan pola produsen-konsumen dalam aplikasi Anda. Dalam pola ini, produsen mengirim pesan ke blok pesan, dan konsumen membaca pesan dari blok itu.

Topik ini menunjukkan dua skenario. Dalam skenario pertama, konsumen harus menerima setiap pesan yang dikirim produsen. Dalam skenario kedua, konsumen secara berkala melakukan polling untuk data, dan karenanya tidak harus menerima setiap pesan.

Kedua contoh dalam topik ini menggunakan agen, blok pesan, dan fungsi pengiriman pesan untuk mengirimkan pesan dari produsen ke konsumen. Agen produsen menggunakan fungsi konkurensi::kirim untuk menulis pesan ke objek konkurensi::ITarget . Agen konsumen menggunakan fungsi konkurensi::receive untuk membaca pesan dari objek konkurensi::ISource . Kedua agen memegang nilai sentinel untuk mengoordinasikan akhir pemrosesan.

Untuk informasi selengkapnya tentang agen asinkron, lihat Agen Asinkron. Untuk informasi selengkapnya tentang blok pesan dan fungsi pengiriman pesan, lihat Blok Pesan Asinkron dan Fungsi Meneruskan Pesan.

Contoh: Mengirim serangkaian angka ke agen konsumen

Dalam contoh ini, agen produsen mengirimkan serangkaian angka ke agen konsumen. Konsumen menerima masing-masing angka ini dan menghitung rata-ratanya. Aplikasi menulis rata-rata ke konsol.

Contoh ini menggunakan objek konkurensi::unbounded_buffer untuk memungkinkan produsen mengantre pesan. Kelas mengimplementasikan unbounded_buffer ITarget dan ISource sehingga produsen dan konsumen dapat mengirim dan menerima pesan ke dan dari buffer bersama. Fungsi send dan receive mengoordinasikan tugas penyebaran data dari produsen ke konsumen.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }
      
      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

Contoh ini menghasilkan output berikut.

The average is 50.

Contoh: Mengirim rangkaian kutipan saham ke agen konsumen

Dalam contoh ini, agen produsen mengirimkan serangkaian kutipan saham ke agen konsumen. Agen konsumen secara berkala membaca kuotasi saat ini dan mencetaknya ke konsol.

Contoh ini menyerupai yang sebelumnya, kecuali menggunakan objek konkurensi::overwrite_buffer untuk memungkinkan produsen berbagi satu pesan dengan konsumen. Seperti dalam contoh sebelumnya, overwrite_buffer kelas mengimplementasikan ITarget dan ISource sehingga produsen dan konsumen dapat bertindak pada buffer pesan bersama.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (begin(quotes), end(quotes), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

Contoh ini menghasilkan output sampel berikut.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

unbounded_buffer Tidak seperti objek, receive fungsi tidak menghapus pesan dari overwrite_buffer objek. Jika konsumen membaca dari buffer pesan lebih dari satu kali sebelum produsen menimpa pesan tersebut, penerima mendapatkan pesan yang sama setiap saat.

Mengompilasi Kode

Salin kode contoh dan tempelkan dalam proyek Visual Studio, atau tempelkan dalam file yang diberi nama producer-consumer.cpp lalu jalankan perintah berikut di jendela Prompt Perintah Visual Studio.

producer-consumer.cpp cl.exe /EHsc

Lihat juga

Pustaka Agen Asinkron
Agen Asinkron
Blok Pesan Asinkron
Fungsi Passing Pesan