如何:使用 JoinBlock 从多个源读取数据

本文档介绍如何在来自多个源的数据可用时使用 JoinBlock<T1,T2> 类执行操作。 还演示了如何使用非贪婪模式使多个联接块更有效地共享数据源。

注意

TPL 数据流库(System.Threading.Tasks.Dataflow 命名空间)不随 .NET 一起分发。 若要在 Visual Studio 中安装 System.Threading.Tasks.Dataflow 命名空间,请打开项目,选择“项目”菜单中的“管理 NuGet 包”,再在线搜索 System.Threading.Tasks.Dataflow 包。 或者,若要使用 .NET Core CLI 进行安装,请运行 dotnet add package System.Threading.Tasks.Dataflow

示例

下面的示例定义了三种资源类型(NetworkResourceFileResourceMemoryResource)并在资源可用时执行操作。 此示例需要使用 NetworkResourceMemoryResource 对才能执行第一个操作,需要使用 FileResourceMemoryResource 对才能执行第二个操作。 为了在所有所需资源可用时启用这些操作,此示例使用 JoinBlock<T1,T2> 类。 当 JoinBlock<T1,T2> 对象从所有源接收数据时,它会将该数据传播到其目标,其目标在本示例中为 ActionBlock<TInput> 对象。 两个 JoinBlock<T1,T2> 对象都从 MemoryResource 对象的共享池中读取。

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to use non-greedy join blocks to distribute
// resources among a dataflow network.
class Program
{
   // Represents a resource. A derived class might represent
   // a limited resource such as a memory, network, or I/O
   // device.
   abstract class Resource
   {
   }

   // Represents a memory resource. For brevity, the details of
   // this class are omitted.
   class MemoryResource : Resource
   {
   }

   // Represents a network resource. For brevity, the details of
   // this class are omitted.
   class NetworkResource : Resource
   {
   }

   // Represents a file resource. For brevity, the details of
   // this class are omitted.
   class FileResource : Resource
   {
   }

   static void Main(string[] args)
   {
      // Create three BufferBlock<T> objects. Each object holds a different
      // type of resource.
      var networkResources = new BufferBlock<NetworkResource>();
      var fileResources = new BufferBlock<FileResource>();
      var memoryResources = new BufferBlock<MemoryResource>();

      // Create two non-greedy JoinBlock<T1, T2> objects.
      // The first join works with network and memory resources;
      // the second pool works with file and memory resources.

      var joinNetworkAndMemoryResources =
         new JoinBlock<NetworkResource, MemoryResource>(
            new GroupingDataflowBlockOptions
            {
               Greedy = false
            });

      var joinFileAndMemoryResources =
         new JoinBlock<FileResource, MemoryResource>(
            new GroupingDataflowBlockOptions
            {
               Greedy = false
            });

      // Create two ActionBlock<T> objects.
      // The first block acts on a network resource and a memory resource.
      // The second block acts on a file resource and a memory resource.

      var networkMemoryAction =
         new ActionBlock<Tuple<NetworkResource, MemoryResource>>(
            data =>
            {
               // Perform some action on the resources.

               // Print a message.
               Console.WriteLine("Network worker: using resources...");

               // Simulate a lengthy operation that uses the resources.
               Thread.Sleep(new Random().Next(500, 2000));

               // Print a message.
               Console.WriteLine("Network worker: finished using resources...");

               // Release the resources back to their respective pools.
               networkResources.Post(data.Item1);
               memoryResources.Post(data.Item2);
            });

      var fileMemoryAction =
         new ActionBlock<Tuple<FileResource, MemoryResource>>(
            data =>
            {
               // Perform some action on the resources.

               // Print a message.
               Console.WriteLine("File worker: using resources...");

               // Simulate a lengthy operation that uses the resources.
               Thread.Sleep(new Random().Next(500, 2000));

               // Print a message.
               Console.WriteLine("File worker: finished using resources...");

               // Release the resources back to their respective pools.
               fileResources.Post(data.Item1);
               memoryResources.Post(data.Item2);
            });

      // Link the resource pools to the JoinBlock<T1, T2> objects.
      // Because these join blocks operate in non-greedy mode, they do not
      // take the resource from a pool until all resources are available from
      // all pools.

      networkResources.LinkTo(joinNetworkAndMemoryResources.Target1);
      memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2);

      fileResources.LinkTo(joinFileAndMemoryResources.Target1);
      memoryResources.LinkTo(joinFileAndMemoryResources.Target2);

      // Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.

      joinNetworkAndMemoryResources.LinkTo(networkMemoryAction);
      joinFileAndMemoryResources.LinkTo(fileMemoryAction);

      // Populate the resource pools. In this example, network and
      // file resources are more abundant than memory resources.

      networkResources.Post(new NetworkResource());
      networkResources.Post(new NetworkResource());
      networkResources.Post(new NetworkResource());

      memoryResources.Post(new MemoryResource());

      fileResources.Post(new FileResource());
      fileResources.Post(new FileResource());
      fileResources.Post(new FileResource());

      // Allow data to flow through the network for several seconds.
      Thread.Sleep(10000);
   }
}

/* Sample output:
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
*/
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to use non-greedy join blocks to distribute
' resources among a dataflow network.
Friend Class Program
    ' Represents a resource. A derived class might represent 
    ' a limited resource such as a memory, network, or I/O
    ' device.
    Private MustInherit Class Resource
    End Class

    ' Represents a memory resource. For brevity, the details of 
    ' this class are omitted.
    Private Class MemoryResource
        Inherits Resource
    End Class

    ' Represents a network resource. For brevity, the details of 
    ' this class are omitted.
    Private Class NetworkResource
        Inherits Resource
    End Class

    ' Represents a file resource. For brevity, the details of 
    ' this class are omitted.
    Private Class FileResource
        Inherits Resource
    End Class

    Shared Sub Main(ByVal args() As String)
        ' Create three BufferBlock<T> objects. Each object holds a different
        ' type of resource.
        Dim networkResources = New BufferBlock(Of NetworkResource)()
        Dim fileResources = New BufferBlock(Of FileResource)()
        Dim memoryResources = New BufferBlock(Of MemoryResource)()

        ' Create two non-greedy JoinBlock<T1, T2> objects. 
        ' The first join works with network and memory resources; 
        ' the second pool works with file and memory resources.

        Dim joinNetworkAndMemoryResources = New JoinBlock(Of NetworkResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})

        Dim joinFileAndMemoryResources = New JoinBlock(Of FileResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})

        ' Create two ActionBlock<T> objects. 
        ' The first block acts on a network resource and a memory resource.
        ' The second block acts on a file resource and a memory resource.

        Dim networkMemoryAction = New ActionBlock(Of Tuple(Of NetworkResource, MemoryResource))(Sub(data)
                                                                                                    ' Perform some action on the resources.
                                                                                                    ' Print a message.
                                                                                                    ' Simulate a lengthy operation that uses the resources.
                                                                                                    ' Print a message.
                                                                                                    ' Release the resources back to their respective pools.
                                                                                                    Console.WriteLine("Network worker: using resources...")
                                                                                                    Thread.Sleep(New Random().Next(500, 2000))
                                                                                                    Console.WriteLine("Network worker: finished using resources...")
                                                                                                    networkResources.Post(data.Item1)
                                                                                                    memoryResources.Post(data.Item2)
                                                                                                End Sub)

        Dim fileMemoryAction = New ActionBlock(Of Tuple(Of FileResource, MemoryResource))(Sub(data)
                                                                                              ' Perform some action on the resources.
                                                                                              ' Print a message.
                                                                                              ' Simulate a lengthy operation that uses the resources.
                                                                                              ' Print a message.
                                                                                              ' Release the resources back to their respective pools.
                                                                                              Console.WriteLine("File worker: using resources...")
                                                                                              Thread.Sleep(New Random().Next(500, 2000))
                                                                                              Console.WriteLine("File worker: finished using resources...")
                                                                                              fileResources.Post(data.Item1)
                                                                                              memoryResources.Post(data.Item2)
                                                                                          End Sub)

        ' Link the resource pools to the JoinBlock<T1, T2> objects.
        ' Because these join blocks operate in non-greedy mode, they do not
        ' take the resource from a pool until all resources are available from
        ' all pools.

        networkResources.LinkTo(joinNetworkAndMemoryResources.Target1)
        memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2)

        fileResources.LinkTo(joinFileAndMemoryResources.Target1)
        memoryResources.LinkTo(joinFileAndMemoryResources.Target2)

        ' Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.

        joinNetworkAndMemoryResources.LinkTo(networkMemoryAction)
        joinFileAndMemoryResources.LinkTo(fileMemoryAction)

        ' Populate the resource pools. In this example, network and 
        ' file resources are more abundant than memory resources.

        networkResources.Post(New NetworkResource())
        networkResources.Post(New NetworkResource())
        networkResources.Post(New NetworkResource())

        memoryResources.Post(New MemoryResource())

        fileResources.Post(New FileResource())
        fileResources.Post(New FileResource())
        fileResources.Post(New FileResource())

        ' Allow data to flow through the network for several seconds.
        Thread.Sleep(10000)

    End Sub

End Class

' Sample output:
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'

为了实现 MemoryResource 对象共享池的高效使用,此示例指定了一个 GroupingDataflowBlockOptions 对象,该对象将 Greedy 属性设置为 False 以创建在非贪婪模式下运行的 JoinBlock<T1,T2> 对象。 非贪婪联接块会推迟所有传入的消息,直至从每个源收到一条消息。 如果任何推迟的消息由另一个块接受,联接块将重新启动该进程。 非贪婪模式使联接块能够共享一个或多个源块,以便在其他块等待数据时能够使进程向前推进。 在此示例中,如果将 MemoryResource 对象添加到 memoryResources 池中,那么要接收第二个数据源的第一个联接块可以将进程向前推进。 如果此示例使用贪婪模式(默认模式),一个联接块可能会接受 MemoryResource 对象,然后等待第二个资源变为可用。 但是,如果另一个联接块有自己的第二个可用数据源,则它无法使进程向前推进,因为 MemoryResource 对象已被另一联接块占用。

可靠编程

使用非贪婪联接还有助于防止应用程序中出现死锁。 在软件应用中,如果两个或多个进程分别留有资源,且相互等待另一进程释放其他资源,就会发生死锁。 考虑一个定义两个 JoinBlock<T1,T2> 对象的应用程序。 两个对象都从两个共享源块读取数据。 在贪婪模式下,如果一个联接块从第一个源读取,第二个联接块从第二个源读取,则应用程序可能发生死锁,原因是两个联接块相互等待另一个联接块释放其资源。 在非贪婪模式下,每个联接块只在所有数据可用时才从其源读取,因此消除了死锁风险。

请参阅