Actualización en una tabla de Delta Lake mediante combinación
Puede actualizar datos de una tabla de origen, vista o dataframe en una tabla Delta de destino mediante la operación SQL MERGE
. Delta Lake admite inserciones, actualizaciones y eliminaciones en MERGE
y admite la sintaxis extendida más allá de los estándares de SQL para facilitar casos de uso avanzados.
Supongamos que tiene una tabla de origen denominada people10mupdates
o una ruta de acceso de origen en /tmp/delta/people-10m-updates
que contiene nuevos datos para una tabla de destino denominada people10m
o una ruta de acceso de destino en /tmp/delta/people-10m
. Es posible que algunos de estos nuevos registros ya están presentes en los datos de destino. Para combinar los nuevos datos, quiere actualizar las filas en las que el id
de la persona ya está presente e insertar las nuevas filas en las que no haya ningún id
que coincida. Puede ejecutar la siguiente consulta:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Importante
Solo una fila de la tabla de origen puede coincidir con una fila determinada en la tabla de destino. En Databricks Runtime 16.0 y versiones posteriores, MERGE
evalúa las condiciones especificadas en las WHEN MATCHED
cláusulas y ON
para determinar las coincidencias duplicadas. En Databricks Runtime 15.4 LTS y versiones posteriores, MERGE
las operaciones solo consideran las condiciones especificadas en la ON
cláusula .
Para obtener información sobre la sintaxis de Scala y Python, consulte la Documentación de la API de Delta Lake. Para más información sobre la sintaxis SQL, consulte MERGE INTO
Modificación de todas las filas no coincidentes mediante la combinación
En Databricks SQL, Databricks Runtime 12.2 LTS y versiones posteriores, puede usar la cláusula WHEN NOT MATCHED BY SOURCE
para los registros UPDATE
o DELETE
de la tabla de destino que no tienen los registros correspondientes en la tabla de origen. Databricks recomienda agregar una cláusula condicional opcional para evitar volver a escribir completamente la tabla de destino.
En el código de ejemplo siguiente se muestra la sintaxis básica para usar esto para las eliminaciones, sobrescribiendo la tabla de destino con el contenido de la tabla de origen y eliminando registros no coincidentes en la tabla de destino. Para obtener un patrón más escalable para las tablas en las que las actualizaciones y eliminaciones de origen están enlazadas por tiempo, consulte Sincronización incremental de la tabla Delta con el origen.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
En el ejemplo siguiente se agregan condiciones a la cláusula WHEN NOT MATCHED BY SOURCE
y se especifican los valores que se van a actualizar en filas de destino no coincidentes.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
Semántica de la operación Merge
A continuación se encuentra una descripción detallada de la semántica de la operación merge
mediante programación.
Puede haber varias cláusulas
whenMatched
ywhenNotMatched
.Las cláusulas
whenMatched
se ejecutan cuando una fila de origen coincide con una fila de tabla de destino en función de la condición de coincidencia. Estas cláusulas tienen la semántica siguiente.Las cláusulas
whenMatched
pueden tener como máximo una acciónupdate
y una accióndelete
. La acciónupdate
demerge
solo actualiza las columnas especificadas (de forma similar a laupdate
) de la fila de destino coincidente. La accióndelete
elimina la fila coincidente.Cada cláusula
whenMatched
puede tener una condición opcional. Si esta condición de cláusula existe, la acciónupdate
odelete
se ejecuta para cualquier fila del par de filas de origen-destino coincidente solo cuando la condición de la cláusula es true.Si hay varias cláusulas
whenMatched
, se evalúan en el orden en que se especifican. Todas las cláusulaswhenMatched
, excepto la última, deben tener condiciones.Si ninguna de las condiciones
whenMatched
se evalúa como true para un par de filas de origen y destino que coincida con la condición de combinación, la fila de destino se deja sin modificar.Para actualizar todas las columnas de la tabla Delta de destino con las columnas correspondientes del conjunto de datos de origen, use
whenMatched(...).updateAll()
. Esto equivale a:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
para todas las columnas de la tabla Delta de destino. Por lo tanto, esta acción supone que la tabla de origen tiene las mismas columnas que la tabla de destino; si no es así, la consulta genera un error de análisis.
Nota:
Este comportamiento cambia cuando se habilita la evolución automática de esquemas. Para más información, consulte Evolución automática del esquema.
Las cláusulas
whenNotMatched
se ejecutan cuando una fila de origen no coincide con ninguna fila de destino según la condición de coincidencia. Estas cláusulas tienen la semántica siguiente.Las cláusulas
whenNotMatched
solo pueden tener la accióninsert
. La nueva fila se genera en función de la columna especificada y las expresiones correspondientes. No es preciso especificar todas las columnas de la tabla de destino. Para las columnas de destino no especificadas, se insertaNULL
.Cada cláusula
whenNotMatched
puede tener una condición opcional. Si la condición de la cláusula está presente, se inserta una fila de origen, pero solo si la condición es "true" en esa fila. De lo contrario, se omite la columna de origen.Si hay varias cláusulas
whenNotMatched
, se evalúan en el orden en que se especifican. Todas las cláusulaswhenNotMatched
, excepto la última, deben tener condiciones.Para insertar todas las columnas de la tabla Delta de destino con las columnas correspondientes del conjunto de datos de origen, use
whenNotMatched(...).insertAll()
. Esto equivale a:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
para todas las columnas de la tabla Delta de destino. Por lo tanto, esta acción supone que la tabla de origen tiene las mismas columnas que la tabla de destino; si no es así, la consulta genera un error de análisis.
Nota:
Este comportamiento cambia cuando se habilita la evolución automática de esquemas. Para más información, consulte Evolución automática del esquema.
Las cláusulas
whenNotMatchedBySource
se ejecutan cuando una fila de destino no coincide con ninguna fila de origen según la condición de combinación. Estas cláusulas tienen la semántica siguiente.- Las cláusulas
whenNotMatchedBySource
pueden especificar las accionesdelete
yupdate
. - Cada cláusula
whenNotMatchedBySource
puede tener una condición opcional. Si la condición de la cláusula está presente, solo se insertan aquellas filas de destino en las que se cumpla la condición. De lo contrario, la fila de destino se quedan sin cambios. - Si hay varias cláusulas
whenNotMatchedBySource
, se evalúan en el orden en que se especifican. Todas las cláusulaswhenNotMatchedBySource
, excepto la última, deben tener condiciones. - Por definición, las cláusulas
whenNotMatchedBySource
no tienen una fila de origen para extraer valores de columna, por lo que no se puede hacer referencia a las columnas de origen. Para cada columna que se va a modificar, puede especificar un literal o realizar una acción en la columna de destino, comoSET target.deleted_count = target.deleted_count + 1
.
- Las cláusulas
Importante
- Una operación
merge
puede generar un error si varias filas del conjunto de datos de origen coinciden y la fusión mediante combinación intenta actualizar las mismas filas de la tabla Delta de destino. Según la semántica SQL de la fusión mediante combinación, esta operación de actualización es ambigua, ya que no está claro qué fila de origen se debe usar para actualizar la fila de destino coincidente. Puede procesar previamente la tabla de origen para eliminar la posibilidad de que haya varias coincidencias. - Puede aplicar una operación SQL
MERGE
a SQL VIEW solo si la vista se ha definido comoCREATE VIEW viewName AS SELECT * FROM deltaTable
.
Desduplicación de datos al escribir en tablas Delta
Un caso de uso común de extracción, transformación y carga de datos es recopilar registros en la tabla Delta anexándolos a una tabla. Sin embargo, a menudo los orígenes pueden generar registros duplicados y se necesitan pasos de desduplicación de bajada para ocuparse de ellos. Con merge
, puede evitar insertar los registros duplicados.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
Nota:
El conjunto de datos que contiene los nuevos registros debe desduplicarse dentro de sí mismo. Por la semántica SQL de la fusión mediante combinación, coincide con los nuevos datos y los desduplica con los datos existentes en la tabla, pero si hay datos duplicados dentro del nuevo conjunto de datos, se insertan. Por lo tanto, desduplica los nuevos datos antes de combinarlos en la tabla.
Si sabe que solo puede obtener registros duplicados durante unos días, para optimizar aún más la consulta cree particiones de la tabla por fecha y, después, especifique el intervalo de fechas de la tabla de destino con el que debe coincidir.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
Esto es más eficaz que el comando anterior, ya que busca duplicados solo en los siete últimos días de registros, no en toda la tabla. Además, puede usar esta fusión mediante combinación de solo inserción con Structured Streaming para realizar la desduplicación continua de los registros.
- En una consulta de streaming, puede usar la operación Merge en
foreachBatch
para escribir continuamente los datos de streaming en una tabla Delta con desduplicación. Consulte el siguiente ejemplo de streaming para más información sobreforeachBatch
. - En otra consulta de streaming, puede leer continuamente datos desduplicados de esta tabla Delta. Esto es posible porque las fusiones mediante combinación de solo inserción solo anexan nuevos datos a la tabla Delta.
Datos de variación lenta (SCD) y captura de datos modificados (CDC) con Delta Lake
Las tablas de Delta Live tiene compatibilidad nativa con el seguimiento y la aplicación de DVL de tipo 1 y tipo 2. Use APPLY CHANGES INTO
con Delta Live Tables para asegurarse de que los registros desordenados se controlan correctamente al procesar fuentes de CDC. Consulte Las APIs APPLY CHANGES: simplificación de la captura de datos modificados con Delta Live Tables.
Sincronización incremental de la tabla Delta con el origen
En Databricks SQL, Databricks Runtime 12.2 LTS y versiones posteriores, puede usar WHEN NOT MATCHED BY SOURCE
para crear condiciones arbitrarias para eliminar y reemplazar de forma atómica una parte de una tabla. Esto puede ser especialmente útil cuando tiene una tabla de origen en la que los registros pueden cambiarse o eliminarse durante varios días después de la entrada de datos inicial, pero acaban quedándose en un estado final.
En la consulta siguiente se muestra el uso de este patrón para seleccionar cinco días de registros en el origen, actualizar los registros coincidentes en el destino, insertar nuevos registros desde el origen al destino y eliminar todos los registros no coincidentes de los últimos cinco días en el destino.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
Al proporcionar el mismo filtro booleano en las tablas de origen y de destino, puede propagar dinámicamente los cambios de las tablas de origen a las de destino, incluidas las eliminaciones.
Nota:
Aunque este patrón se puede usar sin cláusulas condicionales, esto provocaría una reescritura completa de la tabla de destino que puede ser onerosa.