Udostępnij za pomocą


Przepływ danych (biblioteka równoległa zadań)

Biblioteka równoległa zadań (TPL) udostępnia składniki przepływu danych, które ułatwiają zwiększenie niezawodności aplikacji obsługujących współbieżność. Te składniki przepływu danych są wspólnie określane jako biblioteka przepływów danych TPL. Ten model przepływu danych promuje programowanie oparte na aktorach, zapewniając przekazywanie komunikatów w procesie dla grubszego przepływu danych i zadań potokowych. Składniki przepływu danych bazują na typach i infrastrukturze planowania języka TPL i integrują się z obsługą języków C#, Visual Basic i F# na potrzeby programowania asynchronicznego. Te składniki przepływu danych są przydatne, gdy masz wiele operacji, które muszą komunikować się ze sobą asynchronicznie lub gdy chcesz przetwarzać dane, gdy staną się dostępne. Rozważmy na przykład aplikację, która przetwarza dane obrazu z aparatu internetowego. Korzystając z modelu przepływu danych, aplikacja może przetwarzać ramki obrazów w miarę ich dostępności. Jeśli na przykład aplikacja ulepsza ramki obrazów, wykonując korektę światła lub redukcję czerwonego oka, możesz utworzyć potok składników przepływu danych. Każdy etap potoku może używać bardziej ogólnych funkcji równoległych, takich jak te udostępniane przez TPL, do przekształcania obrazu.

Ten dokument zawiera omówienie biblioteki przepływów danych TPL. Opisuje ona model programowania, wstępnie zdefiniowane typy bloków przepływu danych oraz sposób konfigurowania bloków przepływu danych w celu spełnienia określonych wymagań aplikacji.

Uwaga

Biblioteka przepływów danych TPL (przestrzeń nazw System.Threading.Tasks.Dataflow) nie jest dystrybuowana za pomocą platformy .NET. Aby zainstalować przestrzeń nazw System.Threading.Tasks.Dataflow w programie Visual Studio, otwórz projekt, wybierz pozycję Zarządzaj pakietami NuGet z menu Project i wyszukaj w trybie online pakiet System.Threading.Tasks.Dataflow. Alternatywnie, aby zainstalować go przy użyciu interfejsu wiersza polecenia platformy .NET Core, uruchom polecenie dotnet add package System.Threading.Tasks.Dataflow.

Model programowania

Biblioteka przepływów danych TPL stanowi podstawę do przekazywania komunikatów i równoległego przetwarzania aplikacji intensywnie korzystających z CPU oraz intensywnych operacji we/wy, które charakteryzują się wysoką przepustowością i niskimi opóźnieniami. Zapewnia on również jawną kontrolę nad sposobem buforowania danych i ich przemieszczania się po systemie. Aby lepiej zrozumieć model programowania przepływu danych, rozważ aplikację, która asynchronicznie ładuje obrazy z dysku i tworzy kompozycję z tych obrazów. Tradycyjne modele programowania zwykle wymagają używania wywołań zwrotnych i obiektów synchronizacji, takich jak blokady, do koordynowania zadań i dostępu do udostępnionych danych. Korzystając z modelu programowania przepływu danych, można tworzyć obiekty przepływu danych, które przetwarzają obrazy podczas ich odczytywania z dysku. W modelu przepływu danych deklarujesz sposób obsługi danych, gdy staną się dostępne, a także wszelkie zależności między danymi. Ponieważ środowisko uruchomieniowe zarządza zależnościami między danymi, często można uniknąć wymagania synchronizacji dostępu do udostępnionych danych. Ponadto, ponieważ środowisko uruchomieniowe planuje pracę na podstawie asynchronicznego przybycia danych, przepływ danych może poprawić czas reakcji i przepływność, efektywnie zarządzając podstawowymi wątkami. Przykład, który używa modelu programowania przepływu danych do implementowania przetwarzania obrazów w aplikacji Windows Forms, zobacz Przewodnik: używanie przepływu danych w aplikacji Windows Forms.

Źródła i cele

Biblioteka przepływów danych TPL składa się z bloków przepływu danych, które są strukturami danych, które buforują i przetwarzają dane. TPL definiuje trzy rodzaje bloków przepływu danych: bloki źródłowe, bloki docelowe i bloki propagacji. Blok źródłowy działa jako źródło danych i może być odczytywany. Blok docelowy działa jako odbiorca danych i może być zapisywany. Blok propagacji działa zarówno jako blok źródłowy, jak i blok docelowy, i może być odczytywany i zapisywany. TPL definiuje System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> interfejs do reprezentowania źródeł, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> reprezentowania obiektów docelowych i System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> reprezentowania propagatorów. IPropagatorBlock<TInput,TOutput> dziedziczy zarówno z ISourceBlock<TOutput>, jak i ITargetBlock<TInput>.

Biblioteka przepływów danych TPL udostępnia kilka wstępnie zdefiniowanych typów bloków przepływu danych, które implementują interfejsy ISourceBlock<TOutput>, ITargetBlock<TInput> i IPropagatorBlock<TInput,TOutput>. Te typy bloków przepływu danych zostały opisane w tym dokumencie w sekcji Wstępnie zdefiniowane typy bloków przepływu danych.

Łączące bloki

Bloki przepływu danych można połączyć w potoki, które są sekwencjami liniowymi bloków przepływu danych, lub sieci, które są grafami bloków przepływu danych. Rurociąg jest jedną z form sieci. W przepływie lub sieci źródła asynchronicznie propagują dane do celów, gdy te dane stają się dostępne. Metoda ISourceBlock<TOutput>.LinkTo łączy blok przepływu danych źródłowego z blokiem docelowym. Źródło może być połączone z zerem lub większą liczbą obiektów docelowych; obiekty docelowe można łączyć od zera do wielu źródeł. Możesz jednocześnie dodawać lub usuwać bloki przepływu danych z potoku lub sieci. Wstępnie zdefiniowane typy bloków przepływu danych obsługują wszystkie aspekty bezpieczeństwa w przypadku połączeń i rozłączeń wątków.

