Практическое руководство. Использование JoinBlock для чтения данных из нескольких источников

В этом документе объясняется, как использовать JoinBlock<T1,T2> класс для выполнения операции, когда данные доступны из нескольких источников. В нем также показано, как использовать нежадный режим, для обеспечения более эффективного общего доступа к источнику данных несколькими объединяющими блоками.

Замечание

Библиотека потоков данных TPL ( System.Threading.Tasks.Dataflow пространство имен) включена в .NET 6 и более поздние версии. Для проектов .NET Framework и .NET Standard необходимо установить 📦 пакет NuGet System.Threading.Tasks.Dataflow.

Example

В следующем примере определяются три типа ресурсов: NetworkResource, FileResource и MemoryResource. Операции выполняются, когда ресурсы становятся доступными. В этом примере требуется пара NetworkResource и MemoryResource для выполнения первой операции и пара FileResource и MemoryResource для выполнения второй операции. Чтобы обеспечить выполнение этих операций при наличии всех необходимых ресурсов, в этом примере используется JoinBlock<T1,T2> класс. JoinBlock<T1,T2> Когда объект получает данные из всех источников, он передает их в целевой объект ActionBlock<TInput>, как показано в этом примере. Оба JoinBlock<T1,T2> объекта считываются из общего пула MemoryResource объектов.

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to use non-greedy join blocks to distribute
// resources among a dataflow network.
class Program
{
   // Represents a resource. A derived class might represent
   // a limited resource such as a memory, network, or I/O
   // device.
   abstract class Resource
   {
   }

   // Represents a memory resource. For brevity, the details of
   // this class are omitted.
   class MemoryResource : Resource
   {
   }

   // Represents a network resource. For brevity, the details of
   // this class are omitted.
   class NetworkResource : Resource
   {
   }

   // Represents a file resource. For brevity, the details of
   // this class are omitted.
   class FileResource : Resource
   {
   }

   static void Main(string[] args)
   {
      // Create three BufferBlock<T> objects. Each object holds a different
      // type of resource.
      var networkResources = new BufferBlock<NetworkResource>();
      var fileResources = new BufferBlock<FileResource>();
      var memoryResources = new BufferBlock<MemoryResource>();

      // Create two non-greedy JoinBlock<T1, T2> objects.
      // The first join works with network and memory resources;
      // the second pool works with file and memory resources.

      var joinNetworkAndMemoryResources =
         new JoinBlock<NetworkResource, MemoryResource>(
            new GroupingDataflowBlockOptions
            {
               Greedy = false
            });

      var joinFileAndMemoryResources =
         new JoinBlock<FileResource, MemoryResource>(
            new GroupingDataflowBlockOptions
            {
               Greedy = false
            });

      // Create two ActionBlock<T> objects.
      // The first block acts on a network resource and a memory resource.
      // The second block acts on a file resource and a memory resource.

      var networkMemoryAction =
         new ActionBlock<Tuple<NetworkResource, MemoryResource>>(
            data =>
            {
               // Perform some action on the resources.

               // Print a message.
               Console.WriteLine("Network worker: using resources...");

               // Simulate a lengthy operation that uses the resources.
               Thread.Sleep(new Random().Next(500, 2000));

               // Print a message.
               Console.WriteLine("Network worker: finished using resources...");

               // Release the resources back to their respective pools.
               networkResources.Post(data.Item1);
               memoryResources.Post(data.Item2);
            });

      var fileMemoryAction =
         new ActionBlock<Tuple<FileResource, MemoryResource>>(
            data =>
            {
               // Perform some action on the resources.

               // Print a message.
               Console.WriteLine("File worker: using resources...");

               // Simulate a lengthy operation that uses the resources.
               Thread.Sleep(new Random().Next(500, 2000));

               // Print a message.
               Console.WriteLine("File worker: finished using resources...");

               // Release the resources back to their respective pools.
               fileResources.Post(data.Item1);
               memoryResources.Post(data.Item2);
            });

      // Link the resource pools to the JoinBlock<T1, T2> objects.
      // Because these join blocks operate in non-greedy mode, they do not
      // take the resource from a pool until all resources are available from
      // all pools.

      networkResources.LinkTo(joinNetworkAndMemoryResources.Target1);
      memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2);

      fileResources.LinkTo(joinFileAndMemoryResources.Target1);
      memoryResources.LinkTo(joinFileAndMemoryResources.Target2);

      // Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.

      joinNetworkAndMemoryResources.LinkTo(networkMemoryAction);
      joinFileAndMemoryResources.LinkTo(fileMemoryAction);

      // Populate the resource pools. In this example, network and
      // file resources are more abundant than memory resources.

      networkResources.Post(new NetworkResource());
      networkResources.Post(new NetworkResource());
      networkResources.Post(new NetworkResource());

      memoryResources.Post(new MemoryResource());

      fileResources.Post(new FileResource());
      fileResources.Post(new FileResource());
      fileResources.Post(new FileResource());

      // Allow data to flow through the network for several seconds.
      Thread.Sleep(10000);
   }
}

