HOW TO:使用內容類別實作合作式信號
本主題說明如何使用 concurrency::Context 類別實作合作式信號類別。
Context 類別可讓您封鎖或產生目前的執行內容。當目前的內容因資源無法使用而無法繼續執行時,封鎖或產生目前的內容會非常有用。有種情況是目前的執行內容必須等候資源變可用,而「信號」(Semaphore) 即屬於這種情況。就像關鍵區段物件一樣,信號是一個同步處理物件,可讓某個內容中的程式碼對資源進行獨佔存取。但是,與關鍵區段物件不同的是,信號可讓多個內容同時存取資源。如果持有信號鎖定的內容數達到上限,則每個額外的內容都必須等候其他內容釋放鎖定。
若要實作信號類別
宣告名為 semaphore 的類別。將 public 和 private 區段加入至這個類別。
// A semaphore type that uses cooperative blocking semantics. class semaphore { public: private: };
在 semaphore 類別的 private 區段中,宣告會保留信號計數和一 concurrency::concurrent_queue 物件保存內容必須等候取得信號的 std::atomic 變數。
// The semaphore count. atomic<long long> _semaphore_count; // A concurrency-safe queue of contexts that must wait to // acquire the semaphore. concurrent_queue<Context*> _waiting_contexts;
在 semaphore 類別的 public 區段中,實作建構函式。這個建構函式要接受 long long 值,該值指定可以持有鎖定的並行內容數上限。
explicit semaphore(long long capacity) : _semaphore_count(capacity) { }
在 semaphore 類別的 public 區段中,實作 acquire 方法。這個方法會將信號計數遞減,做為不可部分完成的作業。如果信號計數變成負數,請將目前的內容加入至等候佇列的結尾 concurrency::Context::Block 並呼叫方法來封鎖目前的內容。
// Acquires access to the semaphore. void acquire() { // The capacity of the semaphore is exceeded when the semaphore count // falls below zero. When this happens, add the current context to the // back of the wait queue and block the current context. if (--_semaphore_count < 0) { _waiting_contexts.push(Context::CurrentContext()); Context::Block(); } }
在 semaphore 類別的 public 區段中,實作 release 方法。這個方法會將信號計數遞增,做為不可部分完成的作業。如果在遞增作業執行之前信號計數是負數,則至少有一個內容在等候取得鎖定。在此情況下,請解除封鎖擋在等候佇列前面的內容。
// Releases access to the semaphore. void release() { // If the semaphore count is negative, unblock the first waiting context. if (++_semaphore_count <= 0) { // A call to acquire might have decremented the counter, but has not // yet finished adding the context to the queue. // Create a spin loop that waits for the context to become available. Context* waiting = NULL; while (!_waiting_contexts.try_pop(waiting)) { Context::Yield(); } // Unblock the context. waiting->Unblock(); } }
範例
這個範例中的 semaphore 類別會展現合作行為,因為 Context::Block 和 Context::Yield 方法會產生讓執行階段可以執行其他工作的執行作業。
acquire 方法會將計數器遞減,但是它可能無法趕在其他內容呼叫 release 方法之前,完成將內容加入至等候佇列。為了處理這種情況, release 方法使用呼叫 concurrency::Context::Yield 方法 acquire 等候方法完成加入內容的動作將迴圈。
在 acquire 方法呼叫 Context::Block 方法之前,release 方法可以呼叫 Context::Unblock 方法。您無須擔心這種呼叫順序,因為執行階段允許以任何順序呼叫這些方法。如果在相同的內容中,release 方法在 acquire 方法呼叫 Context::Block 之前呼叫了 Context::Unblock,則該內容會維持解除封鎖狀態。執行階段只需要每個 Context::Block 呼叫都有一個對應的 Context::Unblock 呼叫。
下列範例顯示完整的 semaphore 類別。wmain 函式會示範這個類別的基本用法。wmain 函式使用 concurrency::parallel_for 演算法建立需要號誌的存取數個工作。因為不論何時都可以有三個執行緒持有鎖定,因此有些工作必須等候其他工作執行完成並釋放鎖定。
// cooperative-semaphore.cpp
// compile with: /EHsc
#include <atomic>
#include <concrt.h>
#include <ppl.h>
#include <concurrent_queue.h>
#include <iostream>
#include <sstream>
using namespace concurrency;
using namespace std;
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
explicit semaphore(long long capacity)
: _semaphore_count(capacity)
{
}
// Acquires access to the semaphore.
void acquire()
{
// The capacity of the semaphore is exceeded when the semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (--_semaphore_count < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
// Releases access to the semaphore.
void release()
{
// If the semaphore count is negative, unblock the first waiting context.
if (++_semaphore_count <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
while (!_waiting_contexts.try_pop(waiting))
{
Context::Yield();
}
// Unblock the context.
waiting->Unblock();
}
}
private:
// The semaphore count.
atomic<long long> _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
};
int wmain()
{
// Create a semaphore that allows at most three threads to
// hold the lock.
semaphore s(3);
parallel_for(0, 10, [&](int i) {
// Acquire the lock.
s.acquire();
// Print a message to the console.
wstringstream ss;
ss << L"In loop iteration " << i << L"..." << endl;
wcout << ss.str();
// Simulate work by waiting for two seconds.
wait(2000);
// Release the lock.
s.release();
});
}
這個範例 (Example) 產生下列範例 (Sample) 輸出。
In loop iteration 5...
In loop iteration 0...
In loop iteration 6...
In loop iteration 1...
In loop iteration 2...
In loop iteration 7...
In loop iteration 3...
In loop iteration 8...
In loop iteration 9...
In loop iteration 4...
如需 concurrent_queue 類別的詳細資訊,請參閱平行容器和物件。如需 parallel_for 演算法的詳細資訊,請參閱平行演算法。
編譯程式碼
請複製範例程式碼並貼到 Visual Studio 專案或貼在名為 semaphore.cpp 共同作業 然後執行 Visual Studio 命令提示字元] 視窗中執行下列命令的檔案。
cl.exe /EHsc cooperative-semaphore.cpp
穩固程式設計
您可以使用「資源擷取為初始設定」(Resource Acquisition Is Initialization,RAII) 模式,將 semaphore 物件存取限制在指定的範圍。在 RAII 模式下,資料結構會配置於堆疊上。該資料結構會在建立時初始化或擷取資源,並在資料結構終結時終結或釋放該資源。RAII 模式可保證在封閉範圍結束之前呼叫解構函式。因此,當有例外狀況擲回或是函式包含多個 return 陳述式時,都會正確地管理資源。
下列範例會定義名為 scoped_lock 的類別,這個類別定義於 semaphore 類別的 public 區段中。scoped_lock 類別與 concurrency::critical_section::scoped_lock 和 concurrency::reader_writer_lock::scoped_lock 類別。semaphore::scoped_lock 類別的建構函式會取得對所指定 semaphore 物件的存取,而解構函式則會釋放對該物件的存取。
// An exception-safe RAII wrapper for the semaphore class.
class scoped_lock
{
public:
// Acquires access to the semaphore.
scoped_lock(semaphore& s)
: _s(s)
{
_s.acquire();
}
// Releases access to the semaphore.
~scoped_lock()
{
_s.release();
}
private:
semaphore& _s;
};
下列範例會修改傳遞給 parallel_for 演算法的工作函式主體,讓它使用 RAII 以確保在函式傳回之前已釋放信號。這項技術可確保工作函式不怕例外狀況發生。
parallel_for(0, 10, [&](int i) {
// Create an exception-safe scoped_lock object that holds the lock
// for the duration of the current scope.
semaphore::scoped_lock auto_lock(s);
// Print a message to the console.
wstringstream ss;
ss << L"In loop iteration " << i << L"..." << endl;
wcout << ss.str();
// Simulate work by waiting for two seconds.
wait(2000);
});