Udostępnij za pośrednictwem


Przepływ danych (Biblioteka zadań równoległych)

Biblioteka równoległa zadań (TPL) udostępnia składniki przepływu danych, które ułatwiają zwiększenie niezawodności aplikacji obsługujących współbieżność. Te składniki przepływu danych są wspólnie określane jako biblioteka przepływów danych TPL. Ten model przepływu danych promuje programowanie oparte na aktorach, zapewniając przekazywanie komunikatów w procesie dla grubszego przepływu danych i zadań potokowych. Składniki przepływu danych bazują na typach i infrastrukturze planowania języka TPL i integrują się z obsługą języków C#, Visual Basic i F# na potrzeby programowania asynchronicznego. Te składniki przepływu danych są przydatne, gdy masz wiele operacji, które muszą komunikować się ze sobą asynchronicznie lub gdy chcesz przetwarzać dane, gdy staną się dostępne. Rozważmy na przykład aplikację, która przetwarza dane obrazu z aparatu internetowego. Korzystając z modelu przepływu danych, aplikacja może przetwarzać ramki obrazów w miarę ich dostępności. Jeśli na przykład aplikacja ulepsza ramki obrazów, wykonując korektę światła lub redukcję czerwonego oka, możesz utworzyć potok składników przepływu danych. Każdy etap potoku może używać bardziej gruboziarnistych funkcji równoległości, takich jak funkcje udostępniane przez TPL, aby przekształcić obraz.

Ten dokument zawiera omówienie biblioteki przepływów danych TPL. Opisuje ona model programowania, wstępnie zdefiniowane typy bloków przepływu danych oraz sposób konfigurowania bloków przepływu danych w celu spełnienia określonych wymagań aplikacji.

Uwaga

Biblioteka przepływów danych TPL ( System.Threading.Tasks.Dataflow przestrzeń nazw) nie jest dystrybuowana za pomocą platformy .NET. Aby zainstalować System.Threading.Tasks.Dataflow przestrzeń nazw w programie Visual Studio, otwórz projekt, wybierz pozycję Zarządzaj pakietami NuGet z menu Project i wyszukaj pakiet w trybie online System.Threading.Tasks.Dataflow . Alternatywnie, aby zainstalować go przy użyciu interfejsu wiersza polecenia platformy .NET Core, uruchom polecenie dotnet add package System.Threading.Tasks.Dataflow.

Model programowania

Biblioteka przepływów danych TPL stanowi podstawę do przekazywania komunikatów i przetwarzania równoległego aplikacji intensywnie korzystających z procesora CPU i operacji we/wy, które mają wysoką przepływność i małe opóźnienia. Zapewnia on również jawną kontrolę nad sposobem buforowanego i poruszania się po systemie. Aby lepiej zrozumieć model programowania przepływu danych, rozważ aplikację, która asynchronicznie ładuje obrazy z dysku i tworzy złożony z tych obrazów. Tradycyjne modele programowania zwykle wymagają używania wywołań zwrotnych i obiektów synchronizacji, takich jak blokady, do koordynowania zadań i dostępu do udostępnionych danych. Korzystając z modelu programowania przepływu danych, można tworzyć obiekty przepływu danych, które przetwarzają obrazy podczas ich odczytywania z dysku. W modelu przepływu danych deklarujesz sposób obsługi danych, gdy staną się dostępne, a także wszelkie zależności między danymi. Ponieważ środowisko uruchomieniowe zarządza zależnościami między danymi, często można uniknąć wymagania synchronizacji dostępu do udostępnionych danych. Ponadto, ponieważ środowisko uruchomieniowe planuje pracę na podstawie asynchronicznego przybycia danych, przepływ danych może poprawić czas reakcji i przepływność, efektywnie zarządzając podstawowymi wątkami. Przykład, który używa modelu programowania przepływu danych do implementowania przetwarzania obrazów w aplikacji Windows Forms, zobacz Przewodnik: używanie przepływu danych w aplikacji Windows Forms.

Źródła i obiekty docelowe

Biblioteka przepływów danych TPL składa się z bloków przepływu danych, które są strukturami danych, które buforują i przetwarzają dane. TPL definiuje trzy rodzaje bloków przepływu danych: bloki źródłowe, bloki docelowe i bloki propagacji. Blok źródłowy działa jako źródło danych i może być odczytywany. Blok docelowy działa jako odbiorca danych i może być zapisywany. Blok propagacji działa zarówno jako blok źródłowy, jak i blok docelowy, i może być odczytywany i zapisywany. TPL definiuje System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> interfejs do reprezentowania źródeł, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> reprezentowania obiektów docelowych i System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> reprezentowania propagatorów. IPropagatorBlock<TInput,TOutput> dziedziczy zarówno z ISourceBlock<TOutput>, jak i ITargetBlock<TInput>.

Biblioteka przepływów danych TPL udostępnia kilka wstępnie zdefiniowanych typów bloków przepływu danych, które implementują ISourceBlock<TOutput>interfejsy , ITargetBlock<TInput>i IPropagatorBlock<TInput,TOutput> . Te typy bloków przepływu danych zostały opisane w tym dokumencie w sekcji Wstępnie zdefiniowane typy bloków przepływu danych.

bloki Połączenie ing

Bloki przepływu danych można połączyć z potokami formularzy, które są sekwencjami liniowymi bloków przepływu danych lub sieciami, które są grafami bloków przepływu danych. Potok jest jedną z form sieci. W potoku lub sieci źródła asynchronicznie propagują dane do obiektów docelowych, gdy te dane staną się dostępne. Metoda ISourceBlock<TOutput>.LinkTo łączy blok przepływu danych źródłowego z blokiem docelowym. Źródło może być połączone z zerem lub większą większa większa liczba obiektów docelowych; obiekty docelowe można łączyć z zera lub większej liczby źródeł. Bloki przepływu danych można dodawać lub usuwać z potoku lub sieci współbieżnie. Wstępnie zdefiniowane typy bloków przepływu danych obsługują wszystkie aspekty bezpieczeństwa wątków łączenia i odłączania.

