在 ASP.NET Core 中使用流式处理 SignalRUse streaming in ASP.NET Core SignalR
本文内容
作者: Brennan Conroy
ASP.NET Core SignalR 支持从客户端传输到服务器以及从服务器传输到客户端。这适用于数据片段随着时间的推移而发生的情况。流式传输时,每个片段一旦变为可用,就会发送到客户端或服务器,而不是等待所有数据都可用。
ASP.NET Core SignalR 支持服务器方法的流返回值。这适用于数据片段随着时间的推移而发生的情况。将返回值流式传输到客户端时,每个片段会在其可用时立即发送到客户端,而不是等待所有数据都可用。
设置用于流式传输的集线器Set up a hub for streaming
当集线器方法返回 IAsyncEnumerable<T>、ChannelReader<T>、Task<IAsyncEnumerable<T>>
或 Task<ChannelReader<T>>
时,它会自动变为流式处理中心方法。
当集线器方法返回 ChannelReader<T> 或 Task<ChannelReader<T>>
时,该方法会自动变为流式处理中心方法。
服务器到客户端流式处理Server-to-client streaming
除了 ChannelReader<T>
之外,流式处理中心方法还可以返回 IAsyncEnumerable<T>
。返回 IAsyncEnumerable<T>
的最简单方法是将集线器方法设为异步迭代器方法,如下例所示。中心异步迭代器方法可以接受当客户端从流中取消订阅时触发的 CancellationToken
参数。异步迭代器方法避免了与通道常见的问题,例如,不能尽早返回 ChannelReader
或退出方法,无需完成 ChannelWriter<T>。
备注
以下示例需要C# 8.0 或更高版本。
public class AsyncEnumerableHub : Hub
{
public async IAsyncEnumerable<int> Counter(
int count,
int delay,
[EnumeratorCancellation]
CancellationToken cancellationToken)
{
for (var i = 0; i < count; i++)
{
// Check the cancellation token regularly so that the server will stop
// producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
yield return i;
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
}
下面的示例演示了使用通道将数据流式传输到客户端的基础知识。每当将对象写入 ChannelWriter<T>时,都会立即将对象发送到客户端。最后,ChannelWriter
完成,告诉客户端流已关闭。
备注
在后台线程上写入 ChannelWriter<T>
,并尽快返回 ChannelReader
。其他中心调用会被阻止,直到返回 ChannelReader
。
try … catch
中的环绕逻辑。完成 catch
和 catch
之外的 Channel
,确保中心方法调用正确完成。
public ChannelReader<int> Counter(
int count,
int delay,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay,
CancellationToken cancellationToken)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
await writer.WriteAsync(i, cancellationToken);
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
catch (Exception ex)
{
localException = ex;
}
writer.Complete(localException);
}
public class StreamHub : Hub
{
public ChannelReader<int> Counter(
int count,
int delay,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay,
CancellationToken cancellationToken)
{
try
{
for (var i = 0; i < count; i++)
{
// Check the cancellation token regularly so that the server will stop
// producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
await writer.WriteAsync(i);
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
catch (Exception ex)
{
writer.TryComplete(ex);
}
writer.TryComplete();
}
}
public class StreamHub : Hub
{
public ChannelReader<int> Counter(int count, int delay)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay)
{
try
{
for (var i = 0; i < count; i++)
{
await writer.WriteAsync(i);
await Task.Delay(delay);
}
}
catch (Exception ex)
{
writer.TryComplete(ex);
}
writer.TryComplete();
}
}
服务器到客户端流式处理中心方法可以接受当客户端从流中取消订阅时触发的 CancellationToken
参数。如果客户端在流末尾之前断开连接,请使用此标记停止服务器操作并释放任何资源。
客户端到服务器的流式处理Client-to-server streaming
当某个集线器方法接受 ChannelReader<T> 或 IAsyncEnumerable<T>类型的一个或多个对象时,它会自动成为客户端到服务器的流式处理中心方法。下面的示例演示了读取从客户端发送的流式处理数据的基础知识。每当客户端写入 ChannelWriter<T>时,数据都会写入中心方法读取的服务器上的 ChannelReader
中。
public async Task UploadStream(ChannelReader<string> stream)
{
while (await stream.WaitToReadAsync())
{
while (stream.TryRead(out var item))
{
// do something with the stream item
Console.WriteLine(item);
}
}
}
下面是方法的 IAsyncEnumerable<T> 版本。
备注
以下示例需要C# 8.0 或更高版本。
public async Task UploadStream(IAsyncEnumerable<string> stream)
{
await foreach (var item in stream)
{
Console.WriteLine(item);
}
}
.NET 客户端.NET client
服务器到客户端流式处理Server-to-client streaming
HubConnection
上的 StreamAsync
和 StreamAsChannelAsync
方法用于调用服务器到客户端的流式处理方法。将中心方法中定义的集线器方法名称和参数传递到 StreamAsync
或 StreamAsChannelAsync
。StreamAsync<T>
和 StreamAsChannelAsync<T>
上的泛型参数指定流方法返回的对象的类型。IAsyncEnumerable<T>
或 ChannelReader<T>
类型的对象从流调用返回,并表示客户端上的流。
返回 IAsyncEnumerable<int>
的 StreamAsync
示例:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = await hubConnection.StreamAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
await foreach (var count in stream)
{
Console.WriteLine($"{count}");
}
Console.WriteLine("Streaming completed");
一个返回 ChannelReader<int>
的相应 StreamAsChannelAsync
示例:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
HubConnection
上的 StreamAsChannelAsync
方法用于调用服务器到客户端流式处理方法。将中心方法中定义的集线器方法名称和参数传递到 StreamAsChannelAsync
。StreamAsChannelAsync<T>
上的泛型参数指定流方法返回的对象的类型。从流调用返回 ChannelReader<T>
,并表示客户端上的流。
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
HubConnection
上的 StreamAsChannelAsync
方法用于调用服务器到客户端流式处理方法。将中心方法中定义的集线器方法名称和参数传递到 StreamAsChannelAsync
。StreamAsChannelAsync<T>
上的泛型参数指定流方法返回的对象的类型。从流调用返回 ChannelReader<T>
,并表示客户端上的流。
var channel = await hubConnection
.StreamAsChannelAsync<int>("Counter", 10, 500, CancellationToken.None);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
客户端到服务器的流式处理Client-to-server streaming
可以通过两种方法从 .NET 客户端调用客户端到服务器的流式处理中心方法。可以将 IAsyncEnumerable<T>
或 ChannelReader
作为参数传入 SendAsync
、InvokeAsync
或 StreamAsChannelAsync
,具体取决于所调用的集线器方法。
只要将数据写入 IAsyncEnumerable
或 ChannelWriter
对象,服务器上的集线器方法就会收到来自客户端的数据的新项。
如果使用 IAsyncEnumerable
对象,则流在返回流项的方法退出后结束。
备注
以下示例需要C# 8.0 或更高版本。
async IAsyncEnumerable<string> clientStreamData()
{
for (var i = 0; i < 5; i++)
{
var data = await FetchSomeData();
yield return data;
}
//After the for loop has completed and the local function exits the stream completion will be sent.
}
await connection.SendAsync("UploadStream", clientStreamData());
或者,如果使用的是 ChannelWriter
,则使用 channel.Writer.Complete()
完成通道:
var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();
JavaScript 客户端JavaScript client
服务器到客户端流式处理Server-to-client streaming
JavaScript 客户端通过 connection.stream
调用集线器上的服务器到客户端流式处理方法。stream
方法接受两个参数:
- 集线器方法的名称。在下面的示例中,中心方法名称是
Counter
。 - 在 hub 方法中定义的参数。在下面的示例中,参数是要接收的流项数的计数以及流项之间的延迟。
connection.stream
返回 IStreamResult
,它包含 subscribe
方法。将 IStreamSubscriber
传递到 subscribe
,并设置 next
、error
和 complete
回调,以便从 stream
调用接收通知。
connection.stream("Counter", 10, 500)
.subscribe({
next: (item) => {
var li = document.createElement("li");
li.textContent = item;
document.getElementById("messagesList").appendChild(li);
},
complete: () => {
var li = document.createElement("li");
li.textContent = "Stream completed";
document.getElementById("messagesList").appendChild(li);
},
error: (err) => {
var li = document.createElement("li");
li.textContent = err;
document.getElementById("messagesList").appendChild(li);
},
});
若要从客户端结束流,请对从 subscribe
方法返回的 ISubscription
调用 dispose
方法。调用此方法会导致取消集线器方法的 CancellationToken
参数(如果提供了一个参数)。
connection.stream("Counter", 10, 500)
.subscribe({
next: (item) => {
var li = document.createElement("li");
li.textContent = item;
document.getElementById("messagesList").appendChild(li);
},
complete: () => {
var li = document.createElement("li");
li.textContent = "Stream completed";
document.getElementById("messagesList").appendChild(li);
},
error: (err) => {
var li = document.createElement("li");
li.textContent = err;
document.getElementById("messagesList").appendChild(li);
},
});
若要从客户端结束流,请对从 subscribe
方法返回的 ISubscription
调用 dispose
方法。
客户端到服务器的流式处理Client-to-server streaming
JavaScript 客户端通过将 Subject
作为参数传入到 send
、invoke
或 stream
(具体取决于所调用的集线器方法),在集线器上调用客户端到服务器的流式处理方法。Subject
是一种类似于 Subject
的类。例如,在 RxJS 中,可以使用该库中的Subject类。
const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
iteration++;
subject.next(iteration.toString());
if (iteration === 10) {
clearInterval(intervalHandle);
subject.complete();
}
}, 500);
使用项调用 subject.next(item)
会将项写入流,集线器方法接收服务器上的项。
若要结束流,请调用 subject.complete()
。
Java 客户端Java client
服务器到客户端流式处理Server-to-client streaming
SignalR Java 客户端使用 stream
方法来调用流式处理方法。stream
接受三个或更多参数:
- 流项的预期类型。
- 集线器方法的名称。
- 在 hub 方法中定义的参数。
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
.subscribe(
(item) -> {/* Define your onNext handler here. */ },
(error) -> {/* Define your onError handler here. */},
() -> {/* Define your onCompleted handler here. */});
HubConnection
上的 stream
方法返回流项类型的可观察对象。可观察类型的 subscribe
方法是定义 onNext
、onError
和 onCompleted
处理程序的位置。