Bagikan melalui


Cara: Menggunakan Oversubscription untuk Mengimbangi Latensi

Oversubscription dapat meningkatkan efisiensi keseluruhan beberapa aplikasi yang berisi tugas yang memiliki jumlah latensi tinggi. Topik ini menggambarkan cara menggunakan oversubscription untuk mengimbangi latensi yang disebabkan oleh membaca data dari koneksi jaringan.

Contoh

Contoh ini menggunakan Pustaka Agen Asinkron untuk mengunduh file dari server HTTP. Kelas http_reader berasal dari konkurensi::agent dan menggunakan pesan yang diteruskan untuk membaca nama URL mana yang akan diunduh secara asinkron.

Kelas http_reader menggunakan kelas konkurensi::task_group untuk membaca setiap file secara bersamaan. Setiap tugas memanggil metode konkurensi::Context::Oversubscribe dengan parameter yang _BeginOversubscription diatur ke true untuk mengaktifkan oversubscription dalam konteks saat ini. Setiap tugas kemudian menggunakan kelas CInternetSession dan CHttpFile Microsoft Foundation Classes (MFC) untuk mengunduh file. Terakhir, setiap tugas memanggil Context::Oversubscribe dengan parameter yang _BeginOversubscription diatur ke false untuk menonaktifkan oversubscription.

Saat oversubscription diaktifkan, runtime membuat satu utas tambahan untuk menjalankan tugas. Masing-masing utas ini juga dapat melakukan oversubscribe konteks saat ini dan dengan demikian membuat utas tambahan. Kelas http_reader menggunakan objek konkurensi::unbounded_buffer untuk membatasi jumlah utas yang digunakan aplikasi. Agen menginisialisasi buffer dengan jumlah nilai token tetap. Untuk setiap operasi pengunduhan, agen membaca nilai token dari buffer sebelum operasi dimulai dan kemudian menulis nilai tersebut kembali ke buffer setelah operasi selesai. Ketika buffer kosong, agen menunggu salah satu operasi pengunduhan untuk menulis nilai kembali ke buffer.

Contoh berikut membatasi jumlah tugas simultan hingga dua kali jumlah utas perangkat keras yang tersedia. Nilai ini adalah titik awal yang baik untuk digunakan saat Anda bereksperimen dengan kelebihan langganan. Anda dapat menggunakan nilai yang sesuai dengan lingkungan pemrosesan tertentu atau mengubah nilai ini secara dinamis untuk merespons beban kerja aktual.

// download-oversubscription.cpp
// compile with: /EHsc /MD /D "_AFXDLL"
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <concrtrm.h>
#include <agents.h>
#include <ppl.h>
#include <sstream>
#include <iostream>
#include <array>

using namespace concurrency;
using namespace std;

// Calls the provided work function and returns the number of milliseconds 
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
   __int64 begin = GetTickCount();
   f();
   return GetTickCount() - begin;
}

// Downloads the file at the given URL.
CString GetHttpFile(CInternetSession& session, const CString& strUrl);

// Reads files from HTTP servers.
class http_reader : public agent
{
public:
   explicit http_reader(CInternetSession& session,      
      ISource<string>& source,
      unsigned int& total_bytes,
      unsigned int max_concurrent_reads)
      : _session(session)
      , _source(source)
      , _total_bytes(total_bytes)
   {
      // Add one token to the available tasks buffer for each 
      // possible concurrent read operation. The value of each token 
      // is not important, but can be useful for debugging.
      for (unsigned int i = 0; i < max_concurrent_reads; ++i)
         send(_available_tasks, i);
   }

