Compartilhar via


APIs de streaming do Orleans

Os aplicativos interagem com fluxos por meio de APIs muito semelhantes às conhecidas Rx (Reactive Extensions) no .NET. A principal diferença é que as extensões de streaming do Orleans são assíncronas, para tornar o processamento mais eficiente na malha de computação distribuída e escalável do Orleans.

Fluxo assíncrono

Um aplicativo começa usando um provedor de fluxo para obter o identificador de um fluxo. Saiba mais sobre provedores de fluxo aqui, mas, basicamente, eles são como fábricas de fluxos que permitem aos implementadores personalizar o comportamento e a semântica dos fluxos:

IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");

Um aplicativo pode obter uma referência ao provedor de fluxo chamando o método Grain.GetStreamProvider quando dentro de um grão ou chamando o método GrainClient.GetStreamProvider quando no cliente.

O Orleans.Streams.IAsyncStream<T> é um identificador lógico e fortemente tipado para um fluxo virtual. Isso é semelhante à referência de granularidade do Orleans. Chamadas para GetStreamProvider e GetStream são puramente locais. Os argumentos para GetStream são uma GUID e uma cadeia de caracteres adicional que chamamos de namespace de fluxo (que pode ser nulo). Juntos, a GUID e a cadeia de caracteres de namespace compõem a identidade do fluxo (semelhante aos argumentos de IGrainFactory.GetGrain). A combinação de GUID e cadeia de caracteres de namespace fornece flexibilidade extra na determinação de identidades de fluxo. Assim como o grão 7 pode existir no tipo de grão PlayerGrain e um grão 7 diferente pode existir no tipo de grão ChatRoomGrain, o fluxo 123 pode existir com o namespace de fluxo PlayerEventsStream e um fluxo 123 diferente pode existir no namespace de fluxo ChatRoomMessagesStream.

Produção e consumo

O IAsyncStream<T> implementa as interfaces IAsyncObserver<T> e IAsyncObservable<T>. Dessa forma, um aplicativo pode usar o fluxo para produzir novos eventos nele por meio de Orleans.Streams.IAsyncObserver<T> ou para assinar e consumir eventos dele por meio de Orleans.Streams.IAsyncObservable<T>.

public interface IAsyncObserver<in T>
{
    Task OnNextAsync(T item, StreamSequenceToken token = null);
    Task OnCompletedAsync();
    Task OnErrorAsync(Exception ex);
}

public interface IAsyncObservable<T>
{
    Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}

Para produzir eventos no fluxo, um aplicativo realiza uma chamada

await stream.OnNextAsync<T>(event)

Para assinar um fluxo, um aplicativo realiza uma chamada

StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)

O argumento de SubscribeAsync pode ser um objeto que implementa a interface IAsyncObserver<T> ou uma combinação de funções lambda para processar eventos de entrada. Mais opções para SubscribeAsync estão disponíveis por meio da classe AsyncObservableExtensions. O SubscribeAsync retorna um StreamSubscriptionHandle<T>, que é um identificador opaco que pode ser usado para cancelar a assinatura do fluxo (semelhante a uma versão assíncrona de IDisposable).

await subscriptionHandle.UnsubscribeAsync()

É importante notar que a assinatura é para um grão, não para ativação. Depois que o código de grão é inscrito no fluxo, essa assinatura ultrapassa a vida útil da ativação e permanece durável para sempre, até que o código de grão (potencialmente em uma ativação diferente) seja cancelado explicitamente. Este é o ponto mais importante de uma abstração de fluxo virtual: não só todos os fluxos sempre existem logicamente, mas também uma assinatura de fluxo é durável e vive além de uma ativação física específica que a criou.

Multiplicidade

Um stream do Orleans pode ter diversos produtores e consumidores. Uma mensagem publicada por um produtor será entregue a todos os consumidores que se inscreveram no fluxo antes da publicação da mensagem.

