How to: Cancel a Dataflow Block
This document demonstrates how to enable cancellation in your application. This example uses Windows Forms to show where work items are active in a dataflow pipeline and also the effects of cancellation.
Note
The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow
package. Alternatively, to install it using the .NET Core CLI, run dotnet add package System.Threading.Tasks.Dataflow
.
Create a C# or Visual Basic Windows Forms Application project. In the following steps, the project is named
CancellationWinForms
.On the form designer for the main form, Form1.cs (Form1.vb for Visual Basic), add a ToolStrip control.
Add a ToolStripButton control to the ToolStrip control. Set the DisplayStyle property to Text and the Text property to Add Work Items.
Add a second ToolStripButton control to the ToolStrip control. Set the DisplayStyle property to Text, the Text property to Cancel, and the Enabled property to
False
.Add four ToolStripProgressBar objects to the ToolStrip control.
This section describes how to create the dataflow pipeline that processes work items and updates the progress bars.
In your project, add a reference to System.Threading.Tasks.Dataflow.dll.
Ensure that Form1.cs (Form1.vb for Visual Basic) contains the following
using
directives (Imports
in Visual Basic).using System; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using System.Windows.Forms;
Imports System.Threading Imports System.Threading.Tasks Imports System.Threading.Tasks.Dataflow
Add the
WorkItem
class as an inner type of theForm1
class.// A placeholder type that performs work. class WorkItem { // Performs work for the provided number of milliseconds. public void DoWork(int milliseconds) { // For demonstration, suspend the current thread. Thread.Sleep(milliseconds); } }
' A placeholder type that performs work. Private Class WorkItem ' Performs work for the provided number of milliseconds. Public Sub DoWork(ByVal milliseconds As Integer) ' For demonstration, suspend the current thread. Thread.Sleep(milliseconds) End Sub End Class
Add the following data members to the
Form1
class.// Enables the user interface to signal cancellation. CancellationTokenSource cancellationSource; // The first node in the dataflow pipeline. TransformBlock<WorkItem, WorkItem> startWork; // The second, and final, node in the dataflow pipeline. ActionBlock<WorkItem> completeWork; // Increments the value of the provided progress bar. ActionBlock<ToolStripProgressBar> incrementProgress; // Decrements the value of the provided progress bar. ActionBlock<ToolStripProgressBar> decrementProgress; // Enables progress bar actions to run on the UI thread. TaskScheduler uiTaskScheduler;
' Enables the user interface to signal cancellation. Private cancellationSource As CancellationTokenSource ' The first node in the dataflow pipeline. Private startWork As TransformBlock(Of WorkItem, WorkItem) ' The second, and final, node in the dataflow pipeline. Private completeWork As ActionBlock(Of WorkItem) ' Increments the value of the provided progress bar. Private incrementProgress As ActionBlock(Of ToolStripProgressBar) ' Decrements the value of the provided progress bar. Private decrementProgress As ActionBlock(Of ToolStripProgressBar) ' Enables progress bar actions to run on the UI thread. Private uiTaskScheduler As TaskScheduler
Add the following method,
CreatePipeline
, to theForm1
class.// Creates the blocks that participate in the dataflow pipeline. private void CreatePipeline() { // Create the cancellation source. cancellationSource = new CancellationTokenSource(); // Create the first node in the pipeline. startWork = new TransformBlock<WorkItem, WorkItem>(workItem => { // Perform some work. workItem.DoWork(250); // Decrement the progress bar that tracks the count of // active work items in this stage of the pipeline. decrementProgress.Post(toolStripProgressBar1); // Increment the progress bar that tracks the count of // active work items in the next stage of the pipeline. incrementProgress.Post(toolStripProgressBar2); // Send the work item to the next stage of the pipeline. return workItem; }, new ExecutionDataflowBlockOptions { CancellationToken = cancellationSource.Token }); // Create the second, and final, node in the pipeline. completeWork = new ActionBlock<WorkItem>(workItem => { // Perform some work. workItem.DoWork(1000); // Decrement the progress bar that tracks the count of // active work items in this stage of the pipeline. decrementProgress.Post(toolStripProgressBar2); // Increment the progress bar that tracks the overall // count of completed work items. incrementProgress.Post(toolStripProgressBar3); }, new ExecutionDataflowBlockOptions { CancellationToken = cancellationSource.Token, MaxDegreeOfParallelism = 2 }); // Connect the two nodes of the pipeline. When the first node completes, // set the second node also to the completed state. startWork.LinkTo( completeWork, new DataflowLinkOptions { PropagateCompletion = true }); // Create the dataflow action blocks that increment and decrement // progress bars. // These blocks use the task scheduler that is associated with // the UI thread. incrementProgress = new ActionBlock<ToolStripProgressBar>( progressBar => progressBar.Value++, new ExecutionDataflowBlockOptions { CancellationToken = cancellationSource.Token, TaskScheduler = uiTaskScheduler }); decrementProgress = new ActionBlock<ToolStripProgressBar>( progressBar => progressBar.Value--, new ExecutionDataflowBlockOptions { CancellationToken = cancellationSource.Token, TaskScheduler = uiTaskScheduler }); }
' Creates the blocks that participate in the dataflow pipeline. Private Sub CreatePipeline() ' Create the cancellation source. cancellationSource = New CancellationTokenSource() ' Create the first node in the pipeline. startWork = New TransformBlock(Of WorkItem, WorkItem)(Function(workItem) ' Perform some work. ' Decrement the progress bar that tracks the count of ' active work items in this stage of the pipeline. ' Increment the progress bar that tracks the count of ' active work items in the next stage of the pipeline. ' Send the work item to the next stage of the pipeline. workItem.DoWork(250) decrementProgress.Post(toolStripProgressBar1) incrementProgress.Post(toolStripProgressBar2) Return workItem End Function, New ExecutionDataflowBlockOptions With {.CancellationToken = cancellationSource.Token}) ' Create the second, and final, node in the pipeline. completeWork = New ActionBlock(Of WorkItem)(Sub(workItem) ' Perform some work. ' Decrement the progress bar that tracks the count of ' active work items in this stage of the pipeline. ' Increment the progress bar that tracks the overall ' count of completed work items. workItem.DoWork(1000) decrementProgress.Post(toolStripProgressBar2) incrementProgress.Post(toolStripProgressBar3) End Sub, New ExecutionDataflowBlockOptions With {.CancellationToken = cancellationSource.Token, .MaxDegreeOfParallelism = 2}) ' Connect the two nodes of the pipeline. When the first node completes, ' set the second node also to the completed state. startWork.LinkTo( completeWork, New DataflowLinkOptions With {.PropagateCompletion = true}) ' Create the dataflow action blocks that increment and decrement ' progress bars. ' These blocks use the task scheduler that is associated with ' the UI thread. incrementProgress = New ActionBlock(Of ToolStripProgressBar)( Sub(progressBar) progressBar.Value += 1, New ExecutionDataflowBlockOptions With {.CancellationToken = cancellationSource.Token, .TaskScheduler = uiTaskScheduler}) decrementProgress = New ActionBlock(Of ToolStripProgressBar)( Sub(progressBar) progressBar.Value -= 1, New ExecutionDataflowBlockOptions With {.CancellationToken = cancellationSource.Token, .TaskScheduler = uiTaskScheduler}) End Sub
Because the incrementProgress
and decrementProgress
dataflow blocks act on the user interface, it is important that these actions occur on the user-interface thread. To accomplish this, during construction these objects each provide an ExecutionDataflowBlockOptions object that has the TaskScheduler property set to TaskScheduler.FromCurrentSynchronizationContext. The TaskScheduler.FromCurrentSynchronizationContext method creates a TaskScheduler object that performs work on the current synchronization context. Because the Form1
constructor is called from the user-interface thread, the actions for the incrementProgress
and decrementProgress
dataflow blocks also run on the user-interface thread.
This example sets the CancellationToken property when it constructs the members of the pipeline. Because the CancellationToken property permanently cancels dataflow block execution, the whole pipeline must be recreated after the user cancels the operation and then wants to add more work items to the pipeline. For an example that demonstrates an alternative way to cancel a dataflow block so that other work can be performed after an operation is canceled, see Walkthrough: Using Dataflow in a Windows Forms Application.
This section describes how to connect the dataflow pipeline to the user interface. Both creating the pipeline and adding work items to the pipeline are controlled by the event handler for the Add Work Items button. Cancellation is initiated by the Cancel button. When the user clicks either of these buttons, the appropriate action is initiated in an asynchronous manner.
On the form designer for the main form, create an event handler for the Click event for the Add Work Items button.
Implement the Click event for the Add Work Items button.
// Event handler for the Add Work Items button. private void toolStripButton1_Click(object sender, EventArgs e) { // The Cancel button is disabled when the pipeline is not active. // Therefore, create the pipeline and enable the Cancel button // if the Cancel button is disabled. if (!toolStripButton2.Enabled) { CreatePipeline(); // Enable the Cancel button. toolStripButton2.Enabled = true; } // Post several work items to the head of the pipeline. for (int i = 0; i < 5; i++) { toolStripProgressBar1.Value++; startWork.Post(new WorkItem()); } }
' Event handler for the Add Work Items button. Private Sub toolStripButton1_Click(ByVal sender As Object, ByVal e As EventArgs) Handles toolStripButton1.Click ' The Cancel button is disabled when the pipeline is not active. ' Therefore, create the pipeline and enable the Cancel button ' if the Cancel button is disabled. If Not toolStripButton2.Enabled Then CreatePipeline() ' Enable the Cancel button. toolStripButton2.Enabled = True End If ' Post several work items to the head of the pipeline. For i As Integer = 0 To 4 toolStripProgressBar1.Value += 1 startWork.Post(New WorkItem()) Next i End Sub
On the form designer for the main form, create an event handler for the Click event handler for the Cancel button.
Implement the Click event handler for the Cancel button.
// Event handler for the Cancel button. private async void toolStripButton2_Click(object sender, EventArgs e) { // Disable both buttons. toolStripButton1.Enabled = false; toolStripButton2.Enabled = false; // Trigger cancellation. cancellationSource.Cancel(); try { // Asynchronously wait for the pipeline to complete processing and for // the progress bars to update. await Task.WhenAll( completeWork.Completion, incrementProgress.Completion, decrementProgress.Completion); } catch (OperationCanceledException) { } // Increment the progress bar that tracks the number of cancelled // work items by the number of active work items. toolStripProgressBar4.Value += toolStripProgressBar1.Value; toolStripProgressBar4.Value += toolStripProgressBar2.Value; // Reset the progress bars that track the number of active work items. toolStripProgressBar1.Value = 0; toolStripProgressBar2.Value = 0; // Enable the Add Work Items button. toolStripButton1.Enabled = true; }
' Event handler for the Cancel button. Private Async Sub toolStripButton2_Click(ByVal sender As Object, ByVal e As EventArgs) Handles toolStripButton2.Click ' Disable both buttons. toolStripButton1.Enabled = False toolStripButton2.Enabled = False ' Trigger cancellation. cancellationSource.Cancel() Try ' Asynchronously wait for the pipeline to complete processing and for ' the progress bars to update. Await Task.WhenAll(completeWork.Completion, incrementProgress.Completion, decrementProgress.Completion) Catch e1 As OperationCanceledException End Try ' Increment the progress bar that tracks the number of cancelled ' work items by the number of active work items. toolStripProgressBar4.Value += toolStripProgressBar1.Value toolStripProgressBar4.Value += toolStripProgressBar2.Value ' Reset the progress bars that track the number of active work items. toolStripProgressBar1.Value = 0 toolStripProgressBar2.Value = 0 ' Enable the Add Work Items button. toolStripButton1.Enabled = True End Sub
The following example shows the complete code for Form1.cs (Form1.vb for Visual Basic).
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;
namespace CancellationWinForms
{
public partial class Form1 : Form
{
// A placeholder type that performs work.
class WorkItem
{
// Performs work for the provided number of milliseconds.
public void DoWork(int milliseconds)
{
// For demonstration, suspend the current thread.
Thread.Sleep(milliseconds);
}
}
// Enables the user interface to signal cancellation.
CancellationTokenSource cancellationSource;
// The first node in the dataflow pipeline.
TransformBlock<WorkItem, WorkItem> startWork;
// The second, and final, node in the dataflow pipeline.
ActionBlock<WorkItem> completeWork;
// Increments the value of the provided progress bar.
ActionBlock<ToolStripProgressBar> incrementProgress;
// Decrements the value of the provided progress bar.
ActionBlock<ToolStripProgressBar> decrementProgress;
// Enables progress bar actions to run on the UI thread.
TaskScheduler uiTaskScheduler;
public Form1()
{
InitializeComponent();
// Create the UI task scheduler from the current synchronization
// context.
uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
}
// Creates the blocks that participate in the dataflow pipeline.
private void CreatePipeline()
{
// Create the cancellation source.
cancellationSource = new CancellationTokenSource();
// Create the first node in the pipeline.
startWork = new TransformBlock<WorkItem, WorkItem>(workItem =>
{
// Perform some work.
workItem.DoWork(250);
// Decrement the progress bar that tracks the count of
// active work items in this stage of the pipeline.
decrementProgress.Post(toolStripProgressBar1);
// Increment the progress bar that tracks the count of
// active work items in the next stage of the pipeline.
incrementProgress.Post(toolStripProgressBar2);
// Send the work item to the next stage of the pipeline.
return workItem;
},
new ExecutionDataflowBlockOptions
{
CancellationToken = cancellationSource.Token
});
// Create the second, and final, node in the pipeline.
completeWork = new ActionBlock<WorkItem>(workItem =>
{
// Perform some work.
workItem.DoWork(1000);
// Decrement the progress bar that tracks the count of
// active work items in this stage of the pipeline.
decrementProgress.Post(toolStripProgressBar2);
// Increment the progress bar that tracks the overall
// count of completed work items.
incrementProgress.Post(toolStripProgressBar3);
},
new ExecutionDataflowBlockOptions
{
CancellationToken = cancellationSource.Token,
MaxDegreeOfParallelism = 2
});
// Connect the two nodes of the pipeline. When the first node completes,
// set the second node also to the completed state.
startWork.LinkTo(
completeWork, new DataflowLinkOptions { PropagateCompletion = true });
// Create the dataflow action blocks that increment and decrement
// progress bars.
// These blocks use the task scheduler that is associated with
// the UI thread.
incrementProgress = new ActionBlock<ToolStripProgressBar>(
progressBar => progressBar.Value++,
new ExecutionDataflowBlockOptions
{
CancellationToken = cancellationSource.Token,
TaskScheduler = uiTaskScheduler
});
decrementProgress = new ActionBlock<ToolStripProgressBar>(
progressBar => progressBar.Value--,
new ExecutionDataflowBlockOptions
{
CancellationToken = cancellationSource.Token,
TaskScheduler = uiTaskScheduler
});
}
// Event handler for the Add Work Items button.
private void toolStripButton1_Click(object sender, EventArgs e)
{
// The Cancel button is disabled when the pipeline is not active.
// Therefore, create the pipeline and enable the Cancel button
// if the Cancel button is disabled.
if (!toolStripButton2.Enabled)
{
CreatePipeline();
// Enable the Cancel button.
toolStripButton2.Enabled = true;
}
// Post several work items to the head of the pipeline.
for (int i = 0; i < 5; i++)
{
toolStripProgressBar1.Value++;
startWork.Post(new WorkItem());
}
}
// Event handler for the Cancel button.
private async void toolStripButton2_Click(object sender, EventArgs e)
{
// Disable both buttons.
toolStripButton1.Enabled = false;
toolStripButton2.Enabled = false;
// Trigger cancellation.
cancellationSource.Cancel();
try
{
// Asynchronously wait for the pipeline to complete processing and for
// the progress bars to update.
await Task.WhenAll(
completeWork.Completion,
incrementProgress.Completion,
decrementProgress.Completion);
}
catch (OperationCanceledException)
{
}
// Increment the progress bar that tracks the number of cancelled
// work items by the number of active work items.
toolStripProgressBar4.Value += toolStripProgressBar1.Value;
toolStripProgressBar4.Value += toolStripProgressBar2.Value;
// Reset the progress bars that track the number of active work items.
toolStripProgressBar1.Value = 0;
toolStripProgressBar2.Value = 0;
// Enable the Add Work Items button.
toolStripButton1.Enabled = true;
}
~Form1()
{
cancellationSource.Dispose();
}
}
}
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Namespace CancellationWinForms
Partial Public Class Form1
Inherits Form
' A placeholder type that performs work.
Private Class WorkItem
' Performs work for the provided number of milliseconds.
Public Sub DoWork(ByVal milliseconds As Integer)
' For demonstration, suspend the current thread.
Thread.Sleep(milliseconds)
End Sub
End Class
' Enables the user interface to signal cancellation.
Private cancellationSource As CancellationTokenSource
' The first node in the dataflow pipeline.
Private startWork As TransformBlock(Of WorkItem, WorkItem)
' The second, and final, node in the dataflow pipeline.
Private completeWork As ActionBlock(Of WorkItem)
' Increments the value of the provided progress bar.
Private incrementProgress As ActionBlock(Of ToolStripProgressBar)
' Decrements the value of the provided progress bar.
Private decrementProgress As ActionBlock(Of ToolStripProgressBar)
' Enables progress bar actions to run on the UI thread.
Private uiTaskScheduler As TaskScheduler
Public Sub New()
InitializeComponent()
' Create the UI task scheduler from the current synchronization
' context.
uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
End Sub
' Creates the blocks that participate in the dataflow pipeline.
Private Sub CreatePipeline()
' Create the cancellation source.
cancellationSource = New CancellationTokenSource()
' Create the first node in the pipeline.
startWork = New TransformBlock(Of WorkItem, WorkItem)(Function(workItem)
' Perform some work.
' Decrement the progress bar that tracks the count of
' active work items in this stage of the pipeline.
' Increment the progress bar that tracks the count of
' active work items in the next stage of the pipeline.
' Send the work item to the next stage of the pipeline.
workItem.DoWork(250)
decrementProgress.Post(toolStripProgressBar1)
incrementProgress.Post(toolStripProgressBar2)
Return workItem
End Function,
New ExecutionDataflowBlockOptions With {.CancellationToken = cancellationSource.Token})
' Create the second, and final, node in the pipeline.
completeWork = New ActionBlock(Of WorkItem)(Sub(workItem)
' Perform some work.
' Decrement the progress bar that tracks the count of
' active work items in this stage of the pipeline.
' Increment the progress bar that tracks the overall
' count of completed work items.
workItem.DoWork(1000)
decrementProgress.Post(toolStripProgressBar2)
incrementProgress.Post(toolStripProgressBar3)
End Sub,
New ExecutionDataflowBlockOptions With {.CancellationToken = cancellationSource.Token,
.MaxDegreeOfParallelism = 2})
' Connect the two nodes of the pipeline. When the first node completes,
' set the second node also to the completed state.
startWork.LinkTo(
completeWork, New DataflowLinkOptions With {.PropagateCompletion = true})
' Create the dataflow action blocks that increment and decrement
' progress bars.
' These blocks use the task scheduler that is associated with
' the UI thread.
incrementProgress = New ActionBlock(Of ToolStripProgressBar)(
Sub(progressBar) progressBar.Value += 1,
New ExecutionDataflowBlockOptions With {.CancellationToken = cancellationSource.Token,
.TaskScheduler = uiTaskScheduler})
decrementProgress = New ActionBlock(Of ToolStripProgressBar)(
Sub(progressBar) progressBar.Value -= 1,
New ExecutionDataflowBlockOptions With {.CancellationToken = cancellationSource.Token,
.TaskScheduler = uiTaskScheduler})
End Sub
' Event handler for the Add Work Items button.
Private Sub toolStripButton1_Click(ByVal sender As Object, ByVal e As EventArgs) Handles toolStripButton1.Click
' The Cancel button is disabled when the pipeline is not active.
' Therefore, create the pipeline and enable the Cancel button
' if the Cancel button is disabled.
If Not toolStripButton2.Enabled Then
CreatePipeline()
' Enable the Cancel button.
toolStripButton2.Enabled = True
End If
' Post several work items to the head of the pipeline.
For i As Integer = 0 To 4
toolStripProgressBar1.Value += 1
startWork.Post(New WorkItem())
Next i
End Sub
' Event handler for the Cancel button.
Private Async Sub toolStripButton2_Click(ByVal sender As Object, ByVal e As EventArgs) Handles toolStripButton2.Click
' Disable both buttons.
toolStripButton1.Enabled = False
toolStripButton2.Enabled = False
' Trigger cancellation.
cancellationSource.Cancel()
Try
' Asynchronously wait for the pipeline to complete processing and for
' the progress bars to update.
Await Task.WhenAll(completeWork.Completion, incrementProgress.Completion, decrementProgress.Completion)
Catch e1 As OperationCanceledException
End Try
' Increment the progress bar that tracks the number of cancelled
' work items by the number of active work items.
toolStripProgressBar4.Value += toolStripProgressBar1.Value
toolStripProgressBar4.Value += toolStripProgressBar2.Value
' Reset the progress bars that track the number of active work items.
toolStripProgressBar1.Value = 0
toolStripProgressBar2.Value = 0
' Enable the Add Work Items button.
toolStripButton1.Enabled = True
End Sub
Protected Overrides Sub Finalize()
cancellationSource.Dispose()
MyBase.Finalize()
End Sub
End Class
End Namespace
The following illustration shows the running application.
.NET feedback
.NET is an open source project. Select a link to provide feedback: