Tok dat (Task Parallel Library)
Knihovna TPL (Task Parallel Library) poskytuje komponenty toku dat, které pomáhají zvýšit odolnost aplikací s podporou souběžnosti. Tyto komponenty toku dat se souhrnně označují jako knihovna toku dat TPL. Tento model toku dat podporuje programování založené na objektech actor tím, že poskytuje předávání zpráv v procesu pro hrubé odstupňované úlohy toku dat a potrubí. Komponenty toku dat vycházejí z typů a plánování infrastruktury TPL a integrují se s podporou jazyka C#, Visual Basic a F# pro asynchronní programování. Tyto komponenty toku dat jsou užitečné, pokud máte více operací, které musí vzájemně komunikovat asynchronně nebo když chcete zpracovávat data, jakmile budou k dispozici. Představte si například aplikaci, která zpracovává data obrázků z webové kamery. Pomocí modelu toku dat může aplikace zpracovávat snímky obrázků, jakmile budou k dispozici. Pokud aplikace vylepšuje snímky obrázků, například provedením opravy světla nebo snížení počtu červených očí, můžete vytvořit kanál komponent toku dat. Každá fáze kanálu může k transformaci image použít hrubší paralelismus, například funkce poskytované TPL.
Tento dokument obsahuje přehled knihovny toků dat TPL. Popisuje programovací model, předdefinované typy bloků toku dat a způsob konfigurace bloků toku dat tak, aby splňovaly konkrétní požadavky vašich aplikací.
Poznámka:
Knihovna toku dat TPL ( System.Threading.Tasks.Dataflow obor názvů) není distribuována s .NET. Pokud chcete nainstalovat System.Threading.Tasks.Dataflow obor názvů v sadě Visual Studio, otevřete projekt, v nabídce Projekt zvolte Spravovat balíčky NuGet a vyhledejte System.Threading.Tasks.Dataflow
balíček online. Pokud ho chcete nainstalovat pomocí rozhraní příkazového řádku .NET Core, spusťte dotnet add package System.Threading.Tasks.Dataflow
.
Programovací model
Knihovna toků dat TPL poskytuje základ pro předávání zpráv a paralelizaci aplikací náročných na procesor a vstupně-výstupní operace, které mají vysokou propustnost a nízkou latenci. Poskytuje také explicitní kontrolu nad tím, jak se data uloží do vyrovnávací paměti a přesunují se kolem systému. Pokud chcete lépe porozumět programovacímu modelu toku dat, zvažte aplikaci, která asynchronně načítá obrázky z disku a vytvoří složenou z těchto imagí. Tradiční programovací modely obvykle vyžadují použití zpětných volání a synchronizačních objektů, jako jsou zámky, ke koordinaci úkolů a přístupu ke sdíleným datům. Pomocí programovacího modelu toku dat můžete vytvořit objekty toku dat, které zpracovávají obrázky při čtení z disku. V rámci modelu toku dat deklarujete, jak se data zpracovávají, když jsou k dispozici, a také všechny závislosti mezi daty. Vzhledem k tomu, že modul runtime spravuje závislosti mezi daty, můžete se často vyhnout požadavku na synchronizaci přístupu ke sdíleným datům. Vzhledem k tomu, že plány modulu runtime fungují na základě asynchronního doručení dat, může tok dat zlepšit rychlost odezvy a propustnost efektivní správou podkladových vláken. Příklad, který k implementaci zpracování obrázků v aplikaci model Windows Forms používá programovací model toku dat, najdete v tématu Návod: Použití toku dat v aplikaci model Windows Forms.
Zdroje a cíle
Knihovna toků dat TPL se skládá z bloků toku dat, což jsou datové struktury, které ukládají do vyrovnávací paměti a zpracovávají data. TPL definuje tři druhy bloků toku dat: zdrojové bloky, cílové bloky a bloky šíření. Zdrojový blok funguje jako zdroj dat a dá se číst z. Cílový blok funguje jako příjemce dat a může být zapsán do. Blok šíření funguje jako zdrojový i cílový blok a lze ho číst a zapisovat do. TPL definuje System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> rozhraní, které představuje zdroje, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> představují cíle a System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> představují šíření. IPropagatorBlock<TInput,TOutput> dědí z obou ISourceBlock<TOutput>, a ITargetBlock<TInput>.
Knihovna toků dat TPL poskytuje několik předdefinovaných typů bloků toku dat, které implementují ISourceBlock<TOutput>rozhraní a ITargetBlock<TInput>IPropagatorBlock<TInput,TOutput> . Tyto typy bloků toku dat jsou popsány v tomto dokumentu v části Předdefinované typy bloků toku dat.
Připojení bloky
Bloky toku dat můžete připojit k vytvoření kanálů, což jsou lineární sekvence bloků toku dat nebo sítě, což jsou grafy bloků toku dat. Kanál je jednou z forem sítě. V kanálu nebo síti zdroje asynchronně šíří data do cílů, jakmile jsou tato data dostupná. Metoda ISourceBlock<TOutput>.LinkTo propojuje blok toku zdrojových dat s cílovým blokem. Zdroj lze propojit s nulovými nebo více cíli; cíle mohou být propojeny z nuly nebo více zdrojů. Bloky toku dat můžete přidávat nebo odebírat do kanálu nebo sítě současně. Předdefinované typy bloků toku dat zpracovávají všechny aspekty zabezpečení vláken propojení a zrušení propojení.
Příklad, který spojuje bloky toku dat k vytvoření základního kanálu, najdete v tématu Návod: Vytvoření kanálu toku dat. Příklad, který spojuje bloky toku dat, aby vytvořil složitější síť, najdete v tématu Návod: Použití toku dat v aplikaci model Windows Forms. Příklad, který zruší propojení cíle ze zdroje poté, co zdroj nabídne cílovou zprávu, najdete v tématu Postupy: Zrušení propojení bloků toku dat.
Filtrování
Když zavoláte metodu ISourceBlock<TOutput>.LinkTo pro propojení zdroje s cílem, můžete zadat delegáta, který určuje, jestli cílový blok přijme nebo odmítne zprávu na základě hodnoty této zprávy. Tento mechanismus filtrování je užitečný způsob, jak zaručit, že blok toku dat přijímá pouze určité hodnoty. U většiny předdefinovaných typů bloků toku dat platí, že pokud je zdrojový blok připojený k více cílovým blokům, když cílový blok odmítne zprávu, zdroj tuto zprávu nabídne dalšímu cíli. Pořadí, ve kterém zdroj nabízí zprávy cílům, je definován zdrojem a může se lišit podle typu zdroje. Většina typů bloků zdroje přestane nabízet zprávu po přijetí jedné cílové zprávy. Jednou z výjimek tohoto pravidla je BroadcastBlock<T> třída, která nabízí každou zprávu všem cílům, i když některé cíle zprávu zamítnou. Příklad, který používá filtrování ke zpracování pouze určitých zpráv, naleznete v části Návod: Použití toku dat v aplikaci model Windows Forms.
Důležité
Vzhledem k tomu, že každý předdefinovaný typ bloku toku dat zaručuje, že se zprávy rozšíří v pořadí, ve kterém jsou přijaty, musí být každá zpráva načtena ze zdrojového bloku, aby zdrojový blok mohl zpracovat další zprávu. Proto když použijete filtrování pro připojení více cílů ke zdroji, ujistěte se, že každá zpráva obdrží alespoň jeden cílový blok. Jinak může vaše aplikace dojít k zablokování.
Předávání zpráv
Programovací model toku dat souvisí s konceptem předávání zpráv, kdy nezávislé komponenty programu vzájemně komunikují odesláním zpráv. Jedním ze způsobů, jak rozšířit zprávy mezi komponenty aplikace, je volat Post (synchronní) a SendAsync (asynchronní) metody odesílání zpráv do cílových bloků toku dat, a ReceiveReceiveAsync, a TryReceive metody přijímat zprávy ze zdrojových bloků. Tyto metody můžete kombinovat s kanály toku dat nebo sítěmi odesláním vstupních dat do hlavního uzlu (cílový blok) a příjmem výstupních dat z koncového uzlu kanálu nebo terminálových uzlů sítě (jeden nebo více zdrojových bloků). Můžete také použít metodu Choose ke čtení z prvního ze zadaných zdrojů, které mají k dispozici data, a provádět akce s daty.
Zdrojové bloky nabízejí data pro cílové bloky voláním ITargetBlock<TInput>.OfferMessage metody. Cílový blok reaguje na nabízenou zprávu jedním ze tří způsobů: může zprávu přijmout, odmítnout nebo odložit zprávu. Když cíl přijme zprávu, OfferMessage metoda vrátí Accepted. Když cíl zprávu odmítne, OfferMessage metoda vrátí Declined. Pokud cíl vyžaduje, aby již neobdržel žádné zprávy ze zdroje, OfferMessage vrátí DecliningPermanently. Předdefinované typy zdrojového bloku nenabízejí zprávy propojeným cílům po přijetí takové návratové hodnoty a automaticky zruší propojení s těmito cíli.
Když cílový blok odloží zprávu pro pozdější použití, OfferMessage metoda vrátí Postponed. Cílový blok, který odloží zprávu, může později volat metodu ISourceBlock<TOutput>.ReserveMessage , která se pokusí rezervovat nabízenou zprávu. V tomto okamžiku je zpráva stále dostupná a může ji použít cílový blok nebo zpráva byla přijata jiným cílem. Pokud cílový blok později zprávu vyžaduje nebo už zprávu nepotřebuje, zavolá metodu nebo ReleaseReservation metoduISourceBlock<TOutput>.ConsumeMessage. Rezervace zpráv se obvykle používá u typů bloků toku dat, které pracují v režimu bez greedy. Režim bez greedy je vysvětlen dále v tomto dokumentu. Místo rezervace odložené zprávy může cílový blok použít také metodu ISourceBlock<TOutput>.ConsumeMessage k přímému využívání odložené zprávy.
Dokončení bloku toku dat
Bloky toku dat také podporují koncept dokončení. Blok toku dat, který je v dokončeném stavu, neprovádí žádnou další práci. Každý blok toku dat má přidružený System.Threading.Tasks.Task objekt, který se označuje jako úkol dokončení, který představuje stav dokončení bloku. Vzhledem k tomu, že můžete počkat na dokončení objektu Task pomocí úkolů dokončení, můžete počkat na dokončení jednoho nebo více terminálových uzlů sítě toku dat. Rozhraní IDataflowBlock definuje metodu Complete , která informuje blok toku dat o dokončení požadavku a Completion vlastnost, která vrací úlohu dokončení bloku toku dat. Rozhraní ISourceBlock<TOutput> a ITargetBlock<TInput> dědí ho IDataflowBlock .
Existují dva způsoby, jak zjistit, jestli se blok toku dat dokončil bez chyby, zjistil jednu nebo více chyb nebo byl zrušen. Prvním způsobem je volat metodu Task.Wait úkolu dokončení v-try
catch
bloku (Try
-Catch
v jazyce Visual Basic). Následující příklad vytvoří ActionBlock<TInput> objekt, který vyvolá ArgumentOutOfRangeException , pokud je jeho vstupní hodnota menší než nula. AggregateException je vyvolán, když tento příklad volá Wait úkol dokončení. K ArgumentOutOfRangeException objektu InnerExceptions se přistupuje prostřednictvím vlastnosti objektu AggregateException .
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine("n = {0}", n);
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine("Encountered {0}: {1}",
e.GetType().Name, e.Message);
return true;
});
}
/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
Console.WriteLine("n = {0}", n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
End Sub)
' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()
' Wait for completion in a try/catch block.
Try
throwIfNegative.Completion.Wait()
Catch ae As AggregateException
' If an unhandled exception occurs during dataflow processing, all
' exceptions are propagated through an AggregateException object.
ae.Handle(Function(e)
Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
Return True
End Function)
End Try
' Output:
' n = 0
' n = -1
' Encountered ArgumentOutOfRangeException: Specified argument was out of the range
' of valid values.
'
Tento příklad ukazuje případ, kdy se výjimka neošetřená v delegátu bloku toku dat spuštění. Doporučujeme zpracovávat výjimky v těle těchto bloků. Pokud to ale nemůžete udělat, blok se chová, jako by byl zrušený a nezpracovává příchozí zprávy.
Pokud je blok toku dat explicitně zrušen, AggregateException objekt obsahuje OperationCanceledException ve InnerExceptions vlastnosti. Další informace o zrušení toku dat najdete v části Povolení zrušení .
Druhým způsobem, jak určit stav dokončení bloku toku dat, je použití pokračování úkolu dokončení nebo použití asynchronních jazykových funkcí jazyka C# a Visual Basic k asynchronnímu čekání na dokončení úkolu. Delegát, který zadáte metodě Task.ContinueWith , přebírá Task objekt, který představuje úkol stecedent. V případě Completion vlastnosti přebírá delegát pro pokračování úkol dokončení sám. Následující příklad se podobá předchozímu, s tím rozdílem, že používá také metodu ContinueWith k vytvoření úlohy pokračování, která vytiskne stav celkové operace toku dat.
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine("n = {0}", n);
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
Console.WriteLine("The status of the completion task is '{0}'.",
task.Status);
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine("Encountered {0}: {1}",
e.GetType().Name, e.Message);
return true;
});
}
/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
Console.WriteLine("n = {0}", n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
End Sub)
' Create a continuation task that prints the overall
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))
' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()
' Wait for completion in a try/catch block.
Try
throwIfNegative.Completion.Wait()
Catch ae As AggregateException
' If an unhandled exception occurs during dataflow processing, all
' exceptions are propagated through an AggregateException object.
ae.Handle(Function(e)
Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
Return True
End Function)
End Try
' Output:
' n = 0
' n = -1
' The status of the completion task is 'Faulted'.
' Encountered ArgumentOutOfRangeException: Specified argument was out of the range
' of valid values.
'
K určení dalších informací o stavu dokončení bloku toku dat můžete použít také vlastnosti, například IsCanceled v textu úkolu pokračování. Další informace o úlohách pokračování a o tom, jak souvisí se zpracováním zrušení a chyb, najdete v tématu Řetězení úloh pomocí úloh pokračování, zrušení úkolů a zpracování výjimek.
Předdefinované typy bloků toku dat
Knihovna toků dat TPL poskytuje několik předdefinovaných typů bloků toku dat. Tyto typy jsou rozdělené do tří kategorií: bloky ukládání do vyrovnávací paměti, spouštěcí bloky a seskupovací bloky. Následující části popisují typy bloků, které tvoří tyto kategorie.
Bloky ukládání do vyrovnávací paměti
Ukládání bloků do vyrovnávací paměti uchovává data pro použití spotřebiteli dat. Knihovna toků dat TPL poskytuje tři typy bloků vyrovnávací paměti: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>a System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.
BufferBlock<T>
Třída BufferBlock<T> představuje strukturu asynchronního zasílání zpráv pro obecné účely. Tato třída ukládá první do fronty fiFO (first out) zpráv, které lze zapisovat do více zdrojů nebo číst z více cílů. Když cíl přijme zprávu z objektu BufferBlock<T> , tato zpráva se odebere z fronty zpráv. I když BufferBlock<T> objekt může mít více cílů, obdrží každá zpráva pouze jeden cíl. Třída BufferBlock<T> je užitečná, když chcete předat více zpráv jiné komponentě a tato komponenta musí přijímat každou zprávu.
Následující základní příklad publikuje několik Int32 hodnot do objektu BufferBlock<T> a pak tyto hodnoty načte zpět z tohoto objektu.
// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
/* Output:
0
1
2
*/
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()
' Post several messages to the block.
For i As Integer = 0 To 2
bufferBlock.Post(i)
Next i
' Receive the messages back from the block.
For i As Integer = 0 To 2
Console.WriteLine(bufferBlock.Receive())
Next i
' Output:
' 0
' 1
' 2
'
Úplný příklad, který ukazuje, jak zapisovat zprávy do objektu a číst je z objektu BufferBlock<T> , viz Postupy: Zápis zpráv do bloku toku dat a čtení zpráv z bloku toku dat.
BroadcastBlock<T>
Třída BroadcastBlock<T> je užitečná, když musíte předat více zpráv jiné komponentě, ale tato komponenta potřebuje pouze nejnovější hodnotu. Tato třída je užitečná také v případě, že chcete zprávu vysílat více komponentám.
Následující základní příklad publikuje Double hodnotu do objektu BroadcastBlock<T> a pak ji několikrát přečte zpět z tohoto objektu. Vzhledem k tomu, že hodnoty nejsou odebrány z BroadcastBlock<T> objektů po jejich přečtení, je stejná hodnota k dispozici pokaždé.
// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);
// Post a message to the block.
broadcastBlock.Post(Math.PI);
// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(broadcastBlock.Receive());
}
/* Output:
3.14159265358979
3.14159265358979
3.14159265358979
*/
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)
' Post a message to the block.
broadcastBlock.Post(Math.PI)
' Receive the messages back from the block several times.
For i As Integer = 0 To 2
Console.WriteLine(broadcastBlock.Receive())
Next i
' Output:
' 3.14159265358979
' 3.14159265358979
' 3.14159265358979
'
Úplný příklad, který ukazuje, jak použít BroadcastBlock<T> k vysílání zprávy do více cílových bloků, viz Postupy: Určení plánovače úloh v bloku toku dat.
WriteOnceBlock<T>
Třída WriteOnceBlock<T> se podobá BroadcastBlock<T> třídě s tím rozdílem, že WriteOnceBlock<T> objekt lze zapsat pouze jednou. Můžete si představitWriteOnceBlock<T>, že se podobá klíčovému slovu readonly (ReadOnly v jazyce Visual Basic) jazyka C#, s tím rozdílem, že WriteOnceBlock<T> objekt se stane neměnným, jakmile obdrží hodnotu místo při konstrukci. Podobně jako třída BroadcastBlock<T> , když cíl obdrží zprávu z objektu, tato zpráva se z objektu WriteOnceBlock<T> neodebere. Proto více cílů obdrží kopii zprávy. Třída WriteOnceBlock<T> je užitečná, když chcete rozšířit pouze první z více zpráv.
Následující základní příklad publikuje více String hodnot do objektu WriteOnceBlock<T> a pak přečte hodnotu zpět z tohoto objektu. WriteOnceBlock<T> Vzhledem k tomu, že objekt lze zapsat pouze jednou, jakmile WriteOnceBlock<T> objekt obdrží zprávu, zahodí další zprávy.
// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);
// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
() => writeOnceBlock.Post("Message 1"),
() => writeOnceBlock.Post("Message 2"),
() => writeOnceBlock.Post("Message 3"));
// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());
/* Sample output:
Message 2
*/
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)
' Post several messages to the block in parallel. The first
' message to be received is written to the block.
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))
' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())
' Sample output:
' Message 2
'
Úplný příklad, který ukazuje, jak získat WriteOnceBlock<T> hodnotu první operace, která se dokončí, naleznete v tématu Postupy: Zrušení propojení bloků toku dat.
Spouštěcí bloky
Bloky provádění volají delegáta poskytnutého uživatelem pro každou část přijatých dat. Knihovna toků dat TPL poskytuje tři typy bloků spuštění: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>a System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.
ActionBlock<T>
Třída ActionBlock<TInput> je cílový blok, který volá delegáta, když přijímá data. ActionBlock<TInput> Objekt si můžete představit jako delegáta, který se spouští asynchronně, když budou data k dispozici. Delegát, který zadáte objektu ActionBlock<TInput> , může být typu Action<T> nebo typu System.Func<TInput, Task>
. Při použití objektu ActionBlock<TInput> s Action<T>, zpracování každého vstupního prvku je považováno za dokončené při vrácení delegáta. Při použití objektu ActionBlock<TInput> s System.Func<TInput, Task>
, zpracování každého vstupního prvku je považováno za dokončeno pouze v případě, že vrácený Task objekt je dokončen. Pomocí těchto dvou mechanismů můžete použít ActionBlock<TInput> synchronní i asynchronní zpracování každého vstupního prvku.
Následující základní příklad publikuje více Int32 hodnot do objektu ActionBlock<TInput> . Objekt ActionBlock<TInput> tyto hodnoty vytiskne do konzoly. Tento příklad pak nastaví blok na dokončený stav a čeká na dokončení všech úloh toku dat.
// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
actionBlock.Post(i * 10);
}
// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();
/* Output:
0
10
20
*/
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))
' Post several messages to the block.
For i As Integer = 0 To 2
actionBlock.Post(i * 10)
Next i
' Set the block to the completed state and wait for all
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()
' Output:
' 0
' 10
' 20
'
Kompletní příklady, které ukazují, jak používat delegáty se ActionBlock<TInput> třídou, naleznete v tématu Postupy: Provedení akce Při bloku toku dat přijímá data.
TransformBlock<TInput, TOutput>
Třída TransformBlock<TInput,TOutput> se podobá ActionBlock<TInput> třídě, s tím rozdílem, že funguje jako zdroj i jako cíl. Delegát, který předáte objektu TransformBlock<TInput,TOutput> , vrátí hodnotu typu TOutput
. Delegát, který zadáte objektu TransformBlock<TInput,TOutput> , může být typu System.Func<TInput, TOutput>
nebo typu System.Func<TInput, Task<TOutput>>
. Při použití objektu TransformBlock<TInput,TOutput> s System.Func<TInput, TOutput>
, zpracování každého vstupního elementu je považováno za dokončené při vrácení delegáta. Při použití objektu použitého TransformBlock<TInput,TOutput> s System.Func<TInput, Task<TOutput>>
, zpracování každého vstupního prvku je považováno za dokončeno pouze v případě, že vrácený Task<TResult> objekt je dokončen. Stejně jako u ActionBlock<TInput>, pomocí těchto dvou mechanismů můžete použít TransformBlock<TInput,TOutput> pro synchronní i asynchronní zpracování každého vstupního prvku.
Následující základní příklad vytvoří TransformBlock<TInput,TOutput> objekt, který vypočítá druhou odmocninu jeho vstupu. Objekt TransformBlock<TInput,TOutput> přebírá Int32 hodnoty jako vstup a vytváří Double hodnoty jako výstup.
// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));
// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);
// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(transformBlock.Receive());
}
/* Output:
3.16227766016838
4.47213595499958
5.47722557505166
*/
' Create a TransformBlock<int, double> object that
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))
' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)
' Read the output messages from the block.
For i As Integer = 0 To 2
Console.WriteLine(transformBlock.Receive())
Next i
' Output:
' 3.16227766016838
' 4.47213595499958
' 5.47722557505166
'
Kompletní příklady, které se používají TransformBlock<TInput,TOutput> v síti bloků toku dat, které provádějí zpracování obrázků v aplikaci model Windows Forms, najdete v tématu Návod: Použití toku dat v aplikaci model Windows Forms.
TransformManyBlock<TInput, TOutput>
TransformManyBlock<TInput,TOutput> Třída se podobá TransformBlock<TInput,TOutput> třídě s tím rozdílem, že TransformManyBlock<TInput,TOutput> pro každou vstupní hodnotu vytváří nulové nebo více výstupních hodnot, nikoli pouze jednu výstupní hodnotu pro každou vstupní hodnotu. Delegát, který zadáte objektu TransformManyBlock<TInput,TOutput> , může být typu System.Func<TInput, IEnumerable<TOutput>>
nebo typu System.Func<TInput, Task<IEnumerable<TOutput>>>
. Při použití objektu TransformManyBlock<TInput,TOutput> s System.Func<TInput, IEnumerable<TOutput>>
, zpracování každého vstupního elementu je považováno za dokončené při vrácení delegáta. Při použití objektu TransformManyBlock<TInput,TOutput> s System.Func<TInput, Task<IEnumerable<TOutput>>>
, zpracování každého vstupního prvku je považováno za dokončené pouze v případě, že vrácený System.Threading.Tasks.Task<IEnumerable<TOutput>>
objekt je dokončen.
Následující základní příklad vytvoří TransformManyBlock<TInput,TOutput> objekt, který rozdělí řetězce na jejich jednotlivé sekvence znaků. Objekt TransformManyBlock<TInput,TOutput> přebírá String hodnoty jako vstup a vytváří Char hodnoty jako výstup.
// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
s => s.ToCharArray());
// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");
// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
Console.WriteLine(transformManyBlock.Receive());
}
/* Output:
H
e
l
l
o
W
o
r
l
d
*/
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())
' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")
' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
Console.WriteLine(transformManyBlock.Receive())
Next i
' Output:
' H
' e
' l
' l
' o
' W
' o
' r
' l
' d
'
Kompletní příklady, které slouží TransformManyBlock<TInput,TOutput> k vytvoření více nezávislých výstupů pro každý vstup v kanálu toku dat, najdete v tématu Návod: Vytvoření kanálu toku dat.
Stupeň paralelismu
Každý ActionBlock<TInput>objekt TransformBlock<TInput,TOutput>a TransformManyBlock<TInput,TOutput> objekt uloží vstupní zprávy do vyrovnávací paměti, dokud blok není připraven je zpracovat. Ve výchozím nastavení tyto třídy zpracovávají zprávy v pořadí, v jakém jsou přijímány, a to po jedné zprávě. Můžete také určit stupeň paralelismu, který povolí ActionBlock<TInput>TransformBlock<TInput,TOutput> a TransformManyBlock<TInput,TOutput> objekty ke souběžnému zpracování více zpráv. Další informace o souběžné provádění naleznete v části Určení stupně paralelismu dále v tomto dokumentu. Příklad, který nastaví stupeň paralelismu tak, aby blok toku dat provádění zpracovával více než jednu zprávu najednou, najdete v tématu Postupy: Určení stupně paralelismu v bloku toku dat.
Souhrn typů delegátů
Následující tabulka shrnuje typy delegátů, které můžete zadat pro ActionBlock<TInput>, TransformBlock<TInput,TOutput>a TransformManyBlock<TInput,TOutput> objekty. Tato tabulka také určuje, jestli typ delegáta funguje synchronně nebo asynchronně.
Typ | Synchronní typ delegáta | Asynchronní typ delegáta |
---|---|---|
ActionBlock<TInput> | System.Action |
System.Func<TInput, Task> |
TransformBlock<TInput,TOutput> | System.Func<TInput, TOutput> |
System.Func<TInput, Task<TOutput>> |
TransformManyBlock<TInput,TOutput> | System.Func<TInput, IEnumerable<TOutput>> |
System.Func<TInput, Task<IEnumerable<TOutput>>> |
Výrazy lambda můžete použít také při práci s typy bloků provádění. Příklad, který ukazuje, jak použít výraz lambda s blokem provádění, viz Postupy: Provedení akce, když blok toku dat přijímá data.
Seskupování bloků
Seskupení blokuje kombinování dat z jednoho nebo více zdrojů a v různých omezeních. Knihovna toků dat TPL poskytuje tři typy bloků spojení: BatchBlock<T>, JoinBlock<T1,T2>a BatchedJoinBlock<T1,T2>.
BatchBlock<T>
Třída BatchBlock<T> kombinuje sady vstupních dat, které se označují jako dávky, do polí výstupních dat. Při vytváření objektu BatchBlock<T> určíte velikost každé dávky. BatchBlock<T> Když objekt obdrží zadaný počet vstupních prvků, asynchronně rozšíří pole, které obsahuje tyto prvky. BatchBlock<T> Pokud je objekt nastaven na dokončený stav, ale neobsahuje dostatek prvků k vytvoření dávky, rozšíří se konečné pole, které obsahuje zbývající vstupní prvky.
Třída BatchBlock<T> funguje v režimu greedy nebo bez greedy . V režimu greedy, což je výchozí, objekt přijímá každou zprávu, BatchBlock<T> že je nabízena a šíří pole po přijetí zadaného počtu prvků. V režimu, který není greedy, odloží objekt všechny příchozí zprávy, BatchBlock<T> dokud dostatek zdrojů nenabízejí zprávy bloku k vytvoření dávky. Režim Greedy obvykle funguje lépe než režim bez greedy, protože vyžaduje menší režii na zpracování. Režim bez greedy ale můžete použít, když musíte koordinovat spotřebu z více zdrojů atomickým způsobem. Určete režim, který není greedy, nastavením Greedy parametru False
dataflowBlockOptions
v konstruktoru BatchBlock<T> .
Následující základní příklad publikuje několik Int32 hodnot do objektu BatchBlock<T> , který obsahuje deset prvků v dávce. Chcete-li zaručit, že se všechny hodnoty rozšíří z této BatchBlock<T>metody, tento příklad volá metodu Complete . Metoda Complete nastaví BatchBlock<T> objekt do dokončeného stavu, a proto BatchBlock<T> objekt rozšíří všechny zbývající prvky jako konečnou dávku.
// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);
// Post several values to the block.
for (int i = 0; i < 13; i++)
{
batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();
// Print the sum of both batches.
Console.WriteLine("The sum of the elements in batch 1 is {0}.",
batchBlock.Receive().Sum());
Console.WriteLine("The sum of the elements in batch 2 is {0}.",
batchBlock.Receive().Sum());
/* Output:
The sum of the elements in batch 1 is 45.
The sum of the elements in batch 2 is 33.
*/
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)
' Post several values to the block.
For i As Integer = 0 To 12
batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()
' Print the sum of both batches.
Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())
Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())
' Output:
' The sum of the elements in batch 1 is 45.
' The sum of the elements in batch 2 is 33.
'
Úplný příklad, který používá BatchBlock<T> ke zlepšení efektivity operací vkládání databáze, najdete v tématu Návod: Použití BatchBlock a BatchedJoinBlock ke zlepšení efektivity.
JoinBlock<T1, T2, ...>
JoinBlock<T1,T2,T3> Třídy JoinBlock<T1,T2> shromažďují vstupní prvky a šíří se ven System.Tuple<T1,T2> nebo System.Tuple<T1,T2,T3> objekty, které obsahují tyto prvky. Třídy JoinBlock<T1,T2>JoinBlock<T1,T2,T3> a třídy nedědí z ITargetBlock<TInput>. Místo toho poskytují vlastnosti, Target1, Target2a Target3, které implementují ITargetBlock<TInput>.
JoinBlock<T1,T2> Podobně jako BatchBlock<T>a JoinBlock<T1,T2,T3> pracovat v režimu greedy nebo bez greedy. V režimu greedy, což je výchozí, JoinBlock<T1,T2> objekt nebo JoinBlock<T1,T2,T3> přijímá všechny zprávy, které je nabízena a šíří řazenou kolekci členů po každém z cílů obdrží alespoň jednu zprávu. V režimu bez greedy odloží JoinBlock<T1,T2> nebo JoinBlock<T1,T2,T3> objekt všechny příchozí zprávy, dokud nebudou nabízena všechna cíle, která jsou potřebná k vytvoření řazené kolekce členů. V tomto okamžiku se blok zapojí do dvoufázového protokolu potvrzení, který atomicky načte všechny požadované položky ze zdrojů. Díky tomuto odložení může jiná entita mezitím využívat data, aby celkový systém mohl pokračovat.
Následující základní příklad ukazuje případ, JoinBlock<T1,T2,T3> kdy objekt k výpočtu hodnoty vyžaduje více dat. Tento příklad vytvoří JoinBlock<T1,T2,T3> objekt, který vyžaduje dvě Int32 hodnoty a Char hodnotu k provedení aritmetické operace.
// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();
// Post two values to each target of the join.
joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);
joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);
joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');
// Receive each group of values and apply the operator part
// to the number parts.
for (int i = 0; i < 2; i++)
{
var data = joinBlock.Receive();
switch (data.Item3)
{
case '+':
Console.WriteLine("{0} + {1} = {2}",
data.Item1, data.Item2, data.Item1 + data.Item2);
break;
case '-':
Console.WriteLine("{0} - {1} = {2}",
data.Item1, data.Item2, data.Item1 - data.Item2);
break;
default:
Console.WriteLine("Unknown operator '{0}'.", data.Item3);
break;
}
}
/* Output:
3 + 5 = 8
6 - 4 = 2
*/
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()
' Post two values to each target of the join.
joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)
joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)
joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)
' Receive each group of values and apply the operator part
' to the number parts.
For i As Integer = 0 To 1
Dim data = joinBlock.Receive()
Select Case data.Item3
Case "+"c
Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
Case "-"c
Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
Case Else
Console.WriteLine("Unknown operator '{0}'.", data.Item3)
End Select
Next i
' Output:
' 3 + 5 = 8
' 6 - 4 = 2
'
Úplný příklad, který používá JoinBlock<T1,T2> objekty v režimu bez greedy ke spolupráci na sdílení prostředku, najdete v tématu Postupy: Použití JoinBlock ke čtení dat z více zdrojů.
BatchedJoinBlock<T1, T2, ...>
BatchedJoinBlock<T1,T2,T3> Třídy BatchedJoinBlock<T1,T2> shromažďují dávky vstupních prvků a šíří se ven System.Tuple(IList(T1), IList(T2))
nebo System.Tuple(IList(T1), IList(T2), IList(T3))
objekty, které obsahují tyto prvky. BatchedJoinBlock<T1,T2> Představte si kombinaci BatchBlock<T> a JoinBlock<T1,T2>. Při vytváření objektu BatchedJoinBlock<T1,T2> zadejte velikost každé dávky. BatchedJoinBlock<T1,T2> poskytuje také vlastnosti a Target1Target2, které implementují ITargetBlock<TInput>. Pokud se zadaný počet vstupních prvků přijímá ze všech cílů, BatchedJoinBlock<T1,T2> objekt asynchronně rozšíří System.Tuple(IList(T1), IList(T2))
objekt, který obsahuje tyto prvky.
Následující základní příklad vytvoří BatchedJoinBlock<T1,T2> objekt, který obsahuje výsledky, Int32 hodnoty a chyby, které jsou Exception objekty. Tento příklad provádí více operací a zapisuje výsledky do Target1 vlastnosti a chyby do Target2 vlastnosti objektu BatchedJoinBlock<T1,T2> . Vzhledem k tomu, že počet úspěšných a neúspěšných operací je předem neznámý, IList<T> objekty umožňují každému cíli přijímat nulové nebo více hodnot.
// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
if (n < 0)
throw new ArgumentOutOfRangeException();
return n;
};
// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);
// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
try
{
// Post the result of the worker to the
// first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i));
}
catch (ArgumentOutOfRangeException e)
{
// If an error occurred, post the Exception to the
// second target of the block.
batchedJoinBlock.Target2.Post(e);
}
}
// Read the results from the block.
var results = batchedJoinBlock.Receive();
// Print the results to the console.
// Print the results.
foreach (int n in results.Item1)
{
Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
Console.WriteLine(e.Message);
}
/* Output:
5
6
13
55
0
Specified argument was out of the range of valid values.
Specified argument was out of the range of valid values.
*/
' For demonstration, create a Func<int, int> that
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
If n < 0 Then
Throw New ArgumentOutOfRangeException()
End If
Return n
End Function
' Create a BatchedJoinBlock<int, Exception> object that holds
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)
' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
Try
' Post the result of the worker to the
' first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i))
Catch e As ArgumentOutOfRangeException
' If an error occurred, post the Exception to the
' second target of the block.
batchedJoinBlock.Target2.Post(e)
End Try
Next i
' Read the results from the block.
Dim results = batchedJoinBlock.Receive()
' Print the results to the console.
' Print the results.
For Each n As Integer In results.Item1
Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
Console.WriteLine(e.Message)
Next e
' Output:
' 5
' 6
' 13
' 55
' 0
' Specified argument was out of the range of valid values.
' Specified argument was out of the range of valid values.
'
Úplný příklad, který používá BatchedJoinBlock<T1,T2> k zachycení výsledků i všech výjimek, ke kterým dochází při čtení programu z databáze, najdete v tématu Návod: Použití batchBlock a BatchedJoinBlock ke zlepšení efektivity.
Konfigurace chování bloku toku dat
Další možnosti můžete povolit poskytnutím System.Threading.Tasks.Dataflow.DataflowBlockOptions objektu konstruktoru typů bloků toku dat. Tyto možnosti řídí chování plánovače, který spravuje základní úlohu a stupeň paralelismu. Obsahuje DataflowBlockOptions také odvozené typy, které určují chování specifické pro určité typy bloků toku dat. Následující tabulka shrnuje, které typy možností jsou přidružené ke každému typu bloku toku dat.
Následující části obsahují další informace o důležitých typech možností bloku toku dat, které jsou k dispozici prostřednictvím System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptionsa System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions tříd.
Určení plánovače úloh
Každý předdefinovaný blok toku dat používá mechanismus plánování úkolů TPL k provádění aktivit, jako je šíření dat do cíle, příjem dat ze zdroje a spouštění uživatelem definovaných delegátů, jakmile budou data k dispozici. TaskScheduler je abstraktní třída, která představuje plánovač úloh, který zařadí úkoly do vláken. Výchozí plánovač Defaultúloh používá ThreadPool třídu k frontě a spuštění práce. Výchozí plánovač úloh můžete přepsat nastavením TaskScheduler vlastnosti při vytváření objektu bloku toku dat.
Pokud stejný plánovač úloh spravuje více bloků toku dat, může vynutit zásady napříč nimi. Pokud je například každý z bloků toku dat nakonfigurovaný tak, aby cílil na výhradní plánovač stejného ConcurrentExclusiveSchedulerPair objektu, je serializována všechna práce, která se spouští napříč těmito bloky. Podobně platí, že pokud jsou tyto bloky nakonfigurované tak, aby cílily na souběžný plánovač stejného ConcurrentExclusiveSchedulerPair objektu a že plánovač má nakonfigurovanou maximální úroveň souběžnosti, je všechna práce z těchto bloků omezena na tento počet souběžných operací. Příklad, který používá ConcurrentExclusiveSchedulerPair třídu k povolení paralelního zpracování operací čtení, ale operace zápisu, které se mají provádět výhradně ze všech ostatních operací, naleznete v tématu Postupy: Určení plánovače úloh v bloku toku dat. Další informace o plánovačích úkolů v TPL naleznete v TaskScheduler tématu třídy.
Určení stupně paralelismu
Ve výchozím nastavení tři typy bloku provádění, které knihovna toku dat TPL poskytuje, ActionBlock<TInput>TransformBlock<TInput,TOutput>a TransformManyBlock<TInput,TOutput>zpracování jedné zprávy najednou. Tyto typy bloků toku dat také zpracovávají zprávy v pořadí, v jakém jsou přijaty. Chcete-li povolit, aby tyto bloky toku dat zpracovávaly zprávy souběžně, nastavte ExecutionDataflowBlockOptions.MaxDegreeOfParallelism vlastnost při vytváření objektu bloku toku dat.
Výchozí hodnota MaxDegreeOfParallelism je 1, což zaručuje, že blok toku dat zpracovává jednu zprávu najednou. Nastavení této vlastnosti na hodnotu, která je větší než 1, umožňuje bloku toku dat zpracovávat více zpráv souběžně. Nastavením této vlastnosti DataflowBlockOptions.Unbounded umožníte podkladovému plánovači úloh spravovat maximální stupeň souběžnosti.
Důležité
Pokud zadáte maximální stupeň paralelismu, který je větší než 1, zpracuje se současně více zpráv, a proto zprávy nemusí být zpracovány v pořadí, ve kterém jsou přijaty. Pořadí, ve kterém jsou zprávy výstupem bloku, je však stejné, ve kterém jsou přijaty.
Vzhledem k tomu, že MaxDegreeOfParallelism vlastnost představuje maximální stupeň paralelismu, může se blok toku dat spustit s menším stupněm paralelismu, než zadáte. Blok toku dat může k splnění svých funkčních požadavků použít menší stupeň paralelismu nebo kvůli nedostatku dostupných systémových prostředků. Blok toku dat nikdy nevybere více paralelismu, než zadáte.
Hodnota MaxDegreeOfParallelism vlastnosti je výhradní pro každý objekt bloku toku dat. Pokud například čtyři objekty bloku toku dat zadávají 1 pro maximální stupeň paralelismu, mohou se všechny čtyři objekty bloku toku dat spouštět paralelně.
Příklad, který nastaví maximální stupeň paralelismu tak, aby umožňoval provádět zdlouhavé operace paralelně, najdete v tématu Postupy: Určení stupně paralelismu v bloku toku dat.
Určení počtu zpráv na úkol
Předdefinované typy bloků toku dat používají úlohy ke zpracování více vstupních prvků. To pomáhá minimalizovat počet objektů úloh, které jsou potřeba ke zpracování dat, což umožňuje aplikacím efektivněji spouštět. Pokud však úkoly z jedné sady bloků toku dat zpracovávají data, mohou úkoly z jiných bloků toku dat čekat na dobu zpracování frontou zpráv. Pokud chcete umožnit lepší nestrannost mezi úkoly toku dat, nastavte MaxMessagesPerTask vlastnost. Pokud MaxMessagesPerTask je nastavena na DataflowBlockOptions.Unboundedhodnotu , což je výchozí, úloha používaná blokem toku dat zpracovává tolik zpráv, kolik je k dispozici. Pokud MaxMessagesPerTask je nastaven na jinou hodnotu než Unbounded, blok toku dat zpracovává maximálně tento počet zpráv na Task objekt. I když nastavení MaxMessagesPerTask vlastnosti může zvýšit nestrannost mezi úkoly, může způsobit, že systém vytvoří více úkolů, než je nutné, což může snížit výkon.
Povolení zrušení
TPL poskytuje mechanismus, který umožňuje úkolům koordinovat zrušení způsobem spolupráce. Chcete-li povolit bloky toku dat pro účast v tomto mechanismu zrušení, nastavte CancellationToken vlastnost. Pokud je tento CancellationToken objekt nastaven na zrušený stav, všechny bloky toku dat, které monitorují provádění tohoto tokenu dokončení provádění aktuální položky, ale nezahajují zpracování následných položek. Tyto bloky toku dat také vymažou všechny zprávy ve vyrovnávací paměti, uvolní připojení ke všem zdrojovým a cílovým blokům a přepnou do zrušeného stavu. Přechodem do zrušeného stavu má Completion vlastnost nastavenou StatusCanceledvlastnost , pokud během zpracování nedošlo k výjimce. V takovém případě Status je nastavena na Faultedhodnotu .
Příklad, který ukazuje použití zrušení v aplikaci model Windows Forms, viz Postupy: Zrušení bloku toku dat. Další informace o zrušení v TPL naleznete v tématu Zrušení úlohy.
Určení greedy a jiného než greedyho chování
Několik typů bloků toku dat seskupování může fungovat buď v režimu greedy , nebo bez greedy . Ve výchozím nastavení fungují předdefinované typy bloků toku dat v režimu greedy.
U typů bloků spojení, jako JoinBlock<T1,T2>je režim greedy, znamená, že blok okamžitě přijímá data, i když odpovídající data, se kterými se má spojit, ještě nejsou k dispozici. Režim nes greedy znamená, že blok odloží všechny příchozí zprávy, dokud jeden z cílů nebude k dispozici pro dokončení spojení. Pokud některé z odložených zpráv už nejsou k dispozici, blok spojení uvolní všechny odložené zprávy a restartuje proces. BatchBlock<T> U třídy je chování greedy a non-greedy podobné, s tím rozdílem, že v režimu BatchBlock<T> bez greedy objekt odloží všechny příchozí zprávy, dokud nebude k dispozici dostatek různých zdrojů k dokončení dávky.
Chcete-li pro blok toku dat určit režim, který není greedy, nastavte na False
hodnotu Greedy . Příklad, který ukazuje, jak pomocí režimu bez greedy povolit více bloků spojení ke sdílení zdroje dat efektivněji, viz Postupy: Použití JoinBlock ke čtení dat z více zdrojů.
Vlastní bloky toku dat
I když knihovna toků dat TPL poskytuje mnoho předdefinovaných typů bloků, můžete vytvořit další typy bloků, které provádějí vlastní chování. ISourceBlock<TOutput> Implementujte rozhraní ITargetBlock<TInput> přímo nebo použijte metodu Encapsulate k sestavení komplexního bloku, který zapouzdřuje chování existujících typů bloků. Příklady, které ukazují, jak implementovat vlastní funkce bloku toku dat, naleznete v části Návod: Vytvoření vlastního typu bloku toku dat.
Příbuzná témata
Titulek | Popis |
---|---|
Postupy: Zápis zpráv do bloku toku dat a čtení zpráv z bloku toku dat | Ukazuje, jak zapisovat zprávy do a číst zprávy z objektu BufferBlock<T> . |
Postupy: Implementace vzoru toku dat producent–příjemce | Popisuje, jak použít model toku dat k implementaci vzoru příjemce producenta, kde producent odesílá zprávy do bloku toku dat a příjemce čte zprávy z daného bloku. |
Postupy: Provádění akcí po přijetí dat do bloku toku dat | Popisuje, jak poskytnout delegáty na typy bloků toku dat spouštění, ActionBlock<TInput>, TransformBlock<TInput,TOutput>a TransformManyBlock<TInput,TOutput>. |
Návod: Vytvoření kanálu toku dat | Popisuje, jak vytvořit kanál toku dat, který stáhne text z webu a provede operace s tímto textem. |
Postupy: Zrušení propojení bloků toku dat | Ukazuje, jak použít metodu LinkTo k zrušení propojení cílového bloku od jeho zdroje poté, co zdroj nabídne zprávu cíli. |
Návod: Použití toku dat v aplikaci Windows Forms | Ukazuje, jak vytvořit síť bloků toku dat, které provádějí zpracování obrázků v aplikaci model Windows Forms. |
Postupy: Zrušení bloku toku dat | Ukazuje, jak používat zrušení v aplikaci model Windows Forms. |
Postupy: Načítání dat z více zdrojů pomocí třídy JoinBlock | Vysvětluje, jak pomocí JoinBlock<T1,T2> třídy provést operaci, když jsou data k dispozici z více zdrojů, a jak používat režim bez greedy k povolení více bloků spojení ke sdílení zdroje dat efektivněji. |
Postupy: Určení stupně paralelního zpracování v bloku toku dat | Popisuje, jak nastavit MaxDegreeOfParallelism vlastnost tak, aby blok toku dat spouštění zpracovával více než jednu zprávu najednou. |
Postupy: Určení plánovače úloh v bloku toku dat | Ukazuje, jak přidružit konkrétní plánovač úloh při použití toku dat v aplikaci. |
Návod: Zvýšení efektivity díky použití tříd BatchBlock a BatchedJoinBlock | Popisuje, jak pomocí BatchBlock<T> třídy zlepšit efektivitu operací vkládání databáze a jak pomocí BatchedJoinBlock<T1,T2> třídy zachytit výsledky i všechny výjimky, ke kterým dojde, když program čte z databáze. |
Návod: Vytvoření bloku toku dat vlastního typu | Demonstruje dva způsoby vytvoření typu bloku toku dat, který implementuje vlastní chování. |
Task Parallel Library (TPL) | Představuje TPL, knihovnu, která zjednodušuje paralelní a souběžné programování v aplikacích .NET Framework. |