共用方式為


逐步解說:使用聯結以避免死結

本主題會使用哲學家用餐問題,說明如何使用 Concurrency::join 類別來避免應用程式中的死結。 在軟體應用程式中,如果有兩個以上的處理序各擁有一項資源,而互相等候另一個處理序釋放另一項資源,即會發生「死結」(Deadlock)。

我們常用哲學家用餐問題做為特定範例,說明多個並行處理序之間共用一組資源時可能會發生的一般問題集。

必要條件

在您開始閱讀此逐步解說前,請先參閱下列主題:

章節

此逐步解說包含下列章節:

  • 哲學家用餐問題

  • 直覺實作

  • 使用聯結以避免死結

哲學家用餐問題

哲學家用餐問題說明了應用程式中為何會發生死結。 在此問題中,有五位哲學家圍坐在一個圓桌。 每位哲學家各交替著思考與用餐的動作。 每位哲學家都必須與左邊的哲學家共用一根筷子,與右邊的哲學家也必須共用一根筷子。 下圖顯示其座位配置。

哲學家用餐問題

哲學家必須同時擁有兩根筷子,才能用餐。 如果每個哲學家各有一根筷子,而都在等候另一根,則沒有哲學家可以用餐,大家都得挨餓。

回到頁首

直覺實作

下列範例說明哲學家用餐問題的直覺實作。 衍生自 Concurrency::agentphilosopher 類別,可讓每一位哲學家獨立執行動作。 此範例使用 Concurrency::critical_section 物件的共用陣列,給予每個 philosopher 物件獨佔存取一雙筷子的權限。

我們以 philosopher 類別代表一個哲學家,說明實作與圖例的關聯性。 int 變數代表每根筷子。 critical_section 物件做為筷子的持有者。 run 方法模擬哲學家的生命。 think 方法模擬思考的動作,eat 方法則模擬用餐的動作。

philosopher 在呼叫 eat 方法前,會先同時鎖定前述兩個 critical_section 物件,以模擬從持有者手中拿走筷子的動作。 在呼叫 eat 之後,philosopher 物件會將 critical_section 物件重新設為未鎖定的狀態,將筷子歸還給持有者。

pickup_chopsticks 方法說明可能會發生死結的情況。 如果每個 philosopher 物件皆取得其中一個鎖定的存取權,則沒有任何 philosopher 物件可繼續動作,因為另一個鎖定皆由其他 philosopher 物件所控制。

範例

程式碼

// philosophers-deadlock.cpp
// compile with: /EHsc
#include <agents.h>
#include <string>
#include <array>
#include <iostream>
#include <algorithm>
#include <random>

using namespace Concurrency;
using namespace std;

// Defines a single chopstick.
typedef int chopstick;

// The total number of philosophers.
const int philosopher_count = 5;

// The number of times each philosopher should eat.
const int eat_count = 50;

// A shared array of critical sections. Each critical section 
// guards access to a single chopstick.
critical_section locks[philosopher_count];

// Implements the logic for a single dining philosopher.
class philosopher : public agent 
{
public:
   explicit philosopher(chopstick& left, chopstick& right, const wstring& name)
      : _left(left)
      , _right(right)
      , _name(name)
      , _random_generator(42)
   {
      send(_times_eaten, 0);
   }

   // Retrieves the number of times the philosopher has eaten.
   int times_eaten()
   {
      return receive(_times_eaten);
   }

   // Retrieves the name of the philosopher.
   wstring name() const
   {
      return _name;
   }

protected:
   // Performs the main logic of the dining philosopher algorithm.
   void run()
   {
      // Repeat the thinks/eat cycle a set number of times.
      for (int n = 0; n < eat_count; ++n)
      {
         think();

         pickup_chopsticks(); 
         eat();
         send(_times_eaten, n+1);
         putdown_chopsticks();
      }

      done();
   }

