在 ASP.NET Core SignalR 中使用串流
ASP.NET Core SignalR 支援從用戶端串流到伺服器,以及從伺服器串流到用戶端。 這對於資料片段會隨時間推進而送達的案例非常有用。 進行串流時,無需等待所有資料變得可用,當各個片段在可用時即會送往用戶端或伺服器。
檢視或下載範例程式碼 \(英文\) (如何下載)
設定串流中樞
當中樞方法傳回 IAsyncEnumerable<T>、ChannelReader<T>、Task<IAsyncEnumerable<T>>
或 Task<ChannelReader<T>>
時,中樞方法即自動成為串流中樞方法。
伺服器對用戶端串流
除了 ChannelReader<T>
之外,串流中樞方法也可以傳回 IAsyncEnumerable<T>
。 傳回 IAsyncEnumerable<T>
的最簡單方法是將中樞方法設定為非同步迭代器方法,如下例所示。 中樞非同步迭代器方法可以接受 CancellationToken
參數,並於用戶端取消訂閱資料流程時觸發。 非同步迭代器方法可避免通道常見的問題,例如,沒有儘早傳回 ChannelReader
,或在未完成 ChannelWriter<T> 的情況下結束方法。
注意
下列範例需要 C# 8.0 或更新版本。
public class AsyncEnumerableHub : Hub
{
public async IAsyncEnumerable<int> Counter(
int count,
int delay,
[EnumeratorCancellation]
CancellationToken cancellationToken)
{
for (var i = 0; i < count; i++)
{
// Check the cancellation token regularly so that the server will stop
// producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
yield return i;
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
}
下列範例顯示使用通道將資料串流至用戶端的基本概念。 每當物件寫入 ChannelWriter<T> 時,就會將物件立即傳送至用戶端。 最後,ChannelWriter
已完成,並告知用戶端資料流程已關閉。
注意
寫入背景執行緒上的 ChannelWriter<T>
,並儘快傳回 ChannelReader
。 未傳回 ChannelReader
之前,會封鎖其他中樞叫用。
將邏輯包裝在 try ... catch
陳述式中。 在 finally
區塊中完成 Channel
。 如果您想要傳送錯誤,請在 catch
區塊內擷取該錯誤並將其寫入 finally
區塊中。
public ChannelReader<int> Counter(
int count,
int delay,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay,
CancellationToken cancellationToken)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
await writer.WriteAsync(i, cancellationToken);
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
catch (Exception ex)
{
localException = ex;
}
finally
{
writer.Complete(localException);
}
}
伺服器對用戶端串流中樞方法可以接受 CancellationToken
參數,並於用戶端取消訂閱資料流時加以觸發。 如果用戶端在資料流結束之前中斷連線,請使用此權杖停止伺服器作業並釋放任何資源。
用戶端對伺服器串流
當中樞方法接受一或多個類型為 ChannelReader<T> 或 IAsyncEnumerable<T> 的物件時,中樞方法即自動成為用戶端對伺服器串流中樞方法。 下列範例顯示讀取用戶端所傳送串流資料的基本概念。 每當用戶端寫入 ChannelWriter<T> 時,就會將資料寫入中樞方法所讀取之伺服器上的 ChannelReader
。
public async Task UploadStream(ChannelReader<string> stream)
{
while (await stream.WaitToReadAsync())
{
while (stream.TryRead(out var item))
{
// do something with the stream item
Console.WriteLine(item);
}
}
}
該方法的 IAsyncEnumerable<T> 版本如下。
注意
下列範例需要 C# 8.0 或更新版本。
public async Task UploadStream(IAsyncEnumerable<string> stream)
{
await foreach (var item in stream)
{
Console.WriteLine(item);
}
}
.NET 用戶端
伺服器對用戶端串流
HubConnection
上的 StreamAsync
和 StreamAsChannelAsync
方法用於叫用伺服器對用戶端串流方法。 將中樞方法中定義的中樞方法名稱和引數傳遞至 StreamAsync
或 StreamAsChannelAsync
。 StreamAsync<T>
和 StreamAsChannelAsync<T>
上的泛型參數會指定串流方法所傳回的物件類型。 從資料流叫用傳回類型為 IAsyncEnumerable<T>
或 ChannelReader<T>
的物件,並用以代表用戶端上的資料流。
傳回 IAsyncEnumerable<int>
的 StreamAsync
範例:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
await foreach (var count in stream)
{
Console.WriteLine($"{count}");
}
Console.WriteLine("Streaming completed");
傳回 ChannelReader<int>
的對應 StreamAsChannelAsync
範例:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
在前述程式碼中:
HubConnection
上的StreamAsChannelAsync
方法用於叫用伺服器對用戶端串流方法。 將中樞方法中定義的中樞方法名稱和引數傳遞至StreamAsChannelAsync
。StreamAsChannelAsync<T>
上的泛型參數會指定串流方法所傳回的物件類型。- 從資料流叫用傳回
ChannelReader<T>
,並用以代表用戶端上的資料流。
用戶端對伺服器串流
有兩種方法可從 .NET 用戶端叫用用戶端對伺服器串流中樞方法。 您可以根據叫用的中樞方法,將 IAsyncEnumerable<T>
或 ChannelReader
當做引數傳入SendAsync
、InvokeAsync
或 StreamAsChannelAsync
。
每當資料寫入 IAsyncEnumerable
或 ChannelWriter
物件時,伺服器上的中樞方法就會從用戶端接收具有該資料的新項目。
如果是使用 IAsyncEnumerable
物件,資料流會在傳回資料流項目的方法結束之後終止。
注意
下列範例需要 C# 8.0 或更新版本。
async IAsyncEnumerable<string> clientStreamData()
{
for (var i = 0; i < 5; i++)
{
var data = await FetchSomeData();
yield return data;
}
//After the for loop has completed and the local function exits the stream completion will be sent.
}
await connection.SendAsync("UploadStream", clientStreamData());
或者,如果您是使用 ChannelWriter
,則可使用 channel.Writer.Complete()
完成通道:
var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();
JavaScript 用戶端
伺服器對用戶端串流
JavaScript 用戶端會使用 connection.stream
呼叫中樞上的伺服器對用戶端串流方法。 stream
方法接受兩種引數:
- 中樞方法的名稱。 在下列範例中,中樞方法名稱為
Counter
。 - 在中樞方法中定義的引數。 在下列範例中,引數包括要接收的資料流項目數和資料流項目之間的延遲。
connection.stream
會傳回 IStreamResult
,其中包含 subscribe
方法。 傳遞 IStreamSubscriber
至 subscribe
並設定 next
、error
和 complete
回呼,以接收來自 stream
叫用的通知。
connection.stream("Counter", 10, 500)
.subscribe({
next: (item) => {
var li = document.createElement("li");
li.textContent = item;
document.getElementById("messagesList").appendChild(li);
},
complete: () => {
var li = document.createElement("li");
li.textContent = "Stream completed";
document.getElementById("messagesList").appendChild(li);
},
error: (err) => {
var li = document.createElement("li");
li.textContent = err;
document.getElementById("messagesList").appendChild(li);
},
});
若要結束用戶端的資料流,請在 subscribe
方法傳回的 ISubscription
上呼叫 dispose
方法。 呼叫此方法會導致取消 Hub 方法的 CancellationToken
參數 (如已提供該參數)。
用戶端對伺服器串流
JavaScript 用戶端可以透過將 Subject
作為引數傳遞給 send
、invoke
或 stream
(視叫用的中樞方法而定),在中樞上呼叫用戶端對伺服器串流方法。 Subject
是一個類似 Subject
的類別。 例如,在 RxJS 中,您可以使用該程式庫的主旨類別。
const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
iteration++;
subject.next(iteration.toString());
if (iteration === 10) {
clearInterval(intervalHandle);
subject.complete();
}
}, 500);
使用項目來呼叫 subject.next(item)
會將項目寫入資料流中,而中樞方法會接收伺服器上的項目。
若要結束資料流,請呼叫 subject.complete()
。
Java 用戶端
伺服器對用戶端串流
SignalR JAVA 用戶端會使用 stream
方法來叫用串流方法。 stream
接受三種 (含) 以上的引數:
- 預期類型的資料流項目。
- 中樞方法的名稱。
- 在中樞方法中定義的引數。
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
.subscribe(
(item) -> {/* Define your onNext handler here. */ },
(error) -> {/* Define your onError handler here. */},
() -> {/* Define your onCompleted handler here. */});
HubConnection
上的 stream
方法會傳回 Observable 的資料流項目類型。 Observable 類型的 subscribe
方法是定義 onNext
、onError
和 onCompleted
等處理常式之處。
用戶端對伺服器串流
SignalR JAVA 用戶端可以透過將 Observable 作為引數傳遞給 send
、invoke
或 stream
(視叫用的中樞方法而定),在中樞上呼叫用戶端對伺服器串流方法。
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();
使用項目來呼叫 stream.onNext(item)
會將項目寫入資料流中,而中樞方法會接收伺服器上的項目。
若要結束資料流,請呼叫 stream.onComplete()
。