Freigeben über


Vorgehensweise: Verwenden von JoinBlock zum Lesen von Daten aus mehreren Quellen

In diesem Dokument wird erläutert, wie Sie mithilfe der JoinBlock<T1,T2> Klasse einen Vorgang ausführen, wenn Daten aus mehreren Quellen verfügbar sind. Außerdem wird veranschaulicht, wie Sie den nicht-gierigen Modus verwenden, damit mehrere Verknüpfungsblöcke eine Datenquelle effizienter freigeben können.

Hinweis

Die TPL-Datenflussbibliothek (der System.Threading.Tasks.Dataflow Namespace) ist in .NET 6 und höheren Versionen enthalten. Für .NET Framework- und .NET Standard-Projekte müssen Sie das 📦 NuGet-Paket "System.Threading.Tasks.Dataflow" installieren.

Example

Im folgenden Beispiel werden drei Ressourcentypen definiert, NetworkResourceFileResourceund MemoryResourcees werden Vorgänge ausgeführt, wenn Ressourcen verfügbar werden. In diesem Beispiel sind ein NetworkResource- und ein MemoryResource-Paar erforderlich, um den ersten Vorgang durchzuführen, und ein FileResource- und MemoryResource-Paar, um den zweiten Vorgang durchzuführen. Damit diese Vorgänge ausgeführt werden können, wenn alle erforderlichen Ressourcen verfügbar sind, verwendet dieses Beispiel die JoinBlock<T1,T2> Klasse. Wenn ein JoinBlock<T1,T2> Objekt Daten aus allen Quellen empfängt, verteilt es diese Daten an das Ziel, bei dem es sich in diesem Beispiel um ein ActionBlock<TInput> Objekt handelt. Beide JoinBlock<T1,T2> Objekte werden aus einem freigegebenen Pool von MemoryResource Objekten gelesen.

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to use non-greedy join blocks to distribute
// resources among a dataflow network.
class Program
{
   // Represents a resource. A derived class might represent
   // a limited resource such as a memory, network, or I/O
   // device.
   abstract class Resource
   {
   }

   // Represents a memory resource. For brevity, the details of
   // this class are omitted.
   class MemoryResource : Resource
   {
   }

   // Represents a network resource. For brevity, the details of
   // this class are omitted.
   class NetworkResource : Resource
   {
   }

   // Represents a file resource. For brevity, the details of
   // this class are omitted.
   class FileResource : Resource
   {
   }

   static void Main(string[] args)
   {
      // Create three BufferBlock<T> objects. Each object holds a different
      // type of resource.
      var networkResources = new BufferBlock<NetworkResource>();
      var fileResources = new BufferBlock<FileResource>();
      var memoryResources = new BufferBlock<MemoryResource>();

      // Create two non-greedy JoinBlock<T1, T2> objects.
      // The first join works with network and memory resources;
      // the second pool works with file and memory resources.

      var joinNetworkAndMemoryResources =
         new JoinBlock<NetworkResource, MemoryResource>(
            new GroupingDataflowBlockOptions
            {
               Greedy = false
            });

      var joinFileAndMemoryResources =
         new JoinBlock<FileResource, MemoryResource>(
            new GroupingDataflowBlockOptions
            {
               Greedy = false
            });

      // Create two ActionBlock<T> objects.
      // The first block acts on a network resource and a memory resource.
      // The second block acts on a file resource and a memory resource.

      var networkMemoryAction =
         new ActionBlock<Tuple<NetworkResource, MemoryResource>>(
            data =>
            {
               // Perform some action on the resources.

               // Print a message.
               Console.WriteLine("Network worker: using resources...");

               // Simulate a lengthy operation that uses the resources.
               Thread.Sleep(new Random().Next(500, 2000));

               // Print a message.
               Console.WriteLine("Network worker: finished using resources...");

               // Release the resources back to their respective pools.
               networkResources.Post(data.Item1);
               memoryResources.Post(data.Item2);
            });

      var fileMemoryAction =
         new ActionBlock<Tuple<FileResource, MemoryResource>>(
            data =>
            {
               // Perform some action on the resources.

               // Print a message.
               Console.WriteLine("File worker: using resources...");

               // Simulate a lengthy operation that uses the resources.
               Thread.Sleep(new Random().Next(500, 2000));

               // Print a message.
               Console.WriteLine("File worker: finished using resources...");

               // Release the resources back to their respective pools.
               fileResources.Post(data.Item1);
               memoryResources.Post(data.Item2);
            });

      // Link the resource pools to the JoinBlock<T1, T2> objects.
      // Because these join blocks operate in non-greedy mode, they do not
      // take the resource from a pool until all resources are available from
      // all pools.

      networkResources.LinkTo(joinNetworkAndMemoryResources.Target1);
      memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2);

