Поделиться через


Мост с существующими асинхронными источниками

Помимо событий .NET, в платформа .NET Framework существуют и другие асинхронные источники данных. Одним из них является шаблон асинхронного метода. В этом шаблоне проектирования предоставляются два метода. Один метод (обычно называется BeginX) используется для запуска вычисления и возвращает дескриптор IAsyncResult, который передается во второй метод (обычно с именем EndX), который затем получает результат вычисления. Завершение обычно сигнализируется путем реализации делегата AsyncCallback или опроса IAsyncResult.IsCompleted. Код, соответствующий этому шаблону, часто трудно читать и поддерживать. В этом разделе мы покажем, как использовать фабричные методы Rx для преобразования таких асинхронных источников данных в наблюдаемые последовательности.

Преобразование асинхронных шаблонов в наблюдаемые последовательности

Многие асинхронные методы в .NET записываются с сигнатурами, такими как BeginX и EndX, где X — это имя метода, выполняемого асинхронно. BeginX принимает аргументы для выполнения метода, AsyncCallback, который является действием, которое принимает IAsyncResult и ничего не возвращает, и, наконец, состояние объекта. EndX принимает IAsyncResult, который передается из AsyncCallback, чтобы получить значение асинхронного вызова.

Оператор FromAsyncPattern типа Observable создает оболочку для методов Begin и End (которые передаются в качестве параметров оператору) и возвращает функцию, которая принимает те же параметры, что и Begin, и возвращает наблюдаемый объект. Этот наблюдаемый объект представляет последовательность, которая публикует одно значение, которое является асинхронным результатом только что указанного вызова.

В следующем примере мы преобразуем BeginRead и EndRead для объекта Stream , использующего шаблон IAsyncResult, в функцию, возвращающую наблюдаемую последовательность. Для универсальных параметров оператора FromAsyncPattern мы указываем типы аргументов BeginRead до обратного вызова. Так как метод EndRead возвращает значение, мы добавляем этот тип в качестве окончательного универсального параметра для FromAsyncPattern. Если наведите указатель var мыши на read, вы заметите, что возвращаемое значение FromAsyncPattern является делегатом функции со следующей сигнатурой: Func<byte[], int32,int32, IObservable<int32>>, что означает, что эта функция принимает 3 параметра (те же параметры для BeginRead) и возвращает IObservable<Int32>. Этот объект IObservable содержит одно значение, которое представляет собой целое число, возвращаемое EndRead, и количество байтов, считанных из потока, в диапазоне от нуля (0) до запрошенного числа байтов. Так как теперь мы получаем IObservable вместо IAsyncResult, мы можем использовать все операторы LINQ, доступные для Observables, и подписаться на него, проанализировать или составить его.

Stream inputStream = Console.OpenStandardInput();
var read = Observable.FromAsyncPattern<byte[], int, int, int>(inputStream.BeginRead, inputStream.EndRead);
byte[] someBytes = new byte[10];
IObservable<int> source = read(someBytes, 0, 10);
IDisposable subscription = source.Subscribe(
                            x => Console.WriteLine("OnNext: {0}", x),
                            ex => Console.WriteLine("OnError: {0}", ex.Message),
                            () => Console.WriteLine("OnCompleted"));
Console.ReadKey();