Upsert in een Delta Lake-tabel met behulp van samenvoegen
U kunt gegevens uit een brontabel, weergave of DataFrame upsert in een delta-doeltabel met behulp van de MERGE
SQL-bewerking. Delta Lake biedt ondersteuning voor invoegingen, updates en verwijderingen en biedt ondersteuning voor uitgebreide syntaxis buiten de SQL-standaarden om geavanceerde gebruiksvoorbeelden MERGE
mogelijk te maken.
Stel dat u een brontabel hebt met de naam people10mupdates
of een bronpad waarop nieuwe gegevens voor een doeltabel met de naam people10m
of het doelpad /tmp/delta/people-10m-updates
staan/tmp/delta/people-10m
. Sommige van deze nieuwe records zijn mogelijk al aanwezig in de doelgegevens. Als u de nieuwe gegevens wilt samenvoegen, wilt u rijen bijwerken waar de persoon id
al aanwezig is en de nieuwe rijen invoegen waar geen overeenkomende id
items aanwezig zijn. U kunt de volgende query uitvoeren:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Belangrijk
Slechts één rij uit de brontabel kan overeenkomen met een bepaalde rij in de doeltabel. In Databricks Runtime 16.0 en hoger MERGE
evalueert u voorwaarden die zijn opgegeven in de WHEN MATCHED
en ON
componenten om dubbele overeenkomsten te bepalen. In Databricks Runtime 15.4 LTS en hieronder MERGE
worden voor bewerkingen alleen voorwaarden in overweging gegeven die zijn opgegeven in de ON
component.
Zie de Documentatie voor de Delta Lake-API voor de syntaxis van Scala en Python. Zie MERGE INTO voor meer informatie over DE SQL-syntaxis
Alle niet-overeenkomende rijen wijzigen met behulp van samenvoegen
In Databricks SQL en Databricks Runtime 12.2 LTS en hoger kunt u de WHEN NOT MATCHED BY SOURCE
component gebruiken voor UPDATE
of DELETE
records in de doeltabel zonder bijbehorende records in de brontabel. Databricks raadt aan een optionele voorwaardelijke component toe te voegen om te voorkomen dat de doeltabel volledig wordt herschreven.
In het volgende codevoorbeeld ziet u de basissyntaxis van het gebruik hiervan voor verwijderingen, waarbij de doeltabel wordt overschreven met de inhoud van de brontabel en niet-overeenkomende records in de doeltabel worden verwijderd. Zie Incrementeel Delta-tabel synchroniseren met bron voor een meer schaalbaar patroon voor tabellen waarin bronupdates en verwijderingen tijdsgebonden zijn.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
In het volgende voorbeeld worden voorwaarden aan de WHEN NOT MATCHED BY SOURCE
component toegevoegd en worden waarden opgegeven die moeten worden bijgewerkt in niet-overeenkomende doelrijen.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Semantiek van bewerking samenvoegen
Hier volgt een gedetailleerde beschrijving van de semantiek van de merge
programmatische bewerking.
Er kan een willekeurig aantal
whenMatched
componenten zijnwhenNotMatched
.whenMatched
componenten worden uitgevoerd wanneer een bronrij overeenkomt met een doeltabelrij op basis van de voorwaarde van de overeenkomst. Deze componenten hebben de volgende semantiek.whenMatched
componenten kunnen maximaal éénupdate
en ééndelete
actie hebben. Metupdate
de actie wordenmerge
alleen de opgegeven kolommen (vergelijkbaar met deupdate
bewerking) van de overeenkomende doelrij bijgewerkt. Metdelete
de actie wordt de overeenkomende rij verwijderd.Elke
whenMatched
component kan een optionele voorwaarde hebben. Als deze componentvoorwaarde bestaat, wordt deupdate
ofdelete
actie alleen uitgevoerd voor een overeenkomend brondoelrijpaar wanneer aan de componentvoorwaarde wordt voldaan.Als er meerdere
whenMatched
componenten zijn, worden ze geëvalueerd in de volgorde waarin ze zijn opgegeven. AllewhenMatched
componenten, met uitzondering van de laatste, moeten voorwaarden hebben.Als geen van de
whenMatched
voorwaarden waar is voor een bron- en doelrijpaar dat overeenkomt met de samenvoegvoorwaarde, blijft de doelrij ongewijzigd.Als u alle kolommen van de doel-Delta-tabel wilt bijwerken met de bijbehorende kolommen van de brongegevensset, gebruikt u
whenMatched(...).updateAll()
. Dit komt overeen met:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
voor alle kolommen van de doel-Delta-tabel. Daarom gaat deze actie ervan uit dat de brontabel dezelfde kolommen heeft als die in de doeltabel, anders genereert de query een analysefout.
Notitie
Dit gedrag verandert wanneer automatische schemaontwikkeling is ingeschakeld. Zie de automatische schemaontwikkeling voor meer informatie.
whenNotMatched
componenten worden uitgevoerd wanneer een bronrij niet overeenkomt met een doelrij op basis van de overeenkomstvoorwaarde. Deze componenten hebben de volgende semantiek.whenNotMatched
componenten kunnen alleen deinsert
actie hebben. De nieuwe rij wordt gegenereerd op basis van de opgegeven kolom en bijbehorende expressies. U hoeft niet alle kolommen in de doeltabel op te geven. Voor niet-opgegeven doelkolommen wordtNULL
ingevoegd.Elke
whenNotMatched
component kan een optionele voorwaarde hebben. Als de componentvoorwaarde aanwezig is, wordt alleen een bronrij ingevoegd als die voorwaarde waar is voor die rij. Anders wordt de bronkolom genegeerd.Als er meerdere
whenNotMatched
componenten zijn, worden ze geëvalueerd in de volgorde waarin ze zijn opgegeven. AllewhenNotMatched
componenten, met uitzondering van de laatste, moeten voorwaarden hebben.Als u alle kolommen van de doel-Delta-tabel wilt invoegen met de bijbehorende kolommen van de brongegevensset, gebruikt u
whenNotMatched(...).insertAll()
. Dit komt overeen met:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
voor alle kolommen van de doel-Delta-tabel. Daarom gaat deze actie ervan uit dat de brontabel dezelfde kolommen heeft als die in de doeltabel, anders genereert de query een analysefout.
Notitie
Dit gedrag verandert wanneer automatische schemaontwikkeling is ingeschakeld. Zie de automatische schemaontwikkeling voor meer informatie.
whenNotMatchedBySource
componenten worden uitgevoerd wanneer een doelrij niet overeenkomt met een bronrij op basis van de samenvoegvoorwaarde. Deze componenten hebben de volgende semantiek.whenNotMatchedBySource
componenten kunnen opgevendelete
enupdate
acties uitvoeren.- Elke
whenNotMatchedBySource
component kan een optionele voorwaarde hebben. Als de componentvoorwaarde aanwezig is, wordt alleen een doelrij gewijzigd als die voorwaarde waar is voor die rij. Anders blijft de doelrij ongewijzigd. - Als er meerdere
whenNotMatchedBySource
componenten zijn, worden ze geëvalueerd in de volgorde waarin ze zijn opgegeven. AllewhenNotMatchedBySource
componenten, met uitzondering van de laatste, moeten voorwaarden hebben. - Componenten hebben per definitie
whenNotMatchedBySource
geen bronrij waaruit kolomwaarden kunnen worden opgehaald, zodat er niet naar bronkolommen kan worden verwezen. Voor elke kolom die moet worden gewijzigd, kunt u een letterlijke waarde opgeven of een actie uitvoeren op de doelkolom, zoalsSET target.deleted_count = target.deleted_count + 1
.
Belangrijk
- Een
merge
bewerking kan mislukken als meerdere rijen van de brongegevensset overeenkomen en de samenvoegbewerking probeert dezelfde rijen van de delta-doeltabel bij te werken. Volgens de SQL-semantiek van samenvoegen is een dergelijke updatebewerking dubbelzinnig omdat het onduidelijk is welke bronrij moet worden gebruikt om de overeenkomende doelrij bij te werken. U kunt de brontabel vooraf verwerken om de mogelijkheid van meerdere overeenkomsten te elimineren. - U kunt een SQL-bewerking
MERGE
alleen toepassen op een SQL-WEERGAVE als de weergave is gedefinieerd alsCREATE VIEW viewName AS SELECT * FROM deltaTable
.
Gegevensontdubbeling bij het schrijven naar Delta-tabellen
Een veelvoorkomend ETL-gebruiksvoorbeeld is het verzamelen van logboeken in de Delta-tabel door ze toe te voegen aan een tabel. Vaak kunnen de bronnen echter dubbele logboekrecords genereren en downstreamontdubbelingsstappen nodig zijn om ze te kunnen verzorgen. Met merge
kunt u voorkomen dat u de dubbele records invoegt.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Notitie
De gegevensset met de nieuwe logboeken moet op zichzelf worden ontdubbeld. Door de SQL-semantiek van samenvoegen worden de nieuwe gegevens gematcht en ontdubbeld met de bestaande gegevens in de tabel, maar als er dubbele gegevens in de nieuwe gegevensset zijn, wordt deze ingevoegd. Ontdubbel de nieuwe gegevens daarom voordat u deze samenvoegt in de tabel.
Als u weet dat u slechts een paar dagen dubbele records krijgt, kunt u de query verder optimaliseren door de tabel op datum te partitioneren en vervolgens het datumbereik van de doeltabel op te geven waarop moet worden vergeleken.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Dit is efficiënter dan de vorige opdracht, omdat er alleen in de afgelopen zeven dagen aan logboeken naar duplicaten wordt gezocht, niet de hele tabel. Bovendien kunt u deze invoegbewerking alleen gebruiken met Structured Streaming om continue ontdubbeling van de logboeken uit te voeren.
- In een streamingquery kunt u de samenvoegbewerking
foreachBatch
gebruiken om continu streaminggegevens naar een Delta-tabel te schrijven met ontdubbeling. Zie het volgende streamingvoorbeeld voor meer informatie overforeachBatch
. - In een andere streamingquery kunt u continu ontdubbelde gegevens uit deze Delta-tabel lezen. Dit is mogelijk omdat met een samenvoegbewerking alleen nieuwe gegevens worden toegevoegd aan de Delta-tabel.
Langzaam wijzigen van gegevens (SCD) en gegevensopname wijzigen (CDC) met Delta Lake
Delta Live Tables biedt systeemeigen ondersteuning voor het bijhouden en toepassen van SCD-type 1 en Type 2. Gebruik APPLY CHANGES INTO
met Delta Live Tables om ervoor te zorgen dat records buiten de order correct worden verwerkt bij het verwerken van CDC-feeds. Zie de APPLY CHANGES API's: Vereenvoudig het vastleggen van wijzigingsgegevens met Delta Live Tables.
Delta-tabel incrementeel synchroniseren met bron
In Databricks SQL en Databricks Runtime 12.2 LTS en hoger kunt WHEN NOT MATCHED BY SOURCE
u willekeurige voorwaarden maken om een deel van een tabel atomisch te verwijderen en te vervangen. Dit kan met name handig zijn wanneer u een brontabel hebt waarin records enkele dagen na de initiële gegevensinvoer kunnen worden gewijzigd of verwijderd, maar uiteindelijk een definitieve status hebben.
In de volgende query ziet u hoe u dit patroon gebruikt om vijf dagen records uit de bron te selecteren, overeenkomende records in het doel bij te werken, nieuwe records van de bron in te voegen aan het doel en alle niet-overeenkomende records uit de afgelopen 5 dagen in het doel te verwijderen.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Door hetzelfde Booleaanse filter op te geven voor de bron- en doeltabellen, kunt u wijzigingen van uw bron dynamisch doorgeven aan doeltabellen, inclusief verwijderingen.
Notitie
Hoewel dit patroon kan worden gebruikt zonder voorwaardelijke componenten, zou dit leiden tot het volledig herschrijven van de doeltabel die duur kan zijn.