Orleans Guia de início rápido de streaming
Este guia mostrará uma maneira rápida de configurar e usar Orleans o Streams. Para saber mais sobre os detalhes dos recursos de streaming, leia outras partes desta documentação.
Configurações necessárias
Neste guia, você usará um fluxo baseado em memória que usa mensagens de grão para enviar dados de fluxo para assinantes. Você usará o provedor de armazenamento na memória para armazenar listas de assinaturas. O uso de mecanismos baseados em memória para streaming e armazenamento destina-se apenas ao desenvolvimento e teste locais e não a ambientes de produção.
No silo, onde silo
está um ISiloBuilder, chame :AddMemoryStreams
silo.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
No cliente de cluster, onde client
é um IClientBuilder, chame AddMemoryStreams.
client.AddMemoryStreams("StreamProvider");
Neste guia, usaremos um fluxo simples baseado em mensagens que usa mensagens granuladas para enviar dados de fluxo para os assinantes. Usaremos o provedor de armazenamento na memória para armazenar listas de assinaturas, portanto, não é uma escolha sábia para aplicativos de produção reais.
No silo, onde hostBuilder
está um ISiloHostBuilder
, chame :AddSimpleMessageStreamProvider
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddMemoryGrainStorage("PubSubStore");
No cliente de cluster, onde clientBuilder
é um IClientBuilder
, chame AddSimpleMessageStreamProvider.
clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");
Nota
Por padrão, as mensagens que são passadas sobre o Fluxo de Mensagens Simples são consideradas imutáveis e podem ser passadas por referência a outros grãos. Para desativar esse comportamento, você deve configurar o provedor de SMS para desativar SimpleMessageStreamProviderOptions.OptimizeForImmutableData
siloBuilder
.AddSimpleMessageStreamProvider(
"SMSProvider",
options => options.OptimizeForImmutableData = false);
Você pode criar fluxos, enviar dados usando-os como produtores e também receber dados como assinantes.
Produzir eventos
É relativamente fácil produzir eventos para streams. Você deve primeiro obter acesso ao provedor de fluxo que você definiu na configuração anteriormente ("StreamProvider"
) e, em seguida, escolher um fluxo e enviar dados por push para ele.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
É relativamente fácil produzir eventos para streams. Você deve primeiro obter acesso ao provedor de fluxo que você definiu na configuração anteriormente ("SMSProvider"
) e, em seguida, escolher um fluxo e enviar dados por push para ele.
// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
Como você pode ver, nosso fluxo tem um GUID e um namespace. Isso facilitará a identificação de fluxos exclusivos. Por exemplo, o namespace para uma sala de chat pode ser "Rooms" e o GUID pode ser o GUID proprietário do RoomGrain.
Aqui usamos o GUID de alguma sala de chat conhecida. Usando o OnNextAsync
método do fluxo, podemos enviar dados para ele. Vamos fazê-lo dentro de um temporizador, usando números aleatórios. Você também pode usar qualquer outro tipo de dados para o fluxo.
RegisterTimer(_ =>
{
return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));
Subscrever e receber dados de streaming
Para receber dados, você pode usar assinaturas implícitas e explícitas, que são descritas com mais detalhes em Assinaturas explícitas e implícitas. Este exemplo usa assinaturas implícitas, que são mais fáceis. Quando um tipo de grão deseja se inscrever implicitamente em um fluxo, ele usa o atributo [ImplicitStreamSubscription(namespace)].
Para o seu caso, defina um ReceiverGrain
como este:
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
Sempre que os dados são enviados para os fluxos do namespace RANDOMDATA
, como temos no temporizador, um grão do tipo ReceiverGrain
com o mesmo Guid
do fluxo receberá a mensagem. Mesmo que nenhuma ativação do grão exista atualmente, o tempo de execução criará automaticamente um novo e enviará a mensagem para ele.
Para que isso funcione, precisamos concluir o processo de assinatura, definindo nosso OnNextAsync
método para receber dados. Para isso, ReceiverGrain
devemos chamar algo assim em seu OnActivateAsync
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();
// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
Está pronto! Agora, o único requisito é que algo desencadeie a criação do grão do produtor, e então ele registrará o temporizador e começará a enviar ints aleatórios para todas as partes interessadas.
Mais uma vez, este guia ignora muitos detalhes e só é bom para mostrar o panorama geral. Leia outras partes deste manual e outros recursos no RX para obter uma boa compreensão do que está disponível e como.
A programação reativa pode ser uma abordagem muito poderosa para resolver muitos problemas. Você pode, por exemplo, usar o LINQ no assinante para filtrar números e fazer todos os tipos de coisas interessantes.