Compartir vía


Cómo: Usar la suscripción excesiva para compensar la latencia

La suscripción excesiva puede mejorar la eficacia general de algunas aplicaciones que contienen tareas con una latencia elevada. En este tema se muestra cómo utilizar la suscripción excesiva para compensar la latencia que se produce al leer datos de una conexión de red.

Ejemplo

En este ejemplo se utiliza la Biblioteca de Agentes Asincrónicos para descargar los archivos de los servidores HTTP. La clase http_reader se deriva de concurrency::agent y usa el paso de mensajes para leer de forma asincrónica qué nombres de direcciones URL se deben descargar.

La clase http_reader usa la clase concurrency::task_group para leer cada archivo simultáneamente. Cada tarea llama al método concurrency::Context::Oversubscribe con el parámetro _BeginOversubscription establecido en true para habilitar la sobresuscripción en el contexto actual. A continuación, cada tarea utiliza las clases CInternetSession y CHttpFile de Microsoft Foundation Classes (MFC) para descargar el archivo. Finalmente, cada tarea llama a Context::Oversubscribe con el parámetro _BeginOversubscription establecido en false para deshabilitar la suscripción excesiva.

Cuando la suscripción excesiva está habilitada, el runtime crea un subproceso adicional en el que ejecutar las tareas. Cada uno de estos subprocesos también puede suscribir en exceso el contexto actual y crear de este modo subprocesos adicionales. La clase http_reader usa un objeto concurrency::unbounded_buffer para limitar el número de subprocesos que usa la aplicación. El agente inicializa el búfer con un número fijo de valores de token. Para cada operación de descarga, el agente lee un valor de token del búfer antes de que se inicie la operación y, a continuación, escribe el valora de vuelta en el búfer cuando la operación finaliza. Cuando el búfer está vacío, el agente espera a que una de las operaciones de la descarga vuelva a escribir un valor en el búfer.

En el siguiente ejemplo se limita el número de tareas simultáneas a dos veces el número de subprocesos de hardware disponibles. Este valor es un buen punto de partida cuando se experimenta con la suscripción excesiva. Puede utilizar un valor que ajusta un entorno del procesamiento determinado o cambiar dinámicamente este valor para responder a la carga de trabajo.

// download-oversubscription.cpp
// compile with: /EHsc /MD /D "_AFXDLL"
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <concrtrm.h>
#include <agents.h>
#include <ppl.h>
#include <sstream>
#include <iostream>
#include <array>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Downloads the file at the given URL.
CString GetHttpFile(CInternetSession& session, const CString& strUrl);

// Reads files from HTTP servers.
class http_reader : public agent
{
public:
   explicit http_reader(CInternetSession& session,      
      ISource<string>& source,
      unsigned int& total_bytes,
      unsigned int max_concurrent_reads)
      : _session(session)
      , _source(source)
      , _total_bytes(total_bytes)
   {
      // Add one token to the available tasks buffer for each 
      // possible concurrent read operation. The value of each token 
      // is not important, but can be useful for debugging.
      for (unsigned int i = 0; i < max_concurrent_reads; ++i)
         send(_available_tasks, i);
   }

   // Signals to the agent that there are no more items to download.
   static const string input_sentinel;
 
protected:
   void run()
   {
      // A task group. Each task in the group downloads one file.
      task_group tasks;

      // Holds the total number of bytes downloaded.
      combinable<unsigned int> total_bytes;

      // Read from the source buffer until the application 
      // sends the sentinel value.
      string url;
      while ((url = receive(_source)) != input_sentinel)
      {
         // Wait for a task to release an available slot.
         unsigned int token = receive(_available_tasks);

         // Create a task to download the file.
         tasks.run([&, token, url] {

            // Print a message.
            wstringstream ss;
            ss << L"Downloading " << url.c_str() << L"..." << endl;
            wcout << ss.str();

            // Download the file.
            string content = download(url);

            // Update the total number of bytes downloaded.
            total_bytes.local() += content.size();

            // Release the slot for another task.
            send(_available_tasks, token);
         });
      }

      // Wait for all tasks to finish.
      tasks.wait();
      
      // Compute the total number of bytes download on all threads.
      _total_bytes = total_bytes.combine(plus<unsigned int>());

      // Set the status of the agent to agent_done.
      done();
   }

   // Downloads the file at the given URL.
   string download(const string& url)
   {
      // Enable oversubscription.
      Context::Oversubscribe(true);

      // Download the file.
      string content = GetHttpFile(_session, url.c_str());
      
      // Disable oversubscription.
      Context::Oversubscribe(false);

      return content;
   }

private:
   // Manages the network connection.
   CInternetSession& _session;
   // A message buffer that holds the URL names to download.
   ISource<string>& _source;
   // The total number of bytes downloaded
   unsigned int& _total_bytes;
   // Limits the agent to a given number of simultaneous tasks.
   unbounded_buffer<unsigned int> _available_tasks;
};
const string http_reader::input_sentinel("");

