Como: Use o excesso de assinatura para deslocamento latência
Excesso de assinaturas pode melhorar a eficiência geral de alguns aplicativos que contêm tarefas que têm uma grande quantidade de latência. Este tópico ilustra como usar o excesso de assinatura para deslocar a latência causada por leitura de dados de uma conexão de rede.
Exemplo
Este exemplo usa a Biblioteca de agentes assíncronos para fazer o download de arquivos de todos os servidores HTTP. O http_reader classe deriva de Concurrency::agent e passar assincronamente ler quais nomes de URL para fazer o download de mensagens usa.
O http_reader classe usa a Concurrency::task_group classe ler simultaneamente a cada arquivo. Cada tarefa chama o Concurrency::Context::Oversubscribe método com o _BeginOversubscription parâmetro definido como true para habilitar o excesso de assinaturas no contexto atual. Cada tarefa, em seguida, usa o Microsoft Foundation Classes (MFC) CInternetSession e CHttpFile classes para baixar o arquivo. Finalmente, a cada tarefa chama Context::Oversubscribe com o _BeginOversubscription parâmetro definido como false para desativar o excesso de assinatura.
Quando o excesso de assinatura está habilitado, o runtime cria um thread de adicional para execução de tarefas. Cada um desses segmentos pode também receber assinaturas demais o contexto atual e, assim, criar threads adicionais. O http_reader classe usa uma Concurrency::unbounded_buffer o objeto para limitar o número de segmentos que o aplicativo usa. O agente inicializa o buffer com um número fixo de valores de token. Para cada operação de download, o agente lê o valor do token do buffer antes que a operação é iniciado e, em seguida, grava esse valor de volta para o buffer após a conclusão da operação. Quando o buffer está vazio, o agente espera uma das operações de download para gravar um valor no buffer.
O exemplo a seguir limita o número de tarefas simultâneas para duas vezes o número de segmentos de hardware disponíveis. Esse valor é um bom ponto de partida para usar quando você experimentar o excesso de assinatura. Você pode usar um valor que atenda a um ambiente de processamento particular ou altera dinamicamente esse valor para responder a carga de trabalho real.
// download-oversubscription.cpp
// compile with: /EHsc /MD /D "_AFXDLL"
#define _WIN32_WINNT 0x0501
#include <afxinet.h>
#include <concrtrm.h>
#include <agents.h>
#include <ppl.h>
#include <sstream>
#include <iostream>
#include <array>
using namespace Concurrency;
using namespace std;
// Calls the provided work function and returns the number of milliseconds
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
__int64 begin = GetTickCount();
f();
return GetTickCount() - begin;
}
// Downloads the file at the given URL.
CString GetHttpFile(CInternetSession& session, const CString& strUrl);
// Reads files from HTTP servers.
class http_reader : public agent
{
public:
explicit http_reader(CInternetSession& session,
ISource<string>& source,
unsigned int& total_bytes,
unsigned int max_concurrent_reads)
: _session(session)
, _source(source)
, _total_bytes(total_bytes)
{
// Add one token to the available tasks buffer for each
// possible concurrent read operation. The value of each token
// is not important, but can be useful for debugging.
for (unsigned int i = 0; i < max_concurrent_reads; ++i)
send(_available_tasks, i);
}
// Signals to the agent that there are no more items to download.
static const string input_sentinel;
protected:
void run()
{
// A task group. Each task in the group downloads one file.
task_group tasks;
// Holds the total number of bytes downloaded.
combinable<unsigned int> total_bytes;
// Read from the source buffer until the application
// sends the sentinel value.
string url;
while ((url = receive(_source)) != input_sentinel)
{
// Wait for a task to release an available slot.
unsigned int token = receive(_available_tasks);
// Create a task to download the file.
tasks.run([&, token, url] {
// Print a message.
wstringstream ss;
ss << L"Downloading " << url.c_str() << L"..." << endl;
wcout << ss.str();
// Download the file.
string content = download(url);
// Update the total number of bytes downloaded.
total_bytes.local() += content.size();
// Release the slot for another task.
send(_available_tasks, token);
});
}
// Wait for all tasks to finish.
tasks.wait();
// Compute the total number of bytes download on all threads.
_total_bytes = total_bytes.combine(plus<unsigned int>());
// Set the status of the agent to agent_done.
done();
}
// Downloads the file at the given URL.
string download(const string& url)
{
// Enable oversubscription.
Context::Oversubscribe(true);
// Download the file.
string content = GetHttpFile(_session, url.c_str());
// Disable oversubscription.
Context::Oversubscribe(false);
return content;
}
private:
// Manages the network connection.
CInternetSession& _session;
// A message buffer that holds the URL names to download.
ISource<string>& _source;
// The total number of bytes downloaded
unsigned int& _total_bytes;
// Limits the agent to a given number of simultaneous tasks.
unbounded_buffer<unsigned int> _available_tasks;
};
const string http_reader::input_sentinel("");
int wmain()
{
// Create an array of URL names to download.
// A real-world application might read the names from user input.
array<string, 21> urls = {
"http://www.adatum.com/",
"https://www.adventure-works.com/",
"http://www.alpineskihouse.com/",
"http://www.cpandl.com/",
"http://www.cohovineyard.com/",
"http://www.cohowinery.com/",
"http://www.cohovineyardandwinery.com/",
"https://www.contoso.com/",
"http://www.consolidatedmessenger.com/",
"http://www.fabrikam.com/",
"https://www.fourthcoffee.com/",
"http://www.graphicdesigninstitute.com/",
"http://www.humongousinsurance.com/",
"http://www.litwareinc.com/",
"http://www.lucernepublishing.com/",
"http://www.margiestravel.com/",
"http://www.northwindtraders.com/",
"https://www.proseware.com/",
"http://www.fineartschool.net",
"http://www.tailspintoys.com/",
http_reader::input_sentinel,
};
// Manages the network connection.
CInternetSession session("Microsoft Internet Browser");
// A message buffer that enables the application to send URL names to the
// agent.
unbounded_buffer<string> source_urls;
// The total number of bytes that the agent has downloaded.
unsigned int total_bytes = 0u;
// Create an http_reader object that can oversubscribe each processor by one.
http_reader reader(session, source_urls, total_bytes, 2*GetProcessorCount());
// Compute the amount of time that it takes for the agent to download all files.
__int64 elapsed = time_call([&] {
// Start the agent.
reader.start();
// Use the message buffer to send each URL name to the agent.
for_each(urls.begin(), urls.end(), [&](const string& url) {
send(source_urls, url);
});
// Wait for the agent to finish downloading.
agent::wait(&reader);
});
// Print the results.
wcout << L"Downloaded " << total_bytes
<< L" bytes in " << elapsed << " ms." << endl;
}
// Downloads the file at the given URL and returns the size of that file.
CString GetHttpFile(CInternetSession& session, const CString& strUrl)
{
CString strResult;
// Reads data from an HTTP server.
CHttpFile* pHttpFile = NULL;
try
{
// Open URL.
pHttpFile = (CHttpFile*)session.OpenURL(strUrl, 1,
INTERNET_FLAG_TRANSFER_ASCII |
INTERNET_FLAG_RELOAD | INTERNET_FLAG_DONT_CACHE);
// Read the file.
if(pHttpFile != NULL)
{
UINT uiBytesRead;
do
{
char chBuffer[10000];
uiBytesRead = pHttpFile->Read(chBuffer, sizeof(chBuffer));
strResult += chBuffer;
}
while (uiBytesRead > 0);
}
}
catch (CInternetException)
{
// TODO: Handle exception
}
// Clean up and return.
delete pHttpFile;
return strResult;
}
Este exemplo produz a saída a seguir em um computador que possui quatro processadores:
Downloading http://www.adatum.com/...
Downloading https://www.adventure-works.com/...
Downloading http://www.alpineskihouse.com/...
Downloading http://www.cpandl.com/...
Downloading http://www.cohovineyard.com/...
Downloading http://www.cohowinery.com/...
Downloading http://www.cohovineyardandwinery.com/...
Downloading https://www.contoso.com/...
Downloading http://www.consolidatedmessenger.com/...
Downloading http://www.fabrikam.com/...
Downloading https://www.fourthcoffee.com/...
Downloading http://www.graphicdesigninstitute.com/...
Downloading http://www.humongousinsurance.com/...
Downloading http://www.litwareinc.com/...
Downloading http://www.lucernepublishing.com/...
Downloading http://www.margiestravel.com/...
Downloading http://www.northwindtraders.com/...
Downloading https://www.proseware.com/...
Downloading http://www.fineartschool.net...
Downloading http://www.tailspintoys.com/...
Downloaded 1801040 bytes in 3276 ms.
O exemplo pode executar mais rapidamente quando o excesso de assinatura está habilitado como tarefas adicionais executadas enquanto espera de outras tarefas para uma operação latente de término.
Compilando o código
Copie o código de exemplo e colá-lo em um Visual Studio do projeto, ou colá-lo em um arquivo que é chamado download oversubscription.cpp e, em seguida, execute um dos seguintes comandos em um Visual Studio 2010 janela do Prompt de comando.
cl.exe /EHsc /MD /D "_AFXDLL" download-oversubscription.cpp
cl.exe /EHsc /MT download-oversubscription.cpp
Programação robusta
Sempre desative o excesso de assinatura depois que você não precisa. Considere uma função que não lidam com uma exceção que é lançada por outra função. Se você não desabilitar o excesso de assinatura antes que a função retorna, qualquer trabalho paralelo adicional será também receber assinaturas demais o contexto atual.
Você pode usar o É inicialização de aquisição de recursos padrão (RAII) para limitar o excesso de assinatura para um determinado escopo. Em padrão de RAII, uma estrutura de dados é alocada na pilha. Essa estrutura de dados inicializa ou adquire um recurso quando ele é criado e destrói ou libera esse recurso quando a estrutura de dados é destruída. O padrão RAII garante que o destruidor é chamado antes que sai do escopo de fechamento. Portanto, o recurso é gerenciado corretamente quando uma exceção é lançada ou uma função contém várias return instruções.
O exemplo a seguir define uma estrutura que é denominada scoped_blocking_signal. O construtor da scoped_blocking_signal estrutura permite que o excesso de assinatura e o destruidor desativa o excesso de assinatura.
struct scoped_blocking_signal
{
scoped_blocking_signal()
{
Concurrency::Context::Oversubscribe(true);
}
~scoped_blocking_signal()
{
Concurrency::Context::Oversubscribe(false);
}
};
O exemplo seguinte modifica o corpo da download método para usar RAII para garantir que excesso de assinatura está desativado antes da função retorna. Esta técnica garante que o download método é a exceção-safe.
// Downloads the file at the given URL.
string download(const string& url)
{
scoped_blocking_signal signal;
// Download the file.
return string(GetHttpFile(_session, url.c_str()));
}
Consulte também
Referência
Método de Context::Oversubscribe