Поделиться через


Барьер

System.Threading.Barrier — это определяемый пользователем примитив синхронизации, позволяющий нескольким потокам (которые называются участниками) параллельно осуществлять поэтапную работу с алгоритмом. Каждый участник выполняется до достижения точки барьера в коде. Барьер означает окончание одного этапа работы. Когда участник достигает барьера, он блокируется до тех пор, пока все участники не достигнут этого барьера. Когда все участники достигли барьера, можно при необходимости можно вызвать действие следующего этапа. Это действие следующего этапа может использоваться для выполнения действий одним потоком, пока все остальные потоки все еще остаются блокированными. После выполнения действия все участники разблокируются.

Во фрагменте кода ниже показан базовый шаблон барьера.


// Create the Barrier object, and supply a post-phase delegate
// to be invoked at the end of each phase.
Barrier barrier = new Barrier(2, (bar) =>
    {
        // Examine results from all threads, determine
        // whether to continue, create inputs for next phase, etc.
        if (someCondition)
            success = true;
    });

// Define the work that each thread will perform. (Threads do not
// have to all execute the same method.)
void CrunchNumbers(int partitionNum)
{
    // Up to System.Int64.MaxValue phases are supported. We assume
    // in this code that the problem will be solved before that.
    while (success == false)
    {
        // Begin phase:
        // Process data here on each thread, and optionally
        // store results, for example:
        results[partitionNum] = ProcessData(data[partitionNum]);

        // End phase:
        // After all threads arrive,post-phase delegate
        // is invoked, then threads are unblocked. Overloads
        // accept a timeout value and/or CancellationToken.
        barrier.SignalAndWait();
    }
}

// Perform n tasks to run in parallel. For simplicity
// all threads execute the same method in this example.
static void Main()
{
    var app = new BarrierDemo();
    Thread t1 = new Thread(() => app.CrunchNumbers(0));
    Thread t2 = new Thread(() => app.CrunchNumbers(1));
    t1.Start();
    t2.Start();
}

' Create the Barrier object, and supply a post-phase delegate 
' to be invoked at the end of each phase.
Dim barrier = New Barrier(2, Sub(bar)
                                 ' Examine results from all threads, determine 
                                 ' whether to continue, create inputs for next phase, etc. 
                                 If (someCondition) Then
                                     success = True
                                 End If
                             End Sub)



' Define the work that each thread will perform. (Threads do not
' have to all execute the same method.)
Sub CrunchNumbers(ByVal partitionNum As Integer)

    ' Up to System.Int64.MaxValue phases are supported. We assume
    ' in this code that the problem will be solved before that.
    While (success = False)

        ' Begin phase:
        ' Process data here on each thread, and optionally
        ' store results, for example:
        results(partitionNum) = ProcessData(myData(partitionNum))

        ' End phase:
        ' After all threads arrive,post-phase delegate
        ' is invoked, then threads are unblocked. Overloads
        ' accept a timeout value and/or CancellationToken.
        barrier.SignalAndWait()
    End While
End Sub

' Perform n tasks to run in parallel. For simplicity
' all threads execute the same method in this example.
Shared Sub Main()

    Dim app = New BarrierDemo()
    Dim t1 = New Thread(Sub() app.CrunchNumbers(0))
    Dim t2 = New Thread(Sub() app.CrunchNumbers(1))
    t1.Start()
    t2.Start()
End Sub

Полный пример можно найти в руководстве по синхронизации параллельных операций с барьером.

Добавление и удаление участников

При создании экземпляра Barrier укажите количество участников. Вы можете также динамически добавлять или удалять участников в любое время. Например, если один участник решил свою часть задачи, можно сохранить результат, остановить выполнение этого потока и вызвать Barrier.RemoveParticipant, чтобы уменьшить число участников барьера. При добавлении участника путем вызова Barrier.AddParticipant возвращаемое значение определяет номер текущего этапа, что может быть полезно для инициализации действий нового участника.

Неисправные барьеры

Если один из участников не достигает барьера, это может привести к возникновению взаимоблокировок. Во избежание взаимных блокировок используйте перегрузки метода Barrier.SignalAndWait, чтобы задать время ожидания и токен отмены. Эти перегрузки возвращают логическое значение, которое может проверить каждый участник перед переходом к следующему этапу.

Исключения следующих этапов

Если делегат следующего этапа создает исключение, оно инкапсулируется в объект BarrierPostPhaseException, который затем передается всем участникам.

Сравнение барьера и ContinueWhenAll

Барьеры особенно полезны, когда потоки выполняют несколько этапов циклически. Если код требует выполнения только в один–два этапа, рассмотрите возможность использования объектов System.Threading.Tasks.Task с любым видом неявного объединения, в том числе:

Подробнее см. в разделе Создание цепочки задач с помощью задач продолжения.

См. также