Verwenden des Streamings in ASP.NET Core SignalR
Von Brennan Conroy
ASP.NET Core SignalR unterstützt das Streaming vom Client zum Server und vom Server zum Client. Dies ist nützlich für Szenarien, in denen Datenfragmente im Laufe der Zeit eintreffen. Beim Streaming wird jedes Fragment an den Client oder Server gesendet, sobald es verfügbar ist, anstatt darauf zu warten, dass die gesamten Daten verfügbar sind.
Anzeigen oder Herunterladen von Beispielcode (Vorgehensweise zum Herunterladen)
Einrichten eines Hubs für das Streaming
Eine Hubmethode wird automatisch zu einer Hubmethode für das Streaming, wenn sie IAsyncEnumerable<T>, ChannelReader<T>, Task<IAsyncEnumerable<T>>
oder Task<ChannelReader<T>>
zurückgibt.
Streaming vom Server zum Client
Hubmethoden für das Streaming können zusätzlich zu ChannelReader<T>
auch IAsyncEnumerable<T>
zurückgeben. Die einfachste Möglichkeit, IAsyncEnumerable<T>
zurückzugeben, besteht darin, die Hubmethode zu einer asynchronen Iteratormethode zu machen, wie das folgende Beispiel zeigt. Die asynchronen Iteratormethoden des Hubs können einen CancellationToken
-Parameter akzeptieren, der ausgelöst wird, wenn der Client das Abonnement des Datenstroms kündigt. Asynchrone Iteratormethoden vermeiden Probleme, die bei Kanälen häufig auftreten, z. B. dass der ChannelReader
nicht früh genug zurückgegeben oder die Methode verlassen wird, ohne den ChannelWriter<T> abzuschließen.
Hinweis
Das folgende Beispiel erfordert C# 8.0 oder höher.
public class AsyncEnumerableHub : Hub
{
public async IAsyncEnumerable<int> Counter(
int count,
int delay,
[EnumeratorCancellation]
CancellationToken cancellationToken)
{
for (var i = 0; i < count; i++)
{
// Check the cancellation token regularly so that the server will stop
// producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
yield return i;
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
}
Das folgende Beispiel zeigt die Grundlagen des Streaming von Daten zum Client mithilfe von Kanälen. Immer wenn ein Objekt in den ChannelWriter<T> geschrieben wird, wird das Objekt sofort an den Client gesendet. Am Ende wird der ChannelWriter
abgeschlossen, um dem Client mitzuteilen, dass der Datenstrom geschlossen ist.
Hinweis
Schreiben Sie in einem Hintergrundthread in den ChannelWriter<T>
, und geben Sie den ChannelReader
so schnell wie möglich zurück. Andere Hubaufrufe werden blockiert, bis ein ChannelReader
zurückgegeben wird.
Umschließen Sie die Logik mit einer try ... catch
Anweisung. Vervollständigen Sie den Channel
in einem finally
-Block. Wenn Sie den Ablauf eines Fehlers verfolgen möchten, erfassen Sie ihn innerhalb des catch
-Blocks und schreiben ihn in den finally
-Block.
public ChannelReader<int> Counter(
int count,
int delay,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay,
CancellationToken cancellationToken)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
await writer.WriteAsync(i, cancellationToken);
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
catch (Exception ex)
{
localException = ex;
}
finally
{
writer.Complete(localException);
}
}
Hubmethoden für das Streaming vom Server zum Client können einen CancellationToken
-Parameter akzeptieren, der ausgelöst wird, wenn der Client das Abonnement des Datenstroms kündigt. Verwenden Sie dieses Token, um den Serverbetrieb anzuhalten und alle Ressourcen freizugeben, wenn der Client die Verbindung vor dem Ende des Datenstroms trennt.
Streaming vom Client zum Server
Eine Hubmethode wird automatisch zu einer Hubmethode für das Streaming vom Client zum Server, wenn sie ein oder mehrere Objekte des Typs ChannelReader<T> oder IAsyncEnumerable<T> akzeptiert. Das folgende Beispiel zeigt die Grundlagen des Lesens von Streamingdaten, die vom Client gesendet werden. Immer wenn der Client in den ChannelWriter<T> schreibt, werden die Daten in den ChannelReader
auf dem Server geschrieben, von dem die Hubmethode gerade liest.
public async Task UploadStream(ChannelReader<string> stream)
{
while (await stream.WaitToReadAsync())
{
while (stream.TryRead(out var item))
{
// do something with the stream item
Console.WriteLine(item);
}
}
}
Eine IAsyncEnumerable<T>-Version der Methode folgt.
Hinweis
Das folgende Beispiel erfordert C# 8.0 oder höher.
public async Task UploadStream(IAsyncEnumerable<string> stream)
{
await foreach (var item in stream)
{
Console.WriteLine(item);
}
}
.NET-Client
Streaming vom Server zum Client
Die Methoden StreamAsync
und StreamAsChannelAsync
für HubConnection
werden verwendet, um Methoden für das Streaming vom Server zum Client aufzurufen. Übergeben Sie den Namen der Hubmethode und die in der Hubmethode definierten Argumente an StreamAsync
oder StreamAsChannelAsync
. Der generische Parameter für StreamAsync<T>
und StreamAsChannelAsync<T>
gibt den Typ der Objekte an, die von der Streamingmethode zurückgegeben werden. Ein Objekt vom Typ IAsyncEnumerable<T>
oder ChannelReader<T>
wird vom Datenstromaufruf zurückgegeben und repräsentiert den Datenstrom auf dem Client.
Ein StreamAsync
-Beispiel, das IAsyncEnumerable<int>
zurückgibt:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
await foreach (var count in stream)
{
Console.WriteLine($"{count}");
}
Console.WriteLine("Streaming completed");
Ein entsprechendes StreamAsChannelAsync
-Beispiel, das ChannelReader<int>
zurückgibt:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
Im vorherigen Code:
- Die Methode
StreamAsChannelAsync
fürHubConnection
wird verwendet, um eine Streamingmethode vom Server zum Client aufzurufen. Übergeben Sie den Namen der Hubmethode und die in der Hubmethode definierten Argumente anStreamAsChannelAsync
. - Der generische Parameter für
StreamAsChannelAsync<T>
gibt den Typ der Objekte an, die von der Streamingmethode zurückgegeben werden. - Ein
ChannelReader<T>
wird vom Datenstromaufruf zurückgegeben und repräsentiert den Datenstrom auf dem Client.
Streaming vom Client zum Server
Es gibt zwei Möglichkeiten, eine Hubmethode für das Streaming vom Client zum Server vom .NET-Client aus aufzurufen. Sie können entweder ein IAsyncEnumerable<T>
oder ein ChannelReader
als Argument an SendAsync
, InvokeAsync
oder StreamAsChannelAsync
übergeben, je nachdem, welche Hubmethode aufgerufen wird.
Immer wenn Daten in das IAsyncEnumerable
- oder ChannelWriter
-Objekt geschrieben werden, erhält die Hubmethode auf dem Server ein neues Element mit den Daten vom Client.
Wenn Sie ein IAsyncEnumerable
-Objekt verwenden, endet der Datenstrom nach Beendigung der Methode, die Datenstromelemente zurückgibt.
Hinweis
Das folgende Beispiel erfordert C# 8.0 oder höher.
async IAsyncEnumerable<string> clientStreamData()
{
for (var i = 0; i < 5; i++)
{
var data = await FetchSomeData();
yield return data;
}
//After the for loop has completed and the local function exits the stream completion will be sent.
}
await connection.SendAsync("UploadStream", clientStreamData());
Oder wenn Sie einen ChannelWriter
verwenden, vervollständigen Sie den Kanal mit channel.Writer.Complete()
:
var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();
JavaScript-Client
Streaming vom Server zum Client
JavaScript-Clients rufen Methoden zum Streaming vom Server zum Client auf Hubs mit connection.stream
auf. Die Methode stream
akzeptiert zwei Argumente:
- Den Namen der Hubmethode. Im folgenden Beispiel lautet der Name der Hubmethode
Counter
. - In der Hubmethode definierte Argumente. Im folgenden Beispiel sind die Argumente ein Zähler für die Anzahl der zu empfangenden Datenstromelemente und die Verzögerung zwischen den Datenstromelementen.
connection.stream
gibt ein IStreamResult
zurück, das eine subscribe
-Methode enthält. Übergeben Sie einen IStreamSubscriber
an subscribe
und legen Sie die next
-, error
- und complete
-Rückrufe fest, um Benachrichtigungen vom stream
-Aufruf zu erhalten.
connection.stream("Counter", 10, 500)
.subscribe({
next: (item) => {
var li = document.createElement("li");
li.textContent = item;
document.getElementById("messagesList").appendChild(li);
},
complete: () => {
var li = document.createElement("li");
li.textContent = "Stream completed";
document.getElementById("messagesList").appendChild(li);
},
error: (err) => {
var li = document.createElement("li");
li.textContent = err;
document.getElementById("messagesList").appendChild(li);
},
});
Um den Datenstrom vom Client zu beenden, rufen Sie die Methode dispose
für das ISubscription
auf, das von der Methode subscribe
zurückgegeben wird. Der Aufruf dieser Methode führt zum Abbruch des Parameters CancellationToken
der Hubmethode, falls Sie einen solchen angegeben haben.
Streaming vom Client zum Server
JavaScript-Clients rufen Methoden zum Streaming vom Client zum Server auf Hubs auf, indem sie ein Subject
als Argument an send
, invoke
oder stream
übergeben, je nachdem, welche Hubmethode aufgerufen wird. Das Subject
ist eine Klasse, die wie ein Subject
aussieht. In RxJS können Sie z. B. die Betreff-Klasse aus dieser Bibliothek verwenden.
const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
iteration++;
subject.next(iteration.toString());
if (iteration === 10) {
clearInterval(intervalHandle);
subject.complete();
}
}, 500);
Der Aufruf von subject.next(item)
mit einem Element schreibt das Element in den Datenstrom, und die Hubmethode empfängt das Element auf dem Server.
Um den Datenstrom zu beenden, rufen Sie subject.complete()
auf.
Java-Client
Streaming vom Server zum Client
Der SignalR Java-Client verwendet die stream
-Methode zum Aufrufen von Streamingmethoden. stream
akzeptiert drei oder mehr Argumente:
- Der erwartete Typ der Datenstromelemente.
- Den Namen der Hubmethode.
- In der Hubmethode definierte Argumente.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
.subscribe(
(item) -> {/* Define your onNext handler here. */ },
(error) -> {/* Define your onError handler here. */},
() -> {/* Define your onCompleted handler here. */});
Die Methode stream
für HubConnection
gibt ein „Observable“ des Datenstromelementtyps zurück. In der Methode subscribe
des Observable-Typs werden die Handler onNext
, onError
und onCompleted
definiert.
Streaming vom Client zum Server
Der SignalR Java-Client kann Methoden für das Streaming vom Client zum Server auf Hubs aufrufen, indem er ein Observable als Argument an send
, invoke
oder stream
übergibt, je nachdem, welche Hubmethode aufgerufen wird.
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();
Der Aufruf von stream.onNext(item)
mit einem Element schreibt das Element in den Datenstrom, und die Hubmethode empfängt das Element auf dem Server.
Um den Datenstrom zu beenden, rufen Sie stream.onComplete()
auf.