Partager via


Comment : utiliser le surabonnement pour compenser la latence

La sursubscription peut améliorer l’efficacité globale de certaines applications qui contiennent des tâches qui ont une grande latence. Cette rubrique explique comment utiliser une sursubscription pour compenser la latence provoquée par la lecture de données à partir d’une connexion réseau.

Exemple

Cet exemple utilise la bibliothèque d’agents asynchrones pour télécharger des fichiers à partir de serveurs HTTP. La http_reader classe dérive de concurrency ::agent et utilise le passage de message pour lire de manière asynchrone les noms d’URL à télécharger.

La http_reader classe utilise la classe concurrency ::task_group pour lire simultanément chaque fichier. Chaque tâche appelle la méthode concurrency ::Context ::Oversubscribe avec le _BeginOversubscription paramètre défini pour true activer la sursubscription dans le contexte actuel. Chaque tâche utilise ensuite les classes CInternetSession et CHttpFile microsoft Foundation Classes (MFC) pour télécharger le fichier. Enfin, chaque tâche appelle Context::Oversubscribe avec le _BeginOversubscription paramètre défini pour false désactiver la sursubscription.

Lorsque le sursubscription est activé, le runtime crée un thread supplémentaire dans lequel exécuter des tâches. Chacun de ces threads peut également sursubcrire le contexte actuel et ainsi créer des threads supplémentaires. La http_reader classe utilise un objet concurrency ::unbounded_buffer pour limiter le nombre de threads que l’application utilise. L’agent initialise la mémoire tampon avec un nombre fixe de valeurs de jeton. Pour chaque opération de téléchargement, l’agent lit une valeur de jeton à partir de la mémoire tampon avant le démarrage de l’opération, puis écrit cette valeur dans la mémoire tampon une fois l’opération terminée. Lorsque la mémoire tampon est vide, l’agent attend l’une des opérations de téléchargement pour réécrire une valeur dans la mémoire tampon.

L’exemple suivant limite le nombre de tâches simultanées à deux fois le nombre de threads matériels disponibles. Cette valeur est un bon point de départ à utiliser lorsque vous expérimentez avec oversubscription. Vous pouvez utiliser une valeur qui correspond à un environnement de traitement particulier ou modifier dynamiquement cette valeur pour répondre à la charge de travail réelle.

// download-oversubscription.cpp
// compile with: /EHsc /MD /D "_AFXDLL"
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <concrtrm.h>
#include <agents.h>
#include <ppl.h>
#include <sstream>
#include <iostream>
#include <array>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Downloads the file at the given URL.
CString GetHttpFile(CInternetSession& session, const CString& strUrl);

// Reads files from HTTP servers.
class http_reader : public agent
{
public:
   explicit http_reader(CInternetSession& session,      
      ISource<string>& source,
      unsigned int& total_bytes,
      unsigned int max_concurrent_reads)
      : _session(session)
      , _source(source)
      , _total_bytes(total_bytes)
   {
      // Add one token to the available tasks buffer for each 
      // possible concurrent read operation. The value of each token 
      // is not important, but can be useful for debugging.
      for (unsigned int i = 0; i < max_concurrent_reads; ++i)
         send(_available_tasks, i);
   }

   // Signals to the agent that there are no more items to download.
   static const string input_sentinel;
 
protected:
   void run()
   {
      // A task group. Each task in the group downloads one file.
      task_group tasks;

      // Holds the total number of bytes downloaded.
      combinable<unsigned int> total_bytes;

      // Read from the source buffer until the application 
      // sends the sentinel value.
      string url;
      while ((url = receive(_source)) != input_sentinel)
      {
         // Wait for a task to release an available slot.
         unsigned int token = receive(_available_tasks);

         // Create a task to download the file.
         tasks.run([&, token, url] {

            // Print a message.
            wstringstream ss;
            ss << L"Downloading " << url.c_str() << L"..." << endl;
            wcout << ss.str();

            // Download the file.
            string content = download(url);

            // Update the total number of bytes downloaded.
            total_bytes.local() += content.size();

            // Release the slot for another task.
            send(_available_tasks, token);
         });
      }

      // Wait for all tasks to finish.
      tasks.wait();
      
      // Compute the total number of bytes download on all threads.
      _total_bytes = total_bytes.combine(plus<unsigned int>());

      // Set the status of the agent to agent_done.
      done();
   }

   // Downloads the file at the given URL.
   string download(const string& url)
   {
      // Enable oversubscription.
      Context::Oversubscribe(true);

      // Download the file.
      string content = GetHttpFile(_session, url.c_str());
      
      // Disable oversubscription.
      Context::Oversubscribe(false);

      return content;
   }

private:
   // Manages the network connection.
   CInternetSession& _session;
   // A message buffer that holds the URL names to download.
   ISource<string>& _source;
   // The total number of bytes downloaded
   unsigned int& _total_bytes;
   // Limits the agent to a given number of simultaneous tasks.
   unbounded_buffer<unsigned int> _available_tasks;
};
const string http_reader::input_sentinel("");

