Usare lo streaming in ASP.NET Core SignalR
ASP.NET Core SignalR supporta lo streaming dal client al server e dal server al client. Ciò è utile per gli scenari in cui arrivano frammenti di dati nel tempo. Quando si esegue lo streaming, ogni frammento viene inviato al client o al server non appena diventa disponibile, anziché attendere che tutti i dati diventino disponibili.
Visualizzare o scaricare il codice di esempio (procedura per il download)
Configurare un hub per lo streaming
Un metodo hub diventa automaticamente un metodo hub di streaming quando restituisce IAsyncEnumerable<T>, Task<IAsyncEnumerable<T>>
ChannelReader<T>, o Task<ChannelReader<T>>
.
Streaming da server a client
I metodi dell'hub di streaming possono restituire IAsyncEnumerable<T>
oltre a ChannelReader<T>
. Il modo più semplice per restituire IAsyncEnumerable<T>
consiste nel rendere il metodo hub un metodo iteratore asincrono come illustrato nell'esempio seguente. I metodi iteratore asincrono dell'hub possono accettare un CancellationToken
parametro attivato quando il client annulla la sottoscrizione al flusso. I metodi iteratori asincroni evitano problemi comuni con i canali, ad esempio non restituendo il ChannelReader
metodo abbastanza presto o chiudendo il metodo senza completare .ChannelWriter<T>
Nota
L'esempio seguente richiede C# 8.0 o versione successiva.
public class AsyncEnumerableHub : Hub
{
public async IAsyncEnumerable<int> Counter(
int count,
int delay,
[EnumeratorCancellation]
CancellationToken cancellationToken)
{
for (var i = 0; i < count; i++)
{
// Check the cancellation token regularly so that the server will stop
// producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
yield return i;
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
}
L'esempio seguente illustra le nozioni di base dei dati di streaming al client usando i canali. Ogni volta che un oggetto viene scritto in ChannelWriter<T>, l'oggetto viene inviato immediatamente al client. Al termine, l'oggetto ChannelWriter
viene completato per indicare al client che il flusso è chiuso.
Nota
Scrivere in ChannelWriter<T>
su un thread in background e restituire il ChannelReader
prima possibile. Altre chiamate dell'hub vengono bloccate fino a quando non viene restituito un oggetto ChannelReader
.
Eseguire il wrapping della logica in un'istruzione try ... catch
. Completare l'oggetto Channel
in un finally
blocco. Se si vuole eseguire il flusso di un errore, acquisiscerlo all'interno del catch
blocco e scriverlo nel finally
blocco.
public ChannelReader<int> Counter(
int count,
int delay,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay,
CancellationToken cancellationToken)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
await writer.WriteAsync(i, cancellationToken);
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
catch (Exception ex)
{
localException = ex;
}
finally
{
writer.Complete(localException);
}
}
I metodi dell'hub di streaming da server a client possono accettare un CancellationToken
parametro attivato quando il client annulla la sottoscrizione al flusso. Usare questo token per arrestare l'operazione del server e rilasciare eventuali risorse se il client si disconnette prima della fine del flusso.
Streaming da client a server
Un metodo hub diventa automaticamente un metodo hub di streaming da client a server quando accetta uno o più oggetti di tipo ChannelReader<T> o IAsyncEnumerable<T>. L'esempio seguente illustra le nozioni di base sulla lettura dei dati di streaming inviati dal client. Ogni volta che il client scrive in ChannelWriter<T>, i dati vengono scritti nel ChannelReader
nel server da cui viene letto il metodo hub.
public async Task UploadStream(ChannelReader<string> stream)
{
while (await stream.WaitToReadAsync())
{
while (stream.TryRead(out var item))
{
// do something with the stream item
Console.WriteLine(item);
}
}
}
Segue una IAsyncEnumerable<T> versione del metodo .
Nota
L'esempio seguente richiede C# 8.0 o versione successiva.
public async Task UploadStream(IAsyncEnumerable<string> stream)
{
await foreach (var item in stream)
{
Console.WriteLine(item);
}
}
Client .NET
Streaming da server a client
I StreamAsync
metodi e StreamAsChannelAsync
in HubConnection
vengono usati per richiamare i metodi di streaming da server a client. Passare il nome del metodo hub e gli argomenti definiti nel metodo hub a StreamAsync
o StreamAsChannelAsync
. Il parametro generico su StreamAsync<T>
e StreamAsChannelAsync<T>
specifica il tipo di oggetti restituiti dal metodo di streaming. Un oggetto di tipo IAsyncEnumerable<T>
o ChannelReader<T>
viene restituito dalla chiamata al flusso e rappresenta il flusso nel client.
Esempio che restituisce StreamAsync
IAsyncEnumerable<int>
:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
await foreach (var count in stream)
{
Console.WriteLine($"{count}");
}
Console.WriteLine("Streaming completed");
Esempio corrispondente StreamAsChannelAsync
che restituisce ChannelReader<int>
:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
Nel codice precedente:
- Il
StreamAsChannelAsync
metodo suHubConnection
viene usato per richiamare un metodo di streaming da server a client. Passare il nome del metodo hub e gli argomenti definiti nel metodo hub aStreamAsChannelAsync
. - Il parametro generico in
StreamAsChannelAsync<T>
specifica il tipo di oggetti restituiti dal metodo di streaming. - Un
ChannelReader<T>
oggetto viene restituito dalla chiamata al flusso e rappresenta il flusso nel client.
Streaming da client a server
Esistono due modi per richiamare un metodo hub di streaming da client a server dal client .NET. È possibile passare un IAsyncEnumerable<T>
oggetto o ChannelReader
come argomento a SendAsync
, InvokeAsync
o StreamAsChannelAsync
, a seconda del metodo hub richiamato.
Ogni volta che i dati vengono scritti nell'oggetto IAsyncEnumerable
o ChannelWriter
, il metodo hub nel server riceve un nuovo elemento con i dati dal client.
Se si usa un IAsyncEnumerable
oggetto , il flusso termina dopo l'uscita del metodo che restituisce gli elementi del flusso.
Nota
L'esempio seguente richiede C# 8.0 o versione successiva.
async IAsyncEnumerable<string> clientStreamData()
{
for (var i = 0; i < 5; i++)
{
var data = await FetchSomeData();
yield return data;
}
//After the for loop has completed and the local function exits the stream completion will be sent.
}
await connection.SendAsync("UploadStream", clientStreamData());
In alternativa, se si usa un ChannelWriter
oggetto , completare il canale con channel.Writer.Complete()
:
var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();
Client JavaScript
Streaming da server a client
I client JavaScript chiamano metodi di streaming da server a client in hub con connection.stream
. Il stream
metodo accetta due argomenti:
- Nome del metodo hub. Nell'esempio seguente il nome del metodo hub è
Counter
. - Argomenti definiti nel metodo hub. Nell'esempio seguente gli argomenti sono un conteggio per il numero di elementi del flusso da ricevere e il ritardo tra gli elementi del flusso.
connection.stream
restituisce un IStreamResult
oggetto , che contiene un subscribe
metodo . Passare un IStreamSubscriber
oggetto a subscribe
e impostare i next
callback , error
e complete
per ricevere notifiche dalla stream
chiamata.
connection.stream("Counter", 10, 500)
.subscribe({
next: (item) => {
var li = document.createElement("li");
li.textContent = item;
document.getElementById("messagesList").appendChild(li);
},
complete: () => {
var li = document.createElement("li");
li.textContent = "Stream completed";
document.getElementById("messagesList").appendChild(li);
},
error: (err) => {
var li = document.createElement("li");
li.textContent = err;
document.getElementById("messagesList").appendChild(li);
},
});
Per terminare il flusso dal client, chiamare il dispose
metodo sull'oggetto ISubscription
restituito dal subscribe
metodo . La chiamata a questo metodo causa l'annullamento CancellationToken
del parametro del metodo Hub, se ne è stato specificato uno.
Streaming da client a server
I client JavaScript chiamano metodi di streaming da client a server negli hub passando un Subject
oggetto come argomento a send
, invoke
o stream
, a seconda del metodo hub richiamato. Subject
è una classe che ha un aspetto simile a .Subject
Ad esempio, in RxJS, è possibile usare la classe Subject da tale libreria.
const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
iteration++;
subject.next(iteration.toString());
if (iteration === 10) {
clearInterval(intervalHandle);
subject.complete();
}
}, 500);
La chiamata subject.next(item)
con un elemento scrive l'elemento nel flusso e il metodo hub riceve l'elemento nel server.
Per terminare il flusso, chiamare subject.complete()
.
Client Java
Streaming da server a client
Il SignalR client Java usa il stream
metodo per richiamare i metodi di streaming. stream
accetta tre o più argomenti:
- Tipo previsto degli elementi del flusso.
- Nome del metodo hub.
- Argomenti definiti nel metodo hub.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
.subscribe(
(item) -> {/* Define your onNext handler here. */ },
(error) -> {/* Define your onError handler here. */},
() -> {/* Define your onCompleted handler here. */});
Il stream
metodo su HubConnection
restituisce un oggetto Observable del tipo di elemento del flusso. Il metodo del tipo Observable è il punto in subscribe
cui onNext
onError
sono definiti i gestori e onCompleted
.
Streaming da client a server
Il SignalR client Java può chiamare metodi di streaming da client a server negli hub passando un observable come argomento a send
, invoke
o stream
, a seconda del metodo hub richiamato.
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();
La chiamata stream.onNext(item)
con un elemento scrive l'elemento nel flusso e il metodo hub riceve l'elemento nel server.
Per terminare il flusso, chiamare stream.onComplete()
.