      fileResources.LinkTo(joinFileAndMemoryResources.Target1);
      memoryResources.LinkTo(joinFileAndMemoryResources.Target2);

      // Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.

      joinNetworkAndMemoryResources.LinkTo(networkMemoryAction);
      joinFileAndMemoryResources.LinkTo(fileMemoryAction);

      // Populate the resource pools. In this example, network and
      // file resources are more abundant than memory resources.

      networkResources.Post(new NetworkResource());
      networkResources.Post(new NetworkResource());
      networkResources.Post(new NetworkResource());

      memoryResources.Post(new MemoryResource());

      fileResources.Post(new FileResource());
      fileResources.Post(new FileResource());
      fileResources.Post(new FileResource());

      // Allow data to flow through the network for several seconds.
      Thread.Sleep(10000);
   }
}

/* Sample output:
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
*/
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to use non-greedy join blocks to distribute
' resources among a dataflow network.
Friend Class Program
    ' Represents a resource. A derived class might represent 
    ' a limited resource such as a memory, network, or I/O
    ' device.
    Private MustInherit Class Resource
    End Class

    ' Represents a memory resource. For brevity, the details of 
    ' this class are omitted.
    Private Class MemoryResource
        Inherits Resource
    End Class

    ' Represents a network resource. For brevity, the details of 
    ' this class are omitted.
    Private Class NetworkResource
        Inherits Resource
    End Class

    ' Represents a file resource. For brevity, the details of 
    ' this class are omitted.
    Private Class FileResource
        Inherits Resource
    End Class

    Shared Sub Main(ByVal args() As String)
        ' Create three BufferBlock<T> objects. Each object holds a different
        ' type of resource.
        Dim networkResources = New BufferBlock(Of NetworkResource)()
        Dim fileResources = New BufferBlock(Of FileResource)()
        Dim memoryResources = New BufferBlock(Of MemoryResource)()

        ' Create two non-greedy JoinBlock<T1, T2> objects. 
        ' The first join works with network and memory resources; 
        ' the second pool works with file and memory resources.

        Dim joinNetworkAndMemoryResources = New JoinBlock(Of NetworkResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})

        Dim joinFileAndMemoryResources = New JoinBlock(Of FileResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})

        ' Create two ActionBlock<T> objects. 
        ' The first block acts on a network resource and a memory resource.
        ' The second block acts on a file resource and a memory resource.

        Dim networkMemoryAction = New ActionBlock(Of Tuple(Of NetworkResource, MemoryResource))(Sub(data)
                                                                                                    ' Perform some action on the resources.
                                                                                                    ' Print a message.
                                                                                                    ' Simulate a lengthy operation that uses the resources.
                                                                                                    ' Print a message.
                                                                                                    ' Release the resources back to their respective pools.
                                                                                                    Console.WriteLine("Network worker: using resources...")
                                                                                                    Thread.Sleep(New Random().Next(500, 2000))
                                                                                                    Console.WriteLine("Network worker: finished using resources...")
                                                                                                    networkResources.Post(data.Item1)
                                                                                                    memoryResources.Post(data.Item2)
                                                                                                End Sub)

        Dim fileMemoryAction = New ActionBlock(Of Tuple(Of FileResource, MemoryResource))(Sub(data)
                                                                                              ' Perform some action on the resources.
                                                                                              ' Print a message.
                                                                                              ' Simulate a lengthy operation that uses the resources.
                                                                                              ' Print a message.
                                                                                              ' Release the resources back to their respective pools.
                                                                                              Console.WriteLine("File worker: using resources...")
                                                                                              Thread.Sleep(New Random().Next(500, 2000))
                                                                                              Console.WriteLine("File worker: finished using resources...")
                                                                                              fileResources.Post(data.Item1)
                                                                                              memoryResources.Post(data.Item2)
                                                                                          End Sub)

        ' Link the resource pools to the JoinBlock<T1, T2> objects.
        ' Because these join blocks operate in non-greedy mode, they do not
        ' take the resource from a pool until all resources are available from
        ' all pools.

        networkResources.LinkTo(joinNetworkAndMemoryResources.Target1)
        memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2)

        fileResources.LinkTo(joinFileAndMemoryResources.Target1)
        memoryResources.LinkTo(joinFileAndMemoryResources.Target2)

        ' Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.

        joinNetworkAndMemoryResources.LinkTo(networkMemoryAction)
        joinFileAndMemoryResources.LinkTo(fileMemoryAction)

        ' Populate the resource pools. In this example, network and 
        ' file resources are more abundant than memory resources.

        networkResources.Post(New NetworkResource())
        networkResources.Post(New NetworkResource())
        networkResources.Post(New NetworkResource())

        memoryResources.Post(New MemoryResource())

        fileResources.Post(New FileResource())
        fileResources.Post(New FileResource())
        fileResources.Post(New FileResource())

        ' Allow data to flow through the network for several seconds.
        Thread.Sleep(10000)

    End Sub

