Partilhar via


Orleans streams detalhes de implementação

Esta secção fornece uma visão geral de alto nível da implementação do Stream de Orleans. Descreve conceitos e detalhes que não são visíveis ao nível da aplicação. Se pretender utilizar streams, não tem de ler esta secção.

Terminologia:

Referimo-nos pela palavra "fila" a qualquer tecnologia de armazenamento durável que possa ingerir eventos de streaming e permita que ou puxe eventos ou forneça um mecanismo baseado em impulso para consumir eventos. Normalmente, para proporcionar escalabilidade, essas tecnologias fornecem filas de fragmentos/divisórias. Por exemplo, as filas Azure permitem criar várias filas e os Centros de Eventos têm vários centros.

Riachos persistentes

Todos os fornecedores de fluxos persistentes de Orleans partilham uma implementação PersistentStreamProvidercomum. Estes fornecedores genéricos de fluxo precisam de ser configurados com uma tecnologia específica IQueueAdapterFactory.

Por exemplo, para efeitos de teste, temos adaptadores de fila que geram os seus dados de teste em vez de ler os dados a partir de uma fila. O código abaixo mostra como configuramos um fornecedor de fluxo persistente para usar o nosso adaptador de fila personalizado (gerador). Fá-lo configurando o fornecedor de fluxo persistente com uma função de fábrica utilizada para criar o adaptador.

hostBuilder.AddPersistentStreams(
    StreamProviderName, GeneratorAdapterFactory.Create);

Quando um produtor de fluxo gera um novo item de fluxo e chama stream.OnNext(), o tempo de streaming de Orleans invoca o método adequado no IQueueAdapter fornecedor de fluxo que encosta o item diretamente na fila apropriada.

Agentes de puxar

No coração do Provedor de Fluxo Persistente estão os agentes de puxar. Os agentes puxam eventos de um conjunto de filas duradouras e entregam-nos ao código de aplicação em grãos que os consomem. Pode-se pensar nos agentes de puxar como um "micro-serviço" distribuído, um componente distribuído, altamente disponível e elástico. Os agentes de puxar correm dentro dos mesmos silos que acolhem grãos de aplicação e são totalmente geridos pelo Orleans Streaming Runtime.

StreamQueueMapper e StreamQueueBalancer

Os agentes de puxar são parametrizados com IStreamQueueMapper e IStreamQueueBalancer. O IStreamQueueMapper fornece uma lista de todas as filas e é também responsável por mapear streams para filas. Desta forma, o lado produtor do Persistent Stream Provider sabe em que fila para ensoar a mensagem.

O IStreamQueueBalancer expresso expressa a forma como as filas são equilibradas através de silos e agentes de Orleães. O objetivo é atribuir filas aos agentes de forma equilibrada, para evitar estrangulamentos e apoiar a elasticidade. Quando um novo silo é adicionado ao cluster de Orleães, as filas são automaticamente reequilibadas através dos velhos e novos silos. Permite StreamQueueBalancer personalizar esse processo. Orleans tem vários StreamQueueBalancers incorporados, para suportar diferentes cenários de equilíbrio (grande e pequeno número de filas) e diferentes ambientes (Azure, on-prem, estático).

Utilizando o exemplo do gerador de teste a partir de cima, o código abaixo mostra como se pode configurar o mapeador de fila e o equilibrador de fila.

hostBuilder
    .AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
        providerConfigurator =>
        providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
            ob => ob.Configure(options => options.TotalQueueCount = 8))
      .UseDynamicClusterConfigDeploymentBalancer());

O código acima configura o GeneratorAdapterFactory uso de um mapper de fila com oito filas, e equilibra as filas em todo o cluster usando o DynamicClusterConfigDeploymentBalancer.

Protocolo de puxar

Cada silo dirige um conjunto de agentes, todos os agentes puxam de uma fila. Os próprios agentes de puxar são implementados por um componente interno do tempo de execução, chamado SystemTarget. Os Sistemasrgets são essencialmente grãos de tempo de execução, estão sujeitos a uma concurrency de rosca única, podem usar mensagens regulares de cereais, e são tão leves como os grãos. Ao contrário dos grãos, os SystemTargets não são virtuais: são explicitamente criados (pelo tempo de execução) e não são transparentes na localização. Ao implementar agentes de puxar como SystemTargets, o Orleans Streaming Runtime pode contar com funcionalidades incorporadas em Orleães e pode escalar para um grande número de filas, uma vez que criar um novo agente de puxar é tão barato como criar um novo grão.

