Condividi tramite


Procedure consigliate nella libreria di agenti asincroni

Questo documento descrive come usare in modo efficace la libreria degli agenti asincroni. La libreria agenti promuove un modello di programmazione basato su attore e il passaggio di messaggi in-process per attività di pipelining e flusso di dati con granularità grossolana.

Per altre informazioni sulla libreria agenti, vedere Libreria degli agenti asincroni.

Sezioni

Questo documento contiene le seguenti sezioni:

Usare agenti per isolare lo stato

La libreria agenti offre alternative allo stato condiviso consentendo di connettere componenti isolati tramite un meccanismo asincrono di passaggio dei messaggi. Gli agenti asincroni sono più efficaci quando isolano lo stato interno da altri componenti. Isolando lo stato, in genere più componenti non agiscono sui dati condivisi. L'isolamento dello stato può consentire all'applicazione di ridimensionare perché riduce la contesa sulla memoria condivisa. L'isolamento dello stato riduce anche la probabilità di deadlock e race condition perché i componenti non devono sincronizzare l'accesso ai dati condivisi.

In genere si isola lo stato in un agente tenendo i membri dati nelle private sezioni o protected della classe dell'agente e usando i buffer dei messaggi per comunicare le modifiche dello stato. Nell'esempio seguente viene illustrata la basic_agent classe , che deriva da concurrency::agent. La basic_agent classe usa due buffer di messaggi per comunicare con componenti esterni. Un buffer di messaggi contiene i messaggi in arrivo; l'altro buffer di messaggi contiene messaggi in uscita.

// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }
   
   // Retrieves the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer.
         int value = concurrency::receive(_input);

         // TODO: Do something with the value.
         int result = value;
         
         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

Per esempi completi su come definire e usare gli agenti, vedere Procedura dettagliata: Creazione di un'applicazione basata su agente e Procedura dettagliata: Creazione di un agente di flussi di dati.

[Torna all'inizio]

Usare un meccanismo di limitazione per limitare il numero di messaggi in una pipeline di dati

Molti tipi di buffer di messaggi, ad esempio concurrency::unbounded_buffer, possono contenere un numero illimitato di messaggi. Quando un producer di messaggi invia messaggi a una pipeline di dati più velocemente di quanto il consumer possa elaborare questi messaggi, l'applicazione può immettere uno stato di memoria insufficiente o insufficiente. È possibile usare un meccanismo di limitazione, ad esempio un semaforo, per limitare il numero di messaggi attivi simultaneamente in una pipeline di dati.

Nell'esempio di base seguente viene illustrato come usare un semaforo per limitare il numero di messaggi in una pipeline di dati. La pipeline di dati usa la funzione concurrency::wait per simulare un'operazione che richiede almeno 100 millisecondi. Poiché il mittente produce messaggi più velocemente rispetto al consumer può elaborare tali messaggi, questo esempio definisce la semaphore classe per consentire all'applicazione di limitare il numero di messaggi attivi.

// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            (Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.  
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its 
// count reaches zero.
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero.
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter.
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter.
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set.
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero.
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer.
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time.
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };
  
   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages.

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage.
      return n;
   });

   // The second stage of the pipeline simulates a 
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage.
      return n;
   });

   // The third stage of the pipeline releases the semaphore
   // and signals to the main appliation that the message has
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

L'oggetto semaphore limita la pipeline a elaborare al massimo due messaggi contemporaneamente.

Il producer in questo esempio invia relativamente pochi messaggi al consumer. Di conseguenza, questo esempio non dimostra una potenziale condizione di memoria insufficiente o insufficiente. Tuttavia, questo meccanismo è utile quando una pipeline di dati contiene un numero relativamente elevato di messaggi.

Per altre informazioni su come creare la classe semaforo usata in questo esempio, vedere Procedura: Usare la classe Context per implementare un semaforo cooperativo.