Além disso, o consumidor pode assinar o mesmo fluxo diversas vezes. A cada assinatura, um único StreamSubscriptionHandle<T> é retornado. Se um grão (ou cliente) for inscrito X vezes no mesmo fluxo, ele receberá o mesmo evento X vezes, uma vez para cada assinatura. O consumidor também pode cancelar uma assinatura individual. Para encontrar todas as assinaturas atuais, ele pode fazer uma chamada:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

Recuperando de falhas

Se o produtor de um fluxo morrer (ou seu grão for desativado), não haverá nada a ser feito. Na próxima vez que esse grão quiser produzir mais eventos, ele poderá obter o identificador de fluxo novamente e produzir novos eventos da mesma maneira.

A lógica do consumidor é um pouco mais complexa. Como dito antes, uma vez que um grão de consumidor é inscrito em um fluxo, essa assinatura é válida até que o grão seja cancelado explicitamente. Se o consumidor do stream morrer (ou sua granularidade for desativada) e um novo evento for gerado nesse stream, a granularidade do consumidor será reativada automaticamente, assim como acontece com qualquer granularidade comum do Orleans, que é ativada automaticamente quando uma mensagem é enviada a ela. A única coisa que o código de grão precisa fazer agora é fornecer um IAsyncObserver<T> para processar os dados. O consumidor precisa anexar novamente a lógica de processamento como parte do método OnActivateAsync(). Para isso, ele pode chamar:

StreamSubscriptionHandle<int> newHandle =
    await subscriptionHandle.ResumeAsync(IAsyncObserver);

O consumidor usa o identificador anterior obtido quando se inscreveu para "retomar o processamento". Observe que ResumeAsync somente atualiza uma assinatura existente com a nova instância da lógica IAsyncObserver e não altera o fato de que esse consumidor já está inscrito no fluxo.

Como o consumidor obtém um subscriptionHandle antigo? Há duas opções. O consumidor pode ter persistido o identificador que foi retornado pela operação SubscribeAsync original e pode usá-lo agora. Como alternativa, se ele não tiver o identificador, poderá solicitar ao IAsyncStream<T> todos os identificadores de assinatura ativos, chamando:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

Agora, o consumidor pode retomar todos ou cancelar a assinatura de alguns, se necessário.

Dica

Se o grão do consumidor implementar a interface IAsyncObserver<T> diretamente (public class MyGrain<T> : Grain, IAsyncObserver<T>), não deveria ser necessário, em teoria, reconectar o IAsyncObserver e, portanto, não seria necessário chamar ResumeAsync. O runtime de streaming deve ser capaz de descobrir automaticamente que o grão já implementa IAsyncObserver e somente invocar esses métodos IAsyncObserver. No entanto, não há suporte atualmente no runtime de streaming para isso e o código de grão ainda precisa chamar explicitamente ResumeAsync, mesmo que implemente IAsyncObserver diretamente.

Assinaturas explícitas e implícitas

Por padrão, um consumidor de fluxo precisa se inscrever explicitamente no fluxo. Essa assinatura normalmente é disparada por alguma mensagem externa que o grão (ou cliente) recebe com a instrução de inscrever-se. Por exemplo, em um serviço de chat, quando um usuário entra em uma sala, seu grão recebe uma mensagem JoinChatGroup com o nome do chat, o que faz com que o grão do usuário se inscreva nesse fluxo de chat.

Além disso, os streams do Orleans também dão suporte a assinaturas implícitas. Neste modelo, o grão não se inscreve explicitamente no fluxo. Este grão é inscrito de maneira automática e implícita somente com base na identidade de grão respectiva e em um ImplicitStreamSubscriptionAttribute. O principal valor das assinaturas implícitas é permitir que a atividade de fluxo dispare a ativação do grão (disparando a assinatura) automaticamente. Por exemplo, no caso de fluxos SMS, para um grão produzir um fluxo e outro processar esse fluxo, o produtor precisaria saber a identidade do grão do consumidor e fazer uma chamada de grão informando que ele deve se inscrever no fluxo. Só depois disso será possível enviar eventos. Em vez disso, com as assinaturas implícitas, o produtor pode simplesmente começar a produzir eventos em um fluxo e o grão do consumidor será ativado automaticamente e se inscreverá nele. Nesse caso, o produtor não se importa com quem está lendo os eventos

