如何:使用內容類別實作合作式信號
本主題顯示如何使用 concurrency::Context 類別實作合作式信號類別。
Context 類別可讓您封鎖或產生目前的執行內容。 當目前的內容因資源無法使用而無法繼續執行時,封鎖或產生目前的內容會非常有用。 有種情況是目前的執行內容必須等候資源變可用,而「信號」(Semaphore) 即屬於這種情況。 就像關鍵區段物件一樣,信號是一個同步處理物件,可讓某個內容中的程式碼對資源進行獨佔存取。 但是,與關鍵區段物件不同的是,信號可讓多個內容同時存取資源。 如果持有信號鎖定的內容數達到上限,則每個額外的內容都必須等候其他內容釋放鎖定。
若要實作信號類別
宣告名為 semaphore 的類別。 將 public 和 private 區段加入至這個類別。
// A semaphore type that uses cooperative blocking semantics. class semaphore { public: private: };
在 semaphore 類別的 private 區段中,宣告 std::atomic 的變數 (這個變數會保留信號計數),以及 concurrency::concurrent_queue 物件 (這個物件會保留必須等候取得信號的內容)。
// The semaphore count. atomic<long long> _semaphore_count; // A concurrency-safe queue of contexts that must wait to // acquire the semaphore. concurrent_queue<Context*> _waiting_contexts;
在 semaphore 類別的 public 區段中,實作建構函式。 這個建構函式要接受 long long 值,該值指定可以持有鎖定的並行內容數上限。
explicit semaphore(long long capacity) : _semaphore_count(capacity) { }
在 semaphore 類別的 public 區段中,實作 acquire 方法。 這個方法會將信號計數遞減,做為不可部分完成的作業。 如果信號計數變成負數,請將目前的內容加入至等候佇列的結尾,並呼叫 concurrency::Context::Block 方法來封鎖目前的內容。
// Acquires access to the semaphore. void acquire() { // The capacity of the semaphore is exceeded when the semaphore count // falls below zero. When this happens, add the current context to the // back of the wait queue and block the current context. if (--_semaphore_count < 0) { _waiting_contexts.push(Context::CurrentContext()); Context::Block(); } }
在 semaphore 類別的 public 區段中,實作 release 方法。 這個方法會將信號計數遞增,做為不可部分完成的作業。 如果在遞增作業執行之前信號計數是負數,則至少有一個內容在等候取得鎖定。 在此情況下,請解除封鎖擋在等候佇列前面的內容。
// Releases access to the semaphore. void release() { // If the semaphore count is negative, unblock the first waiting context. if (++_semaphore_count <= 0) { // A call to acquire might have decremented the counter, but has not // yet finished adding the context to the queue. // Create a spin loop that waits for the context to become available. Context* waiting = NULL; while (!_waiting_contexts.try_pop(waiting)) { Context::Yield(); } // Unblock the context. waiting->Unblock(); } }
範例
這個範例中的 semaphore 類別會展現合作行為,因為 Context::Block 和 Context::Yield 方法會產生讓執行階段可以執行其他工作的執行作業。
acquire 方法會將計數器遞減,但是它可能無法趕在其他內容呼叫 release 方法之前,完成將內容加入至等候佇列。 為了處理這種情況,release 方法會使用空轉迴圈,這個迴圈會呼叫 concurrency::Context::Yield 方法,以等候 acquire 方法完成加入內容的動作。
在 acquire 方法呼叫 Context::Block 方法之前,release 方法可以呼叫 Context::Unblock 方法。 您無須擔心這種呼叫順序,因為執行階段允許以任何順序呼叫這些方法。 如果在相同的內容中,release 方法在 acquire 方法呼叫 Context::Block 之前呼叫了 Context::Unblock,則該內容會維持解除封鎖狀態。 執行階段只需要每個 Context::Block 呼叫都有一個對應的 Context::Unblock 呼叫。
下列範例顯示完整的 semaphore 類別。 wmain 函式會示範這個類別的基本用法。 wmain 函式會使用 concurrency::parallel_for 演算法來建立數個需要存取信號的工作。 因為不論何時都可以有三個執行緒持有鎖定,因此有些工作必須等候其他工作執行完成並釋放鎖定。
// cooperative-semaphore.cpp
// compile with: /EHsc
#include <atomic>
#include <concrt.h>
#include <ppl.h>
#include <concurrent_queue.h>
#include <iostream>
#include <sstream>
using namespace concurrency;
using namespace std;
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
explicit semaphore(long long capacity)
: _semaphore_count(capacity)
{
}
// Acquires access to the semaphore.
void acquire()
{
// The capacity of the semaphore is exceeded when the semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (--_semaphore_count < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
// Releases access to the semaphore.
void release()
{
// If the semaphore count is negative, unblock the first waiting context.
if (++_semaphore_count <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
while (!_waiting_contexts.try_pop(waiting))
{
Context::Yield();
}
// Unblock the context.
waiting->Unblock();
}
}
private:
// The semaphore count.
atomic<long long> _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
};
int wmain()
{
// Create a semaphore that allows at most three threads to
// hold the lock.
semaphore s(3);
parallel_for(0, 10, [&](int i) {
// Acquire the lock.
s.acquire();
// Print a message to the console.
wstringstream ss;
ss << L"In loop iteration " << i << L"..." << endl;
wcout << ss.str();
// Simulate work by waiting for two seconds.
wait(2000);
// Release the lock.
s.release();
});
}
這個範例 (Example) 產生下列範例 (Sample) 輸出。
如需 concurrent_queue 類別的詳細資訊,請參閱 平行容器和物件。 如需 parallel_for 演算法的詳細資訊,請參閱平行演算法。
編譯程式碼
請複製範例程式碼,並將它貼在 Visual Studio 專案中,或貼在名為 cooperative-semaphore.cpp 的檔案中,然後在 Visual Studio 的 [命令提示字元] 視窗中執行下列命令。
cl.exe /EHsc cooperative-semaphore.cpp
穩固程式設計
您可以使用「資源擷取為初始設定」(Resource Acquisition Is Initialization,RAII) 模式,將 semaphore 物件存取限制在指定的範圍。 在 RAII 模式下,資料結構會配置於堆疊上。 該資料結構會在建立時初始化或擷取資源,並在資料結構終結時終結或釋放該資源。 RAII 模式可保證在封閉範圍結束之前呼叫解構函式。 因此,當有例外狀況擲回或是函式包含多個 return 陳述式時,都會正確地管理資源。
下列範例會定義名為 scoped_lock 的類別,這個類別定義於 semaphore 類別的 public 區段中。 scoped_lock 類別類似於 concurrency::critical_section::scoped_lock 和 concurrency::reader_writer_lock::scoped_lock 類別。 semaphore::scoped_lock 類別的建構函式會取得對所指定 semaphore 物件的存取,而解構函式則會釋放對該物件的存取。
// An exception-safe RAII wrapper for the semaphore class.
class scoped_lock
{
public:
// Acquires access to the semaphore.
scoped_lock(semaphore& s)
: _s(s)
{
_s.acquire();
}
// Releases access to the semaphore.
~scoped_lock()
{
_s.release();
}
private:
semaphore& _s;
};
下列範例會修改傳遞給 parallel_for 演算法的工作函式主體,讓它使用 RAII 以確保在函式傳回之前已釋放信號。 這項技術可確保工作函式不怕例外狀況發生。
parallel_for(0, 10, [&](int i) {
// Create an exception-safe scoped_lock object that holds the lock
// for the duration of the current scope.
semaphore::scoped_lock auto_lock(s);
// Print a message to the console.
wstringstream ss;
ss << L"In loop iteration " << i << L"..." << endl;
wcout << ss.str();
// Simulate work by waiting for two seconds.
wait(2000);
});