Delta Live Tables SQL language reference

This article has details for the Delta Live Tables SQL programming interface.

You can use Python user-defined functions (UDFs) in your SQL queries, but you must define these UDFs in Python files before calling them in SQL source files. See User-defined scalar functions - Python.

Limitations

The PIVOT clause is not supported. The pivot operation in Spark requires the eager loading of input data to compute the output schema. This capability is not supported in Delta Live Tables.

Create a Delta Live Tables materialized view or streaming table

Note

The CREATE OR REFRESH LIVE TABLE syntax to create a materialized view is deprecated. Instead, use CREATE OR REFRESH MATERIALIZED VIEW.

You use the same basic SQL syntax when declaring a streaming table or a materialized view.

Declare a Delta Live Tables materialized view with SQL

The following describes the syntax for declaring a materialized view in Delta Live Tables with SQL:

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Declare a Delta Live Tables streaming table with SQL

You can only declare streaming tables using queries that read against a streaming source. Databricks recommends using Auto Loader for streaming ingestion of files from cloud object storage. See Auto Loader SQL syntax.

When specifying other tables or views in your pipeline as streaming sources, you must include the STREAM() function around a dataset name.

The following describes the syntax for declaring a Streaming table in Delta Live Tables with SQL:

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Create a Delta Live Tables view

The following describes the syntax for declaring views with SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

Auto Loader SQL syntax

The following describes the syntax for working with Auto Loader in SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

You can use supported format options with Auto Loader. Using the map() function, you can pass options to the read_files() method. Options are key-value pairs, where the keys and values are strings. For details on support formats and options, see File format options.

Example: Define tables

You can create a dataset by reading from an external data source or from datasets defined in a pipeline. To read from an internal dataset, prepend the LIVE keyword to the dataset name. The following example defines two different datasets: a table called taxi_raw that takes a JSON file as the input source and a table called filtered_data that takes the taxi_raw table as input:

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Example: Read from a streaming source

To read data from a streaming source, for example, Auto Loader or an internal dataset, define a STREAMING table:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

For more information on streaming data, see Transform data with Delta Live Tables.

Control how tables are materialized

Tables also offer additional control of their materialization:

Note

For tables less than 1 TB in size, Databricks recommends letting Delta Live Tables control data organization. Unless you expect your table to grow beyond a terabyte, Databricks recommends that you do not specify partition columns.

Example: Specify a schema and partition columns

You can optionally specify a schema when you define a table. The following example specifies the schema for the target table, including using Delta Lake generated columns and defining partition columns for the table:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

By default, Delta Live Tables infers the schema from the table definition if you don’t specify a schema.

Example: Define table constraints

Note

Delta Live Tables support for table constraints is in Public Preview. To define table constraints, your pipeline must be a Unity Catalog-enabled pipeline and configured to use the preview channel.

When specifying a schema, you can define primary and foreign keys. The constraints are informational and are not enforced. See the CONSTRAINT clause in the SQL language reference.

The following example defines a table with a primary and foreign key constraint:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Parameterize values used when declaring tables or views with SQL

Use SET to specify a configuration value in a query that declares a table or view, including Spark configurations. Any table or view you define in a notebook after the SET statement has access to the defined value. Any Spark configurations specified using the SET statement are used when executing the Spark query for any table or view following the SET statement. To read a configuration value in a query, use the string interpolation syntax ${}. The following example sets a Spark configuration value named startDate and uses that value in a query:

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

To specify multiple configuration values, use a separate SET statement for each value.

Example: Define a row filter and column mask

Important

Row filters and column masks are in Public Preview.

To create a materialized view or Streaming table with a row filter and column mask, use the ROW FILTER clause and the MASK clause. The following example demonstrates how to define a materialized view and a Streaming table with both a row filter and a column mask:

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze

For more information on row filters and column masks, see Publish tables with row filters and column masks.

SQL properties

CREATE TABLE or VIEW
TEMPORARY

Create a table but do not publish metadata for the table. The TEMPORARY clause instructs Delta Live Tables to create a table that is available to the pipeline but should not be accessed outside the pipeline. To reduce processing time, a temporary table persists for the lifetime of the pipeline that creates it, and not just a single update.
STREAMING

Create a table that reads an input dataset as a stream. The input dataset must be a streaming data source, for example, Auto Loader or a STREAMING table.
CLUSTER BY

Enable liquid clustering on the table and define the columns to use as clustering keys.

