Utiliser la diffusion en continu dans ASP.NET Core SignalR
Par Brennan Conroy
SignalR ASP.NET Core prend en charge la diffusion en continu de client à serveur et de serveur à client. Cela est utile pour les scénarios où des fragments de données arrivent au fil du temps. Lors de la diffusion en continu, chaque fragment est envoyé au client ou au serveur dès qu’il devient disponible, plutôt que d’attendre que toutes les données soient disponibles.
Affichez ou téléchargez l’exemple de code (procédure de téléchargement)
Configurer un hub pour la diffusion en continu
Une méthode hub devient automatiquement une méthode hub de streaming lorsqu’elle retourne IAsyncEnumerable<T>, ChannelReader<T>, Task<IAsyncEnumerable<T>>
ou Task<ChannelReader<T>>
.
Diffuser en continu de serveur à client
Les méthodes de hub de streaming peuvent retourner IAsyncEnumerable<T>
en plus de ChannelReader<T>
. Le moyen le plus simple de retourner IAsyncEnumerable<T>
consiste à faire de la méthode hub une méthode d’itérateur asynchrone, comme le montre l’exemple suivant. Les méthodes d’itérateur asynchrone hub peuvent accepter un paramètre CancellationToken
qui est déclenché lorsque le client se désinscrit du flux. Les méthodes d’itérateur asynchrone évitent les problèmes courants avec les canaux, tels que le fait de ne pas retourner ChannelReader
suffisamment tôt ou de quitter la méthode sans terminer le ChannelWriter<T>.
Notes
L’exemple suivant nécessite C# 8.0 ou version ultérieure.
public class AsyncEnumerableHub : Hub
{
public async IAsyncEnumerable<int> Counter(
int count,
int delay,
[EnumeratorCancellation]
CancellationToken cancellationToken)
{
for (var i = 0; i < count; i++)
{
// Check the cancellation token regularly so that the server will stop
// producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
yield return i;
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
}
L’exemple suivant montre les principes de base de la diffusion de données vers le client à l’aide de Canaux. Chaque fois qu’un objet est écrit dans ChannelWriter<T>, l’objet est immédiatement envoyé au client. À la fin, le ChannelWriter
est terminé pour indiquer au client que le flux est fermé.
Notes
Écrivez dans le ChannelWriter<T>
sur un thread d’arrière-plan et retournez le ChannelReader
dès que possible. Les autres appels de hub sont bloqués jusqu’à ce qu’un ChannelReader
soit retourné.
Encapsuler la logique dans une try ... catch
instruction. Effectuez le Channel
dans un finally
bloc. Si vous souhaitez transmettre une erreur, capturez-la à l’intérieur du bloc catch
et écrivez-la dans le bloc finally
.
public ChannelReader<int> Counter(
int count,
int delay,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay,
CancellationToken cancellationToken)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
await writer.WriteAsync(i, cancellationToken);
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
catch (Exception ex)
{
localException = ex;
}
finally
{
writer.Complete(localException);
}
}
Les méthodes du hub de streaming de serveur à client peuvent accepter un paramètre CancellationToken
qui est déclenché lorsque le client se désinscrit du flux. Utilisez ce jeton pour arrêter l’opération serveur et libérer toutes les ressources si le client se déconnecte avant la fin du flux.
Diffuser en continu de client à serveur
Une méthode hub devient automatiquement une méthode de hub de streaming de client à serveur lorsqu’elle accepte un ou plusieurs objets de type ChannelReader<T> ou IAsyncEnumerable<T>. L’exemple suivant montre les principes de base de la lecture des données de streaming envoyées à partir du client. Chaque fois que le client écrit dans ChannelWriter<T>, les données sont écrites dans le ChannelReader
sur le serveur à partir duquel la méthode hub lit.
public async Task UploadStream(ChannelReader<string> stream)
{
while (await stream.WaitToReadAsync())
{
while (stream.TryRead(out var item))
{
// do something with the stream item
Console.WriteLine(item);
}
}
}
Une version IAsyncEnumerable<T> de la méthode est la suivante.
Notes
L’exemple suivant nécessite C# 8.0 ou version ultérieure.
public async Task UploadStream(IAsyncEnumerable<string> stream)
{
await foreach (var item in stream)
{
Console.WriteLine(item);
}
}
Client .NET
Diffuser en continu de serveur à client
Les méthodes StreamAsync
et StreamAsChannelAsync
sur HubConnection
sont utilisées pour appeler des méthodes de streaming de serveur à client. Passez le nom et les arguments de la méthode hub définis dans la méthode hub à StreamAsync
ou StreamAsChannelAsync
. Le paramètre générique sur StreamAsync<T>
et StreamAsChannelAsync<T>
spécifie le type d’objets retournés par la méthode de streaming. Un objet de type IAsyncEnumerable<T>
ou ChannelReader<T>
est retourné à partir de l’appel de flux et représente le flux sur le client.
Exemple StreamAsync
qui retourne IAsyncEnumerable<int>
:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
await foreach (var count in stream)
{
Console.WriteLine($"{count}");
}
Console.WriteLine("Streaming completed");
Exemple correspondant StreamAsChannelAsync
qui retourne ChannelReader<int>
:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
Dans le code précédent :
- La méthode
StreamAsChannelAsync
surHubConnection
est utilisée pour appeler une méthode de streaming de serveur à client. Passez le nom et les arguments de la méthode hub définis dans la méthode hub àStreamAsChannelAsync
. - Le paramètre générique sur
StreamAsChannelAsync<T>
spécifie le type d’objets retournés par la méthode de streaming. - Un
ChannelReader<T>
est retourné à partir de l’appel de flux et représente le flux sur le client.
Diffuser en continu de client à serveur
Il existe deux façons d’appeler une méthode de hub de streaming de client à serveur à partir du client .NET. Vous pouvez passer un IAsyncEnumerable<T>
ou un ChannelReader
en tant qu’argument à SendAsync
, InvokeAsync
ou StreamAsChannelAsync
, en fonction de la méthode hub appelée.
Chaque fois que des données sont écrites dans l’objet IAsyncEnumerable
ou ChannelWriter
, la méthode hub sur le serveur reçoit un nouvel élément avec les données du client.
Si vous utilisez un objetIAsyncEnumerable
, le flux se termine après la sortie de la méthode des éléments de flux de retour.
Notes
L’exemple suivant nécessite C# 8.0 ou version ultérieure.
async IAsyncEnumerable<string> clientStreamData()
{
for (var i = 0; i < 5; i++)
{
var data = await FetchSomeData();
yield return data;
}
//After the for loop has completed and the local function exits the stream completion will be sent.
}
await connection.SendAsync("UploadStream", clientStreamData());
Ou si vous utilisez un ChannelWriter
, vous complétez le canal avec channel.Writer.Complete()
:
var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();
Client JavaScript
Diffuser en continu de serveur à client
Les clients JavaScript appellent des méthodes de streaming de serveur à client sur des hubs avec connection.stream
. La méthode stream
accepte deux arguments :
- Le nom de la méthode hub. Dans l’exemple suivant, le nom de la méthode hub est
Counter
. - Arguments définis dans la méthode hub. Dans l’exemple suivant, les arguments correspondent au nombre d’éléments de flux à recevoir et au délai entre les éléments de flux.
connection.stream
retourne un IStreamResult
, qui contient une méthode subscribe
. Passez un IStreamSubscriber
à subscribe
et définissez les rappels next
, error
et complete
pour recevoir des notifications de l’appel stream
.
connection.stream("Counter", 10, 500)
.subscribe({
next: (item) => {
var li = document.createElement("li");
li.textContent = item;
document.getElementById("messagesList").appendChild(li);
},
complete: () => {
var li = document.createElement("li");
li.textContent = "Stream completed";
document.getElementById("messagesList").appendChild(li);
},
error: (err) => {
var li = document.createElement("li");
li.textContent = err;
document.getElementById("messagesList").appendChild(li);
},
});
Pour mettre fin au flux à partir du client, appelez la méthode dispose
sur le ISubscription
qui est retourné par la méthode subscribe
. L’appel de cette méthode entraîne l’annulation du paramètre CancellationToken
de la méthode Hub, si vous en avez fourni une.
Diffuser en continu de client à serveur
Les clients JavaScript appellent des méthodes de diffusion en continu de client à serveur sur des hubs en transmettant un Subject
en tant qu’argument à send
, invoke
ou stream
, en fonction de la méthode hub appelée. Le Subject
est une classe qui ressemble à un Subject
. Par exemple, dans RxJS, vous pouvez utiliser la classe Sujet de cette bibliothèque.
const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
iteration++;
subject.next(iteration.toString());
if (iteration === 10) {
clearInterval(intervalHandle);
subject.complete();
}
}, 500);
L’appel subject.next(item)
avec un élément écrit l’élément dans le flux, et la méthode hub reçoit l’élément sur le serveur.
Pour mettre fin au flux, appelez subject.complete()
.
Client Java
Diffuser en continu de serveur à client
Le SignalR client Java utilise la méthode stream
pour appeler des méthodes de streaming. stream
accepte au moins trois arguments :
- Type attendu des éléments de flux.
- Le nom de la méthode hub.
- Arguments définis dans la méthode hub.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
.subscribe(
(item) -> {/* Define your onNext handler here. */ },
(error) -> {/* Define your onError handler here. */},
() -> {/* Define your onCompleted handler here. */});
La méthode stream
sur HubConnection
retourne une observable du type d’élément de flux. La méthode du type Observable est l’emplacement subscribe
où les gestionnaires et onNext
,onError
et onCompleted
sont définis.
Diffuser en continu de client à serveur
Le SignalR client Java peut appeler des méthodes de diffusion en continu de client à serveur sur des hubs en transmettant un observable en tant qu’argument à send
, invoke
ou stream
, en fonction de la méthode hub appelée.
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();
L’appel stream.onNext(item)
avec un élément écrit l’élément dans le flux, et la méthode hub reçoit l’élément sur le serveur.
Pour mettre fin au flux, appelez stream.onComplete()
.