Aracılığıyla paylaş


Nasıl yapılır: Veri Akışı Bloğunda Paralellik Derecesini Belirtme

Bu belgede, bir yürütme veri akışı bloğunun ExecutionDataflowBlockOptions.MaxDegreeOfParallelism aynı anda birden fazla iletiyi işlemesini sağlamak için özelliğinin nasıl ayarlanacağı açıklanır. Bunu yapmak, uzun süre çalışan bir hesaplama gerçekleştiren ve iletileri paralel olarak işlemenin avantajlarından yararlanabilen bir veri akışı bloğunuz olduğunda yararlıdır. Bu örnekte, aynı anda birden çok veri akışı işlemi gerçekleştirmek için sınıfı kullanılır System.Threading.Tasks.Dataflow.ActionBlock<TInput> ; ancak, TPL Veri Akışı Kitaplığı'nın sağladığı önceden tanımlanmış yürütme bloğu türlerinden herhangi birinde en yüksek paralellik derecesini belirtebilirsiniz: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>ve System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

Not

TPL Veri Akışı Kitaplığı ( System.Threading.Tasks.Dataflow ad alanı) .NET ile dağıtılmaz. Ad alanını System.Threading.Tasks.Dataflow Visual Studio'ya yüklemek için projenizi açın, Proje menüsünden NuGet Paketlerini Yönet'i seçin ve çevrimiçi ortamda System.Threading.Tasks.Dataflow paketi arayın. Alternatif olarak, .NET Core CLI kullanarak yüklemek için komutunu çalıştırındotnet add package System.Threading.Tasks.Dataflow.

Örnek

Aşağıdaki örnek iki veri akışı hesaplaması gerçekleştirir ve her hesaplama için gereken geçen süreyi yazdırır. İlk hesaplama, varsayılan olan 1'in en yüksek paralellik derecesini belirtir. En fazla 1 paralellik derecesi, veri akışı bloğunun iletileri seri olarak işlemesine neden olur. İkinci hesaplama, kullanılabilir işlemci sayısına eşit olan en yüksek paralellik derecesini belirtmesi dışında ilk hesaplamaya benzer. Bu, veri akışı bloğunun paralel olarak birden çok işlem gerçekleştirmesini sağlar.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to specify the maximum degree of parallelism
// when using dataflow.
class Program
{
   // Performs several computations by using dataflow and returns the elapsed
   // time required to perform the computations.
   static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism,
      int messageCount)
   {
      // Create an ActionBlock<int> that performs some work.
      var workerBlock = new ActionBlock<int>(
         // Simulate work by suspending the current thread.
         millisecondsTimeout => Thread.Sleep(millisecondsTimeout),
         // Specify a maximum degree of parallelism.
         new ExecutionDataflowBlockOptions
         {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
         });

      // Compute the time that it takes for several messages to
      // flow through the dataflow block.

      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      for (int i = 0; i < messageCount; i++)
      {
         workerBlock.Post(1000);
      }
      workerBlock.Complete();

      // Wait for all messages to propagate through the network.
      workerBlock.Completion.Wait();

      // Stop the timer and return the elapsed number of milliseconds.
      stopwatch.Stop();
      return stopwatch.Elapsed;
   }
   static void Main(string[] args)
   {
      int processorCount = Environment.ProcessorCount;
      int messageCount = processorCount;

      // Print the number of processors on this computer.
      Console.WriteLine("Processor count = {0}.", processorCount);

      TimeSpan elapsed;

      // Perform two dataflow computations and print the elapsed
      // time required for each.

      // This call specifies a maximum degree of parallelism of 1.
      // This causes the dataflow block to process messages serially.
      elapsed = TimeDataflowComputations(1, messageCount);
      Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " +
         "elapsed time = {2}ms.", 1, messageCount, (int)elapsed.TotalMilliseconds);

      // Perform the computations again. This time, specify the number of
      // processors as the maximum degree of parallelism. This causes
      // multiple messages to be processed in parallel.
      elapsed = TimeDataflowComputations(processorCount, messageCount);
      Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " +
         "elapsed time = {2}ms.", processorCount, messageCount, (int)elapsed.TotalMilliseconds);
   }
}