   // Gains access to the chopsticks.
   void pickup_chopsticks()
   {
      // Deadlock occurs here if each philosopher gains access to one
      // of the chopsticks and mutually waits for another to release
      // the other chopstick.
      locks[_left].lock();
      locks[_right].lock();
   }

   // Releases the chopsticks for others.
   void putdown_chopsticks()
   {
      locks[_right].unlock();
      locks[_left].unlock();
   }

   // Simulates thinking for a brief period of time.
   void think()
   {
      random_wait(100);
   }

   // Simulates eating for a brief period of time.
   void eat()
   { 
      random_wait(100);
   }

private:
   // Yields the current context for a random period of time.
   void random_wait(unsigned int max)
   {
      Concurrency::wait(_random_generator()%max);
   }

private:
   // Index of the left chopstick in the chopstick array.
   chopstick& _left;
   // Index of the right chopstick in the chopstick array.
   chopstick& _right;

   // The name of the philosopher.
   wstring _name;
   // Stores the number of times the philosopher has eaten.
   overwrite_buffer<int> _times_eaten;

   // A random number generator.
   mt19937 _random_generator;
};

int wmain()
{
   // Create an array of index values for the chopsticks.
   array<chopstick, philosopher_count> chopsticks = {0, 1, 2, 3, 4};

   // Create an array of philosophers. Each pair of neighboring 
   // philosophers shares one of the chopsticks.
   array<philosopher, philosopher_count> philosophers = {
      philosopher(chopsticks[0], chopsticks[1], L"aristotle"),
      philosopher(chopsticks[1], chopsticks[2], L"descartes"),
      philosopher(chopsticks[2], chopsticks[3], L"hobbes"),
      philosopher(chopsticks[3], chopsticks[4], L"socrates"),
      philosopher(chopsticks[4], chopsticks[0], L"plato"),
   };

   // Begin the simulation.
   for_each (philosophers.begin(), philosophers.end(), [](philosopher& p) {
      p.start();
   });

   // Wait for each philosopher to finish and print his name and the number
   // of times he has eaten.
   for_each (philosophers.begin(), philosophers.end(), [](philosopher& p) {
      agent::wait(&p);
      wcout << p.name() << L" ate " << p.times_eaten() << L" times." << endl;
   });
}

編譯程式碼

請複製範例程式碼,並將它貼在 Visual Studio 專案中,或貼在名為 philosophers-deadlock.cpp 的檔案中,然後在 Visual Studio 2010 的 [命令提示字元] 視窗中執行下列命令。

cl.exe /EHsc philosophers-deadlock.cpp

回到頁首

使用聯結以避免死結

本節說明如何使用訊息緩衝區和訊息傳遞函式消除死結的可能性。

為使此範例與前一個範例產生關聯,philosopher 類別會使用 Concurrency::unbounded_buffer 物件與 join 物件取代每一個 critical_section 物件。 join 物件做為將筷子提供給哲學家的仲裁者。

此範例會使用 unbounded_buffer 類別,因為當目標接收到來自 unbounded_buffer 物件的訊息時,該訊息會從訊息佇列中移除。 這可讓保有訊息的 unbounded_buffer 物件指出筷子處於可用狀態。 未獲得訊息的 unbounded_buffer 物件則會指出有人正在使用筷子。

此範例會使用非窮盡 (non-greedy) join 物件,因為非窮盡聯結只有在兩個 unbounded_buffer 物件都包含訊息時,才會給予每個 philosopher 物件兩根筷子的存取權。 窮盡 (greedy) 聯結不會避免死結,因為只要一有訊息,窮盡聯結就會立即接受。 如果所有窮盡 join 物件都接收其中一個訊息,但無止盡地等待另一個訊息變成可用訊息,就會發生死結。

如需窮盡和非窮盡聯結的詳細資訊,以及各種訊息緩衝區類型之間的差異資訊,請參閱非同步訊息區

