Поделиться через


Применение подложек для контроля пороговых значений обработки данных

В этой статье приведены основные понятия водяного знака и рекомендации по использованию подложек в общих операциях потоковой передачи с отслеживанием состояния. Необходимо применять подложки к операциям потоковой передачи с отслеживанием состояния, чтобы избежать бесконечного расширения объема данных, хранящихся в состоянии, что может привести к проблемам с памятью и увеличить задержку обработки во время длительных операций потоковой передачи.

Что такое водяной знак?

Структурированная потоковая передача использует подложки для управления пороговым значением для продолжения обработки обновлений для заданной сущности состояния. Ниже приведены распространенные примеры сущностей состояния:

  • Агрегации по временному окну.
  • Уникальные ключи в соединении между двумя потоками данных.

При объявлении водяного знака необходимо указать поле метки времени и пороговое значение подложки для потокового кадра данных. По мере поступления новых данных диспетчер состояний отслеживает самую последнюю метку времени в указанном поле и обрабатывает все записи в пороге задержки.

В следующем примере применяется пороговое значение 10 минут к окну подсчета.

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

В этом примере:

  • Столбец event_time используется для определения 10-минутного водяного знака и 5-минутного скользящего окна.
  • Счетчик собирается для каждого id наблюдаемого для каждого не перекрывающегося 5-минутного окна.
  • Сведения о состоянии сохраняются для каждого счетчика до тех пор, пока конец окна не станет на 10 минут позже последнего наблюдаемого event_time.

Внимание

Пороговые значения гарантируют, что записи, поступающие в пределах указанного порога, обрабатываются в соответствии с семантикой определенного запроса. Записи, поступающие за пределы указанного порогового значения, могут по-прежнему обрабатываться с помощью метрик запросов, но это не гарантируется.

Как водяной знак влияет на время обработки и пропускную способность?

Подложки взаимодействуют с режимами вывода для управления записью данных в приемник. Поскольку подложки сокращают общий объем сведений о состоянии, эффективное использование подложек важно для эффективной пропускной способности потоковой передачи с отслеживанием состояния.

Примечание.

Не все режимы вывода поддерживаются для всех операций с отслеживанием состояния.

Подложки и выходной режим для оконных агрегатов

Следующая таблица подробно описывает обработку запросов с агрегированием по метке времени с заданным водяным знаком.

Режим вывода Поведение
Добавление Строки записываются в целевую таблицу при превышении порогового значения, установленного для водяного знака. Все записи задерживаются на основе порога задержки. Старое состояние агрегирования удаляется после прохождения порогового значения.
Обновить Строки записываются в целевую таблицу по мере вычисления результатов и могут быть обновлены и перезаписаны по мере поступления новых данных. Старое состояние агрегирования удаляется после прохождения порогового значения.
Завершено Состояние агрегирования не удаляется. Целевая таблица перезаписывается с каждым триггером.

Подложки и выходные данные для соединений потокового потока

Соединения между несколькими потоками поддерживают только режим добавления, а соответствующие записи записываются в каждом пакете, который они обнаруживают. Для внутренних соединений Databricks рекомендует задать пороговое значение подложки для каждого источника данных потоковой передачи. Это позволяет удалять сведения о состоянии для старых записей. Без водяных знаков структурированная потоковая обработка пытается соединить каждый ключ с каждой стороны соединения с каждым триггером.

Структурированная потоковая передача имеет специальную семантику для поддержки внешних соединений. Подложка является обязательной для внешних соединений, так как указывает, когда ключ должен быть записан со значением NULL после завершения несоответствен. Обратите внимание, что в то время как внешние соединения могут быть полезны для записи записей, которые никогда не совпадают во время обработки данных, так как соединения записываются только в таблицы в виде операций с добавлением, эти отсутствующие данные не записываются до тех пор, пока порог задержки не прошел.

управлять пороговым значением поздних данных с помощью нескольких политик водяного знака в структурированной потоковой передаче

При работе с несколькими входами структурированной потоковой передачи можно установить несколько водяных знаков для контроля порогов терпимости для поздних поступающих данных. Настройка пределов позволяет управлять сведениями о состоянии и влиять на задержку.

Потоковый запрос может иметь несколько входных потоков, которые объединяются или присоединяются. Каждый входной поток может иметь разные пороговые значения для запоздавших данных, которые нужно допускать для операций с отслеживанием состояния. Эти пороговые значения указываются с помощью withWatermarks("eventTime", delay) для каждого входного потока. Ниже приведен пример запроса с соединениями "поток — поток".

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

При выполнении запроса Structured Streaming по отдельности отслеживает максимальное время события, наблюдаемое в каждом входном потоке, вычисляет водяные знаки с учётом соответствующей задержки и выбирает один глобальный водяной знак для использования в операциях с сохранением состояния. По умолчанию минимальное значение выбирается в качестве глобального контрольного показателя, поскольку это гарантирует, что данные не будут случайно отброшены как слишком поздние, если один из потоков отстанет от других (например, один из потоков перестанет получать данные из-за сбоев в вышестоящем узле). Другими словами, глобальный водяной знак безопасно перемещается в темпе самого медленного потока, и выходные данные запроса задерживаются соответствующим образом.

Если вы хотите получить результаты быстрее, можно задать политику нескольких водяных знаков, чтобы выбрать максимальное значение в качестве глобального водяного знака, задав конфигурацию SQL spark.sql.streaming.multipleWatermarkPolicy в значение max (по умолчанию используется min). Это позволяет глобальному маркеру двигаться в темпе самого быстрого потока. Но эта конфигурация удаляет данные из самых медленных потоков. Из-за этого Databricks рекомендует использовать эту конфигурацию разумно.

Удаление дубликатов в пределах водяного знака

В Databricks Runtime 13.3 LTS и более поздних версиях вы можете удалять дубликаты записей, соблюдая пороговое значение водяного знака, с помощью уникального идентификатора.

Структурированная потоковая передача обеспечивает точно один раз гарантии обработки, но не выполняет автоматическое дедупликация записей из источников данных. Вы можете использовать dropDuplicatesWithinWatermark для дедупликации записей в любом указанном поле, что позволяет удалять дубликаты из потока, даже если некоторые поля отличаются (например, время события или время прибытия).

Повторяющиеся записи, поступающие в указанный предел, гарантированно будут удалены. Эта гарантия является строгой только в одном направлении, и повторяющиеся записи, поступающие за пределы указанного порогового значения, также могут быть удалены. Необходимо задать порог задержки водяного знака дольше, чем максимальные различия меток времени между повторяющимися событиями, чтобы удалить все повторяющиеся.

Чтобы использовать метод dropDuplicatesWithinWatermark, необходимо указать водяной знак, как показано в следующем примере:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])