int wmain()
{
   // Create an array of URL names to download.
   // A real-world application might read the names from user input.
   array<string, 21> urls = {
      "http://www.adatum.com/",
      "http://www.adventure-works.com/", 
      "http://www.alpineskihouse.com/",
      "http://www.cpandl.com/", 
      "http://www.cohovineyard.com/",
      "http://www.cohowinery.com/",
      "http://www.cohovineyardandwinery.com/", 
      "http://www.contoso.com/",
      "http://www.consolidatedmessenger.com/",
      "http://www.fabrikam.com/", 
      "http://www.fourthcoffee.com/",
      "http://www.graphicdesigninstitute.com/",
      "http://www.humongousinsurance.com/",
      "http://www.litwareinc.com/",
      "http://www.lucernepublishing.com/",
      "http://www.margiestravel.com/",
      "http://www.northwindtraders.com/",
      "http://www.proseware.com/", 
      "http://www.fineartschool.net",
      "http://www.tailspintoys.com/",
      http_reader::input_sentinel,
   };
      
   // Manages the network connection.
   CInternetSession session("Microsoft Internet Browser");

   // A message buffer that enables the application to send URL names to the 
   // agent.
   unbounded_buffer<string> source_urls;

   // The total number of bytes that the agent has downloaded.
   unsigned int total_bytes = 0u;

   // Create an http_reader object that can oversubscribe each processor by one.
   http_reader reader(session, source_urls, total_bytes, 2*GetProcessorCount());

   // Compute the amount of time that it takes for the agent to download all files.
   __int64 elapsed = time_call([&] {
      
      // Start the agent.
      reader.start();
      
      // Use the message buffer to send each URL name to the agent.
      for_each(begin(urls), end(urls), [&](const string& url) {
         send(source_urls, url);
      });

      // Wait for the agent to finish downloading.
      agent::wait(&reader);      
   });

   // Print the results.
   wcout << L"Downloaded " << total_bytes
         << L" bytes in " << elapsed << " ms." << endl;
}

// Downloads the file at the given URL and returns the size of that file.
CString GetHttpFile(CInternetSession& session, const CString& strUrl)
{
   CString strResult;

   // Reads data from an HTTP server.
   CHttpFile* pHttpFile = NULL;

   try
   {
      // Open URL.
      pHttpFile = (CHttpFile*)session.OpenURL(strUrl, 1, 
         INTERNET_FLAG_TRANSFER_ASCII | 
         INTERNET_FLAG_RELOAD | INTERNET_FLAG_DONT_CACHE);

      // Read the file.
      if(pHttpFile != NULL)
      {           
         UINT uiBytesRead;
         do
         {
            char chBuffer[10000];
            uiBytesRead = pHttpFile->Read(chBuffer, sizeof(chBuffer));
            strResult += chBuffer;
         }
         while (uiBytesRead > 0);
      }
    }
   catch (CInternetException)
   {
      // TODO: Handle exception
   }

   // Clean up and return.
   delete pHttpFile;

   return strResult;
}

Cet exemple produit la sortie suivante sur un ordinateur qui a quatre processeurs :

Downloading http://www.adatum.com/...
Downloading http://www.adventure-works.com/...
Downloading http://www.alpineskihouse.com/...
Downloading http://www.cpandl.com/...
Downloading http://www.cohovineyard.com/...
Downloading http://www.cohowinery.com/...
Downloading http://www.cohovineyardandwinery.com/...
Downloading http://www.contoso.com/...
Downloading http://www.consolidatedmessenger.com/...
Downloading http://www.fabrikam.com/...
Downloading http://www.fourthcoffee.com/...
Downloading http://www.graphicdesigninstitute.com/...
Downloading http://www.humongousinsurance.com/...
Downloading http://www.litwareinc.com/...
Downloading http://www.lucernepublishing.com/...
Downloading http://www.margiestravel.com/...
Downloading http://www.northwindtraders.com/...
Downloading http://www.proseware.com/...
Downloading http://www.fineartschool.net...
Downloading http://www.tailspintoys.com/...
Downloaded 1801040 bytes in 3276 ms.

L’exemple peut s’exécuter plus rapidement lorsque la sursubscription est activée, car des tâches supplémentaires s’exécutent pendant que d’autres tâches attendent la fin d’une opération latente.

Compilation du code

Copiez l’exemple de code et collez-le dans un projet Visual Studio, ou collez-le dans un fichier nommé download-oversubscription.cpp , puis exécutez l’une des commandes suivantes dans une fenêtre d’invite de commandes Visual Studio.

cl.exe /EHsc /MD /D "_AFXDLL" download-oversubscription.cpp
cl.exe /EHsc /MT download-oversubscription.cpp

Programmation fiable

Désactivez toujours le sursubscription une fois que vous n’en avez plus besoin. Considérez une fonction qui ne gère pas une exception levée par une autre fonction. Si vous ne désactivez pas la sursubscription avant que la fonction ne retourne, tout travail parallèle supplémentaire sursubcrira également le contexte actuel.

Vous pouvez utiliser le modèle d’initialisation d’acquisition de ressources (RAII) pour limiter la sursubscription à une étendue donnée. Sous le modèle RAII, une structure de données est allouée sur la pile. Cette structure de données initialise ou acquiert une ressource lorsqu’elle est créée et détruit ou libère cette ressource lorsque la structure de données est détruite. Le modèle RAII garantit que le destructeur est appelé avant la sortie de l’étendue englobante. Par conséquent, la ressource est correctement gérée lorsqu’une exception est levée ou lorsqu’une fonction contient plusieurs return instructions.

L’exemple suivant définit une structure nommée scoped_blocking_signal. Le constructeur de la scoped_blocking_signal structure active le sursubscription et le destructeur désactive la sursubscription.

struct scoped_blocking_signal
{
    scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(true);
    }
    ~scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(false);
    }
};

L’exemple suivant modifie le corps de la download méthode pour utiliser RAII pour vous assurer que la surscription est désactivée avant que la fonction ne retourne. Cette technique garantit que la download méthode est sans risque d’exception.

// Downloads the file at the given URL.
string download(const string& url)
{
   scoped_blocking_signal signal;

   // Download the file.
   return string(GetHttpFile(_session, url.c_str()));
}

Voir aussi

Contextes
Context ::Oversubscribe, méthode