如何:取消 PLINQ 查询

下面的示例展示了取消 PLINQ 查询的两种方法。 第一个示例展示了如何取消主要由数据遍历组成的查询。 第二个示例展示了如何取消包含计算成本很高的用户函数的查询。

注意

某些情况下,当启用“仅我的代码”后,Visual Studio 会在引发异常的行中断运行并显示一条错误消息,该消息显示“用户代码未处理异常”。该错误是良性错误。 可以按 F5 继续运行,并请参阅下面示例中所示的异常处理行为。 为了阻止 Visual Studio 在第一个错误出现时中断,只需依次转到“工具”、“选项”、“调试”、“常规” 下,取消选中“仅我的代码”复选框即可。

本示例旨在演示用法,运行速度可能不如等效的顺序 LINQ to Objects 查询快。 若要详细了解加速,请参阅了解 PLINQ 中的加速

示例 1

namespace PLINQCancellation_1
{
    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using static System.Console;

    class Program
    {
        static void Main()
        {
            int[] source = Enumerable.Range(1, 10000000).ToArray();
            using CancellationTokenSource cts = new();

            // Start a new asynchronous task that will cancel the
            // operation from another thread. Typically you would call
            // Cancel() in response to a button click or some other
            // user interface event.
            Task.Factory.StartNew(() =>
            {
                UserClicksTheCancelButton(cts);
            });

            int[]? results = null;
            try
            {
                results =
                    (from num in source.AsParallel().WithCancellation(cts.Token)
                     where num % 3 == 0
                     orderby num descending
                     select num).ToArray();
            }
            catch (OperationCanceledException e)
            {
                WriteLine(e.Message);
            }
            catch (AggregateException ae)
            {
                if (ae.InnerExceptions != null)
                {
                    foreach (Exception e in ae.InnerExceptions)
                    {
                        WriteLine(e.Message);
                    }
                }
            }

            foreach (var item in results ?? Array.Empty<int>())
            {
                WriteLine(item);
            }
            WriteLine();
            ReadKey();
        }

        static void UserClicksTheCancelButton(CancellationTokenSource cts)
        {
            // Wait between 150 and 500 ms, then cancel.
            // Adjust these values if necessary to make
            // cancellation fire while query is still executing.
            Random rand = new();
            Thread.Sleep(rand.Next(150, 500));
            cts.Cancel();
        }
    }
}
Class Program
    Private Shared Sub Main(ByVal args As String())
        Dim source As Integer() = Enumerable.Range(1, 10000000).ToArray()
        Dim cs As New CancellationTokenSource()

        ' Start a new asynchronous task that will cancel the 
        ' operation from another thread. Typically you would call
        ' Cancel() in response to a button click or some other
        ' user interface event.
        Task.Factory.StartNew(Sub()
                                  UserClicksTheCancelButton(cs)
                              End Sub)

        Dim results As Integer() = Nothing
        Try

            results = (From num In source.AsParallel().WithCancellation(cs.Token) _
                       Where num Mod 3 = 0 _
                       Order By num Descending _
                       Select num).ToArray()
        Catch e As OperationCanceledException

            Console.WriteLine(e.Message)
        Catch ae As AggregateException

            If ae.InnerExceptions IsNot Nothing Then
                For Each e As Exception In ae.InnerExceptions
                    Console.WriteLine(e.Message)
                Next
            End If
        Finally
            cs.Dispose()
        End Try

        If results IsNot Nothing Then
            For Each item In results
                Console.WriteLine(item)
            Next
        End If
        Console.WriteLine()

        Console.ReadKey()
    End Sub

    Private Shared Sub UserClicksTheCancelButton(ByVal cs As CancellationTokenSource)
        ' Wait between 150 and 500 ms, then cancel.
        ' Adjust these values if necessary to make
        ' cancellation fire while query is still executing.
        Dim rand As New Random()
        Thread.Sleep(rand.[Next](150, 350))
        cs.Cancel()
    End Sub
End Class

PLINQ 框架不会将一个 OperationCanceledException 滚动到 System.AggregateException 中;必须在单独的 catch 块中处理 OperationCanceledException。 如果一个或多个用户委托抛出 OperationCanceledException(externalCT)(通过使用外部 System.Threading.CancellationToken),但没有其他任何异常,且查询被定义为 AsParallel().WithCancellation(externalCT),那么 PLINQ 会抛出 OperationCanceledException (externalCT),而不是 System.AggregateException。 不过,如果一个用户委托抛出 OperationCanceledException,另一个委托抛出另一种类型的异常,那么这两个异常都会滚动到 AggregateException 中。

关于取消的一般性指南如下:

  1. 如果执行用户委托取消,应将外部 CancellationToken 告知给 PLINQ,并引发 OperationCanceledException(externalCT)。

  2. 如果发生取消且没有引发其他任何异常,则处理 OperationCanceledException,而不是 AggregateException

示例 2

下面的示例展示了如何在用户代码中使用计算成本高的函数时处理取消。

