在 ASP.NET Core 中使用流式处理 SignalRUse streaming in ASP.NET Core SignalR

本文内容

作者: Brennan Conroy

ASP.NET Core SignalR 支持从客户端传输到服务器以及从服务器传输到客户端。这适用于数据片段随着时间的推移而发生的情况。流式传输时,每个片段一旦变为可用,就会发送到客户端或服务器,而不是等待所有数据都可用。

ASP.NET Core SignalR 支持服务器方法的流返回值。这适用于数据片段随着时间的推移而发生的情况。将返回值流式传输到客户端时,每个片段会在其可用时立即发送到客户端,而不是等待所有数据都可用。

查看或下载示例代码如何下载

设置用于流式传输的集线器Set up a hub for streaming

当集线器方法返回 IAsyncEnumerable<T>ChannelReader<T>Task<IAsyncEnumerable<T>>Task<ChannelReader<T>>时,它会自动变为流式处理中心方法。

当集线器方法返回 ChannelReader<T>Task<ChannelReader<T>>时,该方法会自动变为流式处理中心方法。

服务器到客户端流式处理Server-to-client streaming

除了 ChannelReader<T>之外,流式处理中心方法还可以返回 IAsyncEnumerable<T>返回 IAsyncEnumerable<T> 的最简单方法是将集线器方法设为异步迭代器方法,如下例所示。中心异步迭代器方法可以接受当客户端从流中取消订阅时触发的 CancellationToken 参数。异步迭代器方法避免了与通道常见的问题,例如,不能尽早返回 ChannelReader 或退出方法,无需完成 ChannelWriter<T>

备注

以下示例需要C# 8.0 或更高版本。

  1. public class AsyncEnumerableHub : Hub
  2. {
  3. public async IAsyncEnumerable<int> Counter(
  4. int count,
  5. int delay,
  6. [EnumeratorCancellation]
  7. CancellationToken cancellationToken)
  8. {
  9. for (var i = 0; i < count; i++)
  10. {
  11. // Check the cancellation token regularly so that the server will stop
  12. // producing items if the client disconnects.
  13. cancellationToken.ThrowIfCancellationRequested();
  14. yield return i;
  15. // Use the cancellationToken in other APIs that accept cancellation
  16. // tokens so the cancellation can flow down to them.
  17. await Task.Delay(delay, cancellationToken);
  18. }
  19. }
  20. }

下面的示例演示了使用通道将数据流式传输到客户端的基础知识。每当将对象写入 ChannelWriter<T>时,都会立即将对象发送到客户端。最后,ChannelWriter 完成,告诉客户端流已关闭。

备注

在后台线程上写入 ChannelWriter<T>,并尽快返回 ChannelReader其他中心调用会被阻止,直到返回 ChannelReader