   // Signals to the agent that there are no more items to download.
   static const string input_sentinel;
 
protected:
   void run()
   {
      // A task group. Each task in the group downloads one file.
      task_group tasks;

      // Holds the total number of bytes downloaded.
      combinable<unsigned int> total_bytes;

      // Read from the source buffer until the application 
      // sends the sentinel value.
      string url;
      while ((url = receive(_source)) != input_sentinel)
      {
         // Wait for a task to release an available slot.
         unsigned int token = receive(_available_tasks);

         // Create a task to download the file.
         tasks.run([&, token, url] {

            // Print a message.
            wstringstream ss;
            ss << L"Downloading " << url.c_str() << L"..." << endl;
            wcout << ss.str();

            // Download the file.
            string content = download(url);

            // Update the total number of bytes downloaded.
            total_bytes.local() += content.size();

            // Release the slot for another task.
            send(_available_tasks, token);
         });
      }

      // Wait for all tasks to finish.
      tasks.wait();
      
      // Compute the total number of bytes download on all threads.
      _total_bytes = total_bytes.combine(plus<unsigned int>());

      // Set the status of the agent to agent_done.
      done();
   }

   // Downloads the file at the given URL.
   string download(const string& url)
   {
      // Enable oversubscription.
      Context::Oversubscribe(true);

      // Download the file.
      string content = GetHttpFile(_session, url.c_str());
      
      // Disable oversubscription.
      Context::Oversubscribe(false);

      return content;
   }

private:
   // Manages the network connection.
   CInternetSession& _session;
   // A message buffer that holds the URL names to download.
   ISource<string>& _source;
   // The total number of bytes downloaded
   unsigned int& _total_bytes;
   // Limits the agent to a given number of simultaneous tasks.
   unbounded_buffer<unsigned int> _available_tasks;
};
const string http_reader::input_sentinel("");

int wmain()
{
   // Create an array of URL names to download.
   // A real-world application might read the names from user input.
   array<string, 21> urls = {
      "http://www.adatum.com/",
      "http://www.adventure-works.com/", 
      "http://www.alpineskihouse.com/",
      "http://www.cpandl.com/", 
      "http://www.cohovineyard.com/",
      "http://www.cohowinery.com/",
      "http://www.cohovineyardandwinery.com/", 
      "http://www.contoso.com/",
      "http://www.consolidatedmessenger.com/",
      "http://www.fabrikam.com/", 
      "http://www.fourthcoffee.com/",
      "http://www.graphicdesigninstitute.com/",
      "http://www.humongousinsurance.com/",
      "http://www.litwareinc.com/",
      "http://www.lucernepublishing.com/",
      "http://www.margiestravel.com/",
      "http://www.northwindtraders.com/",
      "http://www.proseware.com/", 
      "http://www.fineartschool.net",
      "http://www.tailspintoys.com/",
      http_reader::input_sentinel,
   };
      
   // Manages the network connection.
   CInternetSession session("Microsoft Internet Browser");

   // A message buffer that enables the application to send URL names to the 
   // agent.
   unbounded_buffer<string> source_urls;

   // The total number of bytes that the agent has downloaded.
   unsigned int total_bytes = 0u;

   // Create an http_reader object that can oversubscribe each processor by one.
   http_reader reader(session, source_urls, total_bytes, 2*GetProcessorCount());

   // Compute the amount of time that it takes for the agent to download all files.
   __int64 elapsed = time_call([&] {
      
      // Start the agent.
      reader.start();
      
      // Use the message buffer to send each URL name to the agent.
      for_each(begin(urls), end(urls), [&](const string& url) {
         send(source_urls, url);
      });

      // Wait for the agent to finish downloading.
      agent::wait(&reader);      
   });

   // Print the results.
   wcout << L"Downloaded " << total_bytes
         << L" bytes in " << elapsed << " ms." << endl;
}

// Downloads the file at the given URL and returns the size of that file.
CString GetHttpFile(CInternetSession& session, const CString& strUrl)
{
   CString strResult;

   // Reads data from an HTTP server.
   CHttpFile* pHttpFile = NULL;

   try
   {
      // Open URL.
      pHttpFile = (CHttpFile*)session.OpenURL(strUrl, 1, 
         INTERNET_FLAG_TRANSFER_ASCII | 
         INTERNET_FLAG_RELOAD | INTERNET_FLAG_DONT_CACHE);

      // Read the file.
      if(pHttpFile != NULL)
      {           
         UINT uiBytesRead;
         do
         {
            char chBuffer[10000];
            uiBytesRead = pHttpFile->Read(chBuffer, sizeof(chBuffer));
            strResult += chBuffer;
         }
         while (uiBytesRead > 0);
      }
    }
   catch (CInternetException)
   {
      // TODO: Handle exception
   }

   // Clean up and return.
   delete pHttpFile;

   return strResult;
}