Aby zapoznać się z przykładem łączenia bloków przepływu danych w celu utworzenia podstawowego potoku, zobacz Przewodnik: tworzenie potoku przepływu danych. Aby zapoznać się z przykładem łączenia bloków przepływu danych w celu utworzenia bardziej złożonej sieci, zobacz Przewodnik: używanie przepływu danych w aplikacji Windows Forms. Przykład, który odłącza element docelowy od źródła po tym, jak źródło oferuje elementowi docelowemu komunikat, zobacz Instrukcje: odłączanie bloków przepływu danych.

Filtrowanie

Po wywołaniu ISourceBlock<TOutput>.LinkTo metody w celu połączenia źródła z obiektem docelowym można podać delegata, który określa, czy blok docelowy akceptuje lub odrzuca komunikat na podstawie wartości tego komunikatu. Ten mechanizm filtrowania jest przydatnym sposobem zagwarantowania, że blok przepływu danych odbiera tylko określone wartości. W przypadku większości wstępnie zdefiniowanych typów bloków przepływu danych, jeśli blok źródłowy jest połączony z wieloma blokami docelowymi, gdy blok docelowy odrzuca komunikat, źródło oferuje ten komunikat do następnego elementu docelowego. Kolejność, w jakiej źródło oferuje komunikaty do obiektów docelowych, jest definiowana przez źródło i może się różnić w zależności od typu źródła. Większość typów bloków źródłowych przestaje oferować komunikat po zaakceptowaniu tego komunikatu przez jeden element docelowy. Jednym wyjątkiem od tej reguły jest BroadcastBlock<T> klasa, która oferuje każdy komunikat do wszystkich elementów docelowych, nawet jeśli niektóre elementy docelowe odrzucają komunikat. Aby zapoznać się z przykładem, który używa filtrowania do przetwarzania tylko niektórych komunikatów, zobacz Przewodnik: używanie przepływu danych w aplikacji Windows Forms.

Ważne

Ponieważ każdy wstępnie zdefiniowany typ bloku przepływu danych źródła gwarantuje, że komunikaty są propagowane w kolejności, w jakiej są odbierane, każdy komunikat musi być odczytywany z bloku źródłowego, zanim blok źródłowy będzie mógł przetworzyć następny komunikat. W związku z tym, jeśli używasz filtrowania do łączenia wielu obiektów docelowych ze źródłem, upewnij się, że co najmniej jeden blok docelowy odbiera każdy komunikat. W przeciwnym razie Twoja aplikacja może się zawiesić.

Przekazywanie komunikatów

Model programowania przepływu danych jest związany z koncepcją przekazywania komunikatów, gdzie niezależne składniki programu komunikują się ze sobą przez wysyłanie komunikatów. Jednym ze sposobów propagowania komunikatów między składnikami aplikacji jest wywołanie Post metod (synchronicznych) i SendAsync (asynchronicznych) w celu wysyłania komunikatów do docelowych bloków przepływu danych oraz ReceiveReceiveAsyncmetod , i TryReceive do odbierania komunikatów z bloków źródłowych. Te metody można połączyć z potokami lub sieciami przepływu danych, wysyłając dane wejściowe do węzła głównego (bloku docelowego) i odbierając dane wyjściowe z węzła terminalu potoku lub węzłów terminalu sieci (co najmniej jeden blok źródłowy). Możesz również użyć metody Choose, aby odczytać dane z pierwszego podanego źródła, które ma dostępne dane, i wykonać operację na tych danych.

Bloki źródłowe oferują dane blokom docelowym przez wywołanie ITargetBlock<TInput>.OfferMessage metody . Blok docelowy odpowiada na oferowaną wiadomość na jeden z trzech sposobów: może zaakceptować komunikat, odrzucić wiadomość lub odroczyć wiadomość. Gdy element docelowy akceptuje komunikat, metoda OfferMessage zwraca wartość Accepted. Gdy element docelowy odrzuca komunikat, metoda OfferMessage zwraca Declined. Gdy element docelowy wymaga, aby nie odbierał już żadnych komunikatów ze źródła, OfferMessage zwraca wartość DecliningPermanently. Wstępnie zdefiniowane typy bloków źródłowych nie oferują komunikatów połączonym obiektom docelowym po odebraniu takiej wartości zwracanej i automatycznie odłączają się od takich obiektów docelowych.

Gdy blok docelowy odrocza komunikat do późniejszego użycia, metoda OfferMessage zwraca Postponedwartość. Docelowy blok, który odrocza komunikat, może później wywołać metodę ISourceBlock<TOutput>.ReserveMessage , aby spróbować zarezerwować oferowany komunikat. W tym momencie komunikat jest nadal dostępny i może być używany przez blok docelowy, lub komunikat został przejęty przez inny element docelowy. Gdy blok docelowy później wymaga komunikatu lub nie potrzebuje już komunikatu, wywołuje odpowiednio metodę ISourceBlock<TOutput>.ConsumeMessage lub ReleaseReservation . Rezerwacja komunikatów jest zwykle używana przez typy bloków przepływu danych, które działają w trybie oszczędnym. Tryb niemaksymalny jest wyjaśniony w dalszej części tego dokumentu. Zamiast przechowywać odroczony komunikat, blok docelowy może również użyć ISourceBlock<TOutput>.ConsumeMessage metodę, aby spróbować bezpośredniego skonsumowania odroczonego komunikatu.

Zakończenie bloku przepływu danych

