Referencia del lenguaje Python de Delta Live Tables
En este artículo, están los detalles de la interfaz de programación de Python para Delta Live Tables.
Para obtener información sobre la API de SQL, consulte Referencia del lenguaje SQL de Delta Live Tables.
Para obtener más información específica sobre cómo configurar el cargador automático, consulte ¿Qué es el cargador automático?.
Antes de empezar
A continuación, se indican consideraciones importantes al implementar canalizaciones con la interfaz de Python para Delta Live Tables:
- Dado que las funciones
table()
yview()
de Python se invocan varias veces durante la planeación y ejecución de una actualización de canalización, no incluya código en una de estas funciones que pueda tener efectos secundarios (por ejemplo, código que modifica datos o envía un correo electrónico). Para evitar un comportamiento inesperado, las funciones de Python que definen conjuntos de datos deben incluir solo el código necesario para definir la tabla o vista. - Para realizar operaciones como el envío de correos electrónicos o la integración con un servicio de supervisión externo, especialmente en funciones que definen conjuntos de datos, use enlaces de eventos. La implementación de estas operaciones en las funciones que definen los conjuntos de datos provocará un comportamiento inesperado.
- Las funciones
table
yview
de Python deben devolver un objeto DataFrame. Algunas funciones que funcionan en objetos DataFrame no devuelven objetos DataFrame y no deben usarse. Estas operaciones incluyen funciones comocollect()
,count()
,toPandas()
,save()
ysaveAsTable()
. Dado que las transformaciones DataFrame se ejecutan después de que se haya resuelto el grafo de flujo de datos completo, el uso de estas operaciones podría tener efectos secundarios no deseados.
Importar el módulo Python dlt
Las funciones de Python de Delta Live Tables se definen en el módulo dlt
. Las canalizaciones implementadas con la API de Python deben importar este módulo:
import dlt
Crear una vista materializada de Delta Live Tables o una tabla de streaming
En Python, Delta Live Tables determina si se debe actualizar un conjunto de datos como una vista materializada o una tabla de streaming basada en la consulta definida. El decorador @table
se puede usar para definir vistas materializadas y tablas de streaming.
Para definir una vista materializada en Python, aplique @table
a una consulta que realice una lectura estática en un origen de datos. Para definir una tabla de streaming, aplique @table
a una consulta que realice una lectura de streaming en un origen de datos o use la función create_streaming_table(). Ambos tipos de conjunto de datos tienen la misma especificación de sintaxis que se indica a continuación:
Nota:
A fin de usar el argumento cluster_by
para habilitar la agrupación en clústeres líquidos, la canalización se debe configurar para usar el canal de vista previa.
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Crear una vista Delta Live Tables
Para definir una vista en Python, aplique el decorador @view
. Al igual que el decorador @table
, puede usar vistas en Delta Live Tables para conjuntos de datos estáticos o de streaming. A continuación se muestra la sintaxis para definir vistas con Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Ejemplo: definición de tablas y vistas
Para definir una tabla o vista en Python, aplique el decorador @dlt.view
o @dlt.table
a una función. Puede usar el nombre de la función o el parámetro name
para asignar el nombre de la tabla o vista. En el ejemplo siguiente se definen dos conjuntos de datos diferentes: una vista denominada taxi_raw
que toma un archivo JSON como origen de entrada y una tabla denominada filtered_data
que toma la vista taxi_raw
como entrada:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
Ejemplo: acceso a un conjunto de datos definido en la misma canalización
Nota:
Aunque las dlt.read()
funciones y dlt.read_stream()
siguen estando disponibles y totalmente compatibles con la interfaz de Python de Delta Live Tables, Databricks recomienda usar siempre las spark.read.table()
funciones y spark.readStream.table()
debido a lo siguiente:
- Las
spark
funciones admiten la lectura de conjuntos de datos internos y externos, incluidos los conjuntos de datos en el almacenamiento externo o definidos en otras canalizaciones. Lasdlt
funciones solo admiten la lectura de conjuntos de datos internos. - Las
spark
funciones admiten la especificación de opciones, comoskipChangeCommits
, para leer las operaciones. Las funciones no admiten ladlt
especificación de opciones.
Para acceder a un conjunto de datos definido en la misma canalización, use las spark.read.table()
funciones o spark.readStream.table()
, prepending the keyword to the dataset name (Prepending the LIVE
keyword to the dataset name):
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("LIVE.customers_raw").where(...)
Ejemplo: lectura de una tabla registrada en un metastore
Para leer datos de una tabla registrada en el metastore de Hive, en el argumento de la función omita la palabra clave LIVE
y, opcionalmente, califique el nombre de la tabla con el nombre de la base de datos:
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Para obtener un ejemplo de lectura desde una tabla de Unity Catalog, consulte Ingerir datos en una canalización de Unity Catalog.
Ejemplo: Acceso a un conjunto de datos mediante spark.sql
También puede devolver un conjunto de datos mediante una expresión spark.sql
en una función de consulta. Para leer desde un conjunto de datos interno, anteponga LIVE.
al nombre del conjunto de datos:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
Creación de una tabla que se usará como destino de las operaciones de streaming
Use la función create_streaming_table()
para crear una tabla de destino para los registros de salida mediante operaciones de streaming, incluidos apply_changes() y apply_changes_from_snapshot(), registros de salida de @append_flow.
Nota:
Las funciones create_target_table()
y create_streaming_live_table()
están en desuso. Databricks recomienda actualizar el código existente para usar la función create_streaming_table()
.
Nota:
A fin de usar el argumento cluster_by
para habilitar la agrupación en clústeres líquidos, la canalización se debe configurar para usar el canal de vista previa.
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Argumentos |
---|
name Tipo: str El nombre de la tabla. Este parámetro es obligatorio. |
comment Tipo: str Descripción opcional de la tabla. |
spark_conf Tipo: dict Lista opcional de configuraciones de Spark para la ejecución de esta consulta. |
table_properties Tipo: dict Lista opcional de propiedades de la tabla. |
partition_cols Tipo: array Lista opcional de una o varias columnas que se usarán para crear particiones de la tabla. |
cluster_by Tipo: array Opcionalmente, habilite la agrupación en clústeres líquidos en la tabla y defina las columnas que se usarán como claves de agrupación en clústeres. Consulte Uso de clústeres líquidos para tablas Delta. |
path Tipo: str Ubicación de almacenamiento opcional para los datos de la tabla. Si no se establece, el sistema establece de manera predeterminada la ubicación de almacenamiento de la canalización. |
schema Tipo: str o StructType Definición de esquema opcional para la tabla. Los esquemas se pueden definir como una cadena de DDL de SQL o con un objeto Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail Tipo: dict Restricciones de calidad de datos opcionales para la tabla. Consulte varias expectativas. |
row_filter (Vista previa pública)Tipo: str Una cláusula de filtro de fila opcional para la tabla. Vea Publicación de tablas con filtros de fila y máscaras de columna. |
Controlar cómo se materializan las tablas
Las tablas también ofrecen un control adicional de su materialización:
- Especificar cómo se crean particiones de las tablas con
partition_cols
. Puede usar la creación de particiones para acelerar las consultas. - Puede establecer propiedades de tabla al definir una vista o una tabla. Consulte Propiedades de la tabla Delta Live Tables.
- Establezca una ubicación de almacenamiento para los datos de la tabla mediante la configuración
path
. De manera predeterminada, los datos de la tabla se almacenan en la ubicación de almacenamiento de la canalización si la configuraciónpath
no está establecida. - Puede usar columnas generadas en la definición de esquema. Vea Ejemplo: Especificar un esquema y columnas de partición.
Nota:
En el caso de las tablas de menos de 1 TB de tamaño, Databricks recomienda permitir que Delta Live Tables controle la organización de datos. No debe especificar columnas de partición a menos que espere que la tabla crezca más allá de un terabyte.
Ejemplo: especificar un esquema y columnas de partición
Opcionalmente, puede especificar un esquema de tabla mediante un objeto StructType
de Python o una cadena de DDL de SQL. La a definición puede incluir columnas generadascuando se especifica con una cadena DDL.
En el ejemplo siguiente se crea una tabla denominada sales
con un esquema especificado mediante un Python StructType
:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
En el ejemplo siguiente se especifica el esquema de una tabla mediante una cadena DDL, se define una columna generada y se define una columna de partición:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
De manera predeterminada, Delta Live Tables deduce el esquema de la definición de table
si no se especifica ningún esquema.
Configurar una tabla de streaming para omitir los cambios en una tabla de streaming de origen
Nota:
- La marca
skipChangeCommits
solo funciona conspark.readStream
mediante el uso de la funciónoption()
. No puede usar esta marca en una funcióndlt.read_stream()
. - No se puede usar la marca
skipChangeCommits
cuando la tabla de streaming de origen se define como destino de una función apply_changes().
De forma predeterminada, las tablas de streaming requieren orígenes de solo anexión. Cuando una tabla de streaming usa otra tabla de streaming como origen y la tabla de streaming de origen requiere actualizaciones o eliminaciones, por ejemplo, el procesamiento del "derecho al olvido" del RGPD, la marca skipChangeCommits
se puede establecer al leer la tabla de streaming de origen para omitir esos cambios. Para obtener más información sobre esta marca, consulte Omitir actualizaciones y eliminaciones.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Ejemplo: definir restricciones de tabla
Importante
Las restricciones de tabla están en Versión preliminar pública.
Al especificar un esquema, puede definir claves principales y externas. Las restricciones son informativas y no se aplican. Consulte la cláusula CONSTRAINT en la referencia del lenguaje SQL.
En el ejemplo siguiente se define una tabla con una restricción de clave principal y externa:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Ejemplo: definición de un filtro de fila y una máscara de columna
Importante
Los filtros de fila y las máscaras de columna se encuentran en versión preliminar pública.
Para crear una vista materializada o una tabla streaming con un filtro de fila y una máscara de columna, use la cláusula ROW FILTER y la cláusula MASK. En el ejemplo siguiente se muestra cómo definir una vista materializada y una tabla streaming con un filtro de fila y una máscara de columna:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
Para más información sobre los filtros de fila y las máscaras de columna, vea Publicación de tablas con filtros de fila y máscaras de columna.
Propiedades de Delta Live Tables de Python
En las tablas siguientes se describen las opciones y propiedades que puede especificar al definir tablas y vistas con Delta Live Tables:
Nota:
A fin de usar el argumento cluster_by
para habilitar la agrupación en clústeres líquidos, la canalización se debe configurar para usar el canal de vista previa.
@table o @view |
---|
name Tipo: str Nombre opcional de la tabla o la vista. Si no está definido, el nombre de la función se usa como nombre de la tabla o la vista. |
comment Tipo: str Descripción opcional de la tabla. |
spark_conf Tipo: dict Lista opcional de configuraciones de Spark para la ejecución de esta consulta. |
table_properties Tipo: dict Lista opcional de propiedades de la tabla. |
path Tipo: str Ubicación de almacenamiento opcional para los datos de la tabla. Si no se establece, el sistema establece de manera predeterminada la ubicación de almacenamiento de la canalización. |
partition_cols Tipo: a collection of str Recopilación opcional, por ejemplo, un list de una o varias columnas que se usarán para crear particiones de la tabla. |
cluster_by Tipo: array Opcionalmente, habilite la agrupación en clústeres líquidos en la tabla y defina las columnas que se usarán como claves de agrupación en clústeres. Consulte Uso de clústeres líquidos para tablas Delta. |
schema Tipo: str o StructType Definición de esquema opcional para la tabla. Los esquemas se pueden definir como una cadena de DDL de SQL o con un StructType de Python. |
temporary Tipo: bool Cree una tabla, pero no publique los metadatos de la tabla. La palabra clave temporary indica a las tablas Delta Live que creen una tabla que esté disponible para la canalización, pero que no se debe tener acceso fuera de la canalización. Para reducir el tiempo de procesamiento, una tabla temporal persiste durante la duración del pipeline que la crea y no solo una actualización.El valor predeterminado es False. |
row_filter (Vista previa pública)Tipo: str Una cláusula de filtro de fila opcional para la tabla. Vea Publicación de tablas con filtros de fila y máscaras de columna. |
Definición de tabla o vista |
---|
def <function-name>() Función de Python que define el conjunto de datos. Si no se establece el parámetro name , se usa <function-name> como nombre del conjunto de datos de destino. |
query Instrucción de Spark SQL que devuelve un DataFrame de Koalas o un conjunto de datos de Spark. Use dlt.read() o spark.read.table() para realizar una lectura completa desde un conjunto de datos definido en la misma canalización. Para leer un conjunto de datos externo, use la spark.read.table() función . No se puede usar dlt.read() para leer conjuntos de datos externos. Dado spark.read.table() que se puede usar para leer conjuntos de datos internos, conjuntos de datos definidos fuera de la canalización actual y permite especificar opciones para leer datos, Databricks recomienda usarlo en lugar de la dlt.read() función .Cuando se usa la spark.read.table() función para leer desde un conjunto de datos definido en la misma canalización, anteponga la LIVE palabra clave al nombre del conjunto de datos en el argumento de función. Por ejemplo, para leer desde un conjunto de datos denominado customers :spark.read.table("LIVE.customers") También puede usar la función spark.read.table() para leer desde una tabla registrada en el metastore; para ello, omita la palabra clave LIVE y, opcionalmente, califique el nombre de la tabla con el nombre de la base de datos:spark.read.table("sales.customers") Use dlt.read_stream() o spark.readStream.table() para realizar una lectura de streaming de un conjunto de datos definido en la misma canalización. Para realizar una lectura de streaming desde un conjunto de datos externo, usefunción spark.readStream.table() . Dado spark.readStream.table() que se puede usar para leer conjuntos de datos internos, conjuntos de datos definidos fuera de la canalización actual y permite especificar opciones para leer datos, Databricks recomienda usarlo en lugar de la dlt.read_stream() función .Para definir una consulta en una función Delta Live Tables table mediante la sintaxis SQL, use la spark.sql función . Consulte Ejemplo: Acceso a un conjunto de datos mediante spark.sql. Para definir una consulta en una función Delta Live Tables table mediante Python, use la sintaxis de PySpark . |
Expectativas |
---|
@expect("description", "constraint") Declarar una restricción de calidad de datos identificada mediante description . Si una fila infringe la expectativa, incluya la fila en el conjunto de datos de destino. |
@expect_or_drop("description", "constraint") Declarar una restricción de calidad de datos identificada mediante description . Si una fila infringe la expectativa, anule la fila en el conjunto de datos de destino. |
@expect_or_fail("description", "constraint") Declarar una restricción de calidad de datos identificada mediante description . Si una fila infringe la expectativa, detenga inmediatamente la ejecución. |
@expect_all(expectations) Declarar una o varias restricciones de calidad de datos. expectations es un diccionario de Python, donde la clave es la descripción de la expectativa y el valor es la restricción de la expectativa. Si una fila infringe algunas de las expectativas, incluya la fila en el conjunto de datos de destino. |
@expect_all_or_drop(expectations) Declarar una o varias restricciones de calidad de datos. expectations es un diccionario de Python, donde la clave es la descripción de la expectativa y el valor es la restricción de la expectativa. Si una fila infringe alguna de las expectativas, anule la fila en el conjunto de datos de destino. |
@expect_all_or_fail(expectations) Declarar una o varias restricciones de calidad de datos. expectations es un diccionario de Python, donde la clave es la descripción de la expectativa y el valor es la restricción de la expectativa. Si una fila infringe alguna de las expectativas, detenga inmediatamente la ejecución. |
Captura de datos modificados de una fuente de cambios con Python en Delta Live Tables
Use la función apply_changes()
de la API de Python para usar la funcionalidad de captura de datos modificados (CDC) de Delta Live Tables para procesar los datos de origen desde una fuente de distribución de datos modificados (CDF).
Importante
Debe declarar una tabla de streaming de destino en la que aplicar los cambios. Opcionalmente, puede especificar el esquema de la tabla de destino. Al especificar el esquema de la tabla de destino apply_changes()
, debe incluir las columnas __START_AT
y __END_AT
con el mismo tipo de datos que los campos sequence_by
.
Para crear la tabla de destino necesaria, puede usar la función create_streaming_table() en la interfaz de Python de Delta Live Tables.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Nota:
Para el procesamiento de APPLY CHANGES
, el comportamiento predeterminado de los eventos INSERT
y UPDATE
es actualizar/insertar (upsert) los eventos de captura de datos modificados desde el origen: actualizar las filas de la tabla de destino que coincidan con las claves especificadas o insertar una fila nueva cuando no exista un registro coincidente en la tabla de destino. El control de los eventos DELETE
se especifica con la condición APPLY AS DELETE WHEN
.
Para obtener más información sobre el procesamiento de CDC con una fuente de cambios, consulte Las API de APLICAR CAMBIOS: Simplificar la captura de datos modificados con Delta Live Tables. Para obtener un ejemplo del uso de la función apply_changes()
, vea Ejemplo: procesamiento de SCD tipo 1 y SCD tipo 2 con datos de origen de CDF.
Importante
Debe declarar una tabla de streaming de destino en la que aplicar los cambios. Opcionalmente, puede especificar el esquema de la tabla de destino. Al especificar el esquema de la tabla de destino apply_changes
, debe incluir las columnas __START_AT
y __END_AT
con el mismo tipo de datos que el campo sequence_by
.
Consulte API DE APLICAR CAMBIOS: simplificación de la captura de datos modificados con Delta Live Tables.
Argumentos |
---|
target Tipo: str Nombre de la tabla que se va a actualizar. Puede usar la función create_streaming_table() para crear la tabla de destino antes de ejecutar la función apply_changes() .Este parámetro es obligatorio. |
source Tipo: str Origen de datos que contiene los registros de captura de datos modificados. Este parámetro es obligatorio. |
keys Tipo: list Columna o combinación de columnas que identifican de forma única una fila en los datos de origen. Se usa para identificar qué eventos de captura de datos modificados se aplican a registros específicos de la tabla de destino. Puede especificar: - Una lista de cadenas: ["userId", "orderId"] - Una lista de funciones col() de Spark SQL: [col("userId"), col("orderId"] Los argumentos de las funciones col() no pueden incluir calificadores. Por ejemplo, puede usar col(userId) , pero no col(source.userId) .Este parámetro es obligatorio. |
sequence_by Tipo: str o col() Nombre de columna que especifica el orden lógico de los eventos de captura de datos modificados en los datos de origen. Delta Live Tables usa esta secuenciación para controlar los eventos de cambio que llegan desordenados. Puede especificar: - Una cadena: "sequenceNum" - Una función col() de Spark SQL: col("sequenceNum") Los argumentos de las funciones col() no pueden incluir calificadores. Por ejemplo, puede usar col(userId) , pero no col(source.userId) .La columna especificada debe ser un tipo de datos ordenable. Este parámetro es obligatorio. |
ignore_null_updates Tipo: bool Permite la ingesta de actualizaciones que contengan un subconjunto de las columnas de destino. Cuando un evento de captura de datos modificados coincide con una fila existente y ignore_null_updates es True , las columnas con null conservan sus valores existentes en el destino. Esto también se aplica a las columnas anidadas con un valor de null . Cuando ignore_null_updates es False , los valores existentes se sobrescriben con valores null .Este parámetro es opcional. El valor predeterminado es False . |
apply_as_deletes Tipo: str o expr() Especifica cuándo se debe tratar un evento de captura de datos modificados como DELETE en lugar de como upsert. Para controlar los datos desordenado, la fila eliminada se conserva temporalmente como marcador de exclusión en la tabla de Delta subyacente y se crea una vista en el metastore que filtra estos marcadores. El intervalo de retención se configura con lapipelines.cdc.tombstoneGCThresholdInSeconds propiedad table.Puede especificar: - Una cadena: "Operation = 'DELETE'" - Una función expr() de Spark SQL: expr("Operation = 'DELETE'") Este parámetro es opcional. |
apply_as_truncates Tipo: str o expr() Especifica cuándo se debe tratar un evento de captura de datos modificados como TRUNCATE de tabla completa. Dado que esta cláusula desencadena un truncamiento completo de la tabla de destino, solo se debe usar para casos de uso específicos que requieran esta funcionalidad.El parámetro apply_as_truncates solo se admite para el tipo SCD 1. El tipo SCD 2 no admite operaciones de truncamiento.Puede especificar: - Una cadena: "Operation = 'TRUNCATE'" - Una función expr() de Spark SQL: expr("Operation = 'TRUNCATE'") Este parámetro es opcional. |
column_list except_column_list Tipo: list Subconjunto de columnas que se incluirán en la tabla de destino. Use column_list para especificar la lista completa de columnas que se incluirán. Use except_column_list para especificar las columnas que se excluirán. Puede declarar los valores como lista de cadenas o como funciones col() de Spark SQL:- column_list = ["userId", "name", "city"] .- column_list = [col("userId"), col("name"), col("city")] - except_column_list = ["operation", "sequenceNum"] - except_column_list = [col("operation"), col("sequenceNum") Los argumentos de las funciones col() no pueden incluir calificadores. Por ejemplo, puede usar col(userId) , pero no col(source.userId) .Este parámetro es opcional. La acción predeterminada es incluir todas las columnas de la tabla de destino cuando no se pasa ningún argumento column_list o except_column_list a la función. |
stored_as_scd_type Tipo: str o int Indica si se van a almacenar los registros como SCD de tipo 1 o SCD de tipo 2. Establezca el valor en 1 para SCD de tipo 1 o en 2 para SCD de tipo 2.Esta cláusula es opcional. El valor predeterminado es SCD de tipo 1. |
track_history_column_list track_history_except_column_list Tipo: list Subconjunto de columnas de salida de las que se va a realizar un seguimiento del historial en la tabla de destino. Use track_history_column_list para especificar la lista completa de columnas de las que se va a realizar el seguimiento. Usartrack_history_except_column_list para especificar las columnas que se excluirán del seguimiento. Puede declarar los valores como lista de cadenas o como funciones col() de Spark SQL:- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Los argumentos de las funciones col() no pueden incluir calificadores. Por ejemplo, puede usar col(userId) , pero no col(source.userId) .Este parámetro es opcional. La acción predeterminada es incluir todas las columnas de la tabla de destino cuando no se pasa ningún argumento track_history_column_list otrack_history_except_column_list a la función. |
Captura de datos modificados de instantáneas de base de datos con Python en Delta Live Tables
Importante
La API APPLY CHANGES FROM SNAPSHOT
está en versión preliminar pública.
Use la función apply_changes_from_snapshot()
de la API de Python para usar la funcionalidad de captura de datos modificados (CDC) de Delta Live Tables para procesar los datos de origen de las instantáneas de base de datos.
Importante
Debe declarar una tabla de streaming de destino en la que aplicar los cambios. Opcionalmente, puede especificar el esquema de la tabla de destino. Al especificar el esquema de la tabla de destino apply_changes_from_snapshot()
, también debe incluir las columnas __START_AT
y __END_AT
con el mismo tipo de datos que el campo sequence_by
.
Para crear la tabla de destino necesaria, puede usar la función create_streaming_table() en la interfaz de Python de Delta Live Tables.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Nota:
Para el procesamiento de APPLY CHANGES FROM SNAPSHOT
, el comportamiento predeterminado es insertar una nueva fila cuando un registro coincidente con la(s) misma(s) clave(s) no existe en el destino. Si existe un registro coincidente, solo se actualiza si alguno de los valores de la fila ha cambiado. Se eliminan las filas con claves presentes en el destino, pero que ya no están presentes en el origen.
Para obtener más información sobre el procesamiento de CDC con instantáneas, consulte Las API de APLICAR CAMBIOS: Simplificar la captura de datos modificados con Delta Live Tables. Para obtener ejemplos de uso de la función apply_changes_from_snapshot()
, consulte ejemplos de ingesta de instantáneas periódica e ingesta de instantáneas.
Argumentos |
---|
target Tipo: str Nombre de la tabla que se va a actualizar. Puede usar la función create_streaming_table() para crear la tabla de destino antes de ejecutar la función apply_changes() .Este parámetro es obligatorio. |
source Tipo: str o lambda function Nombre de una tabla o vista para realizar instantáneas periódicamente o una función lambda de Python que devuelve la instantánea DataFrame que se va a procesar y la versión de la instantánea. Consulte Implementar el argumento de origen. Este parámetro es obligatorio. |
keys Tipo: list Columna o combinación de columnas que identifican de forma única una fila en los datos de origen. Se usa para identificar qué eventos de captura de datos modificados se aplican a registros específicos de la tabla de destino. Puede especificar: - Una lista de cadenas: ["userId", "orderId"] - Una lista de funciones col() de Spark SQL: [col("userId"), col("orderId"] Los argumentos de las funciones col() no pueden incluir calificadores. Por ejemplo, puede usar col(userId) , pero no col(source.userId) .Este parámetro es obligatorio. |
stored_as_scd_type Tipo: str o int Indica si se van a almacenar los registros como SCD de tipo 1 o SCD de tipo 2. Establezca el valor en 1 para SCD de tipo 1 o en 2 para SCD de tipo 2.Esta cláusula es opcional. El valor predeterminado es SCD de tipo 1. |
track_history_column_list track_history_except_column_list Tipo: list Subconjunto de columnas de salida de las que se va a realizar un seguimiento del historial en la tabla de destino. Use track_history_column_list para especificar la lista completa de columnas de las que se va a realizar el seguimiento. Usartrack_history_except_column_list para especificar las columnas que se excluirán del seguimiento. Puede declarar los valores como lista de cadenas o como funciones col() de Spark SQL:- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Los argumentos de las funciones col() no pueden incluir calificadores. Por ejemplo, puede usar col(userId) , pero no col(source.userId) .Este parámetro es opcional. La acción predeterminada es incluir todas las columnas de la tabla de destino cuando no se pasa ningún argumento track_history_column_list otrack_history_except_column_list a la función. |
Implementar el argumento source
La función apply_changes_from_snapshot()
incluye el argumento source
. Para procesar instantáneas históricas, se espera que el argumento source
sea una función lambda de Python que devuelva dos valores a la función apply_changes_from_snapshot()
: un DataFrame de Python que contiene los datos de instantánea que se van a procesar y una versión de instantánea.
A continuación se muestra la firma de la función lambda:
lambda Any => Optional[(DataFrame, Any)]
- El argumento de la función lambda es la versión de la instantánea procesada más recientemente.
- El valor devuelto de la función lambda es
None
o una tupla de dos valores: el primer valor de la tupla es un DataFrame que contiene la instantánea que se va a procesar. El segundo valor de la tupla es la versión de instantánea que representa el orden lógico de la instantánea.
Ejemplo que implementa y llama a la función lambda:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
El runtime de Delta Live Tables realiza los pasos siguientes cada vez que se desencadena la canalización que contiene la función apply_changes_from_snapshot()
:
- Ejecuta la función
next_snapshot_and_version
para cargar el DataFrame de instantánea siguiente y la versión de instantánea correspondiente. - Si no devuelve ningún DataFrame, la ejecución finaliza y la actualización de la canalización se marca como completa.
- Detecta los cambios en la nueva instantánea y los aplica incrementalmente a la tabla de destino.
- Devuelve al paso nro. 1 para cargar la siguiente instantánea y su versión.
Limitaciones
La interfaz de Python para Delta Live Tables tiene las siguientes limitaciones:
No se admite el uso de la función pivot()
. La operación pivot
en Spark requiere una carga diligente de los datos de entrada para calcular el esquema de salida. Esta funcionalidad no se admite en Delta Live Tables.