Compartir vía


Procedimientos recomendados en la biblioteca de agentes asincrónicos

En este documento se describe cómo hacer un uso eficaz de la biblioteca de agentes asincrónicos. La biblioteca de agentes promueve un modelo de programación basado en actores y en el paso de mensajes en proceso para el flujo de datos general y las tareas de canalización.

Para obtener más información sobre la Biblioteca de agentes asincrónicos, consulte la Biblioteca de agentes asincrónicos.

Secciones

Este documento contiene las siguientes secciones:

Uso de agentes para aislar el estado

La biblioteca de agentes proporciona alternativas al estado compartido al permitir conectar componentes aislados a través de un mecanismo de paso de mensajes asincrónico. Los agentes asincrónicos son más eficaces cuando aíslan su estado interno de otros componentes. Al aislar el estado, varios componentes no actúan normalmente en los datos compartidos. El aislamiento del estado puede permitir escalar la aplicación porque reduce la contención en la memoria compartida. El aislamiento del estado también reduce la posibilidad de condiciones de carrera e interbloqueo porque los componentes no tienen que sincronizar el acceso a los datos compartidos.

Normalmente, para aislar el estado en un agente, se conservan los miembros de datos en las secciones private o protected de la clase de agente y se usan los búferes de mensajes para comunicar los cambios de estado. En el ejemplo siguiente se muestra la clase basic_agent, que se deriva de concurrency::agent. La clase basic_agent usa dos búferes de mensajes para comunicarse con los componentes externos. Un búfer de mensajes contiene los mensajes entrantes; el otro búfer de mensajes contiene los mensajes de salida.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }
   
   // Retrieves the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;
         
         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

Para obtener ejemplos completos sobre cómo definir y usar agentes, consulte Tutorial: crear una aplicación basada en agente y Tutorial: creación de un agente de flujo de datos.

[Arriba]

Uso de un mecanismo de limitación para limitar el número de mensajes en una canalización de datos

Muchos tipos de búferes de mensajes, como concurrency::unbounded_buffer pueden contener un número ilimitado de mensajes. Cuando un productor de mensajes envía mensajes a una canalización de datos más rápidamente de lo que el consumidor puede procesarlos, la aplicación puede entrar en un estado de memoria insuficiente. Puede usar un mecanismo de limitación, por ejemplo, un semáforo, para limitar el número de mensajes que están activos en paralelo en una canalización de datos.

En el siguiente ejemplo básico se muestra cómo usar un semáforo para limitar el número de mensajes en una canalización de datos. La canalización de datos usa la función concurrency::wait para simular una operación que tarda al menos 100 milisegundos. Dado que el remitente muestra los mensajes más rápidamente de lo que el consumidor puede procesarlos, en este ejemplo se define la clase semaphore para que la aplicación pueda limitar el número de mensajes activos.

// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            (Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.  
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero.
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter.
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter.
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set.
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer.
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };
  
   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

El objeto semaphore limita la canalización para procesar a lo sumo dos mensajes al mismo tiempo.

El productor de este ejemplo envía relativamente pocos mensajes al consumidor. Por consiguiente, en este ejemplo no se muestra una condición potencial de memoria insuficiente. Sin embargo, este mecanismo es útil cuando una canalización de datos contiene un número de mensajes relativamente elevado.

Para más información sobre cómo crear la clase de semáforo que se usa en este ejemplo, consulte Cómo usar la clase Context para implementar un semáforo cooperativo.

[Arriba]

No realizar un trabajo específico en una canalización de datos

La biblioteca de agentes es muy útil cuando el trabajo que realiza una canalización de datos es bastante general. Por ejemplo, un componente de la aplicación podría leer los datos de un archivo o una conexión de red y enviar ocasionalmente esos datos a otro componente. El protocolo que la biblioteca de agentes usa para propagar los mensajes ocasiona que el mecanismo de paso de mensajes tenga más sobrecarga que las construcciones paralelas de tareas proporcionadas por la Biblioteca de modelos de procesamiento paralelo (PPL). Por consiguiente, asegúrese de que el trabajo que realiza una canalización de datos es suficientemente larga para desplazar esta sobrecarga.