try … catch中的环绕逻辑。完成 catchcatch 之外的 Channel,确保中心方法调用正确完成。

  1. public ChannelReader<int> Counter(
  2. int count,
  3. int delay,
  4. CancellationToken cancellationToken)
  5. {
  6. var channel = Channel.CreateUnbounded<int>();
  7. // We don't want to await WriteItemsAsync, otherwise we'd end up waiting
  8. // for all the items to be written before returning the channel back to
  9. // the client.
  10. _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
  11. return channel.Reader;
  12. }
  13. private async Task WriteItemsAsync(
  14. ChannelWriter<int> writer,
  15. int count,
  16. int delay,
  17. CancellationToken cancellationToken)
  18. {
  19. Exception localException = null;
  20. try
  21. {
  22. for (var i = 0; i < count; i++)
  23. {
  24. await writer.WriteAsync(i, cancellationToken);
  25. // Use the cancellationToken in other APIs that accept cancellation
  26. // tokens so the cancellation can flow down to them.
  27. await Task.Delay(delay, cancellationToken);
  28. }
  29. }
  30. catch (Exception ex)
  31. {
  32. localException = ex;
  33. }
  34. writer.Complete(localException);
  35. }
  1. public class StreamHub : Hub
  2. {
  3. public ChannelReader<int> Counter(
  4. int count,
  5. int delay,
  6. CancellationToken cancellationToken)
  7. {
  8. var channel = Channel.CreateUnbounded<int>();
  9. // We don't want to await WriteItemsAsync, otherwise we'd end up waiting
  10. // for all the items to be written before returning the channel back to
  11. // the client.
  12. _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
  13. return channel.Reader;
  14. }
  15. private async Task WriteItemsAsync(
  16. ChannelWriter<int> writer,
  17. int count,
  18. int delay,
  19. CancellationToken cancellationToken)
  20. {
  21. try
  22. {
  23. for (var i = 0; i < count; i++)
  24. {
  25. // Check the cancellation token regularly so that the server will stop
  26. // producing items if the client disconnects.
  27. cancellationToken.ThrowIfCancellationRequested();
  28. await writer.WriteAsync(i);
  29. // Use the cancellationToken in other APIs that accept cancellation
  30. // tokens so the cancellation can flow down to them.
  31. await Task.Delay(delay, cancellationToken);
  32. }
  33. }
  34. catch (Exception ex)
  35. {
  36. writer.TryComplete(ex);
  37. }
  38. writer.TryComplete();
  39. }
  40. }
  1. public class StreamHub : Hub
  2. {
  3. public ChannelReader<int> Counter(int count, int delay)
  4. {
  5. var channel = Channel.CreateUnbounded<int>();
  6. // We don't want to await WriteItemsAsync, otherwise we'd end up waiting
  7. // for all the items to be written before returning the channel back to
  8. // the client.
  9. _ = WriteItemsAsync(channel.Writer, count, delay);
  10. return channel.Reader;
  11. }
  12. private async Task WriteItemsAsync(
  13. ChannelWriter<int> writer,
  14. int count,
  15. int delay)
  16. {
  17. try
  18. {
  19. for (var i = 0; i < count; i++)
  20. {
  21. await writer.WriteAsync(i);
  22. await Task.Delay(delay);
  23. }
  24. }
  25. catch (Exception ex)
  26. {
  27. writer.TryComplete(ex);
  28. }
  29. writer.TryComplete();
  30. }
  31. }

服务器到客户端流式处理中心方法可以接受当客户端从流中取消订阅时触发的 CancellationToken 参数。如果客户端在流末尾之前断开连接,请使用此标记停止服务器操作并释放任何资源。

客户端到服务器的流式处理Client-to-server streaming

当某个集线器方法接受 ChannelReader<T>IAsyncEnumerable<T>类型的一个或多个对象时,它会自动成为客户端到服务器的流式处理中心方法。下面的示例演示了读取从客户端发送的流式处理数据的基础知识。每当客户端写入 ChannelWriter<T>时,数据都会写入中心方法读取的服务器上的 ChannelReader 中。

  1. public async Task UploadStream(ChannelReader<string> stream)
  2. {
  3. while (await stream.WaitToReadAsync())
  4. {
  5. while (stream.TryRead(out var item))
  6. {
  7. // do something with the stream item
  8. Console.WriteLine(item);
  9. }
  10. }
  11. }

下面是方法的 IAsyncEnumerable<T> 版本。

备注

以下示例需要C# 8.0 或更高版本。

  1. public async Task UploadStream(IAsyncEnumerable<string> stream)
  2. {
  3. await foreach (var item in stream)
  4. {
  5. Console.WriteLine(item);
  6. }
  7. }

.NET 客户端.NET client

服务器到客户端流式处理Server-to-client streaming

HubConnection 上的 StreamAsyncStreamAsChannelAsync 方法用于调用服务器到客户端的流式处理方法。将中心方法中定义的集线器方法名称和参数传递到 StreamAsyncStreamAsChannelAsyncStreamAsync<T>StreamAsChannelAsync<T> 上的泛型参数指定流方法返回的对象的类型。IAsyncEnumerable<T>ChannelReader<T> 类型的对象从流调用返回,并表示客户端上的流。

