Aracılığıyla paylaş


İzlenecek yol: Veri Akışı İşlem Hattı Oluşturma

Kaynak bloklardan ileti almak için DataflowBlock.Receive, DataflowBlock.ReceiveAsync ve DataflowBlock.TryReceive yöntemlerini kullanabilirken, ileti bloklarını bir veri akışı işlem hattı oluşturacak şekilde de bağlayabilirsiniz. Veri akışı işlem hattı, her biri daha büyük bir hedefe katkıda bulunan belirli bir görevi gerçekleştiren bir dizi bileşen veya veri akışı bloğudur. Bir veri akışı işlem hattındaki her veri akışı bloğu, başka bir veri akışı bloğundan ileti aldığında iş gerçekleştirir. Buna bir benzetme, otomobil üretimi için bir montaj hattıdır. Her araç montaj hattından geçerken, bir istasyon çerçeveyi monte eder, bir sonraki motor yükler, vb. Bir montaj hattı birden çok aracın aynı anda monte edilmesine olanak sağladığından, aynı anda tam araçların montajından daha iyi aktarım hızı sağlar.

Bu belgede, bir web sitesinden Homeros'un İlyadamı kitabını indiren ve tek tek sözcükleri ilk sözcüğün karakterlerini ters çeviren sözcüklerle eşleştirmek için metinde arama yapılan bir veri akışı işlem hattı gösterilmektedir. Bu belgede veri akışı işlem hattının oluşumu aşağıdaki adımlardan oluşur:

  1. İşlem hattına katılan veri akışı bloklarını oluşturun.

  2. Her veri akışı bloğunu işlem hattındaki bir sonraki bloğa bağlayın. Her blok, işlem hattındaki önceki bloğun çıkışını girdi olarak alır.

  3. Her veri akışı bloğu için, bir sonraki bloğu önceki blok tamamlandıktan sonra tamamlanmış duruma ayarlayan bir devamlılık görevi oluşturun.

  4. Verileri işlem hattının başına gönderin.

  5. İşlem hattının başını tamamlandı olarak işaretleyin.

  6. İşlem hattının tüm çalışmaları tamamlanmasını bekleyin.

Önkoşullar

Bu kılavuza başlamadan önce Veri Akışı'nı okuyun.

Konsol Uygulaması Oluşturma

Visual Studio'da bir Visual C# veya Visual Basic Konsol Uygulaması projesi oluşturun. System.Threading.Tasks.Dataflow NuGet paketini yükleyin.

Uyarı

TPL veri akışı kitaplığı ( System.Threading.Tasks.Dataflow ad alanı), .NET 6 ve sonraki sürümlerde bulunur. .NET Framework ve .NET Standard projeleri için System.Threading.Tasks.Dataflow NuGet paketini yüklemeniz📦 gerekir.

Temel uygulamayı oluşturmak için projenize aşağıdaki kodu ekleyin.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

Veri Akışı Bloklarını Oluşturma

İşlem hattıyla bağlantılı veri akışı bloklarını oluşturmak için Main yöntemine aşağıdaki kodu ekleyin. Aşağıdaki tablo, işlem hattının her üyesinin rolünü özetler.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine($"Downloading '{uri}'...");

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Üye Türü Description
downloadString TransformBlock<TInput,TOutput> Web'den kitap metnini indirir.
createWordList TransformBlock<TInput,TOutput> Kitap metnini bir sözcük dizisine ayırır.
filterWordList TransformBlock<TInput,TOutput> Kısa sözcükleri ve yinelenenleri sözcük dizisinden kaldırır.
findReversedWords TransformManyBlock<TInput,TOutput> Filtrelenmiş sözcük dizisi koleksiyonundaki tüm sözcükleri bulun, bu sözcüklerin tersi de kelime dizisinde yer alıyor.
printReversedWords ActionBlock<TInput> Konsolda sözcükleri ve karşılık gelen ters sözcükleri görüntüler.

Bu örnekteki veri akışı işlem hattındaki birden çok adımı tek adımda birleştirebilse de, örnekte daha büyük bir görevi gerçekleştirmek için birden çok bağımsız veri akışı görevi oluşturma kavramı gösterilmektedir. Örnek, işlem hattının her üyesinin giriş verileri üzerinde bir işlem gerçekleştirmesini ve sonuçları işlem hattındaki sonraki adıma göndermesini sağlamak için kullanır TransformBlock<TInput,TOutput> . her findReversedWords giriş için birden çok bağımsız çıkış ürettiğinden işlem hattının üyesi bir TransformManyBlock<TInput,TOutput> nesnedir. İşlem hattının kuyruğu, printReversedWordsgirişi üzerinde bir eylem gerçekleştirdiğinden ve bir sonuç üretmediğinden bir ActionBlock<TInput> nesnedir.