Aby zapoznać się z przykładem łączenia bloków przepływu danych w celu utworzenia podstawowego potoku, zobacz Przewodnik: tworzenie potoku przepływu danych. Aby zapoznać się z przykładem łączenia bloków przepływu danych w celu utworzenia bardziej złożonej sieci, zobacz Przewodnik: używanie przepływu danych w aplikacji Windows Forms. Przykład, który odłącza element docelowy od źródła po źródle oferuje komunikat docelowy, zobacz Instrukcje: odłączanie bloków przepływu danych.

Filtrowanie

Po wywołaniu ISourceBlock<TOutput>.LinkTo metody w celu połączenia źródła z obiektem docelowym można podać delegata, który określa, czy blok docelowy akceptuje lub odrzuca komunikat na podstawie wartości tego komunikatu. Ten mechanizm filtrowania jest przydatnym sposobem zagwarantowania, że blok przepływu danych odbiera tylko określone wartości. W przypadku większości wstępnie zdefiniowanych typów bloków przepływu danych, jeśli blok źródłowy jest połączony z wieloma blokami docelowymi, gdy blok docelowy odrzuca komunikat, źródło oferuje ten komunikat do następnego elementu docelowego. Kolejność, w jakiej źródło oferuje komunikaty do obiektów docelowych, jest definiowana przez źródło i może się różnić w zależności od typu źródła. Większość typów bloków źródłowych przestaje oferować komunikat po zaakceptowaniu tego komunikatu przez jeden element docelowy. Jednym wyjątkiem od tej reguły jest BroadcastBlock<T> klasa, która oferuje każdy komunikat do wszystkich elementów docelowych, nawet jeśli niektóre elementy docelowe odrzucają komunikat. Aby zapoznać się z przykładem, który używa filtrowania do przetwarzania tylko niektórych komunikatów, zobacz Przewodnik: używanie przepływu danych w aplikacji Windows Forms.

Ważne

Ponieważ każdy wstępnie zdefiniowany typ bloku przepływu danych źródła gwarantuje, że komunikaty są propagowane w kolejności, w jakiej są odbierane, każdy komunikat musi być odczytywany z bloku źródłowego, zanim blok źródłowy będzie mógł przetworzyć następny komunikat. W związku z tym, jeśli używasz filtrowania do łączenia wielu obiektów docelowych ze źródłem, upewnij się, że co najmniej jeden blok docelowy odbiera każdy komunikat. W przeciwnym razie aplikacja może zakleszczeć.

Przekazywanie komunikatów

Model programowania przepływu danych jest związany z koncepcją przekazywania komunikatów, gdzie niezależne składniki programu komunikują się ze sobą przez wysyłanie komunikatów. Jednym ze sposobów propagowania komunikatów między składnikami aplikacji jest wywołanie Post metod (synchronicznych) i SendAsync (asynchronicznych) w celu wysyłania komunikatów do docelowych bloków przepływu danych oraz ReceiveReceiveAsyncmetod , i TryReceive do odbierania komunikatów z bloków źródłowych. Te metody można połączyć z potokami lub sieciami przepływu danych, wysyłając dane wejściowe do węzła głównego (bloku docelowego) i odbierając dane wyjściowe z węzła terminalu potoku lub węzłów terminalu sieci (co najmniej jeden blok źródłowy). Możesz również użyć Choose metody , aby odczytać z pierwszego z podanych źródeł, które mają dostępne dane i wykonać akcję na tych danych.

Bloki źródłowe oferują dane blokom docelowym przez wywołanie ITargetBlock<TInput>.OfferMessage metody . Blok docelowy odpowiada na oferowaną wiadomość na jeden z trzech sposobów: może zaakceptować komunikat, odrzucić wiadomość lub odroczyć wiadomość. Gdy element docelowy akceptuje komunikat, OfferMessage metoda zwraca Acceptedwartość . Gdy element docelowy odrzuca komunikat, OfferMessage metoda zwraca Declinedwartość . Gdy element docelowy wymaga, aby nie odbierał już żadnych komunikatów ze źródła, OfferMessage zwraca wartość DecliningPermanently. Wstępnie zdefiniowane typy bloków źródłowych nie oferują komunikatów połączonym obiektom docelowym po odebraniu takiej wartości zwracanej i automatycznie odłączają je od takich obiektów docelowych.

Gdy blok docelowy odrocza komunikat do późniejszego użycia, OfferMessage metoda zwraca Postponedwartość . Docelowy blok, który odrocza komunikat, może później wywołać metodę ISourceBlock<TOutput>.ReserveMessage , aby spróbować zarezerwować oferowany komunikat. W tym momencie komunikat jest nadal dostępny i może być używany przez blok docelowy lub komunikat został wykonany przez inny element docelowy. Gdy blok docelowy później wymaga komunikatu lub nie potrzebuje już komunikatu, wywołuje odpowiednio metodę ISourceBlock<TOutput>.ConsumeMessage lub ReleaseReservation . Rezerwacja komunikatów jest zwykle używana przez typy bloków przepływu danych, które działają w trybie niechłannym. Tryb niechłanny jest wyjaśniony w dalszej części tego dokumentu. Zamiast rezerwować odroczony komunikat, blok docelowy może również użyć ISourceBlock<TOutput>.ConsumeMessage metody , aby podjąć próbę bezpośredniego użycia przełożonego komunikatu.

Uzupełnianie bloku przepływu danych