Aunque una canalización de datos es más eficaz cuando sus tareas son generales, cada fase de la canalización de datos puede usar las construcciones de PPL como grupos de tareas y algoritmos paralelos para realizar un trabajo más específico. Para obtener un ejemplo de una red de datos de granularidad gruesa que usa el paralelismo de granularidad fina en cada fase de procesamiento, consulte Tutorial: Crear una red de procesamiento de imagen.

[Arriba]

No pasar cargas de mensajes grandes por valor

En algunos casos, el runtime crea una copia de cada mensaje que pasa de un búfer de mensajes a otro búfer de mensajes. Por ejemplo, la clase concurrency::overwrite_buffer proporciona una copia de cada mensaje que recibe a cada uno de sus destinos. El runtime también crea una copia de los datos del mensaje cuando se usan funciones de paso de mensajes como concurrency::send y concurrency::receive para escribir y leer los mensajes de un búfer de mensajes. Aunque este mecanismo ayuda a eliminar el riesgo de escribir simultáneamente en los datos compartidos, podría dar lugar a un rendimiento deficiente de memoria cuando la carga del mensaje es relativamente grande.

Puede utilizar los punteros o referencias para mejorar el rendimiento de la memoria al pasar los mensajes que tienen una carga grande. En el siguiente ejemplo se compara el paso de mensajes grandes por valor con el paso de punteros al mismo tipo de mensaje. En el ejemplo se definen dos tipos de agentes, producer y consumer, que actúan sobre los objetos message_data. En el ejemplo se compara el tiempo necesario para que el productor envíe varios objetos message_data al consumidor con el tiempo necesario para que el agente del productor envíe varios punteros a objetos message_data al consumidor.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }
       
   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }
       
   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;
      
   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

Este ejemplo genera la siguiente salida de ejemplo:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

La versión que usa punteros funciona mejor porque elimina la necesidad de que el runtime cree una copia completa de cada objeto message_data que pasa del productor al consumidor.

[Arriba]

Uso de shared_ptr en una red de datos cuando la propiedad no está definida

Cuando se envían mensajes mediante un puntero a través de una canalización o una red de paso de mensajes, normalmente se asigna la memoria para cada mensaje al principio de la red y se libera esa memoria en el extremo de la red. Aunque este mecanismo funciona bien con frecuencia, hay casos en los que resulta difícil o imposible utilizarlo. Por ejemplo, considere el caso en el que la red de datos contiene varios nodos finales. En este caso, no hay ninguna ubicación clara para liberar la memoria para los mensajes.

Para solucionar este problema, puede usar un mecanismo, por ejemplo, std::shared_ptr, que habilita un puntero que será propiedad de varios componentes. Cuando se destruye el objeto final shared_ptr que posee un recurso, se libera el recurso también.

En el ejemplo siguiente se muestra cómo usar shared_ptr para compartir valores de puntero entre búferes de mensajes. En el ejemplo se conecta un objeto concurrency::overwrite_buffer con tres objetos concurrency::call. La clase overwrite_buffer proporciona mensajes a cada uno de los destinos. Dado que hay varios propietarios de los datos en el extremo de la red de datos, en este ejemplo se usa shared_ptr para permitir que cada objeto call comparta la propiedad de los mensajes.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;
      
   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

Este ejemplo genera la siguiente salida de ejemplo:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

Consulte también

Procedimientos recomendados del Runtime de simultaneidad
Biblioteca de agentes asincrónicos
Tutorial: Crear una aplicación basada en agente
Tutorial: Crear un agente de flujo de datos
Tutorial: Crear una red de procesamiento de imagen
Procedimientos recomendados en la biblioteca de modelos paralelos
Procedimientos recomendados generales en el Runtime de simultaneidad