Partilhar via


Inserção ou atualização em um Delta Lake table usando fusione

Você pode inserir ou atualizar dados de uma origem table, exibição ou DataFrame para um destino Delta table usando a operação SQL MERGE. O Delta Lake oferece suporte a inserções, atualizações e exclusões no MERGE, e oferece suporte à sintaxe estendida além dos padrões SQL para facilitar casos de uso avançados.

Suponha que você tenha um table de origem chamado people10mupdates ou um caminho de origem em /tmp/delta/people-10m-updates que contenha novos dados para um table de destino chamado people10m ou um caminho de destino em /tmp/delta/people-10m. Alguns destes novos registos podem já estar presentes nos dados de destino. Para mesclar os novos dados, você deseja update linhas where o id da pessoa já está presente e insert as novas linhas where nenhuma id correspondente esteja presente. Você pode executar a seguinte consulta:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Importante

Apenas uma única linha do table de origem pode corresponder a uma determinada linha no tablede destino. No Databricks Runtime 16.0 e superior, MERGE avalia as WHEN MATCHED condições especificadas nas cláusulas e ON para determinar correspondências duplicadas. No Databricks Runtime 15.4 LTS e inferior, MERGE as operações consideram apenas as condições especificadas na ON cláusula.

Consulte a documentação da API Delta Lake para obter detalhes de sintaxe Scala e Python. Para obter detalhes da sintaxe SQL, consulte MERGE INTO

Modificar todas as linhas incomparáveis usando mesclagem

Nas Databricks SQL e Databricks Runtime 12.2 LTS e posteriores, é possível utilizar a cláusula WHEN NOT MATCHED BY SOURCE para UPDATE ou DELETE registros no alvo table que não tenham registros correspondentes na origem table. Databricks recomenda a adição de uma cláusula condicional opcional para evitar a reescrita completa do elemento tablede destino.

O exemplo de código a seguir mostra a sintaxe básica de usar isso para exclusões, substituindo o table de destino pelo conteúdo do table de origem e excluindo registros incompatíveis no tablede destino. Para obter um padrão mais escalável para tableswhere, as atualizações e exclusões de origem têm limite de tempo, consulte Incrementalmente sync Delta table com a fonte.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

O exemplo a seguir adiciona condições à cláusula WHEN NOT MATCHED BY SOURCE e especifica de values a update em linhas de destino que não correspondem.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Semântica da operação de mesclagem

Segue-se uma descrição detalhada da semântica da merge operação programática.

  • Pode haver qualquer número de whenMatched e whenNotMatched cláusulas.

  • whenMatched cláusulas são executadas quando uma linha de origem corresponde a uma linha de destino table com base na condição de correspondência. Estas cláusulas têm a seguinte semântica.

    • whenMatched as cláusulas podem ter, no máximo, uma update única delete ação. A ação update no merge apenas atualiza a columns especificada (semelhante à operação update) da linha de destino correspondente. A delete ação exclui a linha correspondente.

    • Cada whenMatched cláusula pode ter uma condição facultativa. Se essa condição de cláusula existir, a ação ou update será executada delete para qualquer par de linha origem-destino correspondente somente quando a condição da cláusula for verdadeira.

    • Se houver várias whenMatched cláusulas, elas são avaliadas na ordem em que são especificadas. Todas as whenMatched cláusulas, exceto a última, devem ter condições.

    • Se nenhuma das whenMatched condições for avaliada como verdadeira para um par de linhas de origem e destino que corresponda à condição de mesclagem, a linha de destino será mantida inalterada.

    • Para update todos os columns do Delta alvo table com os columns correspondentes do conjunto de dados de origem, use whenMatched(...).updateAll(). Isto é equivalente a:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      para todos os columns do alvo Delta table. Portanto, essa ação pressupõe que o table de origem tenha os mesmos columns que os do tablede destino, caso contrário, a consulta lança um erro de análise.

      Nota

      Esse comportamento muda quando a evolução automática do schema está habilitada. Consulte de evolução automática do schema para obter detalhes.

  • whenNotMatched As cláusulas são executadas quando uma linha de origem não corresponde a nenhuma linha de destino com base na condição de correspondência. Estas cláusulas têm a seguinte semântica.

    • whenNotMatched cláusulas só podem ter a insert ação. A nova linha é gerada com base no column especificado e expressões correspondentes. Não é necessário especificar todos os columns no alvo table. Para destino columnsnão especificado, NULL é inserido.

    • Cada whenNotMatched cláusula pode ter uma condição facultativa. Se a condição da cláusula estiver presente, uma linha de origem será inserida somente se essa condição for verdadeira para essa linha. Caso contrário, o column de origem será ignorado.

    • Se houver várias whenNotMatched cláusulas, elas são avaliadas na ordem em que são especificadas. Todas as whenNotMatched cláusulas, exceto a última, devem ter condições.

    • Para insert todos os columns do Delta table de destino com o columns correspondente do conjunto de dados de origem, utilize whenNotMatched(...).insertAll(). Isto é equivalente a:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      para todos os columns do Delta alvo table. Portanto, essa ação pressupõe que o table de origem tenha os mesmos columns que os do tablede destino, caso contrário, a consulta lança um erro de análise.

      Nota

      Esse comportamento muda quando a evolução automática do schema está habilitada. Consulte de evolução automática do schema para obter detalhes.

  • whenNotMatchedBySource As cláusulas são executadas quando uma linha de destino não corresponde a nenhuma linha de origem com base na condição de mesclagem. Estas cláusulas têm a seguinte semântica.

    • whenNotMatchedBySource cláusulas podem especificar delete e update ações.
    • Cada whenNotMatchedBySource cláusula pode ter uma condição facultativa. Se a condição da cláusula estiver presente, uma linha de destino será modificada somente se essa condição for verdadeira para essa linha. Caso contrário, a linha de destino será mantida inalterada.
    • Se houver várias whenNotMatchedBySource cláusulas, elas são avaliadas na ordem em que são especificadas. Todas as whenNotMatchedBySource cláusulas, exceto a última, devem ter condições.
    • Por definição, as cláusulas whenNotMatchedBySource não têm uma linha de origem de onde extrair columnvalues e, portanto, não se pode referenciar columns de origem. Para cada column a ser modificado, pode-se especificar um literal ou realizar uma ação no columnde destino, como SET target.deleted_count = target.deleted_count + 1.

