Sdílet prostřednictvím


Postupy: Načítání dat z více zdrojů pomocí třídy JoinBlock

Tento dokument vysvětluje, jak pomocí JoinBlock<T1,T2> třídy provést operaci, když jsou data k dispozici z více zdrojů. Ukazuje také, jak pomocí režimu bez greedy povolit více bloků spojení ke sdílení zdroje dat efektivněji.

Poznámka:

Knihovna toku dat TPL ( System.Threading.Tasks.Dataflow obor názvů) není distribuována s .NET. Pokud chcete nainstalovat System.Threading.Tasks.Dataflow obor názvů v sadě Visual Studio, otevřete projekt, v nabídce Projekt zvolte Spravovat balíčky NuGet a vyhledejte System.Threading.Tasks.Dataflow balíček online. Pokud ho chcete nainstalovat pomocí rozhraní příkazového řádku .NET Core, spusťte dotnet add package System.Threading.Tasks.Dataflow.

Příklad

Následující příklad definuje tři typy prostředků, NetworkResourceFileResource, a MemoryResourcea provádí operace, když jsou prostředky k dispozici. Tento příklad vyžaduje provedení NetworkResource první operace a páru a MemoryResourceFileResourceMemoryResource páru, aby bylo možné provést druhou operaci. Pokud chcete těmto operacím povolit, aby byly k dispozici všechny požadované prostředky, použije JoinBlock<T1,T2> tento příklad třídu. JoinBlock<T1,T2> Když objekt přijme data ze všech zdrojů, rozšíří tato data do cíle, což je v tomto příkladu ActionBlock<TInput> objekt. Oba JoinBlock<T1,T2> objekty se čtou ze sdíleného fondu MemoryResource objektů.

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to use non-greedy join blocks to distribute
// resources among a dataflow network.
class Program
{
   // Represents a resource. A derived class might represent
   // a limited resource such as a memory, network, or I/O
   // device.
   abstract class Resource
   {
   }

   // Represents a memory resource. For brevity, the details of
   // this class are omitted.
   class MemoryResource : Resource
   {
   }

   // Represents a network resource. For brevity, the details of
   // this class are omitted.
   class NetworkResource : Resource
   {
   }

   // Represents a file resource. For brevity, the details of
   // this class are omitted.
   class FileResource : Resource
   {
   }

   static void Main(string[] args)
   {
      // Create three BufferBlock<T> objects. Each object holds a different
      // type of resource.
      var networkResources = new BufferBlock<NetworkResource>();
      var fileResources = new BufferBlock<FileResource>();
      var memoryResources = new BufferBlock<MemoryResource>();

      // Create two non-greedy JoinBlock<T1, T2> objects.
      // The first join works with network and memory resources;
      // the second pool works with file and memory resources.

      var joinNetworkAndMemoryResources =
         new JoinBlock<NetworkResource, MemoryResource>(
            new GroupingDataflowBlockOptions
            {
               Greedy = false
            });

      var joinFileAndMemoryResources =
         new JoinBlock<FileResource, MemoryResource>(
            new GroupingDataflowBlockOptions
            {
               Greedy = false
            });

      // Create two ActionBlock<T> objects.
      // The first block acts on a network resource and a memory resource.
      // The second block acts on a file resource and a memory resource.

      var networkMemoryAction =
         new ActionBlock<Tuple<NetworkResource, MemoryResource>>(
            data =>
            {
               // Perform some action on the resources.

               // Print a message.
               Console.WriteLine("Network worker: using resources...");

               // Simulate a lengthy operation that uses the resources.
               Thread.Sleep(new Random().Next(500, 2000));

               // Print a message.
               Console.WriteLine("Network worker: finished using resources...");

               // Release the resources back to their respective pools.
               networkResources.Post(data.Item1);
               memoryResources.Post(data.Item2);
            });

      var fileMemoryAction =
         new ActionBlock<Tuple<FileResource, MemoryResource>>(
            data =>
            {
               // Perform some action on the resources.

               // Print a message.
               Console.WriteLine("File worker: using resources...");

               // Simulate a lengthy operation that uses the resources.
               Thread.Sleep(new Random().Next(500, 2000));

               // Print a message.
               Console.WriteLine("File worker: finished using resources...");

               // Release the resources back to their respective pools.
               fileResources.Post(data.Item1);
               memoryResources.Post(data.Item2);
            });

      // Link the resource pools to the JoinBlock<T1, T2> objects.
      // Because these join blocks operate in non-greedy mode, they do not
      // take the resource from a pool until all resources are available from
      // all pools.

      networkResources.LinkTo(joinNetworkAndMemoryResources.Target1);
      memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2);

