Поделиться через


Что такое асинхронное отслеживание хода выполнения?

Важный

Эта функция доступна в общедоступной предварительной версии.

Асинхронное отслеживание хода выполнения позволяет потокам структурированной потоковой передачи отмечать контрольные точки асинхронно и параллельно с фактической обработкой данных в микропакете, уменьшая задержку, связанную с поддержанием offsetLog и commitLog.

асинхронное отслеживание хода выполнения

Заметка

Асинхронное отслеживание хода выполнения не работает с триггерами Trigger.once или Trigger.availableNow. Попытка включить эту функцию с этими триггерами приводит к сбою запроса.

Как асинхронное отслеживание хода выполнения работает для уменьшения задержки?

Структурированная потоковая передача зависит от сохранения и ведения учёта смещений в качестве индикаторов хода выполнения для обработки запросов. Операция управления смещением напрямую влияет на задержку обработки, так как обработка данных не может произойти до завершения этих операций. Асинхронное отслеживание хода выполнения позволяет конвейерам потоковой обработки данных сохранять отметки о ходе выполнения без влияния на эти операции управления смещением.

Когда следует настроить частоту контрольных точек?

Пользователи могут настроить частоту, с которой фиксируется состояние прогресса. Параметры по умолчанию для частоты контрольных точек обеспечивают хорошую пропускную способность для большинства запросов. Настройка частоты полезна для сценариев, в которых операции управления смещением выполняются с более высокой скоростью, чем их можно обработать, что создает все больше невыполненных операций управления смещением. Для устранения этой растущей невыполненной работы обработка данных блокируется или замедляется, по сути, отменяя поведение обработки для устранения преимуществ асинхронного отслеживания хода выполнения.

Заметка

Время восстановления сбоя увеличивается при увеличении интервала контрольных точек. В случае сбоя конвейер должен повторно обработать все данные до предыдущей успешной контрольной точки. Пользователи могут рассмотреть этот компромисс между низкой задержкой во время регулярной обработки и восстановления в случае сбоя.

Какие конфигурации связаны с асинхронным отслеживанием хода выполнения?

Выбор Ценность По умолчанию Описание
включено асинхронное отслеживание прогресса истина/ложь ложный включение или отключение асинхронного отслеживания хода выполнения
ИнтервалКонтрольнойТочкиАсинхронногоОтслеживанияПрогрессаМс Миллисекунд 1000 интервал, в котором выполняется фиксация смещений и завершения действий

Как пользователи могут включить асинхронное отслеживание хода выполнения?

Пользователи могут использовать код, аналогичный приведенному ниже, чтобы включить эту функцию:

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()

val query = stream.writeStream
     .format("kafka")
        .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
     .start()

Отключение асинхронного отслеживания хода выполнения

Если включено асинхронное отслеживание хода выполнения, фреймворк не выполняет контрольную точку для каждого пакета. Чтобы устранить эту проблему, перед отключением асинхронного отслеживания хода выполнения обработайте по крайней мере два микропакета со следующими параметрами:

  • .option("asyncProgressTrackingEnabled", "true")
  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

Остановите запрос после завершения обработки по крайней мере двух микропакетов. Теперь вы можете безопасно отключить асинхронное отслеживание хода выполнения и перезапустить запрос.

Если вы отключили асинхронное отслеживание хода выполнения без завершения этого шага, может возникнуть следующая ошибка:

java.lang.IllegalStateException: batch x doesn't exist

В журналах драйверов может появиться следующая ошибка:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Следуя инструкциям в этом разделе по отключению асинхронного отслеживания хода выполнения, вы сможете устранить эти ошибки и восстановить рабочую нагрузку потоковой передачи.

Ограничения с асинхронным отслеживанием прогресса

Эта функция имеет следующие ограничения:

  • Асинхронное отслеживание прогресса поддерживается только в конвейерах без сохранения состояния при использовании Kafka в качестве хранилища.
  • Обработка "ровно один раз" от начала до конца не гарантируется при асинхронном отслеживании прогресса, поскольку диапазоны смещений для пакетов могут быть изменены в случае сбоя. Некоторые приемники, такие как Kafka, никогда не предоставляют точно один раз гарантии.