Поделиться через


Структурированные контрольные точки потоковой передачи

Контрольные точки и журналы перед записью работают вместе, чтобы обеспечить гарантии обработки для структурированных рабочих нагрузок потоковой передачи. Контрольная точка отслеживает сведения, определяющие запрос, включая сведения о состоянии и обработанные записи. При удалении файлов в каталоге контрольных точек или изменении в новом расположении контрольной точки следующий запуск запроса начинается свежим.

Каждый запрос должен иметь свое расположение контрольной точки. Несколько запросов никогда не должны совместно использовать одно расположение.

Включение контрольных точек для запросов структурированной потоковой передачи

Перед выполнением потокового запроса необходимо указать checkpointLocation этот параметр, как показано в следующем примере:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Примечание.

Некоторые приемники, такие как выходные данные для display() в записных книжках и приемнике memory, автоматически создают временное расположение контрольной точки, если этот параметр не указан. Эти временные расположения контрольных точек не гарантируют отказоустойчивость или согласованность данных и могут не быть должным образом очищены. Databricks рекомендует всегда указывать расположение контрольной точки для этих приемников.

Восстановление после изменений в структурированном запросе потоковой передачи

Существуют ограничения на то, какие изменения в запросе потоковой передачи разрешены между перезапусками из одного расположения контрольной точки. Ниже приведены некоторые изменения, которые либо не разрешены, либо эффект изменения не определен. Для всех из них:

  • Термин допустимый означает, что вы можете выполнить указанное изменение, но независимо от того, правильно ли определена семантика его воздействия, зависит от запроса и изменений.
  • Термин недопустимый означает, что не следует выполнять указанное изменение, так как перезапущенный запрос, скорее всего, завершится сбоем с непредсказуемыми ошибками.
  • sdf представляет DataFrame/Dataset потоковой передачи, созданный с помощью sparkSession.readStream.

Типы изменений в запросах структурированной потоковой передачи

  • Изменения в числе или типе источников входных данных (то есть другой источник) — это недопустимо.
  • Изменения параметров входных источников: разрешено ли это и правильно ли определяется семантика изменения, зависит от источника и запроса. Далее мы рассмотрим несколько примеров.
    • Добавление, удаление и изменение пределов скорости разрешено:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      до

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Изменения в статьях и файлах с подпиской обычно не разрешены, так как результаты непредсказуемы: spark.readStream.format("kafka").option("subscribe", "article") на spark.readStream.format("kafka").option("subscribe", "newarticle").

  • Изменения в интервале триггера: можно изменять триггеры между добавочными пакетами и интервалами времени. См. раздел "Изменение интервалов триггеров между запусками".
  • Изменения в типе приемника выходных данных — разрешены изменения между несколькими конкретными сочетаниями приемников. Это необходимо проверить отдельно для каждого случая. Далее мы рассмотрим несколько примеров.
    • Менять приемник файлов на приемник Kafka разрешено. Kafka увидит только новые данные.
    • Менять приемник Kafka на приемник файлов запрещено.
    • Менять приемник Kafka на foreach и наоборот разрешено.
  • Изменения параметров приемника выходных данных: разрешено ли это и правильно ли определяется семантика изменения, зависит от приемника и запроса. Далее мы рассмотрим несколько примеров.
    • Изменения в выходном каталоге приемника файлов запрещены: sdf.writeStream.format("parquet").option("path", "/somePath") на sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Изменения в выходном разделе разрешены: sdf.writeStream.format("kafka").option("topic", "topic1")sdf.writeStream.format("kafka").option("topic", "topic2")
    • Изменения в определенном пользователем приемнике foreach (т. е. в коде ForeachWriter) разрешены, но семантика изменения зависит от кода.
  • Изменения в операциях, подобных проекции, фильтру или сопоставлению. Некоторые варианты разрешены. Например:
    • Добавление и удаление фильтров разрешено: sdf.selectExpr("a") на sdf.where(...).selectExpr("a").filter(...).
    • Изменения в проекциях с той же выходной схемой разрешены: sdf.selectExpr("stringColumn AS json").writeStream до sdf.select(to_json(...).as("json")).writeStream.
    • Изменения в проекциях с другой выходной схемой допускаются условно: sdf.selectExpr("a").writeStreamsdf.selectExpr("b").writeStream разрешено только в том случае, если приемник выходных данных разрешает изменение схемы с "a" на "b".
  • Изменения в операциях с состоянием: Некоторые операции с потоковыми данными должны сохранять данные о состоянии, чтобы непрерывно обновлять результат. Структурированная потоковая передача автоматически создает контрольные точки для данных состояния в отказоустойчивом хранилище (например, DBFS, хранилище BLOB-объектов Azure) и восстанавливает его после перезагрузки. Однако предполагается, что схема данных состояния остается одинаковой во время перезапуска. Это означает, что любые изменения (т. е. добавления, удаления или изменения схемы) в операции потокового запроса с сохранением состояния не допускаются между перезапусками. Ниже приведен список операций с отслеживанием состояния, схема которых не должна быть изменена между перезапусками, чтобы обеспечить восстановление состояния:
    • Агрегат потоковой передачи — например, sdf.groupBy("a").agg(...). Любое изменение числа или типа ключей группирования или агрегатов не допускается.
    • Дедупликация потоковой передачи — например, sdf.dropDuplicates("a"). Любое изменение числа или типа ключей группирования или агрегатов не допускается.
    • соединение stream-stream: например, sdf1.join(sdf2, ...) (т. е. оба входа создаются с sparkSession.readStream). Изменения в столбцах схемы или столбцах для эквисоединения не допускаются. Изменения в типе соединения (внешнем или внутреннем) запрещены. Другие изменения в условии соединения не определены.
    • Произвольная операция с отслеживанием состояния — например, sdf.groupByKey(...).mapGroupsWithState(...) или sdf.groupByKey(...).flatMapGroupsWithState(...). Любое изменение схемы определяемого пользователем состояния и тип времени ожидания не допускается. Любое изменение в определяемой пользователем функции сопоставления состояний разрешено, но семантический результат изменения зависит от пользовательской логики. Если вы действительно хотите поддерживать изменения схемы состояния, вы можете явно кодировать и декодировать сложные структуры данных состояния в байтах с помощью схемы кодирования и декодирования, поддерживающей миграцию схем. Например, если вы сохраняете состояние в виде байтов, закодированных в Avro, то вы можете изменить схему Avro-состояния между перезапусками запросов, так как это восстанавливает двоичное состояние.