Partilhar via


O que é o controlo de progresso assíncrono?

Importante

Esta funcionalidade está em Pré-visualização Pública.

O controlo de progresso assíncrono permite que os pipelines de Transmissão em Fluxo Estruturado progridam de forma assíncrona e paralela ao processamento de dados real num micro lote, reduzindo a latência associada à manutenção do e commitLogdo offsetLog .

Controlo de Progresso Assíncrono

Nota

O controlo de progresso assíncrono não funciona com Trigger.once ou Trigger.availableNow acionadores. Tentar ativar esta funcionalidade com estes acionadores resulta numa falha de consulta.

Como funciona o controlo do progresso assíncrono para reduzir a latência?

A Transmissão em Fluxo Estruturada depende da persistência e gestão de desvios como indicadores de progresso para o processamento de consultas. A operação de gestão de deslocamento afeta diretamente a latência do processamento, porque não pode ocorrer nenhum processamento de dados até que estas operações estejam concluídas. O controlo de progresso assíncrono permite que os pipelines de Transmissão em Fluxo Estruturada progridam sem serem afetados por estas operações de gestão de desvios.

Quando deve configurar a frequência do ponto de verificação?

Os utilizadores podem configurar a frequência com que o progresso é registado. As predefinições para a frequência de ponto de verificação fornecem um bom débito para a maioria das consultas. Configurar a frequência é útil para cenários em que as operações de gestão de deslocamento ocorrem a uma taxa superior à que podem ser processadas, o que cria um registo de tarefas pendentes cada vez maior de operações de gestão de desvios. Para conter este registo de tarefas pendentes crescente, o processamento de dados é bloqueado ou abrandado, revertendo essencialmente o comportamento de processamento para eliminar os benefícios do controlo de progresso assíncrono.

Nota

O tempo de recuperação da falha aumenta com o aumento do tempo de intervalo do ponto de verificação. Em caso de falha, um pipeline tem de processar novamente todos os dados antes do ponto de verificação com êxito anterior. Os utilizadores podem considerar esta troca entre latência mais baixa durante o processamento regular e o tempo de recuperação em caso de falha.

Que configurações estão associadas ao controlo de progresso assíncrono?

Opção Valor Predefinição Descrição
asyncProgressTrackingEnabled verdadeiro/falso falso ativar ou desativar o controlo de progresso assíncrono
asyncProgressTrackingCheckpointIntervalMs milissegundos 1000 o intervalo em que consolidamos os desvios e as consolidações de conclusão

Como é que os utilizadores podem ativar o controlo do progresso assíncrono?

Os utilizadores podem utilizar código semelhante ao código abaixo para ativar esta funcionalidade:

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()

val query = stream.writeStream
     .format("kafka")
        .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
     .start()

Desativar o controlo de progresso assíncrono

Quando o controlo do progresso assíncrono está ativado, a arquitetura não controla o progresso de cada lote. Para resolver este problema, antes de desativar o controlo de progresso assíncrono, processe pelo menos dois micro lotes com as seguintes definições:

  • .option("asyncProgressTrackingEnabled", "true")
  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

Pare a consulta depois de, pelo menos, dois micro lotes terem terminado o processamento. Agora, pode desativar com segurança o controlo do progresso assíncrono e reiniciar a consulta.

Se tiver desativado o controlo do progresso assíncrono sem concluir este passo, poderá encontrar o seguinte erro:

java.lang.IllegalStateException: batch x doesn't exist

Nos registos do controlador, poderá ver o seguinte erro:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Seguir as instruções nesta secção para desativar o controlo de progresso assíncrono permite-lhe resolver estes erros e reparar a carga de trabalho de transmissão em fluxo.

Limitações com o controlo de progresso assíncrono

Esta funcionalidade tem as seguintes limitações:

  • O controlo de progresso assíncrono só é suportado em pipelines sem estado ao utilizar o Kafka como sink.
  • Exatamente uma vez que o processamento ponto a ponto não é garantido com o controlo de progresso assíncrono, porque os intervalos de deslocamento do lote podem ser alterados em caso de falha. Alguns sinks, como o Kafka, nunca fornecem garantias exatamente uma vez.