Вставка или обновление в Delta Lake table с помощью слияния
Данные можно обновлять или вставлять из источника table, представления или кадра данных в целевую Delta table, используя SQL-операцию MERGE
. Delta Lake поддерживает вставки, обновления и удаления, MERGE
а также поддерживает расширенный синтаксис за пределами стандартов SQL для упрощения расширенных вариантов использования.
Предположим, у вас есть источник table с именем people10mupdates
или путь к источнику /tmp/delta/people-10m-updates
, содержащему новые данные для целевого table с именем people10m
или целевой путь по адресу /tmp/delta/people-10m
. Некоторые из этих новых записей уже могут присутствовать в целевых данных. Чтобы объединить новые данные, необходимо update строки, whereid
пользователя уже присутствует, и insert новые строки where отсутствуют соответствующие id
. Вы можете выполнить следующий запрос:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Внимание
Только одна строка из исходного table может соответствовать определённой строке в целевой table. В Databricks Runtime 16.0 и более поздних версий вычисляет условия, MERGE
указанные в WHEN MATCHED
предложениях и ON
определяющих повторяющиеся совпадения. В Databricks Runtime 15.4 LTS и ниже MERGE
операции рассматривают только условия, указанные в предложении ON
.
Дополнительные сведения о синтаксисе Scala и Python см. в документации по API Delta Lake. Сведения о синтаксисе SQL см. в MERGE INTO
Изменение всех несовпаденных строк с помощью слияния
В Databricks SQL и Databricks Runtime 12.2 LTS и более поздних версиях вы можете использовать клаузу WHEN NOT MATCHED BY SOURCE
для UPDATE
или DELETE
тех записей в целевом table, которые не имеют соответствующих записей в исходном table. Databricks рекомендует добавить необязательную условную конструкцию, чтобы избежать полной перезаписи целевой table.
В следующем примере кода показан базовый синтаксис использования этого для удаления, перезапись содержимого из исходного table в целевой table и удаление несовпадающих записей в целевом table. Более масштабируемый шаблон для tableswhere, в котором исходные обновления и удаления привязаны к времени, см. инкрементально sync Delta table с исходным.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
В следующем примере добавляются условия в предложение WHEN NOT MATCHED BY SOURCE
и указываются values для update в несовпаденных целевых строках.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Семантика операции слияния
Ниже приведено подробное описание семантики программной merge
операции.
Может существовать любое количество предложений
whenMatched
иwhenNotMatched
.whenMatched
предложения выполняются, когда исходная строка совпадает с целевой table строкой согласно условию соответствия. Эти предложения имеют следующую семантику.Предложения
whenMatched
могут иметь не более одного действияupdate
и одного действияdelete
. Действиеupdate
вmerge
обновляет только указанные columns (как и операцияupdate
) в соответствующей найденной целевой строке. Действиеdelete
удаляет сопоставленную строку.Каждое предложение
whenMatched
может иметь необязательное условие. Если это условие предложения существует, действиеupdate
илиdelete
выполняется для любой соответствующей пары исходной и целевой строк, только если условие предложения истинно.Если существует несколько предложений
whenMatched
, они вычисляются в том порядке, в котором указаны. Все предложенияwhenMatched
, за исключением последнего, должны иметь условия.Если ни одно из условий
whenMatched
не вычисляется как истинное для исходной и целевой пары строк, которая соответствует условию слияния, целевая строка остается неизменной.Чтобы update все columns целевого разностного table с соответствующим columns исходного набора данных, используйте
whenMatched(...).updateAll()
. Это соответствует следующей записи:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
для всех columns целевой Delta table. Поэтому это действие предполагает, что исходный table имеет те же columns, что и в целевом table, в противном случае запрос выдает ошибку анализа.
Примечание.
Это поведение изменяется при включенной автоматической schema эволюции. Для подробностей см. автоматическую schema эволюцию.
Предложения
whenNotMatched
выполняются, когда исходная строка не соответствует какой-либо целевой строке на основе условия соответствия. Эти предложения имеют следующую семантику.Предложения
whenNotMatched
могут иметь только действиеinsert
. Новая строка создается на основе указанных column и соответствующих выражений. Не нужно указывать все columns в целевом table. Для неопределенного целевого columnsвставляетсяNULL
.Каждое предложение
whenNotMatched
может иметь необязательное условие. Если условие предложения присутствует, исходная строка вставляется, только если это условие истинно для этой строки. В противном случае исходный column игнорируется.Если существует несколько предложений
whenNotMatched
, они вычисляются в том порядке, в котором указаны. Все предложенияwhenNotMatched
, за исключением последнего, должны иметь условия.Чтобы insert все columns целевой Delta table с соответствующим columns исходного набора данных, используйте
whenNotMatched(...).insertAll()
. Это соответствует следующей записи:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
для всех columns целевой разностной table. Поэтому это действие предполагает, что исходный table имеет те же columns, что и в целевом table, в противном случае запрос выдает ошибку анализа.
Примечание.
Это поведение изменяется при включенной автоматической schema эволюции. Дополнительные сведения см. в автоматической schema эволюции.
whenNotMatchedBySource
предложения выполняются, если целевая строка не соответствует исходной строке в зависимости от условия слияния. Эти предложения имеют следующую семантику.-
whenNotMatchedBySource
предложения могут указыватьdelete
иupdate
выполнять действия. - Каждое предложение
whenNotMatchedBySource
может иметь необязательное условие. Если условие предложения присутствует, целевая строка изменяется только в том случае, если это условие имеет значение true для этой строки. В противном случае целевая строка остается без изменений. - Если существует несколько предложений
whenNotMatchedBySource
, они вычисляются в том порядке, в котором указаны. Все предложенияwhenNotMatchedBySource
, за исключением последнего, должны иметь условия. - По определению,
whenNotMatchedBySource
предложения не имеют исходной строки для извлечения columnvalues, и поэтому на источник columns не удается ссылаться. Для каждого column, который необходимо изменить, можно указать литерал или выполнить действие над целевой column, напримерSET target.deleted_count = target.deleted_count + 1
.
-
Внимание
- Операция
merge
может завершиться ошибкой, если совпадают несколько строк исходного набора данных, а слияние пытается update те же самые строки целевого Delta table. Согласно семантике слияния SQL, такая операция update неоднозначна, так как неясно, какую исходную строку следует использовать для update соответствующей целевой строки. Вы можете предварительно обработать исходный table, чтобы исключить множественные совпадения. - Вы можете применить операцию SQL
MERGE
к представлению SQL, только если представление определено какCREATE VIEW viewName AS SELECT * FROM deltaTable
.
дедупликация данных при записи в Delta tables
Один из распространенных вариантов использования ETL — это собирать журналы в Delta table, добавляя их в table. Однако часто источники могут generate повторяющиеся записи журналов и дальнейшие действия дедупликации необходимы для их ухода. С помощью merge
можно избежать вставки дублирующихся записей.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Примечание.
Набор данных, содержащий новые журналы, нужно дедуплицировать внутри самого себя. В соответствии с семантикой слияния SQL, новые данные сопоставляются и дедуплицируются с существующими данными в table, однако если в новом наборе данных есть совпадающие данные, они всё равно вставляются. Удалите дубликаты новых данных перед их слиянием в table.
Если вы знаете, что вы можете выполнять get для повторяющихся записей только в течение нескольких дней, вы можете улучшить optimize запрос, разделив table по дате, а затем указав диапазон дат для целевого table для сопоставления.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Это более эффективно, чем предыдущая команда, так как она ищет дубликаты только за последние 7 дней журналов, а не весь table. Кроме того, этот insertможно использовать только в сочетании со структурированной потоковой передачей для выполнения постоянной дедупликации журналов.
- В потоковом запросе можно использовать операцию слияния в
foreachBatch
для непрерывной записи данных потоковой передачи в delta table с дедупликацией. Дополнительные сведения о см. в следующемforeachBatch
. - В другом потоковом запросе вы можете непрерывно считывать дедупликатированные данные из этого Delta table. Это возможно, так как insert-only merge добавляет новые данные только в Delta table.
Медленно меняющиеся данные (SCD) и запись измененных данных (CDC) с Delta Lake
Delta Live Tables имеет встроенную поддержку отслеживания и применения SCD Type 1 и Type 2. Используйте APPLY CHANGES INTO
с Delta Live Tables, чтобы обеспечить правильную обработку записей, не по порядку, при обработке потоков данных CDC. См. Интерфейсы API APPLY CHANGES: упрощение фиксации изменений с помощью функции Delta Live Tables.
добавочно sync Delta table с источником
В Databricks SQL и Databricks Runtime 12.2 LTS и более поздних версиях можно использовать WHEN NOT MATCHED BY SOURCE
для создания произвольных условий для атомарного удаления и замены части table. Это может быть особенно полезно, если у вас есть исходные записи tableиwhere, которые могут изменяться или удаляться в течение нескольких дней после начального ввода данных, но в конечном итоге приходят в окончательное состояние.
Следующий запрос показывает использование этого шаблона для select записей за последние 5 дней из источника, update совпадающих записей в целевом объекте, insert новых записей, перенесенных из источника в целевой объект, и удаляет все несовпавшие записи в целевом объекте за последние 5 дней.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
При предоставлении того же логического фильтра для исходного и целевого tables, вы сможете динамически распространять изменения из источника в целевой tables, включая удаления.
Примечание.
Хотя этот шаблон можно использовать без каких-либо условных предложений, это приведет к полной перезаписи целевого table, что может оказаться дорогостоящим.