Bloki przepływu danych obsługują również koncepcję ukończenia. Blok przepływu danych, który jest w stanie ukończonym, nie wykonuje żadnych dalszych prac. Każdy blok przepływu danych ma skojarzony System.Threading.Tasks.Task obiekt, znany jako zadanie ukończenia, który reprezentuje stan ukończenia bloku. Ponieważ można poczekać na Task zakończenie obiektu, używając zadań ukończenia, możesz poczekać na zakończenie co najmniej jednego węzła terminalu sieci przepływu danych. Interfejs IDataflowBlock definiuje metodę Complete , która informuje blok przepływu danych o żądaniu jego ukończenia, a Completion właściwość, która zwraca zadanie ukończenia bloku przepływu danych. Zarówno ISourceBlock<TOutput> , jak i ITargetBlock<TInput> dziedziczą IDataflowBlock interfejs.

Istnieją dwa sposoby określania, czy blok przepływu danych został ukończony bez błędu, napotkał co najmniej jeden błąd, czy został anulowany. Pierwszym sposobem jest wywołanie Task.Wait metody w zadaniu ukończenia try-catch w bloku (Try-Catch w Visual Basic). Poniższy przykład tworzy ActionBlock<TInput> obiekt, który zgłasza, ArgumentOutOfRangeException jeśli jego wartość wejściowa jest mniejsza niż zero. AggregateException jest zgłaszany, gdy ten przykład wywołuje Wait zadanie ukończenia. Dostęp ArgumentOutOfRangeException do obiektu AggregateException jest uzyskiwany za pośrednictwem InnerExceptions właściwości .

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

W tym przykładzie pokazano, w którym wyjątek jest nieobsługiwany w delegatu bloku przepływu danych wykonywania. Zalecamy obsługę wyjątków w ciałach takich bloków. Jeśli jednak nie możesz tego zrobić, blok zachowuje się tak, jakby został anulowany i nie przetwarza przychodzących komunikatów.

Po jawnym AggregateException anulowaniu bloku przepływu danych obiekt znajduje się OperationCanceledException we InnerExceptions właściwości . Aby uzyskać więcej informacji na temat anulowania przepływu danych, zobacz sekcję Włączanie anulowania .

Drugim sposobem określenia stanu ukończenia bloku przepływu danych jest użycie kontynuacji zadania ukończenia lub użycie asynchronicznych funkcji języka C# i Visual Basic w celu asynchronicznego oczekiwania na zadanie ukończenia. Delegat, który podajesz do Task.ContinueWith metody, przyjmuje Task obiekt, który reprezentuje zadanie antecedent. W przypadku Completion właściwości delegat kontynuacji przyjmuje samo zadanie ukończenia. Poniższy przykład przypomina poprzedni, z tą różnicą, że używa ContinueWith również metody do utworzenia zadania kontynuacji, które wyświetla stan ogólnej operacji przepływu danych.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine("The status of the completion task is '{0}'.",
      task.Status);
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Można również użyć właściwości, takich jak IsCanceled w treści zadania kontynuacji, aby określić dodatkowe informacje o stanie ukończenia bloku przepływu danych. Aby uzyskać więcej informacji na temat zadań kontynuacji i sposobu ich powiązania z anulowaniem i obsługą błędów, zobacz Łączenie zadań za pomocą zadań kontynuacji, anulowanie zadań i obsługa wyjątków.

Wstępnie zdefiniowane typy bloków przepływu danych

Biblioteka przepływów danych TPL udostępnia kilka wstępnie zdefiniowanych typów bloków przepływu danych. Te typy są podzielone na trzy kategorie: buforowanie bloków, bloków wykonywania i bloków grupowania. W poniższych sekcjach opisano typy bloków tworzące te kategorie.

Bloki buforowania

Buforowanie bloków przechowuje dane do użycia przez użytkowników danych. Biblioteka przepływów danych TPL udostępnia trzy typy bloków buforowania: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T>i System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock<T>

Klasa BufferBlock<T> reprezentuje asynchroniczną strukturę obsługi komunikatów ogólnego przeznaczenia. Ta klasa przechowuje najpierw kolejkę komunikatów (FIFO), które mogą być zapisywane przez wiele źródeł lub odczytywane przez wiele obiektów docelowych. Gdy obiekt docelowy odbiera komunikat z BufferBlock<T> obiektu, ten komunikat zostanie usunięty z kolejki komunikatów. W związku z tym, mimo że BufferBlock<T> obiekt może mieć wiele obiektów docelowych, tylko jeden obiekt docelowy otrzyma każdy komunikat. Klasa jest przydatna BufferBlock<T> , gdy chcesz przekazać wiele komunikatów do innego składnika, a ten składnik musi odbierać każdy komunikat.

Poniższy podstawowy przykład publikuje kilka Int32 wartości w BufferBlock<T> obiekcie, a następnie odczytuje te wartości z powrotem z tego obiektu.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

Pełny przykład przedstawiający sposób zapisywania komunikatów w obiekcie i odczytywania ich z BufferBlock<T> obiektu znajduje się w temacie How to: Write Messages to and Read Messages from a Dataflow Block (Instrukcje: zapisywanie komunikatów w blokach przepływu danych i odczytywanie komunikatów).

BroadcastBlock<T>

Klasa jest przydatna BroadcastBlock<T> , gdy musisz przekazać wiele komunikatów do innego składnika, ale ten składnik wymaga tylko najnowszej wartości. Ta klasa jest również przydatna, gdy chcesz rozgłaszać komunikat do wielu składników.

Poniższy podstawowy przykład publikuje Double wartość obiektu BroadcastBlock<T> , a następnie odczytuje wartość z powrotem z tego obiektu kilka razy. Ponieważ wartości nie są usuwane z BroadcastBlock<T> obiektów po ich odczytaniu, ta sama wartość jest dostępna za każdym razem.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Pełny przykład przedstawiający sposób emisji BroadcastBlock<T> komunikatu do wielu bloków docelowych znajduje się w temacie How to: Specify a Task Scheduler in a Dataflow Block (Instrukcje: określanie harmonogramu zadań w bloku przepływu danych).

WriteOnceBlock<T>

