次の方法で共有


方法: オーバーサブスクリプションを使用して待機時間を短縮する

オーバーサブスクリプションを使用すると、待機時間の長いタスクが含まれた一部のアプリケーションの全体的な効率を向上できます。 このトピックでは、オーバーサブスクリプションを使用して、ネットワーク接続からのデータの読み込みが原因で発生する待機時間を短縮する方法について説明します。

この例では、非同期エージェント ライブラリを使用して、HTTP サーバーからファイルをダウンロードします。 http_reader クラスは、concurrency::agent から派生し、メッセージ パッシングを使用してダウンロード対象の URL 名を非同期的に読み取ります。

http_reader クラスは、concurrency::task_group クラスを使用して各ファイルを同時に読み取ります。 各タスクは、_BeginOversubscription パラメーターを true に設定して concurrency::Context::Oversubscribe メソッドを呼び出し、現在のコンテキストでオーバーサブスクリプションを有効にします。 次に、各タスクが MFC (Microsoft Foundation Classes) の CInternetSession クラスと CHttpFile クラスを使用してファイルをダウンロードします。 最後に、各タスクは、_BeginOversubscription パラメーターを false に設定して Context::Oversubscribe を呼び出し、オーバーサブスクリプションを無効にします。

オーバーサブスクリプションを有効にすると、タスクを実行する 1 つの追加スレッドがランタイムによって作成されます。 これらのスレッドのそれぞれで現在のコンテキストをオーバーサブスクライブして、追加のスレッドを作成することもできます。 http_reader クラスは、concurrency::unbounded_buffer オブジェクトを使用して、アプリケーションで使用するスレッドの数を制限します。 agent は、トークン値の固定数でバッファーを初期化します。 それぞれのダウンロード操作に対して、agent はバッファーからトークン値を読み込んでから操作を開始し、操作が完了すると値をバッファーに書き戻します。 バッファーが空の場合、agent はいずれかのダウンロード操作によって値がバッファーに書き戻されるまで待機します。

次の例は、同時に実行するタスクの数を、使用できるハードウェア スレッドの数の 2 倍に制限します。 オーバーサブスクリプションの効果を調べる場合は、この値から始めることをお勧めします。 特定の処理環境に合った値を使用したり、この値を動的に変更して実際の作業負荷に対応したりできます。

// download-oversubscription.cpp
// compile with: /EHsc /MD /D "_AFXDLL"
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <concrtrm.h>
#include <agents.h>
#include <ppl.h>
#include <sstream>
#include <iostream>
#include <array>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Downloads the file at the given URL.
CString GetHttpFile(CInternetSession& session, const CString& strUrl);

// Reads files from HTTP servers.
class http_reader : public agent
{
public:
   explicit http_reader(CInternetSession& session,      
      ISource<string>& source,
      unsigned int& total_bytes,
      unsigned int max_concurrent_reads)
      : _session(session)
      , _source(source)
      , _total_bytes(total_bytes)
   {
      // Add one token to the available tasks buffer for each 
      // possible concurrent read operation. The value of each token 
      // is not important, but can be useful for debugging.
      for (unsigned int i = 0; i < max_concurrent_reads; ++i)
         send(_available_tasks, i);
   }

   // Signals to the agent that there are no more items to download.
   static const string input_sentinel;
 
protected:
   void run()
   {
      // A task group. Each task in the group downloads one file.
      task_group tasks;

      // Holds the total number of bytes downloaded.
      combinable<unsigned int> total_bytes;

      // Read from the source buffer until the application 
      // sends the sentinel value.
      string url;
      while ((url = receive(_source)) != input_sentinel)
      {
         // Wait for a task to release an available slot.
         unsigned int token = receive(_available_tasks);

         // Create a task to download the file.
         tasks.run([&, token, url] {

            // Print a message.
            wstringstream ss;
            ss << L"Downloading " << url.c_str() << L"..." << endl;
            wcout << ss.str();

            // Download the file.
            string content = download(url);

            // Update the total number of bytes downloaded.
            total_bytes.local() += content.size();

            // Release the slot for another task.
            send(_available_tasks, token);
         });
      }

      // Wait for all tasks to finish.
      tasks.wait();
      
      // Compute the total number of bytes download on all threads.
      _total_bytes = total_bytes.combine(plus<unsigned int>());

      // Set the status of the agent to agent_done.
      done();
   }

   // Downloads the file at the given URL.
   string download(const string& url)
   {
      // Enable oversubscription.
      Context::Oversubscribe(true);

      // Download the file.
      string content = GetHttpFile(_session, url.c_str());
      
      // Disable oversubscription.
      Context::Oversubscribe(false);

      return content;
   }

private:
   // Manages the network connection.
   CInternetSession& _session;
   // A message buffer that holds the URL names to download.
   ISource<string>& _source;
   // The total number of bytes downloaded
   unsigned int& _total_bytes;
   // Limits the agent to a given number of simultaneous tasks.
   unbounded_buffer<unsigned int> _available_tasks;
};
const string http_reader::input_sentinel("");

