Поделиться через


Delta table чтение и запись в потоковом режиме

Delta Lake надежно интегрирована со Структурированной потоковой передачей Spark с помощью readStream и writeStream. Delta Lake преодолевает многие ограничения, которые обычно связаны с системами потоковой передачи и файлами, включая:

  • Объединение небольших файлов, созданных при низкой задержке приема.
  • Обслуживание обработки "ровно один раз" с несколькими потоками (или параллельными пакетными заданиями).
  • Эффективное обнаружение новых файлов при использовании файлов в качестве источника для потока.

Примечание.

В этой статье описывается использование Delta Lake tables в качестве источников потоковой передачи и приемников. Сведения о загрузке данных с помощью потоковой tables в Databricks SQL см. в статье Загрузка данных с помощью потоковой tables в Databricks SQL.

Сведения о потоковых статических соединениях с Delta Lake см. в разделе "Поток-статические соединения".

Delta table в качестве источника

Структурированная потоковая обработка постепенно считывает Delta tables. Хотя потоковый запрос активен для delta table, новые записи обрабатываются идемпотентно, так как новые версии table фиксируются в исходном table.

В следующих примерах кода показано, как настроить потоковое чтение с помощью table имени или пути к файлу.

Python

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Scala

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Внимание

Если schema для Delta table изменяется после начала потокового чтения с table, запрос завершается ошибкой. Для большинства schema изменений можно перезапустить поток, чтобы устранить несоответствие schema и продолжить обработку.

В Databricks Runtime 12.2 LTS и более ранних версиях невозможно передавать поток из Delta table с включенным сопоставлением column, которое прошло неаддитивную эволюцию schema, например, переименование или удаление columns. Дополнительные сведения см. в разделе Потоковая передача с изменениями schema и сопоставлением column.

скорость ввода Limit

Для управления микропакетами доступны следующие параметры:

  • maxFilesPerTrigger: Сколько новых файлов будет учитываться в каждом микропакете. Значение по умолчанию — 1000.
  • maxBytesPerTrigger: Сколько данных обрабатывается в каждом микропакете. Этот параметр задает "мягкий максимум", значит, что пакетная обработка приблизительно такого объема данных и может обрабатывать больше данных, чем указано в limit для того, чтобы продвинуть потоковый запрос вперед в случаях, когда наименьший входной блок превышает это значение limit. Это не set по умолчанию.

Если вы используете maxBytesPerTrigger в сочетании с maxFilesPerTrigger, данные микро пакетной службы обрабатываются до достижения maxFilesPerTrigger или maxBytesPerTriggerlimit.

Примечание.

В случаях, когда исходные транзакции table очищаются из-за конфигурации logRetentionDuration и потоковый запрос пытается обработать эти версии, по умолчанию запрос завершается сбоем, чтобы избежать потери данных. Вы можете set использовать опцию failOnDataLoss, чтобы false игнорировать потерянные данные и продолжать обработку.

Потоковая передача потока отслеживания измененных данных Delta Lake (CDC)

Delta Lake поток данных изменений записывает изменения в Delta table, включая обновления и удаления. При включении можно выполнять потоковую передачу данных об изменениях и записывать логику для обработки вставок, обновлений и удалений в нижестоящий tables. Хотя выходные данные потока данных изменений немного отличаются от описания Delta table, это обеспечивает решение для распространения инкрементных изменений в нижестоящие системы tables в медальной архитектуре .

Внимание

В Databricks Runtime 12.2 LTS и ниже невозможно воспроизвести поток данных из канала изменений для Delta table с включенным сопоставлением column, которое прошло нераспространяющуюся эволюцию schema, например, переименование или удаление columns. Смотрите потоковое воспроизведение с сопоставлением column и изменениями schema.

Игнорировать обновления и удаления

Структурированная потоковая передача не обрабатывает входные данные, которые не являются добавлением и создает исключение, если какие-либо изменения происходят в table, используемой в качестве источника. Существуют две основные стратегии для работы с изменениями, которые не могут быть автоматически распространены по нисходящей:

  • Вы можете удалить выходные данные и контрольную точку и перезапустить поток с самого начала.
  • Вы можете set один из следующих двух вариантов:
    • ignoreDeletes: игнорируйте транзакции, которые удаляют данные на границах partition.
    • skipChangeCommits: игнорировать транзакции, которые удаляют или изменяют существующие записи. skipChangeCommits включает ignoreDeletes.

