Condividi tramite


Procedura: Usare l'oversubscription per compensare la latenza

L'oversubscription può migliorare l'efficienza complessiva di alcune applicazioni che contengono attività con una notevole latenza. In questo argomento viene illustrato come usare l'oversubscription per compensare la latenza causata dalla lettura dei dati da una connessione di rete.

Esempio

In questo esempio viene usata la libreria degli agenti asincroni per scaricare i file dai server HTTP. La http_reader classe deriva da concurrency::agent e usa il passaggio del messaggio per leggere in modo asincrono i nomi url da scaricare.

La http_reader classe usa la classe concurrency::task_group per leggere simultaneamente ogni file. Ogni attività chiama il metodo concurrency::Context::Oversubscribe con il _BeginOversubscription parametro impostato su per true abilitare l'oversubscription nel contesto corrente. Ogni attività usa quindi le classi CInternetSession (MFC) Microsoft Foundation Classs e CHttpFile per scaricare il file. Infine, ogni attività chiama Context::Oversubscribe con il parametro impostato su false per disabilitare l'oversubscription_BeginOversubscription.

Quando è abilitata l'oversubscription, il runtime crea un thread aggiuntivo in cui eseguire le attività. Ognuno di questi thread può anche sovrascrivere il contesto corrente e quindi creare thread aggiuntivi. La http_reader classe usa un oggetto concurrency::unbounded_buffer per limitare il numero di thread usati dall'applicazione. L'agente inizializza il buffer con un numero fisso di valori di token. Per ogni operazione di download, l'agente legge un valore del token dal buffer prima dell'avvio dell'operazione e quindi lo scrive nuovamente nel buffer al termine dell'operazione. Quando il buffer è vuoto, l'agente attende che una delle operazioni di download scriva un valore nel buffer.

L'esempio seguente limita il numero di attività simultanee a due volte il numero di thread hardware disponibili. Questo valore è un buon punto di partenza da usare quando si sperimenta l'oversubscription. È possibile usare un valore adatto a un ambiente di elaborazione specifico o modificare in modo dinamico questo valore per rispondere al carico di lavoro effettivo.

// download-oversubscription.cpp
// compile with: /EHsc /MD /D "_AFXDLL"
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <concrtrm.h>
#include <agents.h>
#include <ppl.h>
#include <sstream>
#include <iostream>
#include <array>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Downloads the file at the given URL.
CString GetHttpFile(CInternetSession& session, const CString& strUrl);

// Reads files from HTTP servers.
class http_reader : public agent
{
public:
   explicit http_reader(CInternetSession& session,      
      ISource<string>& source,
      unsigned int& total_bytes,
      unsigned int max_concurrent_reads)
      : _session(session)
      , _source(source)
      , _total_bytes(total_bytes)
   {
      // Add one token to the available tasks buffer for each 
      // possible concurrent read operation. The value of each token 
      // is not important, but can be useful for debugging.
      for (unsigned int i = 0; i < max_concurrent_reads; ++i)
         send(_available_tasks, i);
   }

   // Signals to the agent that there are no more items to download.
   static const string input_sentinel;
 
protected:
   void run()
   {
      // A task group. Each task in the group downloads one file.
      task_group tasks;

      // Holds the total number of bytes downloaded.
      combinable<unsigned int> total_bytes;

      // Read from the source buffer until the application 
      // sends the sentinel value.
      string url;
      while ((url = receive(_source)) != input_sentinel)
      {
         // Wait for a task to release an available slot.
         unsigned int token = receive(_available_tasks);

         // Create a task to download the file.
         tasks.run([&, token, url] {

            // Print a message.
            wstringstream ss;
            ss << L"Downloading " << url.c_str() << L"..." << endl;
            wcout << ss.str();

            // Download the file.
            string content = download(url);

            // Update the total number of bytes downloaded.
            total_bytes.local() += content.size();

            // Release the slot for another task.
            send(_available_tasks, token);
         });
      }

      // Wait for all tasks to finish.
      tasks.wait();
      
      // Compute the total number of bytes download on all threads.
      _total_bytes = total_bytes.combine(plus<unsigned int>());

      // Set the status of the agent to agent_done.
      done();
   }

   // Downloads the file at the given URL.
   string download(const string& url)
   {
      // Enable oversubscription.
      Context::Oversubscribe(true);

      // Download the file.
      string content = GetHttpFile(_session, url.c_str());
      
      // Disable oversubscription.
      Context::Oversubscribe(false);

      return content;
   }

private:
   // Manages the network connection.
   CInternetSession& _session;
   // A message buffer that holds the URL names to download.
   ISource<string>& _source;
   // The total number of bytes downloaded
   unsigned int& _total_bytes;
   // Limits the agent to a given number of simultaneous tasks.
   unbounded_buffer<unsigned int> _available_tasks;
};
const string http_reader::input_sentinel("");

