Compartir a través de


Referencia del lenguaje SQL de Delta Live Tables

En este artículo están los detalles de la interfaz de programación SQL de Delta Live Tables.

Puede usar funciones definidas por el usuario (UDF) de Python en las consultas SQL, pero debe definir estas UDF en archivos de Python antes de llamarlas en archivos de origen de SQL. Consulte Función escalar definida por el usuario: Python.

Limitaciones

No se admite la cláusula PIVOT. La operación pivot en Spark requiere una carga diligente de los datos de entrada para calcular el esquema de salida. Esta funcionalidad no se admite en Delta Live Tables.

Crear una vista materializada de Delta Live Tables o una tabla de streaming

Nota:

  • La sintaxis CREATE OR REFRESH LIVE TABLE para crear una vista materializada está en desuso. En su lugar, use CREATE OR REFRESH MATERIALIZED VIEW.
  • A fin de usar la cláusula CLUSTER BY para habilitar la agrupación en clústeres líquidos, la canalización se debe configurar para usar el canal de vista previa.

Use la misma sintaxis SQL básica al declarar una tabla de streaming o una vista materializada.

Declaración de una vista materializada de Delta Live Tables con SQL

A continuación se describe la sintaxis para declarar una vista materializada en Delta Live Tables con SQL:

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Declaración de una tabla de streaming de Delta Live Tables con SQL

Solo puede declarar tablas de streaming mediante consultas que leen en un origen de streaming. Databricks recomienda usar Auto Loader para la ingesta de streaming de archivos del almacenamiento de objetos en la nube. Consulte Sintaxis SQL del cargador automático.

Al especificar otras tablas o vistas en la canalización como orígenes de streaming, debe incluir la función STREAM() en torno a un nombre de conjunto de datos.

A continuación se describe la sintaxis para declarar una tabla de streaming en Delta Live Tables con SQL:

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Crear una vista Delta Live Tables

A continuación se describe la sintaxis para declarar vistas con SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

Sintaxis SQL del cargador automático

A continuación se describe la sintaxis para trabajar con Auto Loader en SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Puede usar las opciones de formato admitidas con el cargador automático. Con la función map(), puede pasar opciones al método read_files(). Las opciones son pares clave-valor, donde las claves y los valores son cadenas. Para obtener más información sobre los formatos y las opciones de soporte técnico, consulte Opciones de formato de archivo.

Ejemplo: Definición de tablas

Puede crear un conjunto de datos leyendo un origen de datos externo o conjuntos de datos definidos en una canalización. Para leer desde un conjunto de datos interno, anteponga LIVE al nombre del conjunto de datos. En el ejemplo siguiente, se definen dos conjuntos de datos diferentes: una tabla denominada taxi_raw que toma un archivo JSON como origen de entrada y una tabla denominada filtered_data que toma la tabla taxi_raw como entrada:

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Ejemplo: Lectura desde un origen de streaming

Para leer datos de un origen de streaming, por ejemplo, Auto Loader o un conjunto de datos interno, defina una tabla de STREAMING:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

Para obtener más información sobre los datos de streaming, consulte Transformación de datos con Delta Live Tables.

Controlar cómo se materializan las tablas

Las tablas también ofrecen un control adicional de su materialización:

Nota:

En el caso de las tablas de menos de 1 TB de tamaño, Databricks recomienda permitir que Delta Live Tables controle la organización de datos. A menos que espere que la tabla crezca más allá de un terabyte, Databricks recomienda no especificar columnas de partición.

Ejemplo: especificar un esquema y columnas de partición

Opcionalmente, puede especificar un esquema al definir una tabla. En el ejemplo siguiente se especifica el esquema de la tabla de destino, incluido el uso de columnas generadas por Delta Lake y la definición de columnas de partición para la tabla:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

De manera predeterminada, Delta Live Tables deduce el esquema de la definición de tablesi no se especifica ningún esquema.

Ejemplo: definir restricciones de tabla

Nota:

La compatibilidad con Delta Live Tables para las restricciones de tabla está en versión preliminar pública. Para definir restricciones de tabla, la canalización debe ser una canalización habilitada para Unity Catalog y configurada para usar el canal preview.

Al especificar un esquema, puede definir claves principales y externas. Las restricciones son informativas y no se aplican. Consulte la cláusula CONSTRAINT en la referencia del lenguaje SQL.

En el ejemplo siguiente se define una tabla con una restricción de clave principal y externa:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Parametrización de valores usados al declarar tablas o vistas con SQL

