Partager via


Meilleures pratiques de la Bibliothèque d'agents asynchrones

Ce document décrit comment tirer le meilleur parti de la bibliothèque d'agents asynchrones. La bibliothèque d'agents fournit un modèle de programmation basé sur acteur et un passage de messages in-process pour les tâches de traitement « pipeline » et de flux de données de granularité grossière.

Pour plus d'informations sur la Bibliothèque d'agents, consultez Bibliothèque d'agents asynchrones.

Sections

Ce document contient les sections suivantes :

  • Utiliser des agents pour isoler l'état

  • Utiliser un mécanisme de limitation pour restreindre le nombre de messages dans un pipeline de données

  • Ne pas exécuter de travail de granularité fine dans un pipeline de données

  • Ne pas passer de charges de messages volumineux par valeur

  • Utiliser shared_ptr dans un réseau de données lorsque la propriété n'est pas définie

Utiliser des agents pour isoler l'état

La bibliothèque d'agents fournit des alternatives à l'état partagé en vous permettant de connecter des composants isolés via un mécanisme asynchrone de passage de message. Les agents asynchrones sont les plus efficaces lorsqu'il s'agit d'isoler leur état interne d'autres composants. Lorsque l'état est isolé, plusieurs composants ne traitent généralement pas les données partagées. L'isolation d'état peut permettre à votre application d'être mise à l'échelle, car elle réduit les conflits sur la mémoire partagée. L'isolation d'état réduit également le risque d'interblocage et les conditions de concurrence, étant donné que les composants ne doivent pas synchroniser l'accès aux données partagées.

En général, vous isolez l'état dans un agent en maintenant les membres de données dans les sections private ou protected de la classe d'agent et en utilisant des mémoires tampons de messages pour communiquer les changements d'état. L'exemple suivant montre la classe basic_agent, qui dérive de concurrency::agent. La classe basic_agent utilise deux mémoires tampons de messages pour communiquer avec les composants externes. Une mémoire tampon de messages contient les messages entrants. L'autre mémoire tampon de messages contient les messages sortants.

// basic-agent.cpp 
// compile with: /c /EHsc
#include <agents.h>

// An agent that uses message buffers to isolate state and communicate 
// with other components. 
class basic_agent : public concurrency::agent
{
public:
   basic_agent(concurrency::unbounded_buffer<int>& input)
      : _input(input)
   {
   }

   // Retrives the message buffer that holds output messages.
   concurrency::unbounded_buffer<int>& output()
   {
      return _output;
   }

protected:
   void run()
   {
      while (true)
      {
         // Read from the input message buffer. 
         int value = concurrency::receive(_input);

         // TODO: Do something with the value. 
         int result = value;

         // Write the result to the output message buffer.
         concurrency::send(_output, result);
      }
      done();
   }

private:
   // Holds incoming messages.
   concurrency::unbounded_buffer<int>& _input;
   // Holds outgoing messages.
   concurrency::unbounded_buffer<int> _output;
};

Pour obtenir des exemples sur la définition et l'utilisation des agents, consultez Procédure pas à pas : création d'une application basée sur un agent et Procédure pas à pas : création des agents de flux de données.

[Premières]

Utiliser un mécanisme de limitation pour restreindre le nombre de messages dans un pipeline de données

De nombreux types de mémoires tampons de messages, tel que concurrency::unbounded_buffer, peuvent contenir un nombre illimité de messages. Lorsqu'un producteur de message envoie des messages à un pipeline de données en moins de temps qu'il n'en faut au consommateur pour les traiter, la mémoire de l'application peut s'avérer insuffisante ou faible. Vous pouvez utiliser un mécanisme de limitation, par exemple, un sémaphore, pour restreindre le nombre de messages actifs simultanément dans un pipeline de données.

L'exemple de base suivant montre comment utiliser un sémaphore pour restreindre le nombre de messages dans un pipeline de données. Le pipeline de données utilise la fonction concurrency::wait pour simuler une opération qui dure au moins 100 millisecondes. Étant donné que l'expéditeur produit des messages en moins de temps qu'il n'en faut au consommateur pour les traiter, cet exemple définit la classe semaphore pour permettre à l'application de restreindre le nombre de messages actifs.

// message-throttling.cpp 
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics. 
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore. 
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count  
      // falls below zero. When this happens, add the current context to the  
      // back of the wait queue and block the current context. 
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore. 
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context. 
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not 
         // yet finished adding the context to the queue.  
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            Context::Yield();
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to  
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