      fileResources.LinkTo(joinFileAndMemoryResources.Target1);
      memoryResources.LinkTo(joinFileAndMemoryResources.Target2);

      // Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.

      joinNetworkAndMemoryResources.LinkTo(networkMemoryAction);
      joinFileAndMemoryResources.LinkTo(fileMemoryAction);

      // Populate the resource pools. In this example, network and
      // file resources are more abundant than memory resources.

      networkResources.Post(new NetworkResource());
      networkResources.Post(new NetworkResource());
      networkResources.Post(new NetworkResource());

      memoryResources.Post(new MemoryResource());

      fileResources.Post(new FileResource());
      fileResources.Post(new FileResource());
      fileResources.Post(new FileResource());

      // Allow data to flow through the network for several seconds.
      Thread.Sleep(10000);
   }
}

/* Sample output:
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
*/
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to use non-greedy join blocks to distribute
' resources among a dataflow network.
Friend Class Program
    ' Represents a resource. A derived class might represent 
    ' a limited resource such as a memory, network, or I/O
    ' device.
    Private MustInherit Class Resource
    End Class

    ' Represents a memory resource. For brevity, the details of 
    ' this class are omitted.
    Private Class MemoryResource
        Inherits Resource
    End Class

    ' Represents a network resource. For brevity, the details of 
    ' this class are omitted.
    Private Class NetworkResource
        Inherits Resource
    End Class

    ' Represents a file resource. For brevity, the details of 
    ' this class are omitted.
    Private Class FileResource
        Inherits Resource
    End Class

    Shared Sub Main(ByVal args() As String)
        ' Create three BufferBlock<T> objects. Each object holds a different
        ' type of resource.
        Dim networkResources = New BufferBlock(Of NetworkResource)()
        Dim fileResources = New BufferBlock(Of FileResource)()
        Dim memoryResources = New BufferBlock(Of MemoryResource)()

        ' Create two non-greedy JoinBlock<T1, T2> objects. 
        ' The first join works with network and memory resources; 
        ' the second pool works with file and memory resources.

        Dim joinNetworkAndMemoryResources = New JoinBlock(Of NetworkResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})

        Dim joinFileAndMemoryResources = New JoinBlock(Of FileResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})

        ' Create two ActionBlock<T> objects. 
        ' The first block acts on a network resource and a memory resource.
        ' The second block acts on a file resource and a memory resource.

        Dim networkMemoryAction = New ActionBlock(Of Tuple(Of NetworkResource, MemoryResource))(Sub(data)
                                                                                                    ' Perform some action on the resources.
                                                                                                    ' Print a message.
                                                                                                    ' Simulate a lengthy operation that uses the resources.
                                                                                                    ' Print a message.
                                                                                                    ' Release the resources back to their respective pools.
                                                                                                    Console.WriteLine("Network worker: using resources...")
                                                                                                    Thread.Sleep(New Random().Next(500, 2000))
                                                                                                    Console.WriteLine("Network worker: finished using resources...")
                                                                                                    networkResources.Post(data.Item1)
                                                                                                    memoryResources.Post(data.Item2)
                                                                                                End Sub)

        Dim fileMemoryAction = New ActionBlock(Of Tuple(Of FileResource, MemoryResource))(Sub(data)
                                                                                              ' Perform some action on the resources.
                                                                                              ' Print a message.
                                                                                              ' Simulate a lengthy operation that uses the resources.
                                                                                              ' Print a message.
                                                                                              ' Release the resources back to their respective pools.
                                                                                              Console.WriteLine("File worker: using resources...")
                                                                                              Thread.Sleep(New Random().Next(500, 2000))
                                                                                              Console.WriteLine("File worker: finished using resources...")
                                                                                              fileResources.Post(data.Item1)
                                                                                              memoryResources.Post(data.Item2)
                                                                                          End Sub)

        ' Link the resource pools to the JoinBlock<T1, T2> objects.
        ' Because these join blocks operate in non-greedy mode, they do not
        ' take the resource from a pool until all resources are available from
        ' all pools.

        networkResources.LinkTo(joinNetworkAndMemoryResources.Target1)
        memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2)

        fileResources.LinkTo(joinFileAndMemoryResources.Target1)
        memoryResources.LinkTo(joinFileAndMemoryResources.Target2)

        ' Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.

        joinNetworkAndMemoryResources.LinkTo(networkMemoryAction)
        joinFileAndMemoryResources.LinkTo(fileMemoryAction)

        ' Populate the resource pools. In this example, network and 
        ' file resources are more abundant than memory resources.

        networkResources.Post(New NetworkResource())
        networkResources.Post(New NetworkResource())
        networkResources.Post(New NetworkResource())

        memoryResources.Post(New MemoryResource())

        fileResources.Post(New FileResource())
        fileResources.Post(New FileResource())
        fileResources.Post(New FileResource())

        ' Allow data to flow through the network for several seconds.
        Thread.Sleep(10000)

    End Sub