Use SET para especificar un valor de configuración en una consulta que declare una tabla o una vista, incluidas las configuraciones de Spark. Cualquier tabla o vista que defina en un cuaderno después de la instrucción SET tiene acceso al valor definido. Las configuraciones de Spark especificadas con la instrucción SET se usan al ejecutar la consulta de Spark para cualquier tabla o vista que siga a la instrucción SET. Para leer un valor de configuración en una consulta, use la sintaxis de interpolación de cadenas ${}. En el ejemplo siguiente, se establece un valor de configuración de Spark denominado startDate y se usa ese valor en una consulta:

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Para especificar varios valores de configuración, use una instrucción SET aparte para cada valor.

Ejemplo: definición de un filtro de fila y una máscara de columna

Importante

Los filtros de fila y las máscaras de columna se encuentran en versión preliminar pública.

Para crear una vista materializada o una tabla de streaming con un filtro de fila y una máscara de columna, use la cláusula ROW FILTER y la cláusula MASK. En el ejemplo siguiente se muestra cómo definir una vista materializada y una tabla de streaming con un filtro de fila y una máscara de columna:

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze

Para más información sobre los filtros de fila y las máscaras de columna, vea Publicación de tablas con filtros de fila y máscaras de columna.

Propiedades de SQL

Nota:

A fin de usar la cláusula CLUSTER BY para habilitar la agrupación en clústeres líquidos, la canalización se debe configurar para usar el canal de vista previa.

CREATE TABLE o VIEW
TEMPORARY

Cree una tabla, pero no publique los metadatos de la tabla. La cláusula TEMPORARY indica a Delta Live Tables que cree una tabla que esté disponible para la canalización, pero que no sea accesible fuera de la canalización. Para reducir el tiempo de procesamiento, una tabla temporal permanece durante toda la duración de la canalización que la crea, y no solo una actualización.
STREAMING

Crea una tabla que lee un conjunto de datos de entrada como una secuencia. El conjunto de datos de entrada debe ser un origen de datos de streaming, por ejemplo, Auto Loader o una tabla de STREAMING.
CLUSTER BY

Habilite la agrupación en clústeres líquidos en la tabla y defina las columnas que se usarán como claves de agrupación en clústeres.

Consulte Uso de clústeres líquidos para tablas Delta.
PARTITIONED BY

Lista opcional de una o varias columnas que se usarán para crear particiones de la tabla.
LOCATION

Ubicación de almacenamiento opcional para los datos de la tabla. Si no se establece, el sistema establecerá de manera predeterminada la ubicación de almacenamiento de la canalización.
COMMENT

Descripción opcional de la tabla.
column_constraint

UUna clave principal informativa opcional o una restricción de clave externa en la columna.
MASK clause (Vista previa pública)

Agrega una función de máscara de columna para anonimizar datos confidenciales. Las futuras consultas de esa columna devolverán el resultado de la función evaluada en lugar del valor original de la columna. Esto resulta útil para el control de acceso específico, ya que la función puede comprobar la identidad del usuario y su pertenencia a un grupo para decidir si se va a censurar el valor.

Consulte Cláusula de máscara de columna.
table_constraint

Una clave principal informativa opcional o una restricción de clave externa en la tabla.
TBLPROPERTIES

Lista opcional de propiedades de la tabla.
WITH ROW FILTER clause (Vista previa pública)

Agrega una función de filtro de fila a la tabla. Las consultas futuras de esa tabla reciben un subconjunto de las filas para las que la función se evalúa como TRUE. Esto resulta útil para el control de acceso específico, ya que permite a la función inspeccionar la identidad y las pertenencias a grupos del usuario que realiza la invocación para decidir si se filtran determinadas filas.

Consulte la Cláusula ROW FILTER.
select_statement

Consulta de Delta Live Tables que define el conjunto de datos de la tabla.
Cláusula CONSTRAINT
EXPECT expectation_name

Define la restricción expectation_name de calidad de los datos. Si no se define la restricción ON VIOLATION, se agregan filas que infringen la restricción al conjunto de datos de destino.
ON VIOLATION

Acción opcional que debe realizarse para las filas que infringen una restricción:

- FAIL UPDATE: detenga inmediatamente la ejecución de la canalización.
- DROP ROW: quite el registro y continúe el procesamiento.

Captura de datos modificados con SQL en Delta Live Tables

