Использование потоковой передачи в ASP.NET Core SignalR

Бреннан Конрой

ASP.NET Core SignalR поддерживает потоковую передачу с клиента на сервер и с сервера на клиент. Это полезно для сценариев, когда фрагменты данных поступают со временем. При потоковой передаче каждый фрагмент отправляется клиенту или серверу, как только он становится доступным, а не ожидает, чтобы все данные стали доступными.

Просмотреть или скачать образец кода (описание загрузки)

Настройка концентратора для потоковой передачи

Метод концентратора автоматически становится методом концентратора потоковой передачи при возврате IAsyncEnumerable<T>, ChannelReader<T>Task<IAsyncEnumerable<T>>илиTask<ChannelReader<T>>.

Потоковая передача между клиентами

Методы концентратора потоковой передачи могут возвращаться IAsyncEnumerable<T> в дополнение к ChannelReader<T>. Самый простой способ возврата IAsyncEnumerable<T> — сделать метод концентратора асинхронным итератором, как показано в следующем примере. Методы асинхронного итератора концентратора могут принимать CancellationToken параметр, который активируется при отмене подписки клиента из потока. Методы асинхронного итератора избежать распространенных проблем с каналами, например, не возвращая достаточно раннее ChannelReader или выход из метода, не завершая ChannelWriter<T>выполнение.

Примечание.

Для следующего примера требуется C# 8.0 или более поздней версии.

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

В следующем примере показаны основы потоковой передачи данных клиенту с помощью каналов. Каждый раз, когда объект записывается ChannelWriter<T>в клиент, объект немедленно отправляется клиенту. В конце завершается, чтобы сообщить клиенту, ChannelWriter что поток закрыт.

Примечание.

Напишите в ChannelWriter<T> фоновый поток и верните его как можно скорее ChannelReader . Другие вызовы концентратора блокируются до ChannelReader возврата.

Оболочка логики в инструкцииtry ... catch. Завершите Channelfinally выполнение блока. Если вы хотите выполнить поток ошибки, запишите его внутри catch блока и напишите его в блоке finally .

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}

Методы потоковой передачи между клиентами могут принимать CancellationToken параметр, который активируется при отмене подписки клиента из потока. Используйте этот маркер, чтобы остановить операцию сервера и освободить все ресурсы, если клиент отключается до конца потока.

Потоковая передача между клиентами

Метод концентратора автоматически становится методом потоковой передачи клиента на сервер, когда он принимает один или несколько объектов типа ChannelReader<T> или IAsyncEnumerable<T>. В следующем примере показаны основы чтения потоковых данных, отправляемых от клиента. Всякий раз, когда клиент ChannelWriter<T>записывает данные в ChannelReader сервер, с которого считывается метод концентратора.

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

Ниже IAsyncEnumerable<T> приведена версия метода.

Примечание.

Для следующего примера требуется C# 8.0 или более поздней версии.

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

Клиент .NET

Потоковая передача между клиентами

Методы StreamAsync и StreamAsChannelAsync методы HubConnection используются для вызова методов потоковой передачи между серверами и клиентами. Передайте имя и аргументы метода концентратора, определенные в методе StreamAsync концентратора или StreamAsChannelAsync. Универсальный параметр и StreamAsync<T>StreamAsChannelAsync<T> указывает тип объектов, возвращаемых методом потоковой передачи. Объект типа IAsyncEnumerable<T> или ChannelReader<T> возвращается из вызова потока и представляет поток на клиенте.

Пример, который StreamAsync возвращает IAsyncEnumerable<int>:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

Соответствующий StreamAsChannelAsync пример, возвращающий ChannelReader<int>:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

В предыдущем коде:

  • HubConnection Метод используется StreamAsChannelAsync для вызова метода потоковой передачи между клиентами. Передайте имя и аргументы метода концентратора, определенные в методе StreamAsChannelAsyncконцентратора.
  • Универсальный параметр указывает StreamAsChannelAsync<T> тип объектов, возвращаемых методом потоковой передачи.
  • Возвращается ChannelReader<T> из вызова потока и представляет поток на клиенте.

Потоковая передача между клиентами

Существует два способа вызова метода потоковой передачи клиента на сервер из клиента .NET. Можно передать в качестве IAsyncEnumerable<T> аргумента SendAsyncInvokeAsyncStreamAsChannelAsyncили ChannelReader в зависимости от вызываемого метода концентратора.

Всякий раз, когда данные записываются в IAsyncEnumerable объект или ChannelWriter объект, метод концентратора на сервере получает новый элемент с данными от клиента.

Если используется IAsyncEnumerable объект, поток заканчивается после выхода метода, возвращающего элементы потока.

Примечание.

Для следующего примера требуется C# 8.0 или более поздней версии.

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

Или, если вы используете канал, выполните следующие действияChannelWriter:channel.Writer.Complete()

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

Клиент на JavaScript

Потоковая передача между клиентами

Клиенты JavaScript вызывают методы потоковой передачи между клиентами на концентраторах connection.stream. Метод stream принимает два аргумента:

  • Имя метода концентратора. В следующем примере используется Counterимя метода концентратора.
  • Аргументы, определенные в методе концентратора. В следующем примере аргументы являются числом элементов потока, которые будут получать, и задержкой между элементами потока.

connection.stream возвращает значение IStreamResult, содержащее subscribe метод. Передайте и задайте completenexterrorобратные IStreamSubscribersubscribe вызовы, чтобы получать уведомления от stream вызова.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

Чтобы завершить поток из клиента, вызовите dispose метод ISubscription , возвращаемый из subscribe метода. Вызов этого метода приводит к отмене CancellationToken параметра метода Hub, если он указан.

Потоковая передача между клиентами

Клиенты JavaScript вызывают методы потоковой передачи между клиентами на концентраторах, передавая в Subject качестве аргумента invokesendили streamв зависимости от вызываемого метода концентратора. Это Subject класс, который выглядит как класс Subject. Например, в RxJS можно использовать класс Subject из этой библиотеки.

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

Вызов subject.next(item) элемента записывает элемент в поток, а метод концентратора получает элемент на сервере.

Чтобы завершить поток, вызовите subject.complete().

Клиент на Java

Потоковая передача между клиентами

SignalR Клиент Java использует stream метод для вызова методов потоковой передачи. stream принимает три или более аргументов:

  • Ожидаемый тип элементов потока.
  • Имя метода концентратора.
  • Аргументы, определенные в методе концентратора.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

Метод HubConnection возвращает stream наблюдаемый тип элемента потока. Метод наблюдаемого типа subscribe — где onNextonErroronCompleted определяются обработчики.

Потоковая передача между клиентами

SignalR Клиент Java может вызывать методы потоковой передачи между клиентами на концентраторах, передав в качестве аргумента в качестве аргумента invokesendили streamв зависимости от вызываемого метода концентратора.

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

Вызов stream.onNext(item) элемента записывает элемент в поток, а метод концентратора получает элемент на сервере.

Чтобы завершить поток, вызовите stream.onComplete().

Дополнительные ресурсы