Delen via


Gegevensstroom (taakparallelbibliotheek)

De TPL (Task Parallel Library) biedt gegevensstroomonderdelen om de robuustheid van toepassingen met gelijktijdigheid te verbeteren. Deze gegevensstroomonderdelen worden gezamenlijk aangeduid als de TPL-gegevensstroombibliotheek. Dit gegevensstroommodel bevordert op actor gebaseerde programmering door in-process berichten door te geven voor grofkorrelige gegevensstroom- en pipeliningtaken. De gegevensstroomonderdelen bouwen voort op de typen en planningsinfrastructuur van de TPL en integreren met de taalondersteuning voor C#, Visual Basic en F# voor asynchrone programmering. Deze gegevensstroomonderdelen zijn handig wanneer u meerdere bewerkingen hebt die asynchroon met elkaar moeten communiceren of wanneer u gegevens wilt verwerken zodra deze beschikbaar zijn. Denk bijvoorbeeld aan een toepassing die afbeeldingsgegevens van een webcamera verwerkt. Met behulp van het gegevensstroommodel kan de toepassing afbeeldingsframes verwerken zodra deze beschikbaar zijn. Als de toepassing bijvoorbeeld afbeeldingsframes verbetert door lichte correctie of rode oogreductie uit te voeren, kunt u een pijplijn met gegevensstroomonderdelen maken. Voor elke fase van de pijplijn kan meer grof gemalen parallellismefunctionaliteit worden gebruikt, zoals de functionaliteit die door de TPL wordt geleverd, om de afbeelding te transformeren.

Dit document bevat een overzicht van de TPL-gegevensstroombibliotheek. Het beschrijft het programmeermodel, de vooraf gedefinieerde typen gegevensstroomblokken en het configureren van gegevensstroomblokken om te voldoen aan de specifieke vereisten van uw toepassingen.

Notitie

De TPL-gegevensstroombibliotheek (de System.Threading.Tasks.Dataflow naamruimte) wordt niet gedistribueerd met .NET. Als u de System.Threading.Tasks.Dataflow naamruimte in Visual Studio wilt installeren, opent u uw project, kiest u NuGet-pakketten beheren in het menu Project en zoekt u online naar het System.Threading.Tasks.Dataflow pakket. U kunt het ook installeren met behulp van de .NET Core CLI.dotnet add package System.Threading.Tasks.Dataflow

Programmeermodel

De TPL-gegevensstroombibliotheek biedt een basis voor het doorgeven en parallelliseren van CPU-intensieve en I/O-intensieve toepassingen met een hoge doorvoer en lage latentie. Het geeft u ook expliciet controle over hoe gegevens worden gebufferd en verplaatst door het systeem. Als u meer inzicht wilt krijgen in het programmeermodel voor gegevensstromen, kunt u een toepassing overwegen die asynchroon installatiekopieën van de schijf laadt en een samengestelde van deze installatiekopieën maakt. Traditionele programmeermodellen vereisen doorgaans dat u callbacks en synchronisatieobjecten, zoals vergrendelingen, gebruikt om taken en toegang tot gedeelde gegevens te coördineren. Met behulp van het programmeermodel voor gegevensstromen kunt u gegevensstroomobjecten maken die afbeeldingen verwerken terwijl ze worden gelezen van schijf. Onder het gegevensstroommodel declareert u hoe gegevens worden verwerkt wanneer deze beschikbaar worden en eventuele afhankelijkheden tussen gegevens. Omdat de runtime afhankelijkheden tussen gegevens beheert, kunt u vaak voorkomen dat de toegang tot gedeelde gegevens moet worden gesynchroniseerd. Bovendien kan de gegevensstroom, omdat de runtimeschema's werken op basis van de asynchrone aankomst van gegevens, de reactiesnelheid en doorvoer verbeteren door de onderliggende threads efficiënt te beheren. Zie Overzicht: Gegevensstroom gebruiken in een Windows Forms-toepassing voor een voorbeeld waarin het programmeermodel voor gegevensstromen wordt gebruikt voor het implementeren van afbeeldingsverwerking in een Windows Forms-toepassing.

Bronnen en doelen

De TPL-gegevensstroombibliotheek bestaat uit gegevensstroomblokken, die gegevensstructuren zijn die gegevens bufferen en verwerken. De TPL definieert drie soorten gegevensstroomblokken: bronblokken, doelblokken en doorgifteblokken. Een bronblok fungeert als gegevensbron en kan worden gelezen uit. Een doelblok fungeert als ontvanger van gegevens en kan worden geschreven naar. Een doorgifteblok fungeert als zowel een bronblok als een doelblok en kan worden gelezen van en naar geschreven. De TPL definieert de System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> interface die bronnen vertegenwoordigt, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> doelen vertegenwoordigt en System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> doorgifteders vertegenwoordigt. IPropagatorBlock<TInput,TOutput> neemt over van zowel ISourceBlock<TOutput>, als ITargetBlock<TInput>.

De TPL-gegevensstroombibliotheek biedt verschillende vooraf gedefinieerde typen gegevensstromen die de ISourceBlock<TOutput>, ITargetBlock<TInput>en IPropagatorBlock<TInput,TOutput> interfaces implementeren. Deze typen gegevensstroomblokken worden beschreven in dit document in de sectie Vooraf gedefinieerde typen gegevensstroomblokken.

blokken Verbinding maken

U kunt gegevensstroomblokken verbinden met pijplijnen. Dit zijn lineaire reeksen gegevensstroomblokken of netwerken, die grafieken van gegevensstroomblokken zijn. Een pijplijn is één vorm van netwerk. In een pijplijn of netwerk worden gegevens asynchroon doorgegeven aan doelen wanneer die gegevens beschikbaar komen. De ISourceBlock<TOutput>.LinkTo methode koppelt een brongegevensstroomblok aan een doelblok. Een bron kan worden gekoppeld aan nul of meer doelen; doelen kunnen worden gekoppeld vanuit nul of meer bronnen. U kunt gegevensstroomblokken aan of uit een pijplijn of netwerk gelijktijdig toevoegen of verwijderen. De vooraf gedefinieerde gegevensstroombloktypen verwerken alle thread-veiligheidsaspecten van het koppelen en loskoppelen.

