CREATE STREAMING TABLE

Applies to: Databricks SQL

Creates a streaming table, a Delta table with extra support for streaming or incremental data processing.

Streaming tables are only supported in Delta Live Tables and on Databricks SQL with Unity Catalog. Running this command on supported Databricks Runtime compute only parses the syntax. See Develop pipeline code with SQL.

Syntax

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] schedule_clause |
    WITH { ROW FILTER clause } } [...]

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ] }

Parameters

  • REFRESH

    If specified, refreshes the table with the latest data available from the sources defined in the query. Only new data that arrives before the query starts is processed. New data that gets added to the sources during the execution of the command is ignored until the next refresh. The refresh operation from CREATE OR REFRESH is fully declarative. If a refresh command does not specify all metadata from the original table creation statement, the unspecified metadata is deleted.

  • IF NOT EXISTS

    Creates the streaming table if it does not exist. If a table by this name already exists, the CREATE STREAMING TABLE statement is ignored.

    You may specify at most one of IF NOT EXISTS or OR REFRESH.

  • table_name

    The name of the table to be created. The name must not include a temporal specification or options specification. If the name is not qualified, the table is created in the current schema.

  • table_specification

    This optional clause defines the list of columns, their types, properties, descriptions, and column constraints.

    If you do not define columns in the table schema, you must specify AS query.

    • column_identifier

      A unique name for the column.

      • column_type

        Specifies the data type of the column.

      • NOT NULL

        If specified, the column does not accept NULL values.

      • COMMENT column_comment

        A string literal to describe the column.

      • column_constraint

        Important

        This feature is in Public Preview.

        Adds a primary key or foreign key constraint to the column in a streaming table. Constraints are not supported for tables in the hive_metastore catalog.

      • MASK clause

        Important

        This feature is in Public Preview.

        Adds a column mask function to anonymize sensitive data. All subsequent queries from that column receive the result of evaluating that function over the column in place of the column’s original value. This can be useful for fine-grained access control purposes where the function can inspect the identity or group memberships of the invoking user to decide whether to redact the value.

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]

        Adds data quality expectations to the table. These data quality expectations can be tracked over time and accessed through the streaming table’s event log. A FAIL UPDATE expectation causes processing to fail both when creating the table and when refreshing it. A DROP ROW expectation causes the entire row to be dropped if the expectation is not met.

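        expectation_expr may be composed of literals, column identifiers within the table, and deterministic, built-in SQL functions or operators except:

          • Aggregate functions
          • Analytic window functions
          • Ranking window functions
          • Table valued generator functions

        Also, expectation_expr must not contain any subquery.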

      • table_constraint

        Important

        This feature is in Public Preview.

        Adds an informational primary key or informational foreign key constraint to a streaming table. Key constraints are not supported for tables in the hive_metastore catalog.

  • table_clauses

    Optionally specify partitioning, a comment, user-defined properties, a refresh schedule, and a row filter for the new table. Each subclause may be specified only once.

    • PARTITIONED BY

      An optional list of columns of the table to partition the table by.

    • COMMENT table_comment

      A STRING literal to describe the table.

    • TBLPROPERTIES

      Optionally sets one or more user defined properties.

      Use this setting to specify the Delta Live Tables runtime channel used to run this statement. Set the value of the pipelines.channel property to "PREVIEW" or "CURRENT". The default value is "CURRENT". For more information about Delta Live Tables channels, see Delta Live Tables runtime channels.

    • SCHEDULE [ REFRESH ] schedule_clause

      • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

        Important

        This feature is in Public Preview.

        To schedule a periodic refresh, use the EVERY syntax. If EVERY is specified, the streaming table or materialized view is refreshed at the specified interval, expressed as a number of hours, days, or weeks. The following table lists the accepted integer values for number.

        Time unit        Integer value
        HOUR or HOURS    1 <= H <= 72
        DAY or DAYS      1 <= D <= 31
        WEEK or WEEKS    1 <= W <= 8

        Note

        The singular and plural forms of the included time unit are semantically equivalent.

      • CRON cron_string [ AT TIME ZONE timezone_id ]

        Schedules a refresh using a Quartz cron value. Valid time_zone_values are accepted. AT TIME ZONE LOCAL is not supported.

        If AT TIME ZONE is absent, the session time zone is used. If AT TIME ZONE is absent and the session time zone is not set, an error is thrown. SCHEDULE is semantically equivalent to SCHEDULE REFRESH.

      The schedule can be provided as part of the CREATE command. To alter the schedule of a streaming table after creation, use ALTER STREAMING TABLE or run a CREATE OR REFRESH command with a SCHEDULE clause.
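
      As a minimal sketch of both schedule forms (the table names events_2h and events_daily are hypothetical, firehose_raw is the table from the Examples section, and passing the time zone as a string literal such as 'America/Los_Angeles' is an assumption):

      -- Refreshes every 2 hours.
      > CREATE OR REFRESH STREAMING TABLE events_2h
        SCHEDULE EVERY 2 HOURS
        AS SELECT * FROM STREAM firehose_raw;

      -- Refreshes daily at 06:00 in the given time zone.
      -- Quartz fields: second, minute, hour, day of month, month, day of week, year.
      > CREATE OR REFRESH STREAMING TABLE events_daily
        SCHEDULE REFRESH CRON '0 0 6 * * ? *' AT TIME ZONE 'America/Los_Angeles'
        AS SELECT * FROM STREAM firehose_raw;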

    • WITH ROW FILTER clause

      Important

      This feature is in Public Preview.

      Adds a row filter function to the table. All subsequent queries from that table receive a subset of the rows where the function evaluates to boolean TRUE. This can be useful for fine-grained access control purposes where the function can inspect the identity or group memberships of the invoking user to decide whether to filter certain rows.

  • AS query

    This clause populates the table using the data from query. The query must be a streaming query, which you can achieve by adding the STREAM keyword to any relation you want to process incrementally. When you specify a query and a table_specification together, the table schema specified in table_specification must contain all the columns returned by the query; otherwise, you get an error. Any columns specified in table_specification but not returned by query return null values when queried.
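
    The schema rule for AS query might look like this in practice. This is a sketch only: the table name csv_data_notes, the extra column note, and the path are hypothetical.

    -- The query returns id, ts, and event, all of which appear in the table specification.
    -- note is declared but not returned by the query, so it reads as NULL.
    > CREATE OR REFRESH STREAMING TABLE csv_data_notes (
        id int,
        ts timestamp,
        event string,
        note string
      )
      AS SELECT id, ts, event
      FROM STREAM read_files(
          's3://bucket/path',
          format => 'csv',
          schema => 'id int, ts timestamp, event string');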

Differences between streaming tables and other tables

Streaming tables are stateful tables, designed to handle each row only once as you process a growing dataset. Because most datasets grow continuously over time, streaming tables are good for most ingestion workloads. Streaming tables are optimal for pipelines that require data freshness and low latency. Streaming tables can also be useful for massive scale transformations, as results can be incrementally calculated as new data arrives, keeping results up to date without needing to fully recompute all source data with each update. Streaming tables are designed for data sources that are append-only.

Streaming tables accept additional commands such as REFRESH, which processes the latest data available in the sources provided in the query. Changes to the provided query are applied only to new data processed by subsequent refreshes, not to previously processed data. To apply the changes to existing data as well, execute REFRESH TABLE <table_name> FULL to perform a full refresh. A full refresh reprocesses all data available in the source with the latest definition. Because a full refresh truncates the existing data, it is not recommended for sources that don’t keep the entire history of the data or that have short retention periods, such as Kafka. You may not be able to recover old data if it is no longer available in the source.
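
For illustration, the two kinds of refresh might be issued as follows. raw_data is the table from the Examples section; the REFRESH STREAMING TABLE spelling is an assumption about the REFRESH statement, which this page writes as REFRESH TABLE.

-- Processes only data that has arrived since the previous refresh.
> REFRESH STREAMING TABLE raw_data;

-- Truncates the table and reprocesses all data still available in the source.
> REFRESH STREAMING TABLE raw_data FULL;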

Row filters and column masks

Important

This feature is in Public Preview.

Row filters let you specify a function that applies as a filter whenever a table scan fetches rows. These filters ensure that subsequent queries only return rows for which the filter predicate evaluates to true.

Column masks let you mask a column’s values whenever a table scan fetches rows. All future queries involving that column will receive the result of evaluating the function over the column, replacing the column’s original value.

For more information on how to use row filters and column masks, see Filter sensitive table data using row filters and column masks.

Managing Row Filters and Column Masks

Row filters and column masks on streaming tables should be added, updated, or dropped through the CREATE OR REFRESH statement.
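
A minimal sketch of dropping a column mask this way, assuming the declarative behavior described for REFRESH (unspecified metadata is deleted) also applies to masks. The table and functions are the hypothetical ones from the Examples section.

-- Re-declares the table without the MASK on ssn.
-- The row filter is kept because it is restated.
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data');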

Behavior

  • Refresh as Definer: When the CREATE OR REFRESH or REFRESH statements refresh a streaming table, row filter functions run with the definer’s rights (as the table owner). This means the table refresh uses the security context of the user who created the streaming table.
  • Query: While most filters run with the definer’s rights, functions that check user context (such as CURRENT_USER and IS_MEMBER) are exceptions. These functions run as the invoker. This approach enforces user-specific data security and access controls based on the current user’s context.

Observability

Use DESCRIBE EXTENDED, INFORMATION_SCHEMA, or the Catalog Explorer to examine the existing row filters and column masks that apply to a given streaming table. This functionality allows users to audit and review data access and protection measures on streaming tables.
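
For example, a quick inspection might look like the following sketch. masked_csv_data is the table from the Examples section; the information_schema relation names row_filters and column_masks are assumptions and should be checked against the information schema reference.

-- Shows table details, including any row filter and column masks.
> DESCRIBE EXTENDED masked_csv_data;

-- Lists the filters and masks recorded for the table in the current catalog.
> SELECT * FROM information_schema.row_filters WHERE table_name = 'masked_csv_data';
> SELECT * FROM information_schema.column_masks WHERE table_name = 'masked_csv_data';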

Limitations

  • Only table owners can refresh streaming tables to get the latest data.

  • ALTER TABLE commands are disallowed on streaming tables. The definition and properties of the table should be altered through the CREATE OR REFRESH or ALTER STREAMING TABLE statement.

  • Time travel queries are not supported.

  • Evolving the table schema through DML commands such as INSERT INTO and MERGE is not supported.

  • The following commands are not supported on streaming tables:

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE

  • Delta Sharing is not supported.

  • Renaming the table or changing the owner is not supported.

  • Table constraints such as PRIMARY KEY and FOREIGN KEY are not supported.

  • Generated columns, identity columns, and default columns are not supported.

Examples

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');
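
-- Creates a partitioned streaming table only if it does not already exist.
-- Illustrative sketch: the table name, comment text, and region column are hypothetical.
> CREATE STREAMING TABLE IF NOT EXISTS events_by_region
  PARTITIONED BY (region)
  COMMENT 'Events partitioned by region'
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data');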

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');
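
-- Creates a streaming table that drops rows failing a data quality expectation
-- instead of failing the update.
-- Illustrative sketch: the table name, the id column, and the path are hypothetical.
> CREATE OR REFRESH STREAMING TABLE clean_events (
    CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW
  )
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path');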

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM RANGE(10);

-- Reads data from another streaming table, scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data');