End Class

' Sample output:
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'

Chcete-li povolit efektivní použití sdíleného fondu MemoryResource objektů, tento příklad určuje GroupingDataflowBlockOptions objekt, který má Greedy vlastnost nastavenou na False vytvoření JoinBlock<T1,T2> objektů, které fungují v režimu bez greedy. Blok připojení bez greedy odloží všechny příchozí zprávy, dokud nebude dostupný z každého zdroje. Pokud některý z odložených zpráv přijal jiný blok, blok spojení proces restartuje. Režim bez greedy umožňuje, aby bloky spojení, které sdílejí jeden nebo více zdrojových bloků, mohly pokračovat, protože ostatní bloky čekají na data. V tomto příkladu, pokud MemoryResource je objekt přidán do memoryResources fondu, první blok spojení, který obdrží druhý zdroj dat, může pokračovat. Pokud by tento příklad používal režim greedy, což je výchozí, může jeden blok spojení převzít MemoryResource objekt a počkat, až bude druhý prostředek dostupný. Pokud ale druhý blok spojení má k dispozici druhý zdroj dat, nemůže pokračovat, protože MemoryResource objekt byl převzat jiným blokem spojení.

Robustní programování

Použití jiných než greedy spojení může také pomoct zabránit zablokování ve vaší aplikaci. V softwarové aplikaci dojde k vzájemnému zablokování, když dva nebo více procesů každý uchovává prostředek a vzájemně čeká na další proces uvolnění některého jiného prostředku. Představte si aplikaci, která definuje dva JoinBlock<T1,T2> objekty. Oba objekty čtou data ze dvou sdílených zdrojových bloků. Pokud jeden blok spojení čte z prvního zdroje a druhý blok spojení čte z druhého zdroje, může aplikace v režimu greedy zablokovat, protože oba bloky spojení vzájemně čekají na uvolnění svého prostředku. V režimu bez greedy se každý blok spojení čte ze svých zdrojů pouze v případě, že jsou k dispozici všechna data, a proto se eliminuje riziko zablokování.

Viz také