Upsert till en Delta Lake-tabell med sammanslagning
Du kan överföra data från en källtabell, vy eller DataFrame till en Delta-måltabell med hjälp MERGE
av SQL-åtgärden. Delta Lake stöder infogningar, uppdateringar och borttagningar i MERGE
och stöder utökad syntax utöver SQL-standarderna för att underlätta avancerade användningsfall.
Anta att du har en källtabell med namnet people10mupdates
eller en källsökväg som /tmp/delta/people-10m-updates
innehåller nya data för en måltabell med namnet people10m
eller en målsökväg på /tmp/delta/people-10m
. Vissa av dessa nya poster kanske redan finns i måldata. Om du vill sammanfoga nya data vill du uppdatera rader där personens id
redan finns och infoga de nya raderna där ingen matchning id
finns. Du kan köra följande fråga:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Viktigt!
Endast en enskild rad från källtabellen kan matcha en viss rad i måltabellen. I Databricks Runtime 16.0 och senare MERGE
utvärderar villkor som anges i satserna WHEN MATCHED
och ON
för att fastställa dubblettmatchningar. I Databricks Runtime 15.4 LTS och nedan MERGE
tar åtgärder endast hänsyn till villkor som anges i ON
-satsen.
Se Delta Lake API-dokumentationen för Scala- och Python-syntaxinformation. Information om SQL-syntax finns i MERGE INTO
Ändra alla omatchade rader med hjälp av sammanslagning
I Databricks SQL och Databricks Runtime 12.2 LTS och senare kan du använda WHEN NOT MATCHED BY SOURCE
satsen till UPDATE
eller DELETE
poster i måltabellen som inte har motsvarande poster i källtabellen. Databricks rekommenderar att du lägger till en valfri villkorssats för att undvika att skriva om måltabellen helt.
I följande kodexempel visas den grundläggande syntaxen för att använda detta för borttagningar, skriva över måltabellen med innehållet i källtabellen och ta bort omatchade poster i måltabellen. Ett mer skalbart mönster för tabeller där källuppdateringar och borttagningar är tidsbundna finns i Synkronisera Delta-tabell stegvis med källan.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
I följande exempel läggs villkor till i WHEN NOT MATCHED BY SOURCE
-satsen och värden som ska uppdateras i omatchade målrader.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Kopplingsåtgärdssemantik
Följande är en detaljerad beskrivning av semantiken för merge
programmatisk åtgärd.
Det kan finnas valfritt antal
whenMatched
ochwhenNotMatched
satser.whenMatched
-satser körs när en källrad matchar en måltabellrad baserat på matchningsvillkoret. Dessa satser har följande semantik.whenMatched
satser kan ha högst enupdate
och endelete
åtgärd. Åtgärdenupdate
imerge
uppdaterar endast de angivna kolumnerna (liknarupdate
åtgärden) för den matchade målraden. Åtgärdendelete
tar bort den matchade raden.Varje
whenMatched
sats kan ha ett valfritt villkor. Om det här villkoret finnsupdate
körs eller-åtgärdendelete
endast för matchande källmålradpar när villkoret för satsen är sant.Om det finns flera
whenMatched
satser utvärderas de i den ordning de anges. AllawhenMatched
satser, utom den sista, måste ha villkor.Om inget av
whenMatched
villkoren utvärderas till sant för ett käll- och målradpar som matchar kopplingsvillkoret lämnas målraden oförändrad.Om du vill uppdatera alla kolumner i måldeltabeln med motsvarande kolumner i källdatauppsättningen använder du
whenMatched(...).updateAll()
. Detta motsvarar:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
för alla kolumner i deltatabellen för mål. Därför förutsätter den här åtgärden att källtabellen har samma kolumner som de i måltabellen, annars utlöser frågan ett analysfel.
Kommentar
Det här beteendet ändras när automatisk schemautveckling är aktiverad. Mer information finns i automatisk schemautveckling .
whenNotMatched
-satser körs när en källrad inte matchar någon målrad baserat på matchningsvillkoret. Dessa satser har följande semantik.whenNotMatched
-satser kan bara ha åtgärdeninsert
. Den nya raden genereras baserat på den angivna kolumnen och motsvarande uttryck. Du behöver inte ange alla kolumner i måltabellen. För ospecificerade målkolumnerNULL
infogas.Varje
whenNotMatched
sats kan ha ett valfritt villkor. Om villkoret finns infogas endast en källrad om villkoret är sant för den raden. Annars ignoreras källkolumnen.Om det finns flera
whenNotMatched
satser utvärderas de i den ordning de anges. AllawhenNotMatched
satser, utom den sista, måste ha villkor.Om du vill infoga alla kolumner i måldeltabellen med motsvarande kolumner i källdatauppsättningen använder du
whenNotMatched(...).insertAll()
. Detta motsvarar:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
för alla kolumner i deltatabellen för mål. Därför förutsätter den här åtgärden att källtabellen har samma kolumner som de i måltabellen, annars utlöser frågan ett analysfel.
Kommentar
Det här beteendet ändras när automatisk schemautveckling är aktiverad. Mer information finns i automatisk schemautveckling .
whenNotMatchedBySource
-satser körs när en målrad inte matchar någon källrad baserat på kopplingsvillkoret. Dessa satser har följande semantik.-
whenNotMatchedBySource
satser kan angedelete
ochupdate
åtgärder. - Varje
whenNotMatchedBySource
sats kan ha ett valfritt villkor. Om villkoret för satsen finns ändras endast en målrad om villkoret är sant för den raden. Annars lämnas målraden oförändrad. - Om det finns flera
whenNotMatchedBySource
satser utvärderas de i den ordning de anges. AllawhenNotMatchedBySource
satser, utom den sista, måste ha villkor. - Satser har per definition
whenNotMatchedBySource
ingen källrad att hämta kolumnvärden från, så källkolumner kan inte refereras till. För varje kolumn som ska ändras kan du antingen ange en literal eller utföra en åtgärd på målkolumnen, till exempelSET target.deleted_count = target.deleted_count + 1
.
-
Viktigt!
- En
merge
åtgärd kan misslyckas om flera rader i källdatauppsättningen matchar och sammanfogningen försöker uppdatera samma rader i måldeltabeln. Enligt SQL-semantiken för sammanslagning är en sådan uppdateringsåtgärd tvetydig eftersom det är oklart vilken källrad som ska användas för att uppdatera den matchade målraden. Du kan förbearbeta källtabellen för att eliminera risken för flera matchningar. - Du kan endast tillämpa en SQL-åtgärd
MERGE
på en SQL VIEW om vyn har definierats somCREATE VIEW viewName AS SELECT * FROM deltaTable
.
Datadeduplicering när du skriver till Delta-tabeller
Ett vanligt ETL-användningsfall är att samla in loggar i Delta-tabellen genom att lägga till dem i en tabell. Men ofta kan källorna generera dubbletter av loggposter och underordnade dedupliceringssteg krävs för att ta hand om dem. Med merge
kan du undvika att infoga de duplicerade posterna.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Kommentar
Datamängden som innehåller de nya loggarna måste dedupliceras inom sig själv. Med SQL-semantiken för sammanslagning matchar och deduplicerar den nya data med befintliga data i tabellen, men om det finns duplicerade data i den nya datauppsättningen infogas de. Därför deduplicerar du de nya data innan de sammanfogas i tabellen.
Om du vet att du bara kan hämta dubbletter av poster i några dagar kan du optimera frågan ytterligare genom att partitionera tabellen efter datum och sedan ange datumintervallet för måltabellen som ska matchas.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Det här är effektivare än föregående kommando eftersom det bara söker efter dubbletter under de senaste 7 dagarnas loggar, inte hela tabellen. Dessutom kan du använda den här infogade sammanfogningen med Structured Streaming för att utföra kontinuerlig deduplicering av loggarna.
- I en direktuppspelningsfråga kan du använda sammanslagningsåtgärden i
foreachBatch
för att kontinuerligt skriva strömmande data till en Delta-tabell med deduplicering. Mer information om finns i följande strömningsexempel påforeachBatch
. - I en annan direktuppspelningsfråga kan du kontinuerligt läsa deduplicerade data från den här Delta-tabellen. Detta är möjligt eftersom en sammanfogning endast för infogning lägger till nya data i Delta-tabellen.
Ändra data (SCD) långsamt och ändra datainsamling (CDC) med Delta Lake
Delta Live Tables har inbyggt stöd för spårning och tillämpning av SCD Typ 1 och Typ 2. Använd APPLY CHANGES INTO
med Delta Live Tables för att säkerställa att poster i fel ordning hanteras korrekt vid bearbetning av CDC-feeds. Se API:er för TILLÄMPA ÄNDRINGAR: Förenkla insamling av ändringsdata med Delta Live Tables.
Synkronisera Delta-tabellen stegvis med källan
I Databricks SQL och Databricks Runtime 12.2 LTS och senare kan du använda WHEN NOT MATCHED BY SOURCE
för att skapa godtyckliga villkor för att atomiskt ta bort och ersätta en del av en tabell. Detta kan vara särskilt användbart när du har en källtabell där poster kan ändras eller tas bort i flera dagar efter den första datainmatningen, men slutligen regleras till ett slutligt tillstånd.
Följande fråga visar hur du använder det här mönstret för att välja 5 dagars poster från källan, uppdatera matchande poster i målet, infoga nya poster från källan till målet och ta bort alla omatchade poster från de senaste 5 dagarna i målet.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Genom att tillhandahålla samma booleska filter på käll- och måltabellerna kan du dynamiskt sprida ändringar från källan till måltabeller, inklusive borttagningar.
Kommentar
Även om det här mönstret kan användas utan några villkorssatser, skulle detta leda till att måltabellen skrivs om helt och hållet, vilket kan vara dyrt.