Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Хотя для получения сообщений из исходных блоков можно использовать методы DataflowBlock.Receive, DataflowBlock.ReceiveAsyncи DataflowBlock.TryReceive, можно также подключить блоки сообщений для формирования конвейера потоков данных. Конвейер потока данных представляет собой ряд компонентов или блоки потока данных, каждая из которых выполняет определенную задачу, которая способствует более крупной цели. Каждый блок потока данных в конвейере потоков данных выполняет работу при получении сообщения из другого блока потока данных. Это можно сравнить со сборочной линией производства автомобилей. По мере того как каждый автомобиль проходит через линию сборки, одна станция собирает раму, другая устанавливает двигатель, и так далее. Так как линия сборки позволяет одновременно собирать несколько транспортных средств, это обеспечивает лучшую пропускную способность, чем сборка полных транспортных средств одновременно.
В этом документе представлен конвейер потока данных, который скачивает книгу Илиада Гомера с веб-сайта и выполняет поиск в тексте для сопоставления отдельных слов с словами, которые переставляют символы первого слова в обратном порядке. Формирование конвейера потока данных в этом документе состоит из следующих шагов:
Создайте блоки потока данных, участвующие в конвейере.
Подключите каждый блок потока данных к следующему блоку в конвейере. Каждый блок получает на вход выходные данные предыдущего блока в конвейере.
Для каждого блока потока данных создайте задачу продолжения, которая переводит следующий блок в завершенное состояние, после завершения предыдущего блока.
Отправка данных на начало конвейера.
Пометьте головную часть конвейера как завершённую.
Дождитесь завершения всей работы конвейера.
Предпосылки
Прочитайте Dataflow, прежде чем начинать работу с этим пошаговым руководством.
Создание консольного приложения
В Visual Studio создайте проект консольного приложения Visual C# или Visual Basic. Установите пакет NuGet System.Threading.Tasks.Dataflow.
Примечание.
Библиотека потоков данных TPL (пространство имен System.Threading.Tasks.Dataflow) не распространяется с помощью .NET. Чтобы установить пространство имен System.Threading.Tasks.Dataflow в Visual Studio, откройте проект, выберите Управление пакетами NuGet в меню Project и найдите пакет System.Threading.Tasks.Dataflow
в Интернете. Кроме того, чтобы установить его с помощью cli .NET Core, запустите dotnet add package System.Threading.Tasks.Dataflow
.
Добавьте в проект следующий код, чтобы создать базовое приложение.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
static void Main()
{
}
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
End Sub
End Module
Создание блоков потока данных
Добавьте следующий код в метод Main
, чтобы создать блоки потока данных, участвующие в конвейере. В следующей таблице приводится сводка роли каждого члена конвейера.
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine($"Downloading '{uri}'...");
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
Член | Тип | Описание |
---|---|---|
downloadString |
TransformBlock<TInput,TOutput> | Загружает текст книги из Интернета. |
createWordList |
TransformBlock<TInput,TOutput> | Разделяет текст книги на массив слов. |
filterWordList |
TransformBlock<TInput,TOutput> | Удаляет короткие слова и дубликаты из массива слов. |
findReversedWords |
TransformManyBlock<TInput,TOutput> | Находит все слова в отфильтрованной коллекции массивов слов, чьи перевёрнутые формы также встречаются в массиве слов. |
printReversedWords |
ActionBlock<TInput> | Отображает слова и соответствующие обратные слова в консоли. |
Хотя вы можете объединить несколько шагов в конвейере потока данных в этом примере на один шаг, в примере показано, как создать несколько независимых задач потока данных для выполнения более крупной задачи. В примере используется TransformBlock<TInput,TOutput>, чтобы позволить каждому члену конвейера выполнить операцию с входными данными и отправить результаты на следующий этап в конвейере. Элемент findReversedWords
конвейера является объектом TransformManyBlock<TInput,TOutput>, так как он создает несколько независимых выходных данных для каждого входного элемента. Хвост конвейера, printReversedWords
, является объектом ActionBlock<TInput>, так как он выполняет действие по входным данным и не создает результат.
Формирование конвейера
Добавьте следующий код, чтобы подключить каждый блок к следующему блоку в конвейере.
При вызове метода LinkTo для подключения блока потока данных источника к целевому блоку потока данных блок исходного потока данных распространяет данные в целевой блок по мере того, как данные становятся доступными. Если вы также предоставляете DataflowLinkOptions с PropagateCompletion задано значение true, успешное или неудачное завершение одного блока в конвейере приведет к завершению следующего блока в конвейере.
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
Передача данных в конвейер обработки данных
Добавьте следующий код, чтобы вставить URL-адрес книги Илиада Гомера в начало конвейера потока данных.
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
В этом примере используется DataflowBlock.Post для синхронной отправки данных в начало конвейера. Используйте метод DataflowBlock.SendAsync, если необходимо асинхронно отправлять данные в узел потока данных.
Завершение действия конвейера
Добавьте следующий код, чтобы пометить голову конвейера как завершенный. Начальник конвейера объявляет о его завершении работы после обработки всех буферных сообщений.
// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()
Этот пример отправляет один URL-адрес через конвейер потока данных для обработки. Если вы отправляете несколько входных данных через конвейер, вызовите метод IDataflowBlock.Complete после отправки всех входных данных. Этот шаг можно пропустить, если приложение не имеет четко определенной точки, в которой данные больше не доступны, или приложению не нужно ожидать завершения конвейера.
Ожидание завершения конвейера
Добавьте следующий код, чтобы дождаться завершения конвейера. Общая операция завершается после завершения хвоста конвейера.
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
Вы можете ждать завершения потока данных из любого потока или из нескольких потоков одновременно.
Полный пример
В следующем примере показан полный код для этого пошагового руководства.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
static void Main()
{
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine($"Downloading '{uri}'...");
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
// Mark the head of the pipeline as complete.
downloadString.Complete();
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
}
}
/* Sample output:
Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
Creating word list...
Filtering word list...
Finding reversed words...
Found reversed words doom/mood
Found reversed words draw/ward
Found reversed words aera/area
Found reversed words seat/taes
Found reversed words live/evil
Found reversed words port/trop
Found reversed words sleek/keels
Found reversed words area/aera
Found reversed words tops/spot
Found reversed words evil/live
Found reversed words mood/doom
Found reversed words speed/deeps
Found reversed words moor/room
Found reversed words trop/port
Found reversed words spot/tops
Found reversed words spots/stops
Found reversed words stops/spots
Found reversed words reed/deer
Found reversed words keels/sleek
Found reversed words deeps/speed
Found reversed words deer/reed
Found reversed words taes/seat
Found reversed words room/moor
Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
' Mark the head of the pipeline as complete.
downloadString.Complete()
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
End Sub
End Module
' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw
Дальнейшие шаги
В этом примере отправляется один URL-адрес для обработки через конвейер потока данных. При отправке нескольких входных значений через конвейер можно ввести в приложение форму параллелизма, которая напоминает, как части могут перемещаться через фабрику автомобилей. Когда первый член конвейера отправляет результат второму элементу, он может обрабатывать другой элемент параллельно, так как второй член обрабатывает первый результат.
Параллелизм, который достигается с помощью конвейеров потоков данных, называется грубо-зернистым параллелизмом, так как обычно он состоит из меньшего количества, более крупных задач. Кроме того, можно использовать более детальный параллелизм небольших, короткоживущих задач в конвейере обработки данных. В этом примере элемент findReversedWords
конвейера использует PLINQ для параллельной обработки нескольких элементов в рабочем списке. Использование тонкого параллелизма в крупнозернистом конвейере может повысить общую пропускную способность.
Вы также можете подключить блок потока данных источника к нескольким целевым блокам, чтобы создать сеть обработки данных. Перегруженная версия метода LinkTo принимает объект Predicate<T>, определяющий, принимает ли целевой блок каждое сообщение на основе его значения. Большинство типов блоков потока данных, которые действуют в качестве источников, предлагают сообщения всем подключенным целевым блокам в том порядке, в котором они были подключены, пока один из блоков не принимает это сообщение. С помощью этого механизма фильтрации можно создавать системы подключенных блоков потока данных, которые направляют определенные данные через один путь и другие данные через другой путь. Пример, использующий фильтрацию для создания сети потока данных, см. в пошаговом руководстве : Использование потока данных в приложении Windows Forms.