如何:使用上下文类实现协作信号量
本主题演示如何使用 concurrency::Context 类实现协作信号量类。
利用 Context 类,可以阻止或退出当前执行上下文。 在当前上下文由于资源不可用而无法继续时,便会用到阻止或退出功能。 信号量是当前执行上下文必须等待资源变得可用的一种情况的示例。 信号量与临界区对象相似,它是允许某个上下文中的代码以独占方式访问资源的同步对象。 然而,与临界区对象不同的是,信号量允许多个上下文同时访问资源。 如果最大数目的上下文持有信号量锁,则每个附加的上下文必须等待另一个上下文释放该锁。
实现信号量类
声明一个名为 semaphore 的类。 向此类添加 public 和 private 部分。
// A semaphore type that uses cooperative blocking semantics. class semaphore { public: private: };
在 semaphore 类的 private 部分,声明 std::atomic类型的变量(用于保存信号量计数)以及 concurrency::concurrent_queue 对象(用于保存必须等待获得该信号量的上下文)。
// The semaphore count. atomic<long long> _semaphore_count; // A concurrency-safe queue of contexts that must wait to // acquire the semaphore. concurrent_queue<Context*> _waiting_contexts;
在 semaphore 类的 public 节中,实现构造函数。 构造函数采用 long long 值指定可以同时持有锁的上下文的最大数目。
explicit semaphore(long long capacity) : _semaphore_count(capacity) { }
在 semaphore 类的 public 节中,实现 acquire 方法。 此方法以原子操作形式递减信号量计数。 如果信号量计数变为负数,请向等待队列末尾添加当前上下文并调用 concurrency::Context::Block 方法,以阻止当前上下文。
// Acquires access to the semaphore. void acquire() { // The capacity of the semaphore is exceeded when the semaphore count // falls below zero. When this happens, add the current context to the // back of the wait queue and block the current context. if (--_semaphore_count < 0) { _waiting_contexts.push(Context::CurrentContext()); Context::Block(); } }
在 semaphore 类的 public 部分,实现 release 方法。 此方法以原子操作形式递增信号量计数。 如果在递增操作之前信号量计数为负数,则至少有一个上下文在等待锁。 在这种情况下,取消阻止在等待队列前面的上下文。
// Releases access to the semaphore. void release() { // If the semaphore count is negative, unblock the first waiting context. if (++_semaphore_count <= 0) { // A call to acquire might have decremented the counter, but has not // yet finished adding the context to the queue. // Create a spin loop that waits for the context to become available. Context* waiting = NULL; while (!_waiting_contexts.try_pop(waiting)) { Context::Yield(); } // Unblock the context. waiting->Unblock(); } }
示例
此示例中的 semaphore 类以协作方式工作,因为 Context::Block 和 Context::Yield 方法将退出执行操作,以便运行时可执行其他任务。
acquire 方法将递减计数器,但它可能未添加完在另一上下文调用 release 方法之前要等待队列的上下文。 为了对此做出解释,release 方法使用旋转循环调用 concurrency::Context::Yield 方法,以等待 acquire 方法添加完上下文。
在 acquire 方法调用 Context::Block 方法之前,release 方法可以调用 Context::Unblock 方法。 您不必防范此争用条件,因为运行时允许以任何顺序调用这些方法。 对于相同上下文,如果在 acquire 方法调用 Context::Block 之前,release 方法调用 Context::Unblock,则该上下文将保持取消阻止状态。 运行时只要求每个对 Context::Block 的调用都与对应的对 Context::Unblock 的调用匹配。
下面的示例演示了完整的 semaphore 类。 wmain 函数显示了此类的基本用法。 wmain 函数使用 concurrency::parallel_for 算法创建一些需要访问该信号量的任务。 由于三个线程可以在任何时间持有锁,所以某些任务必须等待另一个任务完成并释放该锁。
// cooperative-semaphore.cpp
// compile with: /EHsc
#include <atomic>
#include <concrt.h>
#include <ppl.h>
#include <concurrent_queue.h>
#include <iostream>
#include <sstream>
using namespace concurrency;
using namespace std;
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
explicit semaphore(long long capacity)
: _semaphore_count(capacity)
{
}
// Acquires access to the semaphore.
void acquire()
{
// The capacity of the semaphore is exceeded when the semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (--_semaphore_count < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
// Releases access to the semaphore.
void release()
{
// If the semaphore count is negative, unblock the first waiting context.
if (++_semaphore_count <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
while (!_waiting_contexts.try_pop(waiting))
{
Context::Yield();
}
// Unblock the context.
waiting->Unblock();
}
}
private:
// The semaphore count.
atomic<long long> _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
};
int wmain()
{
// Create a semaphore that allows at most three threads to
// hold the lock.
semaphore s(3);
parallel_for(0, 10, [&](int i) {
// Acquire the lock.
s.acquire();
// Print a message to the console.
wstringstream ss;
ss << L"In loop iteration " << i << L"..." << endl;
wcout << ss.str();
// Simulate work by waiting for two seconds.
wait(2000);
// Release the lock.
s.release();
});
}
此示例产生下面的示例输出。
有关 concurrent_queue 类的更多信息,请参见并行容器和对象。 有关 parallel_for 算法的更多信息,请参见并行算法。
编译代码
复制代码示例,并将此代码粘贴到Visual Studio项目中或一个名为cooperative-semaphore.cpp的文件中,然后在Visual Studio命令提示符窗口中运行以下命令。
cl.exe /EHsc cooperative-semaphore.cpp
可靠编程
可以使用“资源获取为初始化”(RAII) 模式,将对 semaphore 对象的访问局限于某个给定的范围。 在 RAII 模式下,将在堆栈上分配一个数据结构。 该数据结构在创建时将会初始化或获取一个资源,而且该数据结构在销毁时将会销毁或释放该资源。 RAII 模式可确保在封闭范围退出之前调用析构函数。 因此,在引发异常时或在函数包含多个 return 语句时,将可以正确地管理资源。
下面的示例定义了一个名为 scoped_lock 的类,这在 semaphore 类的 public 节中已定义。 scoped_lock 类类似于 concurrency::critical_section::scoped_lock 和 concurrency::reader_writer_lock::scoped_lock 类。 semaphore::scoped_lock 类的构造函数将获取对给定的 semaphore 对象的访问权;析构函数将释放对该对象的访问权。
// An exception-safe RAII wrapper for the semaphore class.
class scoped_lock
{
public:
// Acquires access to the semaphore.
scoped_lock(semaphore& s)
: _s(s)
{
_s.acquire();
}
// Releases access to the semaphore.
~scoped_lock()
{
_s.release();
}
private:
semaphore& _s;
};
下面的示例修改了传递给 parallel_for 算法的工作函数的主体,以便使用 RAII 来确保在该函数返回之前释放信号量。 这项技术可确保工作函数是异常安全的。
parallel_for(0, 10, [&](int i) {
// Create an exception-safe scoped_lock object that holds the lock
// for the duration of the current scope.
semaphore::scoped_lock auto_lock(s);
// Print a message to the console.
wstringstream ss;
ss << L"In loop iteration " << i << L"..." << endl;
wcout << ss.str();
// Simulate work by waiting for two seconds.
wait(2000);
});