Orleans APIs de streaming
Os aplicativos interagem com fluxos por meio de APIs que são muito semelhantes às conhecidas extensões reativas (Rx) no .NET. A principal diferença é que Orleans as extensões de fluxo são assíncronas, para tornar o processamento mais eficiente em Orleans' malha de computação distribuída e escalável.
Fluxo assíncrono
Um aplicativo começa usando um provedor de fluxo para obter um identificador para um fluxo. Você pode ler mais sobre provedores de fluxo aqui, mas, por enquanto, você pode pensar nele como uma fábrica de fluxo que permite que os implementadores personalizem o comportamento e a semântica dos fluxos:
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");
Um aplicativo pode obter uma referência ao provedor de fluxo chamando o Grain.GetStreamProvider método quando estiver dentro de um grão ou chamando o GrainClient.GetStreamProvider método quando estiver no cliente.
Orleans.Streams.IAsyncStream<T> é um identificador lógico e fortemente tipado para um fluxo virtual. É semelhante em espírito ao Orleans Grain Reference. As chamadas para GetStreamProvider
e GetStream
são puramente locais. Os argumentos a serem GetStream
são um GUID e uma cadeia de caracteres adicional que chamamos de namespace de fluxo (que pode ser null). Juntos, o GUID e a cadeia de caracteres do namespace compõem a identidade do fluxo (semelhante em espírito aos argumentos para IGrainFactory.GetGrain). A combinação de GUID e cadeia de caracteres de namespace fornece flexibilidade extra na determinação de identidades de fluxo. Assim como o grão 7 pode existir dentro do tipo PlayerGrain
de grão e um grão 7 diferente pode existir dentro do tipo ChatRoomGrain
de grão, o fluxo 123 pode existir com o namespace PlayerEventsStream
de fluxo e um fluxo 123 diferente pode existir dentro do namespace ChatRoomMessagesStream
de fluxo.
Produzir e consumir
IAsyncStream<T> implementa as IAsyncObserver<T> interfaces e IAsyncObservable<T> . Dessa forma, um aplicativo pode usar o fluxo para produzir novos eventos no fluxo usando Orleans.Streams.IAsyncObserver<T>
ou para assinar e consumir eventos de um fluxo usando Orleans.Streams.IAsyncObservable<T>
.
public interface IAsyncObserver<in T>
{
Task OnNextAsync(T item, StreamSequenceToken token = null);
Task OnCompletedAsync();
Task OnErrorAsync(Exception ex);
}
public interface IAsyncObservable<T>
{
Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}
Para produzir eventos no fluxo, um aplicativo apenas chama
await stream.OnNextAsync<T>(event)
Para assinar um fluxo, um aplicativo chama
StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)
O argumento para SubscribeAsync pode ser um objeto que implementa a IAsyncObserver<T> interface ou uma combinação de funções lambda para processar eventos de entrada. Mais opções estão SubscribeAsync
disponíveis via AsyncObservableExtensions aula. SubscribeAsync
Retorna um StreamSubscriptionHandle<T>, que é um identificador opaco que pode ser usado para cancelar a assinatura do fluxo (semelhante em espírito a uma versão assíncrona do IDisposable).
await subscriptionHandle.UnsubscribeAsync()
É importante notar que a assinatura é para um grão, não para ativação. Uma vez que o código grain é inscrito no stream, essa assinatura ultrapassa a vida útil dessa ativação e permanece durável para sempre, até que o código grain (potencialmente em uma ativação diferente) cancele explicitamente a inscrição. Este é o coração de uma abstração de fluxo virtual: não apenas todos os fluxos sempre existem, logicamente, mas também uma assinatura de fluxo é durável e vive além de uma ativação física específica que criou a assinatura.
Cardinalidade
Um Orleans fluxo pode ter vários produtores e vários consumidores. Uma mensagem publicada por um produtor será entregue a todos os consumidores que estavam inscritos no fluxo antes de a mensagem ser publicada.
Além disso, o consumidor pode assinar o mesmo fluxo várias vezes. Cada vez que se inscreve recebe de volta um único StreamSubscriptionHandle<T>. Se um grão (ou cliente) for inscrito X vezes no mesmo fluxo, ele receberá o mesmo evento X vezes, uma vez para cada assinatura. O consumidor também pode cancelar a assinatura de uma assinatura individual. Pode encontrar todas as suas subscrições atuais ligando para:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Recuperando-se de falhas
Se o produtor de um riacho morre (ou seu grão é desativado), não há nada que ele precise fazer. Da próxima vez que este grão quiser produzir mais eventos, ele pode fazer o fluxo lidar novamente e produzir novos eventos da mesma maneira.
A lógica do consumidor está um pouco mais envolvida. Como dissemos anteriormente, uma vez que um grão de consumidor é inscrito em um fluxo, essa assinatura é válida até que o grão cancele explicitamente a inscrição. Se o consumidor do fluxo morrer (ou seu grão for desativado) e um novo evento for gerado no fluxo, o grão do consumidor será automaticamente reativado (assim como qualquer grão regular Orleans é ativado automaticamente quando uma mensagem é enviada a ele). A única coisa que o código de grãos precisa fazer agora é fornecer um IAsyncObserver<T> para processar os dados. O consumidor precisa reanexar a lógica de processamento como parte do OnActivateAsync() método. Para fazer isso, ele pode chamar:
StreamSubscriptionHandle<int> newHandle =
await subscriptionHandle.ResumeAsync(IAsyncObserver);
O consumidor usa o identificador anterior que obteve quando se inscreveu pela primeira vez em "retomar o processamento". Observe que ResumeAsync apenas atualiza uma assinatura existente com a nova instância de lógica e não altera o fato de IAsyncObserver
que esse consumidor já está inscrito neste fluxo.
Como é que o consumidor recebe um velho subscriptionHandle
? Existem 2 opções. O consumidor pode ter persistido o identificador que lhe foi devolvido da operação original SubscribeAsync
e pode usá-lo agora. Em alternativa, se o consumidor não tiver o identificador, pode pedir todos os IAsyncStream<T>
seus identificadores de subscrição ativos, ligando para:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
O consumidor pode agora retomar todos eles ou cancelar a subscrição de alguns, se assim o desejar.
Gorjeta
Se o grão do consumidor implementa a IAsyncObserver<T> interface diretamente (public class MyGrain<T> : Grain, IAsyncObserver<T>
), em teoria não deve ser obrigado a re-anexar o IAsyncObserver
e, portanto, não precisará chamar ResumeAsync
. O tempo de execução do streaming deve ser capaz de descobrir automaticamente que o grão já implementa IAsyncObserver
e apenas invocará esses IAsyncObserver
métodos. No entanto, o tempo de execução de streaming atualmente não suporta isso e o código grain ainda precisa chamar ResumeAsync
explicitamente , mesmo que o grain implemente IAsyncObserver
diretamente.
Subscrições explícitas e implícitas
Por padrão, um consumidor de fluxo tem que assinar explicitamente o fluxo. Essa assinatura geralmente seria acionada por alguma mensagem externa que o grão (ou cliente) recebe que o instrui a se inscrever. Por exemplo, em um serviço de bate-papo, quando um usuário entra em uma sala de bate-papo, seu grão recebe uma JoinChatGroup
mensagem com o nome do bate-papo, o que fará com que o grão do usuário se inscreva nesse fluxo de bate-papo.
Além disso, Orleans os fluxos também suportam assinaturas implícitas. Neste modelo, o grão não subscreve explicitamente o fluxo. Este grão é subscrito automaticamente, implicitamente, apenas com base na sua identidade de grão e um ImplicitStreamSubscriptionAttribute. O principal valor das assinaturas implícitas é permitir que a atividade de fluxo acione a ativação de grãos (acionando assim a assinatura) automaticamente. Por exemplo, usando fluxos de SMS, se um grão quisesse produzir um fluxo e outro grão processasse esse fluxo, o produtor precisaria saber a identidade do grão consumidor e fazer uma chamada de grão para ele dizendo para se inscrever no fluxo. Só depois disso pode começar a enviar eventos. Em vez disso, usando assinaturas implícitas, o produtor pode simplesmente começar a produzir eventos para um stream, e o grão do consumidor será automaticamente ativado e assinará o stream. Nesse caso, o produtor não se importa em nada com quem está lendo os eventos
A implementação MyGrainType
de grão pode declarar um atributo [ImplicitStreamSubscription("MyStreamNamespace")]
. Isso informa ao tempo de execução de streaming que, quando um evento é gerado em um fluxo cuja identidade é GUID XXX e "MyStreamNamespace"
namespace, ele deve ser entregue ao grão cuja identidade é XXX do tipo MyGrainType
. Ou seja, o tempo de execução mapeia o fluxo <XXX, MyStreamNamespace>
para o grão <XXX, MyGrainType>
do consumidor.
A presença de faz com que o tempo de execução do streaming inscreva automaticamente esse grão em um fluxo e entregue os eventos de ImplicitStreamSubscription
fluxo para ele. No entanto, o código grain ainda precisa dizer ao tempo de execução como ele deseja que os eventos sejam processados. Essencialmente, ele precisa anexar o IAsyncObserver
. Portanto, quando o grão é ativado, o código de grão dentro OnActivateAsync
precisa chamar:
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId =
StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
streamProvider.GetStream<T>(streamId);
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream =
streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
Escrevendo a lógica da assinatura
Abaixo estão as diretrizes sobre como escrever a lógica de assinatura para vários casos: assinaturas explícitas e implícitas, fluxos rebobináveis e não rebobináveis. A principal diferença entre assinaturas explícitas e implícitas é que, para implícitas, o grão sempre tem exatamente uma assinatura implícita para cada namespace de fluxo; Não há como criar várias assinaturas (não há multiplicidade de assinaturas), não há como cancelar a assinatura e a lógica de grãos sempre só precisa anexar a lógica de processamento. Isso também significa que, para assinaturas implícitas, nunca há necessidade de retomar uma assinatura. Por outro lado, para assinaturas explícitas, é preciso retomar a assinatura, caso contrário, se o grão se inscrever novamente, isso resultará na assinatura do grão várias vezes.
Subscrições implícitas:
Para assinaturas implícitas, o grão ainda precisa se inscrever para anexar a lógica de processamento. Isso pode ser feito no grão do consumidor, implementando as IStreamSubscriptionObserver
interfaces e IAsyncObserver<T>
, permitindo que o grão seja ativado separadamente da assinatura. Para se inscrever no fluxo, o grão cria um identificador e chama await handle.ResumeAsync(this)
em seu OnSubscribed(...)
método.
Para processar mensagens, o IAsyncObserver<T>.OnNextAsync(...)
método é implementado para receber dados de fluxo e um token de sequência. Alternativamente, o ResumeAsync
método pode ter um conjunto de delegados representando os métodos da IAsyncObserver<T>
interface, onNextAsync
, onErrorAsync
, e onCompletedAsync
.
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
_logger.LogInformation($"Received an item from the stream: {item}");
}
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<string>();
await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(
this.GetPrimaryKey(), "MyStreamNamespace");
await stream.SubscribeAsync(OnNextAsync);
}
Subscrições explícitas:
Para assinaturas explícitas, um grão deve ligar SubscribeAsync
para se inscrever no fluxo. Isso cria uma assinatura, bem como anexa a lógica de processamento. A assinatura explícita existirá até que o grão cancele a inscrição, portanto, se um grão for desativado e reativado, o grão ainda será explicitamente assinado, mas nenhuma lógica de processamento será anexada. Neste caso, o grão precisa reanexar a lógica de processamento. Para fazer isso, em seu OnActivateAsync
, o grão primeiro precisa descobrir quais assinaturas ele tem, ligando IAsyncStream<T>.GetAllSubscriptionHandles()para . O grão deve ser executado ResumeAsync
em cada identificador que deseja continuar processando ou UnsubscribeAsync em qualquer identificador com o qual tenha terminado. O grão também pode, opcionalmente, especificar o como um argumento para as chamadas, o StreamSequenceToken
ResumeAsync
que fará com que essa assinatura explícita comece a consumir a partir desse token.
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
var stream = streamProvider.GetStream<string>(streamId);
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
foreach (var handle in subscriptionHandles)
{
await handle.ResumeAsync(this);
}
}
public async override Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
if (!subscriptionHandles.IsNullOrEmpty())
{
subscriptionHandles.ForEach(
async x => await x.ResumeAsync(OnNextAsync));
}
}
Ordem de fluxo e tokens de sequência
A ordem de entrega do evento entre um produtor individual e um consumidor individual depende do provedor de fluxo.
Com o SMS, o produtor controla explicitamente a ordem dos eventos vistos pelo consumidor, controlando a forma como o produtor os publica. Por padrão (se a SimpleMessageStreamProviderOptions.FireAndForgetDelivery opção para provedor de SMS estiver definida como false) e se o produtor aguarda cada OnNextAsync
chamada, os eventos chegam em ordem FIFO. No SMS cabe ao produtor decidir como lidar com falhas de entrega que serão indicadas por uma quebra Task
devolvida pela OnNextAsync
chamada.
Os fluxos de fila do Azure não garantem a ordem FIFO, uma vez que as filas subjacentes do Azure não garantem a ordem em casos de falha. (Eles garantem a ordem FIFO em execuções sem falhas.) Quando um produtor produz o evento na Fila do Azure, se a operação de fila falhar, cabe ao produtor tentar outra fila e, posteriormente, lidar com possíveis mensagens duplicadas. No lado da entrega, o Orleans tempo de execução do Streaming retira o evento da fila e tenta entregá-lo para processamento aos consumidores. O Orleans tempo de execução do Streaming exclui o evento da fila somente após o processamento bem-sucedido. Se a entrega ou o processamento falharem, o evento não será excluído da fila e reaparecerá automaticamente na fila mais tarde. O tempo de execução do Streaming tentará entregá-lo novamente, potencialmente quebrando a ordem FIFO. O comportamento acima corresponde à semântica normal das Filas do Azure.
Ordem definida pelo aplicativo: Para lidar com os problemas de pedido acima, um aplicativo pode, opcionalmente, especificar seu pedido. Isto é conseguido através de um StreamSequenceToken, que é um objeto opaco IComparable que pode ser usado para ordenar eventos. Um produtor pode passar um opcional StreamSequenceToken
para a OnNext
chamada. Este StreamSequenceToken
será passado ao consumidor e será entregue juntamente com o evento. Dessa forma, um aplicativo pode raciocinar e reconstruir sua ordem independentemente do tempo de execução do streaming.
Correntes rebobináveis
Alguns fluxos só permitem que um aplicativo os assine a partir do último momento, enquanto outros fluxos permitem "voltar no tempo". Esta última capacidade depende da tecnologia de fila subjacente e do provedor de fluxo específico. Por exemplo, as Filas do Azure só permitem consumir os eventos enfileirados mais recentes, enquanto o EventHub permite reproduzir eventos de um ponto arbitrário no tempo (até algum tempo de expiração). Os fluxos que suportam voltar no tempo são chamados de fluxos rebobináveis.
O consumidor de um fluxo rebobinável pode passar um StreamSequenceToken
para a SubscribeAsync
chamada. O tempo de execução entregará eventos a partir disso StreamSequenceToken
. Um token nulo significa que o consumidor deseja receber eventos a partir do mais recente.
A capacidade de retroceder um fluxo é muito útil em cenários de recuperação. Por exemplo, considere um grão que assina um fluxo e periodicamente verifica seu estado junto com o token de sequência mais recente. Ao se recuperar de uma falha, o grão pode se inscrever novamente no mesmo fluxo a partir do token de sequência de ponto de verificação mais recente, recuperando-se sem perder nenhum evento que foi gerado desde o último ponto de verificação.
O provedor de Hubs de Eventos é rebobinável. Você pode encontrar seu código no GitHub: Orleans/Azure/Orleans. Streaming.EventHubs. Os provedores de SMS e Azure Queue não são rebobináveis.
Processamento escalonado automaticamente sem estado
Por padrão, Orleans o Streaming é direcionado para suportar um grande número de fluxos relativamente pequenos, cada um processado por um ou mais grãos com monitoração de estado. Coletivamente, o processamento de todos os fluxos juntos é fragmentado entre um grande número de grãos regulares (stateful). O código do aplicativo controla essa fragmentação atribuindo IDs de fluxo e IDs de grão e inscrevendo-se explicitamente. O objetivo é o processamento com estado fragmentado.
No entanto, há também um cenário interessante de processamento sem estado expandido automaticamente. Nesse cenário, um aplicativo tem um pequeno número de fluxos (ou até mesmo um grande fluxo) e o objetivo é o processamento sem monitoração de estado. Um exemplo é um fluxo global de eventos, onde o processamento envolve a decodificação de cada evento e potencialmente encaminhá-lo para outros fluxos para processamento com monitoração de estado. O processamento de fluxo escalonado sem estado pode ser suportado por meio StatelessWorkerAttribute de Orleans grãos.
Status atual do processamento expandido automaticamente sem monitoração de estado: isso ainda não foi implementado. Uma tentativa de subscrever um fluxo a partir de um StatelessWorker
grão resultará num comportamento indefinido. Estamos a considerar apoiar esta opção.
Grãos e Orleans clientes
Orleans Os fluxos funcionam uniformemente entre grãos e Orleans clientes. Ou seja, as mesmas APIs podem ser usadas dentro de um grão e em um Orleans cliente para produzir e consumir eventos. Isso simplifica muito a lógica do aplicativo, tornando redundantes APIs especiais do lado do cliente, como o Grain Observers.
Streaming pub-sub totalmente gerenciado e confiável
Para rastrear assinaturas de streaming, Orleans usa um componente de tempo de execução chamado Streaming Pub-Sub , que serve como um ponto de encontro para consumidores e produtores de streaming. O Pub-sub rastreia todas as assinaturas de streaming e as persiste, além de fazer a correspondência entre os consumidores de streaming e os produtores de streaming.
Os aplicativos podem escolher onde e como os dados Pub-Sub são armazenados. O próprio componente Pub-Sub é implementado como grãos (chamados PubSubRendezvousGrain
), que usam Orleans persistência declarativa. PubSubRendezvousGrain
usa o provedor de armazenamento chamado PubSubStore
. Como acontece com qualquer grão, você pode designar uma implementação para um provedor de armazenamento. Para Streaming Pub-Sub você pode alterar a implementação do tempo de construção do silo PubSubStore
usando o construtor de host de silo:
O seguinte configura o Pub-Sub para armazenar seu estado nas tabelas do Azure.
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");
Dessa forma, os dados Pub-Sub serão armazenados de forma durável na Tabela do Azure. Para o desenvolvimento inicial, você também pode usar o armazenamento de memória. Além do Pub-Sub, o Orleans Streaming Runtime entrega eventos de produtores para consumidores, gerencia todos os recursos de tempo de execução alocados para fluxos usados ativamente e coleta recursos de tempo de execução de forma transparente de fluxos não utilizados.
Configuração
Para usar fluxos, você precisa habilitar provedores de fluxo por meio do host de silo ou construtores de clientes de cluster. Você pode ler mais sobre provedores de fluxo aqui. Exemplo de configuração do provedor de fluxo:
hostBuilder.AddMemoryStreams("StreamProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConfigureTableServiceClient("<Secret>")))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConnectionString = "<Secret>"))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");