如何:使用上下文类实现协作信号量

本主题演示如何使用 concurrency::Context 类实现协作信号灯类。

备注

Context 类允许你阻止或暂停当前执行上下文。 在当前上下文因资源不可用而无法继续时,阻止或暂停当前上下文很有用。 信号灯是当前执行上下文必须等待资源变为可用的一种情况示例。 信号灯(例如关键节对象)是一个同步对象,支持一个上下文中的代码具有对资源的独占访问权限。 但是,与关键节对象不同,信号灯支持多个上下文同时访问资源。 如果具有信号灯锁的上下文数量达到上限,则每个附加的上下文必须等待另一个上下文释放信号灯锁。

实现信号灯类

  1. 声明一个名为 semaphore。 将 publicprivate 节添加到此类。
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
private:
};
  1. semaphore 类的 private 节中,声明一个 std::atomic 变量(用于包含信号灯计数)和一个 concurrency::concurrent_queue 对象(用于包含获取信号灯前必须等待的上下文)。
// The semaphore count.
atomic<long long> _semaphore_count;

// A concurrency-safe queue of contexts that must wait to 
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
  1. semaphore 类的 public 节中,实现构造函数。 构造函数采用一个 long long 值,该值指定可同时具有锁的最大上下文数。
explicit semaphore(long long capacity)
   : _semaphore_count(capacity)
{
}
  1. semaphore 类的 public 节中,实现 acquire 方法。 此方法以原子操作的形式递减信号灯计数。 如果信号灯计数变为负数,请将当前上下文添加到等待队列的末尾,并调用 concurrency::Context::Block 方法来阻止当前上下文。
// Acquires access to the semaphore.
void acquire()
{
   // The capacity of the semaphore is exceeded when the semaphore count 
   // falls below zero. When this happens, add the current context to the 
   // back of the wait queue and block the current context.
   if (--_semaphore_count < 0)
   {
      _waiting_contexts.push(Context::CurrentContext());
      Context::Block();
   }
}
  1. semaphore 类的 public 节中,实现 release 方法。 此方法以原子操作的形式递增信号灯计数。 如果在递增操作之前信号灯计数为负数,则表示至少有一个上下文正在等待锁。 在这种情况下,请取消阻止等待队列前面的上下文。
// Releases access to the semaphore.
void release()
{
   // If the semaphore count is negative, unblock the first waiting context.
   if (++_semaphore_count <= 0)
   {
      // A call to acquire might have decremented the counter, but has not
      // yet finished adding the context to the queue. 
      // Create a spin loop that waits for the context to become available.
      Context* waiting = NULL;
      while (!_waiting_contexts.try_pop(waiting))
      {
         Context::Yield();
      }

      // Unblock the context.
      waiting->Unblock();
   }
}

示例

此示例中的 semaphore 类的行为是协作的,因为 Context::BlockContext::Yield 方法会暂停执行,使运行时可执行其他任务。

acquire 方法会递减计数器,但在另一个上下文调用 release 方法之前,它可能不会完成将上下文添加到等待队列的操作。 为了解释这一点,release 方法使用一个旋转循环来调用 concurrency::Context::Yield 方法,等待 acquire 方法完成添加上下文的操作。

acquire 方法调用 Context::Block 方法之前,release 方法可以调用 Context::Unblock 方法。 无需防止此争用条件,因为运行时允许按任意顺序调用这些方法。 如果在 acquire 方法为同一上下文调用 Context::Block 之前 release 方法调用了 Context::Unblock,那么该上下文保持在不受阻止的状态。 运行时仅要求对 Context::Block 的每次调用与对 Context::Unblock 的相应调用匹配。

以下示例显示了完整的 semaphore 类。 wmain 函数显示此类的基本用法。 wmain 函数使用 concurrency::parallel_for 算法创建多个需要访问信号灯的任务。 由于 3 个线程可随时保留锁,因此一些任务必须等待另一个任务完成并释放锁后才能执行。

// cooperative-semaphore.cpp
// compile with: /EHsc
#include <atomic>
#include <concrt.h>
#include <ppl.h>
#include <concurrent_queue.h>
#include <iostream>
#include <sstream>

using namespace concurrency;
using namespace std;

// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
   explicit semaphore(long long capacity)
      : _semaphore_count(capacity)
   {
   }

   // Acquires access to the semaphore.
   void acquire()
   {
      // The capacity of the semaphore is exceeded when the semaphore count 
      // falls below zero. When this happens, add the current context to the 
      // back of the wait queue and block the current context.
      if (--_semaphore_count < 0)
      {
         _waiting_contexts.push(Context::CurrentContext());
         Context::Block();
      }
   }

   // Releases access to the semaphore.
   void release()
   {
      // If the semaphore count is negative, unblock the first waiting context.
      if (++_semaphore_count <= 0)
      {
         // A call to acquire might have decremented the counter, but has not
         // yet finished adding the context to the queue. 
         // Create a spin loop that waits for the context to become available.
         Context* waiting = NULL;
         while (!_waiting_contexts.try_pop(waiting))
         {
            Context::Yield();
         }

         // Unblock the context.
         waiting->Unblock();
      }
   }

private:
   // The semaphore count.
   atomic<long long> _semaphore_count;

   // A concurrency-safe queue of contexts that must wait to 
   // acquire the semaphore.
   concurrent_queue<Context*> _waiting_contexts;
};

int wmain()
{
   // Create a semaphore that allows at most three threads to 
   // hold the lock.
   semaphore s(3);

   parallel_for(0, 10, [&](int i) {
      // Acquire the lock.
      s.acquire();

      // Print a message to the console.
      wstringstream ss;
      ss << L"In loop iteration " << i << L"..." << endl;
      wcout << ss.str();

      // Simulate work by waiting for two seconds.
      wait(2000);

      // Release the lock.
      s.release();
   });
}

此示例生成以下示例输出。

In loop iteration 5...
In loop iteration 0...
In loop iteration 6...
In loop iteration 1...
In loop iteration 2...
In loop iteration 7...
In loop iteration 3...
In loop iteration 8...
In loop iteration 9...
In loop iteration 4...

有关 concurrent_queue 类的详细信息,请参阅并行容器和对象。 有关 parallel_for 算法的详细信息,请参阅并行算法

编译代码

复制示例代码,并将它粘贴到 Visual Studio 项目中,或粘贴到名为 cooperative-semaphore.cpp 的文件中,再在 Visual Studio 命令提示符窗口中运行以下命令。

cl.exe /EHsc cooperative-semaphore.cpp

可靠编程

可使用“资源获取即初始化”(RAII) 模式将对 semaphore 对象的访问限制到给定范围。 在 RAII 模式下,数据结构在堆栈上分配。 该数据结构在创建资源时初始化或获取资源,并在销毁数据结构时销毁或释放该资源。 RAII 模式保证在封闭范围退出之前调用析构函数。 因此,当引发异常或函数包含多个 return 语句时,可以正确管理资源。

以下示例定义一个名为 scoped_lock 的类,该类在 semaphore 类的 public 节中定义。 scoped_lock 类与 concurrency::critical_section::scoped_lockconcurrency::reader_writer_lock::scoped_lock 类相似。 semaphore::scoped_lock 类的构造函数获取对给定的 semaphore 对象的访问权限,析构函数释放对该对象的访问权限。

// An exception-safe RAII wrapper for the semaphore class.
class scoped_lock
{
public:
   // Acquires access to the semaphore.
   scoped_lock(semaphore& s)
      : _s(s)
   {
      _s.acquire();
   }
   // Releases access to the semaphore.
   ~scoped_lock()
   {
      _s.release();
   }

private:
   semaphore& _s;
};

以下示例修改了传递至 parallel_for 算法的工作函数的主体,以便它使用 RAII 来确保在函数返回之前释放信号灯。 此技术可确保工作函数是异常安全的。

parallel_for(0, 10, [&](int i) {
   // Create an exception-safe scoped_lock object that holds the lock 
   // for the duration of the current scope.
   semaphore::scoped_lock auto_lock(s);

   // Print a message to the console.
   wstringstream ss;
   ss << L"In loop iteration " << i << L"..." << endl;
   wcout << ss.str();

   // Simulate work by waiting for two seconds.
   wait(2000);
});

另请参阅

上下文
并行容器和对象