Bloki przepływu danych obsługują również koncepcję ukończenia. Blok przepływu danych, który jest w stanie ukończonym, nie wykonuje żadnych dalszych prac. Każdy blok przepływu danych ma skojarzony System.Threading.Tasks.Task obiekt, znany jako zadanie ukończenia, który reprezentuje stan ukończenia bloku. Ponieważ możesz czekać, aż obiekt Task zakończy działanie, używając zadań ukończenia, możesz również poczekać na zakończenie jednego lub więcej węzłów końcowych sieci przepływu danych. Interfejs IDataflowBlock definiuje metodę Complete , która informuje blok przepływu danych o żądaniu jego ukończenia, a Completion właściwość, która zwraca zadanie ukończenia bloku przepływu danych. Zarówno ISourceBlock<TOutput> , jak i ITargetBlock<TInput> dziedziczą IDataflowBlock interfejs.

Istnieją dwa sposoby określania, czy blok przepływu danych został ukończony bez błędu, napotkał co najmniej jeden błąd, czy został anulowany. Pierwszym sposobem jest wywołanie metody Task.Wait na zadaniu zakończeniowym w bloku try-catch (Try-Catch w Visual Basic). Poniższy przykład tworzy obiekt ActionBlock<TInput>, który rzuca ArgumentOutOfRangeException, jeśli jego wartość wejściowa jest mniejsza niż zero. AggregateException jest wyrzucany, gdy ten przykład wywołuje Wait na zadaniu zakończenia. Dostęp do ArgumentOutOfRangeException jest uzyskiwany poprzez właściwość InnerExceptions obiektu AggregateException.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

W tym przykładzie przedstawiono przypadek, w którym wyjątek jest nieobsługiwany w delegacie bloku przepływu danych wykonania. Zalecamy obsługę wyjątków w ciałach takich bloków. Jeśli jednak nie możesz tego zrobić, blok zachowuje się tak, jakby został anulowany i nie przetwarza komunikatów przychodzących.

Po jawnym anulowaniu bloku przepływu danych, obiekt zawiera AggregateException w właściwości OperationCanceledException. Aby uzyskać więcej informacji na temat anulowania przepływu danych, zobacz sekcję Włączanie anulowania .

Drugim sposobem określenia stanu ukończenia bloku przepływu danych jest użycie kontynuacji zadania ukończenia lub użycie asynchronicznych funkcji języka C# i Visual Basic w celu asynchronicznego oczekiwania na zadanie ukończenia. Delegat, który przekazujesz do metody Task.ContinueWith, przyjmuje obiekt Task, który reprezentuje poprzedzające zadanie. W przypadku właściwości Completion delegat kontynuacji przyjmuje zadanie dotyczące samego ukończenia. Poniższy przykład przypomina poprzedni, z tą różnicą, że używa ContinueWith również metody do utworzenia zadania kontynuacji, które wyświetla stan ogólnej operacji przepływu danych.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine($"n = {n}");
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine($"The status of the completion task is '{task.Status}'.");
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine($"Encountered {e.GetType().Name}: {e.Message}");
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Można również użyć właściwości, takich jak IsCanceled w treści zadania kontynuacji, aby określić dodatkowe informacje o stanie ukończenia bloku przepływu danych. Aby uzyskać więcej informacji na temat zadań kontynuacji i sposobu ich powiązania z anulowaniem i obsługą błędów, zobacz Łączenie zadań za pomocą zadań kontynuacji, anulowanie zadań i obsługa wyjątków.

Wstępnie zdefiniowane typy bloków przepływu danych

Biblioteka przepływów danych TPL udostępnia kilka wstępnie zdefiniowanych typów bloków przepływu danych. Te typy są podzielone na trzy kategorie: bloki buforujące, bloki wykonawcze i bloki grupujące. W poniższych sekcjach opisano typy bloków tworzące te kategorie.

Bloki buforowania

Buforowanie bloków przechowuje dane do użycia przez użytkowników danych. Biblioteka przepływów danych TPL udostępnia trzy typy bloków buforowania: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>i System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock<T>

Klasa BufferBlock<T> reprezentuje asynchroniczną strukturę obsługi komunikatów ogólnego przeznaczenia. Ta klasa przechowuje kolejkę w modelu FIFO komunikatów, które mogą być zapisywane przez wiele źródeł lub odczytywane przez wiele celów. Gdy obiekt docelowy odbiera komunikat z BufferBlock<T> obiektu, ten komunikat zostanie usunięty z kolejki komunikatów. W związku z tym, mimo że BufferBlock<T> obiekt może mieć wiele obiektów docelowych, tylko jeden obiekt docelowy otrzyma każdy komunikat. Klasa jest przydatna BufferBlock<T> , gdy chcesz przekazać wiele komunikatów do innego składnika, a ten składnik musi odbierać każdy komunikat.

Poniższy podstawowy przykład wysyła kilka wartości Int32 do obiektu BufferBlock<T>, a następnie odczytuje te wartości z powrotem z tego obiektu.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

Pełny przykład przedstawiający sposób zapisywania komunikatów do obiektu i odczytywania ich z obiektu można znaleźć w temacie BufferBlock<T>.

BroadcastBlock<T>

Klasa jest przydatna BroadcastBlock<T> , gdy musisz przekazać wiele komunikatów do innego składnika, ale ten składnik wymaga tylko najnowszej wartości. Ta klasa jest również przydatna, gdy chcesz rozgłaszać komunikat do wielu komponentów.

Poniższy podstawowy przykład publikuje wartość Double do obiektu BroadcastBlock<T>, a następnie kilkukrotnie odczytuje tę wartość z powrotem z tego obiektu. Ponieważ wartości nie są usuwane z BroadcastBlock<T> obiektów po ich odczytaniu, ta sama wartość jest dostępna za każdym razem.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Pełny przykład przedstawiający sposób wysyłania BroadcastBlock<T> komunikatu do wielu bloków docelowych można znaleźć w temacie How to: Specify a Task Scheduler in a Dataflow Block (Instrukcje: określanie harmonogramu zadań w bloku przepływu danych).