// A synchronization primitive that is signaled when its  
// count reaches zero. 
class countdown_event
{
public:
   countdown_event(long long count)
       : _current(count) 
    {
       // Set the event if the initial count is zero. 
       if (_current == 0LL)
          _event.set();
    }

    // Decrements the event counter. 
    void signal() {
       if(--_current == 0LL) {
          _event.set();
       }
    }

    // Increments the event counter. 
    void add_count() {
       if(++_current == 1LL) {
          _event.reset();
       }
    }

    // Blocks the current context until the event is set. 
    void wait() {
       _event.wait();
    }

private:
   // The current count.
   atomic<long long> _current;
   // The event that is set when the counter reaches zero. 
   event _event;

   // Disable copy constructor.
   countdown_event(const countdown_event&);
   // Disable assignment.
   countdown_event const & operator=(countdown_event const&);
};

int wmain()
{
   // The number of messages to send to the consumer. 
   const long long MessageCount = 5;

   // The number of messages that can be active at the same time. 
   const long long ActiveMessages = 2;

   // Used to compute the elapsed time.
   DWORD start_time;

   // Computes the elapsed time, rounded-down to the nearest 
   // 100 milliseconds.
   auto elapsed = [&start_time] {
      return (GetTickCount() - start_time)/100*100;
   };

   // Limits the number of active messages.
   semaphore s(ActiveMessages);

   // Enables the consumer message buffer to coordinate completion 
   // with the main application.
   countdown_event e(MessageCount);

   // Create a data pipeline that has three stages. 

   // The first stage of the pipeline prints a message.
   transformer<int, int> print_message([&elapsed](int n) -> int {
      wstringstream ss;
      ss << elapsed() << L": received " << n << endl;
      wcout << ss.str();

      // Send the input to the next pipeline stage. 
      return n;
   });

   // The second stage of the pipeline simulates a  
   // time-consuming operation.
   transformer<int, int> long_operation([](int n) -> int {
      wait(100);

      // Send the input to the next pipeline stage. 
      return n;
   });

   // The third stage of the pipeline releases the semaphore 
   // and signals to the main appliation that the message has 
   // been processed.
   call<int> release_and_signal([&](int unused) {
      // Enable the sender to send the next message.
      s.release();

      // Signal that the message has been processed.
      e.signal();
   });

   // Connect the pipeline.
   print_message.link_target(&long_operation);
   long_operation.link_target(&release_and_signal);

   // Send several messages to the pipeline.
   start_time = GetTickCount();
   for(auto i = 0; i < MessageCount; ++i)
   {
      // Acquire access to the semaphore.
      s.acquire();

      // Print the message to the console.
      wstringstream ss;
      ss << elapsed() << L": sending " << i << L"..." << endl;
      wcout << ss.str();

      // Send the message.
      send(print_message, i);
   }

   // Wait for the consumer to process all messages.
   e.wait();
}
/* Sample output:
    0: sending 0...
    0: received 0
    0: sending 1...
    0: received 1
    100: sending 2...
    100: received 2
    200: sending 3...
    200: received 3
    300: sending 4...
    300: received 4
*/

L'objet semaphore restreint le pipeline à traiter à deux messages simultanés maximum.

Dans cet exemple, le producteur envoie relativement peu de messages au consommateur. Par conséquent, cet exemple ne présente pas une mémoire faible ou insuffisante. Toutefois, ce mécanisme est utile lorsqu'un pipeline de données contient un nombre de messages relativement élevé.

Pour plus d'informations sur la création de la classe de sémaphore utilisée dans cet exemple, consultez Comment : utiliser la classe Context pour implémenter un sémaphore coopératif.

[Premières]

Ne pas exécuter de travail de granularité fine dans un pipeline de données

La bibliothèque d'agents est très utile lorsque la granularité du travail exécuté par un pipeline de données est plutôt grossière. Par exemple, un composant d'application peut lire des données à partir d'un fichier ou d'une connexion réseau et envoyer de temps en temps cet élément vers un autre composant. Le protocole utilisé par la bibliothèque d'agents pour propager les messages provoque une surcharge du mécanisme de passage de message plus importante que celle des éléments de tâche parallèles fournis par la bibliothèque de modèles parallèles. Par conséquent, vous devez vous assurer que le travail effectué par un pipeline de données dure assez longtemps pour compenser cette surcharge.