返回 IAsyncEnumerable<int>StreamAsync 示例:

  1. // Call "Cancel" on this CancellationTokenSource to send a cancellation message to
  2. // the server, which will trigger the corresponding token in the hub method.
  3. var cancellationTokenSource = new CancellationTokenSource();
  4. var stream = await hubConnection.StreamAsync<int>(
  5. "Counter", 10, 500, cancellationTokenSource.Token);
  6. await foreach (var count in stream)
  7. {
  8. Console.WriteLine($"{count}");
  9. }
  10. Console.WriteLine("Streaming completed");

一个返回 ChannelReader<int>的相应 StreamAsChannelAsync 示例:

  1. // Call "Cancel" on this CancellationTokenSource to send a cancellation message to
  2. // the server, which will trigger the corresponding token in the hub method.
  3. var cancellationTokenSource = new CancellationTokenSource();
  4. var channel = await hubConnection.StreamAsChannelAsync<int>(
  5. "Counter", 10, 500, cancellationTokenSource.Token);
  6. // Wait asynchronously for data to become available
  7. while (await channel.WaitToReadAsync())
  8. {
  9. // Read all currently available data synchronously, before waiting for more data
  10. while (channel.TryRead(out var count))
  11. {
  12. Console.WriteLine($"{count}");
  13. }
  14. }
  15. Console.WriteLine("Streaming completed");

HubConnection 上的 StreamAsChannelAsync 方法用于调用服务器到客户端流式处理方法。将中心方法中定义的集线器方法名称和参数传递到 StreamAsChannelAsyncStreamAsChannelAsync<T> 上的泛型参数指定流方法返回的对象的类型。从流调用返回 ChannelReader<T>,并表示客户端上的流。

  1. // Call "Cancel" on this CancellationTokenSource to send a cancellation message to
  2. // the server, which will trigger the corresponding token in the hub method.
  3. var cancellationTokenSource = new CancellationTokenSource();
  4. var channel = await hubConnection.StreamAsChannelAsync<int>(
  5. "Counter", 10, 500, cancellationTokenSource.Token);
  6. // Wait asynchronously for data to become available
  7. while (await channel.WaitToReadAsync())
  8. {
  9. // Read all currently available data synchronously, before waiting for more data
  10. while (channel.TryRead(out var count))
  11. {
  12. Console.WriteLine($"{count}");
  13. }
  14. }
  15. Console.WriteLine("Streaming completed");

HubConnection 上的 StreamAsChannelAsync 方法用于调用服务器到客户端流式处理方法。将中心方法中定义的集线器方法名称和参数传递到 StreamAsChannelAsyncStreamAsChannelAsync<T> 上的泛型参数指定流方法返回的对象的类型。从流调用返回 ChannelReader<T>,并表示客户端上的流。

  1. var channel = await hubConnection
  2. .StreamAsChannelAsync<int>("Counter", 10, 500, CancellationToken.None);
  3. // Wait asynchronously for data to become available
  4. while (await channel.WaitToReadAsync())
  5. {
  6. // Read all currently available data synchronously, before waiting for more data
  7. while (channel.TryRead(out var count))
  8. {
  9. Console.WriteLine($"{count}");
  10. }
  11. }
  12. Console.WriteLine("Streaming completed");

客户端到服务器的流式处理Client-to-server streaming

可以通过两种方法从 .NET 客户端调用客户端到服务器的流式处理中心方法。可以将 IAsyncEnumerable<T>ChannelReader 作为参数传入 SendAsyncInvokeAsyncStreamAsChannelAsync,具体取决于所调用的集线器方法。

只要将数据写入 IAsyncEnumerableChannelWriter 对象,服务器上的集线器方法就会收到来自客户端的数据的新项。

如果使用 IAsyncEnumerable 对象,则流在返回流项的方法退出后结束。

备注

以下示例需要C# 8.0 或更高版本。

  1. async IAsyncEnumerable<string> clientStreamData()
  2. {
  3. for (var i = 0; i < 5; i++)
  4. {
  5. var data = await FetchSomeData();
  6. yield return data;
  7. }
  8. //After the for loop has completed and the local function exits the stream completion will be sent.
  9. }
  10. await connection.SendAsync("UploadStream", clientStreamData());