Zie Walkthrough: Een gegevensstroompijplijn maken voor een voorbeeld waarmee gegevensstroomblokken worden verbonden om een basispijplijn te vormen. Zie Overzicht: Gegevensstroom gebruiken in een Windows Forms-toepassing voor een voorbeeld dat gegevensstroomblokken verbindt om een complexer netwerk te vormen. Zie Procedure: Gegevensstroomblokken ontkoppelen voor een voorbeeld dat een doel loskoppelt van een bron nadat de bron het doel heeft aangeboden.

Filteren

Wanneer u de ISourceBlock<TOutput>.LinkTo methode aanroept om een bron aan een doel te koppelen, kunt u een gemachtigde opgeven die bepaalt of het doelblok een bericht accepteert of weigert op basis van de waarde van dat bericht. Dit filtermechanisme is een handige manier om ervoor te zorgen dat een gegevensstroomblok alleen bepaalde waarden ontvangt. Voor de meeste vooraf gedefinieerde typen gegevensstroomblokken, als een bronblok is verbonden met meerdere doelblokken, wanneer een doelblok een bericht weigert, biedt de bron dat bericht aan het volgende doel. De volgorde waarin een bron berichten aan doelen biedt, wordt gedefinieerd door de bron en kan variëren afhankelijk van het type bron. De meeste bronbloktypen stoppen met het aanbieden van een bericht nadat één doel dat bericht accepteert. Een uitzondering op deze regel is de BroadcastBlock<T> klasse, die elk bericht aan alle doelen biedt, zelfs als sommige doelen het bericht afwijzen. Zie Walkthrough voor een voorbeeld waarin filteren wordt gebruikt om alleen bepaalde berichten te verwerken: Gegevensstroom gebruiken in een Windows Forms-toepassing.

Belangrijk

Omdat elk vooraf gedefinieerd type brongegevensstroomblok garandeert dat berichten worden doorgegeven in de volgorde waarin ze worden ontvangen, moet elk bericht worden gelezen uit het bronblok voordat het bronblok het volgende bericht kan verwerken. Wanneer u filteren gebruikt om meerdere doelen te verbinden met een bron, moet u er daarom voor zorgen dat ten minste één doelblok elk bericht ontvangt. Anders kan uw toepassing een impasse opleveren.

Bericht doorgeven

Het programmeermodel voor gegevensstromen is gerelateerd aan het concept van berichtdoorgifte, waarbij onafhankelijke onderdelen van een programma met elkaar communiceren door berichten te verzenden. Een manier om berichten tussen toepassingsonderdelen door te geven, is door de Post methoden (synchrone) en SendAsync (asynchrone) methoden aan te roepen om berichten te verzenden naar doelgegevensstroomblokken, en de methoden TryReceive voor het ReceiveReceiveAsyncontvangen van berichten van bronblokken. U kunt deze methoden combineren met gegevensstroompijplijnen of netwerken door invoergegevens te verzenden naar het hoofdknooppunt (een doelblok) en door uitvoergegevens te ontvangen van het terminalknooppunt van de pijplijn of de terminalknooppunten van het netwerk (een of meer bronblokken). U kunt de Choose methode ook gebruiken om te lezen uit de eerste van de opgegeven bronnen met gegevens die beschikbaar zijn en actie op die gegevens uit te voeren.

Bronblokken bieden gegevens aan doelblokken door de ITargetBlock<TInput>.OfferMessage methode aan te roepen. Het doelblok reageert op een aangeboden bericht op drie manieren: het kan het bericht accepteren, het bericht weigeren of het bericht uitstellen. Wanneer het doel het bericht accepteert, wordt de OfferMessage methode geretourneerd Accepted. Wanneer het doel het bericht weigert, wordt de OfferMessage methode geretourneerd Declined. Wanneer het doel vereist dat er geen berichten meer van de bron worden ontvangen, OfferMessage wordt het resultaat geretourneerd DecliningPermanently. De vooraf gedefinieerde bronbloktypen bieden geen berichten aan gekoppelde doelen nadat een dergelijke retourwaarde is ontvangen en ze ontkoppelen automatisch van dergelijke doelen.

Wanneer een doelblok het bericht uitstelt voor later gebruik, wordt de OfferMessage methode geretourneerd Postponed. Een doelblok dat een bericht uitstelt, kan de ISourceBlock<TOutput>.ReserveMessage methode later aanroepen om het aangeboden bericht te reserveren. Op dit moment is het bericht nog steeds beschikbaar en kan worden gebruikt door het doelblok, of het bericht is genomen door een ander doel. Wanneer het doelblok later het bericht vereist of het bericht niet meer nodig heeft, roept het respectievelijk de ISourceBlock<TOutput>.ConsumeMessage of ReleaseReservation methode aan. Berichtreservering wordt doorgaans gebruikt door de typen gegevensstroomblokken die in de niet-greedy modus werken. De niet-hebzuchtige modus wordt verderop in dit document uitgelegd. In plaats van een uitgesteld bericht te reserveren, kan een doelblok ook de ISourceBlock<TOutput>.ConsumeMessage methode gebruiken om het uitgestelde bericht rechtstreeks te gebruiken.

Voltooiing van gegevensstroomblok