A implementação de granularidade MyGrainType pode declarar um atributo [ImplicitStreamSubscription("MyStreamNamespace")]. Isso informa ao runtime de streaming que, quando um evento é gerado em um fluxo com uma identidade que é a GUID XXX e o namespace "MyStreamNamespace", ele deve ser entregue ao grão com a identidade XXX do tipo MyGrainType. Isso significa que os mapas de runtime transmitem <XXX, MyStreamNamespace> para o grão do consumidor <XXX, MyGrainType>.

A presença de ImplicitStreamSubscription faz com que o runtime de streaming inscreva automaticamente esse grão em um fluxo e entregue os eventos de fluxo a ele. No entanto, o código de grão ainda precisa informar ao runtime como deseja que os eventos sejam processados. Essencialmente, ele precisa anexar o IAsyncObserver. Portanto, quando o grão é ativado, o código de grão dentro de OnActivateAsync precisa chamar:

IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

StreamId streamId =
    StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
    streamProvider.GetStream<T>(streamId);

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

IAsyncStream<T> stream =
    streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);

Elaboração da lógica de assinatura

Veja abaixo as diretrizes sobre como escrever a lógica de assinatura para diversos casos: assinaturas explícitas e implícitas e fluxos rebobináveis e não rebobináveis. A principal diferença entre assinaturas explícitas e implícitas é que, no caso das implícitas, o grão sempre tem exatamente uma assinatura implícita para cada namespace de fluxo. Nesse caso, não é possível criar diversas assinaturas (não há multiplicidade de assinaturas), não é possível cancelar a assinatura e a lógica de grão sempre precisa anexar somente a lógica de processamento. Isso também significa que, para assinaturas implícitas, nunca é preciso retomar uma assinatura. No caso das assinaturas explícitas, é preciso retomar a assinatura, caso contrário, se o grão se inscrever novamente, ele será inscrito diversas vezes.

Assinaturas implícitas:

Para assinaturas implícitas, a granularidade ainda precisa se inscrever para anexar a lógica de processamento. Isso pode ser feito na granularidade do consumidor implementando as interfaces IStreamSubscriptionObserver e IAsyncObserver<T>, permitindo que a granularidade seja ativada separadamente da assinatura. Para assinar o fluxo, a granularidade cria um identificador e chama await handle.ResumeAsync(this) em seu método OnSubscribed(...).

Para processar as mensagens, o método IAsyncObserver<T>.OnNextAsync(...) é implementado para receber dados de fluxo e um token de sequência. Como alternativa, o método ResumeAsync pode usar um conjunto de delegados que representam os métodos da interface IAsyncObserver<T>, onNextAsync, onErrorAsync e onCompletedAsync.

public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
    _logger.LogInformation($"Received an item from the stream: {item}");
}

public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
    var handle = handleFactory.Create<string>();
    await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(
            this.GetPrimaryKey(), "MyStreamNamespace");

    await stream.SubscribeAsync(OnNextAsync);
}

Assinaturas explícitas:

Para assinaturas explícitas, um grão deve chamar SubscribeAsync para assinar o fluxo. Isso cria uma assinatura e anexa a lógica de processamento. A assinatura explícita existirá até que o grão seja cancelado, portanto, se um grão for desativado e reativado, ele ainda estará inscrito explicitamente, mas nenhuma lógica de processamento estará anexada. Nesse caso, o grão precisa reanexar a lógica de processamento. Para isso, no OnActivateAsync associado, o grão precisa primeiro descobrir quais assinaturas possui, chamando IAsyncStream<T>.GetAllSubscriptionHandles(). Ele deve executar ResumeAsync em cada identificador que deseja continuar processando ou UnsubscribeAsync em qualquer identificador que não é mais necessário. Opcionalmente, o grão também pode especificar StreamSequenceToken como um argumento para as chamadas ResumeAsync, o que fará com que essa assinatura explícita comece a consumir esse token.

public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
    var stream = streamProvider.GetStream<string>(streamId);

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    foreach (var handle in subscriptionHandles)
    {
       await handle.ResumeAsync(this);
    } 
}
public async override Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    if (!subscriptionHandles.IsNullOrEmpty())
    {
        subscriptionHandles.ForEach(
            async x => await x.ResumeAsync(OnNextAsync));
    }
}

Ordem de fluxo e tokens de sequência

A ordem de entrega de eventos entre um produtor individual e um consumidor individual depende do provedor de fluxo.

Com SMS, o produtor controla explicitamente a ordem dos eventos vistos pelo consumidor, controlando a forma como os publica. Por padrão (se a opção SimpleMessageStreamProviderOptions.FireAndForgetDelivery do provedor de SMS estiver configurada como false) e se o produtor aguardar cada chamada OnNextAsync, os eventos chegarão na ordem FIFO. Com SMS, cabe ao produtor decidir como tratar as falhas de entrega indicadas por um Task quebrado retornado pela chamada OnNextAsync.

Os fluxos da Fila do Azure não garantem a ordem FIFO, pois as Filas do Azure subjacentes não garantem a ordem em casos de falha. (Elas garantem a ordem FIFO em execuções sem falhas.) Quando um produtor produz o evento na Fila do Azure, se a operação da fila falha, cabe ao produtor tentar outra fila e, posteriormente, lidar com possíveis mensagens duplicadas. No lado da entrega, o Orleans Streaming Runtime remove o evento da fila e tenta entregá-lo para processamento aos consumidores. O Orleans Streaming Runtime só exclui o evento da fila após a conclusão do processamento. Se a entrega ou o processamento falhar, o evento não será excluído da fila e reaparecerá nela automaticamente mais tarde. O runtime de streaming tentará fazer a entrega novamente, potencialmente quebrando a ordem FIFO. O comportamento acima corresponde à semântica normal das Filas do Azure.

Ordem definida pelo aplicativo: para lidar com os problemas de ordenação acima, um aplicativo tem a opção de especificar sua ordenação. Isso é feito por meio de um StreamSequenceToken, que é um objeto IComparable opaco que pode ser usado para ordenar eventos. Um produtor pode transmitir um StreamSequenceToken opcional para a chamada OnNext. Esse StreamSequenceToken será transmitido ao consumidor e entregue com o evento. Dessa forma, um aplicativo pode raciocinar e reconstruir sua ordem independentemente do runtime de streaming.

Fluxos rebobináveis

Alguns fluxos só permitem que um aplicativo se inscreva neles a partir do último ponto no tempo, enquanto outros permitem "voltar no tempo". O último recurso depende da tecnologia de enfileiramento subjacente e do provedor de fluxo específico. Por exemplo, as Filas do Azure só permitem o consumo dos eventos enfileirados mais recentes, enquanto o Hub de Eventos permite a repetição de eventos de um ponto arbitrário no tempo (até algum tempo de expiração). Os fluxos que possibilitam voltar no tempo são chamados de fluxos rebobináveis.

.O consumidor de um fluxo rebobinável pode transmitir um StreamSequenceToken para a chamada SubscribeAsync. O runtime entregará eventos a ele a partir desse StreamSequenceToken. Um token nulo significa que o consumidor deseja receber eventos a partir do mais recente.

A capacidade de rebobinar um fluxo é muito útil em cenários de recuperação. Por exemplo, considere um grão que se inscreve em um fluxo e verifica periodicamente seu estado com o token de sequência mais recente. Ao se recuperar de uma falha, o grão pode se inscrever novamente no mesmo fluxo a partir do token de sequência de ponto de verificação mais recente, recuperando-se sem perder nenhum evento gerado desde o último ponto de verificação.

