Поделиться через


Оптимизация обработки с отслеживанием состояния в разностных динамических таблицах с подложками

Для эффективного управления данными, хранящихся в состоянии, используйте подложки при обработке потоков с отслеживанием состояния в разностных динамических таблицах, включая агрегаты, соединения и дедупликацию. В этой статье описывается, как использовать подложки в запросах Delta Live Tables и содержать примеры рекомендуемых операций.

Примечание.

Чтобы запросы, выполняющие агрегирование, обрабатывались добавочно и не полностью перекомпилировались с каждым обновлением, необходимо использовать подложки.

Что такое водяной знак?

В потоковой обработке водяной знак — это функция Apache Spark, которая может определять пороговое значение на основе времени для обработки данных при выполнении операций с отслеживанием состояния, таких как агрегаты. Поступающие данные обрабатываются до достижения порогового значения, в течение которого время, определенное пороговым значением, закрывается. Водяные знаки можно использовать для предотвращения проблем во время обработки запросов, главным образом при обработке больших наборов данных или длительной обработки. Эти проблемы могут включать высокую задержку при создании результатов и даже ошибок вне памяти (OOM) из-за объема данных, хранящихся в состоянии во время обработки. Так как потоковые данные по сути неупорядочены, подложки также поддерживают правильное вычисление таких операций, как агрегирование временных окон.

Дополнительные сведения об использовании подложек в потоковой обработке см. в разделе "Подложка" в Apache Spark Структурированная потоковая передача и применение подложки для управления порогами обработки данных.

Как определить подложку?

Вы определяете подложку, указывая поле метки времени и значение, представляющее пороговое значение времени для получения поздних данных . Данные считаются поздними, если они поступают после определенного порогового значения времени. Например, если пороговое значение определяется как 10 минут, записи, поступающие после 10-минутного порогового значения, могут быть удалены.

Так как записи, поступающие после определенного порогового значения, могут быть удалены, важно выбрать пороговое значение, соответствующее требованиям задержки и правильности. Выбор меньшего порогового значения приводит к тому, что записи создаются раньше, но также означает, что поздние записи, скорее всего, будут удалены. Более большое пороговое значение означает более длительное ожидание, но, возможно, больше полноты данных. Из-за большего размера состояния также может потребоваться дополнительное вычислительное значение. Так как пороговое значение зависит от требований к данным и обработке, тестирование и мониторинг обработки важно определить оптимальное пороговое значение.

Функция в Python используется withWatermark() для определения водяного знака. В SQL используйте WATERMARK предложение для определения водяного знака:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Использование подложек с соединениями потокового потока

Для соединения потокового потока необходимо определить водяной знак на обеих сторонах соединения и предложение интервала времени. Так как каждый источник соединения имеет неполное представление данных, предложение интервала времени требуется, чтобы сообщить обработчику потоковой передачи, когда дальнейшие совпадения не могут быть выполнены. Предложение интервала времени должно использовать те же поля, которые используются для определения подложек.

Так как в каждом потоке требуются разные пороговые значения для подложек, потоки не должны иметь одинаковые пороговые значения. Чтобы избежать отсутствия данных, модуль потоковой передачи поддерживает один глобальный водяной знак на основе самого медленного потока.

В следующем примере присоединяется поток рекламных впечатлений и поток пользователей щелкает рекламу. В этом примере щелчк должен происходить в течение 3 минут от впечатления. После прохождения 3-минутного интервала времени строки из состояния, которое больше не может быть сопоставлено, удаляются.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Выполнение оконных агрегатов с подложками

Распространенная операция с отслеживанием состояния потоковых данных — это агрегирование с окном. Агрегаты с окнами похожи на сгруппированные агрегаты, за исключением того, что статистические значения возвращаются для набора строк, входящих в определенное окно.

Окно можно определить как определенную длину, и операцию агрегирования можно выполнить во всех строках, входящих в это окно. Потоковая передача Spark поддерживает три типа окон:

  • Переворачивающиеся (фиксированные) окна: ряд фиксированных размеров, не перекрывающихся и смежных интервалов времени. Входная запись принадлежит только одному окну.
  • Скользящие окна: как и переворачивающиеся окна, скользящие окна имеют фиксированный размер, но окна могут перекрываться, и запись может попасть в несколько окон.

Когда данные поступают в конец окна, а также длина подложки, новые данные не принимаются для окна, результат агрегирования создается, а состояние окна удаляется.

В следующем примере вычисляется сумма впечатлений каждые 5 минут с помощью фиксированного окна. В этом примере предложение select использует псевдоним impressions_window, а затем само окно определяется как часть GROUP BY предложения. Окно должно основываться на том же столбце метки времени, что и подложка, clickTimestamp столбц в этом примере.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Аналогичный пример в Python для вычисления прибыли в течение часа фиксированного окна:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Дедупликация записей потоковой передачи

Структурированная потоковая передача имеет точно один раз гарантии обработки, но не автоматически отменяет повторяющиеся записи из источников данных. Например, так как многие очереди сообщений имеют по крайней мере один раз гарантии, при чтении из одной из этих очередей сообщений следует ожидать повторяющиеся записи. Функцию dropDuplicatesWithinWatermark() можно использовать для отмены повторяющихся записей в любом указанном поле, удаляя дубликаты из потока, даже если некоторые поля отличаются (например, время события или время прибытия). Чтобы использовать функцию, необходимо указать водяной dropDuplicatesWithinWatermark() знак. Все повторяющиеся данные, поступающие в диапазон времени, указанный подложкой, удаляются.

Упорядоченные данные важны, так как данные вне порядка приводят к неправильному переходу к значению водяного знака. Затем, когда старые данные поступают, считается поздним и удаленным. withEventTimeOrder Используйте параметр для обработки начального моментального снимка в порядке на основе метки времени, указанной в подложке. Параметр withEventTimeOrder можно объявить в коде, определяющем набор данных или в параметрах конвейера с помощью spark.databricks.delta.withEventTimeOrder.enabled. Рассмотрим пример.

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Примечание.

Этот withEventTimeOrder параметр поддерживается только в Python.

В следующем примере данные обрабатываются по упорядочению clickTimestamp, а записи, поступающие в течение 5 секунд друг от друга, содержащие повторяющиеся userId clickAdId и столбцы, удаляются.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("LIVE.rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Оптимизация конфигурации конвейера для обработки с отслеживанием состояния

Чтобы предотвратить проблемы с рабочей средой и чрезмерную задержку, Databricks рекомендует включить управление состоянием на основе RocksDB для обработки потоков с отслеживанием состояния, особенно если обработка требует сохранения большого количества промежуточного состояния.

Конвейеры без использования автоматически управляют конфигурациями хранилища состояний.

Вы можете включить управление состоянием на основе RocksDB, задав следующую конфигурацию перед развертыванием конвейера:

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Дополнительные сведения о хранилище состояний RocksDB, включая рекомендации по настройке Для RocksDB, см. в статье Настройка хранилища состояний RocksDB в Azure Databricks.