Примечание.

В Databricks Runtime 12.2 LTS и более поздних skipChangeCommits версиях не рекомендуется использовать предыдущий параметр ignoreChanges. В Databricks Runtime 11.3 LTS и более низкий ignoreChanges вариант поддерживается.

Семантика параметраignoreChanges существенно отличается от семантики параметра skipChangeCommits. С включенной ignoreChanges перезаписанные файлы данных в исходном table повторно создаются после операции изменения данных, например UPDATE, MERGE INTO, DELETE (внутри разделов) или OVERWRITE. Неизменяемые строки часто создаются вместе с новыми строками, поэтому нижестоящие потребители должны иметь возможность обрабатывать дубликаты. Удаления не распространяются по нисходящей. ignoreChanges включает ignoreDeletes.

skipChangeCommits полностью игнорирует операции изменения файлов. Файлы данных, которые переписываются в исходном table из-за операции изменения данных, например UPDATE, MERGE INTO, DELETEи OVERWRITE, полностью игнорируются. Чтобы отразить изменения в вышестоящем исходном tables, необходимо реализовать отдельную логику для распространения этих изменений.

Рабочие нагрузки, настроенные с продолжением работы с ignoreChanges известной семантикой, но Databricks рекомендует использовать skipChangeCommits для всех новых рабочих нагрузок. Перенос рабочих нагрузок, использующихся ignoreChanges для skipChangeCommits выполнения рефакторинга, требует логики рефакторинга.

Пример

Например, предположим, что у вас есть tableuser_events с date, user_emailи actioncolumns, распределённых по date. Вы осуществляете передачу данных из user_eventstable и вам необходимо удалить данные оттуда в соответствии с ОРЗД.

При удалении по границам partition (т. е. WHERE находится на partitioncolumn), файлы уже сегментированы по значению, поэтому удаление просто убирает эти файлы из метаданных. При удалении всех данных partition можно использовать следующее:

spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")

При удалении данных в нескольких разделах (в этом примере фильтрация по user_email) используйте следующий синтаксис:

spark.readStream
  .option("skipChangeCommits", "true")
  .table("user_events")

Если вы применяете update к user_email с помощью оператора UPDATE, файл, содержащий заданный user_email, перезаписывается. Используйте skipChangeCommits для пропуска измененных файлов данных.

Указать начальное расположение

Для указания начальной точки источника потоковой передачи Delta Lake можно использовать следующие параметры, не обрабатывая всю table.

  • startingVersion: Версия Delta Lake, с которой начинается запуск. Databricks рекомендует не использовать этот параметр для большинства рабочих нагрузок. Если не set, поток начинается с последней доступной версии, включая полный моментальный снимок table в тот момент.

    Если задано, поток считывает все изменения в Delta table, начиная с указанной версии (включительно). Если указанная версия больше недоступна, поток не запускается. Вы можете получить версии коммитов из versioncolumn результатов выполнения команды DESCRIBE HISTORY.

    Чтобы вернуть только последние изменения, укажите latest.

  • startingTimestamp: Метка времени для запуска. Все изменения table, зафиксированные в момент времени или после него (включительно), считываются потоковым считывателем. Если указанная метка времени предшествует всем коммитам table, потоковая передача начинается с самой ранней доступной метки времени. Одно из двух значений:

    • Строка метки времени. Например, "2019-01-01T00:00:00.000Z".
    • Строка даты. Например, "2019-01-01".

Нельзя одновременно использовать оба варианта set. Они вступают в силу только при запуске нового потокового запроса. Если потоковый запрос запущен и в его контрольной точке записан ход выполнения, эти параметры игнорируются.

Внимание

Хотя вы можете запустить потоковый источник из указанной версии или метки времени, schema этого источника всегда является самым последним schema Delta table. Необходимо убедиться, что несовместимые schema не изменяются на delta table после указанной версии или метки времени. В противном случае источник потоковой передачи может возвращать неверные результаты при чтении данных с неправильным schema.

Пример

Например, предположим, что у вас есть tableuser_events. Если вы хотите считать изменения начиная с версии 5, используйте:

spark.readStream
  .option("startingVersion", "5")
  .table("user_events")

Если вы хотите считать изменения начиная с 18 октября 2018 г., используйте:

spark.readStream
  .option("startingTimestamp", "2018-10-18")
  .table("user_events")