İşlem Hattını Oluşturma

Her bloğu işlem hattındaki bir sonraki bloğa bağlamak için aşağıdaki kodu ekleyin.

Kaynak veri akışı bloğunu LinkTo hedef veri akışı bloğuna bağlamak için yöntemini çağırdığınızda, kaynak veri akışı bloğu verileri kullanılabilir hale geldikçe hedef bloğuna yayılır. Ayrıca DataflowLinkOptions değerini 'true' olarak ayarlayıp PropagateCompletion ile birlikte sağlarsanız, işlem hattındaki bir bloğun başarılı veya başarısız tamamlanması, işlem hattındaki bir sonraki bloğun tamamlanmasına neden olur.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

İşlem Hattına Veri Gönderme

Homeros'un İlyada'sı kitabının URL'sini veri akışı işlem hattının başına göndermek için aşağıdaki kodu ekleyin.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

Bu örnek, DataflowBlock.Post kullanarak veriyi işlem hattının başına senkron olarak göndermektedir. Bir veri akışı düğümüne DataflowBlock.SendAsync zaman uyumsuz olarak veri göndermeniz gerektiğinde yöntemini kullanın.

Boru Hattı Aktivitesi Tamamlanıyor

İşlem hattının başını tamamlandı olarak işaretlemek için aşağıdaki kodu ekleyin. İşlem hattının başı, tüm arabelleğe alınan iletileri işledikten sonra tamamlandığını duyurur.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

Bu örnek, işlenecek veri akışı işlem hattı üzerinden bir URL gönderir. İşlem hattı üzerinden birden fazla giriş gönderirseniz, tüm veriyi gönderdikten sonra IDataflowBlock.Complete yöntemini çağırın. Uygulamanızın artık kullanılabilir durumda olmayan iyi tanımlanmış bir noktası yoksa veya uygulamanın işlem hattının bitmesini beklemesi gerekmediyse bu adımı atlayabilirsiniz.

İşlem Hattının Bitmesini Bekliyor

İşlem hattının bitmesini beklemek için aşağıdaki kodu ekleyin. İşlem hattının kuyruğu tamamlandığında işlem bütünüyle tamamlanmış olur.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

Herhangi bir iş parçacığından veya birden çok iş parçacığından aynı anda veri akışının tamamlanmasını bekleyebilirsiniz.

Tam Örnek

Aşağıdaki örnekte, bu kılavuz için tam kod gösterilmektedir.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine($"Downloading '{uri}'...");

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Sonraki Adımlar

Bu örnek, veri akışı işlem hattı aracılığıyla işlemek için bir URL gönderir. İşlem hattı üzerinden birden fazla giriş değeri gönderirseniz, uygulamanıza parçaların bir otomobil fabrikasında nasıl hareket ettiği gibi bir çeşit paralellik ekleyebilirsiniz. İşlem hattının ilk üyesi ikinci üyeye sonucunu gönderdiğinde, ikinci üye ilk sonucu işlerken başka bir öğeyi paralel olarak işleyebilir.

Veri akışı işlem hatları kullanılarak elde edilen paralellik, genellikle daha az ve daha büyük görevlerden oluştuğundan , kaba taneli paralellik olarak bilinir. Veri akışı işlem hattında daha küçük ve kısa çalışan görevlerin daha ayrıntılı paralelliğini de kullanabilirsiniz. Bu örnekte işlem hattının üyesi, findReversedWords iş listesindeki birden çok öğeyi paralel olarak işlemek için PLINQ kullanır. Kaba taneli bir işlem hattında ince taneli paralellik kullanılması genel aktarım hızını iyileştirebilir.

Ayrıca bir veri akışı ağı oluşturmak için bir kaynak veri akışı bloğunu birden çok hedef bloğuna bağlayabilirsiniz. Yöntemin LinkTo aşırı yüklenmiş sürümü, hedef bloğun değerine göre her iletiyi kabul edip etmediğini tanımlayan bir Predicate<T> nesne alır. Çoğu veri akışı blok türü, kaynak olarak davrandığında, bloklardan biri ilgili iletiyi kabul edene kadar, bağlı oldukları sıraya göre tüm bağlı hedef bloklara ileti sunar. Bu filtreleme mekanizmasını kullanarak, belirli verileri bir yol ve diğer verileri başka bir yol üzerinden yönlendiren bağlı veri akışı blokları sistemleri oluşturabilirsiniz. Veri akışı ağı oluşturmak için filtreleme kullanan bir örnek için bkz . İzlenecek Yol: Windows Forms Uygulamasında Veri Akışını Kullanma.

Ayrıca bakınız