如何:使用上下文类实现协作信号量

本主题演示如何使用 concurrency::Context 类实现协作信号量类。

利用 Context 类,可以阻止或退出当前执行上下文。 在当前上下文由于资源不可用而无法继续时,便会用到阻止或退出功能。 信号量是当前执行上下文必须等待资源变得可用的一种情况的示例。 信号量与临界区对象相似,它是允许某个上下文中的代码以独占方式访问资源的同步对象。 然而,与临界区对象不同的是,信号量允许多个上下文同时访问资源。 如果最大数目的上下文持有信号量锁,则每个附加的上下文必须等待另一个上下文释放该锁。

实现信号量类

  1. 声明一个名为 semaphore 的类。 向此类添加 public 和 private 部分。

    // A semaphore type that uses cooperative blocking semantics. 
    class semaphore
    {
    public:
    private:
    };
    
  2. semaphore 类的 private 部分,声明 std::atomic类型的变量(用于保存信号量计数)以及 concurrency::concurrent_queue 对象(用于保存必须等待获得该信号量的上下文)。

    // The semaphore count.
    atomic<long long> _semaphore_count;
    
    // A concurrency-safe queue of contexts that must wait to  
    // acquire the semaphore.
    concurrent_queue<Context*> _waiting_contexts;
    
  3. semaphore 类的 public 节中,实现构造函数。 构造函数采用 long long 值指定可以同时持有锁的上下文的最大数目。

    explicit semaphore(long long capacity)
       : _semaphore_count(capacity)
    {
    }
    
  4. semaphore 类的 public 节中,实现 acquire 方法。 此方法以原子操作形式递减信号量计数。 如果信号量计数变为负数,请向等待队列末尾添加当前上下文并调用 concurrency::Context::Block 方法,以阻止当前上下文。

    // Acquires access to the semaphore. 
    void acquire()
    {
       // The capacity of the semaphore is exceeded when the semaphore count  
       // falls below zero. When this happens, add the current context to the  
       // back of the wait queue and block the current context. 
       if (--_semaphore_count < 0)
       {
          _waiting_contexts.push(Context::CurrentContext());
          Context::Block();
       }
    }
    
  5. semaphore 类的 public 部分,实现 release 方法。 此方法以原子操作形式递增信号量计数。 如果在递增操作之前信号量计数为负数,则至少有一个上下文在等待锁。 在这种情况下,取消阻止在等待队列前面的上下文。

    // Releases access to the semaphore. 
    void release()
    {
       // If the semaphore count is negative, unblock the first waiting context. 
       if (++_semaphore_count <= 0)
       {
          // A call to acquire might have decremented the counter, but has not 
          // yet finished adding the context to the queue.  
          // Create a spin loop that waits for the context to become available.
          Context* waiting = NULL;
          while (!_waiting_contexts.try_pop(waiting))
          {
             Context::Yield();
          }
    
          // Unblock the context.
          waiting->Unblock();
       }
    }
    

示例

此示例中的 semaphore 类以协作方式工作,因为 Context::BlockContext::Yield 方法将退出执行操作,以便运行时可执行其他任务。

acquire 方法将递减计数器,但它可能未添加完在另一上下文调用 release 方法之前要等待队列的上下文。 为了对此做出解释,release 方法使用旋转循环调用 concurrency::Context::Yield 方法,以等待 acquire 方法添加完上下文。

acquire 方法调用 Context::Block 方法之前,release 方法可以调用 Context::Unblock 方法。 您不必防范此争用条件,因为运行时允许以任何顺序调用这些方法。 对于相同上下文,如果在 acquire 方法调用 Context::Block 之前,release 方法调用 Context::Unblock,则该上下文将保持取消阻止状态。 运行时只要求每个对 Context::Block 的调用都与对应的对 Context::Unblock 的调用匹配。

下面的示例演示了完整的 semaphore 类。 wmain 函数显示了此类的基本用法。 wmain 函数使用 concurrency::parallel_for 算法创建一些需要访问该信号量的任务。 由于三个线程可以在任何时间持有锁,所以某些任务必须等待另一个任务完成并释放该锁。

// cooperative-semaphore.cpp 
// compile with: /EHsc
#include <atomic>
#include <concrt.h>
#include <ppl.h>
#include <concurrent_queue.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics. 
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore. 
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count  
      // falls below zero. When this happens, add the current context to the  
      // back of the wait queue and block the current context. 
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore. 
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context. 
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not 
         // yet finished adding the context to the queue.  
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            Context::Yield();
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to  
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

int wmain()
{
   // Create a semaphore that allows at most three threads to  
   // hold the lock.
   semaphore s(3);

   parallel_for(0, 10, [&](int i) {
      // Acquire the lock.
      s.acquire();

      // Print a message to the console.
      wstringstream ss;
      ss << L"In loop iteration " << i << L"..." << endl;
      wcout << ss.str();

      // Simulate work by waiting for two seconds.
      wait(2000);

      // Release the lock.
      s.release();
   });
}

此示例产生下面的示例输出。

  

有关 concurrent_queue 类的更多信息,请参见并行容器和对象。 有关 parallel_for 算法的更多信息,请参见并行算法

编译代码

复制代码示例,并将此代码粘贴到Visual Studio项目中或一个名为cooperative-semaphore.cpp的文件中,然后在Visual Studio命令提示符窗口中运行以下命令。

cl.exe /EHsc cooperative-semaphore.cpp

可靠编程

可以使用“资源获取为初始化”(RAII) 模式,将对 semaphore 对象的访问局限于某个给定的范围。 在 RAII 模式下,将在堆栈上分配一个数据结构。 该数据结构在创建时将会初始化或获取一个资源,而且该数据结构在销毁时将会销毁或释放该资源。 RAII 模式可确保在封闭范围退出之前调用析构函数。 因此,在引发异常时或在函数包含多个 return 语句时,将可以正确地管理资源。

下面的示例定义了一个名为 scoped_lock 的类,这在 semaphore 类的 public 节中已定义。 scoped_lock 类类似于 concurrency::critical_section::scoped_lockconcurrency::reader_writer_lock::scoped_lock 类。 semaphore::scoped_lock 类的构造函数将获取对给定的 semaphore 对象的访问权;析构函数将释放对该对象的访问权。

// An exception-safe RAII wrapper for the semaphore class. 
class scoped_lock
{
public:
   // Acquires access to the semaphore.
   scoped_lock(semaphore& s)
      : _s(s)
   {
      _s.acquire();
   }
   // Releases access to the semaphore.
   ~scoped_lock()
   {
      _s.release();
   }

private:
   semaphore& _s;
};

下面的示例修改了传递给 parallel_for 算法的工作函数的主体,以便使用 RAII 来确保在该函数返回之前释放信号量。 这项技术可确保工作函数是异常安全的。

parallel_for(0, 10, [&](int i) {
   // Create an exception-safe scoped_lock object that holds the lock  
   // for the duration of the current scope.
   semaphore::scoped_lock auto_lock(s);

   // Print a message to the console.
   wstringstream ss;
   ss << L"In loop iteration " << i << L"..." << endl;
   wcout << ss.str();

   // Simulate work by waiting for two seconds.
   wait(2000);
});

请参见

参考

Context 类

概念

上下文

并行容器和对象