WriteOnceBlock<T>

Klasa WriteOnceBlock<T> przypomina klasę BroadcastBlock<T>, z tą różnicą, że obiekt WriteOnceBlock<T> może być zapisywany tylko raz. Można traktować WriteOnceBlock<T> jako podobne do słowa kluczowego readonly w języku C# (ReadOnly w Visual Basic), z tym że obiekt WriteOnceBlock<T> staje się niezmienny po otrzymaniu wartości, a nie podczas konstrukcji. Podobnie jak klasa BroadcastBlock<T> , gdy obiekt docelowy odbiera komunikat z WriteOnceBlock<T> obiektu, ten komunikat nie jest usuwany z tego obiektu. W związku z tym wiele obiektów docelowych otrzymuje kopię komunikatu. Klasa jest przydatna WriteOnceBlock<T> , gdy chcesz propagować tylko pierwszy z wielu komunikatów.

Poniższy podstawowy przykład umieszcza wiele wartości w obiekcie String, a następnie odczytuje te wartości z powrotem z tego obiektu. Ponieważ WriteOnceBlock<T> obiekt może być zapisany tylko raz, po tym jak WriteOnceBlock<T> obiekt odbierze komunikat, odrzuca kolejne komunikaty.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Aby uzyskać kompletny przykład ilustrujący sposób użycia WriteOnceBlock<T> do otrzymania wartości pierwszej zakończonej operacji, zobacz Instrukcje: odłączanie bloków przepływu danych.

Bloki wykonywania

Bloki wykonawcze wywołują delegata dostarczonego przez użytkownika dla każdej części odebranych danych. Biblioteka przepływów danych TPL udostępnia trzy typy bloków wykonywania: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>i System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock<T>

Klasa ActionBlock<TInput> jest blokiem docelowym, który wywołuje delegata, gdy odbiera dane. Obiekt można ActionBlock<TInput> traktować jako delegata, który jest uruchamiany asynchronicznie, gdy dane staną się dostępne. Delegat, który podajesz do ActionBlock<TInput> obiektu, może być typu Action<T> lub typu System.Func<TInput, Task>. Gdy używasz ActionBlock<TInput> obiektu z elementem Action<T>, przetwarzanie każdego elementu wejściowego jest uznawane za ukończone po powrocie delegata. Gdy używasz ActionBlock<TInput> obiektu z elementem System.Func<TInput, Task>, przetwarzanie każdego elementu wejściowego jest uznawane za ukończone tylko po zakończeniu zwracanego Task obiektu. Korzystając z tych dwóch mechanizmów, można użyć ActionBlock<TInput> zarówno do synchronicznego, jak i asynchronicznego przetwarzania każdego elementu wejściowego.

Poniższy podstawowy przykład wysyła wiele Int32 wartości do obiektu ActionBlock<TInput>. Obiekt ActionBlock<TInput> wyświetla te wartości w konsoli programu . W tym przykładzie ustawiono blok na stan ukończony i oczekuje na zakończenie wszystkich zadań przepływu danych.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Aby zapoznać się z kompletnymi przykładami, które pokazują, jak używać delegatów z klasą ActionBlock<TInput> , zobacz Instrukcje: wykonywanie akcji, gdy blok przepływu danych odbiera dane.

TransformBlock<TInput, TOutput>

Klasa TransformBlock<TInput,TOutput> przypomina klasę ActionBlock<TInput> , z tą różnicą, że działa zarówno jako źródło, jak i jako element docelowy. Delegat przekazywany do TransformBlock<TInput,TOutput> obiektu zwraca wartość typu TOutput. Delegat, który podajesz do TransformBlock<TInput,TOutput> obiektu, może być typu System.Func<TInput, TOutput> lub typu System.Func<TInput, Task<TOutput>>. Gdy używasz TransformBlock<TInput,TOutput> obiektu z elementem System.Func<TInput, TOutput>, przetwarzanie każdego elementu wejściowego jest uznawane za ukończone po powrocie delegata. W przypadku korzystania z obiektu TransformBlock<TInput,TOutput> używanego z System.Func<TInput, Task<TOutput>>, przetwarzanie każdego elementu wejściowego jest uznawane za ukończone tylko po zakończeniu zwracanego obiektu Task<TResult>. Korzystając z tych dwóch mechanizmów, podobnie jak w przypadku ActionBlock<TInput>, można użyć TransformBlock<TInput,TOutput> zarówno do synchronicznego, jak i asynchronicznego przetwarzania każdego elementu wejściowego.

Poniższy podstawowy przykład tworzy TransformBlock<TInput,TOutput> obiekt, który oblicza pierwiastek kwadratowy danych wejściowych. Obiekt TransformBlock<TInput,TOutput> przyjmuje Int32 wartości jako dane wejściowe i generuje Double wartości jako dane wyjściowe.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Aby zapoznać się z kompletnymi przykładami używanymi TransformBlock<TInput,TOutput> w sieci bloków przepływu danych, które wykonują przetwarzanie obrazów w aplikacji Windows Forms, zobacz Przewodnik: używanie przepływu danych w aplikacji Windows Forms.

TransformManyBlock<TInput, TOutput>

Klasa TransformManyBlock<TInput,TOutput> przypomina klasę TransformBlock<TInput,TOutput> , z tą różnicą, że TransformManyBlock<TInput,TOutput> generuje zero lub więcej wartości wyjściowych dla każdej wartości wejściowej, a nie tylko jedną wartość wyjściową dla każdej wartości wejściowej. Delegat, który podajesz do TransformManyBlock<TInput,TOutput> obiektu, może być typu System.Func<TInput, IEnumerable<TOutput>> lub typu System.Func<TInput, Task<IEnumerable<TOutput>>>. Gdy używasz TransformManyBlock<TInput,TOutput> obiektu z elementem System.Func<TInput, IEnumerable<TOutput>>, przetwarzanie każdego elementu wejściowego jest uznawane za ukończone po powrocie delegata. Gdy używasz TransformManyBlock<TInput,TOutput> obiektu z elementem System.Func<TInput, Task<IEnumerable<TOutput>>>, przetwarzanie każdego elementu wejściowego jest uznawane za ukończone tylko po zakończeniu zwracanego System.Threading.Tasks.Task<IEnumerable<TOutput>> obiektu.

