Używanie przesyłania strumieniowego w ASP.NET Core SignalR
Autor: Brennan Conroy
ASP.NET Core SignalR obsługuje przesyłanie strumieniowe z klienta do serwera i z serwera na klienta. Jest to przydatne w scenariuszach, w których fragmenty danych docierają z upływem czasu. Podczas przesyłania strumieniowego każdy fragment jest wysyłany do klienta lub serwera, gdy tylko stanie się dostępny, zamiast czekać na udostępnienie wszystkich danych.
Wyświetl lub pobierz przykładowy kod (jak pobrać)
Konfigurowanie koncentratora na potrzeby przesyłania strumieniowego
Metoda koncentratora automatycznie staje się metodą centrum przesyłania strumieniowego, gdy zwraca wartość IAsyncEnumerable<T>, ChannelReader<T>, Task<IAsyncEnumerable<T>>
lub Task<ChannelReader<T>>
.
Przesyłanie strumieniowe serwer-klient
Metody centrum przesyłania strumieniowego mogą zwracać IAsyncEnumerable<T>
się oprócz ChannelReader<T>
metody . Najprostszym sposobem powrotu IAsyncEnumerable<T>
jest utworzenie metody centrum jako metody iteratora asynchronicznego, jak pokazano w poniższym przykładzie. Metody iteratora asynchronicznego centrum mogą akceptować CancellationToken
parametr wyzwalany, gdy klient anuluje subskrypcję strumienia. Metody iteracyjne asynchroniczne unikają typowych problemów z kanałami, takich jak brak wystarczająco wczesnego ChannelReader
zwracania lub zamykania metody bez ukończenia ChannelWriter<T>metody .
Uwaga
Poniższy przykład wymaga języka C# 8.0 lub nowszego.
public class AsyncEnumerableHub : Hub
{
public async IAsyncEnumerable<int> Counter(
int count,
int delay,
[EnumeratorCancellation]
CancellationToken cancellationToken)
{
for (var i = 0; i < count; i++)
{
// Check the cancellation token regularly so that the server will stop
// producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
yield return i;
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
}
Poniższy przykład przedstawia podstawy danych przesyłanych strumieniowo do klienta przy użyciu kanałów. Za każdym razem, gdy obiekt jest zapisywany w ChannelWriter<T>obiekcie , jest natychmiast wysyłany do klienta. Na końcu element zostanie ukończony, aby poinformować klienta, ChannelWriter
że strumień jest zamknięty.
Uwaga
Zapisz w wątku ChannelWriter<T>
w tle i zwróć je ChannelReader
tak szybko, jak to możliwe. Inne wywołania koncentratora są blokowane do momentu zwrócenia.ChannelReader
Zawijanie logiki w instrukcjitry ... catch
. Ukończ Channel
blok w finally
bloku. Jeśli chcesz przepływać błąd, przechwyć go wewnątrz catch
bloku i zapisać go w finally
bloku.
public ChannelReader<int> Counter(
int count,
int delay,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay,
CancellationToken cancellationToken)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
await writer.WriteAsync(i, cancellationToken);
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
catch (Exception ex)
{
localException = ex;
}
finally
{
writer.Complete(localException);
}
}
Metody centrum przesyłania strumieniowego serwer-klient mogą akceptować CancellationToken
parametr wyzwalany, gdy klient anuluje subskrypcję strumienia. Użyj tego tokenu, aby zatrzymać operację serwera i zwolnić wszystkie zasoby, jeśli klient rozłączy się przed końcem strumienia.
Przesyłanie strumieniowe klient-serwer
Metoda koncentratora automatycznie staje się metodą centrum przesyłania strumieniowego klient-serwer, gdy akceptuje co najmniej jeden obiekt typu ChannelReader<T> lub IAsyncEnumerable<T>. Poniższy przykład przedstawia podstawy odczytywania danych przesyłanych strumieniowo z klienta. Za każdym razem, gdy klient zapisuje dane ChannelWriter<T>w obiekcie , są zapisywane na ChannelReader
serwerze, z którego jest odczytywana metoda centrum.
public async Task UploadStream(ChannelReader<string> stream)
{
while (await stream.WaitToReadAsync())
{
while (stream.TryRead(out var item))
{
// do something with the stream item
Console.WriteLine(item);
}
}
}
Poniżej IAsyncEnumerable<T> przedstawiono wersję metody .
Uwaga
Poniższy przykład wymaga języka C# 8.0 lub nowszego.
public async Task UploadStream(IAsyncEnumerable<string> stream)
{
await foreach (var item in stream)
{
Console.WriteLine(item);
}
}
Klient .NET
Przesyłanie strumieniowe serwer-klient
Metody StreamAsync
i StreamAsChannelAsync
są HubConnection
używane do wywoływania metod przesyłania strumieniowego serwer-klient. Przekaż nazwę i argumenty metody centrum zdefiniowane w metodzie hub do metody StreamAsync
lub StreamAsChannelAsync
. Ogólny parametr on StreamAsync<T>
i StreamAsChannelAsync<T>
określa typ obiektów zwracanych przez metodę przesyłania strumieniowego. Obiekt typu IAsyncEnumerable<T>
lub ChannelReader<T>
jest zwracany z wywołania strumienia i reprezentuje strumień na kliencie.
Przykład StreamAsync
, który zwraca wartość IAsyncEnumerable<int>
:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
await foreach (var count in stream)
{
Console.WriteLine($"{count}");
}
Console.WriteLine("Streaming completed");
Odpowiedni StreamAsChannelAsync
przykład, który zwraca wartość ChannelReader<int>
:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
W poprzednim kodzie:
- Metoda
StreamAsChannelAsync
onHubConnection
służy do wywoływania metody przesyłania strumieniowego serwer-klient. Przekaż nazwę metody centrum i argumenty zdefiniowane w metodzie centrum doStreamAsChannelAsync
metody . - Ogólny parametr on
StreamAsChannelAsync<T>
określa typ obiektów zwracanych przez metodę przesyłania strumieniowego. - Obiekt
ChannelReader<T>
jest zwracany z wywołania strumienia i reprezentuje strumień na kliencie.
Przesyłanie strumieniowe klient-serwer
Istnieją dwa sposoby wywoływania metody centrum przesyłania strumieniowego klient-serwer z klienta platformy .NET. Możesz przekazać IAsyncEnumerable<T>
argument lub ChannelReader
jako argument do SendAsync
, InvokeAsync
lub , w StreamAsChannelAsync
zależności od wywoływanej metody centrum.
Za każdym razem, gdy dane są zapisywane w IAsyncEnumerable
obiekcie lub ChannelWriter
, metoda centrum na serwerze odbiera nowy element z danymi od klienta.
Jeśli używasz IAsyncEnumerable
obiektu, strumień kończy się po zakończeniu metody zwracania elementów strumienia.
Uwaga
Poniższy przykład wymaga języka C# 8.0 lub nowszego.
async IAsyncEnumerable<string> clientStreamData()
{
for (var i = 0; i < 5; i++)
{
var data = await FetchSomeData();
yield return data;
}
//After the for loop has completed and the local function exits the stream completion will be sent.
}
await connection.SendAsync("UploadStream", clientStreamData());
Lub jeśli używasz elementu ChannelWriter
, ukończ kanał za pomocą channel.Writer.Complete()
polecenia :
var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();
Klient środowiska JavaScript
Przesyłanie strumieniowe serwer-klient
Klienci języka JavaScript nazywają metody przesyłania strumieniowego serwer-klient w centrach za pomocą polecenia connection.stream
. Metoda stream
akceptuje dwa argumenty:
- Nazwa metody centrum. W poniższym przykładzie nazwa metody centrum to
Counter
. - Argumenty zdefiniowane w metodzie piasty. W poniższym przykładzie argumenty są liczbą elementów strumienia do odbierania i opóźnienia między elementami strumienia.
connection.stream
Zwraca element IStreamResult
, który zawiera metodę subscribe
. Przekaż element IStreamSubscriber
i subscribe
ustaw next
wywołania zwrotne , error
i complete
w celu odbierania powiadomień z stream
wywołania.
connection.stream("Counter", 10, 500)
.subscribe({
next: (item) => {
var li = document.createElement("li");
li.textContent = item;
document.getElementById("messagesList").appendChild(li);
},
complete: () => {
var li = document.createElement("li");
li.textContent = "Stream completed";
document.getElementById("messagesList").appendChild(li);
},
error: (err) => {
var li = document.createElement("li");
li.textContent = err;
document.getElementById("messagesList").appendChild(li);
},
});
Aby zakończyć strumień z klienta, wywołaj metodę dispose
w ISubscription
metodzie zwróconej subscribe
z metody . Wywołanie tej metody powoduje anulowanie CancellationToken
parametru metody Hub, jeśli został podany.
Przesyłanie strumieniowe klient-serwer
Klienci języka JavaScript wywołują metody przesyłania strumieniowego klient-serwer w centrach, przekazując Subject
jako argument do send
metody , invoke
lub stream
, w zależności od wywoływanej metody centrum. Jest Subject
to klasa, która wygląda jak Subject
. Na przykład w programie RxJS można użyć klasy Subject z tej biblioteki.
const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
iteration++;
subject.next(iteration.toString());
if (iteration === 10) {
clearInterval(intervalHandle);
subject.complete();
}
}, 500);
Wywołanie subject.next(item)
za pomocą elementu zapisuje element w strumieniu, a metoda centrum odbiera element na serwerze.
Aby zakończyć strumień, wywołaj metodę subject.complete()
.
Klienta środowiska Java
Przesyłanie strumieniowe serwer-klient
Klient SignalR java używa stream
metody do wywoływania metod przesyłania strumieniowego. stream
akceptuje trzy lub więcej argumentów:
- Oczekiwany typ elementów strumienia.
- Nazwa metody centrum.
- Argumenty zdefiniowane w metodzie piasty.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
.subscribe(
(item) -> {/* Define your onNext handler here. */ },
(error) -> {/* Define your onError handler here. */},
() -> {/* Define your onCompleted handler here. */});
Metoda stream
on HubConnection
zwraca obserwowalny typ elementu strumienia. Metoda obserwowanego subscribe
typu to miejsce, w którym onNext
zdefiniowano programy obsługi i onCompleted
. onError
Przesyłanie strumieniowe klient-serwer
Klient SignalR Java może wywoływać metody przesyłania strumieniowego klient-serwer w centrach, przekazując element Obserwowalny jako argument do send
metody , invoke
lub stream
, w zależności od wywoływanej metody centrum.
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();
Wywołanie stream.onNext(item)
za pomocą elementu zapisuje element w strumieniu, a metoda centrum odbiera element na serwerze.
Aby zakończyć strumień, wywołaj metodę stream.onComplete()
.