Develop pipeline code with SQL
Delta Live Tables introduces several new SQL keywords and functions for defining materialized views and streaming tables in pipelines. SQL support for developing pipelines builds upon the basics of Spark SQL and adds support for Structured Streaming functionality.
Users familiar with PySpark DataFrames might prefer developing pipeline code with Python. Python supports more extensive testing and operations that are challenging to implement with SQL, such as metaprogramming operations. See Develop pipeline code with Python.
For a full reference of Delta Live Tables SQL syntax, see Delta Live Tables SQL language reference.
Basics of SQL for pipeline development
SQL code that creates Delta Live Tables datasets uses the CREATE OR REFRESH syntax to define materialized views and streaming tables against query results. The STREAM keyword indicates that the data source referenced in a SELECT clause should be read with streaming semantics.
Delta Live Tables source code critically differs from SQL scripts: Delta Live Tables evaluates all dataset definitions across all source code files configured in a pipeline and builds a dataflow graph before any queries are run. The order of queries appearing in a notebook or script does not define the order of execution.
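For example, the following sketch is valid even though the downstream view appears before its source, because Delta Live Tables resolves the dependency when it builds the dataflow graph (the dataset names here are hypothetical):
CREATE OR REFRESH MATERIALIZED VIEW downstream_mv
AS SELECT * FROM LIVE.upstream_st;

CREATE OR REFRESH STREAMING TABLE upstream_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;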
Create a materialized view with SQL
The following code example demonstrates the basic syntax for creating a materialized view with SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
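The defining query can also apply transformations and table metadata. As a sketch, the following variant filters rows and adds a comment (the trip_distance column is assumed to exist in samples.nyctaxi.trips):
CREATE OR REFRESH MATERIALIZED VIEW long_trips_mv
COMMENT "Trips longer than 10 miles"
AS SELECT * FROM samples.nyctaxi.trips
WHERE trip_distance > 10.0;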
Create a streaming table with SQL
The following code example demonstrates the basic syntax for creating a streaming table with SQL:
Note
Not all data sources support streaming reads, and some data sources should always be processed with streaming semantics.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Load data from object storage
Delta Live Tables supports loading data from all formats supported by Azure Databricks. See Data format options.
Note
These examples use data available under /databricks-datasets, which is automatically mounted to your workspace. Databricks recommends using volume paths or cloud URIs to reference data stored in cloud object storage. See What are Unity Catalog volumes?.
Databricks recommends using Auto Loader and streaming tables when configuring incremental ingestion workloads against data stored in cloud object storage. See What is Auto Loader?.
SQL uses the read_files function to invoke Auto Loader functionality. You must also use the STREAM keyword to configure a streaming read with read_files.
The following example creates a streaming table from JSON files using Auto Loader:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
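Options for read_files can be passed as named arguments. As a sketch, the following sets the file format explicitly instead of relying on inference (format is a standard read_files option):
CREATE OR REFRESH STREAMING TABLE ingestion_json_st
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json"
);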
The read_files function also supports batch semantics to create materialized views. The following example uses batch semantics to read a JSON directory and create a materialized view:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
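Because the defining query is ordinary SQL, a batch read can filter or transform data in the same statement. A sketch that keeps only rows with an order_datetime value (the column name matches the dataset used in the examples below):
CREATE OR REFRESH MATERIALIZED VIEW valid_sales_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders")
WHERE order_datetime IS NOT NULL;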
Validate data with expectations
You can use expectations to set and enforce data quality constraints. See Manage data quality with Delta Live Tables.
The following code defines an expectation named valid_date that drops records with a null or empty order_datetime field during data ingestion:
CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
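Expectations support other behaviors besides dropping rows: omitting the ON VIOLATION clause retains invalid records while reporting them in data quality metrics, and ON VIOLATION FAIL UPDATE stops the update when a record violates the constraint. A sketch combining both (the table and constraint names are hypothetical; the columns come from the same dataset):
CREATE OR REFRESH STREAMING TABLE orders_strict(
  CONSTRAINT has_order_number EXPECT (order_number IS NOT NULL),
  CONSTRAINT valid_customer EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");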
Query materialized views and streaming tables defined in your pipeline
Use the LIVE schema to query other materialized views and streaming tables defined in your pipeline.
The following example defines four datasets:
- A streaming table named orders that loads JSON data.
- A materialized view named customers that loads CSV data.
- A materialized view named customer_orders that joins records from the orders and customers datasets, casts the order timestamp to a date, and selects the customer_id, order_number, state, and order_date fields.
- A materialized view named daily_orders_by_state that aggregates the daily count of orders for each state.
CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;
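Note that the materialized views above read LIVE.orders with batch semantics. To read a streaming table defined in your pipeline with streaming semantics instead, wrap the reference in STREAM(). A sketch that adds a downstream streaming table to the pipeline above (the table name is hypothetical):
CREATE OR REFRESH STREAMING TABLE orders_passthrough_st
AS SELECT * FROM STREAM(LIVE.orders);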