Share via


Procedura: utilizzare JoinBlock per leggere dati da più origini

In questo documento viene spiegato come utilizzare la classe JoinBlock<T1,T2> per eseguire un'operazione quando i dati sono disponibili da più origini. Viene inoltre illustrato come utilizzare la modalità non greedy per consentire la condivisione in modo più efficiente di un'origine dati da parte di più blocchi join.

Nota

La libreria del flusso di dati TPL (spazio dei nomi System.Threading.Tasks.Dataflow) non viene distribuita con .NET. Per installare lo spazio dei nomi System.Threading.Tasks.Dataflow in Visual Studio, aprire il progetto in Visual Studio, scegliere Gestisci pacchetti NuGet dal menu Progetto ed eseguire una ricerca online del pacchetto System.Threading.Tasks.Dataflow. In alternativa, per installarlo usando l'interfaccia della riga di comando di .NET Core, eseguire dotnet add package System.Threading.Tasks.Dataflow.

Esempio

Nell'esempio seguente vengono definiti tre tipi di risorsa, NetworkResource, FileResource e MemoryResource e vengono eseguite operazioni quando queste diventano disponibili. Per questo esempio è richiesta una coppia NetworkResource e MemoryResource per eseguire la prima operazione e una coppia FileResource e MemoryResource per eseguire la seconda. Per consentire l'esecuzione di queste operazioni quando tutte le risorse necessarie sono disponibili, nell'esempio viene utilizzata la classe JoinBlock<T1,T2>. Quando tramite un oggetto JoinBlock<T1,T2> vengono ricevuti dati da tutte le origini, questi dati vengono propagati alla relativa destinazione, che in questo esempio è un oggetto ActionBlock<TInput>. La lettura da parte di entrambi gli oggetti JoinBlock<T1,T2> viene eseguita da un pool condiviso di oggetti MemoryResource.

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to use non-greedy join blocks to distribute
// resources among a dataflow network.
class Program
{
   // Represents a resource. A derived class might represent
   // a limited resource such as a memory, network, or I/O
   // device.
   abstract class Resource
   {
   }

   // Represents a memory resource. For brevity, the details of
   // this class are omitted.
   class MemoryResource : Resource
   {
   }

   // Represents a network resource. For brevity, the details of
   // this class are omitted.
   class NetworkResource : Resource
   {
   }

   // Represents a file resource. For brevity, the details of
   // this class are omitted.
   class FileResource : Resource
   {
   }

   static void Main(string[] args)
   {
      // Create three BufferBlock<T> objects. Each object holds a different
      // type of resource.
      var networkResources = new BufferBlock<NetworkResource>();
      var fileResources = new BufferBlock<FileResource>();
      var memoryResources = new BufferBlock<MemoryResource>();

      // Create two non-greedy JoinBlock<T1, T2> objects.
      // The first join works with network and memory resources;
      // the second pool works with file and memory resources.

      var joinNetworkAndMemoryResources =
         new JoinBlock<NetworkResource, MemoryResource>(
            new GroupingDataflowBlockOptions
            {
               Greedy = false
            });

      var joinFileAndMemoryResources =
         new JoinBlock<FileResource, MemoryResource>(
            new GroupingDataflowBlockOptions
            {
               Greedy = false
            });

      // Create two ActionBlock<T> objects.
      // The first block acts on a network resource and a memory resource.
      // The second block acts on a file resource and a memory resource.

      var networkMemoryAction =
         new ActionBlock<Tuple<NetworkResource, MemoryResource>>(
            data =>
            {
               // Perform some action on the resources.

               // Print a message.
               Console.WriteLine("Network worker: using resources...");

               // Simulate a lengthy operation that uses the resources.
               Thread.Sleep(new Random().Next(500, 2000));

               // Print a message.
               Console.WriteLine("Network worker: finished using resources...");

               // Release the resources back to their respective pools.
               networkResources.Post(data.Item1);
               memoryResources.Post(data.Item2);
            });

      var fileMemoryAction =
         new ActionBlock<Tuple<FileResource, MemoryResource>>(
            data =>
            {
               // Perform some action on the resources.

               // Print a message.
               Console.WriteLine("File worker: using resources...");

               // Simulate a lengthy operation that uses the resources.
               Thread.Sleep(new Random().Next(500, 2000));

               // Print a message.
               Console.WriteLine("File worker: finished using resources...");

               // Release the resources back to their respective pools.
               fileResources.Post(data.Item1);
               memoryResources.Post(data.Item2);
            });

      // Link the resource pools to the JoinBlock<T1, T2> objects.
      // Because these join blocks operate in non-greedy mode, they do not
      // take the resource from a pool until all resources are available from
      // all pools.

      networkResources.LinkTo(joinNetworkAndMemoryResources.Target1);
      memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2);

