演练:使用联接避免死锁

本主题通过哲学家就餐问题说明如何使用 Concurrency::join 类来避免应用程序中发生死锁。 在软件应用程序中,在两个或更多进程中的每个进程都占用一个资源,并相互等待另一个进程释放一些其他资源时,会发生死锁。

哲学家就餐问题是在多个并发进程之间共享资源集时可能出现的一组常规问题的具体示例。

系统必备

在开始本演练之前,请阅读下列主题:

各节内容

本演练包含以下各节:

  • 哲学家就餐问题

  • 低级实现

  • 使用联接避免死锁

哲学家就餐问题

哲学家就餐问题说明了在应用程序中如何发生死锁。 在这个问题中,五个哲学家坐在圆桌旁。 每个哲学家一边思考一边就餐。 每个哲学家必须与左侧邻居共享一根筷子,与右侧邻居共享另一根筷子。 下图显示了这一布局。

哲学家就餐问题

为了就餐,哲学家必须拿起两根筷子。 如果每个哲学家都只拿起一根筷子而等待另一根筷子,则没有一个哲学家可以吃到食物,所有人都饿着。

[转到页首]

低级实现

下面的示例演示哲学家就餐问题的低级实现。 从 Concurrency::agent 派生的 philosopher 类使每个哲学家可以单独操作。 该示例使用 Concurrency::critical_section 对象的共享数组为每个 philosopher 对象授予对一对筷子的独占访问权。

为了将该实现与图示相关联,philosopher 类表示一个哲学家。 int 变量表示每根筷子。 critical_section 对象作为放置筷子的托架。 run 方法模拟哲学家的生命周期。 think 方法模拟思考操作,eat 方法模拟就餐操作。

philosopher 对象在调用 eat 方法之前,锁定全部两个 critical_section 对象,以模拟从托架上取走筷子的操作。 调用 eat 后,philosopher 对象通过将 critical_section 对象设置回未锁定状态,将筷子放回到托架。

pickup_chopsticks 方法说明了可能发生死锁的位置。 如果每个 philosopher 对象获得了一个锁的访问权,则没有 philosopher 对象可以继续,因为另一个锁由另一个 philosopher 对象控制。

示例

代码

// philosophers-deadlock.cpp
// compile with: /EHsc
#include <agents.h>
#include <string>
#include <array>
#include <iostream>
#include <algorithm>
#include <random>

using namespace Concurrency;
using namespace std;

// Defines a single chopstick.
typedef int chopstick;

// The total number of philosophers.
const int philosopher_count = 5;

// The number of times each philosopher should eat.
const int eat_count = 50;

// A shared array of critical sections. Each critical section 
// guards access to a single chopstick.
critical_section locks[philosopher_count];

// Implements the logic for a single dining philosopher.
class philosopher : public agent 
{
public:
   explicit philosopher(chopstick& left, chopstick& right, const wstring& name)
      : _left(left)
      , _right(right)
      , _name(name)
      , _random_generator(42)
   {
      send(_times_eaten, 0);
   }

   // Retrieves the number of times the philosopher has eaten.
   int times_eaten()
   {
      return receive(_times_eaten);
   }

   // Retrieves the name of the philosopher.
   wstring name() const
   {
      return _name;
   }

protected:
   // Performs the main logic of the dining philosopher algorithm.
   void run()
   {
      // Repeat the thinks/eat cycle a set number of times.
      for (int n = 0; n < eat_count; ++n)
      {
         think();

         pickup_chopsticks(); 
         eat();
         send(_times_eaten, n+1);
         putdown_chopsticks();
      }

      done();
   }

   // Gains access to the chopsticks.
   void pickup_chopsticks()
   {
      // Deadlock occurs here if each philosopher gains access to one
      // of the chopsticks and mutually waits for another to release
      // the other chopstick.
      locks[_left].lock();
      locks[_right].lock();
   }

   // Releases the chopsticks for others.
   void putdown_chopsticks()
   {
      locks[_right].unlock();
      locks[_left].unlock();
   }

   // Simulates thinking for a brief period of time.
   void think()
   {
      random_wait(100);
   }

   // Simulates eating for a brief period of time.
   void eat()
   { 
      random_wait(100);
   }

private:
   // Yields the current context for a random period of time.
   void random_wait(unsigned int max)
   {
      Concurrency::wait(_random_generator()%max);
   }

private:
   // Index of the left chopstick in the chopstick array.
   chopstick& _left;
   // Index of the right chopstick in the chopstick array.
   chopstick& _right;

   // The name of the philosopher.
   wstring _name;
   // Stores the number of times the philosopher has eaten.
   overwrite_buffer<int> _times_eaten;

   // A random number generator.
   mt19937 _random_generator;
};

