Introdução ao ReliableConcurrentQueue no Azure Service Fabric
A fila simultânea confiável é uma fila assíncrona, transacional e replicada que apresenta alta simultaneidade para operações de enfileiramento e desfila. Ele foi projetado para oferecer alta taxa de transferência e baixa latência, relaxando os pedidos FIFO rigorosos fornecidos pela Fila Confiável e, em vez disso, fornece um pedido de melhor esforço.
APIs
Fila simultânea | Fila do Reliable Concurrent |
---|---|
void Enqueue(item T) | Task EnqueueAsync(ITransaction tx, item T) |
bool TryDequeue (resultado T out) | Tarefa< ConditionalValue < T >> TryDequeueAsync(ITransaction tx) |
Contagem int() | contagem longa() |
Comparação com fila confiável
A Fila Simultânea Confiável é oferecida como uma alternativa à Fila Confiável. Deve ser usado nos casos em que não é necessária uma ordem FIFO rigorosa, uma vez que garantir FIFO requer uma compensação com simultaneidade. O Reliable Queue usa bloqueios para impor a ordem FIFO, com no máximo uma transação permitida para enfileirar e no máximo uma transação permitida para desfilar de cada vez. Em comparação, a Fila Simultânea Confiável relaxa a restrição de ordenação e permite que qualquer número de transações simultâneas intercale suas operações de enfila e desfila. A ordem de melhor esforço é fornecida, no entanto, a ordenação relativa de dois valores em uma fila simultânea confiável nunca pode ser garantida.
A Fila Simultânea Confiável fornece maior taxa de transferência e menor latência do que a Fila Confiável sempre que há várias transações simultâneas executando enfileiramentos e/ou desfilas.
Um exemplo de caso de uso para o ReliableConcurrentQueue é o cenário Message Queue . Nesse cenário, um ou mais produtores de mensagens criam e adicionam itens à fila e um ou mais consumidores de mensagens extraem mensagens da fila e as processam. Vários produtores e consumidores podem trabalhar de forma independente, usando transações simultâneas para processar a fila.
Diretrizes de uso
- A fila espera que os itens na fila tenham um período de retenção baixo. Ou seja, os itens não ficariam na fila por muito tempo.
- A fila não garante pedidos FIFO rigorosos.
- A fila não lê suas próprias gravações. Se um item estiver enfileirado dentro de uma transação, ele não será visível para um desfilador dentro da mesma transação.
- As filas de espera não são isoladas umas das outras. Se o item A for retirado da fila na transação txnA, mesmo que txnA não esteja confirmado, o item A não será visível para uma transação simultânea txnB. Se txnA abortar, A se tornará visível para txnB imediatamente.
- O comportamento TryPeekAsync pode ser implementado usando um TryDequeueAsync e, em seguida, anulando a transação. Um exemplo desse comportamento pode ser encontrado na seção Padrões de programação.
- A contagem não é transacional. Ele pode ser usado para ter uma ideia do número de elementos na fila, mas representa um point-in-time e não pode ser confiável.
- O processamento dispendioso nos itens em fila não deve ser executado enquanto a transação estiver ativa, para evitar transações de longa duração que possam ter um impacto no desempenho do sistema.
Fragmentos de Código
Vejamos alguns trechos de código e suas saídas esperadas. O tratamento de exceções é ignorado nesta seção.
Instanciação
A criação de uma instância de uma Fila Simultânea Confiável é semelhante a qualquer outra Coleção Confiável.
IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");
EnqueueAsync
Aqui estão alguns trechos de código para usar EnqueueAsync seguido por suas saídas esperadas.
- Caso 1: Tarefa de fila única
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
await this.Queue.EnqueueAsync(txn, 20, cancellationToken);
await txn.CommitAsync();
}
Suponha que a tarefa foi concluída com êxito e que não houve transações simultâneas modificando a fila. O usuário pode esperar que a fila contenha os itens em qualquer uma das seguintes ordens:
10, 20
20, 10
- Caso 2: Tarefa de enfila paralela
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
await this.Queue.EnqueueAsync(txn, 20, cancellationToken);
await txn.CommitAsync();
}
// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
await this.Queue.EnqueueAsync(txn, 40, cancellationToken);
await txn.CommitAsync();
}
Suponha que as tarefas foram concluídas com êxito, que as tarefas foram executadas em paralelo e que não houve outras transações simultâneas modificando a fila. Nenhuma inferência pode ser feita sobre a ordem dos itens na fila. Para este trecho de código, os itens podem aparecer em qualquer um dos 4! possíveis encomendas. A fila tentará manter os itens na ordem original (enfileirada), mas poderá ser forçada a reordená-los devido a operações simultâneas ou falhas.
DequeueAsync
Aqui estão alguns trechos de código para usar TryDequeueAsync seguido pelas saídas esperadas. Suponha que a fila já está preenchida com os seguintes itens na fila:
10, 20, 30, 40, 50, 60
- Caso 1: Tarefa única de desfila
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await txn.CommitAsync();
}
Suponha que a tarefa foi concluída com êxito e que não houve transações simultâneas modificando a fila. Como nenhuma inferência pode ser feita sobre a ordem dos itens na fila, qualquer um dos três itens pode ser retirado da fila, em qualquer ordem. A fila tentará manter os itens na ordem original (enfileirada), mas poderá ser forçada a reordená-los devido a operações simultâneas ou falhas.
- Caso 2: Tarefa de desfila paralela
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
await txn.CommitAsync();
}
// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
await txn.CommitAsync();
}
Suponha que as tarefas foram concluídas com êxito, que as tarefas foram executadas em paralelo e que não houve outras transações simultâneas modificando a fila. Como nenhuma inferência pode ser feita sobre a ordem dos itens na fila, as listas dequeue1 e dequeue2 conterão, cada uma, dois itens, em qualquer ordem.
O mesmo item não aparecerá em ambas as listas. Assim, se dequeue1 tem 10, 30, então dequeue2 teria 20, 40.
- Caso 3: Desfila de pedidos com Transação Abortada
Abortar uma transação com desfilas durante o voo coloca os itens de volta no topo da fila. A ordem em que os itens são colocados de volta no topo da fila não é garantida. Vejamos o seguinte código:
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
// Abort the transaction
await txn.AbortAsync();
}
Suponha que os itens foram retirados da fila na seguinte ordem:
10, 20
Quando abortamos a transação, os itens são adicionados de volta ao topo da fila em qualquer uma das seguintes ordens:
10, 20
20, 10
O mesmo se aplica a todos os casos em que a transação não foi concluída com êxito.
Padrões de programação
Nesta seção, vamos examinar alguns padrões de programação que podem ser úteis no uso de ReliableConcurrentQueue.
Desfilas em lote
Um padrão de programação recomendado é que a tarefa do consumidor agrupe suas desfilas em vez de executar uma retirada de fila de cada vez. O usuário pode optar por limitar os atrasos entre cada lote ou o tamanho do lote. O trecho de código a seguir mostra esse modelo de programação. Esteja ciente, neste exemplo, o processamento é feito depois que a transação é confirmada, portanto, se ocorrer uma falha durante o processamento, os itens não processados serão perdidos sem terem sido processados. Alternativamente, o processamento pode ser feito dentro do escopo da transação, no entanto, pode ter um impacto negativo no desempenho e requer o manuseio dos itens já processados.
int batchSize = 5;
long delayMs = 100;
while(!cancellationToken.IsCancellationRequested)
{
// Buffer for dequeued items
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue<int> ret;
for(int i = 0; i < batchSize; ++i)
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if (ret.HasValue)
{
// If an item was dequeued, add to the buffer for processing
processItems.Add(ret.Value);
}
else
{
// else break the for loop
break;
}
}
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
int delayFactor = batchSize - processItems.Count;
await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}
Processamento baseado em notificações de melhor esforço
Outro padrão de programação interessante usa a API Count. Aqui, podemos implementar o processamento baseado em notificações de melhor esforço para a fila. A Contagem de filas pode ser usada para limitar uma tarefa de enqueue ou dequeue. Observe que, como no exemplo anterior, como o processamento ocorre fora da transação, os itens não processados podem ser perdidos se ocorrer uma falha durante o processamento.
int threshold = 5;
long delayMs = 1000;
while(!cancellationToken.IsCancellationRequested)
{
while (this.Queue.Count < threshold)
{
cancellationToken.ThrowIfCancellationRequested();
// If the queue does not have the threshold number of items, delay the task and check again
await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
}
// If there are approximately threshold number of items, try and process the queue
// Buffer for dequeued items
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue<int> ret;
do
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if (ret.HasValue)
{
// If an item was dequeued, add to the buffer for processing
processItems.Add(ret.Value);
}
} while (processItems.Count < threshold && ret.HasValue);
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
}
Dreno de melhor esforço
Não é possível garantir um esgotamento da fila devido à natureza simultânea da estrutura de dados. É possível que, mesmo que nenhuma operação de usuário na fila esteja em andamento, uma chamada específica para TryDequeueAsync não retorne um item que foi anteriormente enfileirado e confirmado. É garantido que o item enfileirado eventualmente se tornará visível para retirada da fila, no entanto, sem um mecanismo de comunicação fora da banda, um consumidor independente não pode saber que a fila atingiu um estado estacionário, mesmo que todos os produtores tenham sido interrompidos e nenhuma nova operação de fila seja permitida. Assim, a operação de drenagem é o melhor esforço, conforme implementado abaixo.
O usuário deve parar todas as outras tarefas de produtor e consumidor e aguardar que qualquer transação em voo seja confirmada ou abortada, antes de tentar drenar a fila. Se o usuário souber o número esperado de itens na fila, ele poderá configurar uma notificação que sinalize que todos os itens foram retirados da fila.
int numItemsDequeued;
int batchSize = 5;
ConditionalValue ret;
do
{
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
do
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if(ret.HasValue)
{
// Buffer the dequeues
processItems.Add(ret.Value);
}
} while (ret.HasValue && processItems.Count < batchSize);
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
} while (ret.HasValue);
Pré-visualizar
ReliableConcurrentQueue não fornece a API TryPeekAsync . Os usuários podem obter a semântica de visualização usando um TryDequeueAsync e, em seguida, abortando a transação. Neste exemplo, as desfilas são processadas somente se o valor do item for maior que 10.
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
bool valueProcessed = false;
if (ret.HasValue)
{
if (ret.Value > 10)
{
// Process the item
Console.WriteLine("Value : " + ret.Value);
valueProcessed = true;
}
}
if (valueProcessed)
{
await txn.CommitAsync();
}
else
{
await txn.AbortAsync();
}
}
Leitura obrigatória
- Guia de início rápido de serviços confiáveis
- Trabalhar com as Reliable Collections
- Notificações de serviços confiáveis
- Backup e restauração de serviços confiáveis (recuperação de desastres)
- Configuração confiável do State Manager
- Introdução aos Serviços de API Web do Service Fabric
- Uso avançado do modelo de programação de serviços confiáveis
- Referência do desenvolvedor para coleções confiáveis