Partilhar via


Orleans Guia de início rápido de streaming

Este guia mostrará uma maneira rápida de configurar e usar Orleans o Streams. Para saber mais sobre os detalhes dos recursos de streaming, leia outras partes desta documentação.

Configurações necessárias

Neste guia, você usará um fluxo baseado em memória que usa mensagens de grão para enviar dados de fluxo para assinantes. Você usará o provedor de armazenamento na memória para armazenar listas de assinaturas. O uso de mecanismos baseados em memória para streaming e armazenamento destina-se apenas ao desenvolvimento e teste locais e não a ambientes de produção.

No silo, onde silo está um ISiloBuilder, chame :AddMemoryStreams

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

No cliente de cluster, onde client é um IClientBuilder, chame AddMemoryStreams.

client.AddMemoryStreams("StreamProvider");

Neste guia, usaremos um fluxo simples baseado em mensagens que usa mensagens granuladas para enviar dados de fluxo para os assinantes. Usaremos o provedor de armazenamento na memória para armazenar listas de assinaturas, portanto, não é uma escolha sábia para aplicativos de produção reais.

No silo, onde hostBuilder está um ISiloHostBuilder, chame :AddSimpleMessageStreamProvider

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

No cliente de cluster, onde clientBuilder é um IClientBuilder, chame AddSimpleMessageStreamProvider.

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

Nota

Por padrão, as mensagens que são passadas sobre o Fluxo de Mensagens Simples são consideradas imutáveis e podem ser passadas por referência a outros grãos. Para desativar esse comportamento, você deve configurar o provedor de SMS para desativar SimpleMessageStreamProviderOptions.OptimizeForImmutableData

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

Você pode criar fluxos, enviar dados usando-os como produtores e também receber dados como assinantes.

Produzir eventos

É relativamente fácil produzir eventos para streams. Você deve primeiro obter acesso ao provedor de fluxo que você definiu na configuração anteriormente ("StreamProvider") e, em seguida, escolher um fluxo e enviar dados por push para ele.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

É relativamente fácil produzir eventos para streams. Você deve primeiro obter acesso ao provedor de fluxo que você definiu na configuração anteriormente ("SMSProvider") e, em seguida, escolher um fluxo e enviar dados por push para ele.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

Como você pode ver, nosso fluxo tem um GUID e um namespace. Isso facilitará a identificação de fluxos exclusivos. Por exemplo, o namespace para uma sala de chat pode ser "Rooms" e o GUID pode ser o GUID proprietário do RoomGrain.

Aqui usamos o GUID de alguma sala de chat conhecida. Usando o OnNextAsync método do fluxo, podemos enviar dados para ele. Vamos fazê-lo dentro de um temporizador, usando números aleatórios. Você também pode usar qualquer outro tipo de dados para o fluxo.

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

Subscrever e receber dados de streaming

Para receber dados, você pode usar assinaturas implícitas e explícitas, que são descritas com mais detalhes em Assinaturas explícitas e implícitas. Este exemplo usa assinaturas implícitas, que são mais fáceis. Quando um tipo de grão deseja se inscrever implicitamente em um fluxo, ele usa o atributo [ImplicitStreamSubscription(namespace)].

Para o seu caso, defina um ReceiverGrain como este:

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

Sempre que os dados são enviados para os fluxos do namespace RANDOMDATA, como temos no temporizador, um grão do tipo ReceiverGrain com o mesmo Guid do fluxo receberá a mensagem. Mesmo que nenhuma ativação do grão exista atualmente, o tempo de execução criará automaticamente um novo e enviará a mensagem para ele.

Para que isso funcione, precisamos concluir o processo de assinatura, definindo nosso OnNextAsync método para receber dados. Para isso, ReceiverGrain devemos chamar algo assim em seu OnActivateAsync

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

Está pronto! Agora, o único requisito é que algo desencadeie a criação do grão do produtor, e então ele registrará o temporizador e começará a enviar ints aleatórios para todas as partes interessadas.

Mais uma vez, este guia ignora muitos detalhes e só é bom para mostrar o panorama geral. Leia outras partes deste manual e outros recursos no RX para obter uma boa compreensão do que está disponível e como.

A programação reativa pode ser uma abordagem muito poderosa para resolver muitos problemas. Você pode, por exemplo, usar o LINQ no assinante para filtrar números e fazer todos os tipos de coisas interessantes.

Consulte também

Orleans APIs de programação de fluxos