다음을 통해 공유


방법: 초과 구독을 사용하여 대기 오프셋

초과 구독은 대기 시간이 많은 작업을 포함하는 일부 응용 프로그램의 전반적인 효율성을 향상시킬 수 있습니다.이 항목에서는 초과 구독을 사용하여 네트워크 연결에서 데이터 읽기를 통해 발생한 대기 시간을 오프셋하는 방법을 보여 줍니다.

예제

이 예제에서는 비동기 에이전트 라이브러리를 사용하여 HTTP 서버의 파일을 다운로드합니다.http_reader 클래스에서 파생 concurrency::agent 사용 하 여 메시지 전달 비동기적으로 다운로드 하는 URL 이름을 읽을 수 있습니다.

http_reader 클래스는 concurrency::task_group 동시에 각 파일을 읽을 수 있는 클래스입니다.각 작업 호출는 concurrency::Context::Oversubscribe 메서드로 _BeginOversubscription 매개 변수를 설정 true 하기 현재 컨텍스트에서 사용할 수 있도록.그런 다음 MFC(Microsoft Foundation Classes) CInternetSessionCHttpFile 클래스를 사용하여 파일을 다운로드합니다.마지막으로 각 작업에서는 _BeginOversubscription 매개 변수를 false로 설정한 상태로 Context::Oversubscribe를 호출하여 초과 구독을 사용하지 않도록 설정합니다.

초과 구독을 사용하도록 설정하면 런타임에서는 작업을 실행하는 추가 스레드 하나를 만듭니다.이러한 각 스레드에서 현재 컨텍스트를 초과 구독하여 추가 스레드를 만들 수도 있습니다.http_reader 클래스는 concurrency::unbounded_buffer 응용 프로그램을 사용 하 여 스레드 수를 제한 하는 개체입니다.에이전트는 고정된 수의 토큰 값을 사용하여 버퍼를 초기화합니다.각 다운로드 작업을 수행할 때 에이전트는 작업이 시작되기 전에 버퍼에서 토큰 값을 읽은 다음, 작업이 끝난 후에 해당 값을 다시 버퍼에 씁니다.버퍼가 비어 있는 경우 에이전트는 다운로드 작업 중 하나에서 값을 다시 버퍼에 쓸 때까지 기다립니다.

다음 예제에서는 동시 작업의 수를 두 번, 즉 사용 가능한 하드웨어 스레드의 수로 제한합니다.초과 구독을 테스트할 때 이 값을 사용하여 시작하는 것이 좋습니다.특정 처리 환경에 적합한 값을 사용하거나, 실제 작업 부하에 응답하도록 이 값을 동적으로 변경할 수 있습니다.

// download-oversubscription.cpp
// compile with: /EHsc /MD /D "_AFXDLL"
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <concrtrm.h>
#include <agents.h>
#include <ppl.h>
#include <sstream>
#include <iostream>
#include <array>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Downloads the file at the given URL.
CString GetHttpFile(CInternetSession& session, const CString& strUrl);

// Reads files from HTTP servers.
class http_reader : public agent
{
public:
   explicit http_reader(CInternetSession& session,      
      ISource<string>& source,
      unsigned int& total_bytes,
      unsigned int max_concurrent_reads)
      : _session(session)
      , _source(source)
      , _total_bytes(total_bytes)
   {
      // Add one token to the available tasks buffer for each 
      // possible concurrent read operation. The value of each token 
      // is not important, but can be useful for debugging.
      for (unsigned int i = 0; i < max_concurrent_reads; ++i)
         send(_available_tasks, i);
   }

