Pontos de verificação de streaming estruturado
Pontos de verificação e logs de write-ahead trabalham juntos para fornecer garantias de processamento para cargas de trabalho de Streaming Estruturado. O ponto de verificação rastreia as informações que identificam a consulta, incluindo informações de estado e registros processados. Quando você exclui os arquivos em um diretório de ponto de verificação ou altera para um novo local de ponto de verificação, a próxima execução da consulta começa nova.
Cada consulta deve ter um local de ponto de verificação diferente. Várias consultas nunca devem compartilhar o mesmo local.
Habilitar o ponto de verificação para consultas de Streaming Estruturado
Você deve especificar a checkpointLocation
opção antes de executar uma consulta de streaming, como no exemplo a seguir:
Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
Scala
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
Nota
Alguns coletores, como a saída para display()
notebooks e o memory
coletor, geram automaticamente um local de ponto de verificação temporário se você omitir essa opção. Esses locais de ponto de verificação temporários não garantem tolerância a falhas ou consistência de dados e podem não ser limpos corretamente. A Databricks recomenda sempre especificar um local de ponto de verificação para esses coletores.
Recuperar após alterações em uma consulta de Streaming Estruturado
Há limitações sobre quais alterações em uma consulta de streaming são permitidas entre reinicializações do mesmo local de ponto de verificação. Aqui estão algumas alterações que não são permitidas ou o efeito da alteração não está bem definido. Para todos eles:
- O termo permitido significa que você pode fazer a alteração especificada, mas se a semântica de seu efeito está bem definida depende da consulta e da alteração.
- O termo não permitido significa que você não deve fazer a alteração especificada, pois a consulta reiniciada provavelmente falhará com erros imprevisíveis.
sdf
representa um DataFrame/Dataset de streaming gerado comsparkSession.readStream
.
Tipos de alterações em consultas de Streaming Estruturado
- Alterações no número ou tipo (ou seja, fonte diferente) de fontes de entrada: Isso não é permitido.
- Alterações nos parâmetros das fontes de entrada: se isso é permitido e se a semântica da alteração está bem definida depende da fonte e da consulta. Eis alguns exemplos.
É permitida a adição, supressão e modificação dos limites tarifários:
spark.readStream.format("kafka").option("subscribe", "article")
para
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
Alterações em artigos e arquivos inscritos geralmente não são permitidas, pois os resultados são imprevisíveis:
spark.readStream.format("kafka").option("subscribe", "article")
atéspark.readStream.format("kafka").option("subscribe", "newarticle")
- Alterações no intervalo de gatilho: você pode alterar gatilhos entre lotes incrementais e intervalos de tempo. Consulte Alterando intervalos de gatilho entre execuções.
- Alterações no tipo de dissipador de saída: Alterações entre algumas combinações específicas de dissipadores são permitidas. Isto tem de ser verificado caso a caso. Eis alguns exemplos.
- É permitido o coletor de arquivos para a pia de Kafka. Kafka verá apenas os novos dados.
- Kafka sink to file sink não é permitido.
- Kafka pia mudou para foreach, ou vice-versa é permitido.
- Alterações nos parâmetros do coletor de saída: se isso é permitido e se a semântica da alteração está bem definida depende do coletor e da consulta. Eis alguns exemplos.
- Não são permitidas alterações no diretório de saída de um coletor de arquivos:
sdf.writeStream.format("parquet").option("path", "/somePath")
atésdf.writeStream.format("parquet").option("path", "/anotherPath")
- Alterações no tópico de saída são permitidas:
sdf.writeStream.format("kafka").option("topic", "topic1")
parasdf.writeStream.format("kafka").option("topic", "topic2")
- Alterações no coletor foreach definido pelo usuário (ou seja, o
ForeachWriter
código) são permitidas, mas a semântica da alteração depende do código.
- Não são permitidas alterações no diretório de saída de um coletor de arquivos:
- Alterações nas operações de projeção/filtro/mapa: Alguns casos são permitidos. Por exemplo:
- É permitida a adição/supressão de filtros:
sdf.selectExpr("a")
asdf.where(...).selectExpr("a").filter(...)
. - Alterações nas projeções com o mesmo esquema de saída são permitidas:
sdf.selectExpr("stringColumn AS json").writeStream
asdf.select(to_json(...).as("json")).writeStream
. - Alterações em projeções com esquema de saída diferente são condicionalmente permitidas:
sdf.selectExpr("a").writeStream
tosdf.selectExpr("b").writeStream
é permitido somente se o coletor de saída permitir que o esquema mude de"a"
para"b"
.
- É permitida a adição/supressão de filtros:
- Alterações em operações com monitoração de estado: algumas operações em consultas de streaming precisam manter dados de estado para atualizar continuamente o resultado. O Streaming Estruturado verifica automaticamente os dados de estado para armazenamento tolerante a falhas (por exemplo, DBFS, armazenamento de Blob do Azure) e os restaura após a reinicialização. No entanto, isso pressupõe que o esquema dos dados de estado permaneça o mesmo nas reinicializações. Isso significa que quaisquer alterações (ou seja, adições, exclusões ou modificações de esquema) nas operações com monitoração de estado de uma consulta de streaming não são permitidas entre reinicializações. Aqui está a lista de operações com monitoração de estado cujo esquema não deve ser alterado entre reinicializações para garantir a recuperação de estado:
- Agregação de streaming: Por exemplo,
sdf.groupBy("a").agg(...)
. Não é permitida qualquer alteração no número ou tipo de chaves de agrupamento ou agregados. - Desduplicação de streaming: Por exemplo,
sdf.dropDuplicates("a")
. Não é permitida qualquer alteração no número ou tipo de chaves de agrupamento ou agregados. - Junção stream-stream: Por exemplo, (ou seja,
sdf1.join(sdf2, ...)
ambas as entradas são geradas comsparkSession.readStream
). Não são permitidas alterações no esquema ou nas colunas de equijunção. Não são permitidas alterações no tipo de junção (exterior ou interior). Outras alterações na condição de junção são mal definidas. - Operação com estado arbitrária: Por exemplo,
sdf.groupByKey(...).mapGroupsWithState(...)
ousdf.groupByKey(...).flatMapGroupsWithState(...)
. Qualquer alteração no esquema do estado definido pelo usuário e no tipo de tempo limite não é permitida. Qualquer alteração dentro da função de mapeamento de estado definida pelo usuário é permitida, mas o efeito semântico da alteração depende da lógica definida pelo usuário. Se você realmente quiser oferecer suporte a alterações de esquema de estado, poderá codificar/decodificar explicitamente suas estruturas de dados de estado complexas em bytes usando um esquema de codificação/decodificação que ofereça suporte à migração de esquema. Por exemplo, se você salvar seu estado como bytes codificados em Avro, poderá alterar o esquema Avro-state-schema entre as reinicializações da consulta, pois isso restaura o estado binário.
- Agregação de streaming: Por exemplo,