Como usar a classe de contexto para implementar um semáforo cooperativo
Este tópico mostra como usar a classe de concurrency::Context para implementar uma classe cooperativa de semáforo.
A classe de Context permite bloqueio ou gerar o contexto de execução atual. Bloquear ou gerar o contexto atual é útil quando o contexto atual não pode continuar porque um recurso não está disponível. Um sinal é um exemplo de uma situação em que o contexto de execução atual deve esperar para que um recurso se torne disponível. Um sinal, como um objeto da seção crítica, é um objeto de sincronização que permite que o código em um contexto para ter acesso exclusivo a um recurso. No entanto, ao contrário de um objeto da seção crítica, um semáforo permite que mais de um contexto para acessar o recurso simultaneamente. Se o número máximo de contextos mantém um bloqueio de sinal, cada um contexto adicional deve esperar por outro contexto para liberar o bloqueio.
Para implementar a classe semaphore
Declare uma classe chamada semaphore. Adicionar public e seções de private a essa classe.
// A semaphore type that uses cooperative blocking semantics. class semaphore { public: private: };
Na seção de private da classe de semaphore , declare uma variável de std::atomic que contém a contagem de semáforo e um objeto de concurrency::concurrent_queue que contém os contextos que devem aguardar para adquirir o semáforo.
// The semaphore count. atomic<long long> _semaphore_count; // A concurrency-safe queue of contexts that must wait to // acquire the semaphore. concurrent_queue<Context*> _waiting_contexts;
Na seção de public da classe de semaphore , implemente o construtor. O construtor usa um valor de long long que especifica o número máximo de contextos que podem simultaneamente mantendo o bloqueio.
explicit semaphore(long long capacity) : _semaphore_count(capacity) { }
Na seção de public da classe de semaphore , implemente o método de acquire . Esse método diminui a contagem de semáforo como uma operação atômica. Se a contagem de semáforo se torna negativa, adicione o contexto atual ao final da fila de espera e chamar o método de concurrency::Context::Block para bloquear o contexto atual.
// Acquires access to the semaphore. void acquire() { // The capacity of the semaphore is exceeded when the semaphore count // falls below zero. When this happens, add the current context to the // back of the wait queue and block the current context. if (--_semaphore_count < 0) { _waiting_contexts.push(Context::CurrentContext()); Context::Block(); } }
Na seção de public da classe de semaphore , implemente o método de release . Esse método incrementa a contagem de semáforo como uma operação atômica. Se a contagem do sinal é negativa antes que a operação de incremento, ela seja pelo menos um contexto que está aguardando o bloqueio. Nesse caso, desbloqueie o contexto que estiver à frente da fila de espera.
// Releases access to the semaphore. void release() { // If the semaphore count is negative, unblock the first waiting context. if (++_semaphore_count <= 0) { // A call to acquire might have decremented the counter, but has not // yet finished adding the context to the queue. // Create a spin loop that waits for the context to become available. Context* waiting = NULL; while (!_waiting_contexts.try_pop(waiting)) { Context::Yield(); } // Unblock the context. waiting->Unblock(); } }
Exemplo
A classe de semaphore neste exemplo cooperativa se comporta como os métodos de Context::Block e de Context::Yield gerenciem a execução de modo que o tempo de execução possa executar outras tarefas.
O método de acquire diminui o contador, mas não pode terminar adicionar o contexto para a fila de espera antes que o outro contexto chame o método de release . Para considerar isso, o método de release usar um loop de rotação que chama o método de concurrency::Context::Yield para aguardar o método de acquire para concluir a adição de contexto.
O método de release pode chamar o método de Context::Unblock antes do método de acquire chame o método de Context::Block . Não é necessário proteger contra essa condição de corrida porque o tempo de execução permite esses métodos ser chamado em qualquer ordem. Se o método de release chama Context::Unblock antes de chamar o método de acquireContext::Block para o mesmo contexto, ela permanece desse contexto desbloqueadas. O tempo de execução só requer que cada chamada para Context::Block tem correspondência com uma chamada correspondente a Context::Unblock.
O exemplo a seguir mostra a classe completo de semaphore . A função de wmain mostra o uso básico dessa classe. A função de wmain usa o algoritmo de concurrency::parallel_for para criar várias tarefas que exigem acesso ao semáforo. Como três threads podem manter o bloqueio a qualquer momento, algumas tarefas devem esperar uma outra tarefa complete e liberar o bloqueio.
// cooperative-semaphore.cpp
// compile with: /EHsc
#include <atomic>
#include <concrt.h>
#include <ppl.h>
#include <concurrent_queue.h>
#include <iostream>
#include <sstream>
using namespace concurrency;
using namespace std;
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
explicit semaphore(long long capacity)
: _semaphore_count(capacity)
{
}
// Acquires access to the semaphore.
void acquire()
{
// The capacity of the semaphore is exceeded when the semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (--_semaphore_count < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
// Releases access to the semaphore.
void release()
{
// If the semaphore count is negative, unblock the first waiting context.
if (++_semaphore_count <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
while (!_waiting_contexts.try_pop(waiting))
{
Context::Yield();
}
// Unblock the context.
waiting->Unblock();
}
}
private:
// The semaphore count.
atomic<long long> _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
};
int wmain()
{
// Create a semaphore that allows at most three threads to
// hold the lock.
semaphore s(3);
parallel_for(0, 10, [&](int i) {
// Acquire the lock.
s.acquire();
// Print a message to the console.
wstringstream ss;
ss << L"In loop iteration " << i << L"..." << endl;
wcout << ss.str();
// Simulate work by waiting for two seconds.
wait(2000);
// Release the lock.
s.release();
});
}
Esse exemplo gera a seguinte saída de amostra.
Para obter mais informações sobre a classe concurrent_queue, consulte Contêineres e objetos em paralelo. Para obter mais informações sobre o algoritmo de parallel_for , consulte Algoritmos paralelos.
Compilando o código
Copie o código de exemplo e cole-o em um projeto do Visual Studio, ou cole-o em um arquivo chamado cooperative-semaphore.cpp e execute o comando a seguir em uma janela de prompt de comando do Visual Studio.
cl.exe /EHsc cooperative-semaphore.cpp
Programação robusta
Você pode usar o padrão Aquisição de recurso é inicialização (RAII) para limitar o acesso a um objeto semaphore a um escopo fornecido. No padrão de RAII, uma estrutura de dados é atribuída na pilha. Se a estrutura de dados inicializa ou adquire um recurso quando é criada e destrói o ou versões esse recurso quando a estrutura de dados é destruída. O padrão de RAII garante que o destruidor é chamado antes que o escopo inclusive encerrado. Em virtude disso, o recurso está gerenciadas corretamente quando uma exceção é lançada quando ou uma função contém várias instruções de return .
O exemplo a seguir define uma classe chamada scoped_lock, que é definida na seção de public da classe de semaphore . A classe de scoped_lock se assemelha às classes de concurrency::critical_section::scoped_lock e de concurrency::reader_writer_lock::scoped_lock . O construtor de classe de semaphore::scoped_lock adquire o acesso ao objeto determinado de semaphore e às versões de destruidor acesso a esse objeto.
// An exception-safe RAII wrapper for the semaphore class.
class scoped_lock
{
public:
// Acquires access to the semaphore.
scoped_lock(semaphore& s)
: _s(s)
{
_s.acquire();
}
// Releases access to the semaphore.
~scoped_lock()
{
_s.release();
}
private:
semaphore& _s;
};
O exemplo a seguir altera o corpo da função de trabalho que é passada para o algoritmo de parallel_for de modo que usa RAII para garantir que o sinal seja liberado antes que a função retorna. Essa técnica garante que a função exceções gerais de trabalho é seguro.
parallel_for(0, 10, [&](int i) {
// Create an exception-safe scoped_lock object that holds the lock
// for the duration of the current scope.
semaphore::scoped_lock auto_lock(s);
// Print a message to the console.
wstringstream ss;
ss << L"In loop iteration " << i << L"..." << endl;
wcout << ss.str();
// Simulate work by waiting for two seconds.
wait(2000);
});