Delta table чтение и запись в потоковом режиме
Delta Lake надежно интегрирована со Структурированной потоковой передачей Spark с помощью readStream
и writeStream
. Delta Lake преодолевает многие ограничения, которые обычно связаны с системами потоковой передачи и файлами, включая:
- Объединение небольших файлов, созданных при низкой задержке приема.
- Обслуживание обработки "ровно один раз" с несколькими потоками (или параллельными пакетными заданиями).
- Эффективное обнаружение новых файлов при использовании файлов в качестве источника для потока.
Примечание.
В этой статье описывается использование Delta Lake tables в качестве источников потоковой передачи и приемников. Сведения о загрузке данных с помощью потоковой tables в Databricks SQL см. в статье Загрузка данных с помощью потоковой tables в Databricks SQL.
Сведения о потоковых статических соединениях с Delta Lake см. в разделе "Поток-статические соединения".
Delta table в качестве источника
Структурированная потоковая обработка постепенно считывает Delta tables. Хотя потоковый запрос активен для delta table, новые записи обрабатываются идемпотентно, так как новые версии table фиксируются в исходном table.
В следующих примерах кода показано, как настроить потоковое чтение с помощью table имени или пути к файлу.
Python
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Scala
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Внимание
Если schema для Delta table изменяется после начала потокового чтения с table, запрос завершается ошибкой. Для большинства schema изменений можно перезапустить поток, чтобы устранить несоответствие schema и продолжить обработку.
В Databricks Runtime 12.2 LTS и более ранних версиях невозможно передавать поток из Delta table с включенным сопоставлением column, которое прошло неаддитивную эволюцию schema, например, переименование или удаление columns. Дополнительные сведения см. в разделе Потоковая передача с изменениями schema и сопоставлением column.
скорость ввода Limit
Для управления микропакетами доступны следующие параметры:
-
maxFilesPerTrigger
: Сколько новых файлов будет учитываться в каждом микропакете. Значение по умолчанию — 1000. -
maxBytesPerTrigger
: Сколько данных обрабатывается в каждом микропакете. Этот параметр задает "мягкий максимум", значит, что пакетная обработка приблизительно такого объема данных и может обрабатывать больше данных, чем указано в limit для того, чтобы продвинуть потоковый запрос вперед в случаях, когда наименьший входной блок превышает это значение limit. Это не set по умолчанию.
Если вы используете maxBytesPerTrigger
в сочетании с maxFilesPerTrigger
, данные микро пакетной службы обрабатываются до достижения maxFilesPerTrigger
или maxBytesPerTrigger
limit.
Примечание.
В случаях, когда исходные транзакции table очищаются из-за конфигурации logRetentionDuration
и потоковый запрос пытается обработать эти версии, по умолчанию запрос завершается сбоем, чтобы избежать потери данных. Вы можете set использовать опцию failOnDataLoss
, чтобы false
игнорировать потерянные данные и продолжать обработку.
Потоковая передача потока отслеживания измененных данных Delta Lake (CDC)
Delta Lake поток данных изменений записывает изменения в Delta table, включая обновления и удаления. При включении можно выполнять потоковую передачу данных об изменениях и записывать логику для обработки вставок, обновлений и удалений в нижестоящий tables. Хотя выходные данные потока данных изменений немного отличаются от описания Delta table, это обеспечивает решение для распространения инкрементных изменений в нижестоящие системы tables в медальной архитектуре .
Внимание
В Databricks Runtime 12.2 LTS и ниже невозможно воспроизвести поток данных из канала изменений для Delta table с включенным сопоставлением column, которое прошло нераспространяющуюся эволюцию schema, например, переименование или удаление columns. Смотрите потоковое воспроизведение с сопоставлением column и изменениями schema.
Игнорировать обновления и удаления
Структурированная потоковая передача не обрабатывает входные данные, которые не являются добавлением и создает исключение, если какие-либо изменения происходят в table, используемой в качестве источника. Существуют две основные стратегии для работы с изменениями, которые не могут быть автоматически распространены по нисходящей:
- Вы можете удалить выходные данные и контрольную точку и перезапустить поток с самого начала.
- Вы можете set один из следующих двух вариантов:
-
ignoreDeletes
: игнорируйте транзакции, которые удаляют данные на границах partition. -
skipChangeCommits
: игнорировать транзакции, которые удаляют или изменяют существующие записи.skipChangeCommits
включаетignoreDeletes
.
-
Примечание.
В Databricks Runtime 12.2 LTS и более поздних skipChangeCommits
версиях не рекомендуется использовать предыдущий параметр ignoreChanges
. В Databricks Runtime 11.3 LTS и более низкий ignoreChanges
вариант поддерживается.
Семантика параметраignoreChanges
существенно отличается от семантики параметра skipChangeCommits
. С включенной ignoreChanges
перезаписанные файлы данных в исходном table повторно создаются после операции изменения данных, например UPDATE
, MERGE INTO
, DELETE
(внутри разделов) или OVERWRITE
. Неизменяемые строки часто создаются вместе с новыми строками, поэтому нижестоящие потребители должны иметь возможность обрабатывать дубликаты. Удаления не распространяются по нисходящей.
ignoreChanges
включает ignoreDeletes
.
skipChangeCommits
полностью игнорирует операции изменения файлов. Файлы данных, которые переписываются в исходном table из-за операции изменения данных, например UPDATE
, MERGE INTO
, DELETE
и OVERWRITE
, полностью игнорируются. Чтобы отразить изменения в вышестоящем исходном tables, необходимо реализовать отдельную логику для распространения этих изменений.
Рабочие нагрузки, настроенные с продолжением работы с ignoreChanges
известной семантикой, но Databricks рекомендует использовать skipChangeCommits
для всех новых рабочих нагрузок. Перенос рабочих нагрузок, использующихся ignoreChanges
для skipChangeCommits
выполнения рефакторинга, требует логики рефакторинга.
Пример
Например, предположим, что у вас есть tableuser_events
с date
, user_email
и action
columns, распределённых по date
. Вы осуществляете передачу данных из user_events
table и вам необходимо удалить данные оттуда в соответствии с ОРЗД.
При удалении по границам partition (т. е. WHERE
находится на partitioncolumn), файлы уже сегментированы по значению, поэтому удаление просто убирает эти файлы из метаданных. При удалении всех данных partition можно использовать следующее:
spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
При удалении данных в нескольких разделах (в этом примере фильтрация по user_email
) используйте следующий синтаксис:
spark.readStream
.option("skipChangeCommits", "true")
.table("user_events")
Если вы применяете update к user_email
с помощью оператора UPDATE
, файл, содержащий заданный user_email
, перезаписывается. Используйте skipChangeCommits
для пропуска измененных файлов данных.
Указать начальное расположение
Для указания начальной точки источника потоковой передачи Delta Lake можно использовать следующие параметры, не обрабатывая всю table.
startingVersion
: Версия Delta Lake, с которой начинается запуск. Databricks рекомендует не использовать этот параметр для большинства рабочих нагрузок. Если не set, поток начинается с последней доступной версии, включая полный моментальный снимок table в тот момент.Если задано, поток считывает все изменения в Delta table, начиная с указанной версии (включительно). Если указанная версия больше недоступна, поток не запускается. Вы можете получить версии коммитов из
version
column результатов выполнения команды DESCRIBE HISTORY.Чтобы вернуть только последние изменения, укажите
latest
.startingTimestamp
: Метка времени для запуска. Все изменения table, зафиксированные в момент времени или после него (включительно), считываются потоковым считывателем. Если указанная метка времени предшествует всем коммитам table, потоковая передача начинается с самой ранней доступной метки времени. Одно из двух значений:- Строка метки времени. Например,
"2019-01-01T00:00:00.000Z"
. - Строка даты. Например,
"2019-01-01"
.
- Строка метки времени. Например,
Нельзя одновременно использовать оба варианта set. Они вступают в силу только при запуске нового потокового запроса. Если потоковый запрос запущен и в его контрольной точке записан ход выполнения, эти параметры игнорируются.
Внимание
Хотя вы можете запустить потоковый источник из указанной версии или метки времени, schema этого источника всегда является самым последним schema Delta table. Необходимо убедиться, что несовместимые schema не изменяются на delta table после указанной версии или метки времени. В противном случае источник потоковой передачи может возвращать неверные результаты при чтении данных с неправильным schema.
Пример
Например, предположим, что у вас есть tableuser_events
. Если вы хотите считать изменения начиная с версии 5, используйте:
spark.readStream
.option("startingVersion", "5")
.table("user_events")
Если вы хотите считать изменения начиная с 18 октября 2018 г., используйте:
spark.readStream
.option("startingTimestamp", "2018-10-18")
.table("user_events")
Обработка исходного моментального снимка без удаления данных
Примечание.
Эта функция доступна в Databricks Runtime 11.3 LTS и выше. Эта функция предоставляется в режиме общедоступной предварительной версии.
При использовании delta table в качестве источника потока запрос сначала обрабатывает все данные, присутствующих в table. Delta table в этой версии называется начальным снимком. По умолчанию файлы данных Delta tableобрабатываются на основе последнего изменения файла. Однако время последнего изменения необязательно соответствует временной последовательности событий.
В запросе потоковой передачи с отслеживанием состояния с определенным watermarkобработка файлов по времени изменения может привести к обработке записей в неправильном порядке. Это может привести к тому, что записи будут отнесены к поздним событиям watermark.
Чтобы избежать проблемы с удалением данных, включите следующий параметр:
- withEventTimeOrder: следует ли обрабатывать исходный моментальный снимок в порядке времени события.
Если включен временной порядок события, диапазон времени исходного моментального снимка делится на периоды времени. Каждый микропакет обрабатывает период путем фильтрации данных в пределах диапазона времени. Параметры конфигурации maxFilesPerTrigger и maxBytesPerTrigger по-прежнему можно использовать для управления размером микропакета, но только приблизительно, что связано с принципом обработки.
На рисунке ниже показан этот процесс:
Важная информация об этой функции:
- Проблема с удалением данных возникает только в том случае, если исходный разностный моментальный снимок запроса потоковой передачи с отслеживанием состояния обрабатывается в порядке по умолчанию.
- После запуска запроса потока во время обработки исходного моментального снимка изменить
withEventTimeOrder
невозможно. Чтобы запустить повторно с измененнымwithEventTimeOrder
, необходимо удалить контрольную точку. - Если выполняется потоковый запрос с включенным параметром EventTimeOrder, вы не сможете перейти на более раннюю версию DBR, которая не поддерживает эту функцию до завершения обработки исходного моментального снимка. Если необходимо перейти на более раннюю версию, можно дождаться завершения обработки исходного моментального снимка или удалить контрольную точку и перезапустить запрос.
- Эта функция не поддерживается в следующих редких сценариях:
- Время события column — это созданный column, и между источником Delta и watermarkимеют место непроецируемые преобразования.
- Существует watermark с несколькими источниками Delta в потоковом запросе.
- Если включен временной порядок событий, производительность обработки исходных разностных моментальных снимков может быть ниже.
- Каждый микропакет сканирует исходный моментальный снимок, чтобы отфильтровать данные в пределах соответствующего диапазона времени события. Чтобы ускорить фильтрацию, рекомендуется использовать источник Delta column в качестве времени события, чтобы можно было применить пропуск данных (проверьте на пропуск данных для Delta Lake, когда это применимо). Кроме того, разделение table по времени события column может дополнительно ускорить обработку. С помощью пользовательского интерфейса Spark можно узнать, сколько сканируется разностных файлов для определенного микропакета.
Пример
Предположим, у вас есть tableuser_events
с event_time
column. Запрос потоковой передачи — это агрегатный запрос. Если вы хотите убедиться, что данные не будут удаляться во время обработки исходных моментальных снимков, можно использовать следующее:
spark.readStream
.option("withEventTimeOrder", "true")
.table("user_events")
.withWatermark("event_time", "10 seconds")
Примечание.
Вы также можете включить эту конфигурацию Spark в кластере, которая будет применяться ко всем потоковым запросам: spark.databricks.delta.withEventTimeOrder.enabled true
Delta table в качестве приемника
Вы также можете записывать данные в Delta table с помощью потоковой передачи данных. Журнал транзакций позволяет Delta Lake гарантировать точно однократную обработку, даже если существуют другие потоки или пакетные запросы, выполняемые одновременно с table.
Примечание.
Функция Delta Lake VACUUM
удаляет все файлы, не управляемые Delta Lake, но пропускает все каталоги, имена которых начинаются с _
. Вы можете безопасно хранить контрольные точки вместе с другими данными и метаданными для delta table с помощью структуры каталогов, например <table-name>/_checkpoints
.
Метрики
Количество байтов и файлов, которым еще предстоит пройти обработку, можно узнать в процессе потоковой обработки, с помощью метрик numBytesOutstanding
и numFilesOutstanding
. К дополнительным метрикам относятся:
-
numNewListedFiles
: количество файлов Delta Lake, перечисленных для вычисления невыполненной работы для этого пакета.-
backlogEndOffset
: версия table, используемая для вычисления невыполненной работы.
-
Если вы выполняете поток в записной книжке, эти метрики можно просмотреть на вкладке "Необработанные данные " на панели мониторинга хода выполнения потокового запроса:
{
"sources" : [
{
"description" : "DeltaSource[file:/path/to/source]",
"metrics" : {
"numBytesOutstanding" : "3456",
"numFilesOutstanding" : "8"
},
}
]
}
Режим добавления
По умолчанию потоки выполняются в режиме добавления, который добавляет новые записи в table.
Используйте метод toTable
для потоковой передачи в tables, как показано в следующем примере:
Python
(events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
)
Scala
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
Полный режим
Можно также использовать структурированную потоковую передачу, чтобы заменять весь table в каждом пакете. Один из примеров использования — вычисление сводки с помощью статистической обработки:
Python
(spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
)
Scala
spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
В предыдущем примере непрерывно обновляется table, которая содержит совокупное количество событий от клиента.
Для приложений с менее строгими дополнительными требованиями к задержке можно сэкономить вычислительные ресурсы с одноразовыми триггерами. Используйте их для update сводного агрегирования tables по заданному расписанию, обрабатывая только новые данные, поступившие после последнего update.
Upsert из потоковых запросов с помощью foreachBatch
Можно использовать сочетание merge
и foreachBatch
для записи сложных апсертов из потокового запроса в Delta table. См. раздел Использование foreachBatch для записи в произвольные приемники данных.
Этот шаблон содержит множество приложений, включая следующее:
- Запись агрегатов потоковой обработки в режиме Update: это гораздо эффективнее, чем полный режим.
-
запись потока изменений в базе данных в Delta table: запрос слияния для записи данных об изменениях можно использовать в
foreachBatch
для непрерывного применения потока изменений к Delta table. -
запись потока данных в Delta table с дедупликацией: запрос на объединение только insertдля дедупликации можно использовать в
foreachBatch
для непрерывной записи данных (с дубликатами) в разностную table с автоматической дедупликацией.
Примечание.
- Убедитесь, что инструкция
merge
вforeachBatch
идемпотентная, так как перезагрузка потокового запроса может применить операцию к тому же пакету данных несколько раз. - Если
merge
используется вforeachBatch
, скорость входных данных для запроса потоковой передачи (выдаваемых черезStreamingQueryProgress
и видимых в диаграмме скорости записной книжки) может быть кратна фактической скорости, с которой данные создаются в источнике. Это связано с тем, чтоmerge
считывает входные данные несколько раз, что приводит к умножению метрик входных данных. Если это узкое место, можно кэшировать пакетный DataFrame доmerge
и кэшировать его послеmerge
.
В следующем примере показано, как можно использовать SQL для foreachBatch
выполнения этой задачи:
Scala
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
# Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
# Use the view name to apply MERGE
# NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
# In Databricks Runtime 10.5 and below, you must use the following:
# microBatchOutputDF._jdf.sparkSession().sql("""
microBatchOutputDF.sparkSession.sql("""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Вы также можете использовать API Delta Lake для выполнения потоковых upserts, как показано в следующем примере:
Scala
import io.delta.tables.*
val deltaTable = DeltaTable.forName(spark, "table_name")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "table_name")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
(deltaTable.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.key = t.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Идемпотентная table записывается в foreachBatch
Примечание.
Databricks рекомендует настроить отдельную потоковую запись для каждого приемника, который вы хотите update. Использование foreachBatch
для записи в несколько tables упорядочивает записи, что снижает параллелизацию и увеличивает общую задержку.
Delta tables поддерживает следующие параметры DataFrameWriter
, чтобы делать записи в несколько tables в течение foreachBatch
идемпотентными.
-
txnAppId
: уникальная строка, которую можно передать для каждой записи dataFrame. Например, идентификатор StreamingQuery можно использовать какtxnAppId
. -
txnVersion
: Монотонно возрастающее количество, действующее как версия транзакции.
В Delta Lake используется сочетание txnAppId
и txnVersion
для обнаружения повторяющихся записей и их пропуска.
Если пакетная запись прерывается сбоем, повторное выполнение пакета использует одно и то же приложение и идентификатор пакетной службы, чтобы помочь среде выполнения правильно определить повторяющиеся записи и игнорировать их. Идентификатор приложения (txnAppId
) может быть любой уникальной строкой, созданной пользователем, и не обязательно должен быть связан с идентификатором потока. См. раздел Использование foreachBatch для записи в произвольные приемники данных.
Предупреждение
Если удалить контрольную точку потоковой передачи и перезапустить запрос с помощью новой контрольной точки, необходимо указать другой txnAppId
. Новые контрольные точки начинаются с пакетного идентификатора 0
. Delta Lake использует идентификатор пакета и txnAppId
в качестве уникального ключа, и пропускает пакеты с уже видимыми values.
В следующем примере кода показан этот шаблон:
Python
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
Scala
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2
}