   // Signals to the agent that there are no more items to download.
   static const string input_sentinel;

protected:
   void run()
   {
      // A task group. Each task in the group downloads one file.
      task_group tasks;

      // Holds the total number of bytes downloaded.
      combinable<unsigned int> total_bytes;

      // Read from the source buffer until the application 
      // sends the sentinel value.
      string url;
      while ((url = receive(_source)) != input_sentinel)
      {
         // Wait for a task to release an available slot.
         unsigned int token = receive(_available_tasks);

         // Create a task to download the file.
         tasks.run([&, token, url] {

            // Print a message.
            wstringstream ss;
            ss << L"Downloading " << url.c_str() << L"..." << endl;
            wcout << ss.str();

            // Download the file.
            string content = download(url);

            // Update the total number of bytes downloaded.
            total_bytes.local() += content.size();

            // Release the slot for another task.
            send(_available_tasks, token);
         });
      }

      // Wait for all tasks to finish.
      tasks.wait();

      // Compute the total number of bytes download on all threads.
      _total_bytes = total_bytes.combine(plus<unsigned int>());

      // Set the status of the agent to agent_done.
      done();
   }

   // Downloads the file at the given URL.
   string download(const string& url)
   {
      // Enable oversubscription.
      Context::Oversubscribe(true);

      // Download the file.
      string content = GetHttpFile(_session, url.c_str());

      // Disable oversubscription.
      Context::Oversubscribe(false);

      return content;
   }

private:
   // Manages the network connection.
   CInternetSession& _session;
   // A message buffer that holds the URL names to download.
   ISource<string>& _source;
   // The total number of bytes downloaded
   unsigned int& _total_bytes;
   // Limits the agent to a given number of simultaneous tasks.
   unbounded_buffer<unsigned int> _available_tasks;
};
const string http_reader::input_sentinel("");

int wmain()
{
   // Create an array of URL names to download.
   // A real-world application might read the names from user input.
   array<string, 21> urls = {
      "http://www.adatum.com/",
      "https://www.adventure-works.com/", 
      "http://www.alpineskihouse.com/",
      "http://www.cpandl.com/", 
      "http://www.cohovineyard.com/",
      "http://www.cohowinery.com/",
      "http://www.cohovineyardandwinery.com/", 
      "https://www.contoso.com/",
      "http://www.consolidatedmessenger.com/",
      "http://www.fabrikam.com/", 
      "https://www.fourthcoffee.com/",
      "http://www.graphicdesigninstitute.com/",
      "http://www.humongousinsurance.com/",
      "http://www.litwareinc.com/",
      "http://www.lucernepublishing.com/",
      "http://www.margiestravel.com/",
      "http://www.northwindtraders.com/",
      "https://www.proseware.com/", 
      "http://www.fineartschool.net",
      "http://www.tailspintoys.com/",
      http_reader::input_sentinel,
   };

   // Manages the network connection.
   CInternetSession session("Microsoft Internet Browser");

   // A message buffer that enables the application to send URL names to the 
   // agent.
   unbounded_buffer<string> source_urls;

   // The total number of bytes that the agent has downloaded.
   unsigned int total_bytes = 0u;

   // Create an http_reader object that can oversubscribe each processor by one.
   http_reader reader(session, source_urls, total_bytes, 2*GetProcessorCount());

   // Compute the amount of time that it takes for the agent to download all files.
   __int64 elapsed = time_call([&] {

      // Start the agent.
      reader.start();

      // Use the message buffer to send each URL name to the agent.
      for_each(begin(urls), end(urls), [&](const string& url) {
         send(source_urls, url);
      });

      // Wait for the agent to finish downloading.
      agent::wait(&reader);      
   });

   // Print the results.
   wcout << L"Downloaded " << total_bytes
         << L" bytes in " << elapsed << " ms." << endl;
}

// Downloads the file at the given URL and returns the size of that file.
CString GetHttpFile(CInternetSession& session, const CString& strUrl)
{
   CString strResult;

   // Reads data from an HTTP server.
   CHttpFile* pHttpFile = NULL;

   try
   {
      // Open URL.
      pHttpFile = (CHttpFile*)session.OpenURL(strUrl, 1, 
         INTERNET_FLAG_TRANSFER_ASCII | 
         INTERNET_FLAG_RELOAD | INTERNET_FLAG_DONT_CACHE);

      // Read the file.
      if(pHttpFile != NULL)
      {           
         UINT uiBytesRead;
         do
         {
            char chBuffer[10000];
            uiBytesRead = pHttpFile->Read(chBuffer, sizeof(chBuffer));
            strResult += chBuffer;
         }
         while (uiBytesRead > 0);
      }
    }
   catch (CInternetException)
   {
      // TODO: Handle exception
   }

   // Clean up and return.
   delete pHttpFile;

   return strResult;
}