Cada agente de puxar corre um temporizador periódico que sai da fila invocando o IQueueAdapterReceiver.GetQueueMessagesAsync método. As mensagens devolvidas são colocadas na estrutura interna de dados por agente chamada IQueueCache. Cada mensagem é inspecionada para descobrir o seu fluxo de destino. O agente usa o Pub-Sub para descobrir a lista de consumidores de streaming que subscreveram este fluxo. Uma vez recuperada a lista de consumidores, o agente armazena-a localmente (na sua cache pub-sub) para que não seja necessário consultar Pub-Sub em cada mensagem. O agente também subscreve o pub-sub para receber a notificação de quaisquer novos consumidores que subscrevam esse fluxo. Este aperto de mão entre o agente e o pub-sub garante uma forte semântica de subscrição de streaming: uma vez que o consumidor tenha subscrito o stream, verá todos os eventos que foram gerados depois de ter subscrito. Além disso, a utilização StreamSequenceToken permite-lhe subscrever no passado.

Cache de fila

IQueueCache é uma estrutura interna de dados por agente que permite dissociar novos eventos da fila e entregá-los aos consumidores. Também permite dissociar a entrega a diferentes fluxos e diferentes consumidores.

Imagine uma situação em que um fluxo tem 3 consumidores de fluxo e um deles é lento. Se não for tomado cuidado, este consumidor lento pode ter impacto no progresso do agente, abrandando o consumo de outros consumidores desse fluxo, e até abrandando a desabamento e entrega de eventos para outros fluxos. Para evitar isso e permitir o máximo paralelismo no agente, utilizamos IQueueCache.

IQueueCache buffers stream eventos e fornece uma maneira para o agente entregar eventos a cada consumidor ao seu próprio ritmo. A entrega por consumidor é implementada pela componente interna denominada IQueueCacheCursor, que acompanha o progresso por consumidor. Desta forma, cada consumidor recebe eventos ao seu próprio ritmo: os consumidores rápidos recebem eventos tão rapidamente quanto são despromovidos da fila, enquanto os consumidores lentos os recebem mais tarde. Uma vez que a mensagem é entregue a todos os consumidores, pode ser eliminada da cache.

Pressão traseira

A retropressão no Streamtime orleans aplica-se em dois locais: trazer eventos de streaming da fila para o agente e entregar os eventos do agente para transmitir os consumidores.

Este último é fornecido pelo mecanismo de entrega de mensagens embuti-in Orleans. Cada evento de streaming é entregue do agente aos consumidores através da mensagem padrão de grãos de Orleães, um de cada vez. Ou seja, os agentes enviam um evento (ou um lote de eventos de tamanho limitado) para cada consumidor de fluxo e aguardam esta chamada. O próximo evento não começará a ser entregue até que a Tarefa para o evento anterior seja resolvida ou quebrada. Desta forma, limitamos, naturalmente, a taxa de entrega por consumidor a uma mensagem de cada vez.

Ao trazer eventos de streaming da fila para o agente, o Orleans Streaming fornece um novo mecanismo especial de backpressure. Uma vez que o agente se dissocia desabrocha os acontecimentos da fila e os entrega aos consumidores, um único consumidor lento pode ficar para trás tanto que o IQueueCache irá encher. Para evitar que IQueueCache cresça indefinidamente, limitamos o seu tamanho (o limite de tamanho é configurável). No entanto, o agente nunca deita fora eventos não entregues.

Em vez disso, quando a cache começa a encher, os agentes abrandam a taxa de desassossar eventos da fila. Dessa forma, podemos "ultrapassar" os períodos de entrega lentos ajustando a velocidade a que consumimos a partir da fila ("backpressure") e voltar a ter taxas de consumo rápido mais tarde. Para detetar os vales de "entrega lenta", utiliza uma estrutura interna de dados de baldes de cache que acompanha o progresso da entrega de eventos aos consumidores IQueueCache individuais de streaming. Isto resulta num sistema muito responsivo e auto-ajuste.