Практическое руководство. Использование класса Context для реализации семафора, поддерживающего параллельный доступ
В этом разделе показано, как использовать класс параллелизма::Context для реализации совместного класса семафора.
Замечания
Класс Context
позволяет блокировать или давать текущий контекст выполнения. Блокировка или получение текущего контекста полезно, если текущий контекст не может продолжаться, так как ресурс недоступен. Семафор является примером одной ситуации, когда текущий контекст выполнения должен ожидать, пока ресурс станет доступным. Семафор, например критически важный объект раздела, — это объект синхронизации, который позволяет коду в одном контексте иметь монопольный доступ к ресурсу. Однако, в отличие от объекта критического раздела, семафор позволяет нескольким контекстам одновременно обращаться к ресурсу. Если максимальное количество контекстов содержит блокировку семафора, каждый дополнительный контекст должен ожидать, пока другой контекст будет освобожден.
Реализация класса семафора
- Объявите класс, который называется
semaphore
. Добавьтеpublic
иprivate
разделы в этот класс.
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
private:
};
private
В разделеsemaphore
класса объявите std::атомарную переменную, содержащую число семафоров и объект параллелизма::concurrent_queue, содержащий контексты, которые должны ждать получения семафора.
// The semaphore count.
atomic<long long> _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
public
В разделеsemaphore
класса реализуйте конструктор. Конструктор принимаетlong long
значение, указывающее максимальное количество контекстов, которые могут одновременно хранить блокировку.
explicit semaphore(long long capacity)
: _semaphore_count(capacity)
{
}
public
В разделеsemaphore
класса реализуйтеacquire
метод. Этот метод уменьшает число семафоров как атомарную операцию. Если число семафоров становится отрицательным, добавьте текущий контекст в конец очереди ожидания и вызовите метод параллелизма::Context::Block , чтобы заблокировать текущий контекст.
// Acquires access to the semaphore.
void acquire()
{
// The capacity of the semaphore is exceeded when the semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (--_semaphore_count < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
public
В разделеsemaphore
класса реализуйтеrelease
метод. Этот метод увеличивает число семафоров как атомарную операцию. Если число семафоров отрицательно перед операцией добавочного действия, существует по крайней мере один контекст, ожидающий блокировки. В этом случае разблокируйте контекст, который находится в передней части очереди ожидания.
// Releases access to the semaphore.
void release()
{
// If the semaphore count is negative, unblock the first waiting context.
if (++_semaphore_count <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
while (!_waiting_contexts.try_pop(waiting))
{
Context::Yield();
}
// Unblock the context.
waiting->Unblock();
}
}
Пример
Класс semaphore
в этом примере ведет себя совместно, так как Context::Block
выполнение и Context::Yield
методы дают выполнение, чтобы среда выполнения могли выполнять другие задачи.
Метод acquire
уменьшает счетчик, но может не завершить добавление контекста в очередь ожидания, прежде чем другой контекст вызывает release
метод. Для этого метод использует цикл спина, release
который вызывает метод параллелизма::Context::Yield , чтобы ждать acquire
завершения добавления контекста.
Метод release
может вызывать Context::Unblock
метод до acquire
вызова Context::Block
метода. Вам не нужно защищаться от этого условия гонки, так как среда выполнения позволяет вызывать эти методы в любом порядке. release
Если метод вызывается Context::Unblock
перед вызовом Context::Block
метода для того же контекстаacquire
, этот контекст остается разблокирован. Среда выполнения требует, чтобы каждый вызов Context::Block
соответствовал соответствующему вызову Context::Unblock
.
В следующем примере показан полный semaphore
класс. Функция показывает базовое wmain
использование этого класса. Функция wmain
использует алгоритм concurrency::p arallel_for для создания нескольких задач, требующих доступа к семафору. Так как три потока могут хранить блокировку в любое время, некоторые задачи должны ждать завершения другой задачи и освобождения блокировки.
// cooperative-semaphore.cpp
// compile with: /EHsc
#include <atomic>
#include <concrt.h>
#include <ppl.h>
#include <concurrent_queue.h>
#include <iostream>
#include <sstream>
using namespace concurrency;
using namespace std;
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
explicit semaphore(long long capacity)
: _semaphore_count(capacity)
{
}
// Acquires access to the semaphore.
void acquire()
{
// The capacity of the semaphore is exceeded when the semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (--_semaphore_count < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
// Releases access to the semaphore.
void release()
{
// If the semaphore count is negative, unblock the first waiting context.
if (++_semaphore_count <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
while (!_waiting_contexts.try_pop(waiting))
{
Context::Yield();
}
// Unblock the context.
waiting->Unblock();
}
}
private:
// The semaphore count.
atomic<long long> _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
};
int wmain()
{
// Create a semaphore that allows at most three threads to
// hold the lock.
semaphore s(3);
parallel_for(0, 10, [&](int i) {
// Acquire the lock.
s.acquire();
// Print a message to the console.
wstringstream ss;
ss << L"In loop iteration " << i << L"..." << endl;
wcout << ss.str();
// Simulate work by waiting for two seconds.
wait(2000);
// Release the lock.
s.release();
});
}
В этом примере создается следующий пример выходных данных.
In loop iteration 5...
In loop iteration 0...
In loop iteration 6...
In loop iteration 1...
In loop iteration 2...
In loop iteration 7...
In loop iteration 3...
In loop iteration 8...
In loop iteration 9...
In loop iteration 4...
Дополнительные сведения о классе см. в разделе "Параллельные concurrent_queue
контейнеры и объекты". Дополнительные сведения об алгоритме parallel_for
см. в разделе "Параллельные алгоритмы".
Компиляция кода
Скопируйте пример кода и вставьте его в проект Visual Studio или вставьте его в файл с именем cooperative-semaphore.cpp
, а затем выполните следующую команду в окне командной строки Visual Studio.
cl.exe /EHsc cooperative-semaphore.cpp
Отказоустойчивость
Вы можете использовать шаблон инициализации ресурсов (RAII), чтобы ограничить доступ к semaphore
объекту заданной области. В шаблоне RAII структура данных выделяется в стеке. Эта структура данных инициализирует или получает ресурс при создании и уничтожении или выпуске ресурса при уничтожении структуры данных. Шаблон RAII гарантирует, что деструктор вызывается перед выходом заключающей области. Поэтому ресурс правильно управляется при возникновении исключения или при наличии функции нескольких return
операторов.
В следующем примере определяется класс, который scoped_lock
называется, который определен в public
разделе semaphore
класса. Класс scoped_lock
напоминает класс concurrency::critical_section: :scoped_lock и параллелизм:::reader_writer_lock:::scoped_lock . Конструктор класса получает доступ к заданному semaphore
объекту, а деструктор semaphore::scoped_lock
освобождает доступ к данному объекту.
// An exception-safe RAII wrapper for the semaphore class.
class scoped_lock
{
public:
// Acquires access to the semaphore.
scoped_lock(semaphore& s)
: _s(s)
{
_s.acquire();
}
// Releases access to the semaphore.
~scoped_lock()
{
_s.release();
}
private:
semaphore& _s;
};
В следующем примере изменяется текст рабочей функции, передаваемой parallel_for
алгоритму, чтобы он использовал RAII, чтобы убедиться, что семафор освобождается перед возвратом функции. Этот метод гарантирует, что рабочая функция является безопасной для работы.
parallel_for(0, 10, [&](int i) {
// Create an exception-safe scoped_lock object that holds the lock
// for the duration of the current scope.
semaphore::scoped_lock auto_lock(s);
// Print a message to the console.
wstringstream ss;
ss << L"In loop iteration " << i << L"..." << endl;
wcout << ss.str();
// Simulate work by waiting for two seconds.
wait(2000);
});