

Develop pipeline code with SQL

Delta Live Tables introduces several new SQL keywords and functions for defining materialized views and streaming tables in pipelines. SQL support for developing pipelines builds upon the basics of Spark SQL and adds support for Structured Streaming functionality.

Users familiar with PySpark DataFrames might prefer developing pipeline code with Python. Python supports more extensive testing and operations that are challenging to implement with SQL, such as metaprogramming operations. See Develop pipeline code with Python.

For a full reference of Delta Live Tables SQL syntax, see Delta Live Tables SQL language reference.

Basics of SQL for pipeline development

SQL code that creates Delta Live Tables datasets uses the CREATE OR REFRESH syntax to define materialized views and streaming tables against query results.

The STREAM keyword indicates that the data source referenced in a SELECT clause should be read with streaming semantics. Without it, the source is read with batch semantics.

Delta Live Tables source code critically differs from SQL scripts: Delta Live Tables evaluates all dataset definitions across all source code files configured in a pipeline and builds a dataflow graph before any queries are run. The order of queries appearing in a notebook or script does not define the order of execution.
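
As a minimal sketch of this behavior, the following definitions are deliberately written in reverse dependency order. The dataset names trips_st and trip_counts_by_zip are hypothetical, and the pickup_zip column is assumed to exist in the sample table. The sketch also uses the LIVE schema, which is described later in this article. Because the dataflow graph is resolved before any query runs, the streaming table would be expected to update before the materialized view that reads from it:

```sql
-- Downstream dataset, defined first on purpose.
-- trip_counts_by_zip and trips_st are hypothetical names; pickup_zip is an assumed column.
CREATE OR REFRESH MATERIALIZED VIEW trip_counts_by_zip
AS SELECT pickup_zip, count(*) AS trip_count
FROM LIVE.trips_st
GROUP BY pickup_zip;

-- Upstream dataset, defined second. Source order does not determine execution order.
CREATE OR REFRESH STREAMING TABLE trips_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
```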

Create a materialized view with SQL

The following code example demonstrates the basic syntax for creating a materialized view with SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Create a streaming table with SQL

Note

Not all data sources support streaming reads, and some data sources should always be processed with streaming semantics.

The following code example demonstrates the basic syntax for creating a streaming table with SQL:

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Load data from object storage

Delta Live Tables supports loading data from all formats supported by Azure Databricks. See Data format options.

Note

These examples use data available under /databricks-datasets, which is automatically mounted to your workspace. Databricks recommends using volume paths or cloud URIs to reference data stored in cloud object storage. See What are Unity Catalog volumes?.

Databricks recommends using Auto Loader and streaming tables when configuring incremental ingestion workloads against data stored in cloud object storage. See What is Auto Loader?.

SQL uses the read_files function to invoke Auto Loader functionality. You must also use the STREAM keyword to configure a streaming read with read_files.

The following example creates a streaming table from JSON files using Auto Loader:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

The read_files function also supports batch semantics to create materialized views. The following example uses batch semantics to read a JSON directory and create a materialized view:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
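
The examples above let read_files infer the file format. As a hedged sketch, the format can also be stated explicitly with named arguments. The dataset names ingestion_json_st and customers_csv_mv are hypothetical, and the header option assumes that the CSV files begin with a header row:

```sql
-- Streaming read with the file format stated explicitly instead of inferred.
CREATE OR REFRESH STREAMING TABLE ingestion_json_st
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => 'json'
);

-- Batch read of CSV files; header => true assumes the first row holds column names.
CREATE OR REFRESH MATERIALIZED VIEW customers_csv_mv
AS SELECT * FROM read_files(
  "/databricks-datasets/retail-org/customers",
  format => 'csv',
  header => true
);
```

Stating the format can make a definition easier to read and avoids relying on inference when a directory might contain mixed file types.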

Validate data with expectations

You can use expectations to set and enforce data quality constraints. See Manage data quality with Delta Live Tables.

The following code defines an expectation named valid_date that drops records in which order_datetime is null or empty during data ingestion:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
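
Dropping rows is one of several ways to handle a violation. The sketch below, which uses the hypothetical table name orders_checked and hypothetical constraint names, shows two alternatives: an expectation with no ON VIOLATION clause, which keeps invalid records and only reports them in data quality metrics, and ON VIOLATION FAIL UPDATE, which stops the update when an invalid record arrives. It assumes the source data contains customer_id and order_number fields:

```sql
CREATE OR REFRESH STREAMING TABLE orders_checked(
  -- No ON VIOLATION clause: invalid records are retained and the violation is reported.
  CONSTRAINT has_customer
  EXPECT (customer_id IS NOT NULL),
  -- FAIL UPDATE: the pipeline update fails when a record violates the constraint.
  CONSTRAINT has_order_number
  EXPECT (order_number IS NOT NULL)
  ON VIOLATION FAIL UPDATE
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
```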

Query materialized views and streaming tables defined in your pipeline

Use the LIVE schema to query other materialized views and streaming tables defined in your pipeline.

The following example defines four datasets:

  • A streaming table named orders that loads JSON data.
  • A materialized view named customers that loads CSV data.
  • A materialized view named customer_orders that joins records from the orders and customers datasets, casts the order timestamp to a date, and selects the customer_id, order_number, state, and order_date fields.
  • A materialized view named daily_orders_by_state that aggregates the daily count of orders for each state.

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;
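
The downstream datasets above all read from the LIVE schema with batch semantics. As a sketch of the streaming counterpart, the STREAM keyword can wrap a LIVE reference so that a streaming table incrementally processes records from another streaming table in the same pipeline. The name orders_with_date is hypothetical, and the example assumes the orders table defined above:

```sql
-- Reads the orders streaming table with streaming semantics.
-- orders_with_date is a hypothetical dataset name.
CREATE OR REFRESH STREAMING TABLE orders_with_date
AS SELECT
  order_number,
  customer_id,
  date(timestamp(int(order_datetime))) AS order_date
FROM STREAM(LIVE.orders);
```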