Практическое руководство. Использование лимита подписки для устранения задержек

Oversubscription может повысить общую эффективность некоторых приложений, содержащих задачи с высокой задержкой. В этом разделе показано, как использовать oversubscription для смещения задержки, вызванной чтением данных из сетевого подключения.

Пример

В этом примере библиотека асинхронных агентов используется для скачивания файлов с HTTP-серверов. Класс http_reader является производным от параллелизма::agent и использует передачу сообщений для асинхронного чтения имен URL-адресов для скачивания.

Класс http_reader использует класс параллелизма::task_group для параллельного чтения каждого файла. Каждая задача вызывает метод параллелизма::Context::Oversubscribe с набором _BeginOversubscription параметров, чтобы true включить перезапись в текущем контексте. Затем каждая задача использует классы CInternetSession и CHttpFile (Microsoft Foundation Classes( MFC) для скачивания файла. Наконец, все вызовы Context::Oversubscribe задач с параметром _BeginOversubscription , чтобы false отключить oversubscription.

При включении oversubscription среда выполнения создает один дополнительный поток, в котором выполняются задачи. Каждый из этих потоков также может переопределить текущий контекст и таким образом создавать дополнительные потоки. Класс http_reader использует объект параллелизма::unbounded_buffer , чтобы ограничить количество потоков, которые использует приложение. Агент инициализирует буфер с фиксированным числом значений токена. Для каждой операции скачивания агент считывает значение маркера из буфера до запуска операции, а затем записывает это значение обратно в буфер после завершения операции. Когда буфер пуст, агент ожидает, пока одна из операций загрузки записывает значение обратно в буфер.

В следующем примере количество одновременных задач ограничено двумя числами доступных аппаратных потоков. Это значение является хорошей отправной точкой, используемой при эксперименте с oversubscription. Можно использовать значение, которое соответствует определенной среде обработки или динамически изменять это значение, чтобы реагировать на фактическую рабочую нагрузку.

// download-oversubscription.cpp
// compile with: /EHsc /MD /D "_AFXDLL"
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <concrtrm.h>
#include <agents.h>
#include <ppl.h>
#include <sstream>
#include <iostream>
#include <array>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Downloads the file at the given URL.
CString GetHttpFile(CInternetSession& session, const CString& strUrl);

// Reads files from HTTP servers.
class http_reader : public agent
{
public:
   explicit http_reader(CInternetSession& session,      
      ISource<string>& source,
      unsigned int& total_bytes,
      unsigned int max_concurrent_reads)
      : _session(session)
      , _source(source)
      , _total_bytes(total_bytes)
   {
      // Add one token to the available tasks buffer for each 
      // possible concurrent read operation. The value of each token 
      // is not important, but can be useful for debugging.
      for (unsigned int i = 0; i < max_concurrent_reads; ++i)
         send(_available_tasks, i);
   }

   // Signals to the agent that there are no more items to download.
   static const string input_sentinel;
 
protected:
   void run()
   {
      // A task group. Each task in the group downloads one file.
      task_group tasks;

      // Holds the total number of bytes downloaded.
      combinable<unsigned int> total_bytes;

      // Read from the source buffer until the application 
      // sends the sentinel value.
      string url;
      while ((url = receive(_source)) != input_sentinel)
      {
         // Wait for a task to release an available slot.
         unsigned int token = receive(_available_tasks);

         // Create a task to download the file.
         tasks.run([&, token, url] {

            // Print a message.
            wstringstream ss;
            ss << L"Downloading " << url.c_str() << L"..." << endl;
            wcout << ss.str();

            // Download the file.
            string content = download(url);

            // Update the total number of bytes downloaded.
            total_bytes.local() += content.size();

            // Release the slot for another task.
            send(_available_tasks, token);
         });
      }

      // Wait for all tasks to finish.
      tasks.wait();
      
      // Compute the total number of bytes download on all threads.
      _total_bytes = total_bytes.combine(plus<unsigned int>());

      // Set the status of the agent to agent_done.
      done();
   }

   // Downloads the file at the given URL.
   string download(const string& url)
   {
      // Enable oversubscription.
      Context::Oversubscribe(true);

      // Download the file.
      string content = GetHttpFile(_session, url.c_str());
      
      // Disable oversubscription.
      Context::Oversubscribe(false);

      return content;
   }

private:
   // Manages the network connection.
   CInternetSession& _session;
   // A message buffer that holds the URL names to download.
   ISource<string>& _source;
   // The total number of bytes downloaded
   unsigned int& _total_bytes;
   // Limits the agent to a given number of simultaneous tasks.
   unbounded_buffer<unsigned int> _available_tasks;
};
const string http_reader::input_sentinel("");

int wmain()
{
   // Create an array of URL names to download.
   // A real-world application might read the names from user input.
   array<string, 21> urls = {
      "http://www.adatum.com/",
      "http://www.adventure-works.com/", 
      "http://www.alpineskihouse.com/",
      "http://www.cpandl.com/", 
      "http://www.cohovineyard.com/",
      "http://www.cohowinery.com/",
      "http://www.cohovineyardandwinery.com/", 
      "http://www.contoso.com/",
      "http://www.consolidatedmessenger.com/",
      "http://www.fabrikam.com/", 
      "http://www.fourthcoffee.com/",
      "http://www.graphicdesigninstitute.com/",
      "http://www.humongousinsurance.com/",
      "http://www.litwareinc.com/",
      "http://www.lucernepublishing.com/",
      "http://www.margiestravel.com/",
      "http://www.northwindtraders.com/",
      "http://www.proseware.com/", 
      "http://www.fineartschool.net",
      "http://www.tailspintoys.com/",
      http_reader::input_sentinel,
   };
      
   // Manages the network connection.
   CInternetSession session("Microsoft Internet Browser");

   // A message buffer that enables the application to send URL names to the 
   // agent.
   unbounded_buffer<string> source_urls;

   // The total number of bytes that the agent has downloaded.
   unsigned int total_bytes = 0u;

   // Create an http_reader object that can oversubscribe each processor by one.
   http_reader reader(session, source_urls, total_bytes, 2*GetProcessorCount());

   // Compute the amount of time that it takes for the agent to download all files.
   __int64 elapsed = time_call([&] {
      
      // Start the agent.
      reader.start();
      
      // Use the message buffer to send each URL name to the agent.
      for_each(begin(urls), end(urls), [&](const string& url) {
         send(source_urls, url);
      });

      // Wait for the agent to finish downloading.
      agent::wait(&reader);      
   });

   // Print the results.
   wcout << L"Downloaded " << total_bytes
         << L" bytes in " << elapsed << " ms." << endl;
}

// Downloads the file at the given URL and returns the size of that file.
CString GetHttpFile(CInternetSession& session, const CString& strUrl)
{
   CString strResult;

   // Reads data from an HTTP server.
   CHttpFile* pHttpFile = NULL;

   try
   {
      // Open URL.
      pHttpFile = (CHttpFile*)session.OpenURL(strUrl, 1, 
         INTERNET_FLAG_TRANSFER_ASCII | 
         INTERNET_FLAG_RELOAD | INTERNET_FLAG_DONT_CACHE);

      // Read the file.
      if(pHttpFile != NULL)
      {           
         UINT uiBytesRead;
         do
         {
            char chBuffer[10000];
            uiBytesRead = pHttpFile->Read(chBuffer, sizeof(chBuffer));
            strResult += chBuffer;
         }
         while (uiBytesRead > 0);
      }
    }
   catch (CInternetException)
   {
      // TODO: Handle exception
   }

   // Clean up and return.
   delete pHttpFile;

   return strResult;
}

В этом примере создаются следующие выходные данные на компьютере с четырьмя процессорами:

Downloading http://www.adatum.com/...
Downloading http://www.adventure-works.com/...
Downloading http://www.alpineskihouse.com/...
Downloading http://www.cpandl.com/...
Downloading http://www.cohovineyard.com/...
Downloading http://www.cohowinery.com/...
Downloading http://www.cohovineyardandwinery.com/...
Downloading http://www.contoso.com/...
Downloading http://www.consolidatedmessenger.com/...
Downloading http://www.fabrikam.com/...
Downloading http://www.fourthcoffee.com/...
Downloading http://www.graphicdesigninstitute.com/...
Downloading http://www.humongousinsurance.com/...
Downloading http://www.litwareinc.com/...
Downloading http://www.lucernepublishing.com/...
Downloading http://www.margiestravel.com/...
Downloading http://www.northwindtraders.com/...
Downloading http://www.proseware.com/...
Downloading http://www.fineartschool.net...
Downloading http://www.tailspintoys.com/...
Downloaded 1801040 bytes in 3276 ms.

Этот пример может выполняться быстрее, если включена переопределение, так как дополнительные задачи выполняются, а другие задачи ожидают завершения скрытой операции.

Компиляция кода

Скопируйте пример кода и вставьте его в проект Visual Studio или вставьте его в файл с именем download-oversubscription.cpp , а затем выполните одну из следующих команд в окне командной строки Visual Studio.

cl.exe /EHsc /MD /D "_AFXDLL" download-oversubscription.cpp
cl.exe /EHsc /MT download-oversubscription.cpp

Отказоустойчивость

Всегда отключать oversubscription после того, как больше не требуется. Рассмотрим функцию, которая не обрабатывает исключение, вызываемое другой функцией. Если вы не отключаете oversubscription перед возвратом функции, любая дополнительная параллельная работа также перезаписывает текущий контекст.

Вы можете использовать шаблон инициализации ресурсов (RAII), чтобы ограничить превышение количества запросов на заданный область. В шаблоне RAII структура данных выделяется в стеке. Эта структура данных инициализирует или получает ресурс при создании и уничтожении или выпуске ресурса при уничтожении структуры данных. Шаблон RAII гарантирует, что деструктор вызывается перед выходом заключающей область. Поэтому ресурс правильно управляется при возникновении исключения или при наличии функции нескольких return операторов.

В следующем примере определяется структура, которая называется scoped_blocking_signal. Конструктор структуры включает oversubscription, а деструктор scoped_blocking_signal отключает oversubscription.

struct scoped_blocking_signal
{
    scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(true);
    }
    ~scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(false);
    }
};

В следующем примере текст метода изменяется download для использования RAII, чтобы убедиться, что перезапись отключена до возврата функции. Этот метод гарантирует, что download метод является исключением.

// Downloads the file at the given URL.
string download(const string& url)
{
   scoped_blocking_signal signal;

   // Download the file.
   return string(GetHttpFile(_session, url.c_str()));
}

См. также

Контексты
Метод Context::Oversubscribe