Gegevensstroomblokken ondersteunen ook het concept van voltooiing. Een gegevensstroomblok met de voltooide status voert geen verdere werkzaamheden uit. Elk gegevensstroomblok heeft een gekoppeld System.Threading.Tasks.Task object, ook wel een voltooiingstaak genoemd, die de voltooiingsstatus van het blok vertegenwoordigt. Omdat u kunt wachten tot een object is voltooid door voltooiingstaken te gebruiken, kunt u wachten tot een Task of meer terminalknooppunten van een gegevensstroomnetwerk zijn voltooid. De IDataflowBlock interface definieert de Complete methode, waarmee het gegevensstroomblok wordt geïnformeerd over een aanvraag om deze te voltooien en de Completion eigenschap, die de voltooiingstaak voor het gegevensstroomblok retourneert. Zowel ISourceBlock<TOutput> als ITargetBlock<TInput> overnemen van de IDataflowBlock interface.

Er zijn twee manieren om te bepalen of een gegevensstroomblok zonder fouten is voltooid, een of meer fouten heeft aangetroffen of is geannuleerd. De eerste manier is om de Task.Wait methode aan te roepen voor de voltooiingstaak in een-trycatch blok (Try-Catchin Visual Basic). In het volgende voorbeeld wordt een ActionBlock<TInput> object gemaakt dat wordt gegenereerd ArgumentOutOfRangeException als de invoerwaarde kleiner is dan nul. AggregateException wordt gegenereerd wanneer dit voorbeeld de voltooiingstaak aanroept Wait . Het ArgumentOutOfRangeException object wordt geopend via de InnerExceptions eigenschap van het AggregateException object.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

In dit voorbeeld ziet u het geval waarin een uitzondering niet wordt verwerkt in de gemachtigde van een uitvoeringsgegevensstroomblok. We raden u aan om uitzonderingen in de lichamen van dergelijke blokken af te handelen. Als u dit echter niet kunt doen, gedraagt het blok zich alsof het is geannuleerd en niet binnenkomende berichten verwerkt.

Wanneer een gegevensstroomblok expliciet wordt geannuleerd, bevat OperationCanceledException het AggregateException object de InnerExceptions eigenschap. Zie de sectie Annulering inschakelen voor meer informatie over annulering van gegevensstromen.

De tweede manier om de voltooiingsstatus van een gegevensstroomblok te bepalen, is door een vervolg van de voltooiingstaak te gebruiken of de asynchrone taalfuncties van C# en Visual Basic te gebruiken om asynchroon te wachten op de voltooiingstaak. De gemachtigde die u aan de Task.ContinueWith methode opgeeft, neemt een Task object dat de antecedent-taak vertegenwoordigt. In het geval van de Completion eigenschap neemt de gedelegeerde voor de voortzetting de voltooiingstaak zelf. Het volgende voorbeeld lijkt op de vorige, behalve dat de methode ook wordt gebruikt ContinueWith om een vervolgtaak te maken waarmee de status van de algehele gegevensstroombewerking wordt afgedrukt.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine("The status of the completion task is '{0}'.",
      task.Status);
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

U kunt ook eigenschappen IsCanceled zoals in de hoofdtekst van de vervolgtaak gebruiken om aanvullende informatie te bepalen over de voltooiingsstatus van een gegevensstroomblok. Zie Koppelingstaken met behulp van vervolgtaken, taakannulering en afhandeling van uitzonderingen voor meer informatie over vervolgtaken en hoe deze betrekking hebben op annulering en foutafhandeling.

Vooraf gedefinieerde gegevensstroombloktypen

De TPL-gegevensstroombibliotheek biedt verschillende vooraf gedefinieerde typen gegevensstromen. Deze typen zijn onderverdeeld in drie categorieën: bufferblokken, uitvoeringsblokken en groeperingsblokken. In de volgende secties worden de bloktypen beschreven waaruit deze categorieën zijn opgebouwd.

Bufferblokken

Buffering blokkeert gegevens voor gebruik door gegevensgebruikers. De TPL-gegevensstroombibliotheek biedt drie bufferbloktypen: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>en System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock<T>

De BufferBlock<T> klasse vertegenwoordigt een asynchrone berichtstructuur voor algemeen gebruik. In deze klasse wordt een FIFO-wachtrij (First Out) opgeslagen van berichten waarnaar meerdere bronnen kunnen worden geschreven of gelezen door meerdere doelen. Wanneer een doel een bericht van een BufferBlock<T> object ontvangt, wordt dat bericht verwijderd uit de berichtenwachtrij. Hoewel een BufferBlock<T> object meerdere doelen kan hebben, ontvangt slechts één doel elk bericht. De BufferBlock<T> klasse is handig als u meerdere berichten wilt doorgeven aan een ander onderdeel en dat onderdeel moet elk bericht ontvangen.

In het volgende basisvoorbeeld worden verschillende Int32 waarden op een BufferBlock<T> object geplaatst en worden die waarden vervolgens van dat object gelezen.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

Voor een volledig voorbeeld dat laat zien hoe u berichten schrijft naar en leest van een BufferBlock<T> object, raadpleegt u Hoe kunt u berichten schrijven naar en berichten lezen vanuit een gegevensstroomblok.

BroadcastBlock<T>

De BroadcastBlock<T> klasse is handig wanneer u meerdere berichten moet doorgeven aan een ander onderdeel, maar dat onderdeel heeft alleen de meest recente waarde nodig. Deze klasse is ook handig als u een bericht naar meerdere onderdelen wilt uitzenden.

In het volgende basisvoorbeeld wordt een Double waarde in een BroadcastBlock<T> object geplaatst en wordt die waarde vervolgens meerdere keren van dat object gelezen. Omdat waarden niet uit BroadcastBlock<T> objecten worden verwijderd nadat ze zijn gelezen, is dezelfde waarde elke keer beschikbaar.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Zie Een taakplanner opgeven in een gegevensstroomblok voor een volledig voorbeeld dat laat zien hoe BroadcastBlock<T> u een bericht naar meerdere doelblokken uitzendt.

WriteOnceBlock<T>

