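How to: Cancel a Dataflow Block
This document demonstrates how to enable cancellation in your application. The example uses Windows Forms to show where work items are active in a dataflow pipeline and to show the effects of cancellation.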
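Note
The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, to install it by using the .NET Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.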
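Creating the Windows Forms Application
Create a C# or Visual Basic Windows Forms Application project. In the following steps, the project is named CancellationWinForms.
On the form designer for the main form, Form1.cs (Form1.vb for Visual Basic), add a ToolStrip control.
Add a ToolStripButton control to the ToolStrip control. Set the DisplayStyle property to Text and the Text property to "Add Work Items".
Add a second ToolStripButton control to the ToolStrip control. Set the DisplayStyle property to Text, the Text property to "Cancel", and the Enabled property to False.
Add four ToolStripProgressBar objects to the ToolStrip control.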
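Creating the Dataflow Pipeline
This section describes how to create the dataflow pipeline that processes work items and updates the progress bars.
To create the dataflow pipeline
In your project, add a reference to System.Threading.Tasks.Dataflow.dll.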
Ensure that Form1.cs (Form1.vb for Visual Basic) contains the following using directives (Imports in Visual Basic).
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Add the WorkItem class as an inner type of the Form1 class.
// A placeholder type that performs work.
class WorkItem
{
    // Performs work for the provided number of milliseconds.
    public void DoWork(int milliseconds)
    {
        // For demonstration, suspend the current thread.
        Thread.Sleep(milliseconds);
    }
}
' A placeholder type that performs work.
Private Class WorkItem
    ' Performs work for the provided number of milliseconds.
    Public Sub DoWork(ByVal milliseconds As Integer)
        ' For demonstration, suspend the current thread.
        Thread.Sleep(milliseconds)
    End Sub
End Class
Add the following data members to the Form1 class.
// Enables the user interface to signal cancellation.
CancellationTokenSource cancellationSource;
// The first node in the dataflow pipeline.
TransformBlock<WorkItem, WorkItem> startWork;
// The second, and final, node in the dataflow pipeline.
ActionBlock<WorkItem> completeWork;
// Increments the value of the provided progress bar.
ActionBlock<ToolStripProgressBar> incrementProgress;
// Decrements the value of the provided progress bar.
ActionBlock<ToolStripProgressBar> decrementProgress;
// Enables progress bar actions to run on the UI thread.
TaskScheduler uiTaskScheduler;
' Enables the user interface to signal cancellation.
Private cancellationSource As CancellationTokenSource
' The first node in the dataflow pipeline.
Private startWork As TransformBlock(Of WorkItem, WorkItem)
' The second, and final, node in the dataflow pipeline.
Private completeWork As ActionBlock(Of WorkItem)
' Increments the value of the provided progress bar.
Private incrementProgress As ActionBlock(Of ToolStripProgressBar)
' Decrements the value of the provided progress bar.
Private decrementProgress As ActionBlock(Of ToolStripProgressBar)
' Enables progress bar actions to run on the UI thread.
Private uiTaskScheduler As TaskScheduler
Add the following method, CreatePipeline, to the Form1 class.
// Creates the blocks that participate in the dataflow pipeline.
private void CreatePipeline()
{
    // Create the cancellation source.
    cancellationSource = new CancellationTokenSource();

    // Create the first node in the pipeline.
    startWork = new TransformBlock<WorkItem, WorkItem>(workItem =>
    {
        // Perform some work.
        workItem.DoWork(250);

        // Decrement the progress bar that tracks the count of
        // active work items in this stage of the pipeline.
        decrementProgress.Post(toolStripProgressBar1);

        // Increment the progress bar that tracks the count of
        // active work items in the next stage of the pipeline.
        incrementProgress.Post(toolStripProgressBar2);

        // Send the work item to the next stage of the pipeline.
        return workItem;
    },
    new ExecutionDataflowBlockOptions
    {
        CancellationToken = cancellationSource.Token
    });

    // Create the second, and final, node in the pipeline.
    completeWork = new ActionBlock<WorkItem>(workItem =>
    {
        // Perform some work.
        workItem.DoWork(1000);

        // Decrement the progress bar that tracks the count of
        // active work items in this stage of the pipeline.
        decrementProgress.Post(toolStripProgressBar2);

        // Increment the progress bar that tracks the overall
        // count of completed work items.
        incrementProgress.Post(toolStripProgressBar3);
    },
    new ExecutionDataflowBlockOptions
    {
        CancellationToken = cancellationSource.Token,
        MaxDegreeOfParallelism = 2
    });

    // Connect the two nodes of the pipeline. When the first node completes,
    // set the second node also to the completed state.
    startWork.LinkTo(
        completeWork, new DataflowLinkOptions { PropagateCompletion = true });

    // Create the dataflow action blocks that increment and decrement
    // progress bars.
    // These blocks use the task scheduler that is associated with
    // the UI thread.
    incrementProgress = new ActionBlock<ToolStripProgressBar>(
        progressBar => progressBar.Value++,
        new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationSource.Token,
            TaskScheduler = uiTaskScheduler
        });

    decrementProgress = new ActionBlock<ToolStripProgressBar>(
        progressBar => progressBar.Value--,
        new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationSource.Token,
            TaskScheduler = uiTaskScheduler
        });
}
' Creates the blocks that participate in the dataflow pipeline.
Private Sub CreatePipeline()
    ' Create the cancellation source.
    cancellationSource = New CancellationTokenSource()

    ' Create the first node in the pipeline.
    startWork = New TransformBlock(Of WorkItem, WorkItem)(Function(workItem)
            ' Perform some work.
            workItem.DoWork(250)
            ' Decrement the progress bar that tracks the count of
            ' active work items in this stage of the pipeline.
            decrementProgress.Post(toolStripProgressBar1)
            ' Increment the progress bar that tracks the count of
            ' active work items in the next stage of the pipeline.
            incrementProgress.Post(toolStripProgressBar2)
            ' Send the work item to the next stage of the pipeline.
            Return workItem
        End Function,
        New ExecutionDataflowBlockOptions With {.CancellationToken = cancellationSource.Token})

    ' Create the second, and final, node in the pipeline.
    completeWork = New ActionBlock(Of WorkItem)(Sub(workItem)
            ' Perform some work.
            workItem.DoWork(1000)
            ' Decrement the progress bar that tracks the count of
            ' active work items in this stage of the pipeline.
            decrementProgress.Post(toolStripProgressBar2)
            ' Increment the progress bar that tracks the overall
            ' count of completed work items.
            incrementProgress.Post(toolStripProgressBar3)
        End Sub,
        New ExecutionDataflowBlockOptions With {.CancellationToken = cancellationSource.Token,
                                                .MaxDegreeOfParallelism = 2})

    ' Connect the two nodes of the pipeline. When the first node completes,
    ' set the second node also to the completed state.
    startWork.LinkTo(
        completeWork, New DataflowLinkOptions With {.PropagateCompletion = True})

    ' Create the dataflow action blocks that increment and decrement
    ' progress bars.
    ' These blocks use the task scheduler that is associated with
    ' the UI thread.
    incrementProgress = New ActionBlock(Of ToolStripProgressBar)(
        Sub(progressBar) progressBar.Value += 1,
        New ExecutionDataflowBlockOptions With {.CancellationToken = cancellationSource.Token,
                                                .TaskScheduler = uiTaskScheduler})

    decrementProgress = New ActionBlock(Of ToolStripProgressBar)(
        Sub(progressBar) progressBar.Value -= 1,
        New ExecutionDataflowBlockOptions With {.CancellationToken = cancellationSource.Token,
                                                .TaskScheduler = uiTaskScheduler})
End Sub
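Because the incrementProgress and decrementProgress dataflow blocks act on the user interface, it is important that these actions occur on the user-interface thread. To accomplish this, during construction each of these objects receives an ExecutionDataflowBlockOptions object whose TaskScheduler property is set to uiTaskScheduler. The Form1 constructor, shown in the complete example, initializes uiTaskScheduler by calling TaskScheduler.FromCurrentSynchronizationContext, which creates a TaskScheduler object that performs work on the current synchronization context. Because the Form1 constructor is called from the user-interface thread, the actions of the incrementProgress and decrementProgress dataflow blocks also run on the user-interface thread.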
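The effect of the TaskScheduler option can also be observed outside Windows Forms. The following is a minimal console-friendly sketch, not part of the sample application: it substitutes the exclusive scheduler of a ConcurrentExclusiveSchedulerPair for the UI scheduler (an assumption made only so that the code runs without a form) and checks that the block's delegate runs under the scheduler that was supplied. The class and method names are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SchedulerOptionSketch
{
    // Returns true if the block's delegate observed the supplied scheduler
    // as TaskScheduler.Current while it was running.
    public static async Task<bool> RanOnChosenSchedulerAsync()
    {
        // Stand-in for the UI scheduler. In the form, this would be the value
        // returned by TaskScheduler.FromCurrentSynchronizationContext().
        TaskScheduler chosen =
            new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;

        bool ranOnChosen = false;

        var block = new ActionBlock<int>(
            _ => { ranOnChosen = ReferenceEquals(TaskScheduler.Current, chosen); },
            new ExecutionDataflowBlockOptions { TaskScheduler = chosen });

        block.Post(0);
        block.Complete();

        // Wait until the single posted item has been processed.
        await block.Completion;
        return ranOnChosen;
    }
}
```

Calling `await SchedulerOptionSketch.RanOnChosenSchedulerAsync()` from an async method is expected to yield true. In the sample application the same option routes the progress bar updates to the user-interface thread.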
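This example sets the CancellationToken property when it constructs the members of the pipeline. Because the CancellationToken property permanently cancels dataflow block execution, the whole pipeline must be recreated if the user cancels the operation and then wants to add more work items to the pipeline. For an example that demonstrates an alternative way to cancel a dataflow block so that other work can be performed after an operation is canceled, see Walkthrough: Using Dataflow in a Windows Forms Application.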
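To make the "permanent" part concrete, here is a small sketch that is independent of the form. It assumes a single ActionBlock with an empty delegate, and the class, method, and tuple element names are hypothetical. It posts to a block, cancels the token, and then shows that the same block declines further items while a newly created block with a fresh CancellationTokenSource accepts them.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PermanentCancellationSketch
{
    // Creates a block that is bound to the given cancellation token.
    static ActionBlock<int> CreateBlock(CancellationToken token) =>
        new ActionBlock<int>(
            _ => { },
            new ExecutionDataflowBlockOptions { CancellationToken = token });

    public static async Task<(bool AcceptedBeforeCancel,
                              bool AcceptedAfterCancel,
                              bool CompletionCanceled,
                              bool AcceptedByReplacement)> RunAsync()
    {
        using var firstSource = new CancellationTokenSource();
        ActionBlock<int> block = CreateBlock(firstSource.Token);

        bool before = block.Post(1);

        // Request cancellation and wait for the block to finish.
        firstSource.Cancel();
        try
        {
            await block.Completion;
        }
        catch (OperationCanceledException)
        {
            // Expected: the completion task ends in the Canceled state.
        }

        // The canceled block no longer accepts messages.
        bool after = block.Post(2);
        bool canceled = block.Completion.IsCanceled;

        // A replacement block with a new token source accepts work again.
        using var secondSource = new CancellationTokenSource();
        ActionBlock<int> replacement = CreateBlock(secondSource.Token);
        bool byReplacement = replacement.Post(3);

        return (before, after, canceled, byReplacement);
    }
}
```

This mirrors why the Add Work Items handler calls CreatePipeline again after a cancellation: the existing blocks cannot be reused, so new blocks and a new CancellationTokenSource are created instead.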
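Connecting the Dataflow Pipeline to the User Interface
This section describes how to connect the dataflow pipeline to the user interface. Both creating the pipeline and adding work items to it are controlled by the event handler for the Add Work Items button. Cancellation is initiated by the Cancel button. When the user clicks either of these buttons, the appropriate action is initiated in an asynchronous manner.
To connect the dataflow pipeline to the user interface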
Implement the Click event handler for the Add Work Items button.
// Event handler for the Add Work Items button.
private void toolStripButton1_Click(object sender, EventArgs e)
{
    // The Cancel button is disabled when the pipeline is not active.
    // Therefore, create the pipeline and enable the Cancel button
    // if the Cancel button is disabled.
    if (!toolStripButton2.Enabled)
    {
        CreatePipeline();

        // Enable the Cancel button.
        toolStripButton2.Enabled = true;
    }

    // Post several work items to the head of the pipeline.
    for (int i = 0; i < 5; i++)
    {
        toolStripProgressBar1.Value++;
        startWork.Post(new WorkItem());
    }
}
' Event handler for the Add Work Items button.
Private Sub toolStripButton1_Click(ByVal sender As Object, ByVal e As EventArgs) Handles toolStripButton1.Click
    ' The Cancel button is disabled when the pipeline is not active.
    ' Therefore, create the pipeline and enable the Cancel button
    ' if the Cancel button is disabled.
    If Not toolStripButton2.Enabled Then
        CreatePipeline()

        ' Enable the Cancel button.
        toolStripButton2.Enabled = True
    End If

    ' Post several work items to the head of the pipeline.
    For i As Integer = 0 To 4
        toolStripProgressBar1.Value += 1
        startWork.Post(New WorkItem())
    Next i
End Sub
Implement the Click event handler for the Cancel button.
// Event handler for the Cancel button.
private async void toolStripButton2_Click(object sender, EventArgs e)
{
    // Disable both buttons.
    toolStripButton1.Enabled = false;
    toolStripButton2.Enabled = false;

    // Trigger cancellation.
    cancellationSource.Cancel();

    try
    {
        // Asynchronously wait for the pipeline to complete processing and for
        // the progress bars to update.
        await Task.WhenAll(
            completeWork.Completion,
            incrementProgress.Completion,
            decrementProgress.Completion);
    }
    catch (OperationCanceledException)
    {
        // Expected: the blocks complete in the canceled state.
    }

    // Increment the progress bar that tracks the number of cancelled
    // work items by the number of active work items.
    toolStripProgressBar4.Value += toolStripProgressBar1.Value;
    toolStripProgressBar4.Value += toolStripProgressBar2.Value;

    // Reset the progress bars that track the number of active work items.
    toolStripProgressBar1.Value = 0;
    toolStripProgressBar2.Value = 0;

    // Enable the Add Work Items button.
    toolStripButton1.Enabled = true;
}
' Event handler for the Cancel button.
Private Async Sub toolStripButton2_Click(ByVal sender As Object, ByVal e As EventArgs) Handles toolStripButton2.Click
    ' Disable both buttons.
    toolStripButton1.Enabled = False
    toolStripButton2.Enabled = False

    ' Trigger cancellation.
    cancellationSource.Cancel()

    Try
        ' Asynchronously wait for the pipeline to complete processing and for
        ' the progress bars to update.
        Await Task.WhenAll(completeWork.Completion, incrementProgress.Completion, decrementProgress.Completion)
    Catch e1 As OperationCanceledException
        ' Expected: the blocks complete in the canceled state.
    End Try

    ' Increment the progress bar that tracks the number of cancelled
    ' work items by the number of active work items.
    toolStripProgressBar4.Value += toolStripProgressBar1.Value
    toolStripProgressBar4.Value += toolStripProgressBar2.Value

    ' Reset the progress bars that track the number of active work items.
    toolStripProgressBar1.Value = 0
    toolStripProgressBar2.Value = 0

    ' Enable the Add Work Items button.
    toolStripButton1.Enabled = True
End Sub
Example
The following example shows the complete code for Form1.cs (Form1.vb for Visual Basic).
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;

namespace CancellationWinForms
{
    public partial class Form1 : Form
    {
        // A placeholder type that performs work.
        class WorkItem
        {
            // Performs work for the provided number of milliseconds.
            public void DoWork(int milliseconds)
            {
                // For demonstration, suspend the current thread.
                Thread.Sleep(milliseconds);
            }
        }

        // Enables the user interface to signal cancellation.
        CancellationTokenSource cancellationSource;

        // The first node in the dataflow pipeline.
        TransformBlock<WorkItem, WorkItem> startWork;

        // The second, and final, node in the dataflow pipeline.
        ActionBlock<WorkItem> completeWork;

        // Increments the value of the provided progress bar.
        ActionBlock<ToolStripProgressBar> incrementProgress;

        // Decrements the value of the provided progress bar.
        ActionBlock<ToolStripProgressBar> decrementProgress;

        // Enables progress bar actions to run on the UI thread.
        TaskScheduler uiTaskScheduler;

        public Form1()
        {
            InitializeComponent();

            // Create the UI task scheduler from the current synchronization
            // context.
            uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
        }

        // Creates the blocks that participate in the dataflow pipeline.
        private void CreatePipeline()
        {
            // Create the cancellation source.
            cancellationSource = new CancellationTokenSource();

            // Create the first node in the pipeline.
            startWork = new TransformBlock<WorkItem, WorkItem>(workItem =>
            {
                // Perform some work.
                workItem.DoWork(250);

                // Decrement the progress bar that tracks the count of
                // active work items in this stage of the pipeline.
                decrementProgress.Post(toolStripProgressBar1);

                // Increment the progress bar that tracks the count of
                // active work items in the next stage of the pipeline.
                incrementProgress.Post(toolStripProgressBar2);

                // Send the work item to the next stage of the pipeline.
                return workItem;
            },
            new ExecutionDataflowBlockOptions
            {
                CancellationToken = cancellationSource.Token
            });

            // Create the second, and final, node in the pipeline.
            completeWork = new ActionBlock<WorkItem>(workItem =>
            {
                // Perform some work.
                workItem.DoWork(1000);

                // Decrement the progress bar that tracks the count of
                // active work items in this stage of the pipeline.
                decrementProgress.Post(toolStripProgressBar2);

                // Increment the progress bar that tracks the overall
                // count of completed work items.
                incrementProgress.Post(toolStripProgressBar3);
            },
            new ExecutionDataflowBlockOptions
            {
                CancellationToken = cancellationSource.Token,
                MaxDegreeOfParallelism = 2
            });

            // Connect the two nodes of the pipeline. When the first node completes,
            // set the second node also to the completed state.
            startWork.LinkTo(
                completeWork, new DataflowLinkOptions { PropagateCompletion = true });

            // Create the dataflow action blocks that increment and decrement
            // progress bars.
            // These blocks use the task scheduler that is associated with
            // the UI thread.
            incrementProgress = new ActionBlock<ToolStripProgressBar>(
                progressBar => progressBar.Value++,
                new ExecutionDataflowBlockOptions
                {
                    CancellationToken = cancellationSource.Token,
                    TaskScheduler = uiTaskScheduler
                });

            decrementProgress = new ActionBlock<ToolStripProgressBar>(
                progressBar => progressBar.Value--,
                new ExecutionDataflowBlockOptions
                {
                    CancellationToken = cancellationSource.Token,
                    TaskScheduler = uiTaskScheduler
                });
        }

        // Event handler for the Add Work Items button.
        private void toolStripButton1_Click(object sender, EventArgs e)
        {
            // The Cancel button is disabled when the pipeline is not active.
            // Therefore, create the pipeline and enable the Cancel button
            // if the Cancel button is disabled.
            if (!toolStripButton2.Enabled)
            {
                CreatePipeline();

                // Enable the Cancel button.
                toolStripButton2.Enabled = true;
            }

            // Post several work items to the head of the pipeline.
            for (int i = 0; i < 5; i++)
            {
                toolStripProgressBar1.Value++;
                startWork.Post(new WorkItem());
            }
        }

        // Event handler for the Cancel button.
        private async void toolStripButton2_Click(object sender, EventArgs e)
        {
            // Disable both buttons.
            toolStripButton1.Enabled = false;
            toolStripButton2.Enabled = false;

            // Trigger cancellation.
            cancellationSource.Cancel();

            try
            {
                // Asynchronously wait for the pipeline to complete processing and for
                // the progress bars to update.
                await Task.WhenAll(
                    completeWork.Completion,
                    incrementProgress.Completion,
                    decrementProgress.Completion);
            }
            catch (OperationCanceledException)
            {
            }

            // Increment the progress bar that tracks the number of cancelled
            // work items by the number of active work items.
            toolStripProgressBar4.Value += toolStripProgressBar1.Value;
            toolStripProgressBar4.Value += toolStripProgressBar2.Value;

            // Reset the progress bars that track the number of active work items.
            toolStripProgressBar1.Value = 0;
            toolStripProgressBar2.Value = 0;

            // Enable the Add Work Items button.
            toolStripButton1.Enabled = true;
        }

        ~Form1()
        {
            // cancellationSource is null if the pipeline was never created.
            cancellationSource?.Dispose();
        }
    }
}
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Namespace CancellationWinForms
    Partial Public Class Form1
        Inherits Form

        ' A placeholder type that performs work.
        Private Class WorkItem
            ' Performs work for the provided number of milliseconds.
            Public Sub DoWork(ByVal milliseconds As Integer)
                ' For demonstration, suspend the current thread.
                Thread.Sleep(milliseconds)
            End Sub
        End Class

        ' Enables the user interface to signal cancellation.
        Private cancellationSource As CancellationTokenSource

        ' The first node in the dataflow pipeline.
        Private startWork As TransformBlock(Of WorkItem, WorkItem)

        ' The second, and final, node in the dataflow pipeline.
        Private completeWork As ActionBlock(Of WorkItem)

        ' Increments the value of the provided progress bar.
        Private incrementProgress As ActionBlock(Of ToolStripProgressBar)

        ' Decrements the value of the provided progress bar.
        Private decrementProgress As ActionBlock(Of ToolStripProgressBar)

        ' Enables progress bar actions to run on the UI thread.
        Private uiTaskScheduler As TaskScheduler

        Public Sub New()
            InitializeComponent()

            ' Create the UI task scheduler from the current synchronization
            ' context.
            uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
        End Sub

        ' Creates the blocks that participate in the dataflow pipeline.
        Private Sub CreatePipeline()
            ' Create the cancellation source.
            cancellationSource = New CancellationTokenSource()

            ' Create the first node in the pipeline.
            startWork = New TransformBlock(Of WorkItem, WorkItem)(Function(workItem)
                    ' Perform some work.
                    workItem.DoWork(250)
                    ' Decrement the progress bar that tracks the count of
                    ' active work items in this stage of the pipeline.
                    decrementProgress.Post(toolStripProgressBar1)
                    ' Increment the progress bar that tracks the count of
                    ' active work items in the next stage of the pipeline.
                    incrementProgress.Post(toolStripProgressBar2)
                    ' Send the work item to the next stage of the pipeline.
                    Return workItem
                End Function,
                New ExecutionDataflowBlockOptions With {.CancellationToken = cancellationSource.Token})

            ' Create the second, and final, node in the pipeline.
            completeWork = New ActionBlock(Of WorkItem)(Sub(workItem)
                    ' Perform some work.
                    workItem.DoWork(1000)
                    ' Decrement the progress bar that tracks the count of
                    ' active work items in this stage of the pipeline.
                    decrementProgress.Post(toolStripProgressBar2)
                    ' Increment the progress bar that tracks the overall
                    ' count of completed work items.
                    incrementProgress.Post(toolStripProgressBar3)
                End Sub,
                New ExecutionDataflowBlockOptions With {.CancellationToken = cancellationSource.Token,
                                                        .MaxDegreeOfParallelism = 2})

            ' Connect the two nodes of the pipeline. When the first node completes,
            ' set the second node also to the completed state.
            startWork.LinkTo(
                completeWork, New DataflowLinkOptions With {.PropagateCompletion = True})

            ' Create the dataflow action blocks that increment and decrement
            ' progress bars.
            ' These blocks use the task scheduler that is associated with
            ' the UI thread.
            incrementProgress = New ActionBlock(Of ToolStripProgressBar)(
                Sub(progressBar) progressBar.Value += 1,
                New ExecutionDataflowBlockOptions With {.CancellationToken = cancellationSource.Token,
                                                        .TaskScheduler = uiTaskScheduler})

            decrementProgress = New ActionBlock(Of ToolStripProgressBar)(
                Sub(progressBar) progressBar.Value -= 1,
                New ExecutionDataflowBlockOptions With {.CancellationToken = cancellationSource.Token,
                                                        .TaskScheduler = uiTaskScheduler})
        End Sub

        ' Event handler for the Add Work Items button.
        Private Sub toolStripButton1_Click(ByVal sender As Object, ByVal e As EventArgs) Handles toolStripButton1.Click
            ' The Cancel button is disabled when the pipeline is not active.
            ' Therefore, create the pipeline and enable the Cancel button
            ' if the Cancel button is disabled.
            If Not toolStripButton2.Enabled Then
                CreatePipeline()

                ' Enable the Cancel button.
                toolStripButton2.Enabled = True
            End If

            ' Post several work items to the head of the pipeline.
            For i As Integer = 0 To 4
                toolStripProgressBar1.Value += 1
                startWork.Post(New WorkItem())
            Next i
        End Sub

        ' Event handler for the Cancel button.
        Private Async Sub toolStripButton2_Click(ByVal sender As Object, ByVal e As EventArgs) Handles toolStripButton2.Click
            ' Disable both buttons.
            toolStripButton1.Enabled = False
            toolStripButton2.Enabled = False

            ' Trigger cancellation.
            cancellationSource.Cancel()

            Try
                ' Asynchronously wait for the pipeline to complete processing and for
                ' the progress bars to update.
                Await Task.WhenAll(completeWork.Completion, incrementProgress.Completion, decrementProgress.Completion)
            Catch e1 As OperationCanceledException
            End Try

            ' Increment the progress bar that tracks the number of cancelled
            ' work items by the number of active work items.
            toolStripProgressBar4.Value += toolStripProgressBar1.Value
            toolStripProgressBar4.Value += toolStripProgressBar2.Value

            ' Reset the progress bars that track the number of active work items.
            toolStripProgressBar1.Value = 0
            toolStripProgressBar2.Value = 0

            ' Enable the Add Work Items button.
            toolStripButton1.Enabled = True
        End Sub

        Protected Overrides Sub Finalize()
            ' cancellationSource is Nothing if the pipeline was never created.
            cancellationSource?.Dispose()
            MyBase.Finalize()
        End Sub
    End Class
End Namespace
The following illustration shows the running application.