Поделиться через


Вставка или обновление в Delta Lake table с помощью слияния

Данные можно обновлять или вставлять из источника table, представления или кадра данных в целевую Delta table, используя SQL-операцию MERGE. Delta Lake поддерживает вставки, обновления и удаления, MERGEа также поддерживает расширенный синтаксис за пределами стандартов SQL для упрощения расширенных вариантов использования.

Предположим, у вас есть источник table с именем people10mupdates или путь к источнику /tmp/delta/people-10m-updates, содержащему новые данные для целевого table с именем people10m или целевой путь по адресу /tmp/delta/people-10m. Некоторые из этих новых записей уже могут присутствовать в целевых данных. Чтобы объединить новые данные, необходимо update строки, whereid пользователя уже присутствует, и insert новые строки where отсутствуют соответствующие id. Вы можете выполнить следующий запрос:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Внимание

Только одна строка из исходного table может соответствовать определённой строке в целевой table. В Databricks Runtime 16.0 и более поздних версий вычисляет условия, MERGE указанные в WHEN MATCHED предложениях и ON определяющих повторяющиеся совпадения. В Databricks Runtime 15.4 LTS и ниже MERGE операции рассматривают только условия, указанные в предложении ON .

Дополнительные сведения о синтаксисе Scala и Python см. в документации по API Delta Lake. Сведения о синтаксисе SQL см. в MERGE INTO

Изменение всех несовпаденных строк с помощью слияния

В Databricks SQL и Databricks Runtime 12.2 LTS и более поздних версиях вы можете использовать клаузу WHEN NOT MATCHED BY SOURCE для UPDATE или DELETE тех записей в целевом table, которые не имеют соответствующих записей в исходном table. Databricks рекомендует добавить необязательную условную конструкцию, чтобы избежать полной перезаписи целевой table.

В следующем примере кода показан базовый синтаксис использования этого для удаления, перезапись содержимого из исходного table в целевой table и удаление несовпадающих записей в целевом table. Более масштабируемый шаблон для tableswhere, в котором исходные обновления и удаления привязаны к времени, см. инкрементально sync Delta table с исходным.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

В следующем примере добавляются условия в предложение WHEN NOT MATCHED BY SOURCE и указываются values для update в несовпаденных целевых строках.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Семантика операции слияния

Ниже приведено подробное описание семантики программной merge операции.

  • Может существовать любое количество предложений whenMatched и whenNotMatched.

  • whenMatched предложения выполняются, когда исходная строка совпадает с целевой table строкой согласно условию соответствия. Эти предложения имеют следующую семантику.

    • Предложения whenMatched могут иметь не более одного действия update и одного действия delete. Действие update в merge обновляет только указанные columns (как и операция update) в соответствующей найденной целевой строке. Действие delete удаляет сопоставленную строку.

    • Каждое предложение whenMatched может иметь необязательное условие. Если это условие предложения существует, действие update или delete выполняется для любой соответствующей пары исходной и целевой строк, только если условие предложения истинно.

    • Если существует несколько предложений whenMatched, они вычисляются в том порядке, в котором указаны. Все предложения whenMatched, за исключением последнего, должны иметь условия.

    • Если ни одно из условий whenMatched не вычисляется как истинное для исходной и целевой пары строк, которая соответствует условию слияния, целевая строка остается неизменной.

    • Чтобы update все columns целевого разностного table с соответствующим columns исходного набора данных, используйте whenMatched(...).updateAll(). Это соответствует следующей записи:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      для всех columns целевой Delta table. Поэтому это действие предполагает, что исходный table имеет те же columns, что и в целевом table, в противном случае запрос выдает ошибку анализа.

      Примечание.

      Это поведение изменяется при включенной автоматической schema эволюции. Для подробностей см. автоматическую schema эволюцию.

  • Предложения whenNotMatched выполняются, когда исходная строка не соответствует какой-либо целевой строке на основе условия соответствия. Эти предложения имеют следующую семантику.

    • Предложения whenNotMatched могут иметь только действие insert. Новая строка создается на основе указанных column и соответствующих выражений. Не нужно указывать все columns в целевом table. Для неопределенного целевого columnsвставляется NULL.

    • Каждое предложение whenNotMatched может иметь необязательное условие. Если условие предложения присутствует, исходная строка вставляется, только если это условие истинно для этой строки. В противном случае исходный column игнорируется.

    • Если существует несколько предложений whenNotMatched, они вычисляются в том порядке, в котором указаны. Все предложения whenNotMatched, за исключением последнего, должны иметь условия.

    • Чтобы insert все columns целевой Delta table с соответствующим columns исходного набора данных, используйте whenNotMatched(...).insertAll(). Это соответствует следующей записи:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      для всех columns целевой разностной table. Поэтому это действие предполагает, что исходный table имеет те же columns, что и в целевом table, в противном случае запрос выдает ошибку анализа.

      Примечание.

      Это поведение изменяется при включенной автоматической schema эволюции. Дополнительные сведения см. в автоматической schema эволюции.

  • whenNotMatchedBySource предложения выполняются, если целевая строка не соответствует исходной строке в зависимости от условия слияния. Эти предложения имеют следующую семантику.

    • whenNotMatchedBySource предложения могут указывать delete и update выполнять действия.
    • Каждое предложение whenNotMatchedBySource может иметь необязательное условие. Если условие предложения присутствует, целевая строка изменяется только в том случае, если это условие имеет значение true для этой строки. В противном случае целевая строка остается без изменений.
    • Если существует несколько предложений whenNotMatchedBySource, они вычисляются в том порядке, в котором указаны. Все предложения whenNotMatchedBySource, за исключением последнего, должны иметь условия.
    • По определению, whenNotMatchedBySource предложения не имеют исходной строки для извлечения columnvalues, и поэтому на источник columns не удается ссылаться. Для каждого column, который необходимо изменить, можно указать литерал или выполнить действие над целевой column, например SET target.deleted_count = target.deleted_count + 1.