int wmain()
{
   // Create an array of URL names to download.
   // A real-world application might read the names from user input.
   array<string, 21> urls = {
      "http://www.adatum.com/",
      "http://www.adventure-works.com/", 
      "http://www.alpineskihouse.com/",
      "http://www.cpandl.com/", 
      "http://www.cohovineyard.com/",
      "http://www.cohowinery.com/",
      "http://www.cohovineyardandwinery.com/", 
      "http://www.contoso.com/",
      "http://www.consolidatedmessenger.com/",
      "http://www.fabrikam.com/", 
      "http://www.fourthcoffee.com/",
      "http://www.graphicdesigninstitute.com/",
      "http://www.humongousinsurance.com/",
      "http://www.litwareinc.com/",
      "http://www.lucernepublishing.com/",
      "http://www.margiestravel.com/",
      "http://www.northwindtraders.com/",
      "http://www.proseware.com/", 
      "http://www.fineartschool.net",
      "http://www.tailspintoys.com/",
      http_reader::input_sentinel,
   };
      
   // Manages the network connection.
   CInternetSession session("Microsoft Internet Browser");

   // A message buffer that enables the application to send URL names to the 
   // agent.
   unbounded_buffer<string> source_urls;

   // The total number of bytes that the agent has downloaded.
   unsigned int total_bytes = 0u;

   // Create an http_reader object that can oversubscribe each processor by one.
   http_reader reader(session, source_urls, total_bytes, 2*GetProcessorCount());

   // Compute the amount of time that it takes for the agent to download all files.
   __int64 elapsed = time_call([&] {
      
      // Start the agent.
      reader.start();
      
      // Use the message buffer to send each URL name to the agent.
      for_each(begin(urls), end(urls), [&](const string& url) {
         send(source_urls, url);
      });

      // Wait for the agent to finish downloading.
      agent::wait(&reader);      
   });

   // Print the results.
   wcout << L"Downloaded " << total_bytes
         << L" bytes in " << elapsed << " ms." << endl;
}

// Downloads the file at the given URL and returns the size of that file.
CString GetHttpFile(CInternetSession& session, const CString& strUrl)
{
   CString strResult;

   // Reads data from an HTTP server.
   CHttpFile* pHttpFile = NULL;

   try
   {
      // Open URL.
      pHttpFile = (CHttpFile*)session.OpenURL(strUrl, 1, 
         INTERNET_FLAG_TRANSFER_ASCII | 
         INTERNET_FLAG_RELOAD | INTERNET_FLAG_DONT_CACHE);

      // Read the file.
      if(pHttpFile != NULL)
      {           
         UINT uiBytesRead;
         do
         {
            char chBuffer[10000];
            uiBytesRead = pHttpFile->Read(chBuffer, sizeof(chBuffer));
            strResult += chBuffer;
         }
         while (uiBytesRead > 0);
      }
    }
   catch (CInternetException)
   {
      // TODO: Handle exception
   }

   // Clean up and return.
   delete pHttpFile;

   return strResult;
}

この例では、4 つのプロセッサを備えたコンピューターで次の出力を生成します。

Downloading http://www.adatum.com/...
Downloading http://www.adventure-works.com/...
Downloading http://www.alpineskihouse.com/...
Downloading http://www.cpandl.com/...
Downloading http://www.cohovineyard.com/...
Downloading http://www.cohowinery.com/...
Downloading http://www.cohovineyardandwinery.com/...
Downloading http://www.contoso.com/...
Downloading http://www.consolidatedmessenger.com/...
Downloading http://www.fabrikam.com/...
Downloading http://www.fourthcoffee.com/...
Downloading http://www.graphicdesigninstitute.com/...
Downloading http://www.humongousinsurance.com/...
Downloading http://www.litwareinc.com/...
Downloading http://www.lucernepublishing.com/...
Downloading http://www.margiestravel.com/...
Downloading http://www.northwindtraders.com/...
Downloading http://www.proseware.com/...
Downloading http://www.fineartschool.net...
Downloading http://www.tailspintoys.com/...
Downloaded 1801040 bytes in 3276 ms.

この例でオーバーサブスクリプションを有効にすると、他のタスクが潜在的な操作の完了を待っている間に別のタスクが実行されるので、実行時間を短縮できます。

コードのコンパイル

コード例をコピーし、Visual Studio プロジェクトに貼り付けるか、download-oversubscription.cpp という名前のファイルに貼り付けてから、Visual Studio のコマンド プロンプト ウィンドウで次のコマンドのいずれかを実行します。

cl.exe /EHsc /MD /D "_AFXDLL" download-oversubscription.cpp
cl.exe /EHsc /MT download-oversubscription.cpp

信頼性の高いプログラミング

オーバーサブスクリプションは、不要になったら必ず無効にしてください。 たとえば、ある関数が、別の関数によってスローされた例外を処理しないとします。 関数から制御が返されたときにオーバーサブスクリプションが無効になっていない場合、他のすべての並列処理でも現在のコンテキストがオーバーサブスクライブされます。

RAII (Resource Acquisition Is Initialization) パターンを使用すると、オーバーサブスクリプションを特定のスコープ内に制限できます。 RAII パターンでは、データ構造はスタック上に割り当てられます。 データ構造は、作成されたときにリソースを初期化または取得し、破棄されたときにそのリソースを破棄または解放します。 RAII パターンでは、外側のスコープが終了する前に、常にデストラクターが呼び出されます。 したがって、例外がスローされた場合や、関数に複数の return ステートメントが含まれている場合でも、リソースは適切に管理されます。

次の例では、scoped_blocking_signal という名前の構造を定義します。 scoped_blocking_signal 構造のコンストラクターによってオーバーサブスクリプションが有効になり、デストラクターによってオーバーサブスクリプションが無効になります。

struct scoped_blocking_signal
{
    scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(true);
    }
    ~scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(false);
    }
};

次の例では、関数から制御が返される前にオーバーサブスクリプションを無効にするために、RAII を使用するように download メソッドの本体に変更を加えています。 この方法により、download メソッドは例外セーフになります。

// Downloads the file at the given URL.
string download(const string& url)
{
   scoped_blocking_signal signal;

   // Download the file.
   return string(GetHttpFile(_session, url.c_str()));
}

関連項目

コンテキスト
Context::Oversubscribe メソッド