/* Sample output:
Processor count = 4.
Degree of parallelism = 1; message count = 4; elapsed time = 4032ms.
Degree of parallelism = 4; message count = 4; elapsed time = 1001ms.
*/
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to specify the maximum degree of parallelism 
' when using dataflow.
Friend Class Program
    ' Performs several computations by using dataflow and returns the elapsed
    ' time required to perform the computations.
    Private Shared Function TimeDataflowComputations(ByVal maxDegreeOfParallelism As Integer, ByVal messageCount As Integer) As TimeSpan
        ' Create an ActionBlock<int> that performs some work.
        Dim workerBlock = New ActionBlock(Of Integer)(Function(millisecondsTimeout) Pause(millisecondsTimeout), New ExecutionDataflowBlockOptions() With {.MaxDegreeOfParallelism = maxDegreeOfParallelism})
        ' Simulate work by suspending the current thread.
        ' Specify a maximum degree of parallelism.

        ' Compute the time that it takes for several messages to 
        ' flow through the dataflow block.

        Dim stopwatch As New Stopwatch()
        stopwatch.Start()

        For i As Integer = 0 To messageCount - 1
            workerBlock.Post(1000)
        Next i
        workerBlock.Complete()

        ' Wait for all messages to propagate through the network.
        workerBlock.Completion.Wait()

        ' Stop the timer and return the elapsed number of milliseconds.
        stopwatch.Stop()
        Return stopwatch.Elapsed
    End Function

    Private Shared Function Pause(ByVal obj As Object)
        Thread.Sleep(obj)
        Return Nothing
    End Function
    Shared Sub Main(ByVal args() As String)
        Dim processorCount As Integer = Environment.ProcessorCount
        Dim messageCount As Integer = processorCount

        ' Print the number of processors on this computer.
        Console.WriteLine("Processor count = {0}.", processorCount)

        Dim elapsed As TimeSpan

        ' Perform two dataflow computations and print the elapsed
        ' time required for each.

        ' This call specifies a maximum degree of parallelism of 1.
        ' This causes the dataflow block to process messages serially.
        elapsed = TimeDataflowComputations(1, messageCount)
        Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " & "elapsed time = {2}ms.", 1, messageCount, CInt(Fix(elapsed.TotalMilliseconds)))

        ' Perform the computations again. This time, specify the number of 
        ' processors as the maximum degree of parallelism. This causes
        ' multiple messages to be processed in parallel.
        elapsed = TimeDataflowComputations(processorCount, messageCount)
        Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " & "elapsed time = {2}ms.", processorCount, messageCount, CInt(Fix(elapsed.TotalMilliseconds)))
    End Sub
End Class

' Sample output:
'Processor count = 4.
'Degree of parallelism = 1; message count = 4; elapsed time = 4032ms.
'Degree of parallelism = 4; message count = 4; elapsed time = 1001ms.
'

Güçlü Programlama

Varsayılan olarak, önceden tanımlanmış her veri akışı bloğu iletilerin alındığı sırada yayılır. Birden çok ileti, 1'den büyük bir paralellik derecesi belirttiğinizde aynı anda işleniyor olsa da, yine de alındıkları sırayla yayılırlar.

MaxDegreeOfParallelism özelliği en büyük paralellik derecesini temsil ettiğinden, veri akışı bloğu belirttiğinizden daha düşük derecede paralellikle çalışabilir. Veri akışı bloğu, işlevsel gereksinimlerini karşılamak veya kullanılabilir sistem kaynaklarının eksikliğini hesaba eklemek için daha az paralellik derecesi kullanabilir. Veri akışı bloğu hiçbir zaman belirttiğinizden daha yüksek bir paralellik derecesi seçmez.

Ayrıca bkz.