O provedor dos Hubs de Eventos é rebobinável. É possível encontrar o código dele no GitHub: Orleans/Azure/Orleans.Streaming.EventHubs. Os provedores de SMS e de Fila do Azurenão são rebobináveis.

Processamento sem estado expandido automaticamente

Por padrão, o Orleans Streaming é utilizado para dar suporte a um grande número de streams relativamente pequenos, cada um processado por um ou mais granularidades com estado. Coletivamente, o processamento de todos os fluxos juntos é fragmentado entre um grande número de grãos regulares (com estado). O código do aplicativo controla essa fragmentação atribuindo IDs de fluxo e de grão e realizando a assinatura explícita. O objetivo é o processamento fragmentado com estado.

No entanto, há também um cenário interessante de processamento sem estado expandido automaticamente. Nesse cenário, um aplicativo tem um pequeno número de fluxos (ou mesmo um grande fluxo) e o objetivo é o processamento sem estado. Um exemplo é um fluxo global de eventos, em que o processamento envolve decodificar cada evento e, potencialmente, encaminhá-lo para outros fluxos para processamento com estado adicional. É possível dar suporte ao processamento de stream expandido sem estado no Orleans por meio de granularidades do StatelessWorkerAttribute.

Status atual do processamento sem estado expandido automaticamente: ainda não foi implementado. Uma tentativa de se inscrever em um fluxo de um grão StatelessWorker resultará em um comportamento indefinido. Estamos considerando dar suporte a esta opção.

Granularidades e clientes do Orleans

Os streams do Orleans funcionam uniformemente em granularidades e em clientes do Orleans. Isso significa que as mesmas APIs podem ser usadas dentro de uma granularidade e em um cliente do Orleans para a produção e o consumo de eventos. Isso simplifica bastante a lógica do aplicativo, tornando redundantes APIs especiais do lado do cliente, como Observadores de Grão.

Pub-Sub de streaming totalmente gerenciado e confiável

Para rastrear assinaturas de stream, o Orleans usa um componente de runtime chamado Streaming Pub-Sub, que atua como ponto de encontro para consumidores e produtores de stream. O Pub-Sub rastreia todas as assinaturas de fluxo e as mantém, além de corresponder consumidores de fluxo com produtores de fluxo.

Os aplicativos podem escolher onde e como os dados do Pub-Sub são armazenados. O próprio componente Pub-Sub é implementado na forma de granularidades (chamadas PubSubRendezvousGrain), que usam a persistência declarativa do Orleans. O PubSubRendezvousGrain usa o provedor de armazenamento PubSubStore. Como acontece com qualquer grão, é possível designar uma implementação para um provedor de armazenamento. Para o Streaming Pub-Sub, é possível alterar a implementação de PubSubStore no momento da construção do silo usando o construtor de host de silo:

O seguinte configura o Pub-Sub para armazenar seu estado nas tabelas do Azure.

hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConnectionString = "<Secret>");

Dessa forma, os dados do Pub-Sub serão armazenados de maneira durável na Tabela do Azure. Para o desenvolvimento inicial, também é possível usar o armazenamento de memória. Além do Pub-Sub, o Orleans Streaming Runtime entrega eventos de produtores a consumidores, gerencia todos os recursos de runtime alocados para streams usados ativamente e faz a coleta de lixo transparente dos recursos de runtime de streams não utilizados.

Configuração

Para usar streams, é necessário habilitar provedores de streaming por meio do host de silo ou dos construtores de cliente de cluster. É possível ler mais sobre provedores de fluxo aqui. Configuração de exemplo do provedor de fluxo:

hostBuilder.AddMemoryStreams("StreamProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConfigureTableServiceClient("<Secret>")))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConnectionString = "<Secret>"))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConnectionString = "<Secret>");

Confira também

Provedores de Streaming do Orleans