Delta Live Tables SQL language reference
This article has details for the Delta Live Tables SQL programming interface.
- For information on the Python API, see the Delta Live Tables Python language reference.
- For more information about SQL commands, see SQL language reference.
You can use Python user-defined functions (UDFs) in your SQL queries, but you must define these UDFs in Python files before calling them in SQL source files. See User-defined scalar functions - Python.
Limitations
The PIVOT
clause is not supported. The pivot
operation in Spark requires the eager loading of input data to compute the output schema. This capability is not supported in Delta Live Tables.
Create a Delta Live Tables materialized view or streaming table
Note
The CREATE OR REFRESH LIVE TABLE
syntax to create a materialized view is deprecated. Instead, use CREATE OR REFRESH MATERIALIZED VIEW
.
You use the same basic SQL syntax when declaring a streaming table or a materialized view.
Declare a Delta Live Tables materialized view with SQL
The following describes the syntax for declaring a materialized view in Delta Live Tables with SQL:
CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
Declare a Delta Live Tables streaming table with SQL
You can only declare streaming tables using queries that read against a streaming source. Databricks recommends using Auto Loader for streaming ingestion of files from cloud object storage. See Auto Loader SQL syntax.
When specifying other tables or views in your pipeline as streaming sources, you must include the STREAM()
function around a dataset name.
The following describes the syntax for declaring a Streaming table in Delta Live Tables with SQL:
CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
Create a Delta Live Tables view
The following describes the syntax for declaring views with SQL:
CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement
Auto Loader SQL syntax
The following describes the syntax for working with Auto Loader in SQL:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM read_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
You can use supported format options with Auto Loader. Using the map()
function, you can pass options to the read_files()
method. Options are key-value pairs, where the keys and values are strings. For details on support formats and options, see File format options.
Example: Define tables
You can create a dataset by reading from an external data source or from datasets defined in a pipeline. To read from an internal dataset, prepend the LIVE
keyword to the dataset name. The following example defines two different datasets: a table called taxi_raw
that takes a JSON file as the input source and a table called filtered_data
that takes the taxi_raw
table as input:
CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`
CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
...
FROM LIVE.taxi_raw
Example: Read from a streaming source
To read data from a streaming source, for example, Auto Loader or an internal dataset, define a STREAMING
table:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)
For more information on streaming data, see Transform data with Delta Live Tables.
Control how tables are materialized
Tables also offer additional control of their materialization:
- Specify how tables are partitioned using
PARTITIONED BY
. You can use partitioning to speed up queries. - You can set table properties using
TBLPROPERTIES
. See Delta Live Tables table properties. - Set a storage location using the
LOCATION
setting. By default, table data is stored in the pipeline storage location ifLOCATION
isn’t set. - You can use generated columns in your schema definition. See Example: Specify a schema and partition columns.
Note
For tables less than 1 TB in size, Databricks recommends letting Delta Live Tables control data organization. Unless you expect your table to grow beyond a terabyte, Databricks recommends that you do not specify partition columns.
Example: Specify a schema and partition columns
You can optionally specify a schema when you define a table. The following example specifies the schema for the target table, including using Delta Lake generated columns and defining partition columns for the table:
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
By default, Delta Live Tables infers the schema from the table
definition if you don’t specify a schema.
Example: Define table constraints
Note
Delta Live Tables support for table constraints is in Public Preview. To define table constraints, your pipeline must be a Unity Catalog-enabled pipeline and configured to use the preview
channel.
When specifying a schema, you can define primary and foreign keys. The constraints are informational and are not enforced. See the CONSTRAINT clause in the SQL language reference.
The following example defines a table with a primary and foreign key constraint:
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
Parameterize values used when declaring tables or views with SQL
Use SET
to specify a configuration value in a query that declares a table or view, including Spark configurations. Any table or view you define in a notebook after the SET
statement has access to the defined value. Any Spark configurations specified using the SET
statement are used when executing the Spark query for any table or view following the SET statement. To read a configuration value in a query, use the string interpolation syntax ${}
. The following example sets a Spark configuration value named startDate
and uses that value in a query:
SET startDate='2020-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
To specify multiple configuration values, use a separate SET
statement for each value.
Example: Define a row filter and column mask
Important
Row filters and column masks are in Public Preview.
To create a materialized view or Streaming table with a row filter and column mask, use the ROW FILTER clause and the MASK clause. The following example demonstrates how to define a materialized view and a Streaming table with both a row filter and a column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)
CREATE OR REFRESH MATERIALIZED VIEW sales (
customer_id STRING MASK catalog.schema.customer_id_mask_fn,
customer_name STRING,
number_of_line_items STRING COMMENT 'Number of items in the order',
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze
For more information on row filters and column masks, see Publish tables with row filters and column masks.
SQL properties
CREATE TABLE or VIEW |
---|
TEMPORARY Create a table but do not publish metadata for the table. The TEMPORARY clause instructs Delta Live Tables to create a table that is available to the pipeline but should not be accessed outside the pipeline. To reduce processing time, a temporary table persists for the lifetime of the pipeline that creates it, and not just a single update. |
STREAMING Create a table that reads an input dataset as a stream. The input dataset must be a streaming data source, for example, Auto Loader or a STREAMING table. |
CLUSTER BY Enable liquid clustering on the table and define the columns to use as clustering keys. See Use liquid clustering for Delta tables. |
PARTITIONED BY An optional list of one or more columns to use for partitioning the table. |
LOCATION An optional storage location for table data. If not set, the system will default to the pipeline storage location. |
COMMENT An optional description for the table. |
column_constraint An optional informational primary key or foreign key constraint on the column. |
MASK clause (Public Preview)Adds a column mask function to anonymize sensitive data. Future queries for that column return the evaluated function’s result instead of the column’s original value. This is useful for fine-grained access control, because the function can check the user’s identity and group memberships to decide whether to redact the value. See Column mask clause. |
table_constraint An optional informational primary key or foreign key constraint on the table. |
TBLPROPERTIES An optional list of table properties for the table. |
WITH ROW FILTER clause (Public Preview)Adds a row filter function to the table. Future queries for that table receive a subset of the rows for which the function evaluates to TRUE. This is useful for fine-grained access control, because it allows the function to inspect the identity and group memberships of the invoking user to decide whether to filter certain rows. See ROW FILTER clause. |
select_statement A Delta Live Tables query that defines the dataset for the table. |
CONSTRAINT clause |
---|
EXPECT expectation_name Define data quality constraint expectation_name . If the ON VIOLATION constraint is not defined, add rows that violate the constraint to the target dataset. |
ON VIOLATION Optional action to take for failed rows: - FAIL UPDATE : Immediately stop pipeline execution.- DROP ROW : Drop the record and continue processing. |
Change data capture with SQL in Delta Live Tables
Use the APPLY CHANGES INTO
statement to use Delta Live Tables CDC functionality, as described in the following:
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
You define data quality constraints for an APPLY CHANGES
target using the same CONSTRAINT
clause as non-APPLY CHANGES
queries. See Manage data quality with pipeline expectations.
Note
The default behavior for INSERT
and UPDATE
events is to upsert CDC events from the source: update any rows in the target table that match the specified key(s) or insert a new row when a matching record does not exist in the target table. Handling for DELETE
events can be specified with the APPLY AS DELETE WHEN
condition.
Important
You must declare a target streaming table to apply changes into. You can optionally specify the schema for your target table. When specifying the schema of the APPLY CHANGES
target table, you must also include the __START_AT
and __END_AT
columns with the same data type as the sequence_by
field.
See The APPLY CHANGES APIs: Simplify change data capture with Delta Live Tables.
Clauses |
---|
KEYS The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table. To define a combination of columns, use a comma-separated list of columns. This clause is required. |
IGNORE NULL UPDATES Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and IGNORE NULL UPDATES is specified, columns with a null will retain their existing values in the target. This also applies to nested columns with a value of null .This clause is optional. The default is to overwrite existing columns with null values. |
APPLY AS DELETE WHEN Specifies when a CDC event should be treated as a DELETE rather than an upsert. To handle out-of-order data, the deleted row is temporarily retained as a tombstone in the underlying Delta table, and a view is created in the metastore that filters out these tombstones. The retention interval can be configured with thepipelines.cdc.tombstoneGCThresholdInSeconds table property.This clause is optional. |
APPLY AS TRUNCATE WHEN Specifies when a CDC event should be treated as a full table TRUNCATE . Because this clause triggers a full truncate of the target table, it should be used only for specific use cases requiring this functionality.The APPLY AS TRUNCATE WHEN clause is supported only for SCD type 1. SCD type 2 does not support the truncate operation.This clause is optional. |
SEQUENCE BY The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order. Specified column must be a sortable data type. This clause is required. |
COLUMNS Specifies a subset of columns to include in the target table. You can either: - Specify the complete list of columns to include: COLUMNS (userId, name, city) .- Specify a list of columns to exclude: COLUMNS * EXCEPT (operation, sequenceNum) This clause is optional. The default is to include all columns in the target table when the COLUMNS clause is not specified. |
STORED AS Whether to store records as SCD type 1 or SCD type 2. This clause is optional. The default is SCD type 1. |
TRACK HISTORY ON Specifies a subset of output columns to generate history records when there are any changes to those specified columns. You can either: - Specify the complete list of columns to track: COLUMNS (userId, name, city) .- Specify a list of columns to be excluded from tracking: COLUMNS * EXCEPT (operation, sequenceNum) This clause is optional. The default is to track history for all the output columns when there are any changes, equivalent to TRACK HISTORY ON * . |