Poniższy podstawowy przykład tworzy TransformManyBlock<TInput,TOutput> obiekt, który dzieli ciągi na ich poszczególne sekwencje znaków. Obiekt TransformManyBlock<TInput,TOutput> przyjmuje String wartości jako dane wejściowe i generuje Char wartości jako dane wyjściowe.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Aby zapoznać się z kompletnymi przykładami pokazującymi, jak używać TransformManyBlock<TInput,TOutput> do tworzenia wielu niezależnych wyników dla każdego wejścia w potoku przepływu danych, zobacz Przewodnik: tworzenie potoku przepływu danych.

Stopień równoległości

Każdy ActionBlock<TInput>obiekt , TransformBlock<TInput,TOutput>i TransformManyBlock<TInput,TOutput> buforuje komunikaty wejściowe do momentu, aż blok będzie gotowy do ich przetworzenia. Domyślnie te klasy przetwarzają komunikaty w kolejności, w jakiej są odbierane, jeden komunikat jednocześnie. Można również określić stopień równoległości, aby umożliwić obiektom ActionBlock<TInput>i TransformBlock<TInput,TOutput>TransformManyBlock<TInput,TOutput> przetwarzanie wielu komunikatów jednocześnie. Aby uzyskać więcej informacji na temat współbieżnego wykonywania, zobacz sekcję Określanie stopnia równoległości w dalszej części tego dokumentu. Aby zapoznać się z przykładem ustawienia stopnia równoległości, który pozwala blokowi przepływu danych przetwarzać więcej niż jeden komunikat jednocześnie, zobacz Jak: Określać stopień równoległości w bloku przepływu danych (Instrukcje: określanie stopnia równoległości w bloku przepływu danych).

Podsumowanie typów delegatów

W poniższej tabeli podsumowano typy delegatów, które można udostępnić obiektom ActionBlock<TInput>, TransformBlock<TInput,TOutput>i TransformManyBlock<TInput,TOutput> . Ta tabela określa również, czy typ delegata działa synchronicznie, czy asynchronicznie.

Typ Synchroniczny typ delegata Typ delegata asynchronicznego
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

Można również używać wyrażeń lambda przy pracy z typami bloków wykonawczych. Przykład pokazujący, jak używać wyrażenia lambda z blokiem wykonania, zobacz Jak: Wykonać Działanie, Gdy Blok Przepływu Danych Odbiera Dane.

Bloki grupowania

Grupowanie bloków łączy dane z co najmniej jednego źródła pod różnymi ograniczeniami. Biblioteka przepływów danych TPL udostępnia trzy typy bloków sprzężenia: BatchBlock<T>, JoinBlock<T1,T2>i BatchedJoinBlock<T1,T2>.

BatchBlock<T>

Klasa BatchBlock<T> łączy zestawy danych wejściowych, które są nazywane partiami, w tablice danych wyjściowych. Podczas tworzenia BatchBlock<T> obiektu należy określić rozmiar każdej partii. BatchBlock<T> Gdy obiekt odbiera określoną liczbę elementów wejściowych, asynchronicznie propaguje tablicę zawierającą te elementy. BatchBlock<T> Jeśli obiekt jest ustawiony na ukończony stan, ale nie zawiera wystarczającej liczby elementów do utworzenia partii, propaguje końcową tablicę zawierającą pozostałe elementy wejściowe.

Klasa BatchBlock<T> działa w trybie chciwości lub niechłaństwa . W trybie chciwości, który jest domyślny, BatchBlock<T> obiekt akceptuje każdy komunikat, który jest oferowany i propaguje tablicę po otrzymaniu określonej liczby elementów. W trybie niechłannym obiekt odkłada wszystkie komunikaty przychodzące do momentu, aż wystarczająca liczba źródeł przekaże komunikaty do bloku, aby utworzyć partię. Tryb chciwy zazwyczaj działa lepiej niż niechłanny tryb, ponieważ wymaga mniejszego obciążenia związanego z przetwarzaniem. Można jednak użyć trybu leniwego, gdy konieczne jest koordynowanie zużycia z wielu źródeł w sposób atomowy. Określ tryb nieżarłoczny, ustawiając Greedy na False w parametrze dataflowBlockOptions w konstruktorze BatchBlock<T>.

Poniższy podstawowy przykład przesyła kilka Int32 wartości do BatchBlock<T> obiektu, który przechowuje dziesięć elementów w jednym zbiorze. Aby zagwarantować, że wszystkie wartości są propagowane z elementu BatchBlock<T>, ten przykład wywołuje metodę Complete. Metoda Complete ustawia BatchBlock<T> obiekt na stan ukończony, a w związku z tym BatchBlock<T> obiekt propaguje wszystkie pozostałe elementy jako końcową partię.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine($"The sum of the elements in batch 1 is {batchBlock.Receive().Sum()}.");

Console.WriteLine($"The sum of the elements in batch 2 is {batchBlock.Receive().Sum()}.");

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Aby zapoznać się z kompletnym przykładem używanym BatchBlock<T> do poprawy wydajności operacji wstawiania bazy danych, zobacz Przewodnik: używanie funkcji BatchBlock i BatchedJoinBlock w celu zwiększenia wydajności.

JoinBlock<T1, T2, ...>