int wmain()
{
   // Create an array of index values for the chopsticks.
   array<chopstick, philosopher_count> chopsticks = {0, 1, 2, 3, 4};

   // Create an array of philosophers. Each pair of neighboring 
   // philosophers shares one of the chopsticks.
   array<philosopher, philosopher_count> philosophers = {
      philosopher(chopsticks[0], chopsticks[1], L"aristotle"),
      philosopher(chopsticks[1], chopsticks[2], L"descartes"),
      philosopher(chopsticks[2], chopsticks[3], L"hobbes"),
      philosopher(chopsticks[3], chopsticks[4], L"socrates"),
      philosopher(chopsticks[4], chopsticks[0], L"plato"),
   };

   // Begin the simulation.
   for_each (philosophers.begin(), philosophers.end(), [](philosopher& p) {
      p.start();
   });

   // Wait for each philosopher to finish and print his name and the number
   // of times he has eaten.
   for_each (philosophers.begin(), philosophers.end(), [](philosopher& p) {
      agent::wait(&p);
      wcout << p.name() << L" ate " << p.times_eaten() << L" times." << endl;
   });
}

编译代码

复制代码示例,再将此代码粘贴到 Visual Studio 项目中或一个名为 philosophers-deadlock.cpp 的文件中,然后在 Visual Studio 2010 命令提示符窗口中运行以下命令。

cl.exe /EHsc philosophers-deadlock.cpp

[转到页首]

使用联接避免死锁

本节说明如何使用消息缓冲区和消息传递函数来消除出现死锁的机会。

为了将该示例与上一示例相关联,philosopher 类通过使用 Concurrency::unbounded_buffer 对象和 join 对象来替换每个 critical_section 对象。 join 对象作为向哲学家提供筷子的仲裁者。

该示例使用 unbounded_buffer 类,这是因为在目标收到来自 unbounded_buffer 对象的消息后,将从消息队列中移除此消息。 这样,包含消息的 unbounded_buffer 对象表示筷子可用。 不包含消息的 unbounded_buffer 对象则表示筷子正在使用中。

该示例使用非贪婪 join 对象,这是因为只有当两个 unbounded_buffer 对象都包含消息时,非贪婪联接才为每个 philosopher 对象授予全部两根筷子的访问权。 因为只要消息可用,贪婪联接就会接受消息,所以贪婪联接无法避免死锁。 如果所有贪婪 join 对象收到其中一条消息,但是永远等待另一条变为可用,则会发生死锁。

有关贪婪和非贪婪联接的更多信息,以及各种消息缓冲区类型之间的差异,请参见异步消息块

在此示例中避免死锁

  1. 从示例中移除以下代码。

    // A shared array of critical sections. Each critical section 
    // guards access to a single chopstick.
    critical_section locks[philosopher_count];
    
  2. philosopher 类的 _left_right 数据成员的类型更改为 unbounded_buffer

    // Message buffer for the left chopstick.
    unbounded_buffer<chopstick>& _left;
    // Message buffer for the right chopstick.
    unbounded_buffer<chopstick>& _right;
    
  3. 修改 philosopher 构造函数,以采用 unbounded_buffer 对象作为其参数。

    explicit philosopher(unbounded_buffer<chopstick>& left, 
       unbounded_buffer<chopstick>& right, const wstring& name)
       : _left(left)
       , _right(right)
       , _name(name)
       , _random_generator(42)
    {
       send(_times_eaten, 0);
    }
    
  4. 修改 pickup_chopsticks 方法,以使用非贪婪 join 对象从全部两根筷子的消息缓冲区接收消息。

    // Gains access to the chopsticks.
    vector<int> pickup_chopsticks()
    {
       // Create a non-greedy join object and link it to the left and right 
       // chopstick.
       join<chopstick, non_greedy> j(2);
       _left.link_target(&j);
       _right.link_target(&j);
    
       // Receive from the join object. This resolves the deadlock situation
       // because a non-greedy join removes the messages only when a message
       // is available from each of its sources.
       return receive(&j);
    }
    
  5. 修改 putdown_chopsticks 方法,以通过将消息发送到全部两根筷子的消息缓冲区来释放对筷子的访问权。

    // Releases the chopsticks for others.
    void putdown_chopsticks(int left, int right)
    {
       // Add the values of the messages back to the message queue.
       asend(&_left, left);
       asend(&_right, right);
    }
    
  6. 修改 run 方法,以保存 pickup_chopsticks 方法的结果,并将这些结果传递给 putdown_chopsticks 方法。

    // Performs the main logic of the dining philosopher algorithm.
    void run()
    {
       // Repeat the thinks/eat cycle a set number of times.
       for (int n = 0; n < eat_count; ++n)
       {
          think();
    
          vector<int> v = pickup_chopsticks(); 
    
          eat();
    
          send(_times_eaten, n+1);
    
          putdown_chopsticks(v[0], v[1]);
       }
    
       done();
    }
    
  7. wmain 函数中 chopsticks 变量的声明修改为 unbounded_buffer 对象的数组,其中每个对象保存一条消息。

    // Create an array of message buffers to hold the chopsticks.
    array<unbounded_buffer<chopstick>, philosopher_count> chopsticks;
    
    // Send a value to each message buffer in the array.
    // The value of the message is not important. A buffer that contains
    // any message indicates that the chopstick is available.
    for_each (chopsticks.begin(), chopsticks.end(), 
       [](unbounded_buffer<chopstick>& c) {
          send(c, 1);
    });
    

