Menggunakan streaming di ASP.NET Core SignalR
Oleh Brennan Conroy
ASP.NET Core SignalR mendukung streaming dari klien ke server dan dari server ke klien. Ini berguna untuk skenario di mana fragmen data tiba dari waktu ke waktu. Saat streaming, setiap fragmen dikirim ke klien atau server segera setelah tersedia, daripada menunggu semua data tersedia.
Melihat atau mengunduh kode sampel (cara mengunduh)
Menyiapkan hub untuk streaming
Metode hub secara otomatis menjadi metode hub streaming saat mengembalikan IAsyncEnumerable<T>, , ChannelReader<T>Task<IAsyncEnumerable<T>>
, atau Task<ChannelReader<T>>
.
Streaming server-ke-klien
Metode hub streaming dapat kembali IAsyncEnumerable<T>
selain ChannelReader<T>
. Cara paling sederhana untuk kembali IAsyncEnumerable<T>
adalah dengan menjadikan metode hub sebagai metode iterator asinkron seperti yang ditunjukkan sampel berikut. Metode iterator asinkron hub dapat menerima CancellationToken
parameter yang dipicu saat klien berhenti berlangganan dari aliran. Metode iterator asinkron menghindari masalah umum dengan Saluran, seperti tidak mengembalikan ChannelReader
metode yang cukup awal atau keluar dari metode tanpa menyelesaikan ChannelWriter<T>.
Catatan
Sampel berikut memerlukan C# 8.0 atau yang lebih baru.
public class AsyncEnumerableHub : Hub
{
public async IAsyncEnumerable<int> Counter(
int count,
int delay,
[EnumeratorCancellation]
CancellationToken cancellationToken)
{
for (var i = 0; i < count; i++)
{
// Check the cancellation token regularly so that the server will stop
// producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
yield return i;
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
}
Sampel berikut menunjukkan dasar-dasar data streaming ke klien menggunakan Saluran. Setiap kali objek ditulis ke ChannelWriter<T>, objek segera dikirim ke klien. Pada akhirnya, ChannelWriter
selesai untuk memberi tahu klien bahwa aliran ditutup.
Catatan
Tulis ke ChannelWriter<T>
pada utas latar belakang dan kembalikan ChannelReader
sesegera mungkin. Pemanggilan hub lainnya diblokir hingga ChannelReader
dikembalikan.
Bungkus logika dalam try ... catch
pernyataan. Selesaikan Channel
dalam blokfinally
. Jika Anda ingin mengalirkan kesalahan, ambil di dalam catch
blok dan tulis di finally
blok.
public ChannelReader<int> Counter(
int count,
int delay,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay,
CancellationToken cancellationToken)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
await writer.WriteAsync(i, cancellationToken);
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
catch (Exception ex)
{
localException = ex;
}
finally
{
writer.Complete(localException);
}
}
Metode hub streaming server-ke-klien dapat menerima CancellationToken
parameter yang dipicu saat klien berhenti berlangganan dari aliran. Gunakan token ini untuk menghentikan operasi server dan merilis sumber daya apa pun jika klien terputus sebelum akhir aliran.
Streaming klien-ke-server
Metode hub secara otomatis menjadi metode hub streaming klien-ke-server ketika menerima satu atau beberapa objek jenis ChannelReader<T> atau IAsyncEnumerable<T>. Sampel berikut menunjukkan dasar-dasar membaca data streaming yang dikirim dari klien. Setiap kali klien menulis ke ChannelWriter<T>, data ditulis ke dalam ChannelReader
server tempat metode hub dibaca.
public async Task UploadStream(ChannelReader<string> stream)
{
while (await stream.WaitToReadAsync())
{
while (stream.TryRead(out var item))
{
// do something with the stream item
Console.WriteLine(item);
}
}
}
Versi IAsyncEnumerable<T> metode berikut.
Catatan
Sampel berikut memerlukan C# 8.0 atau yang lebih baru.
public async Task UploadStream(IAsyncEnumerable<string> stream)
{
await foreach (var item in stream)
{
Console.WriteLine(item);
}
}
Klien .NET
Streaming server-ke-klien
Metode StreamAsync
dan StreamAsChannelAsync
pada HubConnection
digunakan untuk memanggil metode streaming server-ke-klien. Teruskan nama metode hub dan argumen yang ditentukan dalam metode hub ke StreamAsync
atau StreamAsChannelAsync
. Parameter generik pada StreamAsync<T>
dan StreamAsChannelAsync<T>
menentukan jenis objek yang dikembalikan oleh metode streaming. Objek jenis IAsyncEnumerable<T>
atau ChannelReader<T>
dikembalikan dari pemanggilan aliran dan mewakili aliran pada klien.
StreamAsync
Contoh yang mengembalikan IAsyncEnumerable<int>
:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
await foreach (var count in stream)
{
Console.WriteLine($"{count}");
}
Console.WriteLine("Streaming completed");
Contoh terkait StreamAsChannelAsync
yang mengembalikan ChannelReader<int>
:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
Dalam kode sebelumnya:
- Metode
StreamAsChannelAsync
padaHubConnection
digunakan untuk memanggil metode streaming server-ke-klien. Teruskan nama metode hub dan argumen yang ditentukan dalam metode hub keStreamAsChannelAsync
. - Parameter generik pada
StreamAsChannelAsync<T>
menentukan jenis objek yang dikembalikan oleh metode streaming. ChannelReader<T>
dikembalikan dari pemanggilan aliran dan mewakili aliran pada klien.
Streaming klien-ke-server
Ada dua cara untuk memanggil metode hub streaming klien-ke-server dari klien .NET. Anda dapat meneruskan IAsyncEnumerable<T>
atau ChannelReader
sebagai argumen ke SendAsync
, , InvokeAsync
atau StreamAsChannelAsync
, tergantung pada metode hub yang dipanggil.
Setiap kali data ditulis ke IAsyncEnumerable
objek atau ChannelWriter
, metode hub di server menerima item baru dengan data dari klien.
Jika menggunakan IAsyncEnumerable
objek, aliran berakhir setelah metode mengembalikan item streaming keluar.
Catatan
Sampel berikut memerlukan C# 8.0 atau yang lebih baru.
async IAsyncEnumerable<string> clientStreamData()
{
for (var i = 0; i < 5; i++)
{
var data = await FetchSomeData();
yield return data;
}
//After the for loop has completed and the local function exits the stream completion will be sent.
}
await connection.SendAsync("UploadStream", clientStreamData());
Atau jika Anda menggunakan ChannelWriter
, Anda menyelesaikan saluran dengan channel.Writer.Complete()
:
var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();
Klien JavaScript
Streaming server-ke-klien
Klien JavaScript memanggil metode streaming server-ke-klien di hub dengan connection.stream
. Metode stream
menerima dua argumen:
- Nama metode hub. Dalam contoh berikut, nama metode hub adalah
Counter
. - Argumen yang ditentukan dalam metode hub. Dalam contoh berikut, argumen adalah hitungan jumlah item streaming yang akan diterima dan penundaan antara item streaming.
connection.stream
IStreamResult
mengembalikan , yang berisi subscribe
metode . IStreamSubscriber
Teruskan ke subscribe
dan atur next
panggilan balik , error
, dan complete
untuk menerima pemberitahuan dari stream
pemanggilan.
connection.stream("Counter", 10, 500)
.subscribe({
next: (item) => {
var li = document.createElement("li");
li.textContent = item;
document.getElementById("messagesList").appendChild(li);
},
complete: () => {
var li = document.createElement("li");
li.textContent = "Stream completed";
document.getElementById("messagesList").appendChild(li);
},
error: (err) => {
var li = document.createElement("li");
li.textContent = err;
document.getElementById("messagesList").appendChild(li);
},
});
Untuk mengakhiri aliran dari klien, panggil dispose
metode pada ISubscription
yang dikembalikan dari subscribe
metode . Memanggil metode ini menyebabkan pembatalan CancellationToken
parameter metode Hub, jika Anda menyediakannya.
Streaming klien-ke-server
Klien JavaScript memanggil metode streaming klien-ke-server di hub dengan meneruskan Subject
sebagai argumen ke send
, , invoke
atau stream
, tergantung pada metode hub yang dipanggil. Subject
adalah kelas yang terlihat seperti Subject
. Misalnya di RxJS, Anda dapat menggunakan kelas Subjek dari pustaka tersebut.
const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
iteration++;
subject.next(iteration.toString());
if (iteration === 10) {
clearInterval(intervalHandle);
subject.complete();
}
}, 500);
Memanggil subject.next(item)
dengan item menulis item ke aliran, dan metode hub menerima item di server.
Untuk mengakhiri aliran, panggil subject.complete()
.
Klien Java
Streaming server-ke-klien
Klien SignalR Java menggunakan stream
metode untuk memanggil metode streaming. stream
menerima tiga argumen atau lebih:
- Jenis item aliran yang diharapkan.
- Nama metode hub.
- Argumen yang ditentukan dalam metode hub.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
.subscribe(
(item) -> {/* Define your onNext handler here. */ },
(error) -> {/* Define your onError handler here. */},
() -> {/* Define your onCompleted handler here. */});
Metode stream
pada HubConnection
mengembalikan Yang Dapat Diamati dari jenis item aliran. Metode Jenis subscribe
yang dapat diamati adalah tempat onNext
, onError
dan onCompleted
handler didefinisikan.
Streaming klien-ke-server
Klien SignalR Java dapat memanggil metode streaming klien-ke-server di hub dengan meneruskan Observable sebagai argumen ke send
, , invoke
atau stream
, tergantung pada metode hub yang dipanggil.
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();
Memanggil stream.onNext(item)
dengan item menulis item ke aliran, dan metode hub menerima item di server.
Untuk mengakhiri aliran, panggil stream.onComplete()
.
Sumber Daya Tambahan:
ASP.NET Core
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk