Compartilhar via


Início rápido sobre streaming no Orleans

Este guia mostrará uma maneira rápida de configurar e usar Streams do Orleans. Para saber mais detalhes dos recursos de streaming, leia outras partes desta documentação.

Configurações necessárias

Neste guia, usaremos um fluxo baseado em memória que usa sistemas de mensagens de grão para enviar dados de fluxo aos assinantes. Você usará o provedor de armazenamento na memória para armazenar listas de assinaturas. O uso de mecanismos baseados em memória para transmissão e armazenamento destina-se apenas ao desenvolvimento e teste locais e não serve para ambientes de produção.

No silo, em que silo é uma ISiloBuilder, chame AddMemoryStreams:

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

No cliente do cluster, em que client é um IClientBuilder, chame AddMemoryStreams.

client.AddMemoryStreams("StreamProvider");

Neste guia, usaremos um fluxo simples baseado em mensagem que usa mensagens de grão para enviar dados de fluxo aos assinantes. Usaremos o provedor de armazenamento na memória para armazenar listas de assinaturas, portanto, esta não é uma opção adequada para aplicativos de produção reais.

No silo, em que hostBuilder é uma ISiloHostBuilder, chame AddSimpleMessageStreamProvider:

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

No cliente do cluster, em que clientBuilder é um IClientBuilder, chame AddSimpleMessageStreamProvider.

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

Observação

Por padrão, as mensagens transmitidas pelo fluxo de mensagens simples são consideradas imutáveis e podem ser transmitidas por referência a outros grãos. Para desativar esse comportamento, configure o provedor de SMS para desativar SimpleMessageStreamProviderOptions.OptimizeForImmutableData

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

Você pode criar fluxos, enviar dados usando-os como produtores e receber dados como assinantes.

Produzir eventos

É relativamente fácil produzir eventos para fluxos. Primeiro, você deve obter acesso ao provedor de fluxo que foi definido anteriormente na configuração ("StreamProvider") e, em seguida, escolher um fluxo e enviar dados por push para ele.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

É relativamente fácil produzir eventos para fluxos. Primeiro, você deve obter acesso ao provedor de fluxo que foi definido anteriormente na configuração ("SMSProvider") e, em seguida, escolher um fluxo e enviar dados por push para ele.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

Como é possível notar, o fluxo tem um GUID e um namespace. Isso facilitará a identificação de fluxos exclusivos. Por exemplo, o namespace de uma sala de chat pode ser "Salas" e o GUID pode ser o GUID do proprietário do RoomGrain.

Aqui, usamos o GUID de uma sala de chat conhecida. Usando o método OnNextAsync do fluxo, é possível enviar dados por push para ele. Vamos fazer isso dentro de um temporizador, usando números aleatórios. Também é possível usar qualquer outro tipo de dados para o fluxo.

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

Assinar e receber dados de transmissão

Para receber dados, você pode usar assinaturas implícitas e explícitas, que são descritas com mais detalhes em Assinaturas explícitas e implícitas. Esse exemplo usa assinaturas implícitas, que são mais fáceis. Quando um tipo de grão deseja assinar implicitamente um fluxo, ele usa o atributo [ImplicitStreamSubscription(namespace)].

No seu caso, defina uma ReceiverGrain desta maneira:

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

Sempre que os dados forem enviados por push para os fluxos do namespace RANDOMDATA, como ocorre no temporizador, um grão do tipo ReceiverGrain com o mesmo Guid do fluxo receberá a mensagem. Mesmo que não existam ativações do grão no momento, o runtime criará uma automaticamente e enviará a mensagem para ela.

Para isso, é preciso concluir o processo de assinatura definindo nosso método OnNextAsync para receber dados. A fim de realizar essa tarefa, o ReceiverGrain deve chamar algo desse tipo no OnActivateAsync

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

Tudo pronto! Agora, o único requisito é que algo dispare a criação do grão do produtor. Em seguida, ele registrará o temporizador e começará a enviar ints aleatórios para todos os interessados.

Novamente, este guia ignora muitos detalhes e só é bom para mostrar o quadro geral. Leia outras partes deste manual e outros recursos no RX para obter uma boa compreensão do que está disponível e como utilizar as opções.

A programação reativa pode ser uma abordagem muito poderosa para resolver muitos problemas. Por exemplo, é possível usar LINQ no assinante para filtrar números e fazer diversas coisas interessantes.

Confira também

APIs de Programação de Streams do Orleans