共用方式為


逐步解說:使用聯結以避免死結

本主題使用餐飲哲學家問題來說明如何使用 concurrency::join 類別來防止應用程式中的死結。 在軟體應用程式中,當兩個或多個處理序都保留資源,且互相等候另一個處理序釋放某些其他資源時,就會發生「死結」

餐飲哲學家問題是一般問題集的特定範例,在多個並行流程之間共用一組資源時可能發生。

必要條件

開始本逐步解說之前,請先閱讀下列主題:

區段

本逐步解說包含下列各節:

餐飲哲學家問題

餐飲哲學家問題說明應用程式中發生死結的方式。 在這個問題中,五位哲學家坐在一個圓桌旁。 每個哲學家在思考和飲食之間交替。 每一位哲學家都必須與左邊的鄰居共用一把巧克力,另一個與右鄰的巧克力共用。 下圖顯示此版面配置。

餐飲哲學家問題。

要吃,一個哲學家必須拿著兩把棍子。 如果每個哲學家只拿著一把棍子,並等待另一個,那麼沒有哲學家可以吃,所有的饑餓。

[靠上]

天真實作

下列範例顯示餐飲哲學家問題的天真實作。 類別 philosopher 衍生自 concurrency::agent,可讓每個哲學家獨立運作。 此範例會使用並行::critical_section 對象的共享陣列,讓每個philosopher物件都能夠獨佔存取一組 chopstick。

為了將實作與圖例產生關聯,類別 philosopher 代表一個哲學家。 int變數代表每個 chopstick。 critical_section對象作為巧克力休息的持有者。 方法 run 會模擬哲學家的生活。 方法 think 會模擬思維的行為,而 eat 方法會模擬飲食的行為。

philosopher物件會鎖定這兩個critical_section物件,以模擬在呼叫 eat 方法之前,從持有人移除這兩個物件。 呼叫 eat之後, philosopher 物件會藉由將 critical_section 物件設定回未鎖定狀態,將巧克力棒傳回持有人。

方法 pickup_chopsticks 說明可能發生死結的位置。 如果每個 philosopher 物件都取得其中一個鎖定的存取權,則因為另一個鎖定是由另一個物件控制,因此無法 philosopher 繼續任何 philosopher 物件。

範例

// philosophers-deadlock.cpp
// compile with: /EHsc
#include <agents.h>
#include <string>
#include <array>
#include <iostream>
#include <algorithm>
#include <random>

using namespace concurrency;
using namespace std;

// Defines a single chopstick.
typedef int chopstick;

// The total number of philosophers.
const int philosopher_count = 5;

// The number of times each philosopher should eat.
const int eat_count = 50;

// A shared array of critical sections. Each critical section 
// guards access to a single chopstick.
critical_section locks[philosopher_count];

// Implements the logic for a single dining philosopher.
class philosopher : public agent 
{
public:
   explicit philosopher(chopstick& left, chopstick& right, const wstring& name)
      : _left(left)
      , _right(right)
      , _name(name)
      , _random_generator(42)
   {
      send(_times_eaten, 0);
   }

   // Retrieves the number of times the philosopher has eaten.
   int times_eaten()
   {
      return receive(_times_eaten);
   }

   // Retrieves the name of the philosopher.
   wstring name() const
   {
      return _name;
   }

protected:
   // Performs the main logic of the dining philosopher algorithm.
   void run()
   {
      // Repeat the thinks/eat cycle a set number of times.
      for (int n = 0; n < eat_count; ++n)
      {
         think();
         
         pickup_chopsticks(); 
         eat();
         send(_times_eaten, n+1);
         putdown_chopsticks();
      }

      done();
   }

   // Gains access to the chopsticks.
   void pickup_chopsticks()
   {
      // Deadlock occurs here if each philosopher gains access to one
      // of the chopsticks and mutually waits for another to release
      // the other chopstick.
      locks[_left].lock();
      locks[_right].lock();
   }

   // Releases the chopsticks for others.
   void putdown_chopsticks()
   {
      locks[_right].unlock();
      locks[_left].unlock();
   }

   // Simulates thinking for a brief period of time.
   void think()
   {
      random_wait(100);
   }

   // Simulates eating for a brief period of time.
   void eat()
   { 
      random_wait(100);
   }

private:
   // Yields the current context for a random period of time.
   void random_wait(unsigned int max)
   {
      concurrency::wait(_random_generator()%max);
   }

private:
   // Index of the left chopstick in the chopstick array.
   chopstick& _left;
   // Index of the right chopstick in the chopstick array.
   chopstick& _right;

   // The name of the philosopher.
   wstring _name;
   // Stores the number of times the philosopher has eaten.
   overwrite_buffer<int> _times_eaten;

   // A random number generator.
   mt19937 _random_generator;
};

int wmain()
{
   // Create an array of index values for the chopsticks.
   array<chopstick, philosopher_count> chopsticks = {0, 1, 2, 3, 4};

   // Create an array of philosophers. Each pair of neighboring 
   // philosophers shares one of the chopsticks.
   array<philosopher, philosopher_count> philosophers = {
      philosopher(chopsticks[0], chopsticks[1], L"aristotle"),
      philosopher(chopsticks[1], chopsticks[2], L"descartes"),
      philosopher(chopsticks[2], chopsticks[3], L"hobbes"),
      philosopher(chopsticks[3], chopsticks[4], L"socrates"),
      philosopher(chopsticks[4], chopsticks[0], L"plato"),
   };
   
   // Begin the simulation.
   for_each (begin(philosophers), end(philosophers), [](philosopher& p) {
      p.start();
   });

   // Wait for each philosopher to finish and print his name and the number
   // of times he has eaten.
   for_each (begin(philosophers), end(philosophers), [](philosopher& p) {
      agent::wait(&p);
      wcout << p.name() << L" ate " << p.times_eaten() << L" times." << endl;
   });
}

編譯程式碼

複製範例程式代碼,並將其貼到 Visual Studio 專案中,或貼到名為 philosophers-deadlock.cpp 的檔案中,然後在 Visual Studio 命令提示字元視窗中執行下列命令。

cl.exe /EHsc philosophers-deadlock.cpp

[靠上]

使用聯結以預防死結

本節說明如何使用訊息緩衝區和訊息傳遞函式來消除死結的機會。

為了將此範例與先前的範例相關聯,類別philosopher會使用並行::unbounded_buffer 對象和 join 物件來取代每個critical_section物件。 對像是 join 一種仲裁器,可為哲學家提供巧克力。

這個範例會 unbounded_buffer 使用 類別,因為當目標從 unbounded_buffer 物件收到訊息時,訊息會從消息佇列中移除。 這可讓 unbounded_buffer 物件保存訊息,以指出可使用的 chopstick。 unbounded_buffer不保存任何訊息的物件表示正在使用的 chopstick。

此範例會使用非窮盡 join 對象,因為非貪婪聯結只有在兩個物件都包含訊息時,才會讓每個 philosopher 物件存取這兩 unbounded_buffer 個 chopstick。 貪婪聯結不會防止死結,因為貪婪聯結會在訊息可供使用時立即接受訊息。 如果所有貪婪 join 的物件都收到其中一則訊息,但請永遠等待另一個訊息可供使用,就會發生死結。

如需貪婪和非貪婪聯結的詳細資訊,以及各種訊息緩衝區類型之間的差異,請參閱 異步消息塊

若要防止此範例中的死結

  1. 從範例中移除下列程序代碼。
// A shared array of critical sections. Each critical section 
// guards access to a single chopstick.
critical_section locks[philosopher_count];
  1. 將類別的 _left 與資料成員philosopher型態變更為 unbounded_buffer_right
// Message buffer for the left chopstick.
unbounded_buffer<chopstick>& _left;
// Message buffer for the right chopstick.
unbounded_buffer<chopstick>& _right;
  1. 修改建構函式以 philosopher 接受 unbounded_buffer 對象作為其參數。
explicit philosopher(unbounded_buffer<chopstick>& left, 
   unbounded_buffer<chopstick>& right, const wstring& name)
   : _left(left)
   , _right(right)
   , _name(name)
   , _random_generator(42)
{
   send(_times_eaten, 0);
}
  1. pickup_chopsticks修改 方法,以使用非貪婪join物件從這兩個 chopstick 訊息緩衝區接收訊息。
// Gains access to the chopsticks.
vector<int> pickup_chopsticks()
{
   // Create a non-greedy join object and link it to the left and right 
   // chopstick.
   join<chopstick, non_greedy> j(2);
   _left.link_target(&j);
   _right.link_target(&j);

   // Receive from the join object. This resolves the deadlock situation
   // because a non-greedy join removes the messages only when a message
   // is available from each of its sources.
   return receive(&j);
}
  1. 修改 方法, putdown_chopsticks 藉由將訊息傳送至這兩個 chopstick 的訊息緩衝區,以釋放對 chopss 的存取權。
// Releases the chopsticks for others.
void putdown_chopsticks(int left, int right)
{
   // Add the values of the messages back to the message queue.
   asend(&_left, left);
   asend(&_right, right);
}
  1. run修改 方法以保存 方法的結果pickup_chopsticks,並將這些結果傳遞至 putdown_chopsticks 方法。
// Performs the main logic of the dining philosopher algorithm.
void run()
{
   // Repeat the thinks/eat cycle a set number of times.
   for (int n = 0; n < eat_count; ++n)
   {
      think();
      
      vector<int> v = pickup_chopsticks(); 
      
      eat();
               
      send(_times_eaten, n+1);

      putdown_chopsticks(v[0], v[1]);
   }

   done();
}
  1. 將函式中wmain變數的chopsticks宣告修改為每個保存一則訊息的物件unbounded_buffer數位。
// Create an array of message buffers to hold the chopsticks.
array<unbounded_buffer<chopstick>, philosopher_count> chopsticks;

