如何:使用上下文类实现协作信号量

本主题演示如何使用 concurrency::Context 选件类实现协作信号量选件类。

利用 Context 类,可以阻止或退出当前执行上下文。在当前上下文由于资源不可用而无法继续时,便会用到阻止或退出功能。信号量是当前执行上下文必须等待资源变得可用的一种情况的示例。信号量与临界区对象相似,它是允许某个上下文中的代码以独占方式访问资源的同步对象。然而,与临界区对象不同的是,信号量允许多个上下文同时访问资源。如果最大数目的上下文持有信号量锁,则每个附加的上下文必须等待另一个上下文释放该锁。

实现信号量类

  1. 声明一个名为 semaphore 的类。向此类添加 public 和 private 部分。

    // A semaphore type that uses cooperative blocking semantics.
    class semaphore
    {
    public:
    private:
    };
    
  2. semaphore 选件类的 private 部分,声明持有信号量计数和一 concurrency::concurrent_queue 对象保持上下文必须等待获取信号量的 std::atomic 变量。

    // The semaphore count.
    atomic<long long> _semaphore_count;
    
    // A concurrency-safe queue of contexts that must wait to 
    // acquire the semaphore.
    concurrent_queue<Context*> _waiting_contexts;
    
  3. semaphore 类的 public 节中,实现构造函数。构造函数采用 long long 值指定可以同时持有锁的上下文的最大数目。

    explicit semaphore(long long capacity)
       : _semaphore_count(capacity)
    {
    }
    
  4. semaphore 类的 public 节中,实现 acquire 方法。此方法以原子操作形式递减信号量计数。如果信号量计数变为负数,添加当前上下文设置为等待队列末尾并调用 concurrency::Context::Block 方法阻止当前上下文。

    // Acquires access to the semaphore.
    void acquire()
    {
       // The capacity of the semaphore is exceeded when the semaphore count 
       // falls below zero. When this happens, add the current context to the 
       // back of the wait queue and block the current context.
       if (--_semaphore_count < 0)
       {
          _waiting_contexts.push(Context::CurrentContext());
          Context::Block();
       }
    }
    
  5. semaphore 类的 public 部分,实现 release 方法。此方法以原子操作形式递增信号量计数。如果在递增操作之前信号量计数为负数,则至少有一个上下文在等待锁。在这种情况下,取消阻止在等待队列前面的上下文。

    // Releases access to the semaphore.
    void release()
    {
       // If the semaphore count is negative, unblock the first waiting context.
       if (++_semaphore_count <= 0)
       {
          // A call to acquire might have decremented the counter, but has not
          // yet finished adding the context to the queue. 
          // Create a spin loop that waits for the context to become available.
          Context* waiting = NULL;
          while (!_waiting_contexts.try_pop(waiting))
          {
             Context::Yield();
          }
    
          // Unblock the context.
          waiting->Unblock();
       }
    }
    

示例

此示例中的 semaphore 类以协作方式工作,因为 Context::BlockContext::Yield 方法将退出执行操作,以便运行时可执行其他任务。

acquire 方法将递减计数器,但它可能未添加完在另一上下文调用 release 方法之前要等待队列的上下文。为了考虑这一点,release 方法调用使用 concurrency::Context::Yield 方法等待 acquire 方法添加完上下文的旋转循环。

acquire 方法调用 Context::Block 方法之前,release 方法可以调用 Context::Unblock 方法。您不必防范此争用条件,因为运行时允许以任何顺序调用这些方法。对于相同上下文,如果在 acquire 方法调用 Context::Block 之前,release 方法调用 Context::Unblock,则该上下文将保持取消阻止状态。运行时只要求每个对 Context::Block 的调用都与对应的对 Context::Unblock 的调用匹配。

下面的示例演示了完整的 semaphore 类。wmain 函数显示了此类的基本用法。wmain 函数使用 concurrency::parallel_for 算法创建需要访问该信号量的任务。由于三个线程可以在任何时间持有锁,所以某些任务必须等待另一个任务完成并释放该锁。

// cooperative-semaphore.cpp
// compile with: /EHsc
#include <atomic>
#include <concrt.h>
#include <ppl.h>
#include <concurrent_queue.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            Context::Yield();
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

int wmain()
{
   // Create a semaphore that allows at most three threads to 
   // hold the lock.
   semaphore s(3);

   parallel_for(0, 10, [&](int i) {
      // Acquire the lock.
      s.acquire();

      // Print a message to the console.
      wstringstream ss;
      ss << L"In loop iteration " << i << L"..." << endl;
      wcout << ss.str();

      // Simulate work by waiting for two seconds.
      wait(2000);

      // Release the lock.
      s.release();
   });
}

此示例产生下面的示例输出。

In loop iteration 5...
In loop iteration 0...
In loop iteration 6...
In loop iteration 1...
In loop iteration 2...
In loop iteration 7...
In loop iteration 3...
In loop iteration 8...
In loop iteration 9...
In loop iteration 4...

有关 concurrent_queue 类的更多信息,请参见并行容器和对象。有关 parallel_for 算法的更多信息,请参见并行算法

编译代码

复制示例代码并将其粘贴到Visual Studio项目或一个名为 团队semaphore.cpp 然后运行在Visual Studio命令提示符窗口中运行以下命令的文件。

cl.exe /EHsc cooperative-semaphore.cpp

可靠编程

可以使用“资源获取为初始化”(RAII) 模式,将对 semaphore 对象的访问局限于某个给定的范围。在 RAII 模式下,将在堆栈上分配一个数据结构。该数据结构在创建时将会初始化或获取一个资源,而且该数据结构在销毁时将会销毁或释放该资源。RAII 模式可确保在封闭范围退出之前调用析构函数。因此,在引发异常时或在函数包含多个 return 语句时,将可以正确地管理资源。

下面的示例定义了一个名为 scoped_lock 的类,这在 semaphore 类的 public 节中已定义。scoped_lock 选件类类似于 concurrency::critical_section::scoped_lockconcurrency::reader_writer_lock::scoped_lock 选件类。semaphore::scoped_lock 类的构造函数将获取对给定的 semaphore 对象的访问权;析构函数将释放对该对象的访问权。

// An exception-safe RAII wrapper for the semaphore class.
class scoped_lock
{
public:
   // Acquires access to the semaphore.
   scoped_lock(semaphore& s)
      : _s(s)
   {
      _s.acquire();
   }
   // Releases access to the semaphore.
   ~scoped_lock()
   {
      _s.release();
   }

private:
   semaphore& _s;
};

下面的示例修改了传递给 parallel_for 算法的工作函数的主体,以便使用 RAII 来确保在该函数返回之前释放信号量。这项技术可确保工作函数是异常安全的。

parallel_for(0, 10, [&](int i) {
   // Create an exception-safe scoped_lock object that holds the lock 
   // for the duration of the current scope.
   semaphore::scoped_lock auto_lock(s);

   // Print a message to the console.
   wstringstream ss;
   ss << L"In loop iteration " << i << L"..." << endl;
   wcout << ss.str();

   // Simulate work by waiting for two seconds.
   wait(2000);
});

请参见

参考

Context 类

概念

上下文

并行容器和对象