Поделиться через


Поток данных (библиотека параллельных задач)

Библиотека параллельных задач (TPL) предоставляет компоненты потока данных для повышения надежности приложений с поддержкой параллелизма. Эти компоненты потока данных совместно называются библиотекой потоков данных TPL. Эта модель потока данных способствует актерному программированию, предоставляя внутр_ипроцессную передачу сообщений для крупнозернистого потока данных и задач конвейера. Компоненты потока данных создаются на основе типов и инфраструктуры планирования TPL и интеграции с языком C#, Visual Basic и F# для асинхронного программирования. Эти компоненты потока данных полезны при наличии нескольких операций, которые должны взаимодействовать друг с другом асинхронно или когда требуется обрабатывать данные по мере его доступности. Например, рассмотрим приложение, обрабатывающее данные изображения с веб-камеры. С помощью модели потока данных приложение может обрабатывать кадры изображений по мере их доступности. Если приложение улучшает кадры изображений, например, за счёт коррекции освещения или устранения эффекта красных глаз, можно создать конвейер компонентов потока данных. Каждый этап конвейера может использовать более грубые функции параллелизма, такие как функциональные возможности, предоставляемые TPL, для преобразования образа.

В этом документе представлен обзор библиотеки потоков данных TPL. В нем описывается модель программирования, предопределенные типы блоков потока данных и настройка блоков потоков данных в соответствии с конкретными требованиями приложений.

Примечание.

Библиотека потоков данных TPL (пространство имен System.Threading.Tasks.Dataflow) не распространяется с помощью .NET. Чтобы установить пространство имен System.Threading.Tasks.Dataflow в Visual Studio, откройте проект, выберите Управление пакетами NuGet в меню Project и найдите пакет System.Threading.Tasks.Dataflow в Интернете. Кроме того, чтобы установить его с помощью cli .NET Core, запустите dotnet add package System.Threading.Tasks.Dataflow.

Модель программирования

Библиотека потоков данных TPL предоставляет основу для передачи сообщений и параллелизации приложений с большим объемом ЦП и операций ввода-вывода с высокой пропускной способностью и низкой задержкой. Он также предоставляет явный контроль над буферизацией и перемещением данных по системе. Чтобы лучше понять модель программирования потока данных, рассмотрим приложение, которое асинхронно загружает изображения с диска и создает состав этих образов. Традиционные модели программирования обычно требуют использования обратных вызовов и объектов синхронизации, таких как блокировки, для координации задач и доступа к общим данным. С помощью модели программирования потока данных можно создавать объекты потока данных, обрабатывающие образы по мере их чтения с диска. В модели потока данных вы объявляете, как обрабатываются данные, когда они становятся доступными, а также любые зависимости между данными. Так как среда выполнения управляет зависимостями между данными, часто можно избежать необходимости синхронизации доступа к общим данным. Кроме того, так как расписания выполнения работают на основе асинхронного поступления данных, поток данных может повысить скорость реагирования и пропускную способность, эффективно управляя базовыми потоками. Для примера, использующего модель программирования потока данных для реализации обработки изображений в приложении Windows Forms, см. пошаговое руководство: Использование потока данных в приложении Windows Forms.

Источники и целевые объекты

Библиотека потоков данных TPL состоит из блоков потока данных , которые являются структурами данных, которые буферируют и обрабатывают данные. TPL определяет три типа блоков потока данных: исходные блоки, целевые блокии блоки распространения . Исходный блок выступает в качестве источника данных и может быть прочитан из. Целевой блок выступает в качестве получателя данных, и в него могут записывать данные. Блок распространителя выступает и как исходный, и как целевой, и из него можно считывать данные, и записывать в него. TPL определяет интерфейс System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> для представления источников, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> для представления целевых объектов и System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> для представления распространителей. IPropagatorBlock<TInput,TOutput> наследует от ISourceBlock<TOutput>и ITargetBlock<TInput>.

Библиотека потоков данных TPL предоставляет несколько предопределенных типов блоков потока данных, реализующих интерфейсы ISourceBlock<TOutput>, ITargetBlock<TInput>и IPropagatorBlock<TInput,TOutput>. Эти типы блоков потока данных описаны в этом документе в разделе предопределенные типы блоков потока данных.

Соединение блоков

Вы можете подключить блоки потока данных для формирования конвейеров, которые являются линейными последовательностями блоков потока данных, или сетей, которые представляют собой графы блоков потока данных. Конвейер — это одна из форм сети. В конвейере или сети источники асинхронно распространяют данные на целевые объекты по мере того, как эти данные становятся доступными. Метод ISourceBlock<TOutput>.LinkTo связывает блок потока данных источника с целевым блоком. Источник может быть связан с нулевым или более целевыми объектами; целевые объекты могут быть связаны с нулевыми или более источниками. Вы можете добавлять или удалять блоки потока данных в конвейер или сеть одновременно. Стандартные типы блоков потока данных обрабатывают все аспекты безопасности потока при связывании и отмене связи.

