Compartilhar via


Pontos de verificação de Fluxo Estruturado

Os pontos de verificação e os logs de gravação antecipada trabalham juntos para fornecer garantias de processamento para cargas de trabalho de Fluxo Estruturado. O ponto de verificação rastreia as informações que identificam a consulta, incluindo informações de estado e registros processados. Quando você exclui os arquivos em um diretório de ponto de verificação ou altera para um novo local de ponto de verificação, a próxima execução da consulta começa do zero.

Cada consulta deve ter um local de ponto de verificação diferente. Nunca use o mesmo local para compartilhar várias consultas.

Habilitar pontos de verificação para consultas do Fluxo Estruturado

Você deve especificar a opção checkpointLocation antes de executar uma consulta de streaming, como no exemplo a seguir:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Observação

Alguns coletores, como a saída em notebooks display() e o coletor memory, geram automaticamente um local de ponto de verificação temporário se você omitir essa opção. Os locais de ponto de verificação temporários não garantem nenhuma tolerância a falhas ou consistência de dados e podem não ser limpos corretamente. A Databricks recomenda sempre especificar um local de ponto de verificação para esses coletores.

Recuperar-se após alterações em uma consulta do Fluxo Estruturado

Há limitações no que as alterações em uma consulta de streaming são permitidas entre as reinicializações do mesmo local de ponto de verificação. Aqui estão algumas alterações que não são permitidas ou o efeito da alteração não está bem definido. Para todos eles:

  • O termo permitido significa que você pode fazer a alteração especificada, mas se a semântica de seu efeito é bem definida depende da consulta e da alteração.
  • O termo não permitido significa que você não deve fazer a alteração especificada, pois a consulta reiniciada provavelmente falhará com erros imprevisíveis.
  • sdf representa um DataFrame de streaming/Conjunto de dados gerado com sparkSession.readStream.

Tipos de alterações em consultas do Fluxo Estruturado

  • Alterações no número ou tipo (ou seja, fonte diferente) de fontes de entrada: isso não é permitido.
  • Alterações nos parâmetros de fontes de entrada: se isso é permitido e se a semântica da alteração é bem definida depende da origem e da consulta. Veja alguns exemplos.
    • Adição, exclusão e modificação de limites de taxa são permitidos:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      para

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • As alterações nos artigos e arquivos assinados geralmente não são permitidas, pois os resultados são imprevisíveis: spark.readStream.format("kafka").option("subscribe", "article") para spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Alterações no intervalo de gatilho: você pode alterar gatilhos entre lotes incrementais e intervalos de tempo. Consulte Alterando intervalos de gatilho entre execuções.
  • Alterações no tipo de coletor de saída: são permitidas alterações entre algumas combinações específicas de coletores. Isso precisa ser verificado em uma base caso a caso. Veja alguns exemplos.
    • O coletor de arquivo para o coletor Kafka é permitido. O Kafka verá apenas os novos dados.
    • O coletor de Kafka para o coletor de arquivos não é permitido.
    • O coletor Kafka mudou para foreach, ou vice-versa é permitido.
  • Alterações nos parâmetros do coletor de saída: se isso é permitido e se a semântica da alteração é bem definida conta com o coletor e a consulta. Veja alguns exemplos.
    • As alterações no diretório de saída de um coletor de arquivos não são permitidas: sdf.writeStream.format("parquet").option("path", "/somePath") para sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • As alterações no tópico de saída têm permissão para: sdf.writeStream.format("kafka").option("topic", "topic1") para sdf.writeStream.format("kafka").option("topic", "topic2")
    • As alterações no coletor foreach definido pelo usuário (ou seja, o código ForeachWriter) são permitidas, mas a semântica da alteração conta com o código.
  • Alterações em operações semelhantes de projeção/filtro/mapa: alguns casos são permitidos. Por exemplo:
    • Adição/exclusão de filtros é permitido: sdf.selectExpr("a") para sdf.where(...).selectExpr("a").filter(...).
    • As alterações nas projeções com o mesmo esquema de saída são permitidas: sdf.selectExpr("stringColumn AS json").writeStream para sdf.select(to_json(...).as("json")).writeStream.
    • As alterações nas projeções com um esquema de saída diferente são condicionalmente permitidas: sdf.selectExpr("a").writeStream a sdf.selectExpr("b").writeStream é permitida somente se o coletor de saída permitir que o esquema mude de "a" para "b".
  • Alterações em operações com estado: algumas operações em consultas de streaming precisam manter dados de estado para atualizar continuamente o resultado. O Fluxo estruturado automaticamente verifica os dados de estado no armazenamento tolerante a falhas (por exemplo, DBFS, Armazenamento de BLOBs do Azure) e restaura-os após a reinicialização. No entanto, isso pressupõe que o esquema dos dados de estado permaneça o mesmo entre as reinicializações. Isso significa que quaisquer alterações (isto é, adições, exclusões ou modificações de esquema) para as operações com estado de uma consulta de streaming não são permitidas entre as reinicializações. Aqui está a lista de operações com estado cujo esquema não deve ser alterado entre as reinicializações para garantir a recuperação de estado:
    • Agregação de streaming: por exemplo, sdf.groupBy("a").agg(...). Qualquer alteração no número ou tipo de chaves ou agregações de agrupamento não é permitida.
    • Eliminação de duplicação de streaming: por exemplo, sdf.dropDuplicates("a"). Qualquer alteração no número ou tipo de chaves ou agregações de agrupamento não é permitida.
    • Junção de fluxo-fluxo: por exemplo, sdf1.join(sdf2, ...) (ou seja, ambas as entradas são geradas com sparkSession.readStream ). As alterações no esquema ou nas colunas de junção de equivalência não são permitidas. Não são permitidas alterações no tipo de junção (externa ou interna). Outras alterações na condição de junção são mal definidas.
    • Operação com estado arbitrário: por exemplo, sdf.groupByKey(...).mapGroupsWithState(...) ou sdf.groupByKey(...).flatMapGroupsWithState(...). Qualquer alteração no esquema do estado definido pelo usuário e no tipo de tempo limite não é permitida. Qualquer alteração na função de mapeamento de estado definida pelo usuário é permitida, mas o efeito semântico da alteração depende da lógica definida pelo usuário. Se você realmente quiser dar suporte a alterações de esquema de estado, poderá codificar/decodificar explicitamente suas estruturas de dados de estado complexo em bytes usando um esquema de codificação/decodificação que dá suporte à migração de esquema. Por exemplo, se você salvar seu estado como bytes codificados em Avro, será possível alterar o esquema de estado Avro entre as reinicializações de consulta, pois isso restaura o estado binário.