Importante

  • Uma operação de merge pode falhar se várias linhas do conjunto de dados de origem corresponderem e se a mesclagem tentar update as mesmas linhas no Delta de destino table. De acordo com a semântica SQL da mesclagem, tal operação de update é ambígua, pois não está claro qual linha de origem deve ser usada para update a linha de destino correspondente. Você pode pré-processar a fonte table para eliminar a possibilidade de várias correspondências.
  • Você pode aplicar uma operação SQL MERGE em um SQL VIEW somente se o modo de exibição tiver sido definido como CREATE VIEW viewName AS SELECT * FROM deltaTable.

Desduplicação de dados ao gravar no Delta tables

Um caso de uso comum de ETL é coletar logs no Delta table anexando-os a um table. No entanto, muitas vezes as fontes podem generate registros de log duplicados e etapas de desduplicação a jusante são necessárias para cuidar deles. Com mergeo , você pode evitar inserir os registros duplicados.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Nota

O conjunto de dados que contém os novos logs precisa ser desduplicado dentro de si mesmo. De acordo com a semântica SQL de junção, ele compara e remove duplicados dos novos dados com os dados existentes no table, mas se houver dados duplicados dentro do novo conjunto de dados, estes são inseridos. Portanto, desduplique os novos dados antes de mesclar no table.

Se você sabe que pode get registros duplicados apenas por alguns dias, você pode optimize sua consulta ainda mais particionando o table por data e, em seguida, especificando o intervalo de datas do table de destino para corresponder.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Isso é mais eficiente do que o comando anterior, pois procura duplicatas apenas nos últimos 7 dias de logs, não em todo o table. Além disso, é possível usar a mesclagem exclusiva insertcom o Structured Streaming para executar a desduplicação contínua dos logs.

  • Em uma consulta de streaming, pode-se usar a operação de fusão em foreachBatch para gravar continuamente dados de streaming em um Delta table com desduplicação. Consulte o exemplo de streaming a seguir para obter mais informações sobre foreachBatch.
  • Em outra consulta de streaming, você pode ler continuamente dados desduplicados deste Delta table. Isso é possível porque uma mesclagem somente insertapenas acrescenta novos dados ao tableDelta.

Alteração lenta de dados (SCD) e captura de dados de alteração (CDC) com Delta Lake

O Delta Live Tables tem suporte nativo para rastreamento e aplicação de SCD Tipo 1 e Tipo 2. Use APPLY CHANGES INTO com o Delta Live Tables para garantir que os registros fora de ordem sejam tratados corretamente ao processar feeds CDC. Consulte As APIs APPLY CHANGES: Simplifique a captura de dados de alteração com o Delta Live Tables.

Incrementalmente sync Delta table com a origem

No Databricks SQL e no Databricks Runtime 12.2 LTS e superior, pode usar WHEN NOT MATCHED BY SOURCE para criar condições arbitrárias para excluir e substituir atomicamente uma parte de um table. Isso pode ser especialmente útil quando você tem uma fonte tablewhere os registros podem ser alterados ou excluídos por vários dias após a entrada inicial de dados, mas eventualmente se estabelecem em um estado final.

A consulta a seguir demonstra a utilização deste padrão para select registros dos últimos 5 dias da origem, update registros correspondentes no destino, insert novos registros da origem para o destino, e elimina todos os registros não correspondentes dos últimos 5 dias no destino.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Ao fornecer o mesmo filtro booleano na tablesde origem e na tablesde destino, consegue propagar dinamicamente as alterações da sua origem para o destino, incluindo eliminações.

Nota

Embora esse padrão possa ser usado sem cláusulas condicionais, isso levaria a uma reformulação completa do table alvo, o que pode ser caro.