Sdílet prostřednictvím


Postupy: Kompenzace latence vytvořením nadbytečného počtu vláken

Jež mohou zlepšit celkové účinnosti některých aplikací, které obsahují úkoly, které obsahují vysoké množství čekací doby.Toto téma ukazuje použití jež vyrovnat zpoždění, která je způsobena čtení dat ze síťových připojení.

Příklad

V tomto příkladu Knihovna asynchronních agentů ke stahování souborů ze serverů HTTP.http_reader Třída je odvozena z concurrency::agent a používá zpráva předávání asynchronně načíst názvy URL ke stažení.

http_reader Třídy používá concurrency::task_group třídy pro každý soubor číst souběžně.Každý úkol volá concurrency::Context::Oversubscribe metodu s _BeginOversubscription parametr nastaven na true povolení, jež v aktuálním kontextu.Každý úkol, použije aplikace Microsoft Foundation Classes (MFC) CInternetSession a CHttpFile tříd ke stažení souboru.Nakonec volá každý úkol Context::Oversubscribe se _BeginOversubscription parametr nastaven na hodnotu false Chcete-li zakázat překročení stanovených množství.

Pokud je povoleno, jež modul runtime vytvoří jeden další podproces ke spouštění úloh.Každý z těchto vláken lze také oversubscribe aktuální kontext a a tím vytvořit další vlákna.http_reader Třídy používá concurrency::unbounded_buffer objekt, který chcete omezit počet podprocesů, které aplikace používá.Agent inicializuje vyrovnávací paměť s pevným počtem hodnoty tokenu.Pro každou operaci stažení agent přečte hodnotu token z vyrovnávací paměti před spustí operaci a poté zapíše tuto hodnotu do vyrovnávací paměti po dokončení operace.Pokud je vyrovnávací paměť prázdná, agent čeká jedna operace stažení zapisovat hodnoty zpět do vyrovnávací paměti.

Následující příklad omezuje počet simultánních úloh dvakrát počet podprocesů dostupných hardwarem.Tato hodnota je dobrý výchozí bod pro použití při experimentování s Překryvný odběr.Můžete použít hodnotu, která odpovídá zvláštní zpracování prostředí nebo dynamicky měnit tuto hodnotu reagovat na skutečné pracovní zátěže.

// download-oversubscription.cpp 
// compile with: /EHsc /MD /D "_AFXDLL"
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <concrtrm.h>
#include <agents.h>
#include <ppl.h>
#include <sstream>
#include <iostream>
#include <array>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds  
// that it takes to call that function. 
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Downloads the file at the given URL.
CString GetHttpFile(CInternetSession& session, const CString& strUrl);

// Reads files from HTTP servers. 
class http_reader : public agent
{
public:
   explicit http_reader(CInternetSession& session,      
      ISource<string>& source,
      unsigned int& total_bytes,
      unsigned int max_concurrent_reads)
      : _session(session)
      , _source(source)
      , _total_bytes(total_bytes)
   {
      // Add one token to the available tasks buffer for each  
      // possible concurrent read operation. The value of each token  
      // is not important, but can be useful for debugging. 
      for (unsigned int i = 0; i < max_concurrent_reads; ++i)
         send(_available_tasks, i);
   }

   // Signals to the agent that there are no more items to download. 
   static const string input_sentinel;

protected:
   void run()
   {
      // A task group. Each task in the group downloads one file.
      task_group tasks;

      // Holds the total number of bytes downloaded.
      combinable<unsigned int> total_bytes;

      // Read from the source buffer until the application  
      // sends the sentinel value.
      string url;
      while ((url = receive(_source)) != input_sentinel)
      {
         // Wait for a task to release an available slot. 
         unsigned int token = receive(_available_tasks);

         // Create a task to download the file.
         tasks.run([&, token, url] {

            // Print a message.
            wstringstream ss;
            ss << L"Downloading " << url.c_str() << L"..." << endl;
            wcout << ss.str();

            // Download the file.
            string content = download(url);

            // Update the total number of bytes downloaded.
            total_bytes.local() += content.size();

            // Release the slot for another task.
            send(_available_tasks, token);
         });
      }

      // Wait for all tasks to finish.
      tasks.wait();

      // Compute the total number of bytes download on all threads.
      _total_bytes = total_bytes.combine(plus<unsigned int>());

      // Set the status of the agent to agent_done.
      done();
   }

   // Downloads the file at the given URL.
   string download(const string& url)
   {
      // Enable oversubscription.
      Context::Oversubscribe(true);

      // Download the file.
      string content = GetHttpFile(_session, url.c_str());

      // Disable oversubscription.
      Context::Oversubscribe(false);

      return content;
   }

private:
   // Manages the network connection.
   CInternetSession& _session;
   // A message buffer that holds the URL names to download.
   ISource<string>& _source;
   // The total number of bytes downloaded 
   unsigned int& _total_bytes;
   // Limits the agent to a given number of simultaneous tasks.
   unbounded_buffer<unsigned int> _available_tasks;
};
const string http_reader::input_sentinel("");

