Usar transmissão no ASP.NET Core SignalR

Por Brennan Conroy

O ASP.NET Core SignalR é compatível com transmissão de cliente para servidor e de servidor para cliente. Isso é útil para cenários em que fragmentos de dados chegam ao longo do tempo. Ao transmitir, cada fragmento é enviado para o cliente ou servidor assim que ele fica disponível, em vez de aguardar que todos os dados fiquem disponíveis.

Exibir ou baixar código de exemplo (como baixar)

Configurar um hub para transmissão

Um método de hub se torna automaticamente um método de hub de transmissão quando retorna IAsyncEnumerable<T>, ChannelReader<T>, Task<IAsyncEnumerable<T>>ou Task<ChannelReader<T>>.

Fluxo de dados de servidor para cliente

Os métodos de hub de transmissão podem retornar IAsyncEnumerable<T> além de ChannelReader<T>. A maneira mais simples de retornar IAsyncEnumerable<T> é tornando o método hub um método iterador assíncrono, como demonstra o exemplo a seguir. Os métodos de iterador assíncrono do hub podem aceitar um parâmetro CancellationToken que é disparado quando o cliente cancela a assinatura do fluxo. Os métodos iteradores assíncronos evitam problemas comuns com Canais, como não retornar o ChannelReader cedo o suficiente ou sair do método sem concluir o ChannelWriter<T>.

Observação

O exemplo a seguir requer o C# 8.0 ou posterior.

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

O exemplo a seguir mostra as noções básicas de dados de transmissão para o cliente usando Canais. Sempre que um objeto é gravado no ChannelWriter<T>, o objeto é imediatamente enviado ao cliente. No final, o ChannelWriter é concluído para informar ao cliente que o fluxo está fechado.

Observação

Escreva no ChannelWriter<T> em um thread em segundo plano e retorne o ChannelReader o mais rápido possível. Outras invocações de hub são bloqueadas até que um ChannelReader seja retornado.

Encapsular lógica em uma try ... catch instrução . Conclua o Channel em um finally bloco. Se você quiser fluir um erro, capture-o dentro do bloco catch e escreva-o no bloco finally.

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}

Os métodos de iterador assíncrono do hub podem aceitar um parâmetro CancellationToken que é disparado quando o cliente cancela a assinatura do fluxo. Use esse token para interromper a operação do servidor e liberar todos os recursos se o cliente se desconectar antes do final do fluxo.

Fluxo de dados do cliente para o servidor

Um método de hub se torna automaticamente um método de hub de transmissão cliente para servidor quando aceita um ou mais objetos do tipo ChannelReader<T> ou IAsyncEnumerable<T>. O exemplo a seguir mostra os conceitos básicos da leitura de dados de transmissão enviados do cliente. Sempre que o cliente grava no ChannelWriter<T>, os dados são gravados no ChannelReader no servidor a partir do qual o método de hub está lendo.

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

Uma versão IAsyncEnumerable<T> do método a seguir.

Observação

O exemplo a seguir requer o C# 8.0 ou posterior.

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

Cliente .NET

Fluxo de dados de servidor para cliente

Os métodos StreamAsync e StreamAsChannelAsync em HubConnection são usados para invocar métodos de transmissão de servidor para cliente. Passe o nome do método de hub e todos os argumentos definidos no método de hub para StreamAsync ou StreamAsChannelAsync. O parâmetro genérico em StreamAsync<T> e StreamAsChannelAsync<T> especifica o tipo de objetos retornados pelo método de transmissão. Um objeto do tipo IAsyncEnumerable<T> ou ChannelReader<T> é retornado da invocação de fluxo e representa o fluxo no cliente.

Um exemplo StreamAsync que retorna IAsyncEnumerable<int>:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

Um exemplo correspondente StreamAsChannelAsync que retorna ChannelReader<int>:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

No código anterior:

  • O método StreamAsChannelAsync em HubConnection é usado para invocar um método de transmissão de servidor para cliente. Passe o nome do método de hub e todos os argumentos definidos no método de hub para StreamAsChannelAsync.
  • O parâmetro genérico em StreamAsChannelAsync<T> specifica o tipo de objetos retornados pelo método de transmissão.
  • Um ChannelReader<T> é retornado da invocação de fluxo e representa o fluxo no cliente.

Fluxo de dados do cliente para o servidor

Há duas maneiras de invocar um método de hub de transmissão do cliente para servidor do cliente .NET. Você pode passar um IAsyncEnumerable<T> ou ChannelReader um como um argumento para SendAsync, InvokeAsyncou StreamAsChannelAsync, dependendo do método de hub invocado.

Sempre que os dados são gravados no objeto IAsyncEnumerable ou ChannelWriter , o método hub no servidor recebe um novo item com os dados do cliente.

Se estiver usando um objeto IAsyncEnumerable, o fluxo terminará depois que o método que retorna itens de fluxo for encerrado.

Observação

O exemplo a seguir requer o C# 8.0 ou posterior.

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

Ou se você estiver usando um ChannelWriter, conclua o canal com channel.Writer.Complete():

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

Cliente JavaScript

Fluxo de dados de servidor para cliente

Os clientes JavaScript chamam métodos de transmissão de servidor para cliente em hubs com connection.stream. O método stream aceita dois argumentos:

  • O nome do método de hub. No exemplo a seguir, o nome do método de hub é Counter.
  • Argumentos definidos no método de hub. No exemplo a seguir, os argumentos são uma contagem para o número de itens de fluxo a serem recebidos e o atraso entre os itens de fluxo.

connection.stream retorna um IStreamResult, que contém um método subscribe. Passe um IStreamSubscriber para subscribe e defina os retornos de chamada next, error e complete para receber notificações da invocação stream.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

Para encerrar o fluxo do cliente, chame o método dispose no ISubscription retornado do método subscribe. Chamar esse método causará o cancelamento do parâmetro CancellationToken do método de hub, se você tiver fornecido um.

Fluxo de dados do cliente para o servidor

Os clientes JavaScript chamam métodos de transmissão de cliente para servidor em hubs passando um Subject como um argumento para send, invokeou stream, dependendo do método de hub invocado. O Subject é uma classe que se parece com um Subject. Por exemplo, no RxJS, você pode usar a classe Subject dessa biblioteca.

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

Chamar subject.next(item) com um item grava o item no fluxo e o método hub recebe o item no servidor.

Para encerrar o fluxo, chame subject.complete().

Cliente Java

Fluxo de dados de servidor para cliente

O cliente Java SignalR usa o método stream para invocar métodos de transmissão. stream aceita três ou mais argumentos:

  • O tipo esperado dos itens de fluxo.
  • O nome do método de hub.
  • Argumentos definidos no método de hub.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

O método stream em HubConnection retorna um Observável do tipo de item de fluxo. O método subscribe do tipo Observável é onde os manipuladores onNext,onError e onCompleted são definidos.

Fluxo de dados do cliente para o servidor

O cliente Java do SignalR chama métodos de transmissão de cliente para servidor em hubs passando um Observável como um argumento para send, invoke ou stream, dependendo do método de hub invocado.

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

Chamar stream.onNext(item) com um item grava o item no fluxo e o método hub recebe o item no servidor.

Para encerrar a transmissão, chame stream.onComplete().

Recursos adicionais