或者,如果使用的是 ChannelWriter,则使用 channel.Writer.Complete()完成通道:

  1. var channel = Channel.CreateBounded<string>(10);
  2. await connection.SendAsync("UploadStream", channel.Reader);
  3. await channel.Writer.WriteAsync("some data");
  4. await channel.Writer.WriteAsync("some more data");
  5. channel.Writer.Complete();

JavaScript 客户端JavaScript client

服务器到客户端流式处理Server-to-client streaming

JavaScript 客户端通过 connection.stream调用集线器上的服务器到客户端流式处理方法。stream 方法接受两个参数:

  • 集线器方法的名称。在下面的示例中,中心方法名称是 Counter
  • 在 hub 方法中定义的参数。在下面的示例中,参数是要接收的流项数的计数以及流项之间的延迟。

connection.stream 返回 IStreamResult,它包含 subscribe 方法。IStreamSubscriber 传递到 subscribe,并设置 nexterrorcomplete 回调,以便从 stream 调用接收通知。

  1. connection.stream("Counter", 10, 500)
  2. .subscribe({
  3. next: (item) => {
  4. var li = document.createElement("li");
  5. li.textContent = item;
  6. document.getElementById("messagesList").appendChild(li);
  7. },
  8. complete: () => {
  9. var li = document.createElement("li");
  10. li.textContent = "Stream completed";
  11. document.getElementById("messagesList").appendChild(li);
  12. },
  13. error: (err) => {
  14. var li = document.createElement("li");
  15. li.textContent = err;
  16. document.getElementById("messagesList").appendChild(li);
  17. },
  18. });

若要从客户端结束流,请对从 subscribe 方法返回的 ISubscription 调用 dispose 方法。调用此方法会导致取消集线器方法的 CancellationToken 参数(如果提供了一个参数)。

  1. connection.stream("Counter", 10, 500)
  2. .subscribe({
  3. next: (item) => {
  4. var li = document.createElement("li");
  5. li.textContent = item;
  6. document.getElementById("messagesList").appendChild(li);
  7. },
  8. complete: () => {
  9. var li = document.createElement("li");
  10. li.textContent = "Stream completed";
  11. document.getElementById("messagesList").appendChild(li);
  12. },
  13. error: (err) => {
  14. var li = document.createElement("li");
  15. li.textContent = err;
  16. document.getElementById("messagesList").appendChild(li);
  17. },
  18. });

若要从客户端结束流,请对从 subscribe 方法返回的 ISubscription 调用 dispose 方法。

客户端到服务器的流式处理Client-to-server streaming

JavaScript 客户端通过将 Subject 作为参数传入到 sendinvokestream(具体取决于所调用的集线器方法),在集线器上调用客户端到服务器的流式处理方法。Subject 是一种类似于 Subject的类。例如,在 RxJS 中,可以使用该库中的Subject类。

  1. const subject = new signalR.Subject();
  2. yield connection.send("UploadStream", subject);
  3. var iteration = 0;
  4. const intervalHandle = setInterval(() => {
  5. iteration++;
  6. subject.next(iteration.toString());
  7. if (iteration === 10) {
  8. clearInterval(intervalHandle);
  9. subject.complete();
  10. }
  11. }, 500);

使用项调用 subject.next(item) 会将项写入流,集线器方法接收服务器上的项。

若要结束流,请调用 subject.complete()

Java 客户端Java client

服务器到客户端流式处理Server-to-client streaming

SignalR Java 客户端使用 stream 方法来调用流式处理方法。stream 接受三个或更多参数:

  • 流项的预期类型。
  • 集线器方法的名称。
  • 在 hub 方法中定义的参数。
  1. hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
  2. .subscribe(
  3. (item) -> {/* Define your onNext handler here. */ },
  4. (error) -> {/* Define your onError handler here. */},
  5. () -> {/* Define your onCompleted handler here. */});

HubConnection 上的 stream 方法返回流项类型的可观察对象。可观察类型的 subscribe 方法是定义 onNextonErroronCompleted 处理程序的位置。

其他资源Additional resources