namespace PLINQCancellation_2
{
    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using static System.Console;

    class Program
    {
        static void Main(string[] args)
        {
            int[] source = Enumerable.Range(1, 10000000).ToArray();
            using CancellationTokenSource cts = new();

            // Start a new asynchronous task that will cancel the
            // operation from another thread. Typically you would call
            // Cancel() in response to a button click or some other
            // user interface event.
            Task.Factory.StartNew(() =>
            {
                UserClicksTheCancelButton(cts);
            });

            double[]? results = null;
            try
            {
                results =
                    (from num in source.AsParallel().WithCancellation(cts.Token)
                     where num % 3 == 0
                     select Function(num, cts.Token)).ToArray();
            }
            catch (OperationCanceledException e)
            {
                WriteLine(e.Message);
            }
            catch (AggregateException ae)
            {
                if (ae.InnerExceptions != null)
                {
                    foreach (Exception e in ae.InnerExceptions)
                        WriteLine(e.Message);
                }
            }

            foreach (var item in results ?? Array.Empty<double>())
            {
                WriteLine(item);
            }
            WriteLine();
            ReadKey();
        }

        // A toy method to simulate work.
        static double Function(int n, CancellationToken ct)
        {
            // If work is expected to take longer than 1 ms
            // then try to check cancellation status more
            // often within that work.
            for (int i = 0; i < 5; i++)
            {
                // Work hard for approx 1 millisecond.
                Thread.SpinWait(50000);

                // Check for cancellation request.
                ct.ThrowIfCancellationRequested();
            }
            // Anything will do for our purposes.
            return Math.Sqrt(n);
        }

        static void UserClicksTheCancelButton(CancellationTokenSource cts)
        {
            // Wait between 150 and 500 ms, then cancel.
            // Adjust these values if necessary to make
            // cancellation fire while query is still executing.
            Random rand = new();
            Thread.Sleep(rand.Next(150, 500));
            WriteLine("Press 'c' to cancel");
            if (ReadKey().KeyChar == 'c')
            {
                cts.Cancel();
            }
        }
    }
}
Class Program2
    Private Shared Sub Main(ByVal args As String())


        Dim source As Integer() = Enumerable.Range(1, 10000000).ToArray()
        Dim cs As New CancellationTokenSource()

        ' Start a new asynchronous task that will cancel the 
        ' operation from another thread. Typically you would call
        ' Cancel() in response to a button click or some other
        ' user interface event.
        Task.Factory.StartNew(Sub()

                                  UserClicksTheCancelButton(cs)
                              End Sub)

        Dim results As Double() = Nothing
        Try

            results = (From num In source.AsParallel().WithCancellation(cs.Token) _
                       Where num Mod 3 = 0 _
                       Select [Function](num, cs.Token)).ToArray()
        Catch e As OperationCanceledException


            Console.WriteLine(e.Message)
        Catch ae As AggregateException
            If ae.InnerExceptions IsNot Nothing Then
                For Each e As Exception In ae.InnerExceptions
                    Console.WriteLine(e.Message)
                Next
            End If
        Finally
            cs.Dispose()
        End Try

        If results IsNot Nothing Then
            For Each item In results
                Console.WriteLine(item)
            Next
        End If
        Console.WriteLine()

        Console.ReadKey()
    End Sub

    ' A toy method to simulate work.
    Private Shared Function [Function](ByVal n As Integer, ByVal ct As CancellationToken) As Double
        ' If work is expected to take longer than 1 ms
        ' then try to check cancellation status more
        ' often within that work.
        For i As Integer = 0 To 4
            ' Work hard for approx 1 millisecond.
            Thread.SpinWait(50000)

            ' Check for cancellation request.
            If ct.IsCancellationRequested Then
                Throw New OperationCanceledException(ct)
            End If
        Next
        ' Anything will do for our purposes.
        Return Math.Sqrt(n)
    End Function

    Private Shared Sub UserClicksTheCancelButton(ByVal cs As CancellationTokenSource)
        ' Wait between 150 and 500 ms, then cancel.
        ' Adjust these values if necessary to make
        ' cancellation fire while query is still executing.
        Dim rand As New Random()
        Thread.Sleep(rand.[Next](150, 350))
        Console.WriteLine("Press 'c' to cancel")
        If Console.ReadKey().KeyChar = "c"c Then
            cs.Cancel()

        End If
    End Sub
End Class

在用户代码中处理取消时,无需在查询定义中使用 WithCancellation。 不过,我们建议使用 WithCancellation,因为 WithCancellation 对查询性能没有影响,并让取消由查询运算符和用户代码进行处理。

为了确保系统响应速度,建议每毫秒检查一次取消;不过,只要不超过每 10 毫秒一次,任何频率都认为是可接受的。 此频率不得对代码性能产生不利影响。

如果枚举器已遭清理(例如,当代码跳出循环访问查询结果的 foreach(Visual Basic 中的 For Each)循环时),查询就会被取消,但不会引发异常。

请参阅