Пример, который подключает блоки потока данных для формирования базового конвейера, см. в разделе Пошаговое руководство: Создание конвейера потоков данных. Для примера, демонстрирующего, как подключить блоки потока данных для создания более сложной сети, см. Пошаговое руководство: Использование потока данных в приложении Windows Forms. Пример, который отменяет связь целевого объекта из источника после того, как источник предлагает целевое сообщение, см. в разделе Практическое руководство. Отмена связи блоков потока данных.

Фильтрация

При вызове метода ISourceBlock<TOutput>.LinkTo для связывания источника с целевым объектом можно указать делегат, который определяет, принимает ли целевой блок или отклоняет сообщение на основе значения этого сообщения. Этот механизм фильтрации является полезным способом гарантировать, что блок потока данных получает только определенные значения. Для большинства предопределенных типов блоков потока данных, если исходный блок подключен к нескольким целевым блокам, когда целевой блок отклоняет сообщение, источник предлагает это сообщение следующему целевому объекту. Порядок, в котором источник предлагает сообщения целевым объектам, определяется источником и может отличаться в зависимости от типа источника. Большинство типов блоков источника перестают предлагать сообщение после того, как один целевой объект принимает это сообщение. Одним из исключений этого правила является класс BroadcastBlock<T>, который предлагает каждое сообщение всем целевым объектам, даже если некоторые целевые объекты отклоняют сообщение. Для примера, который использует фильтрацию для обработки только определенных сообщений, см. пошаговое руководство: Использование потока данных в приложении Windows Forms.

Это важно

Так как каждый предопределенный тип блока потока данных источника гарантирует распространение сообщений в том порядке, в котором они получены, каждое сообщение должно быть считывается из исходного блока, прежде чем исходный блок может обработать следующее сообщение. Поэтому при использовании фильтрации для подключения нескольких целевых объектов к источнику убедитесь, что по крайней мере один целевой блок получает каждое сообщение. В противном случае ваше приложение может привести к взаимной блокировке.

Передача сообщений

Модель программирования потока данных связана с понятием передачи сообщений, где независимые компоненты программы взаимодействуют друг с другом, отправляя сообщения. Одним из способов распространения сообщений между компонентами приложения является вызов методов Post (синхронных) и SendAsync (асинхронных) для отправки сообщений в целевые блоки потока данных, а также методы Receive, ReceiveAsyncи TryReceive для получения сообщений из исходных блоков. Эти методы можно объединить с конвейерами потока данных или сетями, отправив входные данные в головной узел (целевой блок) и получив выходные данные из узла терминала конвейера или узлов терминала сети (один или несколько исходных блоков). Вы также можете использовать метод Choose для чтения из первого из предоставленных источников с доступными данными и выполнения действий по этим данным.

Исходные блоки предлагают данные для целевых блоков путем вызова метода ITargetBlock<TInput>.OfferMessage. Целевой блок реагирует на предлагаемое сообщение одним из трех способов: он может принять сообщение, отклонить сообщение или отложить сообщение. Когда целевой объект принимает сообщение, метод OfferMessage возвращает Accepted. Когда целевой объект отклоняет сообщение, метод OfferMessage возвращает Declined. Если целевой объект требует, чтобы он больше не получал никаких сообщений из источника, OfferMessage возвращает DecliningPermanently. Предопределенные типы блоков источника не предлагают сообщения связанным целевым объектам после получения такого возвращаемого значения, и они автоматически отменяют связь с такими целевыми объектами.

Когда целевой блок откладывает сообщение для последующего использования, метод OfferMessage возвращает Postponed. Целевой блок, откладывающий сообщение, может позже вызвать метод ISourceBlock<TOutput>.ReserveMessage, чтобы попытаться зарезервировать предлагаемое сообщение. На этом этапе сообщение по-прежнему доступно и может использоваться целевым блоком, или сообщение было принято другим целевым объектом. Если целевой блок позже требует сообщения или больше не нуждается в сообщении, он вызывает метод ISourceBlock<TOutput>.ConsumeMessage или ReleaseReservation соответственно. Резервирование сообщений обычно используется типами блоков потока данных, которые работают в не жадном режиме. Не жадный режим объясняется далее в этом документе. Вместо резервирования отложенного сообщения целевой блок также может использовать метод ISourceBlock<TOutput>.ConsumeMessage, чтобы попытаться напрямую использовать отложенное сообщение.

Завершение блока потока данных

Блоки потока данных также поддерживают концепцию завершения . Блок потока данных, который находится в состоянии завершения, не выполняет никаких дополнительных операций. Каждый блок потока данных имеет связанный объект System.Threading.Tasks.Task, известный как задача завершения, которая представляет состояние завершения блока. Так как вы можете дожидаться завершения объекта Task, используя задачи завершения, вы можете ждать завершения одного или нескольких конечных узлов сети передачи данных. Интерфейс IDataflowBlock определяет метод Complete, который сообщает блоку потока данных о завершении запроса и свойстве Completion, которое возвращает задачу завершения блока потока данных. Оба ISourceBlock<TOutput> и ITargetBlock<TInput> наследуют интерфейс IDataflowBlock.

Существует два способа определить, завершен ли блок потока данных без ошибок, обнаружен один или несколько ошибок или отменен. Первым способом является вызов метода Task.Wait задачи завершения в блоке try-catch (Try-Catch в Visual Basic). В следующем примере создается объект ActionBlock<TInput>, который создает ArgumentOutOfRangeException, если его входное значение меньше нуля. AggregateException возникает, когда в этом примере вызывается Wait задачи завершения. Доступ к ArgumentOutOfRangeException осуществляется через свойство InnerExceptions объекта AggregateException.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

В этом примере показан случай, когда исключение остаётся необработанным в делегате блока потока выполнения данных. Рекомендуется обрабатывать исключения в телах таких блоков. Однако если этого не удается сделать, блок ведет себя так, как если бы он был отменен и не обрабатывает входящие сообщения.

Если блок потока данных отменен явным образом, объект AggregateException содержит OperationCanceledException в свойстве InnerExceptions. Для получения дополнительных сведений об отмене потока данных см. раздел Включение отмены.

Второй способ определить состояние завершения блока потока данных — использовать продолжение задачи завершения или использовать асинхронные языковые функции C# и Visual Basic для асинхронного ожидания задачи завершения. Делегат, предоставленный методу Task.ContinueWith, принимает объект Task, представляющий предыдущую задачу. В случае свойства Completion делегат для продолжения принимает на себя саму задачу завершения. Следующий пример похож на предыдущий, за исключением того, что он также использует метод ContinueWith для создания задачи продолжения, которая выводит состояние общей операции потока данных.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine($"The status of the completion task is '{task.Status}'.");
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Можно также использовать такие свойства, как IsCanceled в тексте задачи продолжения, чтобы определить дополнительные сведения о состоянии завершения блока потока данных. Дополнительные сведения о задачах продолжения и их связи с отменой и обработкой ошибок см. в задачах цепочки с помощьюзадач продолжения, отмены задач и обработки исключений.

Стандартные типы блоков потока данных

Библиотека потоков данных TPL предоставляет несколько предопределенных типов блоков потока данных. Эти типы разделены на три категории: блоки буферизации , блоки выполнения и блоки группировки . В следующих разделах описаны типы блоков, составляющие эти категории.

Блоки буферизации

Блоки буферизации хранят данные для использования потребителями данных. Библиотека потоков данных TPL предоставляет три типа блоков буферизации: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>и System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

БуферБлок<T>

Класс BufferBlock<T> представляет структуру асинхронного обмена сообщениями общего назначения. Этот класс хранит очередь сообщений типа FIFO, в которую могут записываться сообщения из нескольких источников и из которой могут считываться несколькими целевыми объектами. Когда целевой объект получает сообщение от объекта BufferBlock<T>, это сообщение удаляется из очереди сообщений. Поэтому, хотя объект BufferBlock<T> может иметь несколько целевых объектов, только один целевой объект получит каждое сообщение. Класс BufferBlock<T> полезен, если требуется передать несколько сообщений другому компоненту, и этот компонент должен получать каждое сообщение.

Следующий базовый пример записывает несколько значений Int32 в объект BufferBlock<T>, а затем считывает эти значения обратно из этого объекта.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

Полный пример, демонстрирующий запись сообщений и чтение сообщений из объекта BufferBlock<T>, см. в разделе Практическое руководство. Запись сообщений в блок потока данных и чтение сообщений из блока потока данных.

BroadcastBlock<T>

Класс BroadcastBlock<T> полезен, если необходимо передать несколько сообщений другому компоненту, но этот компонент нуждается только в последнем значении. Этот класс также полезен, если вы хотите транслировать сообщение нескольким компонентам.

В следующем базовом примере значение Double записывается в объект BroadcastBlock<T>, а затем считывает это значение обратно из этого объекта несколько раз. Так как значения не удаляются из BroadcastBlock<T> объектов после их чтения, то одно и то же значение доступно каждый раз.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Для полного примера использования BroadcastBlock<T> для отправки сообщения нескольким целевым блокам см. раздел Практическое руководство: Указание планировщика задач в блоке потока данных.