示例

说明

下面显示了使用非贪婪 join 对象消除死锁风险的已完成示例。

代码

// philosophers-join.cpp
// compile with: /EHsc
#include <agents.h>
#include <string>
#include <array>
#include <iostream>
#include <algorithm>
#include <random>

using namespace Concurrency;
using namespace std;

// Defines a single chopstick.
typedef int chopstick;

// The total number of philosophers.
const int philosopher_count = 5;

// The number of times each philosopher should eat.
const int eat_count = 50;

// Implements the logic for a single dining philosopher.
class philosopher : public agent 
{
public:
   explicit philosopher(unbounded_buffer<chopstick>& left, 
      unbounded_buffer<chopstick>& right, const wstring& name)
      : _left(left)
      , _right(right)
      , _name(name)
      , _random_generator(42)
   {
      send(_times_eaten, 0);
   }

   // Retrieves the number of times the philosopher has eaten.
   int times_eaten()
   {
      return receive(_times_eaten);
   }

   // Retrieves the name of the philosopher.
   wstring name() const
   {
      return _name;
   }

protected:
   // Performs the main logic of the dining philosopher algorithm.
   void run()
   {
      // Repeat the thinks/eat cycle a set number of times.
      for (int n = 0; n < eat_count; ++n)
      {
         think();

         vector<int> v = pickup_chopsticks(); 

         eat();

         send(_times_eaten, n+1);

         putdown_chopsticks(v[0], v[1]);
      }

      done();
   }

   // Gains access to the chopsticks.
   vector<int> pickup_chopsticks()
   {
      // Create a non-greedy join object and link it to the left and right 
      // chopstick.
      join<chopstick, non_greedy> j(2);
      _left.link_target(&j);
      _right.link_target(&j);

      // Receive from the join object. This resolves the deadlock situation
      // because a non-greedy join removes the messages only when a message
      // is available from each of its sources.
      return receive(&j);
   }

   // Releases the chopsticks for others.
   void putdown_chopsticks(int left, int right)
   {
      // Add the values of the messages back to the message queue.
      asend(&_left, left);
      asend(&_right, right);
   }

   // Simulates thinking for a brief period of time.
   void think()
   {
      random_wait(100);
   }

   // Simulates eating for a brief period of time.
   void eat()
   {      
      random_wait(100);      
   }

private:
   // Yields the current context for a random period of time.
   void random_wait(unsigned int max)
   {
      Concurrency::wait(_random_generator()%max);
   }

private:
   // Message buffer for the left chopstick.
   unbounded_buffer<chopstick>& _left;
   // Message buffer for the right chopstick.
   unbounded_buffer<chopstick>& _right;

   // The name of the philosopher.
   wstring _name;
   // Stores the number of times the philosopher has eaten.
   overwrite_buffer<int> _times_eaten;

   // A random number generator.
   mt19937 _random_generator;
};

int wmain()
{
   // Create an array of message buffers to hold the chopsticks.
   array<unbounded_buffer<chopstick>, philosopher_count> chopsticks;

   // Send a value to each message buffer in the array.
   // The value of the message is not important. A buffer that contains
   // any message indicates that the chopstick is available.
   for_each (chopsticks.begin(), chopsticks.end(), 
      [](unbounded_buffer<chopstick>& c) {
         send(c, 1);
   });

   // Create an array of philosophers. Each pair of neighboring 
   // philosophers shares one of the chopsticks.
   array<philosopher, philosopher_count> philosophers = {
      philosopher(chopsticks[0], chopsticks[1], L"aristotle"),
      philosopher(chopsticks[1], chopsticks[2], L"descartes"),
      philosopher(chopsticks[2], chopsticks[3], L"hobbes"),
      philosopher(chopsticks[3], chopsticks[4], L"socrates"),
      philosopher(chopsticks[4], chopsticks[0], L"plato"),
   };

   // Begin the simulation.
   for_each (philosophers.begin(), philosophers.end(), [](philosopher& p) {
      p.start();
   });

   // Wait for each philosopher to finish and print his name and the number
   // of times he has eaten.
   for_each (philosophers.begin(), philosophers.end(), [](philosopher& p) {
      agent::wait(&p);
      wcout << p.name() << L" ate " << p.times_eaten() << L" times." << endl;
   });
}

注释

该示例产生下面的输出。

aristotle ate 50 times.
descartes ate 50 times.
hobbes ate 50 times.
socrates ate 50 times.
plato ate 50 times.

编译代码

复制代码示例,再将此代码粘贴到 Visual Studio 项目中或一个名为 philosophers-join.cpp 的文件中,然后在 Visual Studio 2010 命令提示符窗口中运行以下命令。

cl.exe /EHsc philosophers-join.cpp

[转到页首]

请参见

概念

异步代理库

异步代理

异步消息块

消息传递函数

同步数据结构

其他资源

异步代理帮助主题和演练主题