      fileResources.LinkTo(joinFileAndMemoryResources.Target1);
      memoryResources.LinkTo(joinFileAndMemoryResources.Target2);

      // Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.

      joinNetworkAndMemoryResources.LinkTo(networkMemoryAction);
      joinFileAndMemoryResources.LinkTo(fileMemoryAction);

      // Populate the resource pools. In this example, network and
      // file resources are more abundant than memory resources.

      networkResources.Post(new NetworkResource());
      networkResources.Post(new NetworkResource());
      networkResources.Post(new NetworkResource());

      memoryResources.Post(new MemoryResource());

      fileResources.Post(new FileResource());
      fileResources.Post(new FileResource());
      fileResources.Post(new FileResource());

      // Allow data to flow through the network for several seconds.
      Thread.Sleep(10000);
   }
}

/* Sample output:
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
*/
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to use non-greedy join blocks to distribute
' resources among a dataflow network.
Friend Class Program
    ' Represents a resource. A derived class might represent 
    ' a limited resource such as a memory, network, or I/O
    ' device.
    Private MustInherit Class Resource
    End Class

    ' Represents a memory resource. For brevity, the details of 
    ' this class are omitted.
    Private Class MemoryResource
        Inherits Resource
    End Class

    ' Represents a network resource. For brevity, the details of 
    ' this class are omitted.
    Private Class NetworkResource
        Inherits Resource
    End Class

    ' Represents a file resource. For brevity, the details of 
    ' this class are omitted.
    Private Class FileResource
        Inherits Resource
    End Class

    Shared Sub Main(ByVal args() As String)
        ' Create three BufferBlock<T> objects. Each object holds a different
        ' type of resource.
        Dim networkResources = New BufferBlock(Of NetworkResource)()
        Dim fileResources = New BufferBlock(Of FileResource)()
        Dim memoryResources = New BufferBlock(Of MemoryResource)()

        ' Create two non-greedy JoinBlock<T1, T2> objects. 
        ' The first join works with network and memory resources; 
        ' the second pool works with file and memory resources.

        Dim joinNetworkAndMemoryResources = New JoinBlock(Of NetworkResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})

        Dim joinFileAndMemoryResources = New JoinBlock(Of FileResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})

        ' Create two ActionBlock<T> objects. 
        ' The first block acts on a network resource and a memory resource.
        ' The second block acts on a file resource and a memory resource.

        Dim networkMemoryAction = New ActionBlock(Of Tuple(Of NetworkResource, MemoryResource))(Sub(data)
                                                                                                    ' Perform some action on the resources.
                                                                                                    ' Print a message.
                                                                                                    ' Simulate a lengthy operation that uses the resources.
                                                                                                    ' Print a message.
                                                                                                    ' Release the resources back to their respective pools.
                                                                                                    Console.WriteLine("Network worker: using resources...")
                                                                                                    Thread.Sleep(New Random().Next(500, 2000))
                                                                                                    Console.WriteLine("Network worker: finished using resources...")
                                                                                                    networkResources.Post(data.Item1)
                                                                                                    memoryResources.Post(data.Item2)
                                                                                                End Sub)

        Dim fileMemoryAction = New ActionBlock(Of Tuple(Of FileResource, MemoryResource))(Sub(data)
                                                                                              ' Perform some action on the resources.
                                                                                              ' Print a message.
                                                                                              ' Simulate a lengthy operation that uses the resources.
                                                                                              ' Print a message.
                                                                                              ' Release the resources back to their respective pools.
                                                                                              Console.WriteLine("File worker: using resources...")
                                                                                              Thread.Sleep(New Random().Next(500, 2000))
                                                                                              Console.WriteLine("File worker: finished using resources...")
                                                                                              fileResources.Post(data.Item1)
                                                                                              memoryResources.Post(data.Item2)
                                                                                          End Sub)

        ' Link the resource pools to the JoinBlock<T1, T2> objects.
        ' Because these join blocks operate in non-greedy mode, they do not
        ' take the resource from a pool until all resources are available from
        ' all pools.

        networkResources.LinkTo(joinNetworkAndMemoryResources.Target1)
        memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2)

        fileResources.LinkTo(joinFileAndMemoryResources.Target1)
        memoryResources.LinkTo(joinFileAndMemoryResources.Target2)

        ' Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.

        joinNetworkAndMemoryResources.LinkTo(networkMemoryAction)
        joinFileAndMemoryResources.LinkTo(fileMemoryAction)

        ' Populate the resource pools. In this example, network and 
        ' file resources are more abundant than memory resources.

        networkResources.Post(New NetworkResource())
        networkResources.Post(New NetworkResource())
        networkResources.Post(New NetworkResource())

        memoryResources.Post(New MemoryResource())

        fileResources.Post(New FileResource())
        fileResources.Post(New FileResource())
        fileResources.Post(New FileResource())

        ' Allow data to flow through the network for several seconds.
        Thread.Sleep(10000)

    End Sub