WriteOnceBlock<T>

Класс WriteOnceBlock<T> напоминает класс BroadcastBlock<T>, за исключением того, что объект WriteOnceBlock<T> можно записать только один раз. Вы можете рассматривать WriteOnceBlock<T> как аналог ключевого слова C# (ReadOnly в Visual Basic), за исключением того, что объект WriteOnceBlock<T> делается неизменяемым после получения значения вместо конструирования. Как и класс BroadcastBlock<T>, когда целевой объект получает сообщение от объекта WriteOnceBlock<T>, это сообщение не удаляется из этого объекта. Поэтому несколько целевых объектов получают копию сообщения. Класс WriteOnceBlock<T> полезен, если вы хотите распространить только первое из нескольких сообщений.

Следующий базовый пример записывает несколько значений String в объект WriteOnceBlock<T>, а затем считывает значение обратно из этого объекта. Так как объект WriteOnceBlock<T> может быть записан только один раз, после того как объект WriteOnceBlock<T> получает сообщение, он удаляет последующие сообщения.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Полный пример, демонстрирующий, как использовать WriteOnceBlock<T> для получения значения первой операции, которая завершается, смотрите в разделе Практическое руководство: Отмена связи блоков потока данных.

Блоки выполнения

Блоки выполнения вызывают делегата, предоставленного пользователем, для каждого фрагмента полученных данных. Библиотека потоков данных TPL предоставляет три типа блоков выполнения: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>и System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock<T>

Класс ActionBlock<TInput> — это целевой блок, вызывающий делегат при получении данных. Подумайте о объекте ActionBlock<TInput> как делегате, который выполняется асинхронно, когда данные становятся доступными. Делегат, предоставленный объекту ActionBlock<TInput>, может быть типом Action<T> или типом System.Func<TInput, Task>. При использовании объекта ActionBlock<TInput> с Action<T>обработка каждого входного элемента считается завершенной при возврате делегата. При использовании объекта ActionBlock<TInput> с System.Func<TInput, Task>обработка каждого входного элемента считается завершенной только после завершения возвращаемого объекта Task. С помощью этих двух механизмов можно использовать ActionBlock<TInput> для синхронной и асинхронной обработки каждого входного элемента.

Следующий базовый пример отправляет несколько значений Int32 в объект ActionBlock<TInput>. Объект ActionBlock<TInput> выводит эти значения в консоль. В этом примере блок переводится в состояние завершения и ожидает завершения всех задач обработки данных.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Полные примеры, демонстрирующие использование делегатов с классом ActionBlock<TInput>, см. в разделе Как выполнять действие, когда блок потока данных получает данные.

TransformBlock<TInput, TOutput>

Класс TransformBlock<TInput,TOutput> похож на класс ActionBlock<TInput>, за исключением того, что он выступает как в качестве источника, так и в качестве целевого объекта. Делегат, переданный объекту TransformBlock<TInput,TOutput>, возвращает значение типа TOutput. Делегат, предоставленный объекту TransformBlock<TInput,TOutput>, может быть типом System.Func<TInput, TOutput> или типом System.Func<TInput, Task<TOutput>>. При использовании объекта TransformBlock<TInput,TOutput> с System.Func<TInput, TOutput>обработка каждого входного элемента считается завершенной при возврате делегата. При использовании объекта TransformBlock<TInput,TOutput>, используемого с System.Func<TInput, Task<TOutput>>, обработка каждого входного элемента считается завершенной только после завершения возвращаемого объекта Task<TResult>. Как и в случае с ActionBlock<TInput>, с помощью этих двух механизмов можно использовать TransformBlock<TInput,TOutput> для синхронной и асинхронной обработки каждого входного элемента.

В следующем базовом примере создается объект TransformBlock<TInput,TOutput>, вычисляющий квадратный корень входных данных. Объект TransformBlock<TInput,TOutput> принимает значения Int32 в качестве входных данных и создает Double значения в качестве выходных данных.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Полные примеры, использующие TransformBlock<TInput,TOutput> в сети блоков передачи данных для обработки изображений в приложении Windows Forms, см. в пошаговом руководстве: Использование потока данных в приложении Windows Forms.

TransformManyBlock<TInput, TOutput>

Класс TransformManyBlock<TInput,TOutput> напоминает класс TransformBlock<TInput,TOutput>, за исключением того, что TransformManyBlock<TInput,TOutput> создает ноль или больше выходных значений для каждого входного значения вместо одного выходного значения для каждого входного значения. Делегат, предоставленный объекту TransformManyBlock<TInput,TOutput>, может быть типом System.Func<TInput, IEnumerable<TOutput>> или типом System.Func<TInput, Task<IEnumerable<TOutput>>>. При использовании объекта TransformManyBlock<TInput,TOutput> с System.Func<TInput, IEnumerable<TOutput>>обработка каждого входного элемента считается завершенной при возврате делегата. При использовании объекта TransformManyBlock<TInput,TOutput> с System.Func<TInput, Task<IEnumerable<TOutput>>>обработка каждого входного элемента считается завершенной только после завершения возвращаемого объекта System.Threading.Tasks.Task<IEnumerable<TOutput>>.