See Use liquid clustering for Delta tables.
PARTITIONED BY

An optional list of one or more columns to use for partitioning the table.
LOCATION

An optional storage location for table data. If not set, the system will default to the pipeline storage location.
COMMENT

An optional description for the table.
column_constraint

An optional informational primary key or foreign key constraint on the column.
MASK clause (Public Preview)

Adds a column mask function to anonymize sensitive data. Future queries for that column return the evaluated function’s result instead of the column’s original value. This is useful for fine-grained access control, because the function can check the user’s identity and group memberships to decide whether to redact the value.

See Column mask clause.
table_constraint

An optional informational primary key or foreign key constraint on the table.
TBLPROPERTIES

An optional list of table properties for the table.
WITH ROW FILTER clause (Public Preview)

Adds a row filter function to the table. Future queries for that table receive a subset of the rows for which the function evaluates to TRUE. This is useful for fine-grained access control, because it allows the function to inspect the identity and group memberships of the invoking user to decide whether to filter certain rows.

See ROW FILTER clause.
select_statement

A Delta Live Tables query that defines the dataset for the table.
CONSTRAINT clause
EXPECT expectation_name

Define data quality constraint expectation_name. If the ON VIOLATION constraint is not defined, add rows that violate the constraint to the target dataset.
ON VIOLATION

Optional action to take for failed rows:

- FAIL UPDATE: Immediately stop pipeline execution.
- DROP ROW: Drop the record and continue processing.

Change data capture with SQL in Delta Live Tables

Use the APPLY CHANGES INTO statement to use Delta Live Tables CDC functionality, as described in the following:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

You define data quality constraints for an APPLY CHANGES target using the same CONSTRAINT clause as non-APPLY CHANGES queries. See Manage data quality with pipeline expectations.

Note

The default behavior for INSERT and UPDATE events is to upsert CDC events from the source: update any rows in the target table that match the specified key(s) or insert a new row when a matching record does not exist in the target table. Handling for DELETE events can be specified with the APPLY AS DELETE WHEN condition.

Important

You must declare a target streaming table to apply changes into. You can optionally specify the schema for your target table. When specifying the schema of the APPLY CHANGES target table, you must also include the __START_AT and __END_AT columns with the same data type as the sequence_by field.

See The APPLY CHANGES APIs: Simplify change data capture with Delta Live Tables.

Clauses
KEYS

The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table.

To define a combination of columns, use a comma-separated list of columns.

This clause is required.
IGNORE NULL UPDATES

Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and IGNORE NULL UPDATES is specified, columns with a null will retain their existing values in the target. This also applies to nested columns with a value of null.

This clause is optional.

The default is to overwrite existing columns with null values.
APPLY AS DELETE WHEN

Specifies when a CDC event should be treated as a DELETE rather than an upsert. To handle out-of-order data, the deleted row is temporarily retained as a tombstone in the underlying Delta table, and a view is created in the metastore that filters out these tombstones. The retention interval can be configured with the
pipelines.cdc.tombstoneGCThresholdInSeconds table property.

This clause is optional.
APPLY AS TRUNCATE WHEN

Specifies when a CDC event should be treated as a full table TRUNCATE. Because this clause triggers a full truncate of the target table, it should be used only for specific use cases requiring this functionality.

The APPLY AS TRUNCATE WHEN clause is supported only for SCD type 1. SCD type 2 does not support the truncate operation.

This clause is optional.
SEQUENCE BY

The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order.

Specified column must be a sortable data type.

This clause is required.
COLUMNS

Specifies a subset of columns to include in the target table. You can either:

- Specify the complete list of columns to include: COLUMNS (userId, name, city).
- Specify a list of columns to exclude: COLUMNS * EXCEPT (operation, sequenceNum)

This clause is optional.

The default is to include all columns in the target table when the COLUMNS clause is not specified.
STORED AS

Whether to store records as SCD type 1 or SCD type 2.

This clause is optional.

The default is SCD type 1.
TRACK HISTORY ON

Specifies a subset of output columns to generate history records when there are any changes to those specified columns. You can either:

- Specify the complete list of columns to track: COLUMNS (userId, name, city).
- Specify a list of columns to be excluded from tracking: COLUMNS * EXCEPT (operation, sequenceNum)

This clause is optional. The default is to track history for all the output columns when there are any changes, equivalent to TRACK HISTORY ON *.