[Torna all'inizio]

Non eseguire operazioni con granularità fine in una pipeline di dati

La libreria agenti è più utile quando il lavoro eseguito da una pipeline di dati è piuttosto grossolano. Ad esempio, un componente dell'applicazione potrebbe leggere i dati da un file o da una connessione di rete e inviare occasionalmente tali dati a un altro componente. Il protocollo usato dalla libreria agenti per propagare i messaggi fa sì che il meccanismo di passaggio dei messaggi abbia un sovraccarico maggiore rispetto ai costrutti paralleli dell'attività forniti dalla libreria PPL (Parallel Patterns Library ). Assicurarsi pertanto che il lavoro eseguito da una pipeline di dati sia sufficientemente lungo da compensare questo sovraccarico.

Anche se una pipeline di dati è più efficace quando le attività sono con granularità grossolana, ogni fase della pipeline di dati può usare costrutti PPL, ad esempio gruppi di attività e algoritmi paralleli per eseguire operazioni più granulari. Per un esempio di una rete di dati con granularità grossolana che usa parallelismo con granularità fine in ogni fase di elaborazione, vedere Procedura dettagliata: Creazione di una rete di elaborazione immagini.

[Torna all'inizio]

Non passare payload di messaggi di grandi dimensioni per valore

In alcuni casi, il runtime crea una copia di ogni messaggio che passa da un buffer di messaggi a un altro buffer di messaggi. Ad esempio, la classe concurrency::overwrite_buffer offre una copia di ogni messaggio ricevuto in ognuna delle destinazioni. Il runtime crea anche una copia dei dati del messaggio quando si usano funzioni di passaggio dei messaggi, ad esempio concurrency::send e concurrency::receive per scrivere e leggere messaggi da un buffer di messaggi. Anche se questo meccanismo consente di eliminare il rischio di scrivere simultaneamente in dati condivisi, potrebbe causare prestazioni di memoria scarse quando il payload del messaggio è relativamente grande.

È possibile usare puntatori o riferimenti per migliorare le prestazioni di memoria quando si passano messaggi con un payload di grandi dimensioni. Nell'esempio seguente viene confrontato il passaggio di messaggi di grandi dimensioni per valore al passaggio di puntatori allo stesso tipo di messaggio. Nell'esempio vengono definiti due tipi di agente e producerconsumer, che agiscono sugli message_data oggetti . Nell'esempio viene confrontato il tempo necessario per consentire al producer di inviare diversi message_data oggetti al consumer al tempo necessario affinché l'agente producer invii diversi puntatori agli message_data oggetti al consumer.

// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data.
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer.
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }
   
   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive.
   unsigned int _message_count;
};

// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ...
   }
       
   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer.
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.
      // ...

      // Release the memory for the message.
      delete message;     
   }
       
   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send.
   const unsigned int count = 10000;
      
   __int64 elapsed;

   // Run the producer and consumer agents.
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time.
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

Questo esempio produce l'output di esempio seguente:

Using message_data...
took 437ms.
Using message_data*...
took 47ms.

La versione che usa puntatori offre prestazioni migliori perché elimina il requisito per il runtime di creare una copia completa di ogni message_data oggetto passato dal producer al consumer.

[Torna all'inizio]

Usare shared_ptr in una rete dati quando la proprietà non è definita

Quando si inviano messaggi tramite puntatore tramite una pipeline o una rete che passa messaggi, in genere si alloca la memoria per ogni messaggio nella parte anteriore della rete e libera tale memoria alla fine della rete. Anche se questo meccanismo funziona spesso bene, ci sono casi in cui è difficile o non è possibile usarlo. Si consideri ad esempio il caso in cui la rete dati contiene più nodi finali. In questo caso, non esiste una posizione chiara per liberare la memoria per i messaggi.

Per risolvere questo problema, è possibile usare un meccanismo, ad esempio std::shared_ptr, che consente a un puntatore di essere di proprietà di più componenti. Quando l'oggetto finale shared_ptr proprietario di una risorsa viene eliminato definitivamente, viene liberata anche la risorsa.

Nell'esempio seguente viene illustrato come usare shared_ptr per condividere i valori del puntatore tra più buffer di messaggi. L'esempio connette un oggetto concurrency::overwrite_buffer a tre oggetti concurrency::call . La overwrite_buffer classe offre messaggi a ognuna delle destinazioni. Poiché alla fine della rete dati sono presenti più proprietari dei dati, questo esempio usa shared_ptr per consentire a ogni call oggetto di condividere la proprietà dei messaggi.

// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource.
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource.
   int id() const { return _id; }

   // TODO: Add additional members here.
private:
   // An identifier for the resource.
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;
      
   // Create three call objects that each receive resource objects
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to 
   // receive its message.
   e.wait();
}

Questo esempio produce l'output di esempio seguente:

Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...

Vedi anche

Procedure consigliate del runtime di concorrenza
Libreria di agenti asincroni
Procedura dettagliata: creazione di un'applicazione basata sugli agenti
Procedura dettagliata: creazione di un agente del flusso di dati
Procedura dettagliata: creazione di una rete per l'elaborazione di immagini
Procedure consigliate nella libreria PPL (Parallel Patterns Library)
Procedure consigliate generali nel runtime di concorrenza