В следующем базовом примере создается объект TransformManyBlock<TInput,TOutput>, который разбивает строки на отдельные последовательности символов. Объект TransformManyBlock<TInput,TOutput> принимает значения String в качестве входных данных и создает Char значения в качестве выходных данных.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Полные примеры, использующие TransformManyBlock<TInput,TOutput> для создания нескольких независимых выходных данных для каждого ввода в конвейере потока данных, см. в "Пошаговое руководство: создание конвейера потоков данных".

Степень параллелизма

Каждый объект ActionBlock<TInput>, TransformBlock<TInput,TOutput>и TransformManyBlock<TInput,TOutput> буферизует входные сообщения, пока блок не готов к обработке. По умолчанию эти классы обрабатывают сообщения в порядке их получения, по одному сообщению за раз. Можно также указать степень параллелизма, чтобы включить ActionBlock<TInput>, TransformBlock<TInput,TOutput> и TransformManyBlock<TInput,TOutput> объекты для одновременной обработки нескольких сообщений. Дополнительные сведения о параллельном выполнении см. в разделе "Указание степени параллелизма" далее в этом документе. Пример, который задает степень параллелизма, позволяя блоку выполнения потока данных обрабатывать более одного сообщения одновременно, см. в руководстве Как указать степень параллелизма в блоке потока данных.

Сводка типов делегатов

В следующей таблице перечислены типы делегатов, которые можно предоставить для ActionBlock<TInput>, TransformBlock<TInput,TOutput>и объектов TransformManyBlock<TInput,TOutput>. Эта таблица также указывает, работает ли тип делегата синхронно или асинхронно.

Тип Синхронный тип делегата Асинхронный тип делегата
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

При работе с типами блоков выполнения можно также использовать лямбда-выражения. Пример того, как использовать лямбда-выражение с блоком выполнения, см. в разделе Как: Выполнить действие, когда блок потока данных получает данные.

Группировка блоков

Группирующие блоки объединяют данные из одного или нескольких источников с различными ограничениями. Библиотека потоков данных TPL предоставляет три типа блоков соединения: BatchBlock<T>, JoinBlock<T1,T2>и BatchedJoinBlock<T1,T2>.

Блок пакетной<T>

Класс BatchBlock<T> объединяет наборы входных данных, которые называются пакетами, в массивы выходных данных. При создании объекта BatchBlock<T> укажите размер каждого пакета. Когда объект BatchBlock<T> получает указанное количество входных элементов, он асинхронно распространяет массив, содержащий эти элементы. Если объект BatchBlock<T> имеет состояние завершенного, но не содержит достаточно элементов для формирования пакета, он распространяет окончательный массив, содержащий оставшиеся входные элементы.

Класс BatchBlock<T> работает в жадном или нежадном режиме. В жадном режиме, который является значением по умолчанию, объект BatchBlock<T> принимает каждое сообщение, которое оно предлагает и распространяет массив после получения указанного количества элементов. В нежадном режиме объект BatchBlock<T> откладывает все входящие сообщения до тех пор, пока достаточно источников не предоставят сообщения блоку для формирования пакета. Жадный режим обычно работает лучше, чем не жадный режим, так как он требует меньше затрат на обработку. Однако вы можете использовать не жадный режим, когда необходимо координировать потребление из нескольких источников атомарным образом. Укажите режим без жадности, установив Greedy на False в параметре dataflowBlockOptions конструктора BatchBlock<T>.

В следующем базовом примере несколько значений Int32 записывается в объект BatchBlock<T>, содержащий десять элементов в пакете. Чтобы гарантировать распространение всех значений из BatchBlock<T>, в этом примере вызывается метод Complete. Метод Complete задает объект BatchBlock<T> завершенным состоянием, поэтому объект BatchBlock<T> распространяет все остальные элементы в виде окончательного пакета.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.",
   batchBlock.Receive().Sum());

Console.WriteLine("The sum of the elements in batch 2 is {0}.",
   batchBlock.Receive().Sum());

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Полный пример, демонстрирующий использование BatchBlock<T> для повышения эффективности операций вставки в базу данных, можно найти в разделе "Пошаговое руководство: использование BatchBlock и BatchedJoinBlock для повышения эффективности".

JoinBlock<T1, T2, ...>