若要在此範例中避免死結

  1. 從範例中移除下列程式碼。

    // A shared array of critical sections. Each critical section 
    // guards access to a single chopstick.
    critical_section locks[philosopher_count];
    
  2. philosopher 類別之 _left_right 資料成員的類型變更為 unbounded_buffer

    // Message buffer for the left chopstick.
    unbounded_buffer<chopstick>& _left;
    // Message buffer for the right chopstick.
    unbounded_buffer<chopstick>& _right;
    
  3. 修改 philosopher 建構函式,以 unbounded_buffer 物件做為其參數。

    explicit philosopher(unbounded_buffer<chopstick>& left, 
       unbounded_buffer<chopstick>& right, const wstring& name)
       : _left(left)
       , _right(right)
       , _name(name)
       , _random_generator(42)
    {
       send(_times_eaten, 0);
    }
    
  4. 修改 pickup_chopsticks 方法,以使用非窮盡 join 物件接收來自一雙筷子之訊息緩衝區的訊息。

    // Gains access to the chopsticks.
    vector<int> pickup_chopsticks()
    {
       // Create a non-greedy join object and link it to the left and right 
       // chopstick.
       join<chopstick, non_greedy> j(2);
       _left.link_target(&j);
       _right.link_target(&j);
    
       // Receive from the join object. This resolves the deadlock situation
       // because a non-greedy join removes the messages only when a message
       // is available from each of its sources.
       return receive(&j);
    }
    
  5. 修改 putdown_chopsticks 方法,將訊息傳送至一雙筷子的訊息緩衝區,以釋放筷子的存取權。

    // Releases the chopsticks for others.
    void putdown_chopsticks(int left, int right)
    {
       // Add the values of the messages back to the message queue.
       asend(&_left, left);
       asend(&_right, right);
    }
    
  6. 修改 run 方法以保有 pickup_chopsticks 方法的結果,並將這些結果傳遞至 putdown_chopsticks 方法。

    // Performs the main logic of the dining philosopher algorithm.
    void run()
    {
       // Repeat the thinks/eat cycle a set number of times.
       for (int n = 0; n < eat_count; ++n)
       {
          think();
    
          vector<int> v = pickup_chopsticks(); 
    
          eat();
    
          send(_times_eaten, n+1);
    
          putdown_chopsticks(v[0], v[1]);
       }
    
       done();
    }
    
  7. wmain 函式中的 chopsticks 變數宣告修改成各包含一則訊息之 unbounded_buffer 物件的陣列。

    // Create an array of message buffers to hold the chopsticks.
    array<unbounded_buffer<chopstick>, philosopher_count> chopsticks;
    
    // Send a value to each message buffer in the array.
    // The value of the message is not important. A buffer that contains
    // any message indicates that the chopstick is available.
    for_each (chopsticks.begin(), chopsticks.end(), 
       [](unbounded_buffer<chopstick>& c) {
          send(c, 1);
    });
    

範例

說明

以下顯示使用非窮盡 join 物件消除死結風險的完整範例。

程式碼

// philosophers-join.cpp
// compile with: /EHsc
#include <agents.h>
#include <string>
#include <array>
#include <iostream>
#include <algorithm>
#include <random>

using namespace Concurrency;
using namespace std;

// Defines a single chopstick.
typedef int chopstick;

// The total number of philosophers.
const int philosopher_count = 5;

// The number of times each philosopher should eat.
const int eat_count = 50;

// Implements the logic for a single dining philosopher.
class philosopher : public agent 
{
public:
   explicit philosopher(unbounded_buffer<chopstick>& left, 
      unbounded_buffer<chopstick>& right, const wstring& name)
      : _left(left)
      , _right(right)
      , _name(name)
      , _random_generator(42)
   {
      send(_times_eaten, 0);
   }

   // Retrieves the number of times the philosopher has eaten.
   int times_eaten()
   {
      return receive(_times_eaten);
   }