/* Sample output:
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
*/
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to use non-greedy join blocks to distribute
' resources among a dataflow network.
Friend Class Program
    ' Represents a resource. A derived class might represent 
    ' a limited resource such as a memory, network, or I/O
    ' device.
    Private MustInherit Class Resource
    End Class

    ' Represents a memory resource. For brevity, the details of 
    ' this class are omitted.
    Private Class MemoryResource
        Inherits Resource
    End Class

    ' Represents a network resource. For brevity, the details of 
    ' this class are omitted.
    Private Class NetworkResource
        Inherits Resource
    End Class

    ' Represents a file resource. For brevity, the details of 
    ' this class are omitted.
    Private Class FileResource
        Inherits Resource
    End Class

    Shared Sub Main(ByVal args() As String)
        ' Create three BufferBlock<T> objects. Each object holds a different
        ' type of resource.
        Dim networkResources = New BufferBlock(Of NetworkResource)()
        Dim fileResources = New BufferBlock(Of FileResource)()
        Dim memoryResources = New BufferBlock(Of MemoryResource)()

        ' Create two non-greedy JoinBlock<T1, T2> objects. 
        ' The first join works with network and memory resources; 
        ' the second pool works with file and memory resources.

        Dim joinNetworkAndMemoryResources = New JoinBlock(Of NetworkResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})

        Dim joinFileAndMemoryResources = New JoinBlock(Of FileResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})

        ' Create two ActionBlock<T> objects. 
        ' The first block acts on a network resource and a memory resource.
        ' The second block acts on a file resource and a memory resource.

        Dim networkMemoryAction = New ActionBlock(Of Tuple(Of NetworkResource, MemoryResource))(Sub(data)
                                                                                                    ' Perform some action on the resources.
                                                                                                    ' Print a message.
                                                                                                    ' Simulate a lengthy operation that uses the resources.
                                                                                                    ' Print a message.
                                                                                                    ' Release the resources back to their respective pools.
                                                                                                    Console.WriteLine("Network worker: using resources...")
                                                                                                    Thread.Sleep(New Random().Next(500, 2000))
                                                                                                    Console.WriteLine("Network worker: finished using resources...")
                                                                                                    networkResources.Post(data.Item1)
                                                                                                    memoryResources.Post(data.Item2)
                                                                                                End Sub)

        Dim fileMemoryAction = New ActionBlock(Of Tuple(Of FileResource, MemoryResource))(Sub(data)
                                                                                              ' Perform some action on the resources.
                                                                                              ' Print a message.
                                                                                              ' Simulate a lengthy operation that uses the resources.
                                                                                              ' Print a message.
                                                                                              ' Release the resources back to their respective pools.
                                                                                              Console.WriteLine("File worker: using resources...")
                                                                                              Thread.Sleep(New Random().Next(500, 2000))
                                                                                              Console.WriteLine("File worker: finished using resources...")
                                                                                              fileResources.Post(data.Item1)
                                                                                              memoryResources.Post(data.Item2)
                                                                                          End Sub)

        ' Link the resource pools to the JoinBlock<T1, T2> objects.
        ' Because these join blocks operate in non-greedy mode, they do not
        ' take the resource from a pool until all resources are available from
        ' all pools.

        networkResources.LinkTo(joinNetworkAndMemoryResources.Target1)
        memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2)

        fileResources.LinkTo(joinFileAndMemoryResources.Target1)
        memoryResources.LinkTo(joinFileAndMemoryResources.Target2)

        ' Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.

        joinNetworkAndMemoryResources.LinkTo(networkMemoryAction)
        joinFileAndMemoryResources.LinkTo(fileMemoryAction)

        ' Populate the resource pools. In this example, network and 
        ' file resources are more abundant than memory resources.

        networkResources.Post(New NetworkResource())
        networkResources.Post(New NetworkResource())
        networkResources.Post(New NetworkResource())

        memoryResources.Post(New MemoryResource())

        fileResources.Post(New FileResource())
        fileResources.Post(New FileResource())
        fileResources.Post(New FileResource())

        ' Allow data to flow through the network for several seconds.
        Thread.Sleep(10000)

    End Sub

End Class

' Sample output:
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'

Чтобы обеспечить эффективное использование общего пула MemoryResource объектов, в этом примере указывается GroupingDataflowBlockOptions объект, имеющий свойство Greedy, заданное для False создания JoinBlock<T1,T2> объектов, которые действуют в не-жадном режиме. Блок не жадного соединения откладывает все входящие сообщения до тех пор, пока один не будет доступен из каждого источника. Если любой из отложенных сообщений был принят другим блоком, блок соединения перезапускает процесс. Не жадный режим позволяет объединять блоки, которые совместно используют один или несколько исходных блоков, чтобы сделать прогресс вперед, так как другие блоки ожидают данных. В этом примере, если MemoryResource объект добавляется в memoryResources пул, первый объединяющий блок, получив второй источник данных, может продолжить выполнение. Если в этом примере используется жадный режим, который является стандартным, один блок соединения может занять MemoryResource объект и ждать, пока второй ресурс станет доступным. Однако, если у другого блока соединения доступен второй источник данных, он не может продвигаться вперед, так как объект MemoryResource был взят другим блоком соединения.

Надежное программирование

Использование не жадных соединений также может помочь предотвратить взаимоблокировку в приложении. В приложении программного обеспечения взаимоблокировка возникает, когда два или несколько процессов используют ресурс и одновременно ожидают, пока другой процесс не освободит какой-либо из ресурсов. Рассмотрим приложение, определяющее два JoinBlock<T1,T2> объекта. Оба объекта считывают данные из двух общих блоков источника. В жадном режиме, если один блок соединения считывает данные из первого источника, а второй блок соединения — из второго источника, приложение может войти в состояние взаимной блокировки, поскольку оба блока соединения будут взаимно ожидать освобождения ресурса другим. В пассивном режиме каждый блок соединения считывает из своих источников только в том случае, если все данные доступны, и поэтому риск взаимоблокировки устраняется.

См. также