int wmain()
{
   // Create an array of URL names to download.
   // A real-world application might read the names from user input.
   array<string, 21> urls = {
      "http://www.adatum.com/",
      "http://www.adventure-works.com/", 
      "http://www.alpineskihouse.com/",
      "http://www.cpandl.com/", 
      "http://www.cohovineyard.com/",
      "http://www.cohowinery.com/",
      "http://www.cohovineyardandwinery.com/", 
      "http://www.contoso.com/",
      "http://www.consolidatedmessenger.com/",
      "http://www.fabrikam.com/", 
      "http://www.fourthcoffee.com/",
      "http://www.graphicdesigninstitute.com/",
      "http://www.humongousinsurance.com/",
      "http://www.litwareinc.com/",
      "http://www.lucernepublishing.com/",
      "http://www.margiestravel.com/",
      "http://www.northwindtraders.com/",
      "http://www.proseware.com/", 
      "http://www.fineartschool.net",
      "http://www.tailspintoys.com/",
      http_reader::input_sentinel,
   };
      
   // Manages the network connection.
   CInternetSession session("Microsoft Internet Browser");

   // A message buffer that enables the application to send URL names to the 
   // agent.
   unbounded_buffer<string> source_urls;

   // The total number of bytes that the agent has downloaded.
   unsigned int total_bytes = 0u;

   // Create an http_reader object that can oversubscribe each processor by one.
   http_reader reader(session, source_urls, total_bytes, 2*GetProcessorCount());

   // Compute the amount of time that it takes for the agent to download all files.
   __int64 elapsed = time_call([&] {
      
      // Start the agent.
      reader.start();
      
      // Use the message buffer to send each URL name to the agent.
      for_each(begin(urls), end(urls), [&](const string& url) {
         send(source_urls, url);
      });

      // Wait for the agent to finish downloading.
      agent::wait(&reader);      
   });

   // Print the results.
   wcout << L"Downloaded " << total_bytes
         << L" bytes in " << elapsed << " ms." << endl;
}

// Downloads the file at the given URL and returns the size of that file.
CString GetHttpFile(CInternetSession& session, const CString& strUrl)
{
   CString strResult;

   // Reads data from an HTTP server.
   CHttpFile* pHttpFile = NULL;

   try
   {
      // Open URL.
      pHttpFile = (CHttpFile*)session.OpenURL(strUrl, 1, 
         INTERNET_FLAG_TRANSFER_ASCII | 
         INTERNET_FLAG_RELOAD | INTERNET_FLAG_DONT_CACHE);

      // Read the file.
      if(pHttpFile != NULL)
      {           
         UINT uiBytesRead;
         do
         {
            char chBuffer[10000];
            uiBytesRead = pHttpFile->Read(chBuffer, sizeof(chBuffer));
            strResult += chBuffer;
         }
         while (uiBytesRead > 0);
      }
    }
   catch (CInternetException)
   {
      // TODO: Handle exception
   }

   // Clean up and return.
   delete pHttpFile;

   return strResult;
}

En este ejemplo se genera el siguiente resultado en un equipo que tiene cuatro procesadores:

Downloading http://www.adatum.com/...
Downloading http://www.adventure-works.com/...
Downloading http://www.alpineskihouse.com/...
Downloading http://www.cpandl.com/...
Downloading http://www.cohovineyard.com/...
Downloading http://www.cohowinery.com/...
Downloading http://www.cohovineyardandwinery.com/...
Downloading http://www.contoso.com/...
Downloading http://www.consolidatedmessenger.com/...
Downloading http://www.fabrikam.com/...
Downloading http://www.fourthcoffee.com/...
Downloading http://www.graphicdesigninstitute.com/...
Downloading http://www.humongousinsurance.com/...
Downloading http://www.litwareinc.com/...
Downloading http://www.lucernepublishing.com/...
Downloading http://www.margiestravel.com/...
Downloading http://www.northwindtraders.com/...
Downloading http://www.proseware.com/...
Downloading http://www.fineartschool.net...
Downloading http://www.tailspintoys.com/...
Downloaded 1801040 bytes in 3276 ms.

El ejemplo se puede ejecutar más rápidamente cuando la suscripción excesiva está habilitada porque las tareas adicionales se ejecutan mientras otras esperan hasta que una operación latente finalice.

Compilar el código

Copie el código de ejemplo y péguelo en un proyecto de Visual Studio o en un archivo denominado download-oversubscription.cpp y, después, ejecute el siguiente comando en una ventana del símbolo del sistema de Visual Studio.

cl.exe /EHsc /MD /D "_AFXDLL" download-oversubscription.cpp
cl.exe /EHsc /MT download-oversubscription.cpp

Programación sólida

Deshabilite siempre la suscripción excesiva una vez que no la necesite. Considere una función que no controla una excepción que produce otra función. Si no deshabilita la suscripción excesiva antes de que la función vuelva, cualquier trabajo paralelo adicional también suscribirá en exceso el contexto actual.

Puede utilizar el modelo Resource Acquisition Is Initialization (RAII) para limitar la suscripción excesiva a un ámbito determinado. Bajo el modelo RAII, se asigna una estructura de datos en la pila. Esa estructura de datos se inicializa o adquiere un recurso cuando se crea, y destruye o libera ese recurso cuando se destruye la estructura de datos. El modelo RAII garantiza que se llama al destructor antes de que el ámbito de inclusión salga. Por consiguiente, se administra el recurso correctamente cuando se produce una excepción o cuando una función contiene varias instrucciones return.

En el siguiente ejemplo se define una estructura que se denomina scoped_blocking_signal. El constructor de la estructura scoped_blocking_signal habilita la suscripción excesiva y el destructor la deshabilita.

struct scoped_blocking_signal
{
    scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(true);
    }
    ~scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(false);
    }
};

En el siguiente ejemplo se modifica el cuerpo del método download para utilizar RAII y asegurar que la suscripción excesiva se deshabilite antes de que la función vuelva. Esta técnica asegura que el método download es seguro ante excepciones.

// Downloads the file at the given URL.
string download(const string& url)
{
   scoped_blocking_signal signal;

   // Download the file.
   return string(GetHttpFile(_session, url.c_str()));
}

Consulte también

Contextos
Context::Oversubscribe (método)