Обработка исходного моментального снимка без удаления данных

Примечание.

Эта функция доступна в Databricks Runtime 11.3 LTS и выше. Эта функция предоставляется в режиме общедоступной предварительной версии.

При использовании delta table в качестве источника потока запрос сначала обрабатывает все данные, присутствующих в table. Delta table в этой версии называется начальным снимком. По умолчанию файлы данных Delta tableобрабатываются на основе последнего изменения файла. Однако время последнего изменения необязательно соответствует временной последовательности событий.

В запросе потоковой передачи с отслеживанием состояния с определенным watermarkобработка файлов по времени изменения может привести к обработке записей в неправильном порядке. Это может привести к тому, что записи будут отнесены к поздним событиям watermark.

Чтобы избежать проблемы с удалением данных, включите следующий параметр:

  • withEventTimeOrder: следует ли обрабатывать исходный моментальный снимок в порядке времени события.

Если включен временной порядок события, диапазон времени исходного моментального снимка делится на периоды времени. Каждый микропакет обрабатывает период путем фильтрации данных в пределах диапазона времени. Параметры конфигурации maxFilesPerTrigger и maxBytesPerTrigger по-прежнему можно использовать для управления размером микропакета, но только приблизительно, что связано с принципом обработки.

На рисунке ниже показан этот процесс:

Исходный моментальный снимок

Важная информация об этой функции:

  • Проблема с удалением данных возникает только в том случае, если исходный разностный моментальный снимок запроса потоковой передачи с отслеживанием состояния обрабатывается в порядке по умолчанию.
  • После запуска запроса потока во время обработки исходного моментального снимка изменить withEventTimeOrder невозможно. Чтобы запустить повторно с измененным withEventTimeOrder, необходимо удалить контрольную точку.
  • Если выполняется потоковый запрос с включенным параметром EventTimeOrder, вы не сможете перейти на более раннюю версию DBR, которая не поддерживает эту функцию до завершения обработки исходного моментального снимка. Если необходимо перейти на более раннюю версию, можно дождаться завершения обработки исходного моментального снимка или удалить контрольную точку и перезапустить запрос.
  • Эта функция не поддерживается в следующих редких сценариях:
    • Время события column — это созданный column, и между источником Delta и watermarkимеют место непроецируемые преобразования.
    • Существует watermark с несколькими источниками Delta в потоковом запросе.
  • Если включен временной порядок событий, производительность обработки исходных разностных моментальных снимков может быть ниже.
  • Каждый микропакет сканирует исходный моментальный снимок, чтобы отфильтровать данные в пределах соответствующего диапазона времени события. Чтобы ускорить фильтрацию, рекомендуется использовать источник Delta column в качестве времени события, чтобы можно было применить пропуск данных (проверьте на пропуск данных для Delta Lake, когда это применимо). Кроме того, разделение table по времени события column может дополнительно ускорить обработку. С помощью пользовательского интерфейса Spark можно узнать, сколько сканируется разностных файлов для определенного микропакета.

Пример

Предположим, у вас есть tableuser_events с event_timecolumn. Запрос потоковой передачи — это агрегатный запрос. Если вы хотите убедиться, что данные не будут удаляться во время обработки исходных моментальных снимков, можно использовать следующее:

spark.readStream
  .option("withEventTimeOrder", "true")
  .table("user_events")
  .withWatermark("event_time", "10 seconds")

Примечание.

Вы также можете включить эту конфигурацию Spark в кластере, которая будет применяться ко всем потоковым запросам: spark.databricks.delta.withEventTimeOrder.enabled true

Delta table в качестве приемника

Вы также можете записывать данные в Delta table с помощью потоковой передачи данных. Журнал транзакций позволяет Delta Lake гарантировать точно однократную обработку, даже если существуют другие потоки или пакетные запросы, выполняемые одновременно с table.

Примечание.

Функция Delta Lake VACUUM удаляет все файлы, не управляемые Delta Lake, но пропускает все каталоги, имена которых начинаются с _. Вы можете безопасно хранить контрольные точки вместе с другими данными и метаданными для delta table с помощью структуры каталогов, например <table-name>/_checkpoints.

Метрики

Количество байтов и файлов, которым еще предстоит пройти обработку, можно узнать в процессе потоковой обработки, с помощью метрик numBytesOutstanding и numFilesOutstanding. К дополнительным метрикам относятся:

  • numNewListedFiles: количество файлов Delta Lake, перечисленных для вычисления невыполненной работы для этого пакета.
    • backlogEndOffset: версия table, используемая для вычисления невыполненной работы.

