Menggunakan streaming di ASP.NET Core SignalR

Oleh Brennan Conroy

ASP.NET Core SignalR mendukung streaming dari klien ke server dan dari server ke klien. Ini berguna untuk skenario di mana fragmen data tiba dari waktu ke waktu. Saat streaming, setiap fragmen dikirim ke klien atau server segera setelah tersedia, daripada menunggu semua data tersedia.

Melihat atau mengunduh kode sampel (cara mengunduh)

Menyiapkan hub untuk streaming

Metode hub secara otomatis menjadi metode hub streaming saat mengembalikan IAsyncEnumerable<T>, , ChannelReader<T>Task<IAsyncEnumerable<T>>, atau Task<ChannelReader<T>>.

Streaming server-ke-klien

Metode hub streaming dapat kembali IAsyncEnumerable<T> selain ChannelReader<T>. Cara paling sederhana untuk kembali IAsyncEnumerable<T> adalah dengan menjadikan metode hub sebagai metode iterator asinkron seperti yang ditunjukkan sampel berikut. Metode iterator asinkron hub dapat menerima CancellationToken parameter yang dipicu saat klien berhenti berlangganan dari aliran. Metode iterator asinkron menghindari masalah umum dengan Saluran, seperti tidak mengembalikan ChannelReader metode yang cukup awal atau keluar dari metode tanpa menyelesaikan ChannelWriter<T>.

Catatan

Sampel berikut memerlukan C# 8.0 atau yang lebih baru.

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

Sampel berikut menunjukkan dasar-dasar data streaming ke klien menggunakan Saluran. Setiap kali objek ditulis ke ChannelWriter<T>, objek segera dikirim ke klien. Pada akhirnya, ChannelWriter selesai untuk memberi tahu klien bahwa aliran ditutup.

Catatan

Tulis ke ChannelWriter<T> pada utas latar belakang dan kembalikan ChannelReader sesegera mungkin. Pemanggilan hub lainnya diblokir hingga ChannelReader dikembalikan.

Bungkus logika dalam try ... catch pernyataan. Selesaikan Channel dalam blokfinally. Jika Anda ingin mengalirkan kesalahan, ambil di dalam catch blok dan tulis di finally blok.

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}

Metode hub streaming server-ke-klien dapat menerima CancellationToken parameter yang dipicu saat klien berhenti berlangganan dari aliran. Gunakan token ini untuk menghentikan operasi server dan merilis sumber daya apa pun jika klien terputus sebelum akhir aliran.

Streaming klien-ke-server

Metode hub secara otomatis menjadi metode hub streaming klien-ke-server ketika menerima satu atau beberapa objek jenis ChannelReader<T> atau IAsyncEnumerable<T>. Sampel berikut menunjukkan dasar-dasar membaca data streaming yang dikirim dari klien. Setiap kali klien menulis ke ChannelWriter<T>, data ditulis ke dalam ChannelReader server tempat metode hub dibaca.

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

Versi IAsyncEnumerable<T> metode berikut.

Catatan

Sampel berikut memerlukan C# 8.0 atau yang lebih baru.

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

Klien .NET

Streaming server-ke-klien

Metode StreamAsync dan StreamAsChannelAsync pada HubConnection digunakan untuk memanggil metode streaming server-ke-klien. Teruskan nama metode hub dan argumen yang ditentukan dalam metode hub ke StreamAsync atau StreamAsChannelAsync. Parameter generik pada StreamAsync<T> dan StreamAsChannelAsync<T> menentukan jenis objek yang dikembalikan oleh metode streaming. Objek jenis IAsyncEnumerable<T> atau ChannelReader<T> dikembalikan dari pemanggilan aliran dan mewakili aliran pada klien.

StreamAsync Contoh yang mengembalikan IAsyncEnumerable<int>:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

Contoh terkait StreamAsChannelAsync yang mengembalikan ChannelReader<int>:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

Dalam kode sebelumnya:

  • Metode StreamAsChannelAsync pada HubConnection digunakan untuk memanggil metode streaming server-ke-klien. Teruskan nama metode hub dan argumen yang ditentukan dalam metode hub ke StreamAsChannelAsync.
  • Parameter generik pada StreamAsChannelAsync<T> menentukan jenis objek yang dikembalikan oleh metode streaming.
  • ChannelReader<T> dikembalikan dari pemanggilan aliran dan mewakili aliran pada klien.

Streaming klien-ke-server

Ada dua cara untuk memanggil metode hub streaming klien-ke-server dari klien .NET. Anda dapat meneruskan IAsyncEnumerable<T> atau ChannelReader sebagai argumen ke SendAsync, , InvokeAsyncatau StreamAsChannelAsync, tergantung pada metode hub yang dipanggil.

Setiap kali data ditulis ke IAsyncEnumerable objek atau ChannelWriter , metode hub di server menerima item baru dengan data dari klien.

Jika menggunakan IAsyncEnumerable objek, aliran berakhir setelah metode mengembalikan item streaming keluar.

Catatan

Sampel berikut memerlukan C# 8.0 atau yang lebih baru.

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

Atau jika Anda menggunakan ChannelWriter, Anda menyelesaikan saluran dengan channel.Writer.Complete():

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

Klien JavaScript

Streaming server-ke-klien

Klien JavaScript memanggil metode streaming server-ke-klien di hub dengan connection.stream. Metode stream menerima dua argumen:

  • Nama metode hub. Dalam contoh berikut, nama metode hub adalah Counter.
  • Argumen yang ditentukan dalam metode hub. Dalam contoh berikut, argumen adalah hitungan jumlah item streaming yang akan diterima dan penundaan antara item streaming.

connection.streamIStreamResultmengembalikan , yang berisi subscribe metode . IStreamSubscriber Teruskan ke subscribe dan atur nextpanggilan balik , error, dan complete untuk menerima pemberitahuan dari stream pemanggilan.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

Untuk mengakhiri aliran dari klien, panggil dispose metode pada ISubscription yang dikembalikan dari subscribe metode . Memanggil metode ini menyebabkan pembatalan CancellationToken parameter metode Hub, jika Anda menyediakannya.

Streaming klien-ke-server

Klien JavaScript memanggil metode streaming klien-ke-server di hub dengan meneruskan Subject sebagai argumen ke send, , invokeatau stream, tergantung pada metode hub yang dipanggil. Subject adalah kelas yang terlihat seperti Subject. Misalnya di RxJS, Anda dapat menggunakan kelas Subjek dari pustaka tersebut.

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

Memanggil subject.next(item) dengan item menulis item ke aliran, dan metode hub menerima item di server.

Untuk mengakhiri aliran, panggil subject.complete().

Klien Java

Streaming server-ke-klien

Klien SignalR Java menggunakan stream metode untuk memanggil metode streaming. stream menerima tiga argumen atau lebih:

  • Jenis item aliran yang diharapkan.
  • Nama metode hub.
  • Argumen yang ditentukan dalam metode hub.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

Metode stream pada HubConnection mengembalikan Yang Dapat Diamati dari jenis item aliran. Metode Jenis subscribe yang dapat diamati adalah tempat onNext, onError dan onCompleted handler didefinisikan.

Streaming klien-ke-server

Klien SignalR Java dapat memanggil metode streaming klien-ke-server di hub dengan meneruskan Observable sebagai argumen ke send, , invokeatau stream, tergantung pada metode hub yang dipanggil.

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

Memanggil stream.onNext(item) dengan item menulis item ke aliran, dan metode hub menerima item di server.

Untuk mengakhiri aliran, panggil stream.onComplete().

Sumber Daya Tambahan: