Compartir vía


API APPLY CHANGES: simplificar la captura de datos modificados con Delta Live Tables

Delta Live Tables simplifica la captura de datos modificados (CDC) con las API APPLY CHANGES y APPLY CHANGES FROM SNAPSHOT. La interfaz que use depende del origen de los datos modificados:

  • Use APPLY CHANGES para procesar los cambios de una fuente de datos de cambios (CDF).
  • Use APPLY CHANGES FROM SNAPSHOT (versión preliminar pública) para procesar los cambios en las instantáneas de base de datos.

Anteriormente, la MERGE INTO instrucción se usaba normalmente para procesar registros CDC en Azure Databricks. Sin embargo, MERGE INTO puede generar resultados incorrectos debido a registros fuera de secuencia o requerir lógica compleja para volver a ordenar los registros.

La API APPLY CHANGES se admite en el SQL de Delta Live Tables y en las interfaces de Python. La API APPLY CHANGES FROM SNAPSHOT se admite en la interfaz de Python de Delta Live Tables.

Tanto APPLY CHANGES como APPLY CHANGES FROM SNAPSHOT admiten la actualización de tablas mediante el SCD de tipo 1 y tipo 2:

  • Use SCD de tipo 1 para actualizar los registros directamente. El historial no se conserva para los registros actualizados.
  • Use el SCD de tipo 2 para conservar un historial de registros, ya sea en todas las actualizaciones o en actualizaciones de un conjunto especificado de columnas.

Para ver la sintaxis y otras referencias, consulte:

Nota:

En este artículo se describe cómo actualizar tablas en la canalización de Delta Live Tables en función de los cambios en los datos de origen. Para aprender a registrar y consultar información de cambio a nivel de fila para las tablas Delta, consulte Uso de la fuente de datos de cambios de Delta Lake en Azure Databricks.

Requisitos

Para usar las API CDC, la canalización debe configurarse para usar canalizaciones DLT sin servidor o Delta Live Tables Pro o Advancedediciones.

¿Cómo se implementa la CDC con la API APPLY CHANGES?

Al controlar automáticamente los registros fuera de secuencia, la API de APPLY CHANGES en Delta Live Tables garantiza el procesamiento correcto de los registros CDC y elimina la necesidad de desarrollar lógica compleja para controlar registros fuera de secuencia. Se debe especificar una columna en los datos de origen sobre los que se secuenciarán los registros, que Delta Live Tables interpreta como una representación que crece de forma regular de la ordenación adecuada de los datos de origen. Delta Live Tables controla automáticamente los datos que llegan desordenados. Para los cambios de SCD de tipo 2, Delta Live Tables propaga los valores de secuenciación adecuados a las columnas de __START_AT y __END_AT de la tabla de destino. Debería haber una actualización distinta por clave en cada valor de secuenciación y no se admiten valores de secuenciación NULL.

Para realizar el procesamiento de CDC con APPLY CHANGES, primero debe crear una tabla de streaming y, a continuación, usar la instrucción APPLY CHANGES INTO en SQL o la función apply_changes() de Python para especificar el origen, las claves y la secuenciación de la fuente de cambios. Para crear la tabla de streaming de destino, use la instrucción CREATE OR REFRESH STREAMING TABLE en SQL o la función create_streaming_table() en Python. Consulte los ejemplos de procesamiento de SCD de tipo 1 y tipo 2.

Para obtener más información sobre la sintaxis, consulte la referencia de SQL o la referencia de Python de Delta Live Tables.

¿Cómo se implementa la CDC con la API APPLY CHANGES FROM SNAPSHOT?

Importante

La API APPLY CHANGES FROM SNAPSHOT está en versión preliminar pública.

APPLY CHANGES FROM SNAPSHOT es una API declarativa que determina eficazmente los cambios en los datos de origen comparando una serie de instantáneas en orden y, a continuación, ejecuta el procesamiento necesario para el procesamiento CDC de los registros en las instantáneas. APPLY CHANGES FROM SNAPSHOT solo es compatible con la interfaz de Python de Delta Live Tables.

APPLY CHANGES FROM SNAPSHOT admite la ingesta de instantáneas de varios tipos de origen:

  • Use la ingesta periódica de instantáneas para ingerir instantáneas de una tabla o vista existente. APPLY CHANGES FROM SNAPSHOT tiene una interfaz sencilla y simplificada para admitir la ingesta periódica de instantáneas desde un objeto de base de datos existente. Se ingiere una nueva instantánea con cada actualización de canalización y el tiempo de ingesta se usa como versión de instantánea. Cuando se ejecuta una canalización en modo continuo, se ingieren varias instantáneas con cada actualización de la canalización en un período determinado por la configuración del intervalo de desencadenador para el flujo que contiene el procesamiento APPLY CHANGES FROM SNAPSHOT.
  • Use la ingesta de instantáneas históricas para procesar archivos que contienen instantáneas de base de datos, como instantáneas generadas a partir de una base de datos Oracle o MySQL o un almacenamiento de datos.

Para realizar el procesamiento de CDC desde cualquier tipo de origen con APPLY CHANGES FROM SNAPSHOT, primero debe crear una tabla de streaming y, a continuación, usar la función apply_changes_from_snapshot() en Python para especificar la instantánea, las claves y otros argumentos necesarios para implementar el procesamiento. Consulte los ejemplos de ingesta de instantáneas periódicas e ingesta de instantáneas históricas.

Las instantáneas pasadas a la API deben estar en orden ascendente por versión. Si Delta Live Tables detecta una instantánea desordenada, se produce un error.

Para obtener más información sobre la sintaxis, consulte la referencia de Python de Delta Live Tables.

Limitaciones

La columna utilizada para la secuenciación debe ser un tipo de datos ordenable.

Ejemplo: procesamiento de SCD de tipo 1 y SCD de tipo 2 con datos de origen de CDF

En las secciones siguientes, se proporcionan ejemplos de consultas SCD de tipo 1 y de tipo 2 de Delta Live Tables que actualizan las tablas de destino en función de los eventos de origen de una fuente de datos de cambios que:

  1. Crea nuevos registros de usuario.
  2. Elimina un registro de usuario.
  3. Actualiza registros de usuario. En el ejemplo de SCD de tipo 1, las últimas UPDATE operaciones llegan tarde y se anulan en la tabla de destino, lo que demuestra el control de eventos desordenados.

En los ejemplos siguientes, se supone que está familiarizado con la configuración y actualización de canalizaciones de Delta Live Tables. Consulte Tutorial: ejecute su primera canalización de Delta Live Tables.

Para ejecutar estos ejemplos, deberá comenzar creando un conjunto de datos de ejemplo. Consulte Generación de datos de prueba.

A continuación, se muestran los registros de entrada de estos ejemplos:

userId name city operation sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancún INSERT 2
123 null null Delete 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Si quita la marca de comentario de la última final de los datos de ejemplo, se insertará el registro siguiente, que especifica dónde se deberían truncar los registros:

userId name city operation sequenceNum
null null null TRUNCATE 3

Nota:

Todos los ejemplos siguientes incluyen opciones para especificar las operaciones DELETE y TRUNCATE, pero cada una es opcional.

Proceso de actualizaciones de SCD de tipo 1

En el ejemplo siguiente, se muestra cómo procesar actualizaciones de SCD de tipo 1:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Después de ejecutar el ejemplo de SCD de tipo 1, la tabla de destino contiene los registros siguientes:

userId name city
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lily Cancún

Después de ejecutar el ejemplo de SCD tipo 1 con el registro adicional TRUNCATE, los registros 124 y 126 se truncan por la operación TRUNCATE en sequenceNum=3 y la tabla de destino contiene el registro siguiente:

userId name city
125 Mercedes Guadalajara

Proceso de actualizaciones de SCD de tipo 2

En el ejemplo siguiente, se muestra cómo procesar actualizaciones de SCD de tipo 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Después de ejecutar el ejemplo de SCD de tipo 2, la tabla de destino contiene los registros siguientes:

userId name city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lily Cancún 2 null

Una consulta de tipo 2 de SCD también puede especificar un subconjunto de columnas de salida de las que se va a realizar un seguimiento del historial en la tabla de destino. Los cambios en otras columnas se actualizan en lugar de generar nuevos registros de historial. En el ejemplo siguiente, se muestra la exclusión de la columna city del seguimiento:

En el ejemplo siguiente, se muestra el uso del historial de seguimiento con SCD de tipo 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Después de ejecutar este ejemplo sin el registro TRUNCATE adicional, la tabla de destino contiene los registros siguientes:

userId name city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 null
125 Mercedes Guadalajara 2 null
126 Lily Cancún 2 null

Generación de datos de prueba