Klasy JoinBlock<T1,T2> i JoinBlock<T1,T2,T3> zbierają elementy wejściowe i propagują obiekty System.Tuple<T1,T2> lub System.Tuple<T1,T2,T3> zawierające te elementy. Klasy JoinBlock<T1,T2> i JoinBlock<T1,T2,T3> nie dziedziczą z klasy ITargetBlock<TInput>. Zamiast tego udostępniają właściwości , Target1, Target2i Target3, które implementują ITargetBlock<TInput>.

Podobnie jak BatchBlock<T>, JoinBlock<T1,T2> i JoinBlock<T1,T2,T3> działają w trybie łapczywym lub niełapczywym. W trybie zachłannym, który jest domyślny, obiekt JoinBlock<T1,T2> lub JoinBlock<T1,T2,T3> akceptuje każdy oferowany komunikat i przesyła krotkę po tym, jak każdy z docelowych obiektów otrzyma co najmniej jeden komunikat. W trybie niechłannym obiekt JoinBlock<T1,T2> lub JoinBlock<T1,T2,T3> odkłada wszystkie przychodzące komunikaty, dopóki dane wymagane do utworzenia krotki nie zostaną zaoferowane wszystkim obiektom docelowym. W tym momencie blok przystępuje do realizacji dwufazowego protokołu zatwierdzania, aby atomowo pobrać wszystkie wymagane elementy ze źródeł. To odroczenie umożliwia innej jednostce korzystanie z danych w międzyczasie w celu umożliwienia ogólnego systemu wprowadzania postępów.

Poniższy podstawowy przykład przedstawia przypadek, w którym JoinBlock<T1,T2,T3> obiekt wymaga wielu danych do obliczenia wartości. W tym przykładzie tworzony jest obiekt JoinBlock<T1,T2,T3>, który do wykonania operacji arytmetycznej wymaga dwóch wartości Int32 oraz jednej wartości Char.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine($"{data.Item1} + {data.Item2} = {data.Item1 + data.Item2}");
         break;
      case '-':
         Console.WriteLine($"{data.Item1} - {data.Item2} = {data.Item1 - data.Item2}");
         break;
      default:
         Console.WriteLine($"Unknown operator '{data.Item3}'.");
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Pełny przykład przedstawiający użycie JoinBlock<T1,T2> obiektów w trybie niechłannym do współdzielenia zasobu w sposób kooperacyjny znajdziesz w sekcji Jak używać funkcji JoinBlock do odczytywania danych z wielu źródeł.

BatchedJoinBlock<T1, T2, ...>

Klasy BatchedJoinBlock<T1,T2> i BatchedJoinBlock<T1,T2,T3> zbierają partie elementów wejściowych i propagują na zewnątrz obiekty System.Tuple(IList(T1), IList(T2)) lub System.Tuple(IList(T1), IList(T2), IList(T3)), które zawierają te elementy. Pomyśl o BatchedJoinBlock<T1,T2> jako o połączeniu BatchBlock<T> i JoinBlock<T1,T2>. Określ rozmiar każdej partii podczas tworzenia BatchedJoinBlock<T1,T2> obiektu. BatchedJoinBlock<T1,T2> udostępnia również właściwości, Target1 oraz Target2, które implementują ITargetBlock<TInput>. Gdy określona liczba elementów wejściowych jest odbierana ze wszystkich obiektów docelowych, BatchedJoinBlock<T1,T2> obiekt asynchronicznie propaguje System.Tuple(IList(T1), IList(T2)) obiekt zawierający te elementy.

Poniższy podstawowy przykład tworzy BatchedJoinBlock<T1,T2> obiekt, który przechowuje wyniki, Int32 wartości i błędy, które są Exception obiektami. W tym przykładzie wykonywane są różne operacje, a wyniki zapisywane są we właściwości Target1, a błędy we właściwości Target2 obiektu BatchedJoinBlock<T1,T2>. Ponieważ liczba pomyślnych i nieudanych operacji jest nieznana z góry, obiekty IList<T> umożliwiają każdemu celowi odbieranie zerowej lub większej liczby wartości.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Pełny przykład, który używa BatchedJoinBlock<T1,T2> do przechwytywania wyników i wszelkich wyjątków występujących podczas odczytu danych przez program z bazy danych, można znaleźć w artykule Przewodnik: Używanie funkcji BatchBlock i BatchedJoinBlock w celu poprawy wydajności.

Konfigurowanie zachowania bloku przepływu danych

Możesz włączyć dodatkowe opcje, podając System.Threading.Tasks.Dataflow.DataflowBlockOptions obiekt konstruktorowi typów bloków przepływu danych. Te opcje kontrolują zachowanie, takie jak harmonogram zarządzający zadaniem podstawowym i stopień równoległości. Obiekt DataflowBlockOptions zawiera również typy pochodne, które określają zachowanie specyficzne dla niektórych typów bloków przepływu danych. Poniższa tabela zawiera podsumowanie typu opcji skojarzonych z każdym typem bloku przepływu danych.

Typ bloku przepływu danych Typ: DataflowBlockOptions
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

Poniższe sekcje zawierają dodatkowe informacje na temat ważnych rodzajów opcji bloków przepływu danych dostępnych za pośrednictwem System.Threading.Tasks.Dataflow.DataflowBlockOptionsklas , System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptionsi System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions .

Określanie harmonogramu zadań

Każdy wstępnie zdefiniowany blok przepływu danych używa mechanizmu planowania zadań TPL do wykonywania działań, takich jak propagowanie danych do miejsca docelowego, odbieranie danych ze źródła i uruchamianie delegatów zdefiniowanych przez użytkownika, gdy dane staną się dostępne. TaskScheduler jest abstrakcyjną klasą reprezentującą harmonogram zadań, który kolejkuje zadania do wątków. Domyślny harmonogram zadań Default używa klasy ThreadPool do kolejkowania i wykonywania zadań. Domyślny harmonogram zadań można zastąpić, ustawiając TaskScheduler właściwość podczas konstruowania obiektu bloku przepływu danych.