Внимание

  • Операция merge может завершиться ошибкой, если совпадают несколько строк исходного набора данных, а слияние пытается update те же самые строки целевого Delta table. Согласно семантике слияния SQL, такая операция update неоднозначна, так как неясно, какую исходную строку следует использовать для update соответствующей целевой строки. Вы можете предварительно обработать исходный table, чтобы исключить множественные совпадения.
  • Вы можете применить операцию SQL MERGE к представлению SQL, только если представление определено как CREATE VIEW viewName AS SELECT * FROM deltaTable.

дедупликация данных при записи в Delta tables

Один из распространенных вариантов использования ETL — это собирать журналы в Delta table, добавляя их в table. Однако часто источники могут generate повторяющиеся записи журналов и дальнейшие действия дедупликации необходимы для их ухода. С помощью merge можно избежать вставки дублирующихся записей.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Примечание.

Набор данных, содержащий новые журналы, нужно дедуплицировать внутри самого себя. В соответствии с семантикой слияния SQL, новые данные сопоставляются и дедуплицируются с существующими данными в table, однако если в новом наборе данных есть совпадающие данные, они всё равно вставляются. Удалите дубликаты новых данных перед их слиянием в table.

Если вы знаете, что вы можете выполнять get для повторяющихся записей только в течение нескольких дней, вы можете улучшить optimize запрос, разделив table по дате, а затем указав диапазон дат для целевого table для сопоставления.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Это более эффективно, чем предыдущая команда, так как она ищет дубликаты только за последние 7 дней журналов, а не весь table. Кроме того, этот insertможно использовать только в сочетании со структурированной потоковой передачей для выполнения постоянной дедупликации журналов.

  • В потоковом запросе можно использовать операцию слияния в foreachBatch для непрерывной записи данных потоковой передачи в delta table с дедупликацией. Дополнительные сведения о см. в следующем foreachBatch.
  • В другом потоковом запросе вы можете непрерывно считывать дедупликатированные данные из этого Delta table. Это возможно, так как insert-only merge добавляет новые данные только в Delta table.

Медленно меняющиеся данные (SCD) и запись измененных данных (CDC) с Delta Lake

Delta Live Tables имеет встроенную поддержку отслеживания и применения SCD Type 1 и Type 2. Используйте APPLY CHANGES INTO с Delta Live Tables, чтобы обеспечить правильную обработку записей, не по порядку, при обработке потоков данных CDC. См. Интерфейсы API APPLY CHANGES: упрощение фиксации изменений с помощью функции Delta Live Tables.

добавочно sync Delta table с источником

В Databricks SQL и Databricks Runtime 12.2 LTS и более поздних версиях можно использовать WHEN NOT MATCHED BY SOURCE для создания произвольных условий для атомарного удаления и замены части table. Это может быть особенно полезно, если у вас есть исходные записи tableиwhere, которые могут изменяться или удаляться в течение нескольких дней после начального ввода данных, но в конечном итоге приходят в окончательное состояние.

Следующий запрос показывает использование этого шаблона для select записей за последние 5 дней из источника, update совпадающих записей в целевом объекте, insert новых записей, перенесенных из источника в целевой объект, и удаляет все несовпавшие записи в целевом объекте за последние 5 дней.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

При предоставлении того же логического фильтра для исходного и целевого tables, вы сможете динамически распространять изменения из источника в целевой tables, включая удаления.

Примечание.

Хотя этот шаблон можно использовать без каких-либо условных предложений, это приведет к полной перезаписи целевого table, что может оказаться дорогостоящим.