End Class

' Sample output:
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'

Um die effiziente Verwendung des freigegebenen Pool von MemoryResource-Objekten zu ermöglichen, wird in diesem Beispiel ein GroupingDataflowBlockOptions-Objekt angegeben, das die Greedy-Eigenschaft auf False gesetzt hat, um JoinBlock<T1,T2>-Objekte zu erstellen, die im Nicht-Giermodus agieren. Ein zurückhaltender Join-Block verzögert alle eingehenden Nachrichten, bis aus jeder Quelle eine Nachricht verfügbar ist. Wenn eine der verschobenen Nachrichten von einem anderen Block akzeptiert wurde, startet der Verknüpfungsblock den Prozess neu. Der nicht-gierige Modus ermöglicht Verknüpfungsoperationen, die einen oder mehrere Quellblöcke gemeinsam verwenden, um Fortschritte zu erzielen, während die anderen Blöcke auf Daten warten. Wenn ein MemoryResource Objekt in diesem Beispiel dem memoryResources Pool hinzugefügt wird, kann der erste Verknüpfungsblock, der seine zweite Datenquelle empfängt, vorwärts voranschreiten. Wenn in diesem Beispiel der gierige Modus verwendet werden soll, bei dem es sich um den Standard handelt, kann ein Verknüpfungsblock das MemoryResource Objekt übernehmen und warten, bis die zweite Ressource verfügbar ist. Wenn der andere Zusammenführungsblock jedoch seine zweite Datenquelle zur Verfügung hat, kann er keinen Fortschritt machen, da das MemoryResource Objekt vom anderen Zusammenführungsblock übernommen wurde.

Robuste Programmierung

Die Verwendung von nicht gierigen Verknüpfungen kann Ihnen auch helfen, deadlock in Ihrer Anwendung zu verhindern. In einer Softwareanwendung kommt es zu einem Deadlock, wenn zwei oder mehr Prozesse jeweils eine Ressource halten und gegenseitig darauf warten, dass ein anderer Prozess eine andere Ressource freigibt. Betrachten Sie eine Anwendung, die zwei JoinBlock<T1,T2> Objekte definiert. Beide Objekte lesen jeweils Daten aus zwei freigegebenen Quellblöcken. In einem gierigen Modus kann die Anwendung möglicherweise blockieren, wenn ein Verknüpfungsblock aus der ersten Quelle liest und der zweite Verknüpfungsblock aus der zweiten Quelle liest, da beide Verknüpfungsblöcke gegenseitig darauf warten, dass der jeweils andere seine Ressource freigibt. Im nicht gierigen Modus liest jeder Verknüpfungsblock nur dann aus seinen Quellen, wenn alle Daten verfügbar sind, und daher wird das Risiko eines Deadlocks eliminiert.

Siehe auch