Классы JoinBlock<T1,T2> и JoinBlock<T1,T2,T3> собирают входные элементы и распространяют System.Tuple<T1,T2> или System.Tuple<T1,T2,T3> объекты, содержащие эти элементы. Классы JoinBlock<T1,T2> и JoinBlock<T1,T2,T3> не наследуются от ITargetBlock<TInput>. Вместо этого они предоставляют свойства, Target1, Target2и Target3, реализующие ITargetBlock<TInput>.

Как и BatchBlock<T>, JoinBlock<T1,T2> и JoinBlock<T1,T2,T3> работают в жадном или нежадном режиме. В жадном режиме, который является режимом по умолчанию, объект JoinBlock<T1,T2> или JoinBlock<T1,T2,T3> принимает каждое сообщение, которое ему предложено, и распространяет кортеж после того, как каждая из его целей получит по крайней мере одно сообщение. В не жадном режиме объект JoinBlock<T1,T2> или JoinBlock<T1,T2,T3> откладывает все входящие сообщения до тех пор, пока все целевые объекты не будут предложены данные, необходимые для создания кортежа. На этом этапе блок участвует в двухфазном протоколе фиксации для атомарного извлечения всех необходимых элементов из источников. Это отложение позволяет другой сущности использовать данные в то же время, чтобы обеспечить общий прогресс системы.

В следующем базовом примере показано, в каком объекте JoinBlock<T1,T2,T3> требуется несколько данных для вычисления значения. В этом примере создается объект JoinBlock<T1,T2,T3>, который требует двух Int32 значений и значения Char для выполнения арифметической операции.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine($"{data.Item1} + {data.Item2} = {data.Item1 + data.Item2}");
         break;
      case '-':
         Console.WriteLine($"{data.Item1} - {data.Item2} = {data.Item1 - data.Item2}");
         break;
      default:
         Console.WriteLine($"Unknown operator '{data.Item3}'.");
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Полный пример, использующий объекты JoinBlock<T1,T2> в не жадном режиме для совместного использования ресурса, см. в разделе Практическое руководство. Использование JoinBlock для чтения данных из нескольких источников.

BatchedJoinBlock<T1, T2, ...>

Классы BatchedJoinBlock<T1,T2> и BatchedJoinBlock<T1,T2,T3> собирают пакеты входных элементов и распространяют System.Tuple(IList(T1), IList(T2)) или System.Tuple(IList(T1), IList(T2), IList(T3)) объекты, содержащие эти элементы. Думайте о BatchedJoinBlock<T1,T2> как сочетании BatchBlock<T> и JoinBlock<T1,T2>. Укажите размер каждого пакета при создании объекта BatchedJoinBlock<T1,T2>. BatchedJoinBlock<T1,T2> также предоставляет свойства, Target1 и Target2, реализующие ITargetBlock<TInput>. Если указанное количество входных элементов получено от всех целевых объектов, объект BatchedJoinBlock<T1,T2> асинхронно распространяет System.Tuple(IList(T1), IList(T2)) объект, содержащий эти элементы.

В следующем базовом примере создается объект BatchedJoinBlock<T1,T2>, содержащий результаты, значения Int32 и ошибки, которые являются объектами Exception. В этом примере выполняется несколько операций, и результаты записываются в свойство Target1, а ошибки — в свойство Target2 объекта BatchedJoinBlock<T1,T2>. Так как количество успешных и неудачных операций неизвестно заранее, объекты IList<T> позволяют каждому целевому объекту получать ноль или больше значений.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Полный пример, в котором используется BatchedJoinBlock<T1,T2> для фиксации как результатов, так и любых исключений, возникающих при считывании данных программой из базы данных, см. в пошаговом руководстве «Использование BatchBlock и BatchedJoinBlock для повышения эффективности».

Настройка поведения блока потока данных

Дополнительные параметры можно включить, предоставив в конструктор типов блоков потока данных объект System.Threading.Tasks.Dataflow.DataflowBlockOptions. Эти параметры управляют поведением планировщика, который управляет базовой задачей и степенью параллелизма. DataflowBlockOptions также имеет производные типы, определяющие поведение, которое относится к определенным типам блоков потока данных. В следующей таблице приведены сведения о том, какой тип параметров связан с каждым типом блока потока данных.

Тип блока потока данных тип DataflowBlockOptions
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

В следующих разделах содержатся дополнительные сведения о важных типах параметров блока потока данных, доступных через классы System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptionsи System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions.

Настройка планировщика задач

Каждый предопределенный блок потока данных использует механизм планирования задач TPL для выполнения таких действий, как распространение данных в целевой объект, получение данных из источника и выполнение определяемых пользователем делегатов, когда данные становятся доступными. TaskScheduler — это абстрактный класс, представляющий планировщик задач, который помещает задачи в потоки. Планировщик задач по умолчанию Defaultиспользует класс ThreadPool для очереди и выполнения работы. Планировщик задач по умолчанию можно переопределить, задав свойство TaskScheduler при создании объекта блока потока данных.

