Uso del streaming en ASP.NET Core SignalR

Por Brennan Conroy

SignalR ASP.NET Core admite el streaming desde el cliente al servidor y desde el servidor al cliente. Esto es útil para escenarios en los que llegan fragmentos de datos a lo largo del tiempo. Cuando se transmite, cada fragmento se envía al cliente o servidor tan pronto como esté disponible, en lugar de esperar a que todos los datos estén disponibles.

Vea o descargue el código de ejemplo (cómo descargarlo)

Configuración de un centro para streaming

Un método de concentrador se convierte automáticamente en un método de centro de streaming cuando devuelve IAsyncEnumerable<T>, ChannelReader<T>Task<IAsyncEnumerable<T>>, o Task<ChannelReader<T>>.

Streaming del servidor al cliente

Los métodos del concentrador de streaming pueden devolver IAsyncEnumerable<T> además de ChannelReader<T>. La manera más sencilla de devolver IAsyncEnumerable<T> es convertir el método de concentrador en un método de iterador asincrónico, como se muestra en el ejemplo siguiente. Los métodos de iterador asincrónico del centro pueden aceptar un CancellationToken parámetro que se desencadena cuando el cliente cancela la suscripción de la secuencia. Los métodos de iterador asincrónicos evitan problemas comunes con canales, como no devolver lo ChannelReader suficientemente temprano o salir del método sin completar .ChannelWriter<T>

Nota:

El ejemplo siguiente requiere C# 8.0 o posterior.

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

En el ejemplo siguiente se muestran los conceptos básicos de los datos de streaming al cliente mediante Canales. Cada vez que se escribe un objeto en ChannelWriter<T>, el objeto se envía inmediatamente al cliente. Al final, ChannelWriter se completa para indicar al cliente que se cierra la secuencia.

Nota

Escriba en ChannelWriter<T> en un subproceso en segundo plano y devuelva lo ChannelReader antes posible. Otras invocaciones del centro se bloquean hasta que se devuelve .ChannelReader

Ajuste de la lógica en una try ... catch instrucción . Complete en Channel un finally bloque. Si desea fluir un error, captúrelo dentro del catch bloque y escríbalo en el finally bloque.

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}

Los métodos del centro de streaming de servidor a cliente pueden aceptar un CancellationToken parámetro que se desencadena cuando el cliente cancela la suscripción de la secuencia. Use este token para detener la operación del servidor y liberar los recursos si el cliente se desconecta antes del final de la secuencia.

Streaming del cliente al servidor

Un método de concentrador se convierte automáticamente en un método de concentrador de streaming de cliente a servidor cuando acepta uno o varios objetos de tipo ChannelReader<T> o IAsyncEnumerable<T>. En el ejemplo siguiente se muestran los conceptos básicos de la lectura de datos de streaming enviados desde el cliente. Cada vez que el cliente escribe en, ChannelWriter<T>los datos se escriben en el ChannelReader servidor desde el que se lee el método concentrador.

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

A continuación se muestra una IAsyncEnumerable<T> versión del método .

Nota

El ejemplo siguiente requiere C# 8.0 o posterior.

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

Cliente .NET

Streaming del servidor al cliente

Los StreamAsync métodos y StreamAsChannelAsync en HubConnection se usan para invocar métodos de streaming de servidor a cliente. Pase el nombre y los argumentos del método concentrador definidos en el método concentrador a StreamAsync o StreamAsChannelAsync. El parámetro genérico en StreamAsync<T> y StreamAsChannelAsync<T> especifica el tipo de objetos devueltos por el método de streaming. Un objeto de tipo IAsyncEnumerable<T> o ChannelReader<T> se devuelve de la invocación de secuencia y representa la secuencia en el cliente.

Un StreamAsyncEjemplo que devuelve IAsyncEnumerable<int>:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

Un correspondienteStreamAsChannelAsync ejemplo que devuelveChannelReader<int>:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

En el código anterior:

  • El StreamAsChannelAsync método en HubConnection se usa para invocar un método de streaming de servidor a cliente. Pase el nombre y los argumentos del método concentrador definidos en el método concentrador a StreamAsChannelAsync.
  • El parámetro genérico de StreamAsChannelAsync<T> especifica el tipo de objetos devueltos por el método de streaming.
  • ChannelReader<T> Se devuelve de la invocación de secuencia y representa la secuencia en el cliente.

Streaming del cliente al servidor

Hay dos maneras de invocar un método de centro de streaming de cliente a servidor desde el cliente .NET. Puede pasar un IAsyncEnumerable<T> o ChannelReader como argumento a SendAsync, InvokeAsynco StreamAsChannelAsync, dependiendo del método concentrador invocado.

Cada vez que se escriben datos en el IAsyncEnumerable objeto o ChannelWriter , el método concentrador del servidor recibe un nuevo elemento con los datos del cliente.

Si usa un IAsyncEnumerable objeto , la secuencia finaliza después de que se cierre el método que devuelve elementos de secuencia.

Nota

El ejemplo siguiente requiere C# 8.0 o posterior.

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

O bien, si usa un ChannelWriter, complete el canal con channel.Writer.Complete():

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

Cliente de JavaScript

Streaming del servidor al cliente

Los clientes de JavaScript llaman a métodos de streaming de servidor a cliente en centros con connection.stream. El streammétodo acepta dos argumentos:

  • Nombre del método de concentrador. En el ejemplo siguiente, el nombre del método concentrador es Counter.
  • Argumentos definidos en el método del concentrador. En el ejemplo siguiente, los argumentos son un recuento para el número de elementos de secuencia que se van a recibir y el retraso entre los elementos de secuencia.

connection.stream devuelve un IStreamResultobjeto , que contiene un subscribe método . Pase a IStreamSubscribersubscribe y establezca las nextdevoluciones de llamada , errory complete para recibir notificaciones de la stream invocación.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

Para finalizar la secuencia desde el cliente, llame al dispose método en el ISubscription que se devuelve del subscribe método . Al llamar a este método, la cancelación del CancellationToken parámetro del método Hub, si proporcionó una.

Streaming del cliente al servidor

Los clientes de JavaScript llaman a métodos de streaming de cliente a servidor en centros pasando como Subject argumento a send, invokeo stream, dependiendo del método concentrador invocado. Subject es una clase que tiene un aspecto similar a .Subject Por ejemplo, en RxJS, puede usar la clase Subject de esa biblioteca.

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

Al llamar a subject.next(item) con un elemento, se escribe el elemento en la secuencia y el método del concentrador recibe el elemento en el servidor.

Para finalizar la secuencia, llame a subject.complete().

Cliente de Java

Streaming del servidor al cliente

El SignalR cliente Java usa el stream método para invocar métodos de streaming. stream acepta tres o más argumentos:

  • Tipo esperado de los elementos de secuencia.
  • Nombre del método de concentrador.
  • Argumentos definidos en el método del concentrador.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

El stream método de HubConnection devuelve un observable del tipo de elemento de secuencia. El método del subscribe tipo Observable es donde onNextse definen los onError controladores y onCompleted .

Streaming del cliente al servidor

El SignalR cliente Java puede llamar a métodos de streaming de cliente a servidor en concentradores pasando un Observable como argumento a send, invokeo stream, dependiendo del método concentrador invocado.

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

Al llamar a stream.onNext(item) con un elemento, se escribe el elemento en la secuencia y el método del concentrador recibe el elemento en el servidor.

Para finalizar la secuencia, llame a stream.onComplete().

Recursos adicionales