프로세서가 4개인 컴퓨터에 대해 이 예제를 실행하면 다음과 같은 결과가 출력됩니다.

Downloading http://www.adatum.com/...
Downloading https://www.adventure-works.com/...
Downloading http://www.alpineskihouse.com/...
Downloading http://www.cpandl.com/...
Downloading http://www.cohovineyard.com/...
Downloading http://www.cohowinery.com/...
Downloading http://www.cohovineyardandwinery.com/...
Downloading https://www.contoso.com/...
Downloading http://www.consolidatedmessenger.com/...
Downloading http://www.fabrikam.com/...
Downloading https://www.fourthcoffee.com/...
Downloading http://www.graphicdesigninstitute.com/...
Downloading http://www.humongousinsurance.com/...
Downloading http://www.litwareinc.com/...
Downloading http://www.lucernepublishing.com/...
Downloading http://www.margiestravel.com/...
Downloading http://www.northwindtraders.com/...
Downloading https://www.proseware.com/...
Downloading http://www.fineartschool.net...
Downloading http://www.tailspintoys.com/...
Downloaded 1801040 bytes in 3276 ms.

이 예제의 경우 초과 구독을 사용하도록 설정하면 다른 작업에서 숨은 작업이 끝날 때까지 기다리는 동안 추가 작업이 실행되므로 실행 속도가 더 빨라질 수 있습니다.

코드 컴파일

예제 코드를 복사 하 고 Visual Studio 프로젝트에 붙여 또는 라는 파일에 붙여 다운로드-oversubscription.cpp 및 다음 Visual Studio 명령 프롬프트 창에서 다음 중 하나를 실행된을 명령 합니다.

cl.exe /EHsc /MD /D "_AFXDLL" download-oversubscription.cpp

cl.exe /EHsc /MT download-oversubscription.cpp

강력한 프로그래밍

초과 구독이 더 이상 필요하지 않게 되면 항상 사용하지 않도록 설정하십시오.다른 함수에서 throw되는 예외를 처리하지 않는 함수를 가정해 봅니다.함수가 반환되기 전에 초과 구독의 사용을 해제하지 않으면 추가 병렬 작업에서도 현재 컨텍스트를 초과 구독하게 됩니다.

RAII(Resource Acquisition Is Initialization) 패턴을 사용하면 초과 구독을 특정 범위로 제한할 수 있습니다.RAII 패턴에서는 데이터 구조가 스택에 할당됩니다.이러한 데이터 구조는 만들어질 때 리소스를 초기화하거나 획득하고, 소멸될 때 리소스를 소멸시키거나 해제합니다.RAII 패턴을 사용하면 바깥쪽 범위를 벗어나기 전에 소멸자가 호출됩니다.따라서 예외가 throw되거나 함수에 return 문이 여러 개 있는 경우 리소스가 올바르게 관리됩니다.

다음 예제에서는 scoped_blocking_signal이라는 구조체를 정의합니다.scoped_blocking_signal 구조체의 생성자는 초과 구독 사용을 설정하고 소멸자는 초과 구독 사용을 해제합니다.

struct scoped_blocking_signal
{
    scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(true);  
    }
    ~scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(false);
    }
};

다음 예제에서는 RAII를 사용하여 함수가 반환되기 전에 초과 구독의 사용이 해제되도록 download 메서드 본문을 수정합니다.이 방법을 사용하면 download 메서드에서 예외가 발생하지 않습니다.

// Downloads the file at the given URL.
string download(const string& url)
{
   scoped_blocking_signal signal;

   // Download the file.
   return string(GetHttpFile(_session, url.c_str()));
}

참고 항목

참조

Context::Oversubscribe 메서드

개념

컨텍스트