De WriteOnceBlock<T> klasse lijkt op de BroadcastBlock<T> klasse, behalve dat een WriteOnceBlock<T> object slechts één keer kan worden geschreven. U kunt denken WriteOnceBlock<T> dat het lijkt op het C# -sleutelwoord readonly (ReadOnly in Visual Basic), behalve dat een WriteOnceBlock<T> object onveranderbaar wordt nadat het een waarde heeft ontvangen in plaats van bij de constructie. Net als bij de BroadcastBlock<T> klasse, wanneer een doel een bericht van een WriteOnceBlock<T> object ontvangt, wordt dat bericht niet uit dat object verwijderd. Daarom ontvangen meerdere doelen een kopie van het bericht. De WriteOnceBlock<T> klasse is handig als u alleen de eerste van meerdere berichten wilt doorgeven.

In het volgende basisvoorbeeld worden meerdere String waarden naar een WriteOnceBlock<T> object geplaatst en wordt de waarde vervolgens van dat object gelezen. Omdat een WriteOnceBlock<T> object slechts één keer naar een object kan worden geschreven, worden volgende berichten verwijderd nadat een WriteOnceBlock<T> object een bericht heeft ontvangen.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Voor een volledig voorbeeld dat laat zien hoe WriteOnceBlock<T> u de waarde van de eerste bewerking kunt ontvangen die is voltooid, raadpleegt u Procedure: Gegevensstroomblokken ontkoppelen.

Uitvoeringsblokken

Uitvoeringsblokken roepen een door de gebruiker opgegeven gemachtigde aan voor elk ontvangen stukje ontvangen gegevens. De TPL-gegevensstroombibliotheek biedt drie typen uitvoeringsblokken: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>en System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock<T>

De ActionBlock<TInput> klasse is een doelblok dat een gemachtigde aanroept wanneer deze gegevens ontvangt. U kunt een ActionBlock<TInput> object beschouwen als gemachtigde die asynchroon wordt uitgevoerd wanneer gegevens beschikbaar komen. De gemachtigde die u aan een ActionBlock<TInput> object opgeeft, kan van het type Action<T> of type System.Func<TInput, Task>zijn. Wanneer u een ActionBlock<TInput> object gebruikt met Action<T>, wordt de verwerking van elk invoerelement beschouwd als voltooid wanneer de gedelegeerde terugkeert. Wanneer u een object met System.Func<TInput, Task>een ActionBlock<TInput> object gebruikt, wordt de verwerking van elk invoerelement alleen als voltooid beschouwd wanneer het geretourneerde Task object is voltooid. Met behulp van deze twee mechanismen kunt u zowel voor synchrone als asynchrone verwerking van elk invoerelement gebruiken ActionBlock<TInput> .

In het volgende basisvoorbeeld worden meerdere Int32 waarden op een ActionBlock<TInput> object geplaatst. Het ActionBlock<TInput> object drukt deze waarden af op de console. In dit voorbeeld wordt het blok ingesteld op de voltooide status en wordt gewacht totdat alle gegevensstroomtaken zijn voltooid.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Voor volledige voorbeelden die laten zien hoe u gemachtigden gebruikt met de ActionBlock<TInput> klasse, raadpleegt u Procedure: Actie uitvoeren wanneer een gegevensstroomblok gegevens ontvangt.

TransformBlock<TInput, TOutput>

De TransformBlock<TInput,TOutput> klasse lijkt op de ActionBlock<TInput> klasse, behalve dat deze fungeert als een bron en als doel. De gemachtigde die u aan een TransformBlock<TInput,TOutput> object doorgeeft, retourneert een waarde van het type TOutput. De gemachtigde die u aan een TransformBlock<TInput,TOutput> object opgeeft, kan van het type System.Func<TInput, TOutput> of type System.Func<TInput, Task<TOutput>>zijn. Wanneer u een object met System.Func<TInput, TOutput>gebruiktTransformBlock<TInput,TOutput>, wordt de verwerking van elk invoerelement beschouwd als voltooid wanneer de gedelegeerde terugkeert. Wanneer u een TransformBlock<TInput,TOutput> object gebruikt waarmee System.Func<TInput, Task<TOutput>>wordt gebruikt, wordt de verwerking van elk invoerelement alleen als voltooid beschouwd wanneer het geretourneerde Task<TResult> object is voltooid. Net als bij ActionBlock<TInput>het gebruik van deze twee mechanismen kunt u zowel voor synchrone als asynchrone verwerking van elk invoerelement gebruiken TransformBlock<TInput,TOutput> .

In het volgende basisvoorbeeld wordt een TransformBlock<TInput,TOutput> object gemaakt waarmee de vierkantswortel van de invoer wordt berekend. Het TransformBlock<TInput,TOutput> object neemt Int32 waarden als invoer en produceert Double waarden als uitvoer.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Zie Overzicht: Gegevensstroom gebruiken in een Windows Forms-toepassing voor volledige voorbeelden die worden gebruikt TransformBlock<TInput,TOutput> in een netwerk van gegevensstroomblokken die afbeeldingsverwerking uitvoeren in een Windows Forms-toepassing.

TransformManyBlock<TInput, TOutput>

De TransformManyBlock<TInput,TOutput> klasse lijkt op de TransformBlock<TInput,TOutput> klasse, behalve dat TransformManyBlock<TInput,TOutput> er nul of meer uitvoerwaarden voor elke invoerwaarde worden geproduceerd, in plaats van slechts één uitvoerwaarde voor elke invoerwaarde. De gemachtigde die u aan een TransformManyBlock<TInput,TOutput> object opgeeft, kan van het type System.Func<TInput, IEnumerable<TOutput>> of type System.Func<TInput, Task<IEnumerable<TOutput>>>zijn. Wanneer u een object met System.Func<TInput, IEnumerable<TOutput>>gebruiktTransformManyBlock<TInput,TOutput>, wordt de verwerking van elk invoerelement beschouwd als voltooid wanneer de gedelegeerde terugkeert. Wanneer u een TransformManyBlock<TInput,TOutput> object gebruikt met System.Func<TInput, Task<IEnumerable<TOutput>>>, wordt de verwerking van elk invoerelement alleen beschouwd als voltooid wanneer het geretourneerde System.Threading.Tasks.Task<IEnumerable<TOutput>> object is voltooid.