int wmain()
{
   // Create an array of URL names to download.
   // A real-world application might read the names from user input.
   array<string, 21> urls = {
      "http://www.adatum.com/",
      "http://www.adventure-works.com/", 
      "http://www.alpineskihouse.com/",
      "http://www.cpandl.com/", 
      "http://www.cohovineyard.com/",
      "http://www.cohowinery.com/",
      "http://www.cohovineyardandwinery.com/", 
      "http://www.contoso.com/",
      "http://www.consolidatedmessenger.com/",
      "http://www.fabrikam.com/", 
      "http://www.fourthcoffee.com/",
      "http://www.graphicdesigninstitute.com/",
      "http://www.humongousinsurance.com/",
      "http://www.litwareinc.com/",
      "http://www.lucernepublishing.com/",
      "http://www.margiestravel.com/",
      "http://www.northwindtraders.com/",
      "http://www.proseware.com/", 
      "http://www.fineartschool.net",
      "http://www.tailspintoys.com/",
      http_reader::input_sentinel,
   };
      
   // Manages the network connection.
   CInternetSession session("Microsoft Internet Browser");

   // A message buffer that enables the application to send URL names to the 
   // agent.
   unbounded_buffer<string> source_urls;

   // The total number of bytes that the agent has downloaded.
   unsigned int total_bytes = 0u;

   // Create an http_reader object that can oversubscribe each processor by one.
   http_reader reader(session, source_urls, total_bytes, 2*GetProcessorCount());

   // Compute the amount of time that it takes for the agent to download all files.
   __int64 elapsed = time_call([&] {
      
      // Start the agent.
      reader.start();
      
      // Use the message buffer to send each URL name to the agent.
      for_each(begin(urls), end(urls), [&](const string& url) {
         send(source_urls, url);
      });

      // Wait for the agent to finish downloading.
      agent::wait(&reader);      
   });

   // Print the results.
   wcout << L"Downloaded " << total_bytes
         << L" bytes in " << elapsed << " ms." << endl;
}

// Downloads the file at the given URL and returns the size of that file.
CString GetHttpFile(CInternetSession& session, const CString& strUrl)
{
   CString strResult;

   // Reads data from an HTTP server.
   CHttpFile* pHttpFile = NULL;

   try
   {
      // Open URL.
      pHttpFile = (CHttpFile*)session.OpenURL(strUrl, 1, 
         INTERNET_FLAG_TRANSFER_ASCII | 
         INTERNET_FLAG_RELOAD | INTERNET_FLAG_DONT_CACHE);

      // Read the file.
      if(pHttpFile != NULL)
      {           
         UINT uiBytesRead;
         do
         {
            char chBuffer[10000];
            uiBytesRead = pHttpFile->Read(chBuffer, sizeof(chBuffer));
            strResult += chBuffer;
         }
         while (uiBytesRead > 0);
      }
    }
   catch (CInternetException)
   {
      // TODO: Handle exception
   }

   // Clean up and return.
   delete pHttpFile;

   return strResult;
}

In questo esempio viene generato l'output seguente in un computer con quattro processori:

Downloading http://www.adatum.com/...
Downloading http://www.adventure-works.com/...
Downloading http://www.alpineskihouse.com/...
Downloading http://www.cpandl.com/...
Downloading http://www.cohovineyard.com/...
Downloading http://www.cohowinery.com/...
Downloading http://www.cohovineyardandwinery.com/...
Downloading http://www.contoso.com/...
Downloading http://www.consolidatedmessenger.com/...
Downloading http://www.fabrikam.com/...
Downloading http://www.fourthcoffee.com/...
Downloading http://www.graphicdesigninstitute.com/...
Downloading http://www.humongousinsurance.com/...
Downloading http://www.litwareinc.com/...
Downloading http://www.lucernepublishing.com/...
Downloading http://www.margiestravel.com/...
Downloading http://www.northwindtraders.com/...
Downloading http://www.proseware.com/...
Downloading http://www.fineartschool.net...
Downloading http://www.tailspintoys.com/...
Downloaded 1801040 bytes in 3276 ms.

L'esempio può essere eseguito più velocemente quando è abilitata l'oversubscription perché vengono eseguite attività aggiuntive mentre altre attività attendono il completamento di un'operazione latente.

Compilazione del codice

Copiare il codice di esempio e incollarlo in un progetto di Visual Studio oppure incollarlo in un file denominato download-oversubscription.cpp e quindi eseguire uno dei comandi seguenti in una finestra del prompt dei comandi di Visual Studio.

cl.exe /EHsc /MD /D "_AFXDLL" download-oversubscription.cpp
cl.exe /EHsc /MT download-oversubscription.cpp

Programmazione efficiente

Disabilitare sempre l'oversubscription dopo che non è più necessario. Si consideri una funzione che non gestisce un'eccezione generata da un'altra funzione. Se non si disabilita l'oversubscription prima della restituzione della funzione, verrà sovrascritto anche il contesto corrente.

È possibile usare il modello di inizializzazione delle risorse (RAII) per limitare l'oversubscription a un determinato ambito.You can use the Resource Acquisition Is Initialization (RAII) pattern to limit oversubscription to a given scope. Nel modello RAII viene allocata una struttura di dati nello stack. Tale struttura di dati inizializza o acquisisce una risorsa quando viene creata e distrugge o rilascia tale risorsa quando la struttura dei dati viene eliminata definitivamente. Il modello RAII garantisce che il distruttore venga chiamato prima dell'uscita dall'ambito di inclusione. Di conseguenza, la risorsa viene gestita correttamente quando viene generata un'eccezione o quando una funzione contiene più return istruzioni.

Nell'esempio seguente viene definita una struttura denominata scoped_blocking_signal. Il costruttore della scoped_blocking_signal struttura abilita l'oversubscription e il distruttore disabilita l'oversubscription.

struct scoped_blocking_signal
{
    scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(true);
    }
    ~scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(false);
    }
};

Nell'esempio seguente viene modificato il corpo del download metodo in modo da usare RAII per assicurarsi che l'oversubscription sia disabilitato prima che la funzione restituisca. Questa tecnica garantisce che il download metodo sia indipendente dalle eccezioni.

// Downloads the file at the given URL.
string download(const string& url)
{
   scoped_blocking_signal signal;

   // Download the file.
   return string(GetHttpFile(_session, url.c_str()));
}

Vedi anche

Contesti
Metodo Context::Oversubscribe