int wmain()
{
   // Create an array of URL names to download. 
   // A real-world application might read the names from user input. 
   array<string, 21> urls = {
      "http://www.adatum.com/",
      "https://www.adventure-works.com/", 
      "http://www.alpineskihouse.com/",
      "http://www.cpandl.com/", 
      "http://www.cohovineyard.com/",
      "http://www.cohowinery.com/",
      "http://www.cohovineyardandwinery.com/", 
      "https://www.contoso.com/",
      "http://www.consolidatedmessenger.com/",
      "http://www.fabrikam.com/", 
      "https://www.fourthcoffee.com/",
      "http://www.graphicdesigninstitute.com/",
      "http://www.humongousinsurance.com/",
      "http://www.litwareinc.com/",
      "http://www.lucernepublishing.com/",
      "http://www.margiestravel.com/",
      "http://www.northwindtraders.com/",
      "https://www.proseware.com/", 
      "http://www.fineartschool.net",
      "http://www.tailspintoys.com/",
      http_reader::input_sentinel,
   };

   // Manages the network connection.
   CInternetSession session("Microsoft Internet Browser");

   // A message buffer that enables the application to send URL names to the  
   // agent.
   unbounded_buffer<string> source_urls;

   // The total number of bytes that the agent has downloaded. 
   unsigned int total_bytes = 0u;

   // Create an http_reader object that can oversubscribe each processor by one.
   http_reader reader(session, source_urls, total_bytes, 2*GetProcessorCount());

   // Compute the amount of time that it takes for the agent to download all files.
   __int64 elapsed = time_call([&] {

      // Start the agent.
      reader.start();

      // Use the message buffer to send each URL name to the agent.
      for_each(begin(urls), end(urls), [&](const string& url) {
         send(source_urls, url);
      });

      // Wait for the agent to finish downloading.
      agent::wait(&reader);      
   });

   // Print the results.
   wcout << L"Downloaded " << total_bytes
         << L" bytes in " << elapsed << " ms." << endl;
}

// Downloads the file at the given URL and returns the size of that file.
CString GetHttpFile(CInternetSession& session, const CString& strUrl)
{
   CString strResult;

   // Reads data from an HTTP server.
   CHttpFile* pHttpFile = NULL;

   try
   {
      // Open URL.
      pHttpFile = (CHttpFile*)session.OpenURL(strUrl, 1, 
         INTERNET_FLAG_TRANSFER_ASCII | 
         INTERNET_FLAG_RELOAD | INTERNET_FLAG_DONT_CACHE);

      // Read the file. 
      if(pHttpFile != NULL)
      {           
         UINT uiBytesRead;
         do
         {
            char chBuffer[10000];
            uiBytesRead = pHttpFile->Read(chBuffer, sizeof(chBuffer));
            strResult += chBuffer;
         }
         while (uiBytesRead > 0);
      }
    }
   catch (CInternetException)
   {
      // TODO: Handle exception
   }

   // Clean up and return. 
   delete pHttpFile;

   return strResult;
}

Tento příklad vytvoří následující výstup na počítači, který má čtyři procesory:

  

V příkladu může pracovat rychleji, pokud Překryvný odběr je povolena, protože spustit další úkoly čekají na latentní operaci dokončit další úkoly.

Probíhá kompilace kódu

Zkopírovat ukázkový kód a vložit jej do projektu sady Visual Studio nebo vložit do souboru s názvem ke stažení oversubscription.cpp a potom spustit jednu z následujících příkazů v okně Příkazový řádek Visual Studio.

cl.exe /EHsc /MD /D "_AFXDLL" download-oversubscription.cpp

cl.exe /EHsc /MT download-oversubscription.cpp

Robustní programování

Překryvný odběr zrušit vždy již nevyžadují.Zvažte funkce, která zpracovává výjimku, která je vyvolána jinou funkcí.Pokud nezakážete Překryvný odběr dříve, než se vrátí, bude jakékoli další paralelní práce také oversubscribe k aktuálnímu kontextu.

Lze použít Inicializace je pořízení prostředků (RAII) vzorek omezení, jež do daného oboru.Podle vzoru RAII struktura dat je přidělený do zásobníku.Že struktura dat inicializuje nebo získá prostředek při vytvoření a ničí nebo uvolní zdroje v případě, že datová struktura je zničen.Vzorek RAII zaručuje, že je zavolán destruktor před ukončí ohraničujícím obor.Proto zdroj správně spravovat je vyvolána výjimka nebo funkce obsahuje více return příkazy.

Následující příklad definuje strukturu, nazvanou scoped_blocking_signal.Konstruktor scoped_blocking_signal konstrukce umožňuje překročení stanovených množství a jež zakáže destruktoru.

struct scoped_blocking_signal
{
    scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(true);  
    }
    ~scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(false);
    }
};

Následující příklad upravuje text download metodu použít k zajištění toho, že Překryvný odběr RAII je zakázáno před vrátí.Tento postup zajišťuje, že download metoda je bezpečný pro výjimku.

// Downloads the file at the given URL.
string download(const string& url)
{
   scoped_blocking_signal signal;

   // Download the file. 
   return string(GetHttpFile(_session, url.c_str()));
}

Viz také

Referenční dokumentace

Context::Oversubscribe – metoda

Koncepty

Kontexty