在 ASP.NET Core SignalR 中使用流式传输
由 Brennan Conroy 提供
ASP.NET Core SignalR 支持从客户端到服务器以及从服务器到客户端的流式传输。 这适用于数据片段在一段时间内到达的情况。 流式传输时,每个片段在可用时立即发送到客户端或服务器,而不是等待所有数据变为可用。
设置流式传输中心
中心方法在返回 IAsyncEnumerable<T>、ChannelReader<T>、Task<IAsyncEnumerable<T>>
或 Task<ChannelReader<T>>
时,自动成为流式传输中心方法。
服务器到客户端流式传输
除了 ChannelReader<T>
之外,流式传输中心方法还可以返回 IAsyncEnumerable<T>
。 最简单的返回 IAsyncEnumerable<T>
方式是将中心方法作为异步迭代器方法,如以下示例所示。 中心异步迭代器方法可接受在客户端取消订阅流时触发的 CancellationToken
参数。 异步迭代器方法可避免通道的常见问题,例如不提前返回 ChannelReader
或在未完成 ChannelWriter<T> 时退出方法。
注意
以下示例需要 C# 8.0 或更高版本。
public class AsyncEnumerableHub : Hub
{
public async IAsyncEnumerable<int> Counter(
int count,
int delay,
[EnumeratorCancellation]
CancellationToken cancellationToken)
{
for (var i = 0; i < count; i++)
{
// Check the cancellation token regularly so that the server will stop
// producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
yield return i;
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
}
以下示例显示了使用通道将数据流式传输到客户端的基础知识。 每当对象写入 ChannelWriter<T> 时,对象会被立即发送到客户端。 最后,完成 ChannelWriter
以告知客户端流已关闭。
注意
写入后台线程上的 ChannelWriter<T>
,并尽快返回 ChannelReader
。 在返回 ChannelReader
之前,其他中心调用会被阻止。
在 try ... catch
语句中包装逻辑。 完成 finally
块 中的 Channel
。 如果要流式处理错误,请在 catch
中捕获该错误,并写入 finally
块中。
public ChannelReader<int> Counter(
int count,
int delay,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay,
CancellationToken cancellationToken)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
await writer.WriteAsync(i, cancellationToken);
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
catch (Exception ex)
{
localException = ex;
}
finally
{
writer.Complete(localException);
}
}
服务器到客户端流式传输中心方法可以接受在客户端取消订阅流式传输时触发的 CancellationToken
参数。 如果客户端在流式传输结束之前断开连接,则使用此令牌停止服务器操作并释放任何资源。
客户端到服务器流式传输
当中心方法接受 ChannelReader<T> 或 IAsyncEnumerable<T> 类型的一个或多个对象时,它会自动成为客户端到服务器流式传输中心方法。 以下示例说明了读取从客户端发送的流式传输数据的基础知识。 每当客户端写入 ChannelWriter<T> 时,数据都会写入中心方法正在读取的服务器的 ChannelReader
中。
public async Task UploadStream(ChannelReader<string> stream)
{
while (await stream.WaitToReadAsync())
{
while (stream.TryRead(out var item))
{
// do something with the stream item
Console.WriteLine(item);
}
}
}
方法的 IAsyncEnumerable<T> 版本如下。
注意
以下示例需要 C# 8.0 或更高版本。
public async Task UploadStream(IAsyncEnumerable<string> stream)
{
await foreach (var item in stream)
{
Console.WriteLine(item);
}
}
.NET 客户端
服务器到客户端流式传输
HubConnection
上的 StreamAsync
和 StreamAsChannelAsync
方法用于调用服务器到客户端的流式处理方法。 将中心方法中定义的中心方法名称和参数传递给 StreamAsync
或 StreamAsChannelAsync
。 StreamAsync<T>
和 StreamAsChannelAsync<T>
上的泛型参数指定流式传输方法返回的对象类型。 类型为 IAsyncEnumerable<T>
或 ChannelReader<T>
的对象从流式传输调用返回,表示客户端上的流式传输。
返回 IAsyncEnumerable<int>
的 StreamAsync
示例:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
await foreach (var count in stream)
{
Console.WriteLine($"{count}");
}
Console.WriteLine("Streaming completed");
返回 ChannelReader<int>
的相应 StreamAsChannelAsync
示例:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
在前面的代码中:
HubConnection
上的StreamAsChannelAsync
方法用于调用服务器到客户端的流式传输方法。 将中心方法中定义的中心方法名称和参数传递给StreamAsChannelAsync
。StreamAsChannelAsync<T>
上的泛型参数指定流式传输方法返回的对象类型。ChannelReader<T>
从流式传输调用返回,表示客户端上的流式传输。
客户端到服务器流式传输
有两种方法可以从 .NET 客户端调用客户端到服务器流式传输中心方法。 可以将 IAsyncEnumerable<T>
或 ChannelReader
作为参数传递给 SendAsync
、InvokeAsync
或 StreamAsChannelAsync
,具体取决于调用的中心方法。
每当数据写入 IAsyncEnumerable
或 ChannelWriter
对象时,服务器上的中心方法都会收到一个新项目,该项目包含来自客户端的数据。
如果使用 IAsyncEnumerable
对象,则流式传输在返回流式传输项目的方法退出后结束。
注意
以下示例需要 C# 8.0 或更高版本。
async IAsyncEnumerable<string> clientStreamData()
{
for (var i = 0; i < 5; i++)
{
var data = await FetchSomeData();
yield return data;
}
//After the for loop has completed and the local function exits the stream completion will be sent.
}
await connection.SendAsync("UploadStream", clientStreamData());
或者,如果使用 ChannelWriter
,则使用 channel.Writer.Complete()
完成通道:
var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();
JavaScript 客户端
服务器到客户端流式传输
JavaScript 客户端使用 connection.stream
在中心调用服务器到客户端的流式传输方法。 此 stream
方法接受两个参数:
- 中心方法的名称。 在下面的示例中,中心方法名称为
Counter
。 - 在中心方法中定义的参数。 在以下示例中,参数是要接收的流式传输项目数和流式传输项目之间的延迟的计数。
connection.stream
返回 IStreamResult
,其中包括 subscribe
方法。 将 IStreamSubscriber
传递到 subscribe
,并设置 next
、error
和 complete
回调以接收来自 stream
调用的通知。
connection.stream("Counter", 10, 500)
.subscribe({
next: (item) => {
var li = document.createElement("li");
li.textContent = item;
document.getElementById("messagesList").appendChild(li);
},
complete: () => {
var li = document.createElement("li");
li.textContent = "Stream completed";
document.getElementById("messagesList").appendChild(li);
},
error: (err) => {
var li = document.createElement("li");
li.textContent = err;
document.getElementById("messagesList").appendChild(li);
},
});
若要从客户端结束流式传输,请对从 subscribe
方法返回的 ISubscription
调用 dispose
方法。 如果提供了中心方法的 CancellationToken
参数,则调用此方法会导致取消该参数。
客户端到服务器流式传输
JavaScript 客户端通过将 Subject
作为参数传递给 send
、invoke
或 stream
来调用中心上的客户端到服务器流式传输方法,具体取决于调用的中心方法。 Subject
是类似于 Subject
的类。 例如,在 RxJS 中,可以使用该库中的 Subject 类。
const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
iteration++;
subject.next(iteration.toString());
if (iteration === 10) {
clearInterval(intervalHandle);
subject.complete();
}
}, 500);
使用项调用 subject.next(item)
会将该项写入流式传输,并且中心方法将在服务器上接收该项。
若要结束流式传输,请调用 subject.complete()
。
Java 客户端
服务器到客户端流式传输
SignalR Java 客户端使用 stream
方法调用流式处理方法。 stream
接受三个或多个参数:
- 流式传输项的预期类型。
- 中心方法的名称。
- 在中心方法中定义的参数。
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
.subscribe(
(item) -> {/* Define your onNext handler here. */ },
(error) -> {/* Define your onError handler here. */},
() -> {/* Define your onCompleted handler here. */});
HubConnection
上的 stream
方法返回流式传输项目类型的 Observable。 Observable 类型的 subscribe
方法是定义 onNext
、onError
和 onCompleted
处理程序的位置。
客户端到服务器流式传输
SignalR Java 客户端通过将 Observable 作为参数传递给 send
、invoke
或 stream
,可调用中心上的客户端到服务器流式传输方法,具体取决于调用的中心方法。
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();
使用项调用 stream.onNext(item)
会将该项写入流式传输,并且中心方法将在服务器上接收该项。
若要结束流式传输,请调用 stream.onComplete()
。