In het volgende basisvoorbeeld wordt een TransformManyBlock<TInput,TOutput> object gemaakt waarmee tekenreeksen worden gesplitst in de afzonderlijke tekenreeksen. Het TransformManyBlock<TInput,TOutput> object neemt String waarden als invoer en produceert Char waarden als uitvoer.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Zie Overzicht: Een gegevensstroompijplijn maken voor volledige voorbeelden die gebruikmaken TransformManyBlock<TInput,TOutput> van meerdere onafhankelijke uitvoer voor elke invoer in een gegevensstroompijplijn.

Mate van parallelle uitvoering

Elke ActionBlock<TInput>, TransformBlock<TInput,TOutput>en TransformManyBlock<TInput,TOutput> object buffert invoerberichten totdat het blok gereed is om ze te verwerken. Deze klassen verwerken standaard berichten in de volgorde waarin ze worden ontvangen, één bericht tegelijk. U kunt ook de mate van parallelle uitvoering opgeven, ActionBlock<TInput>TransformBlock<TInput,TOutput> zodat objecten TransformManyBlock<TInput,TOutput> meerdere berichten gelijktijdig kunnen verwerken. Zie de sectie Specificeren van de mate van parallelle uitvoering verderop in dit document voor meer informatie over gelijktijdige uitvoering. Voor een voorbeeld waarin de mate van parallelle uitvoering zo wordt ingesteld dat een gegevensstroomblok voor uitvoering meer dan één bericht tegelijk kan verwerken, raadpleegt u Procedure: De mate van parallelle uitvoering opgeven in een gegevensstroomblok.

Samenvatting van gemachtigdentypen

De volgende tabel bevat een overzicht van de gedelegeerdentypen die u kunt opgeven ActionBlock<TInput>en TransformManyBlock<TInput,TOutput>TransformBlock<TInput,TOutput>objecten. In deze tabel wordt ook aangegeven of het type gemachtigde synchroon of asynchroon werkt.

Type Synchroon gemachtigdentype Asynchroon gemachtigdentype
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

U kunt ook lambda-expressies gebruiken wanneer u met uitvoeringsbloktypen werkt. Voor een voorbeeld waarin wordt getoond hoe u een lambda-expressie gebruikt met een uitvoeringsblok, raadpleegt u Hoe kunt u actie uitvoeren wanneer een gegevensstroomblok gegevens ontvangt.

Groeperingsblokken

Groeperen blokkeert het combineren van gegevens uit een of meer bronnen en onder verschillende beperkingen. De TPL-gegevensstroombibliotheek biedt drie joinbloktypen: BatchBlock<T>, JoinBlock<T1,T2>en BatchedJoinBlock<T1,T2>.

BatchBlock<T>

De BatchBlock<T> klasse combineert sets invoergegevens, ook wel batches genoemd, in matrices met uitvoergegevens. U geeft de grootte van elke batch op wanneer u een BatchBlock<T> object maakt. Wanneer het BatchBlock<T> object het opgegeven aantal invoerelementen ontvangt, wordt asynchroon een matrix met deze elementen doorgegeven. Als een BatchBlock<T> object is ingesteld op de voltooide status, maar niet voldoende elementen bevat om een batch te vormen, wordt er een uiteindelijke matrix doorgegeven die de resterende invoerelementen bevat.

De BatchBlock<T> klasse werkt in de hebzuchtige of niet-hebzuchtige modus. In de greedy-modus, wat de standaardinstelling is, accepteert een BatchBlock<T> object elk bericht dat het wordt aangeboden en geeft een matrix door nadat het het opgegeven aantal elementen heeft ontvangen. In de niet-hebzuchtige modus BatchBlock<T> worden alle binnenkomende berichten uitgesteld totdat voldoende bronnen berichten aan het blok hebben aangeboden om een batch te vormen. De greedy-modus presteert doorgaans beter dan de niet-greedy-modus, omdat hiervoor minder verwerkingsoverhead nodig is. U kunt echter de niet-greedy-modus gebruiken wanneer u het verbruik van meerdere bronnen op atomische wijze moet coördineren. Geef de niet-greedy modus op door deze in te stellen GreedyFalse in de dataflowBlockOptions parameter in de BatchBlock<T> constructor.

In het volgende basisvoorbeeld worden verschillende Int32 waarden op een BatchBlock<T> object geplaatst dat tien elementen in een batch bevat. Om te garanderen dat alle waarden worden doorgegeven uit het BatchBlock<T>voorbeeld, roept dit voorbeeld de Complete methode aan. Met Complete de methode wordt het BatchBlock<T> object ingesteld op de voltooide status en daarom BatchBlock<T> worden alle resterende elementen doorgegeven als een uiteindelijke batch.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.",
   batchBlock.Receive().Sum());

Console.WriteLine("The sum of the elements in batch 2 is {0}.",
   batchBlock.Receive().Sum());

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Zie Walkthrough: BatchBlock en BatchedJoinBlock gebruiken om de efficiëntie van invoegbewerkingen voor databases te verbeteren voor een volledig voorbeeld dat wordt gebruikt BatchBlock<T> om de efficiëntie te verbeteren.

JoinBlock<T1, T2, ...>

De JoinBlock<T1,T2> en JoinBlock<T1,T2,T3> klassen verzamelen invoerelementen en geven System.Tuple<T1,T2> ze door of System.Tuple<T1,T2,T3> objecten die deze elementen bevatten. De JoinBlock<T1,T2> en JoinBlock<T1,T2,T3> klassen nemen niet over van ITargetBlock<TInput>. In plaats daarvan bieden ze eigenschappen, Target1, Target2en , Target3die implementeren ITargetBlock<TInput>.

JoinBlock<T1,T2>JoinBlock<T1,T2,T3> En BatchBlock<T>werken in de hebzuchtige of niet-hebzuchtige modus. In de greedy-modus, wat de standaardinstelling is, accepteert een JoinBlock<T1,T2> of JoinBlock<T1,T2,T3> object elk bericht dat wordt aangeboden en geeft een tuple door nadat elk van de doelen ten minste één bericht heeft ontvangen. In de niet-hebzuchtige modus JoinBlock<T1,T2>JoinBlock<T1,T2,T3> worden alle binnenkomende berichten uitgesteld totdat alle doelen de gegevens hebben aangeboden die nodig zijn om een tuple te maken. Op dit moment neemt het blok deel aan een doorvoerprotocol in twee fasen om alle vereiste items uit de bronnen atomisch op te halen. Door dit uitstel kan een andere entiteit de gegevens ondertussen gebruiken, zodat het algehele systeem vooruitgang kan boeken.

In het volgende basisvoorbeeld ziet u een geval waarin voor een JoinBlock<T1,T2,T3> object meerdere gegevens nodig zijn om een waarde te berekenen. In dit voorbeeld wordt een JoinBlock<T1,T2,T3> object gemaakt waarvoor twee Int32 waarden en een Char waarde nodig zijn om een rekenkundige bewerking uit te voeren.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine("{0} + {1} = {2}",
            data.Item1, data.Item2, data.Item1 + data.Item2);
         break;
      case '-':
         Console.WriteLine("{0} - {1} = {2}",
            data.Item1, data.Item2, data.Item1 - data.Item2);
         break;
      default:
         Console.WriteLine("Unknown operator '{0}'.", data.Item3);
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Zie Voor een volledig voorbeeld waarin objecten in de niet-greedy modus worden gebruikt JoinBlock<T1,T2> om gezamenlijk een resource te delen: JoinBlock gebruiken om gegevens uit meerdere bronnen te lezen.

BatchedJoinBlock<T1, T2, ...>

De BatchedJoinBlock<T1,T2> en BatchedJoinBlock<T1,T2,T3> klassen verzamelen batches invoerelementen en geven System.Tuple(IList(T1), IList(T2)) objecten door die System.Tuple(IList(T1), IList(T2), IList(T3)) elementen door. BatchedJoinBlock<T1,T2> Denk aan als een combinatie van BatchBlock<T> en JoinBlock<T1,T2>. Geef de grootte van elke batch op wanneer u een BatchedJoinBlock<T1,T2> object maakt. BatchedJoinBlock<T1,T2> biedt ook eigenschappen en Target1Target2, die implementeren ITargetBlock<TInput>. Wanneer het opgegeven aantal invoerelementen van alle doelen wordt ontvangen, geeft het BatchedJoinBlock<T1,T2> object asynchroon een System.Tuple(IList(T1), IList(T2)) object door dat deze elementen bevat.

In het volgende basisvoorbeeld wordt een BatchedJoinBlock<T1,T2> object gemaakt dat resultaten, Int32 waarden en fouten bevat die objecten zijn Exception . In dit voorbeeld worden meerdere bewerkingen uitgevoerd en worden resultaten naar de Target1 eigenschap en fouten naar de Target2 eigenschap van het BatchedJoinBlock<T1,T2> object geschreven. Omdat het aantal geslaagde en mislukte bewerkingen vooraf onbekend is, kunnen de IList<T> objecten elk doel nul of meer waarden ontvangen.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Zie Walkthrough: BatchBlock en BatchedJoinBlock gebruiken om de efficiëntie te verbeteren voor een volledig voorbeeld dat wordt gebruikt BatchedJoinBlock<T1,T2> om zowel de resultaten als eventuele uitzonderingen vast te leggen die optreden tijdens het lezen van het programma uit een database.

Gedrag van gegevensstroomblok configureren

U kunt extra opties inschakelen door een System.Threading.Tasks.Dataflow.DataflowBlockOptions object aan te bieden aan de constructor van gegevensstroombloktypen. Deze opties bepalen gedrag zoals de scheduler die de onderliggende taak en de mate van parallelle uitvoering beheert. De DataflowBlockOptions bevat ook afgeleide typen die gedrag opgeven dat specifiek is voor bepaalde typen gegevensstroomblokken. De volgende tabel bevat een overzicht van het type opties dat is gekoppeld aan elk type gegevensstroomblok.

Bloktype gegevensstroom DataflowBlockOptions type
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

De volgende secties bevatten aanvullende informatie over de belangrijke soorten opties voor gegevensstroomblokken die beschikbaar zijn via de System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptionsen System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions klassen.

De taakplanner opgeven

Elk vooraf gedefinieerd gegevensstroomblok maakt gebruik van het TPL-taakplanningsmechanisme om activiteiten uit te voeren, zoals het doorgeven van gegevens aan een doel, het ontvangen van gegevens van een bron en het uitvoeren van door de gebruiker gedefinieerde gemachtigden wanneer gegevens beschikbaar komen. TaskScheduler is een abstracte klasse die een taakplanner vertegenwoordigt waarmee taken in threads worden geplaatst. De standaardtaakplanner, Defaultgebruikt de klasse om werk in de ThreadPool wachtrij te plaatsen en uit te voeren. U kunt de standaardtaakplanner overschrijven door de TaskScheduler eigenschap in te stellen wanneer u een blokobject voor een gegevensstroom maakt.

