Usar transmissão no ASP.NET Core SignalR
Por Brennan Conroy
O ASP.NET Core SignalR é compatível com transmissão de cliente para servidor e de servidor para cliente. Isso é útil para cenários em que fragmentos de dados chegam ao longo do tempo. Ao transmitir, cada fragmento é enviado para o cliente ou servidor assim que ele fica disponível, em vez de aguardar que todos os dados fiquem disponíveis.
Exibir ou baixar código de exemplo (como baixar)
Configurar um hub para transmissão
Um método de hub se torna automaticamente um método de hub de transmissão quando retorna IAsyncEnumerable<T>, ChannelReader<T>, Task<IAsyncEnumerable<T>>
ou Task<ChannelReader<T>>
.
Fluxo de dados de servidor para cliente
Os métodos de hub de transmissão podem retornar IAsyncEnumerable<T>
além de ChannelReader<T>
. A maneira mais simples de retornar IAsyncEnumerable<T>
é tornando o método hub um método iterador assíncrono, como demonstra o exemplo a seguir. Os métodos de iterador assíncrono do hub podem aceitar um parâmetro CancellationToken
que é disparado quando o cliente cancela a assinatura do fluxo. Os métodos iteradores assíncronos evitam problemas comuns com Canais, como não retornar o ChannelReader
cedo o suficiente ou sair do método sem concluir o ChannelWriter<T>.
Observação
O exemplo a seguir requer o C# 8.0 ou posterior.
public class AsyncEnumerableHub : Hub
{
public async IAsyncEnumerable<int> Counter(
int count,
int delay,
[EnumeratorCancellation]
CancellationToken cancellationToken)
{
for (var i = 0; i < count; i++)
{
// Check the cancellation token regularly so that the server will stop
// producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
yield return i;
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
}
O exemplo a seguir mostra as noções básicas de dados de transmissão para o cliente usando Canais. Sempre que um objeto é gravado no ChannelWriter<T>, o objeto é imediatamente enviado ao cliente. No final, o ChannelWriter
é concluído para informar ao cliente que o fluxo está fechado.
Observação
Escreva no ChannelWriter<T>
em um thread em segundo plano e retorne o ChannelReader
o mais rápido possível. Outras invocações de hub são bloqueadas até que um ChannelReader
seja retornado.
Encapsular lógica em uma try ... catch
instrução . Conclua o Channel
em um finally
bloco. Se você quiser fluir um erro, capture-o dentro do bloco catch
e escreva-o no bloco finally
.
public ChannelReader<int> Counter(
int count,
int delay,
CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<int>();
// We don't want to await WriteItemsAsync, otherwise we'd end up waiting
// for all the items to be written before returning the channel back to
// the client.
_ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(
ChannelWriter<int> writer,
int count,
int delay,
CancellationToken cancellationToken)
{
Exception localException = null;
try
{
for (var i = 0; i < count; i++)
{
await writer.WriteAsync(i, cancellationToken);
// Use the cancellationToken in other APIs that accept cancellation
// tokens so the cancellation can flow down to them.
await Task.Delay(delay, cancellationToken);
}
}
catch (Exception ex)
{
localException = ex;
}
finally
{
writer.Complete(localException);
}
}
Os métodos de iterador assíncrono do hub podem aceitar um parâmetro CancellationToken
que é disparado quando o cliente cancela a assinatura do fluxo. Use esse token para interromper a operação do servidor e liberar todos os recursos se o cliente se desconectar antes do final do fluxo.
Fluxo de dados do cliente para o servidor
Um método de hub se torna automaticamente um método de hub de transmissão cliente para servidor quando aceita um ou mais objetos do tipo ChannelReader<T> ou IAsyncEnumerable<T>. O exemplo a seguir mostra os conceitos básicos da leitura de dados de transmissão enviados do cliente. Sempre que o cliente grava no ChannelWriter<T>, os dados são gravados no ChannelReader
no servidor a partir do qual o método de hub está lendo.
public async Task UploadStream(ChannelReader<string> stream)
{
while (await stream.WaitToReadAsync())
{
while (stream.TryRead(out var item))
{
// do something with the stream item
Console.WriteLine(item);
}
}
}
Uma versão IAsyncEnumerable<T> do método a seguir.
Observação
O exemplo a seguir requer o C# 8.0 ou posterior.
public async Task UploadStream(IAsyncEnumerable<string> stream)
{
await foreach (var item in stream)
{
Console.WriteLine(item);
}
}
Cliente .NET
Fluxo de dados de servidor para cliente
Os métodos StreamAsync
e StreamAsChannelAsync
em HubConnection
são usados para invocar métodos de transmissão de servidor para cliente. Passe o nome do método de hub e todos os argumentos definidos no método de hub para StreamAsync
ou StreamAsChannelAsync
. O parâmetro genérico em StreamAsync<T>
e StreamAsChannelAsync<T>
especifica o tipo de objetos retornados pelo método de transmissão. Um objeto do tipo IAsyncEnumerable<T>
ou ChannelReader<T>
é retornado da invocação de fluxo e representa o fluxo no cliente.
Um exemplo StreamAsync
que retorna IAsyncEnumerable<int>
:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
await foreach (var count in stream)
{
Console.WriteLine($"{count}");
}
Console.WriteLine("Streaming completed");
Um exemplo correspondente StreamAsChannelAsync
que retorna ChannelReader<int>
:
// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
// Read all currently available data synchronously, before waiting for more data
while (channel.TryRead(out var count))
{
Console.WriteLine($"{count}");
}
}
Console.WriteLine("Streaming completed");
No código anterior:
- O método
StreamAsChannelAsync
emHubConnection
é usado para invocar um método de transmissão de servidor para cliente. Passe o nome do método de hub e todos os argumentos definidos no método de hub paraStreamAsChannelAsync
. - O parâmetro genérico em
StreamAsChannelAsync<T>
specifica o tipo de objetos retornados pelo método de transmissão. - Um
ChannelReader<T>
é retornado da invocação de fluxo e representa o fluxo no cliente.
Fluxo de dados do cliente para o servidor
Há duas maneiras de invocar um método de hub de transmissão do cliente para servidor do cliente .NET. Você pode passar um IAsyncEnumerable<T>
ou ChannelReader
um como um argumento para SendAsync
, InvokeAsync
ou StreamAsChannelAsync
, dependendo do método de hub invocado.
Sempre que os dados são gravados no objeto IAsyncEnumerable
ou ChannelWriter
, o método hub no servidor recebe um novo item com os dados do cliente.
Se estiver usando um objeto IAsyncEnumerable
, o fluxo terminará depois que o método que retorna itens de fluxo for encerrado.
Observação
O exemplo a seguir requer o C# 8.0 ou posterior.
async IAsyncEnumerable<string> clientStreamData()
{
for (var i = 0; i < 5; i++)
{
var data = await FetchSomeData();
yield return data;
}
//After the for loop has completed and the local function exits the stream completion will be sent.
}
await connection.SendAsync("UploadStream", clientStreamData());
Ou se você estiver usando um ChannelWriter
, conclua o canal com channel.Writer.Complete()
:
var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();
Cliente JavaScript
Fluxo de dados de servidor para cliente
Os clientes JavaScript chamam métodos de transmissão de servidor para cliente em hubs com connection.stream
. O método stream
aceita dois argumentos:
- O nome do método de hub. No exemplo a seguir, o nome do método de hub é
Counter
. - Argumentos definidos no método de hub. No exemplo a seguir, os argumentos são uma contagem para o número de itens de fluxo a serem recebidos e o atraso entre os itens de fluxo.
connection.stream
retorna um IStreamResult
, que contém um método subscribe
. Passe um IStreamSubscriber
para subscribe
e defina os retornos de chamada next
, error
e complete
para receber notificações da invocação stream
.
connection.stream("Counter", 10, 500)
.subscribe({
next: (item) => {
var li = document.createElement("li");
li.textContent = item;
document.getElementById("messagesList").appendChild(li);
},
complete: () => {
var li = document.createElement("li");
li.textContent = "Stream completed";
document.getElementById("messagesList").appendChild(li);
},
error: (err) => {
var li = document.createElement("li");
li.textContent = err;
document.getElementById("messagesList").appendChild(li);
},
});
Para encerrar o fluxo do cliente, chame o método dispose
no ISubscription
retornado do método subscribe
. Chamar esse método causará o cancelamento do parâmetro CancellationToken
do método de hub, se você tiver fornecido um.
Fluxo de dados do cliente para o servidor
Os clientes JavaScript chamam métodos de transmissão de cliente para servidor em hubs passando um Subject
como um argumento para send
, invoke
ou stream
, dependendo do método de hub invocado. O Subject
é uma classe que se parece com um Subject
. Por exemplo, no RxJS, você pode usar a classe Subject dessa biblioteca.
const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
iteration++;
subject.next(iteration.toString());
if (iteration === 10) {
clearInterval(intervalHandle);
subject.complete();
}
}, 500);
Chamar subject.next(item)
com um item grava o item no fluxo e o método hub recebe o item no servidor.
Para encerrar o fluxo, chame subject.complete()
.
Cliente Java
Fluxo de dados de servidor para cliente
O cliente Java SignalR usa o método stream
para invocar métodos de transmissão. stream
aceita três ou mais argumentos:
- O tipo esperado dos itens de fluxo.
- O nome do método de hub.
- Argumentos definidos no método de hub.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
.subscribe(
(item) -> {/* Define your onNext handler here. */ },
(error) -> {/* Define your onError handler here. */},
() -> {/* Define your onCompleted handler here. */});
O método stream
em HubConnection
retorna um Observável do tipo de item de fluxo. O método subscribe
do tipo Observável é onde os manipuladores onNext
,onError
e onCompleted
são definidos.
Fluxo de dados do cliente para o servidor
O cliente Java do SignalR chama métodos de transmissão de cliente para servidor em hubs passando um Observável como um argumento para send
, invoke
ou stream
, dependendo do método de hub invocado.
ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();
Chamar stream.onNext(item)
com um item grava o item no fluxo e o método hub recebe o item no servidor.
Para encerrar a transmissão, chame stream.onComplete()
.