   // Retrieves the name of the philosopher.
   wstring name() const
   {
      return _name;
   }

protected:
   // Performs the main logic of the dining philosopher algorithm.
   void run()
   {
      // Repeat the thinks/eat cycle a set number of times.
      for (int n = 0; n < eat_count; ++n)
      {
         think();

         vector<int> v = pickup_chopsticks(); 

         eat();

         send(_times_eaten, n+1);

         putdown_chopsticks(v[0], v[1]);
      }

      done();
   }

   // Gains access to the chopsticks.
   vector<int> pickup_chopsticks()
   {
      // Create a non-greedy join object and link it to the left and right 
      // chopstick.
      join<chopstick, non_greedy> j(2);
      _left.link_target(&j);
      _right.link_target(&j);

      // Receive from the join object. This resolves the deadlock situation
      // because a non-greedy join removes the messages only when a message
      // is available from each of its sources.
      return receive(&j);
   }

   // Releases the chopsticks for others.
   void putdown_chopsticks(int left, int right)
   {
      // Add the values of the messages back to the message queue.
      asend(&_left, left);
      asend(&_right, right);
   }

   // Simulates thinking for a brief period of time.
   void think()
   {
      random_wait(100);
   }

   // Simulates eating for a brief period of time.
   void eat()
   {      
      random_wait(100);      
   }

private:
   // Yields the current context for a random period of time.
   void random_wait(unsigned int max)
   {
      Concurrency::wait(_random_generator()%max);
   }

private:
   // Message buffer for the left chopstick.
   unbounded_buffer<chopstick>& _left;
   // Message buffer for the right chopstick.
   unbounded_buffer<chopstick>& _right;

   // The name of the philosopher.
   wstring _name;
   // Stores the number of times the philosopher has eaten.
   overwrite_buffer<int> _times_eaten;

   // A random number generator.
   mt19937 _random_generator;
};

int wmain()
{
   // Create an array of message buffers to hold the chopsticks.
   array<unbounded_buffer<chopstick>, philosopher_count> chopsticks;

   // Send a value to each message buffer in the array.
   // The value of the message is not important. A buffer that contains
   // any message indicates that the chopstick is available.
   for_each (chopsticks.begin(), chopsticks.end(), 
      [](unbounded_buffer<chopstick>& c) {
         send(c, 1);
   });

   // Create an array of philosophers. Each pair of neighboring 
   // philosophers shares one of the chopsticks.
   array<philosopher, philosopher_count> philosophers = {
      philosopher(chopsticks[0], chopsticks[1], L"aristotle"),
      philosopher(chopsticks[1], chopsticks[2], L"descartes"),
      philosopher(chopsticks[2], chopsticks[3], L"hobbes"),
      philosopher(chopsticks[3], chopsticks[4], L"socrates"),
      philosopher(chopsticks[4], chopsticks[0], L"plato"),
   };

   // Begin the simulation.
   for_each (philosophers.begin(), philosophers.end(), [](philosopher& p) {
      p.start();
   });

   // Wait for each philosopher to finish and print his name and the number
   // of times he has eaten.
   for_each (philosophers.begin(), philosophers.end(), [](philosopher& p) {
      agent::wait(&p);
      wcout << p.name() << L" ate " << p.times_eaten() << L" times." << endl;
   });
}

註解

這個範例會產生下列輸出。

aristotle ate 50 times.
descartes ate 50 times.
hobbes ate 50 times.
socrates ate 50 times.
plato ate 50 times.

編譯程式碼

請複製範例程式碼,並將它貼在 Visual Studio 專案中,或貼在名為 philosophers-join.cpp 的檔案中,然後在 Visual Studio 2010 的 [命令提示字元] 視窗中執行下列命令。

cl.exe /EHsc philosophers-join.cpp

回到頁首

請參閱

概念

非同步代理程式程式庫

非同步代理程式

非同步訊息區

訊息傳遞函式

同步處理資料結構

其他資源

非同步代理程式 HOW TO 和逐步解說主題