Compartir vía


Cómo: Implementar varios modelos productor-consumidor

En este tema se describe cómo implementar el modelo productor-consumidor en la aplicación. En este modelo, el productor envía mensajes a un bloque de mensajes y el consumidor lee los mensajes de este bloque.

En el tema se muestran dos escenarios. En el primero, el consumidor debe recibir cada mensaje que el productor envía. En el segundo, el consumidor busca datos periódicamente y, por tanto, no tiene que recibir cada mensaje.

Ambos ejemplos de este tema usan agentes, bloques de mensajes y funciones de paso de mensajes para transmitir mensajes del productor al consumidor. El agente del productor usa la función concurrency::send para escribir mensajes en un objeto concurrency::ITarget. El agente del consumidor usa la función concurrency::receive para leer mensajes de un objeto concurrency::ISource. Ambos agentes almacenan un valor centinela para coordinar el final del procesamiento.

Para más información sobre los agentes asincrónicos, consulte Agentes asincrónicos. Para más información sobre los bloques de mensajes y las funciones de paso de mensajes, consulte Bloques de mensajes asincrónicos y Funciones que pasan mensajes.

Ejemplo: Envío de series de números al agente de consumidor

En este ejemplo, el agente del productor envía una serie de números al agente del consumidor. El consumidor recibe cada uno de estos números y calcula su promedio. La aplicación escribe el promedio en la consola.

En este ejemplo se usa un objeto concurrency::unbounded_buffer para permitir al productor poner en cola los mensajes. La clase unbounded_buffer implementa ITarget e ISource para que el productor y el consumidor puedan enviar y recibir mensajes en un búfer compartido. Las funciones send y receive coordinan la tarea de propagación de los datos del productor al consumidor.

// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
      : _target(target)
      , _count(count)
      , _sentinel(sentinel)
   {
   }
protected:
   void run()
   {
      // Send the value of each loop iteration to the target buffer.
      while (_count > 0)
      {
         send(_target, static_cast<int>(_count));
         --_count;
      }
      // Send the sentinel value.
      send(_target, _sentinel);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<int>& _target;
   // The number of values to send.
   unsigned int _count;
   // The sentinel value, which informs the consumer agent to stop processing.
   int _sentinel;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<int>& source, int sentinel)
      : _source(source)
      , _sentinel(sentinel)
   {
   }

   // Retrieves the average of all received values.
   int average()
   {
      return receive(_average);
   }
protected:
   void run()
   {
      // The sum of all values.
      int sum = 0;
      // The count of values received.
      int count = 0;

      // Read from the source block until we receive the 
      // sentinel value.
      int n;
      while ((n = receive(_source)) != _sentinel)
      {
         sum += n;
         ++count;
      }
      
      // Write the average to the message buffer.
      send(_average, sum / count);

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<int>& _source;
   // The sentinel value, which informs the agent to stop processing.
   int _sentinel;
   // Holds the average of all received values.
   single_assignment<int> _average;
};

int wmain()
{
   // Informs the consumer agent to stop processing.
   const int sentinel = 0;
   // The number of values for the producer agent to send.
   const unsigned int count = 100;

   // A message buffer that is shared by the agents.
   unbounded_buffer<int> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer, count, sentinel);
   consumer_agent consumer(buffer, sentinel);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);

   // Print the average.
   wcout << L"The average is " << consumer.average() << L'.' << endl;
}

Este ejemplo produce el siguiente resultado:

The average is 50.

Ejemplo: Envío de una serie de cotizaciones bursátiles al agente del consumidor

En este ejemplo, el agente del productor envía una serie de cotizaciones al agente del consumidor. El agente del consumidor lee periódicamente la cotización actual y la imprime en la consola.

Este ejemplo es similar al anterior, excepto por el hecho de que usa un objeto concurrency::overwrite_buffer para permitir al productor compartir un mensaje con el consumidor. Como en el ejemplo anterior, la clase overwrite_buffer implementa ITarget e ISource para que el productor y el consumidor puedan actuar en un búfer de mensajes compartido.

// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>

using namespace concurrency;
using namespace std;

// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
   explicit producer_agent(ITarget<double>& target)
      : _target(target)
   {
   }
protected:
   void run()
   {
      // For illustration, create a predefined array of stock quotes. 
      // A real-world application would read these from an external source, 
      // such as a network connection or a database.
      array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };

      // Send each quote to the target buffer.
      for_each (begin(quotes), end(quotes), [&] (double quote) { 

         send(_target, quote);

         // Pause before sending the next quote.
         concurrency::wait(20);
      });
      // Send a negative value to indicate the end of processing.
      send(_target, -1.0);
                 
      // Set the agent to the finished state.
      done();
   }
private:
   // The target buffer to write to.
   ITarget<double>& _target;
};

// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
   explicit consumer_agent(ISource<double>& source)
      : _source(source)      
   {
   }

protected:
   void run()
   {
      // Read quotes from the source buffer until we receive
      // a negative value.
      double quote;
      while ((quote = receive(_source)) >= 0.0)
      {
         // Print the quote.
         wcout.setf(ios::fixed);
         wcout.precision(2);
         wcout << L"Current quote is " << quote << L'.' << endl;

         // Pause before reading the next quote.
         concurrency::wait(10);
      }

      // Set the agent to the finished state.
      done();
   }
private:
   // The source buffer to read from.
   ISource<double>& _source;
};

int wmain()
{
   // A message buffer that is shared by the agents.
   overwrite_buffer<double> buffer;

   // Create and start the producer and consumer agents.
   producer_agent producer(buffer);
   consumer_agent consumer(buffer);
   producer.start();
   consumer.start();

   // Wait for the agents to finish.
   agent::wait(&producer);
   agent::wait(&consumer);
}

Este ejemplo genera la siguiente salida de ejemplo.

Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.

A diferencia de lo que sucede con un objeto unbounded_buffer, la función receive no quita el mensaje del objeto overwrite_buffer. Si el consumidor lee del búfer de mensajes más de una vez antes de que el productor sobrescriba dicho mensaje, el receptor obtiene el mismo mensaje cada vez.

Compilar el código

Copie el código de ejemplo y péguelo en un proyecto de Visual Studio o en un archivo denominado producer-consumer.cpp y, después, ejecute el siguiente comando en una ventana del símbolo del sistema de Visual Studio.

cl.exe /EHsc producer-consumer.cpp

Consulte también

Biblioteca de agentes asincrónicos
Agentes asincrónicos
Bloques de mensajes asincrónicos
Funciones que pasan mensajes