Gdy ten sam harmonogram zadań zarządza wieloma blokami przepływu danych, może wymusić między nimi zasady. Na przykład, jeśli wiele bloków przepływu danych jest skonfigurowanych, aby kierować na wyłączny harmonogram tego samego obiektu ConcurrentExclusiveSchedulerPair, wszystkie zadania wykonywane w tych blokach są serializowane. Podobnie, jeśli te bloki są skonfigurowane do kierowania na współbieżny harmonogram tego samego ConcurrentExclusiveSchedulerPair obiektu, i ten harmonogram jest skonfigurowany tak, aby miał maksymalny poziom współbieżności, cała praca z tych bloków jest ograniczona do tej liczby równoczesnych operacji. Przykład użycia ConcurrentExclusiveSchedulerPair klasy w celu umożliwienia równoległego wykonywania operacji odczytu, podczas gdy operacje zapisu są wykonywane wyłącznie, bez innych operacji, znajdziesz w Instrukcje: Określanie harmonogramu zadań w bloku przepływu danych. Aby uzyskać więcej informacji na temat harmonogramów zadań w TPL, zobacz TaskScheduler temat dotyczący klasy.

Określanie stopnia równoległości

Domyślnie trzy typy bloków wykonywania, które biblioteka przepływów danych TPL udostępnia, ActionBlock<TInput>, TransformBlock<TInput,TOutput>i TransformManyBlock<TInput,TOutput>, przetwarzają jeden komunikat jednocześnie. Te typy bloków przepływu danych przetwarzają również komunikaty w kolejności, w jakiej są odbierane. Aby umożliwić blokom przepływu danych przetwarzanie komunikatów jednocześnie, ustaw ExecutionDataflowBlockOptions.MaxDegreeOfParallelism właściwość podczas konstruowania obiektu bloku przepływu danych.

Wartość domyślna MaxDegreeOfParallelism to 1, co gwarantuje, że blok przepływu danych przetwarza jeden komunikat jednocześnie. Ustawienie tej właściwości na wartość większą niż 1 umożliwia blokowi przepływu danych przetwarzanie wielu komunikatów jednocześnie. Ustawienie tej właściwości na DataflowBlockOptions.Unbounded umożliwia bazowemu harmonogramowi zadań zarządzanie maksymalnym stopniem współbieżności.

Ważne

Po określeniu maksymalnego stopnia równoległości, który jest większy niż 1, wiele komunikatów jest przetwarzanych jednocześnie i w związku z tym komunikaty mogą nie być przetwarzane w kolejności, w której są odbierane. Kolejność, w jakiej komunikaty są wyprowadzane z bloku, jest taka sama, w jakiej są odbierane.

MaxDegreeOfParallelism Ponieważ właściwość reprezentuje maksymalny stopień równoległości, blok przepływu danych może być wykonywany z mniejszym stopniem równoległości niż określono. Blok przepływu danych może używać mniejszego stopnia równoległości w celu spełnienia wymagań funkcjonalnych lub braku dostępnych zasobów systemowych. Blok przepływu danych nigdy nie wybiera większej równoległości niż określasz.

Wartość MaxDegreeOfParallelism właściwości jest wyłączna dla każdego obiektu bloku przepływu danych. Jeśli na przykład cztery obiekty bloku przepływu danych określają 1 dla maksymalnego stopnia równoległości, wszystkie cztery obiekty bloków przepływu danych mogą być uruchamiane równolegle.

Aby uzyskać przykład, który ustawia maksymalny stopień równoległości, aby umożliwić równoległe wykonywanie długotrwałych operacji, zobacz How to: Specify the Degree of Parallelism in a Dataflow Block (Instrukcje: określanie stopnia równoległości w bloku przepływu danych).

Określanie liczby komunikatów na zadanie

Wstępnie zdefiniowane typy bloków przepływu danych używają zadań do przetwarzania wielu elementów wejściowych. Pomaga to zminimalizować liczbę obiektów zadań wymaganych do przetwarzania danych, co pozwala aplikacjom działać wydajniej. Jednak gdy zadania z jednego zestawu bloków przepływu danych przetwarzają dane, zadania z innych bloków przepływu danych mogą wymagać oczekiwania na czas przetwarzania przez kolejkowanie komunikatów. Aby umożliwić lepszą równowagę między zadaniami przepływu danych, ustaw właściwość MaxMessagesPerTask. Gdy MaxMessagesPerTask jest ustawiony na DataflowBlockOptions.Unbounded, która jest wartością domyślną, zadanie używane przez blok przepływu danych przetwarza tyle komunikatów, ile dostępnych. Gdy MaxMessagesPerTask parametr jest ustawiony na wartość inną niż Unbounded, blok przepływu danych przetwarza maksymalnie tę liczbę komunikatów na Task obiekt. Mimo że ustawienie właściwości MaxMessagesPerTask może zwiększyć uczciwość między zadaniami, to może również spowodować, że system utworzy więcej zadań, niż jest to konieczne, co z kolei może zmniejszyć wydajność.

Umożliwianie anulowania

TPL zapewnia mechanizm, który umożliwia zadaniom współdziałanie w celu koordynowania anulowania. Aby bloki przepływu danych mogły uczestniczyć w tym mechanizmie anulowania, ustaw właściwość CancellationToken. Gdy ten CancellationToken obiekt jest ustawiony na stan anulowany, wszystkie bloki przepływu danych, które monitorują ten token, kończą wykonywanie bieżącego elementu, ale nie rozpoczynają przetwarzania kolejnych elementów. Te bloki przepływu danych usuwają również wszystkie buforowane komunikaty, zwalniają połączenia z dowolnymi blokami źródłowymi i docelowymi oraz przechodzą do stanu anulowanego. Przechodząc do stanu anulowanego, właściwość Completion ma ustawioną wartość Status na Canceled, chyba że wystąpił wyjątek podczas przetwarzania. W takim przypadku Status zostaje ustawione na Faulted.