Если вы выполняете поток в записной книжке, эти метрики можно просмотреть на вкладке "Необработанные данные " на панели мониторинга хода выполнения потокового запроса:

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

Режим добавления

По умолчанию потоки выполняются в режиме добавления, который добавляет новые записи в table.

Используйте метод toTable для потоковой передачи в tables, как показано в следующем примере:

Python

(events.writeStream
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Полный режим

Можно также использовать структурированную потоковую передачу, чтобы заменять весь table в каждом пакете. Один из примеров использования — вычисление сводки с помощью статистической обработки:

Python

(spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")
)

Scala

spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")

В предыдущем примере непрерывно обновляется table, которая содержит совокупное количество событий от клиента.

Для приложений с менее строгими дополнительными требованиями к задержке можно сэкономить вычислительные ресурсы с одноразовыми триггерами. Используйте их для update сводного агрегирования tables по заданному расписанию, обрабатывая только новые данные, поступившие после последнего update.

Upsert из потоковых запросов с помощью foreachBatch

Можно использовать сочетание merge и foreachBatch для записи сложных апсертов из потокового запроса в Delta table. См. раздел Использование foreachBatch для записи в произвольные приемники данных.

Этот шаблон содержит множество приложений, включая следующее:

  • Запись агрегатов потоковой обработки в режиме Update: это гораздо эффективнее, чем полный режим.
  • запись потока изменений в базе данных в Delta table: запрос слияния для записи данных об изменениях можно использовать в foreachBatch для непрерывного применения потока изменений к Delta table.
  • запись потока данных в Delta table с дедупликацией: запрос на объединение только insertдля дедупликации можно использовать в foreachBatch для непрерывной записи данных (с дубликатами) в разностную table с автоматической дедупликацией.

Примечание.

  • Убедитесь, что инструкция merge в foreachBatch идемпотентная, так как перезагрузка потокового запроса может применить операцию к тому же пакету данных несколько раз.
  • Если merge используется в foreachBatch, скорость входных данных для запроса потоковой передачи (выдаваемых через StreamingQueryProgress и видимых в диаграмме скорости записной книжки) может быть кратна фактической скорости, с которой данные создаются в источнике. Это связано с тем, что merge считывает входные данные несколько раз, что приводит к умножению метрик входных данных. Если это узкое место, можно кэшировать пакетный DataFrame до merge и кэшировать его после merge.

В следующем примере показано, как можно использовать SQL для foreachBatch выполнения этой задачи:

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Вы также можете использовать API Delta Lake для выполнения потоковых upserts, как показано в следующем примере:

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forName(spark, "table_name")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "table_name")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Идемпотентная table записывается в foreachBatch

Примечание.

Databricks рекомендует настроить отдельную потоковую запись для каждого приемника, который вы хотите update. Использование foreachBatch для записи в несколько tables упорядочивает записи, что снижает параллелизацию и увеличивает общую задержку.

Delta tables поддерживает следующие параметры DataFrameWriter, чтобы делать записи в несколько tables в течение foreachBatch идемпотентными.

  • txnAppId: уникальная строка, которую можно передать для каждой записи dataFrame. Например, идентификатор StreamingQuery можно использовать как txnAppId.
  • txnVersion: Монотонно возрастающее количество, действующее как версия транзакции.

В Delta Lake используется сочетание txnAppId и txnVersion для обнаружения повторяющихся записей и их пропуска.

Если пакетная запись прерывается сбоем, повторное выполнение пакета использует одно и то же приложение и идентификатор пакетной службы, чтобы помочь среде выполнения правильно определить повторяющиеся записи и игнорировать их. Идентификатор приложения (txnAppId) может быть любой уникальной строкой, созданной пользователем, и не обязательно должен быть связан с идентификатором потока. См. раздел Использование foreachBatch для записи в произвольные приемники данных.

Предупреждение

Если удалить контрольную точку потоковой передачи и перезапустить запрос с помощью новой контрольной точки, необходимо указать другой txnAppId. Новые контрольные точки начинаются с пакетного идентификатора 0. Delta Lake использует идентификатор пакета и txnAppId в качестве уникального ключа, и пропускает пакеты с уже видимыми values.

В следующем примере кода показан этот шаблон:

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}