Contoh ini menghasilkan output berikut pada komputer yang memiliki empat prosesor:

Downloading http://www.adatum.com/...
Downloading http://www.adventure-works.com/...
Downloading http://www.alpineskihouse.com/...
Downloading http://www.cpandl.com/...
Downloading http://www.cohovineyard.com/...
Downloading http://www.cohowinery.com/...
Downloading http://www.cohovineyardandwinery.com/...
Downloading http://www.contoso.com/...
Downloading http://www.consolidatedmessenger.com/...
Downloading http://www.fabrikam.com/...
Downloading http://www.fourthcoffee.com/...
Downloading http://www.graphicdesigninstitute.com/...
Downloading http://www.humongousinsurance.com/...
Downloading http://www.litwareinc.com/...
Downloading http://www.lucernepublishing.com/...
Downloading http://www.margiestravel.com/...
Downloading http://www.northwindtraders.com/...
Downloading http://www.proseware.com/...
Downloading http://www.fineartschool.net...
Downloading http://www.tailspintoys.com/...
Downloaded 1801040 bytes in 3276 ms.

Contoh dapat berjalan lebih cepat ketika oversubscription diaktifkan karena tugas tambahan berjalan sementara tugas lain menunggu operasi laten selesai.

Mengompilasi Kode

Salin kode contoh dan tempelkan dalam proyek Visual Studio, atau tempelkan dalam file yang diberi nama download-oversubscription.cpp lalu jalankan salah satu perintah berikut di jendela Prompt Perintah Visual Studio.

cl.exe /EHsc /MD /D "_AFXDLL" download-oversubscription.cpp
cl.exe /EHsc /MT download-oversubscription.cpp

Pemrograman yang Kuat

Selalu nonaktifkan oversubscription setelah Anda tidak lagi memerlukannya. Pertimbangkan fungsi yang tidak menangani pengecualian yang dilemparkan oleh fungsi lain. Jika Anda tidak menonaktifkan oversubscription sebelum fungsi kembali, pekerjaan paralel tambahan juga akan menggantikan konteks saat ini.

Anda dapat menggunakan pola Resource Acquisition Is Initialization (RAII) untuk membatasi oversubscription ke cakupan tertentu. Di bawah pola RAII, struktur data dialokasikan pada tumpukan. Struktur data tersebut menginisialisasi atau memperoleh sumber daya saat dibuat dan menghancurkan atau merilis sumber daya tersebut saat struktur data dihancurkan. Pola RAII menjamin bahwa destruktor dipanggil sebelum cakupan penutup keluar. Oleh karena itu, sumber daya dikelola dengan benar ketika pengecualian dilemparkan atau ketika fungsi berisi beberapa return pernyataan.

Contoh berikut mendefinisikan struktur yang diberi nama scoped_blocking_signal. Konstruktor scoped_blocking_signal struktur memungkinkan oversubscription dan destruktor menonaktifkan oversubscription.

struct scoped_blocking_signal
{
    scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(true);
    }
    ~scoped_blocking_signal()
    {
        concurrency::Context::Oversubscribe(false);
    }
};

Contoh berikut memodifikasi isi download metode untuk menggunakan RAII untuk memastikan bahwa oversubscription dinonaktifkan sebelum fungsi kembali. Teknik ini memastikan bahwa download metode ini aman pengecualian.

// Downloads the file at the given URL.
string download(const string& url)
{
   scoped_blocking_signal signal;

   // Download the file.
   return string(GetHttpFile(_session, url.c_str()));
}

Lihat juga

Konteks
Konteks::Metode Oversubscribe