Compartir a través de


Desarrollo de código de canalización con SQL

Delta Live Tables presenta varias nuevas palabras clave y funciones sql para definir vistas materializadas y tablas de streaming en canalizaciones. La compatibilidad con SQL para desarrollar canalizaciones se basa en los conceptos básicos de Spark SQL y agrega compatibilidad con la funcionalidad structured Streaming.

Los usuarios familiarizados con dataFrames de PySpark pueden preferir desarrollar código de canalización con Python. Python admite pruebas y operaciones más amplias que son difíciles de implementar con SQL, como las operaciones de metaprogramación. Consulte Desarrollo de código de canalización con Python.

Para obtener una referencia completa de la sintaxis SQL de Delta Live Tables, consulte Referencia del lenguaje SQL de Delta Live Tables.

Conceptos básicos de SQL para el desarrollo de canalizaciones

El código SQL que crea conjuntos de datos de Delta Live Tables usa la CREATE OR REFRESH sintaxis para definir vistas materializadas y tablas de streaming en los resultados de la consulta.

La STREAM palabra clave indica si el origen de datos al que se hace referencia en una SELECT cláusula debe leerse con la semántica de streaming.

El código fuente de Delta Live Tables difiere críticamente de los scripts SQL: Delta Live Tables evalúa todas las definiciones de conjuntos de datos en todos los archivos de código fuente configurados en una canalización y compila un gráfico de flujo de datos antes de que se ejecuten las consultas. El orden de las consultas que aparecen en un cuaderno o script no define el orden de ejecución.

Creación de una vista materializada con SQL

En el ejemplo de código siguiente se muestra la sintaxis básica para crear una vista materializada con SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Creación de una tabla de streaming con SQL

En el ejemplo de código siguiente se muestra la sintaxis básica para crear una tabla de streaming con SQL:

Nota:

No todos los orígenes de datos admiten lecturas de streaming y algunos orígenes de datos siempre deben procesarse con semántica de streaming.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Carga de datos desde el almacenamiento de objetos

Delta Live Tables admite la carga de datos de todos los formatos admitidos por Azure Databricks. Consulte Opciones de formato de datos.

Nota:

En estos ejemplos se usan los datos disponibles en el montaje automático en el /databricks-datasets área de trabajo. Databricks recomienda usar rutas de acceso de volumen o URI en la nube para hacer referencia a los datos almacenados en el almacenamiento de objetos en la nube. Consulte ¿Qué son los volúmenes de Unity Catalog?.

Databricks recomienda usar el cargador automático y las tablas de streaming al configurar cargas de trabajo de ingesta incrementales en los datos almacenados en el almacenamiento de objetos en la nube. Consulte ¿Qué es Auto Loader?.

SQL usa la read_files función para invocar la funcionalidad del cargador automático. También debe usar la STREAM palabra clave para configurar una lectura de streaming con read_files.

En el ejemplo siguiente se crea una tabla de streaming a partir de archivos JSON mediante el cargador automático:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

La read_files función también admite la semántica por lotes para crear vistas materializadas. En el ejemplo siguiente se usa la semántica por lotes para leer un directorio JSON y crear una vista materializada:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Validación de datos con expectativas

Puede usar expectativas para establecer y aplicar restricciones de calidad de datos. Consulte Administración de la calidad de los datos con Delta Live Tables.

El código siguiente define una expectativa denominada valid_data que quita los registros que son NULL durante la ingesta de datos:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Consulta de vistas materializadas y tablas de streaming definidas en la canalización

Use el LIVE esquema para consultar otras vistas materializadas y tablas de streaming definidas en la canalización.

En el ejemplo siguiente se definen cuatro conjuntos de datos:

  • Una tabla de streaming denominada orders que carga datos JSON.
  • Una vista materializada denominada customers que carga datos CSV.
  • Una vista materializada denominada customer_orders que combina registros de los orders conjuntos de datos y customers convierte la marca de tiempo de pedido en una fecha y selecciona los customer_idcampos , order_number, statey order_date .
  • Vista materializada denominada daily_orders_by_state que agrega el recuento diario de pedidos para cada estado.
CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;