El código siguiente se proporciona para generar un conjunto de datos de ejemplo para su uso en las consultas de ejemplo presentes de este tutorial. Suponiendo que tenga las credenciales adecuadas para crear un nuevo esquema y crear una nueva tabla, ejecute estas instrucciones con un cuaderno o Databricks SQL. El código siguiente no se diseñó para ejecutarse como parte de una canalización de Delta Live Tables:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Ejemplo: procesamiento periódico de instantáneas

En el ejemplo siguiente se muestra el procesamiento de SCD de tipo 2 que ingiere instantáneas de una tabla almacenada en mycatalog.myschema.mytable. Los resultados del procesamiento se escriben en una tabla denominada target.

mycatalog.myschema.mytable registros en la marca de tiempo 2024-01-01 00:00:00

Key Value
1 a1
2 a2

mycatalog.myschema.mytable registros en la marca de tiempo 2024-01-01 12:00:00

Key Value
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

Después de procesar las instantáneas, la tabla de destino contiene los siguientes registros:

Key Value __START_AT __END_AT
1 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 null
3 a3 2024-01-01 12:00:00 null

Ejemplo: procesamiento histórico de instantáneas

En el ejemplo siguiente se muestra el procesamiento de SCD de tipo 2 que actualiza una tabla de destino basada en eventos de origen de dos instantáneas almacenadas en un sistema de almacenamiento en la nube:

Instantánea en timestamp, almacenada en /<PATH>/filename1.csv

Key TrackingColumn NonTrackingColumn
1 a1 b1
2 a2 b2
4 a4 b4

Instantánea en timestamp + 5, almacenada en /<PATH>/filename2.csv

Key TrackingColumn NonTrackingColumn
2 a2_new b2
3 a3 b3
4 a4 b4_new

En el ejemplo de código siguiente, se muestra cómo procesar actualizaciones de SCD de tipo 2 con estas instantáneas:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

Después de procesar las instantáneas, la tabla de destino contiene los siguientes registros:

Key TrackingColumn NonTrackingColumn __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 null
3 a3 b3 2 null
4 a4 b4_new 1 null

Agregar, cambiar o eliminar datos en una tabla de streaming de destino

Si la canalización publica tablas en el catálogo de Unity, use instrucciones de lenguaje de manipulación de datos (DML), incluyendo las instrucciones insert, update, delete y merge, para modificar las tablas de streaming de destino creadas por instrucciones APPLY CHANGES INTO.

Nota:

  • No se admiten instrucciones DML que modifican el esquema de tabla de una tabla de streaming. Asegúrese de que las instrucciones DML no intenten evolucionar el esquema de la tabla.
  • Las instrucciones DML que actualizan una tabla de streaming solo se pueden ejecutar en un clúster compartido de Unity Catalog o en un almacenamiento de SQL mediante Databricks Runtime 13.3 LTS y versiones posteriores.
  • Dado que el streaming requiere orígenes de datos de solo anexión, si el procesamiento requiere streaming desde una tabla de streaming de origen con cambios (por ejemplo, por instrucciones de DML), establezca la marca skipChangeCommits al leer la tabla de streaming de origen. Cuando se establezca skipChangeCommits, se omitirán las transacciones que eliminen o modifiquen registros de la tabla de origen. Si el procesamiento no requiere una tabla de streaming, use una vista materializada (que no tiene la restricción de solo anexión) como tabla de destino.

Dado que Delta Live Tables usa una columna SEQUENCE BY especificada y propaga los valores de secuenciación adecuados a las columnas __START_AT y __END_AT de la tabla de destino (para el SCD de tipo 2), asegúrese de que las instrucciones DML usen valores válidos para estas columnas para mantener el orden adecuado de los registros. Consulte ¿Cómo se implementa CDC con la API APPLY CHANGES?.

Para obtener más información sobre el uso de instrucciones DML con tablas de streaming, consulte Agregar, cambiar o eliminar datos en una tabla de streaming.

En el ejemplo siguiente, se inserta un registro activo con una secuencia de inicio de 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Leer una fuente de distribución de datos modificados de una tabla de destino APPLY CHANGES

En Databricks Runtime 15.2 y versiones posteriores, puede leer una fuente de distribución de datos de cambios de una tabla de streaming que sea el destino de las consultas APPLY CHANGES o APPLY CHANGES FROM SNAPSHOT de la misma manera que leer una fuente de distribución de datos modificados de otras tablas Delta. A continuación se requieren para leer la fuente de distribución de datos modificados de una tabla de streaming de destino:

  • La tabla de streaming de destino debe publicarse en el catálogo de Unity. Consulte Utiliza el Catálogo Unity con tus canalizaciones de Tablas Delta Live.
  • Para leer la fuente de distribución de datos modificados de la tabla de streaming de destino, debe usar Databricks Runtime 15.2 o superior. Para leer la fuente de distribución de datos modificados en otra canalización de Delta Live Tables, la canalización debe configurarse para usar Databricks Runtime 15.2 o superior.