Когда один планировщик задач управляет несколькими блоками потока данных, он может применять политики между ними. Например, если несколько блоков потока данных настроены на использование эксклюзивного планировщика одного и того же объекта ConcurrentExclusiveSchedulerPair, вся работа, выполняемая в этих блоках, сериализуется. Аналогично, если эти блоки настроены для планировщика параллельных задач того же объекта ConcurrentExclusiveSchedulerPair, и планировщик настроен на максимальный уровень параллелизма, вся работа из этих блоков ограничена тем количеством параллельных операций. Пример, использующий класс ConcurrentExclusiveSchedulerPair для параллельного выполнения операций чтения, но чтобы операции записи происходили с исключением всех других операций, см. раздел Практическое руководство: указание планировщика задач в блоке потока данных. Чтобы получить дополнительные сведения о планировщиках задач в библиотеке параллельных задач (TPL), обратитесь к теме класса TaskScheduler.

Указание степени параллелизма

По умолчанию три типа блоков выполнения, предоставляемые библиотекой потоков данных TPL, ActionBlock<TInput>, TransformBlock<TInput,TOutput>и TransformManyBlock<TInput,TOutput>, обрабатывают одно сообщение одновременно. Эти типы блоков потока данных также обрабатывают сообщения в порядке их получения. Чтобы включить эти блоки потока данных для одновременной обработки сообщений, задайте свойство ExecutionDataflowBlockOptions.MaxDegreeOfParallelism при создании объекта блока потока данных.

Значение по умолчанию MaxDegreeOfParallelism равно 1, что гарантирует, что блок потока данных обрабатывает одно сообщение одновременно. Задание этого свойства значением, превышающее 1, позволяет блоку потока данных обрабатывать несколько сообщений одновременно. Задание этого свойства DataflowBlockOptions.Unbounded позволяет базовому планировщику задач управлять максимальной степенью параллелизма.

Это важно

При указании максимальной степени параллелизма, превышающей 1, одновременно обрабатываются несколько сообщений, поэтому сообщения могут не обрабатываться в порядке их получения. Порядок вывода сообщений из блока, однако тот же, в котором они получены.

Так как свойство MaxDegreeOfParallelism представляет максимальную степень параллелизма, блок потока данных может выполняться с меньшей степенью параллелизма, чем указано. Блок потока данных может использовать меньшую степень параллелизма для удовлетворения своих функциональных требований или из-за отсутствия доступных системных ресурсов. Блок потока данных никогда не выбирает больше параллелизма, чем указано.

Значение свойства MaxDegreeOfParallelism является эксклюзивным для каждого объекта блока потока данных. Например, если четыре объекта блока потока данных указывают 1 для максимальной степени параллелизма, все четыре объекта блока потока данных могут выполняться параллельно.

Пример, который задает максимальную степень параллелизма для параллельного выполнения длительных операций, см. в разделе Практическое руководство. Указание степени параллелизма в блоке потока данных.

Указание количества сообщений для каждой задачи

Стандартные типы блоков потока данных используют задачи для обработки нескольких входных элементов. Это помогает свести к минимуму количество объектов задач, необходимых для обработки данных, что позволяет приложениям работать более эффективно. Однако, когда задачи из одного набора блоков потока данных обрабатывают данные, задачам из других блоков потока данных может потребоваться ожидание их обработки в очереди сообщений. Чтобы повысить справедливость между задачами потока данных, задайте свойство MaxMessagesPerTask. Если для MaxMessagesPerTask задано значение DataflowBlockOptions.Unbounded, то по умолчанию задача, используемая блоком потока данных, обрабатывает столько сообщений, сколько доступно. Если MaxMessagesPerTask задано значение, отличное от Unbounded, блок потока данных обрабатывает не более этого количества сообщений на объект Task. Хотя установка свойства MaxMessagesPerTask может повысить справедливость между задачами, система может создавать больше задач, чем это необходимо, что может снизить производительность.

Включение отмены

TPL предоставляет механизм, который позволяет задачам координированно отменять операции в сотрудничестве. Чтобы включить блоки потока данных для участия в этом механизме отмены, задайте свойство CancellationToken. Когда для этого объекта CancellationToken устанавливается состояние отмены, все блоки потока данных, которые отслеживают этот маркер, завершают выполнение текущего элемента, но не начинают обработку последующих элементов. Эти блоки потока данных также очищают все буферные сообщения, освобождают подключения к любым исходным и целевым блокам и переходят в отмененное состояние. При переходе в отмененное состояние, свойству Completion устанавливается значение Canceledдля свойства Status, если не произошло исключение во время обработки. В этом случае для Status задано значение Faulted.