Klasa WriteOnceBlock<T> przypomina klasę BroadcastBlock<T> , z tą różnicą WriteOnceBlock<T> , że obiekt może być zapisywany tylko raz. Słowo kluczowe ReadOnly (ReadOnly w języku Visual Basic) może być WriteOnceBlock<T> podobne do słowa kluczowego języka C#, z tą różnicą, że WriteOnceBlock<T> obiekt staje się niezmienny po otrzymaniu wartości zamiast w konstrukcji. BroadcastBlock<T> Podobnie jak klasa, gdy obiekt docelowy odbiera komunikat z WriteOnceBlock<T> obiektu, ten komunikat nie jest usuwany z tego obiektu. W związku z tym wiele obiektów docelowych otrzymuje kopię komunikatu. Klasa jest przydatna WriteOnceBlock<T> , gdy chcesz propagować tylko pierwszy z wielu komunikatów.

Poniższy podstawowy przykład publikuje wiele String wartości w WriteOnceBlock<T> obiekcie, a następnie odczytuje wartość z powrotem z tego obiektu. WriteOnceBlock<T> Ponieważ obiekt może być zapisywany tylko raz, po WriteOnceBlock<T> odebraniu komunikatu przez obiekt odrzuca kolejne komunikaty.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Pełny przykład przedstawiający sposób odbierania WriteOnceBlock<T> wartości pierwszej operacji, która kończy się, znajduje się w temacie How to: Unlink Dataflow Blocks (Instrukcje: odłączanie bloków przepływu danych).

Bloki wykonywania

Bloki wykonywania wywołają delegata dostarczonego przez użytkownika dla każdego elementu odebranych danych. Biblioteka przepływów danych TPL udostępnia trzy typy bloków wykonywania: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>i System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock<T>

Klasa ActionBlock<TInput> jest blokiem docelowym, który wywołuje delegata, gdy odbiera dane. Obiekt można ActionBlock<TInput> traktować jako delegata, który jest uruchamiany asynchronicznie, gdy dane staną się dostępne. Delegat, który podajesz do ActionBlock<TInput> obiektu, może być typu Action<T> lub typu System.Func<TInput, Task>. Gdy używasz ActionBlock<TInput> obiektu z elementem Action<T>, przetwarzanie każdego elementu wejściowego jest uznawane za ukończone po powrocie delegata. Gdy używasz ActionBlock<TInput> obiektu z elementem System.Func<TInput, Task>, przetwarzanie każdego elementu wejściowego jest uznawane za ukończone tylko po zakończeniu zwracanego Task obiektu. Korzystając z tych dwóch mechanizmów, można użyć ActionBlock<TInput> zarówno do synchronicznego, jak i asynchronicznego przetwarzania każdego elementu wejściowego.

Poniższy podstawowy przykład publikuje wiele Int32 wartości w ActionBlock<TInput> obiekcie. Obiekt ActionBlock<TInput> wyświetla te wartości w konsoli programu . W tym przykładzie ustawiono blok na stan ukończony i oczekuje na zakończenie wszystkich zadań przepływu danych.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Aby zapoznać się z kompletnymi przykładami, które pokazują, jak używać delegatów z klasą ActionBlock<TInput> , zobacz Instrukcje: wykonywanie akcji, gdy blok przepływu danych odbiera dane.

TransformBlock<TInput, TOutput>

Klasa TransformBlock<TInput,TOutput> przypomina klasę ActionBlock<TInput> , z tą różnicą, że działa zarówno jako źródło, jak i jako element docelowy. Delegat przekazywany do TransformBlock<TInput,TOutput> obiektu zwraca wartość typu TOutput. Delegat, który podajesz do TransformBlock<TInput,TOutput> obiektu, może być typu System.Func<TInput, TOutput> lub typu System.Func<TInput, Task<TOutput>>. Gdy używasz TransformBlock<TInput,TOutput> obiektu z elementem System.Func<TInput, TOutput>, przetwarzanie każdego elementu wejściowego jest uznawane za ukończone po powrocie delegata. W przypadku używania obiektu używanego z System.Func<TInput, Task<TOutput>>programem przetwarzanie każdego elementu wejściowego TransformBlock<TInput,TOutput> jest uznawane za ukończone tylko po zakończeniu zwracanego Task<TResult> obiektu. Podobnie jak w przypadku ActionBlock<TInput>programu , korzystając z tych dwóch mechanizmów, można użyć TransformBlock<TInput,TOutput> zarówno do synchronicznego, jak i asynchronicznego przetwarzania każdego elementu wejściowego.

Poniższy podstawowy przykład tworzy TransformBlock<TInput,TOutput> obiekt, który oblicza pierwiastek kwadratowy danych wejściowych. Obiekt TransformBlock<TInput,TOutput> przyjmuje Int32 wartości jako dane wejściowe i generuje Double wartości jako dane wyjściowe.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Aby zapoznać się z kompletnymi przykładami używanymi TransformBlock<TInput,TOutput> w sieci bloków przepływu danych, które wykonują przetwarzanie obrazów w aplikacji Windows Forms, zobacz Przewodnik: używanie przepływu danych w aplikacji Windows Forms.

TransformManyBlock<TInput, TOutput>

Klasa TransformManyBlock<TInput,TOutput> przypomina klasę TransformBlock<TInput,TOutput> , z tą różnicą, że TransformManyBlock<TInput,TOutput> generuje zero lub więcej wartości wyjściowych dla każdej wartości wejściowej, a nie tylko jedną wartość wyjściową dla każdej wartości wejściowej. Delegat, który podajesz do TransformManyBlock<TInput,TOutput> obiektu, może być typu System.Func<TInput, IEnumerable<TOutput>> lub typu System.Func<TInput, Task<IEnumerable<TOutput>>>. Gdy używasz TransformManyBlock<TInput,TOutput> obiektu z elementem System.Func<TInput, IEnumerable<TOutput>>, przetwarzanie każdego elementu wejściowego jest uznawane za ukończone po powrocie delegata. Gdy używasz TransformManyBlock<TInput,TOutput> obiektu z elementem System.Func<TInput, Task<IEnumerable<TOutput>>>, przetwarzanie każdego elementu wejściowego jest uznawane za ukończone tylko po zakończeniu zwracanego System.Threading.Tasks.Task<IEnumerable<TOutput>> obiektu.