Wanneer dezelfde taakplanner meerdere gegevensstroomblokken beheert, kan er beleid worden afgedwongen. Als er bijvoorbeeld meerdere gegevensstroomblokken zijn geconfigureerd om de exclusieve scheduler van hetzelfde ConcurrentExclusiveSchedulerPair object te bereiken, wordt al het werk dat in deze blokken wordt uitgevoerd, geserialiseerd. Op dezelfde manier geldt dat als deze blokken zijn geconfigureerd om de gelijktijdige scheduler van hetzelfde ConcurrentExclusiveSchedulerPair object te bereiken en die scheduler zodanig is geconfigureerd dat ze een maximum gelijktijdigheidsniveau hebben, al het werk uit deze blokken beperkt is tot dat aantal gelijktijdige bewerkingen. Voor een voorbeeld waarin de ConcurrentExclusiveSchedulerPair klasse wordt gebruikt om leesbewerkingen parallel uit te voeren, maar schrijfbewerkingen die uitsluitend van alle andere bewerkingen moeten plaatsvinden, raadpleegt u Instructies: Een taakplanner opgeven in een gegevensstroomblok. Zie het klasonderwerp voor meer informatie over taakplanners in de TaskScheduler TPL.

De mate van parallelle uitvoering opgeven

Standaard verwerken de drie typen uitvoeringsblokken die de TPL-gegevensstroombibliotheek biedt, ActionBlock<TInput>TransformBlock<TInput,TOutput>en TransformManyBlock<TInput,TOutput>één bericht tegelijk verwerken. Deze gegevensstroombloktypen verwerken ook berichten in de volgorde waarin ze worden ontvangen. Als u deze gegevensstroomblokken wilt inschakelen om berichten gelijktijdig te verwerken, stelt u de ExecutionDataflowBlockOptions.MaxDegreeOfParallelism eigenschap in wanneer u het blokobject voor gegevensstromen samenstelt.

De standaardwaarde is MaxDegreeOfParallelism 1, wat garandeert dat het gegevensstroomblok één bericht tegelijk verwerkt. Als u deze eigenschap instelt op een waarde die groter is dan 1, kan het gegevensstroomblok meerdere berichten gelijktijdig verwerken. Door deze eigenschap in te stellen, DataflowBlockOptions.Unbounded kan de onderliggende taakplanner de maximale mate van gelijktijdigheid beheren.

Belangrijk

Wanneer u een maximale mate van parallelle uitvoering opgeeft die groter is dan 1, worden meerdere berichten tegelijkertijd verwerkt en worden berichten daarom mogelijk niet verwerkt in de volgorde waarin ze worden ontvangen. De volgorde waarin de berichten worden uitgevoerd vanuit het blok, is echter hetzelfde als waarin ze worden ontvangen.

Omdat de MaxDegreeOfParallelism eigenschap de maximale mate van parallelle uitvoering aangeeft, kan het gegevensstroomblok worden uitgevoerd met een mindere mate van parallelle uitvoering dan u opgeeft. Het gegevensstroomblok kan minder parallellisme gebruiken om te voldoen aan de functionele vereisten of omdat er onvoldoende systeembronnen beschikbaar zijn. Een gegevensstroomblok kiest nooit meer parallellisme dan u opgeeft.

De waarde van de MaxDegreeOfParallelism eigenschap is exclusief voor elk blokobject voor gegevensstromen. Als bijvoorbeeld vier blokobjecten voor gegevensstromen elk 1 opgeven voor de maximale mate van parallelle uitvoering, kunnen alle vier blokobjecten voor gegevensstromen parallel worden uitgevoerd.

Voor een voorbeeld waarin de maximale mate van parallelle uitvoering wordt ingesteld om langdurige bewerkingen parallel uit te voeren, raadpleegt u Procedure: De mate van parallelle uitvoering opgeven in een gegevensstroomblok.

Het aantal berichten per taak opgeven

De vooraf gedefinieerde typen gegevensstromen maken gebruik van taken om meerdere invoerelementen te verwerken. Dit helpt bij het minimaliseren van het aantal taakobjecten dat nodig is voor het verwerken van gegevens, waardoor toepassingen efficiënter kunnen worden uitgevoerd. Wanneer de taken uit een set gegevensstroomblokken echter gegevens verwerken, moeten de taken van andere gegevensstroomblokken mogelijk wachten op verwerkingstijd door berichten in de wachtrij te plaatsen. Stel de MaxMessagesPerTask eigenschap in om een betere billijkheid tussen gegevensstroomtaken mogelijk te maken. Wanneer MaxMessagesPerTask dit is ingesteld DataflowBlockOptions.Unboundedop , wat de standaardwaarde is, verwerkt de taak die door een gegevensstroomblok wordt gebruikt zoveel berichten als beschikbaar is. Wanneer MaxMessagesPerTask dit is ingesteld op een andere waarde dan Unbounded, verwerkt het gegevensstroomblok maximaal dit aantal berichten per Task object. Hoewel het instellen van de MaxMessagesPerTask eigenschap de billijkheid tussen taken kan verhogen, kan dit ertoe leiden dat het systeem meer taken maakt dan nodig is, waardoor de prestaties kunnen worden verlaagd.

Annulering inschakelen

De TPL biedt een mechanisme waarmee taken de annulering op een coöperatieve manier kunnen coördineren. Als u gegevensstroomblokken wilt inschakelen om deel te nemen aan dit annuleringsmechanisme, stelt u de CancellationToken eigenschap in. Wanneer dit CancellationToken object is ingesteld op de geannuleerde status, worden alle gegevensstroomblokken die het uitvoeren van dit token van het huidige item controleren, maar niet beginnen met het verwerken van volgende items. Deze gegevensstroom blokkeert ook alle gebufferde berichten, releaseverbindingen met bron- en doelblokken en overgang naar de geannuleerde status. Door over te stappen op de geannuleerde status, heeft de Completion eigenschap de Status eigenschap ingesteld op Canceled, tenzij er een uitzondering is opgetreden tijdens de verwerking. In dat geval Status is ingesteld op Faulted.

