How to: Specify a Task Scheduler in a Dataflow Block

This document demonstrates how to associate a specific task scheduler with a dataflow block in your application. The example uses the System.Threading.Tasks.ConcurrentExclusiveSchedulerPair class in a Windows Forms application to show when reader tasks are active and when a writer task is active. It also uses the TaskScheduler.FromCurrentSynchronizationContext method to enable a dataflow block to run on the user-interface thread.

Note

The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the library in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, to install it using the .NET Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.

To Create the Windows Forms Application

  1. Create a Visual C# or Visual Basic Windows Forms Application project. In the following steps, the project is named WriterReadersWinForms.

  2. On the form designer for the main form, Form1.cs (Form1.vb for Visual Basic), add four CheckBox controls. Set the Text property to Reader 1 for checkBox1, Reader 2 for checkBox2, Reader 3 for checkBox3, and Writer for checkBox4. Set the Enabled property for each control to False.

  3. Add a Timer control to the form. Set the Interval property to 2500.

Adding Dataflow Functionality

This section describes how to create the dataflow blocks that participate in the application and how to associate each one with a task scheduler.

To Add Dataflow Functionality to the Application

  1. In your project, add a reference to System.Threading.Tasks.Dataflow.dll.

  2. Ensure that Form1.cs (Form1.vb for Visual Basic) contains the following using directives (Imports in Visual Basic).

    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    using System.Windows.Forms;
    
    Imports System.Threading
    Imports System.Threading.Tasks
    Imports System.Threading.Tasks.Dataflow
    
    
  3. Add a BroadcastBlock<T> data member to the Form1 class.

    // Broadcasts values to an ActionBlock<int> object that is associated
    // with each check box.
    BroadcastBlock<int> broadcaster = new BroadcastBlock<int>(null);
    
    ' Broadcasts values to an ActionBlock<int> object that is associated
    ' with each check box.
    Private broadcaster As New BroadcastBlock(Of Integer)(Nothing)
    
  4. In the Form1 constructor, after the call to InitializeComponent, create an ActionBlock<TInput> object that toggles the state of CheckBox objects.

    // Create an ActionBlock<CheckBox> object that toggles the state
    // of CheckBox objects.
    // Specifying the current synchronization context enables the
    // action to run on the user-interface thread.
    var toggleCheckBox = new ActionBlock<CheckBox>(checkBox =>
    {
       checkBox.Checked = !checkBox.Checked;
    },
    new ExecutionDataflowBlockOptions
    {
       TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
    });
    
    ' Create an ActionBlock<CheckBox> object that toggles the state
    ' of CheckBox objects.
    ' Specifying the current synchronization context enables the 
    ' action to run on the user-interface thread.
    Dim toggleCheckBox = New ActionBlock(Of CheckBox)(Sub(checkBox) checkBox.Checked = Not checkBox.Checked, New ExecutionDataflowBlockOptions With {.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()})
    
  5. In the Form1 constructor, create a ConcurrentExclusiveSchedulerPair object and four ActionBlock<TInput> objects, one for each CheckBox object. For each ActionBlock<TInput> object, specify an ExecutionDataflowBlockOptions object whose TaskScheduler property is set to the ConcurrentScheduler property for the readers and to the ExclusiveScheduler property for the writer. Then link the broadcaster to each reader and writer block.

    // Create a ConcurrentExclusiveSchedulerPair object.
    // Readers will run on the concurrent part of the scheduler pair.
    // The writer will run on the exclusive part of the scheduler pair.
    var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
    
    // Create an ActionBlock<int> object for each reader CheckBox object.
    // Each ActionBlock<int> object represents an action that can read
    // from a resource in parallel to other readers.
    // Specifying the concurrent part of the scheduler pair enables the
    // reader to run in parallel to other actions that are managed by
    // that scheduler.
    var readerActions =
       from checkBox in new CheckBox[] {checkBox1, checkBox2, checkBox3}
       select new ActionBlock<int>(milliseconds =>
       {
          // Toggle the check box to the checked state.
          toggleCheckBox.Post(checkBox);
    
          // Perform the read action. For demonstration, suspend the current
          // thread to simulate a lengthy read operation.
          Thread.Sleep(milliseconds);
    
          // Toggle the check box to the unchecked state.
          toggleCheckBox.Post(checkBox);
       },
       new ExecutionDataflowBlockOptions
       {
          TaskScheduler = taskSchedulerPair.ConcurrentScheduler
       });
    
    // Create an ActionBlock<int> object for the writer CheckBox object.
    // This ActionBlock<int> object represents an action that writes to
    // a resource, but cannot run in parallel to readers.
    // Specifying the exclusive part of the scheduler pair enables the
    // writer to run exclusively with respect to other actions that are
    // managed by the scheduler pair.
    var writerAction = new ActionBlock<int>(milliseconds =>
    {
       // Toggle the check box to the checked state.
       toggleCheckBox.Post(checkBox4);
    
       // Perform the write action. For demonstration, suspend the current
       // thread to simulate a lengthy write operation.
       Thread.Sleep(milliseconds);
    
       // Toggle the check box to the unchecked state.
       toggleCheckBox.Post(checkBox4);
    },
    new ExecutionDataflowBlockOptions
    {
       TaskScheduler = taskSchedulerPair.ExclusiveScheduler
    });
    
    // Link the broadcaster to each reader and writer block.
    // The BroadcastBlock<T> class propagates values that it
    // receives to all connected targets.
    foreach (var readerAction in readerActions)
    {
       broadcaster.LinkTo(readerAction);
    }
    broadcaster.LinkTo(writerAction);
    
    ' Create a ConcurrentExclusiveSchedulerPair object.
    ' Readers will run on the concurrent part of the scheduler pair.
    ' The writer will run on the exclusive part of the scheduler pair.
    Dim taskSchedulerPair = New ConcurrentExclusiveSchedulerPair()
    
    ' Create an ActionBlock<int> object for each reader CheckBox object.
    ' Each ActionBlock<int> object represents an action that can read 
    ' from a resource in parallel to other readers.
    ' Specifying the concurrent part of the scheduler pair enables the 
    ' reader to run in parallel to other actions that are managed by 
    ' that scheduler.
    Dim readerActions = From checkBox In New CheckBox() {checkBox1, checkBox2, checkBox3} _
                        Select New ActionBlock(Of Integer)(Sub(milliseconds)
                                                               ' Toggle the check box to the checked state.
                                                               toggleCheckBox.Post(checkBox)
                                                               ' Perform the read action. For demonstration, suspend the current
                                                               ' thread to simulate a lengthy read operation.
                                                               Thread.Sleep(milliseconds)
                                                               ' Toggle the check box to the unchecked state.
                                                               toggleCheckBox.Post(checkBox)
                                                           End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ConcurrentScheduler})
    
    ' Create an ActionBlock<int> object for the writer CheckBox object.
    ' This ActionBlock<int> object represents an action that writes to 
    ' a resource, but cannot run in parallel to readers.
    ' Specifying the exclusive part of the scheduler pair enables the 
    ' writer to run exclusively with respect to other actions that are
    ' managed by the scheduler pair.
    Dim writerAction = New ActionBlock(Of Integer)(Sub(milliseconds)
                                                       ' Toggle the check box to the checked state.
                                                       toggleCheckBox.Post(checkBox4)
                                                       ' Perform the write action. For demonstration, suspend the current
                                                       ' thread to simulate a lengthy write operation.
                                                       Thread.Sleep(milliseconds)
                                                       ' Toggle the check box to the unchecked state.
                                                       toggleCheckBox.Post(checkBox4)
                                                   End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ExclusiveScheduler})
    
    ' Link the broadcaster to each reader and writer block.
    ' The BroadcastBlock<T> class propagates values that it 
    ' receives to all connected targets.
    For Each readerAction In readerActions
        broadcaster.LinkTo(readerAction)
    Next readerAction
    broadcaster.LinkTo(writerAction)
    
  6. In the Form1 constructor, start the Timer object.

    // Start the timer.
    timer1.Start();
    
    ' Start the timer.
    timer1.Start()
    
  7. On the form designer for the main form, create an event handler for the Tick event for the timer.

  8. Implement the Tick event for the timer.

    // Event handler for the timer.
    private void timer1_Tick(object sender, EventArgs e)
    {
       // Post a value to the broadcaster. The broadcaster
       // sends this message to each target.
       broadcaster.Post(1000);
    }
    
    ' Event handler for the timer.
    Private Sub timer1_Tick(ByVal sender As Object, ByVal e As EventArgs) Handles timer1.Tick
        ' Post a value to the broadcaster. The broadcaster
        ' sends this message to each target. 
        broadcaster.Post(1000)
    End Sub
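
The broadcasting behavior that the steps above rely on can be tried outside Windows Forms. The following is a minimal sketch, not part of the original sample: it assumes a console project that references the System.Threading.Tasks.Dataflow package, and the BroadcastSketch class and its Run method are hypothetical names chosen for illustration. It links one BroadcastBlock<int> to several ActionBlock<int> targets and records what each target receives.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper for illustration; not part of the WriterReadersWinForms sample.
public static class BroadcastSketch
{
    // Posts each value to a BroadcastBlock<int> that is linked to targetCount
    // ActionBlock<int> targets, and returns the values each target received.
    public static int[][] Run(int[] values, int targetCount)
    {
        var broadcaster = new BroadcastBlock<int>(null);

        // One queue per target records what that target receives.
        var received = Enumerable.Range(0, targetCount)
            .Select(_ => new ConcurrentQueue<int>())
            .ToArray();

        // ToArray runs the query once, so each target block is created exactly once.
        var targets = received
            .Select(queue => new ActionBlock<int>(value => queue.Enqueue(value)))
            .ToArray();

        // Propagate completion so that completing the broadcaster completes the targets.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        foreach (var target in targets)
        {
            broadcaster.LinkTo(target, linkOptions);
        }

        foreach (int value in values)
        {
            broadcaster.Post(value);
        }
        broadcaster.Complete();

        // Wait until every target has processed everything it was offered.
        Task.WhenAll(targets.Select(target => target.Completion)).Wait();
        return received.Select(queue => queue.ToArray()).ToArray();
    }
}
```

Calling BroadcastSketch.Run(new[] { 1, 2, 3 }, 2) from a Main method should return two arrays, each containing the posted values in order, because the targets here are unbounded and accept every message they are offered.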
    

Because the toggleCheckBox dataflow block acts on the user interface, its action must run on the user-interface thread. To accomplish this, the block is constructed with an ExecutionDataflowBlockOptions object whose TaskScheduler property is set to the result of TaskScheduler.FromCurrentSynchronizationContext. The FromCurrentSynchronizationContext method creates a TaskScheduler object that performs work on the current synchronization context. Because the Form1 constructor is called from the user-interface thread, the action for the toggleCheckBox dataflow block also runs on the user-interface thread.
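
To see this effect without a form, the following sketch replaces the Windows Forms message loop with a small hand-written synchronization context. It is an illustration under stated assumptions, not part of the original sample: SingleThreadContext and SynchronizationContextSketch are hypothetical names, and the project is assumed to reference the System.Threading.Tasks.Dataflow package. The block is created while the custom context is current, messages are posted from a thread-pool thread, and the method reports whether every action ran on the thread that created the block.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for a UI message loop: Post queues a callback, and
// RunLoop executes queued callbacks one at a time on the calling thread.
public sealed class SingleThreadContext : SynchronizationContext
{
    private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>();

    public override void Post(SendOrPostCallback d, object state) => queue.Add(() => d(state));

    // Runs callbacks on the current thread until Stop is called and the queue is empty.
    public void RunLoop()
    {
        foreach (Action callback in queue.GetConsumingEnumerable())
        {
            callback();
        }
    }

    public void Stop() => queue.CompleteAdding();
}

public static class SynchronizationContextSketch
{
    // Returns true if every action of the block ran on the thread that created the block.
    public static bool AllActionsRanOnCreatingThread(int messages)
    {
        var context = new SingleThreadContext();
        SynchronizationContext previous = SynchronizationContext.Current;
        SynchronizationContext.SetSynchronizationContext(context);
        try
        {
            int creatingThread = Environment.CurrentManagedThreadId;
            var actionThreads = new ConcurrentQueue<int>();

            // The scheduler captures the context that is current on this thread.
            var block = new ActionBlock<int>(
                _ => actionThreads.Enqueue(Environment.CurrentManagedThreadId),
                new ExecutionDataflowBlockOptions
                {
                    TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
                });

            // Post from a thread-pool thread, as the reader and writer blocks do.
            Task.Run(() =>
            {
                for (int i = 0; i < messages; i++)
                {
                    block.Post(i);
                }
                block.Complete();
            });

            // Stop the loop once the block has processed every message.
            block.Completion.ContinueWith(_ => context.Stop(), TaskScheduler.Default);
            context.RunLoop();

            return actionThreads.Count == messages
                && actionThreads.All(id => id == creatingThread);
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }
}
```

Note that FromCurrentSynchronizationContext throws an InvalidOperationException when no synchronization context is current on the calling thread, which is why the sketch installs one before it creates the block.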

This example also uses the ConcurrentExclusiveSchedulerPair class to enable some dataflow blocks to act concurrently and another dataflow block to act exclusively with respect to all other dataflow blocks that run on the same ConcurrentExclusiveSchedulerPair object. This technique is useful when multiple dataflow blocks share a resource and some require exclusive access to that resource, because it eliminates the requirement to manually synchronize access to that resource. Eliminating manual synchronization can make code more efficient.
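
The exclusivity described above can be checked with counters instead of check boxes. The following is a minimal console sketch under stated assumptions: SchedulerPairSketch and its Run method are hypothetical names, and the project is assumed to reference the System.Threading.Tasks.Dataflow package. Three reader blocks use the concurrent scheduler, one writer block uses the exclusive scheduler, and a violation is counted whenever a writer action is observed running at the same time as any other action.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper for illustration; not part of the WriterReadersWinForms sample.
public static class SchedulerPairSketch
{
    // Broadcasts the given number of messages to three readers and one writer.
    // Returns how many read and write actions ran, and how many times a writer
    // action was seen running at the same time as another action.
    public static (int Reads, int Writes, int Violations) Run(int messages, int workMilliseconds)
    {
        var pair = new ConcurrentExclusiveSchedulerPair();
        int activeReaders = 0, activeWriters = 0;
        int reads = 0, writes = 0, violations = 0;

        var readers = Enumerable.Range(0, 3).Select(_ => new ActionBlock<int>(ms =>
        {
            Interlocked.Increment(ref activeReaders);
            // A writer that is active now would break exclusivity.
            if (Volatile.Read(ref activeWriters) != 0) Interlocked.Increment(ref violations);
            Thread.Sleep(ms); // Simulated read.
            Interlocked.Increment(ref reads);
            Interlocked.Decrement(ref activeReaders);
        },
        new ExecutionDataflowBlockOptions { TaskScheduler = pair.ConcurrentScheduler })).ToArray();

        var writer = new ActionBlock<int>(ms =>
        {
            // A second writer or any active reader would break exclusivity.
            if (Interlocked.Increment(ref activeWriters) != 1 || Volatile.Read(ref activeReaders) != 0)
                Interlocked.Increment(ref violations);
            Thread.Sleep(ms); // Simulated write.
            if (Volatile.Read(ref activeReaders) != 0) Interlocked.Increment(ref violations);
            Interlocked.Increment(ref writes);
            Interlocked.Decrement(ref activeWriters);
        },
        new ExecutionDataflowBlockOptions { TaskScheduler = pair.ExclusiveScheduler });

        var broadcaster = new BroadcastBlock<int>(null);
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        foreach (var reader in readers) broadcaster.LinkTo(reader, linkOptions);
        broadcaster.LinkTo(writer, linkOptions);

        for (int i = 0; i < messages; i++) broadcaster.Post(workMilliseconds);
        broadcaster.Complete();

        // Wait for every block to finish before reading the counters.
        Task.WhenAll(readers.Select(r => r.Completion).Append(writer.Completion)).Wait();
        return (reads, writes, violations);
    }
}
```

With the scheduler pair in place, the Violations count is expected to stay at zero even though none of the actions takes a lock. How many readers actually overlap depends on the machine and on the thread pool, so the sketch does not report or assert a particular degree of reader concurrency.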

Example

The following example shows the complete code for Form1.cs (Form1.vb for Visual Basic).

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;

namespace WriterReadersWinForms
{
   public partial class Form1 : Form
   {
      // Broadcasts values to an ActionBlock<int> object that is associated
      // with each check box.
      BroadcastBlock<int> broadcaster = new BroadcastBlock<int>(null);

      public Form1()
      {
         InitializeComponent();

         // Create an ActionBlock<CheckBox> object that toggles the state
         // of CheckBox objects.
         // Specifying the current synchronization context enables the
         // action to run on the user-interface thread.
         var toggleCheckBox = new ActionBlock<CheckBox>(checkBox =>
         {
            checkBox.Checked = !checkBox.Checked;
         },
         new ExecutionDataflowBlockOptions
         {
            TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
         });

         // Create a ConcurrentExclusiveSchedulerPair object.
         // Readers will run on the concurrent part of the scheduler pair.
         // The writer will run on the exclusive part of the scheduler pair.
         var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();

         // Create an ActionBlock<int> object for each reader CheckBox object.
         // Each ActionBlock<int> object represents an action that can read
         // from a resource in parallel to other readers.
         // Specifying the concurrent part of the scheduler pair enables the
         // reader to run in parallel to other actions that are managed by
         // that scheduler.
         var readerActions =
            from checkBox in new CheckBox[] {checkBox1, checkBox2, checkBox3}
            select new ActionBlock<int>(milliseconds =>
            {
               // Toggle the check box to the checked state.
               toggleCheckBox.Post(checkBox);

               // Perform the read action. For demonstration, suspend the current
               // thread to simulate a lengthy read operation.
               Thread.Sleep(milliseconds);

               // Toggle the check box to the unchecked state.
               toggleCheckBox.Post(checkBox);
            },
            new ExecutionDataflowBlockOptions
            {
               TaskScheduler = taskSchedulerPair.ConcurrentScheduler
            });

         // Create an ActionBlock<int> object for the writer CheckBox object.
         // This ActionBlock<int> object represents an action that writes to
         // a resource, but cannot run in parallel to readers.
         // Specifying the exclusive part of the scheduler pair enables the
         // writer to run exclusively with respect to other actions that are
         // managed by the scheduler pair.
         var writerAction = new ActionBlock<int>(milliseconds =>
         {
            // Toggle the check box to the checked state.
            toggleCheckBox.Post(checkBox4);

            // Perform the write action. For demonstration, suspend the current
            // thread to simulate a lengthy write operation.
            Thread.Sleep(milliseconds);

            // Toggle the check box to the unchecked state.
            toggleCheckBox.Post(checkBox4);
         },
         new ExecutionDataflowBlockOptions
         {
            TaskScheduler = taskSchedulerPair.ExclusiveScheduler
         });

         // Link the broadcaster to each reader and writer block.
         // The BroadcastBlock<T> class propagates values that it
         // receives to all connected targets.
         foreach (var readerAction in readerActions)
         {
            broadcaster.LinkTo(readerAction);
         }
         broadcaster.LinkTo(writerAction);

         // Start the timer.
         timer1.Start();
      }

      // Event handler for the timer.
      private void timer1_Tick(object sender, EventArgs e)
      {
         // Post a value to the broadcaster. The broadcaster
         // sends this message to each target.
         broadcaster.Post(1000);
      }
   }
}

Imports System.Threading
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow


Namespace WriterReadersWinForms
    Partial Public Class Form1
        Inherits Form
        ' Broadcasts values to an ActionBlock<int> object that is associated
        ' with each check box.
        Private broadcaster As New BroadcastBlock(Of Integer)(Nothing)

        Public Sub New()
            InitializeComponent()

            ' Create an ActionBlock<CheckBox> object that toggles the state
            ' of CheckBox objects.
            ' Specifying the current synchronization context enables the 
            ' action to run on the user-interface thread.
            Dim toggleCheckBox = New ActionBlock(Of CheckBox)(Sub(checkBox) checkBox.Checked = Not checkBox.Checked, New ExecutionDataflowBlockOptions With {.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()})

            ' Create a ConcurrentExclusiveSchedulerPair object.
            ' Readers will run on the concurrent part of the scheduler pair.
            ' The writer will run on the exclusive part of the scheduler pair.
            Dim taskSchedulerPair = New ConcurrentExclusiveSchedulerPair()

            ' Create an ActionBlock<int> object for each reader CheckBox object.
            ' Each ActionBlock<int> object represents an action that can read 
            ' from a resource in parallel to other readers.
            ' Specifying the concurrent part of the scheduler pair enables the 
            ' reader to run in parallel to other actions that are managed by 
            ' that scheduler.
            Dim readerActions = From checkBox In New CheckBox() {checkBox1, checkBox2, checkBox3} _
                                Select New ActionBlock(Of Integer)(Sub(milliseconds)
                                                                       ' Toggle the check box to the checked state.
                                                                       toggleCheckBox.Post(checkBox)
                                                                       ' Perform the read action. For demonstration, suspend the current
                                                                       ' thread to simulate a lengthy read operation.
                                                                       Thread.Sleep(milliseconds)
                                                                       ' Toggle the check box to the unchecked state.
                                                                       toggleCheckBox.Post(checkBox)
                                                                   End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ConcurrentScheduler})

            ' Create an ActionBlock<int> object for the writer CheckBox object.
            ' This ActionBlock<int> object represents an action that writes to 
            ' a resource, but cannot run in parallel to readers.
            ' Specifying the exclusive part of the scheduler pair enables the 
            ' writer to run exclusively with respect to other actions that are
            ' managed by the scheduler pair.
            Dim writerAction = New ActionBlock(Of Integer)(Sub(milliseconds)
                                                               ' Toggle the check box to the checked state.
                                                               toggleCheckBox.Post(checkBox4)
                                                               ' Perform the write action. For demonstration, suspend the current
                                                               ' thread to simulate a lengthy write operation.
                                                               Thread.Sleep(milliseconds)
                                                               ' Toggle the check box to the unchecked state.
                                                               toggleCheckBox.Post(checkBox4)
                                                           End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ExclusiveScheduler})

            ' Link the broadcaster to each reader and writer block.
            ' The BroadcastBlock<T> class propagates values that it 
            ' receives to all connected targets.
            For Each readerAction In readerActions
                broadcaster.LinkTo(readerAction)
            Next readerAction
            broadcaster.LinkTo(writerAction)

            ' Start the timer.
            timer1.Start()
        End Sub

        ' Event handler for the timer.
        Private Sub timer1_Tick(ByVal sender As Object, ByVal e As EventArgs) Handles timer1.Tick
            ' Post a value to the broadcaster. The broadcaster
            ' sends this message to each target. 
            broadcaster.Post(1000)
        End Sub
    End Class
End Namespace