Poniższy podstawowy przykład tworzy TransformManyBlock<TInput,TOutput> obiekt, który dzieli ciągi na ich poszczególne sekwencje znaków. Obiekt TransformManyBlock<TInput,TOutput> przyjmuje String wartości jako dane wejściowe i generuje Char wartości jako dane wyjściowe.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Aby zapoznać się z kompletnymi przykładami używanymi TransformManyBlock<TInput,TOutput> do tworzenia wielu niezależnych danych wyjściowych dla każdego danych wejściowych w potoku przepływu danych, zobacz Przewodnik: tworzenie potoku przepływu danych.

Stopień równoległości

Każdy ActionBlock<TInput>obiekt , TransformBlock<TInput,TOutput>i TransformManyBlock<TInput,TOutput> buforuje komunikaty wejściowe do momentu, aż blok będzie gotowy do ich przetworzenia. Domyślnie te klasy przetwarzają komunikaty w kolejności, w jakiej są odbierane, jeden komunikat jednocześnie. Można również określić stopień równoległości, aby umożliwić obiektom ActionBlock<TInput>i TransformManyBlock<TInput,TOutput>TransformBlock<TInput,TOutput> przetwarzanie wielu komunikatów jednocześnie. Aby uzyskać więcej informacji na temat współbieżnego wykonywania, zobacz sekcję Określanie stopnia równoległości w dalszej części tego dokumentu. Aby zapoznać się z przykładem ustawiania stopnia równoległości, aby umożliwić blokowi przepływu danych wykonywania przetwarzanie więcej niż jednego komunikatu jednocześnie, zobacz How to: Specify the Degree of Parallelism in a Dataflow Block (Instrukcje: określanie stopnia równoległości w bloku przepływu danych).

Podsumowanie typów delegatów

W poniższej tabeli podsumowano typy delegatów, które można udostępnić obiektom ActionBlock<TInput>, TransformBlock<TInput,TOutput>i TransformManyBlock<TInput,TOutput> . Ta tabela określa również, czy typ delegata działa synchronicznie, czy asynchronicznie.

Typ Synchroniczny typ delegata Typ delegata asynchronicznego
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

Wyrażenia lambda można również używać podczas pracy z typami bloków wykonywania. Przykład pokazujący, jak używać wyrażenia lambda z blokiem wykonywania, zobacz How to: Perform Action When a Dataflow Block Odbiera dane.

Bloki grupowania

Grupowanie bloków łączy dane z co najmniej jednego źródła i w różnych ograniczeniach. Biblioteka przepływów danych TPL udostępnia trzy typy bloków sprzężenia: BatchBlock<T>, JoinBlock<T1,T2>i BatchedJoinBlock<T1,T2>.

BatchBlock<T>

Klasa BatchBlock<T> łączy zestawy danych wejściowych, które są nazywane partiami, w tablice danych wyjściowych. Podczas tworzenia BatchBlock<T> obiektu należy określić rozmiar każdej partii. BatchBlock<T> Gdy obiekt odbiera określoną liczbę elementów wejściowych, asynchronicznie propaguje tablicę zawierającą te elementy. BatchBlock<T> Jeśli obiekt jest ustawiony na stan ukończony, ale nie zawiera wystarczającej liczby elementów do utworzenia partii, propaguje końcową tablicę zawierającą pozostałe elementy wejściowe.

Klasa BatchBlock<T> działa w trybie chciwości lub niechłaństwa. W trybie chciwości, który jest domyślny, BatchBlock<T> obiekt akceptuje każdy komunikat, który jest oferowany i propaguje tablicę po otrzymaniu określonej liczby elementów. W trybie BatchBlock<T> niechłannym obiekt odkłada wszystkie komunikaty przychodzące do momentu, aż wystarczające źródła zaoferowały komunikaty blokowi w celu utworzenia partii. Tryb chciwy zazwyczaj działa lepiej niż niechłanny tryb, ponieważ wymaga mniejszego obciążenia związanego z przetwarzaniem. Można jednak użyć trybu niechłannego, gdy musisz koordynować zużycie z wielu źródeł w sposób niepodzielne. Określ tryb niechłanny, ustawiając wartość GreedyFalse na w parametrze dataflowBlockOptions w konstruktorze BatchBlock<T> .

Poniższy podstawowy przykład publikuje kilka Int32 wartości do BatchBlock<T> obiektu, który przechowuje dziesięć elementów w partii. Aby zagwarantować, że wszystkie wartości są propagowane z BatchBlock<T>elementu , ten przykład wywołuje metodę Complete . Metoda Complete ustawia BatchBlock<T> obiekt na stan ukończony, a w związku z tym BatchBlock<T> obiekt propaguje wszystkie pozostałe elementy jako końcową partię.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.",
   batchBlock.Receive().Sum());

Console.WriteLine("The sum of the elements in batch 2 is {0}.",
   batchBlock.Receive().Sum());

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Aby zapoznać się z kompletnym przykładem używanym BatchBlock<T> do poprawy wydajności operacji wstawiania bazy danych, zobacz Przewodnik: używanie funkcji BatchBlock i BatchedJoinBlock w celu zwiększenia wydajności.

JoinBlock<T1, T2, ...>

Klasy JoinBlock<T1,T2> i JoinBlock<T1,T2,T3> zbierają elementy wejściowe i propagują obiekty System.Tuple<T1,T2> lub System.Tuple<T1,T2,T3> zawierające te elementy. Klasy JoinBlock<T1,T2> i JoinBlock<T1,T2,T3> nie dziedziczą z klasy ITargetBlock<TInput>. Zamiast tego udostępniają właściwości , Target1, Target2i Target3, które implementują ITargetBlock<TInput>.

Podobnie jak BatchBlock<T>, JoinBlock<T1,T2> i JoinBlock<T1,T2,T3> działają w trybie chciwości lub niechłannym. W trybie chciwości, który jest domyślny, JoinBlock<T1,T2> obiekt lub JoinBlock<T1,T2,T3> akceptuje każdy komunikat, który jest oferowany i propaguje krotkę po każdym z jego obiektów docelowych otrzymuje co najmniej jeden komunikat. W trybie JoinBlock<T1,T2> niechłannym obiekt lub JoinBlock<T1,T2,T3> odroczy wszystkie komunikaty przychodzące, dopóki wszystkie obiekty docelowe nie zostaną zaoferowane dane wymagane do utworzenia krotki. W tym momencie blok angażuje się w dwufazowy protokół zatwierdzania w celu niepodzielnego pobierania wszystkich wymaganych elementów ze źródeł. To odroczenie umożliwia innej jednostce korzystanie z danych w międzyczasie w celu umożliwienia ogólnego systemu wprowadzania postępów.

Poniższy podstawowy przykład przedstawia przypadek, w którym JoinBlock<T1,T2,T3> obiekt wymaga wielu danych do obliczenia wartości. W tym przykładzie tworzony jest JoinBlock<T1,T2,T3> obiekt, który wymaga dwóch Int32 wartości i Char wartości do wykonania operacji arytmetycznej.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine("{0} + {1} = {2}",
            data.Item1, data.Item2, data.Item1 + data.Item2);
         break;
      case '-':
         Console.WriteLine("{0} - {1} = {2}",
            data.Item1, data.Item2, data.Item1 - data.Item2);
         break;
      default:
         Console.WriteLine("Unknown operator '{0}'.", data.Item3);
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Pełny przykład, który używa JoinBlock<T1,T2> obiektów w trybie niechłannym do współpracy współużytkowania zasobu, zobacz Instrukcje: używanie funkcji JoinBlock do odczytywania danych z wielu źródeł.

BatchedJoinBlock<T1, T2, ...>

Klasy BatchedJoinBlock<T1,T2> i BatchedJoinBlock<T1,T2,T3> zbierają partie elementów wejściowych i propagują je System.Tuple(IList(T1), IList(T2)) lub System.Tuple(IList(T1), IList(T2), IList(T3)) obiekty zawierające te elementy. Pomyśl o BatchedJoinBlock<T1,T2> kombinacji elementów BatchBlock<T> i JoinBlock<T1,T2>. Określ rozmiar każdej partii podczas tworzenia BatchedJoinBlock<T1,T2> obiektu. BatchedJoinBlock<T1,T2> Udostępnia również właściwości i Target1Target2, które implementują ITargetBlock<TInput>polecenie . Gdy określona liczba elementów wejściowych jest odbierana ze wszystkich obiektów docelowych, BatchedJoinBlock<T1,T2> obiekt asynchronicznie propaguje System.Tuple(IList(T1), IList(T2)) obiekt zawierający te elementy.

Poniższy podstawowy przykład tworzy BatchedJoinBlock<T1,T2> obiekt, który przechowuje wyniki, Int32 wartości i błędy, które są Exception obiektami. W tym przykładzie wykonuje wiele operacji i zapisuje wyniki we Target1 właściwości oraz błędy Target2 we właściwości BatchedJoinBlock<T1,T2> obiektu. Ponieważ liczba pomyślnych i zakończonych niepowodzeniem operacji jest nieznana z wyprzedzeniem, IList<T> obiekty umożliwiają każdemu obiektowi docelowemu odbieranie zera lub większej liczby wartości.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Pełny przykład używany BatchedJoinBlock<T1,T2> do przechwytywania wyników i wszelkich wyjątków występujących podczas odczytu programu z bazy danych można znaleźć w temacie Walkthrough: Using BatchBlock and BatchedJoinBlock to Improve Efficiency (Wskazówki: używanie funkcji BatchBlock i BatchedJoinBlock w celu poprawy wydajności).

Konfigurowanie zachowania bloku przepływu danych

Możesz włączyć dodatkowe opcje, podając System.Threading.Tasks.Dataflow.DataflowBlockOptions obiekt konstruktorowi typów bloków przepływu danych. Te opcje kontrolują zachowanie, takie jak harmonogram, który zarządza podstawowym zadaniem i stopniem równoległości. Obiekt DataflowBlockOptions zawiera również typy pochodne, które określają zachowanie specyficzne dla niektórych typów bloków przepływu danych. Poniższa tabela zawiera podsumowanie typu opcji skojarzonych z każdym typem bloku przepływu danych.

Typ bloku przepływu danych Typ: DataflowBlockOptions
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

Poniższe sekcje zawierają dodatkowe informacje na temat ważnych rodzajów opcji bloków przepływu danych dostępnych za pośrednictwem System.Threading.Tasks.Dataflow.DataflowBlockOptionsklas , System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptionsi System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions .

Określanie harmonogramu zadań

Każdy wstępnie zdefiniowany blok przepływu danych używa mechanizmu planowania zadań TPL do wykonywania działań, takich jak propagowanie danych do miejsca docelowego, odbieranie danych ze źródła i uruchamianie delegatów zdefiniowanych przez użytkownika, gdy dane staną się dostępne. TaskScheduler jest abstrakcyjną klasą reprezentującą harmonogram zadań, który kolejkuje zadania do wątków. Domyślny harmonogram Defaultzadań , używa ThreadPool klasy do kolejkowania i wykonywania pracy. Domyślny harmonogram zadań można zastąpić, ustawiając TaskScheduler właściwość podczas konstruowania obiektu bloku przepływu danych.

Gdy ten sam harmonogram zadań zarządza wieloma blokami przepływu danych, może wymusić między nimi zasady. Jeśli na przykład wiele bloków przepływu danych jest skonfigurowanych do określania wartości docelowej wyłącznego harmonogramu tego samego ConcurrentExclusiveSchedulerPair obiektu, wszystkie prace wykonywane w tych blokach są serializowane. Podobnie, jeśli te bloki są skonfigurowane do określania celu współbieżnego harmonogramu tego samego ConcurrentExclusiveSchedulerPair obiektu i że harmonogram jest skonfigurowany tak, aby miał maksymalny poziom współbieżności, wszystkie prace z tych bloków są ograniczone do tej liczby operacji współbieżnych. Przykład, który używa ConcurrentExclusiveSchedulerPair klasy , aby umożliwić równoległe wykonywanie operacji odczytu, ale operacje zapisu mają być wykonywane wyłącznie we wszystkich innych operacjach, zobacz Instrukcje: Określanie harmonogramu zadań w bloku przepływu danych. Aby uzyskać więcej informacji na temat harmonogramów zadań w TPL, zobacz TaskScheduler temat klasy.

Określanie stopnia równoległości

Domyślnie trzy typy bloków wykonywania, które biblioteka przepływów danych TPL udostępnia, ActionBlock<TInput>, TransformBlock<TInput,TOutput>i TransformManyBlock<TInput,TOutput>, przetwarzają jeden komunikat jednocześnie. Te typy bloków przepływu danych przetwarzają również komunikaty w kolejności, w jakiej są odbierane. Aby umożliwić blokom przepływu danych przetwarzanie komunikatów jednocześnie, ustaw ExecutionDataflowBlockOptions.MaxDegreeOfParallelism właściwość podczas konstruowania obiektu bloku przepływu danych.

Wartość domyślna MaxDegreeOfParallelism to 1, co gwarantuje, że blok przepływu danych przetwarza jeden komunikat jednocześnie. Ustawienie tej właściwości na wartość większą niż 1 umożliwia blokowi przepływu danych przetwarzanie wielu komunikatów jednocześnie. Ustawienie tej właściwości w celu DataflowBlockOptions.Unbounded umożliwienia bazowemu harmonogramowi zadań zarządzania maksymalnym stopniem współbieżności.

Ważne

Po określeniu maksymalnego stopnia równoległości, który jest większy niż 1, wiele komunikatów jest przetwarzanych jednocześnie i w związku z tym komunikaty mogą nie być przetwarzane w kolejności, w której są odbierane. Kolejność, w jakiej komunikaty są danymi wyjściowymi bloku, jest jednak taka sama, w jakiej są odbierane.

MaxDegreeOfParallelism Ponieważ właściwość reprezentuje maksymalny stopień równoległości, blok przepływu danych może być wykonywany z mniejszym stopniem równoległości niż określono. Blok przepływu danych może używać mniejszego stopnia równoległości w celu spełnienia wymagań funkcjonalnych lub braku dostępnych zasobów systemowych. Blok przepływu danych nigdy nie wybiera bardziej równoległości niż określasz.

Wartość MaxDegreeOfParallelism właściwości jest wyłączna dla każdego obiektu bloku przepływu danych. Jeśli na przykład cztery obiekty bloku przepływu danych określają 1 dla maksymalnego stopnia równoległości, wszystkie cztery obiekty bloków przepływu danych mogą być uruchamiane równolegle.

Aby uzyskać przykład, który ustawia maksymalny stopień równoległości, aby umożliwić równoległe wykonywanie długotrwałych operacji, zobacz How to: Specify the Degree of Parallelism in a Dataflow Block (Instrukcje: określanie stopnia równoległości w bloku przepływu danych).

Określanie liczby komunikatów na zadanie

Wstępnie zdefiniowane typy bloków przepływu danych używają zadań do przetwarzania wielu elementów wejściowych. Pomaga to zminimalizować liczbę obiektów zadań wymaganych do przetwarzania danych, co pozwala aplikacjom działać wydajniej. Jednak gdy zadania z jednego zestawu bloków przepływu danych przetwarzają dane, zadania z innych bloków przepływu danych mogą wymagać oczekiwania na czas przetwarzania przez kolejkowanie komunikatów. Aby umożliwić lepszą uczciwość między zadaniami przepływu danych, ustaw MaxMessagesPerTask właściwość . Gdy MaxMessagesPerTask jest ustawiona wartość DataflowBlockOptions.Unbounded, która jest wartością domyślną, zadanie używane przez blok przepływu danych przetwarza tyle komunikatów, ile jest dostępnych. Gdy MaxMessagesPerTask parametr jest ustawiony na wartość inną niż Unbounded, blok przepływu danych przetwarza maksymalnie tę liczbę komunikatów na Task obiekt. Mimo że ustawienie MaxMessagesPerTask właściwości może zwiększyć uczciwość między zadaniami, może spowodować utworzenie przez system większej liczby zadań, niż jest to konieczne, co może zmniejszyć wydajność.

Włączanie anulowania

TPL zapewnia mechanizm, który umożliwia wykonywanie zadań w celu koordynowania anulowania w sposób współpracy. Aby włączyć bloki przepływu danych do udziału w tym mechanizmie anulowania, ustaw CancellationToken właściwość . Gdy ten CancellationToken obiekt jest ustawiony na stan anulowany, wszystkie bloki przepływu danych, które monitorują wykonanie bieżącego elementu tokenu, ale nie rozpoczynają przetwarzania kolejnych elementów. Te bloki przepływu danych usuwają również wszystkie buforowane komunikaty, zwalniają połączenia z dowolnymi blokami źródłowymi i docelowymi oraz przechodzą do stanu anulowanego. Przechodząc do stanu anulowanego, Completion właściwość ma właściwość ustawioną Status na Canceled, chyba że wystąpił wyjątek podczas przetwarzania. W takim przypadku Status jest ustawiona wartość Faulted.