Bien qu'un pipeline de données soit plus efficace lorsque la granularité de ses tâches est grossière, chaque étape du pipeline de données peut utiliser des éléments de la bibliothèque de modèles parallèles, tels que les groupes de tâches et les algorithmes parallèles, pour effectuer un travail de granularité plus fine. Pour obtenir un exemple de réseau de données de granularité grossière qui utilise un parallélisme de granularité fine à chaque étape de traitement, consultez Procédure pas à pas : création d'un réseau de traitement d'image.

[Premières]

Ne pas passer de charges de messages volumineux par valeur

Dans certains cas, le runtime crée une copie de chaque message qu'il passe d'une mémoire tampon de messages à une autre mémoire tampon de messages. Par exemple, la classe concurrency::overwrite_buffer offre une copie de chaque message qu'elle reçoit à chacune de ses cibles. Le runtime crée également une copie des données du message lorsque vous utilisez des fonctions de passage de messages, telles que concurrency::send et concurrency::receive pour écrire des messages dans un tampon de message et pour lire des messages provenant d'une mémoire tampon de messages. Bien que ce mécanisme permette d'éliminer le risque d'écrire simultanément sur des données partagées, il peut détériorer les performances de la mémoire lorsque la charge de message est relativement importante.

Vous pouvez utiliser des pointeurs ou des références pour améliorer les performances de la mémoire lorsque vous passez des messages dont la charge est importante. L'exemple suivant compare le passage par valeur de messages importants au passage de pointeurs vers le même type de message. L'exemple définit deux types d'agent, producer et consumer, qui agissent sur les objets message_data. L'exemple compare le temps nécessaire pour que le producteur envoie plusieurs objets message_data au consommateur au temps nécessaire pour que l'agent producteur envoie plusieurs pointeurs vers des objets message_data au consommateur.

// message-payloads.cpp 
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds  
// that it takes to call that function. 
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// A message structure that contains large payload data. 
struct message_data
{
   int id;
   string source;
   unsigned char binary_data[32768];
};

// A basic agent that produces values. 
template <typename T>
class producer : public agent
{
public:
   explicit producer(ITarget<T>& target, unsigned int message_count)
      : _target(target)
      , _message_count(message_count)
   {
   }
protected:
   void run();

private:
   // The target buffer to write to.
   ITarget<T>& _target;
   // The number of messages to send. 
   unsigned int _message_count;
};