End Class

' Sample output:
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'

Per consentire un utilizzo efficiente del pool condiviso di oggetti MemoryResource, in questo esempio viene specificato un oggetto GroupingDataflowBlockOptions con la proprietà Greedy impostata su False per creare oggetti JoinBlock<T1,T2> utilizzati in modalità non greedy. In un blocco join non greedy tutti i messaggi in ingresso vengono posticipati finché non ne è disponibile uno da ogni origine. Se uno dei messaggi posposti viene accettato da un altro blocco, tramite il blocco join viene riavviato il processo. La modalità non greedy consente ai blocchi join in cui vengono condivisi uno o più blocchi di origine di proseguire il lavoro mentre i dati vengono attesi dagli altri blocchi. In questo esempio, se un oggetto MemoryResource viene aggiunto al pool memoryResources, il primo blocco join utilizzato per ricevere la seconda origine dati può continuare. Se in questo esempio venisse utilizzata la modalità greedy, vale a dire l'impostazione predefinita, un blocco join potrebbe accettare l'oggetto MemoryResource e attendere che la seconda risorsa diventi disponibile. Tuttavia, se nell'altro blocco join è presente la seconda origine dati disponibile, non è possibile continuare perché l'oggetto MemoryResource è stato accettato dall'altro blocco join.

Programmazione efficiente

L'utilizzo di join non greedy può inoltre aiutare a impedire un deadlock nell'applicazione. In un'applicazione software si verifica un deadlock quando due o più processi bloccano ognuno una risorsa e attendono entrambi che un altro processo rilasci altre risorse. Si consideri un'applicazione in cui sono definiti due oggetti JoinBlock<T1,T2>. I dati dei due blocchi di origine condivisi possono essere letti da entrambi gli oggetti. In modalità greedy, se la prima origine viene letta dal primo blocco e la seconda origine dal secondo blocco, si potrebbe verificare un deadlock dell'applicazione poiché entrambi i blocchi join sono in attesa del completamento dell'altro per liberare la relativa risorsa. In modalità non greedy, la lettura dalle relative origini da parte di ogni blocco join viene eseguita solo quando tutti i dati sono disponibili e, di conseguenza, il rischio di deadlock è eliminato.

Vedi anche