Aby zapoznać się z przykładem użycia anulowania w aplikacji Windows Forms, zobacz Instrukcje: anulowanie bloku przepływu danych. Aby uzyskać więcej informacji na temat anulowania w TPL, zobacz Anulowanie zadania.

Określanie chciwych i niechłannych zachowań

Kilka grupujących typów bloków przepływu danych może działać w trybie chciwości lub niechłanności . Domyślnie wstępnie zdefiniowane typy bloków przepływu danych działają w trybie chciwości.

W przypadku typów bloków sprzężenia, takich jak JoinBlock<T1,T2>, tryb chciwy oznacza, że blok natychmiast akceptuje dane, nawet jeśli odpowiednie dane, z którymi ma być przyłączone, nie są jeszcze dostępne. Tryb niechłanny oznacza, że blok odłoży wszystkie komunikaty przychodzące, dopóki jeden nie będzie dostępny dla każdego z jego celów, aby ukończyć sprzężenie. Jeśli którykolwiek z odroczonych komunikatów nie jest już dostępny, blok sprzężenia zwalnia wszystkie odroczone komunikaty i ponownie uruchamia proces. BatchBlock<T> W przypadku klasy, chciwość i niechłanne zachowanie jest podobne, z tą różnicą, że w trybie niechłannym obiekt odroczy wszystkie komunikaty przychodzące, BatchBlock<T> dopóki nie będą dostępne z różnych źródeł, aby ukończyć partię.

Aby określić tryb niechłanny dla bloku przepływu danych, ustaw wartość GreedyFalse. Przykład przedstawiający sposób używania trybu niechłannego w celu umożliwienia wydajniejszego udostępniania źródła danych wielu bloków sprzężenia, zobacz Instrukcje: używanie funkcji JoinBlock do odczytywania danych z wielu źródeł.

Niestandardowe bloki przepływu danych

Mimo że biblioteka przepływów danych TPL udostępnia wiele wstępnie zdefiniowanych typów bloków, można utworzyć dodatkowe typy bloków, które wykonują niestandardowe zachowanie. Zaimplementuj ISourceBlock<TOutput> interfejsy lub ITargetBlock<TInput> bezpośrednio lub użyj Encapsulate metody , aby utworzyć złożony blok, który hermetyzuje zachowanie istniejących typów bloków. Przykłady pokazujące sposób implementowania niestandardowych funkcji bloku przepływu danych można znaleźć w temacie Przewodnik: tworzenie niestandardowego typu bloku przepływu danych.

Nazwa opis
Instrukcje: zapisywanie komunikatów w bloku przepływu danych i odczytywanie ich z tego bloku Pokazuje, jak zapisywać komunikaty i odczytywać komunikaty z BufferBlock<T> obiektu.
Instrukcje: implementowanie wzorca przepływu danych producent — konsument Opisuje sposób użycia modelu przepływu danych w celu zaimplementowania wzorca producenta-konsumenta, w którym producent wysyła komunikaty do bloku przepływu danych, a użytkownik odczytuje komunikaty z tego bloku.
Instrukcje: wykonywanie akcji w przypadku odebrania danych przez blok przepływu danych Opisuje sposób dostarczania delegatów do typów bloków przepływu danych wykonywania, ActionBlock<TInput>, TransformBlock<TInput,TOutput>i TransformManyBlock<TInput,TOutput>.
Przewodnik: tworzenie potoku przepływu danych Opisuje sposób tworzenia potoku przepływu danych, który pobiera tekst z Internetu i wykonuje operacje na tym tekście.
Instrukcje: rozłączanie bloków przepływu danych Pokazuje, jak użyć LinkTo metody , aby odłączyć blok docelowy od jego źródła po źródle oferuje komunikat do obiektu docelowego.
Przewodnik: korzystanie z przepływu danych w aplikacji Windows Forms Pokazuje, jak utworzyć sieć bloków przepływu danych, które wykonują przetwarzanie obrazów w aplikacji Windows Forms.
Instrukcje: anulowanie bloku przepływu danych Demonstruje sposób używania anulowania w aplikacji Windows Forms.
Instrukcje: korzystanie z klasy JoinBlock do odczytywania danych z wielu źródeł W tym artykule wyjaśniono, jak używać JoinBlock<T1,T2> klasy do wykonywania operacji, gdy dane są dostępne z wielu źródeł oraz jak używać trybu niechłannego, aby umożliwić wydajniejsze udostępnianie źródła danych wielu bloków sprzężenia.
Instrukcje: określanie stopnia równoległości w bloku przepływu danych Opisuje sposób ustawiania MaxDegreeOfParallelism właściwości w celu włączenia bloku przepływu danych wykonywania do przetwarzania więcej niż jednego komunikatu jednocześnie.
Instrukcje: określanie harmonogramu zadań w bloku przepływu danych Przedstawia sposób kojarzenia określonego harmonogramu zadań podczas korzystania z przepływu danych w aplikacji.
Przewodnik: poprawa wydajności przy użyciu klas BatchBlock i BatchedJoinBlock Opisuje sposób użycia BatchBlock<T> klasy w celu zwiększenia wydajności operacji wstawiania bazy danych oraz sposobu użycia BatchedJoinBlock<T1,T2> klasy do przechwytywania wyników i wszelkich wyjątków występujących podczas odczytu programu z bazy danych.
Przewodnik: tworzenie niestandardowego typu bloku przepływu danych Demonstruje dwa sposoby tworzenia typu bloku przepływu danych, który implementuje zachowanie niestandardowe.
Biblioteka zadań równoległych (TPL) Wprowadzenie do języka TPL , biblioteki, która upraszcza programowanie równoległe i współbieżne w aplikacjach .NET Framework.