Пример, демонстрирующий использование отмены в приложении Windows Forms, см. в разделе Практическое руководство: отмена блока потока данных. Для получения дополнительной информации об отмене задач в TPL см. и.

Указание жадности и не жадного поведения

Несколько типов блоков потока данных группировки могут работать в режиме жадности или не жадным режиме. По умолчанию предопределенные типы блоков потока данных работают в жадном режиме.

Для типов блоков соединения, таких как JoinBlock<T1,T2>, жадный режим означает, что блок немедленно принимает данные, даже если соответствующие данные, с которыми необходимо присоединиться, пока недоступны. Не жадный режим означает, что блок откладывает все входящие сообщения до тех пор, пока он не будет доступен для каждого из его целевых объектов для завершения соединения. Если любой из отложенных сообщений больше недоступен, блок соединения освобождает все отложенные сообщения и перезапускает процесс. Для класса BatchBlock<T> жадное и нежадное поведение схожи, за исключением того, что в нежадном режиме объект BatchBlock<T> откладывает все входящие сообщения до тех пор, пока они не будут достаточно получены из различных источников, чтобы можно было обработать партию.

Чтобы указать не жадный режим для блока потока данных, задайте для Greedy значение False. Пример, демонстрирующий использование не жадного режима для более эффективного использования нескольких блоков соединения для совместного использования источника данных, см. в статье Практическое руководство. Использование JoinBlock для чтения данных из нескольких источников.

Настраиваемые блоки потока данных

Хотя библиотека потоков данных TPL предоставляет множество предопределенных типов блоков, можно создать дополнительные типы блоков, которые выполняют настраиваемое поведение. Реализуйте интерфейсы ISourceBlock<TOutput> или ITargetBlock<TInput> напрямую или используйте метод Encapsulate для создания сложного блока, который инкапсулирует поведение существующих типов блоков. Для примеров, демонстрирующих, как реализовать пользовательские функции блока потока данных, см. Пошаговое руководство: Создание пользовательского типа блока потока данных.

Заголовок Описание
Как: Отправлять сообщения в блок передачи данных и читать сообщения из блока передачи данных Показывается, как записывать сообщения в объект и читать сообщения из объекта BufferBlock<T>.
Практическое руководство: Реализация шаблона Producer-Consumer потока данных Описывает, как использовать модель потока данных для реализации шаблона производителя-потребителя, где производитель отправляет сообщения в блок потока данных, а потребитель считывает сообщения из этого блока.
Как выполнить действие, когда блок потока данных получает данные Описывает, как предоставить делегатам типы блоков потока данных выполнения, ActionBlock<TInput>, TransformBlock<TInput,TOutput>и TransformManyBlock<TInput,TOutput>.
пошаговое руководство. Создание конвейера потоков данных Описывает создание конвейера потока данных, который загружает текст из Интернета и выполняет операции с этим текстом.
Как отменить связь блоков потока данных Демонстрирует, как использовать метод LinkTo для разрыва связи целевого блока с его источником после того, как источник предлагает сообщение целевому блоку.
Пошаговое руководство: Использование Dataflow в приложении Windows Forms Демонстрируется создание сети блоков потока данных, выполняющих обработку изображений в приложении Windows Forms.
Как отменить блок потока данных Демонстрирует, как использовать отмену в приложении на Windows Forms.
Практическое руководство. Использование JoinBlock для чтения данных из нескольких источников Объясняет, как использовать класс JoinBlock<T1,T2> для выполнения операции, когда данные доступны из нескольких источников, а также как использовать не жадный режим, чтобы обеспечить более эффективный общий доступ к источнику данных несколькими блоками соединения.
Практическое руководство. Указание степени параллелизма в блоке потока данных Описывает, как задать свойство MaxDegreeOfParallelism, чтобы включить блок потока данных выполнения для обработки нескольких сообщений одновременно.
Как задать планировщик задач в блоке потока данных Демонстрирует, как связать конкретный планировщик задач при использовании потока данных в приложении.
Обзор: Использование BatchBlock и BatchedJoinBlock для повышения эффективности Описывает использование класса BatchBlock<T> для повышения эффективности операций вставки базы данных и использования класса BatchedJoinBlock<T1,T2> для записи результатов и любых исключений, возникающих при чтении программы из базы данных.
Пошаговое руководство: создание пользовательского типа блока потока данных Демонстрирует два способа создания типа блока потока данных, реализующего пользовательское поведение.
Task Parallel Library (TPL) Представляет библиотеку TPL, упрощающую параллельное и конкурентное программирование в приложениях .NET Framework.