Aby zapoznać się z przykładem użycia anulowania w aplikacji Windows Forms, zobacz Instrukcje: anulowanie bloku przepływu danych. Aby uzyskać więcej informacji na temat anulowania w TPL, zobacz Anulowanie zadania.

Określanie chciwych i niechłannych zachowań

Niektóre grupujące bloki przepływu danych mogą działać w trybie łakomym lub niełakomym. Domyślnie wstępnie zdefiniowane typy bloków przepływu danych działają w trybie chciwości.

W przypadku typów bloków łączenia, takich jak JoinBlock<T1,T2>, tryb zachłanny oznacza, że blok natychmiast akceptuje dane, nawet jeśli odpowiednie dane do połączenia nie są jeszcze dostępne. Tryb niechłanny oznacza, że blok odłoży wszystkie komunikaty przychodzące, dopóki jeden nie będzie dostępny dla każdego z jego celów, aby ukończyć sprzężenie. Jeśli którykolwiek z odroczonych komunikatów nie jest już dostępny, blok łączenia usuwa wszystkie odroczone komunikaty i ponownie uruchamia proces. Dla klasy BatchBlock<T>, zachowanie chciwe i niechciwe jest podobne, z wyjątkiem, że w trybie niechciwym obiekt odkłada wszystkie przychodzące komunikaty BatchBlock<T>, dopóki nie będzie ich wystarczająco z różnych źródeł do ukończenia partii.

Aby określić tryb niechłanny dla bloku przepływu danych, ustaw wartość GreedyFalse. Przykład przedstawiający sposób używania trybu niechłannego w celu umożliwienia wydajniejszego udostępniania źródła danych wielu bloków sprzężenia, zobacz Instrukcje: używanie funkcji JoinBlock do odczytywania danych z wielu źródeł.

Niestandardowe bloki przepływu danych

Mimo że biblioteka przepływów danych TPL udostępnia wiele wstępnie zdefiniowanych typów bloków, można utworzyć dodatkowe typy bloków, które wykonują niestandardowe zachowanie. Zaimplementuj bezpośrednio interfejsy ISourceBlock<TOutput> lub ITargetBlock<TInput> albo użyj metody Encapsulate, aby utworzyć złożony blok, który enkapsuluje zachowanie istniejących typów bloków. Przykłady pokazujące sposób implementowania niestandardowych funkcji bloku przepływu danych można znaleźć w temacie Przewodnik: tworzenie niestandardowego typu bloku przepływu danych.

Nazwa Opis
Instrukcje: zapisywanie komunikatów i odczytywanie komunikatów z bloku przepływu danych Pokazuje, jak zapisywać komunikaty i odczytywać komunikaty z BufferBlock<T> obiektu.
Instrukcje: implementowanie wzorca przepływu danych Producer-Consumer Opisuje sposób użycia modelu przepływu danych w celu zaimplementowania wzorca producenta-konsumenta, w którym producent wysyła komunikaty do bloku przepływu danych, a użytkownik odczytuje komunikaty z tego bloku.
Instrukcje: wykonywanie akcji, gdy blok przepływu danych odbiera dane Opisuje sposób dostarczania delegatów do typów bloków przepływu danych odpowiadających za wykonanie, ActionBlock<TInput>, TransformBlock<TInput,TOutput>, i TransformManyBlock<TInput,TOutput>.
Przewodnik: tworzenie potoku przepływu danych Opisuje sposób tworzenia potoku przepływu danych, który pobiera tekst z Internetu i wykonuje operacje na tym tekście.
Instrukcje: odłączanie bloków przepływu danych Pokazuje, jak użyć metody LinkTo, aby odłączyć blok docelowy od jego źródła po tym, jak źródło oferuje komunikat do celu.
Przewodnik: używanie przepływu danych w aplikacji Windows Forms Pokazuje, jak utworzyć sieć bloków przepływu danych, które wykonują przetwarzanie obrazów w aplikacji Windows Forms.
Instrukcje: anulowanie bloku przepływu danych Demonstruje sposób używania anulowania w aplikacji Windows Forms.
Instrukcje: odczytywanie danych z wielu źródeł przy użyciu funkcji JoinBlock W tym artykule wyjaśniono, jak używać klasy JoinBlock<T1,T2> do wykonywania operacji, gdy dane są dostępne z wielu źródeł oraz jak używać trybu leniwego, aby umożliwić wielu blokom sprzężenia wydajniejsze udostępnianie źródła danych.
Instrukcje: określanie stopnia równoległości w bloku przepływu danych Opisuje sposób ustawiania właściwości MaxDegreeOfParallelism bloku przepływu danych w celu przetwarzania więcej niż jednego komunikatu jednocześnie.
Instrukcje: określanie harmonogramu zadań w bloku przepływu danych Przedstawia sposób kojarzenia określonego harmonogramu zadań podczas korzystania z przepływu danych w aplikacji.
Przewodnik: używanie funkcji BatchBlock i BatchedJoinBlock w celu zwiększenia wydajności Opisuje sposób użycia BatchBlock<T> klasy w celu zwiększenia wydajności operacji wstawiania bazy danych oraz sposobu użycia BatchedJoinBlock<T1,T2> klasy do przechwytywania wyników i wszelkich wyjątków występujących podczas odczytu programu z bazy danych.
Przewodnik: tworzenie niestandardowego typu bloku przepływu danych Demonstruje dwa sposoby tworzenia typu bloku przepływu danych, który implementuje zachowanie niestandardowe.
Biblioteka zadań równoległych (TPL) Wprowadzenie do języka TPL , biblioteki, która upraszcza programowanie równoległe i współbieżne w aplikacjach .NET Framework.