// Send a value to each message buffer in the array.
// The value of the message is not important. A buffer that contains
// any message indicates that the chopstick is available.
for_each (begin(chopsticks), end(chopsticks), 
   [](unbounded_buffer<chopstick>& c) {
      send(c, 1);
});

描述

以下顯示使用非貪婪 join 物件來消除死結風險的已完成範例。

範例

// philosophers-join.cpp
// compile with: /EHsc
#include <agents.h>
#include <string>
#include <array>
#include <iostream>
#include <algorithm>
#include <random>

using namespace concurrency;
using namespace std;

// Defines a single chopstick.
typedef int chopstick;

// The total number of philosophers.
const int philosopher_count = 5;

// The number of times each philosopher should eat.
const int eat_count = 50;

// Implements the logic for a single dining philosopher.
class philosopher : public agent 
{
public:
   explicit philosopher(unbounded_buffer<chopstick>& left, 
      unbounded_buffer<chopstick>& right, const wstring& name)
      : _left(left)
      , _right(right)
      , _name(name)
      , _random_generator(42)
   {
      send(_times_eaten, 0);
   }

   // Retrieves the number of times the philosopher has eaten.
   int times_eaten()
   {
      return receive(_times_eaten);
   }

   // Retrieves the name of the philosopher.
   wstring name() const
   {
      return _name;
   }

protected:
   // Performs the main logic of the dining philosopher algorithm.
   void run()
   {
      // Repeat the thinks/eat cycle a set number of times.
      for (int n = 0; n < eat_count; ++n)
      {
         think();
         
         vector<int> v = pickup_chopsticks(); 
         
         eat();
                  
         send(_times_eaten, n+1);

         putdown_chopsticks(v[0], v[1]);
      }

      done();
   }

   // Gains access to the chopsticks.
   vector<int> pickup_chopsticks()
   {
      // Create a non-greedy join object and link it to the left and right 
      // chopstick.
      join<chopstick, non_greedy> j(2);
      _left.link_target(&j);
      _right.link_target(&j);

      // Receive from the join object. This resolves the deadlock situation
      // because a non-greedy join removes the messages only when a message
      // is available from each of its sources.
      return receive(&j);
   }

   // Releases the chopsticks for others.
   void putdown_chopsticks(int left, int right)
   {
      // Add the values of the messages back to the message queue.
      asend(&_left, left);
      asend(&_right, right);
   }

   // Simulates thinking for a brief period of time.
   void think()
   {
      random_wait(100);
   }

   // Simulates eating for a brief period of time.
   void eat()
   {      
      random_wait(100);      
   }

private:
   // Yields the current context for a random period of time.
   void random_wait(unsigned int max)
   {
      concurrency::wait(_random_generator()%max);
   }

private:
   // Message buffer for the left chopstick.
   unbounded_buffer<chopstick>& _left;
   // Message buffer for the right chopstick.
   unbounded_buffer<chopstick>& _right;

   // The name of the philosopher.
   wstring _name;
   // Stores the number of times the philosopher has eaten.
   overwrite_buffer<int> _times_eaten;

   // A random number generator.
   mt19937 _random_generator;
};

int wmain()
{
   // Create an array of message buffers to hold the chopsticks.
   array<unbounded_buffer<chopstick>, philosopher_count> chopsticks;
   
   // Send a value to each message buffer in the array.
   // The value of the message is not important. A buffer that contains
   // any message indicates that the chopstick is available.
   for_each (begin(chopsticks), end(chopsticks), 
      [](unbounded_buffer<chopstick>& c) {
         send(c, 1);
   });
   
   // Create an array of philosophers. Each pair of neighboring 
   // philosophers shares one of the chopsticks.
   array<philosopher, philosopher_count> philosophers = {
      philosopher(chopsticks[0], chopsticks[1], L"aristotle"),
      philosopher(chopsticks[1], chopsticks[2], L"descartes"),
      philosopher(chopsticks[2], chopsticks[3], L"hobbes"),
      philosopher(chopsticks[3], chopsticks[4], L"socrates"),
      philosopher(chopsticks[4], chopsticks[0], L"plato"),
   };
   
   // Begin the simulation.
   for_each (begin(philosophers), end(philosophers), [](philosopher& p) {
      p.start();
   });

   // Wait for each philosopher to finish and print his name and the number
   // of times he has eaten.
   for_each (begin(philosophers), end(philosophers), [](philosopher& p) {
      agent::wait(&p);
      wcout << p.name() << L" ate " << p.times_eaten() << L" times." << endl;
   });
}

此範例會產生下列輸出。

aristotle ate 50 times.
descartes ate 50 times.
hobbes ate 50 times.
socrates ate 50 times.
plato ate 50 times.

編譯程式碼

複製範例程式代碼,並將其貼到 Visual Studio 專案中,或貼到名為 philosophers-join.cpp 的檔案中,然後在 Visual Studio 命令提示字元視窗中執行下列命令。

cl.exe /EHsc philosophers-join.cpp

[靠上]

另請參閱

並行執行階段逐步解說
非同步代理程式程式庫
非同步代理程式
非同步訊息區
訊息傳遞函式
同步處理資料結構