// Template specialization for message_data. 
template <>
void producer<message_data>::run()
{
   // Send a number of messages to the target buffer. 
   while (_message_count > 0)
   {
      message_data message;
      message.id = _message_count;
      message.source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// Template specialization for message_data*. 
template <>
void producer<message_data*>::run()
{
   // Send a number of messages to the target buffer. 
   while (_message_count > 0)
   {
      message_data* message = new message_data;
      message->id = _message_count;
      message->source = "Application";

      send(_target, message);
      --_message_count;
   }

   // Set the agent to the finished state.
   done();
}

// A basic agent that consumes values. 
template <typename T>
class consumer : public agent
{
public:
   explicit consumer(ISource<T>& source, unsigned int message_count)
      : _source(source)
      , _message_count(message_count)
   {
   }

protected:
   void run();

private:
   // The source buffer to read from.
   ISource<T>& _source;
   // The number of messages to receive. 
   unsigned int _message_count;
};

// Template specialization for message_data. 
template <>
void consumer<message_data>::run()
{
   // Receive a number of messages from the source buffer. 
   while (_message_count > 0)
   {
      message_data message = receive(_source);
      --_message_count;

      // TODO: Do something with the message.  
      // ...
   }

   // Set the agent to the finished state.
   done();
}

template <>
void consumer<message_data*>::run()
{
   // Receive a number of messages from the source buffer. 
   while (_message_count > 0)
   {
      message_data* message = receive(_source);
      --_message_count;

      // TODO: Do something with the message. 
      // ... 

      // Release the memory for the message. 
      delete message;     
   }

   // Set the agent to the finished state.
   done();
}

int wmain()
{
   // The number of values for the producer agent to send. 
   const unsigned int count = 10000;

   __int64 elapsed;

   // Run the producer and consumer agents. 
   // This version uses message_data as the message payload type.

   wcout << L"Using message_data..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data> prod(buffer, count);
      consumer<message_data> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;

   // Run the producer and consumer agents a second time. 
   // This version uses message_data* as the message payload type.

   wcout << L"Using message_data*..." << endl;
   elapsed = time_call([count] {
      // A message buffer that is shared by the agents.
      unbounded_buffer<message_data*> buffer;

      // Create and start the producer and consumer agents.
      producer<message_data*> prod(buffer, count);
      consumer<message_data*> cons(buffer, count);
      prod.start();
      cons.start();

      // Wait for the agents to finish.
      agent::wait(&prod);
      agent::wait(&cons);
   });
   wcout << L"took " << elapsed << L"ms." << endl;
}

Cet exemple génère l'exemple de sortie suivant :

  

La version qui utilise des pointeurs est plus performante, car elle n'exige pas que le runtime crée une copie complète de chaque objet message_data passé du producteur au consommateur.

[Premières]

Utiliser shared_ptr dans un réseau de données lorsque la propriété n'est pas définie

Lorsque vous envoyez un message par le pointeur via un pipeline ou un réseau de passage de message, vous allouez en général de la mémoire pour chaque message à l'avant du réseau et vous libérez de la mémoire à l'arrière du réseau. Bien que ce mécanisme fonctionne habituellement correctement, il est difficile ou impossible de l'utiliser dans certains cas. Envisagez le cas où le réseau de données contient plusieurs nœuds de fin. Dans ce cas, il n'existe aucun emplacement évident pour libérer de la mémoire pour les messages.

Pour résoudre ce problème, vous pouvez utiliser un mécanisme, par exemple, std::shared_ptr, qui permet à un pointeur d'être partagé par plusieurs composants. Lorsque l'objet final shared_ptr qui possède une ressource est détruit, la ressource est également libérée.

L'exemple suivant illustre l'utilisation de shared_ptr pour partager des valeurs de pointeur entre plusieurs mémoires tampons de messages. L'exemple connecte un objet concurrency::overwrite_buffer à trois objets concurrency::call. La classe overwrite_buffer offre des messages à chacune de ses cibles. Étant donné qu'il existe plusieurs propriétaires pour les données à l'arrière du réseau de données, cet exemple utilise shared_ptr pour permettre à chaque objet call de partager la propriété des messages.

// message-sharing.cpp 
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A type that holds a resource. 
class resource
{
public:
   resource(int id) : _id(id)
   { 
      wcout << L"Creating resource " << _id << L"..." << endl;
   }
   ~resource()
   { 
      wcout << L"Destroying resource " << _id << L"..." << endl;
   }

   // Retrieves the identifier for the resource. 
   int id() const { return _id; }

   // TODO: Add additional members here. 
private:
   // An identifier for the resource. 
   int _id;

   // TODO: Add additional members here.
};

int wmain()
{   
   // A message buffer that sends messages to each of its targets.
   overwrite_buffer<shared_ptr<resource>> input;

   // Create three call objects that each receive resource objects 
   // from the input message buffer.

   call<shared_ptr<resource>> receiver1(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver1: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   call<shared_ptr<resource>> receiver2(
      [](shared_ptr<resource> res) {
         wstringstream ss;
         ss << L"receiver2: received resource " << res->id() << endl;
         wcout << ss.str();
      },
      [](shared_ptr<resource> res) { 
         return res != nullptr; 
      }
   );

   event e;
   call<shared_ptr<resource>> receiver3(
      [&e](shared_ptr<resource> res) {
         e.set();
      },
      [](shared_ptr<resource> res) { 
         return res == nullptr; 
      }
   );

   // Connect the call objects to the input message buffer.
   input.link_target(&receiver1);
   input.link_target(&receiver2);
   input.link_target(&receiver3);

   // Send a few messages through the network.
   send(input, make_shared<resource>(42));
   send(input, make_shared<resource>(64));
   send(input, shared_ptr<resource>(nullptr));

   // Wait for the receiver that accepts the nullptr value to  
   // receive its message.
   e.wait();
}

Cet exemple génère l'exemple de sortie suivant :

  

Voir aussi

Tâches

Procédure pas à pas : création d'une application basée sur un agent

Procédure pas à pas : création des agents de flux de données

Procédure pas à pas : création d'un réseau de traitement d'image

Concepts

Bibliothèque d'agents asynchrones

Meilleures pratiques de la Bibliothèque de modèles parallèles

Meilleures pratiques en général du runtime d'accès concurrentiel

Autres ressources

Meilleures pratiques sur le runtime d'accès concurrentiel