Puede leer la fuente de distribución de datos modificados de una tabla de streaming de destino que se creó en una canalización de Delta Live Tables de la misma manera que leer una fuente de distribución de datos modificados de otras tablas Delta. Para obtener más información sobre el uso de la funcionalidad de fuente de distribución de datos de cambios de Delta, incluidos ejemplos en Python y SQL, consulte Uso de la fuente de datos de cambios de Delta Lake en Azure Databricks.

Nota:

El registro de fuente de distribución de datos modificados incluye metadatos que identifican el tipo de evento de cambio. Cuando se actualiza un registro en una tabla, los metadatos de los registros de cambios asociados suelen incluir valores _change_type establecidos en eventos update_preimage y update_postimage.

Sin embargo, los valores _change_type son diferentes si se realizan actualizaciones en la tabla de streaming de destino que incluyen el cambio de valores de clave principal. Cuando los cambios incluyen actualizaciones de las claves principales, los campos de metadatos _change_type se establecen en eventos insert y delete. Los cambios en las claves principales pueden producirse cuando se realizan actualizaciones manuales en uno de los campos clave con una instrucción UPDATE o MERGE o, para las tablas de tipo SCD 2, cuando el campo __start_atcambia para reflejar un valor de secuencia de inicio anterior.

La consulta APPLY CHANGES determina los valores de clave principal, que difieren para el procesamiento de tipo 1 y SCD 2:

  • Para el procesamiento del tipo 1 de SCD y la interfaz de Python Delta Live Tables, la clave principal es el valor del parámetro keys en la función apply_changes(). Para la interfaz SQL de Delta Live Tables, la clave principal es las columnas definidas por la cláusula KEYS en la instrucción APPLY CHANGES INTO.
  • Para el tipo SCD 2, la clave principal es el parámetro keys o cláusula KEYS más el valor devuelto de la operación coalesce(__START_AT, __END_AT), donde __START_AT y __END_AT son las columnas correspondientes de la tabla de streaming de destino.

Obtener datos sobre los registros procesados por una consulta CDC de Delta Live Tables

Nota:

Las siguientes métricas solo se capturan mediante consultas APPLY CHANGES y no mediante consultas APPLY CHANGES FROM SNAPSHOT.

Las consultas APPLY CHANGES capturas las siguientes métricas:

  • num_upserted_rows: número de filas de salida insertadas en el conjunto de datos durante una actualización.
  • num_deleted_rows: número de filas de salida existentes eliminadas del conjunto de datos durante una actualización.

La métrica num_output_rows, que es la salida de los flujos que no son CDC, no se captura para las consultas apply changes.

¿Qué objetos de datos se usan para el procesamiento de CDC de Delta Live Tables?

Nota:

  • Estas estructuras de datos solo se aplican al procesamiento de APPLY CHANGES, no al procesamiento APPLY CHANGES FROM SNAPSHOT.
  • Estas estructuras de datos solo se aplican cuando la tabla de destino se publica en el metastore de Hive. Si una canalización se publica en el catálogo de Unity, las tablas de respaldo internas no serán accesibles para los usuarios.

Al declarar la tabla de destino en el metastore de Hive, se crean dos estructuras de datos:

  • Vista con el nombre asignado a la tabla de destino.
  • Tabla de respaldo interna usada por Delta Live Tables para administrar el procesamiento de CDC. Esta tabla se denomina anteponiendo __apply_changes_storage_ al nombre de la tabla de destino.

Por ejemplo, si declara una tabla de destino denominada dlt_cdc_target, verá una vista denominada dlt_cdc_target y una tabla denominada __apply_changes_storage_dlt_cdc_target en el metastore. La creación de una vista permite que Delta Live Tables filtre la información adicional (por ejemplo, los marcadores de exclusión y las versiones) necesaria para manipular los datos desordenados. Para ver los datos procesados, consulte la vista de destino. Dado que el esquema de la tabla __apply_changes_storage_ podría cambiar para admitir futuras características o mejoras, no se debería consultar la tabla para su uso en producción. Si agrega datos manualmente a la tabla, se presume que los registros vendrán antes que otros cambios porque faltan las columnas de versión.