Use la instrucción APPLY CHANGES INTO para usar la funcionalidad CDC de Delta Live Tables, como se describe en lo siguiente:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Las restricciones de calidad de los datos se definen para un destino de APPLY CHANGES mediante la misma cláusula CONSTRAINT que las consultas no APPLY CHANGES. Consulte Administración de la calidad de los datos con Delta Live Tables.

Nota:

El comportamiento predeterminado de los eventos INSERT y UPDATE es actualizar/insertar (upsert) los eventos de captura de datos modificados desde el origen: actualizar las filas de la tabla de destino que coincidan con las claves especificadas o insertar una fila nueva cuando no exista un registro coincidente en la tabla de destino. El control de los eventos DELETE se especifica con la condición APPLY AS DELETE WHEN.

Importante

Debe declarar una tabla de streaming de destino en la que aplicar los cambios. Opcionalmente, puede especificar el esquema de la tabla de destino. Al especificar el esquema de la tabla de destino APPLY CHANGES, también debe incluir las columnas __START_AT y __END_AT con el mismo tipo de datos que el campo sequence_by.

Consulte API APPLY CHANGES: simplificación de la captura de datos modificados con Delta Live Tables.

Cláusulas
KEYS

Columna o combinación de columnas que identifican de forma única una fila en los datos de origen. Se usa para identificar qué eventos de captura de datos modificados se aplican a registros específicos de la tabla de destino.

Para definir una combinación de columnas, use una lista de columnas separadas por comas.

Esta cláusula es obligatoria.
IGNORE NULL UPDATES

Permite la ingesta de actualizaciones que contengan un subconjunto de las columnas de destino. Cuando un evento de captura de datos modificados coincide con una fila existente y se especifica IGNORE NULL UPDATES, las columnas con null conservan sus valores existentes en el destino. Esto también se aplica a las columnas anidadas con un valor de null.

Esta cláusula es opcional.

La acción predeterminada es sobrescribir las columnas existentes con valores null.
APPLY AS DELETE WHEN

Especifica cuándo se debe tratar un evento de captura de datos modificados como DELETE en lugar de como upsert. Para controlar los datos desordenado, la fila eliminada se conserva temporalmente como marcador de exclusión en la tabla de Delta subyacente y se crea una vista en el metastore que filtra estos marcadores. El intervalo de retención se configura con la
pipelines.cdc.tombstoneGCThresholdInSeconds propiedad table.

Esta cláusula es opcional.
APPLY AS TRUNCATE WHEN

Especifica cuándo se debe tratar un evento de captura de datos modificados como TRUNCATE de tabla completa. Dado que esta cláusula desencadena un truncamiento completo de la tabla de destino, solo se debe usar para casos de uso específicos que requieran esta funcionalidad.

La cláusula APPLY AS TRUNCATE WHEN solo se admite para el tipo SCD 1. El tipo SCD 2 no admite la operación de truncamiento.

Esta cláusula es opcional.
SEQUENCE BY

Nombre de columna que especifica el orden lógico de los eventos de captura de datos modificados en los datos de origen. Delta Live Tables usa esta secuenciación para controlar los eventos de cambio que llegan desordenados.

La columna especificada debe ser un tipo de datos ordenable.

Esta cláusula es obligatoria.
COLUMNS

Especifica un subconjunto de columnas que se incluirán en la tabla de destino. Puede:

- Especifique la lista completa de columnas que se incluirán: COLUMNS (userId, name, city).
- Especifique una lista de columnas que se excluirán: COLUMNS * EXCEPT (operation, sequenceNum)

Esta cláusula es opcional.

La acción predeterminada es incluir todas las columnas de la tabla de destino cuando no se especifica la cláusula COLUMNS.
STORED AS

Indica si se van a almacenar los registros como SCD de tipo 1 o SCD de tipo 2.

Esta cláusula es opcional.

El valor predeterminado es SCD de tipo 1.
TRACK HISTORY ON

Especifica un subconjunto de columnas de salida para generar registros de historial cuando hay cambios en esas columnas especificadas. Puede:

- Especifique la lista completa de columnas que se supervisarán: COLUMNS (userId, name, city).
- Especifique una lista de columnas que se excluirán del seguimiento: COLUMNS * EXCEPT (operation, sequenceNum)

Esta cláusula es opcional. El valor predeterminado es realizar un seguimiento del historial de todas las columnas de salida cuando hay cambios equivalentes a TRACK HISTORY ON *.