Zie Een gegevensstroomblok annuleren voor een voorbeeld dat laat zien hoe u annulering gebruikt in een Windows Forms-toepassing. Zie Taakannulering voor meer informatie over annulering in de TPL.

Greedy versus niet-greedy gedrag opgeven

Verschillende bloktypen voor groeperingsgegevensstromen kunnen worden uitgevoerd in de greedy - of niet-greedy-modus . Standaard worden de vooraf gedefinieerde typen gegevensstroomblokken uitgevoerd in de greedy-modus.

Voor joinbloktypen zoals JoinBlock<T1,T2>de hebzuchtige modus betekent dit dat het blok onmiddellijk gegevens accepteert, zelfs als de bijbehorende gegevens waarmee u wilt deelnemen nog niet beschikbaar zijn. Niet-hebzuchtige modus betekent dat het blok alle binnenkomende berichten uitstelt totdat er één beschikbaar is op elk van de doelen om de join te voltooien. Als een van de uitgestelde berichten niet meer beschikbaar is, worden alle uitgestelde berichten door het joinblok vrijgegeven en wordt het proces opnieuw gestart. Voor de BatchBlock<T> klasse is hebzuchtig en niet-greedy gedrag vergelijkbaar, behalve dat een object onder niet-greedy modus BatchBlock<T> alle binnenkomende berichten uitstelt totdat er voldoende beschikbaar zijn uit verschillende bronnen om een batch te voltooien.

Als u de niet-greedy-modus voor een gegevensstroomblok wilt opgeven, stelt u deze in Greedy op False. Voor een voorbeeld waarin wordt gedemonstreerd hoe u de niet-greedy-modus kunt gebruiken om meerdere joinblokken efficiënter te laten delen met een gegevensbron, raadpleegt u Hoe kunt u: JoinBlock gebruiken om gegevens uit meerdere bronnen te lezen.

Aangepaste gegevensstroomblokken

Hoewel de TPL-gegevensstroombibliotheek veel vooraf gedefinieerde bloktypen biedt, kunt u extra bloktypen maken die aangepast gedrag uitvoeren. Implementeer de ISourceBlock<TOutput> of ITargetBlock<TInput> interfaces rechtstreeks of gebruik de Encapsulate methode om een complex blok te bouwen dat het gedrag van bestaande bloktypen inkapselt. Zie Walkthrough: Een aangepast type gegevensstroomblok maken voor voorbeelden die laten zien hoe u aangepaste functionaliteit voor gegevensstromen kunt implementeren.

Title Beschrijving
Procedure: Berichten schrijven naar en berichten lezen van een gegevensstroomblok Demonstreert hoe u berichten schrijft naar en leest van een BufferBlock<T> object.
Procedure: Een gegevensstroompatroon voor producer-consumer implementeren Hierin wordt beschreven hoe u het gegevensstroommodel gebruikt om een producer-consumerpatroon te implementeren, waarbij de producent berichten naar een gegevensstroomblok verzendt en de consument berichten van dat blok leest.
Procedure: Actie uitvoeren wanneer een gegevensstroomblok gegevens ontvangt Hierin wordt beschreven hoe u gemachtigden kunt bieden voor de bloktypen voor uitvoeringsgegevensstromen, ActionBlock<TInput>en TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput>.
Overzicht: Een gegevensstroompijplijn maken Hierin wordt beschreven hoe u een gegevensstroompijplijn maakt waarmee tekst van het web wordt gedownload en bewerkingen op die tekst worden uitgevoerd.
Procedure: Gegevensstroomblokken ontkoppelen Demonstreert hoe u de LinkTo methode gebruikt om een doelblok los te koppelen van de bron nadat de bron een bericht aan het doel heeft aangeboden.
Overzicht: Gegevensstroom gebruiken in een Windows Forms-toepassing Demonstreert hoe u een netwerk van gegevensstroomblokken maakt die afbeeldingsverwerking uitvoeren in een Windows Forms-toepassing.
Procedure: Een gegevensstroomblok annuleren Demonstreert hoe u annulering gebruikt in een Windows Forms-toepassing.
Procedure: JoinBlock gebruiken om gegevens uit meerdere bronnen te lezen Hierin wordt uitgelegd hoe u de JoinBlock<T1,T2> klasse gebruikt om een bewerking uit te voeren wanneer gegevens uit meerdere bronnen beschikbaar zijn en hoe u de niet-greedy-modus gebruikt om meerdere joinblokken in te schakelen om een gegevensbron efficiënter te delen.
Procedure: De mate van parallelle uitvoering opgeven in een gegevensstroomblok Beschrijft hoe u de MaxDegreeOfParallelism eigenschap instelt om een uitvoeringsgegevensstroomblok in te schakelen voor het verwerken van meer dan één bericht tegelijk.
Procedure: Een taakplanner opgeven in een gegevensstroomblok Demonstreert hoe u een specifieke taakplanner koppelt wanneer u een gegevensstroom in uw toepassing gebruikt.
Walkthrough: BatchBlock en BatchedJoinBlock gebruiken om de efficiëntie te verbeteren Beschrijft hoe u de BatchBlock<T> klasse gebruikt om de efficiëntie van invoegbewerkingen voor databases te verbeteren en hoe u de BatchedJoinBlock<T1,T2> klasse gebruikt om zowel de resultaten als eventuele uitzonderingen vast te leggen die optreden terwijl het programma uit een database leest.
Walkthrough: Een aangepast gegevensstroombloktype maken Demonstreert twee manieren om een gegevensstroombloktype te maken waarmee aangepast gedrag wordt geïmplementeerd.
Taakparallelbibliotheek (TPL) Introduceert de TPL, een bibliotheek die parallelle en gelijktijdige programmering in .NET Framework-toepassingen vereenvoudigt.