Поделиться через


CREATE STREAMING TABLE

Область применения:отмечено Да Databricks SQL

Создает потоковую таблицу, Delta-таблицу с дополнительной поддержкой для потоковой или инкрементальной обработки данных.

Потоковые таблицы поддерживаются только в DLT и в Databricks SQL с Unity Catalog. При запуске этой команды в поддерживаемой среде выполнения Databricks проводится только синтаксический анализ. См. статью "Разработка кода конвейера с помощью SQL".

Синтаксис

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] schedule_clause |
    WITH { ROW FILTER clause } } [...]

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ] }

Параметры

  • REFRESH

    При указании обновляет таблицу с последними данными, доступными из источников, определенных в запросе. Обрабатываются только новые данные, поступающие до запуска запроса. Новые данные, добавляемые в источники во время выполнения команды, игнорируются до следующего обновления. Операция обновления из CREATE OR REFRESH полностью декларативна. Если команда обновления не указывает все метаданные из исходной инструкции создания таблицы, удаляются неопределенные метаданные.

  • ЕСЛИ НЕ СУЩЕСТВУЕТ

    Создает стриминговую таблицу, если она не существует. Если таблица по этому имени уже существует, оператор CREATE STREAMING TABLE игнорируется.

    Можно указать не более одного из IF NOT EXISTS или OR REFRESH.

  • table_name

    Имя создаваемой таблицы. Имя не должно включать темпоральную спецификацию или спецификацию опций. Если имя не задано, таблица создается в текущей схеме.

  • спецификация_таблицы

    Это необязательное предложение определяет список столбцов, их типов, свойств, описаний и ограничений столбцов.

    Если столбцы в схеме таблицы не определены, необходимо указать AS query.

    • column_identifier

      Уникальное имя столбца.

      • column_type

        Указывает тип данных столбца.

      • NOT NULL

        Если указано, столбец не принимает значения NULL.

      • КОММЕНТАРИЙ column_comment

        Строковый литерал для описания столбца.

      • column_constraint

        Внимание

        Эта функция предоставляется в режиме общедоступной предварительной версии.

        Добавляет ограничение первичного или внешнего ключа в столбец потоковой таблицы. Ограничения не поддерживаются для таблиц в каталоге hive_metastore.

      • Предложение MASK

        Внимание

        Эта функция предоставляется в режиме общедоступной предварительной версии.

        Добавляет функцию маски столбца для анонимизации конфиденциальных данных. Все последующие запросы из этого столбца получают результат оценки этой функции по столбцу вместо исходного значения столбца. Это может быть полезно для точного контроля доступа, где функция может инспектировать идентификацию или членство в группах вызывающего пользователя, чтобы решить, следует ли скрыть значение.

      • CONSTRAINT EXPECTATION_NAME ОЖИДАТЬ (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]

        Добавляет ожидания качества данных в таблицу. Эти ожидания качества данных можно отслеживать с течением времени и получать доступ через журнал событий потоковой таблицы . Ожидание FAIL UPDATE приводит к сбою обработки при создании таблицы, а также обновлении таблицы. Ожидание DROP ROW приводит к тому, что вся строка будет удалена, если ожидание не выполнено.

        expectation_expr могут состоять из литералов, идентификаторов столбцов в таблице и детерминированных встроенных функций ИЛИ операторов SQL, кроме следующих:

        Кроме того, expr не должен содержать какой-либо вложенный запрос.

      • ограничение_таблицы

        Внимание

        Эта функция предоставляется в режиме общедоступной предварительной версии.

        Добавляет в стриминговую таблицу ограничения на информативные первичные или внешние ключи. Ограничения ключей не поддерживаются для таблиц в каталоге hive_metastore.

  • table_clauses

    При необходимости укажите секционирование, комментарии, пользовательские свойства и расписание обновления для новой таблицы. Каждый подпункт может быть указан только один раз.

    • РАЗДЕЛЕНО ПО

      Необязательный список столбцов таблицы для секционирования таблицы по.

      Примечание.

      Liquid Clustering предоставляет гибкое и оптимизированное решение для кластеризации. Рекомендуется использовать CLUSTER BY вместо PARTITIONED BY для потоковых таблиц.

    • CLUSTER BY

      Необязательное предложение для кластеризации подмножеством столбцов. Дополнительные сведения о кластеризации жидкости см. в разделе Использование кластеризации жидкости для таблиц Delta.

      Кластеризация жидкости Delta Lake не может быть объединена с PARTITIONED BY.

    • КОММЕНТАРИЙ table_comment

      Литерал STRING, который описывает таблицу.

    • TBLPROPERTIES

      При необходимости задает одно или несколько свойств, определяемых пользователем.

      Используйте этот параметр, чтобы указать канал среды выполнения DLT, используемый для выполнения этой инструкции. Задайте для свойства pipelines.channel значение "PREVIEW" или "CURRENT". Значение по умолчанию — "CURRENT". Для получения дополнительной информации о каналах среды выполнения DLT см. раздел .

    • ГРАФИК [ REFRESH ] условие графика

    • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

      Чтобы запланировать периодические обновления, используйте синтаксис EVERY. Если указан синтаксис EVERY, то потоковая таблица или материализованное представление периодически обновляется с заданным интервалом на основе указанного значения, например HOUR, HOURS, DAY, DAYS, WEEKили WEEKS. В следующей таблице перечислены принятые целые значения для number.

      единица времени Целое значение
      HOUR or HOURS 1 <= H <= 72
      DAY or DAYS 1 <= D <= 31
      WEEK or WEEKS 1 <= W <= 8

      Примечание.

      Единственное и множественное числа включенной единицы времени являются семантически эквивалентными.

    • CRON cron_string [ AT TIME ZONE timezone_id ]

      Чтобы запланировать обновление, используя значение quartz cron. Принимаются допустимые значения для time_zone_values. Функция AT TIME ZONE LOCAL не поддерживается.

      Если AT TIME ZONE нет, используется часовой пояс сеанса. Если AT TIME ZONE отсутствует, а часовой пояс сеанса не задан, возникает ошибка. SCHEDULE семантически эквивалентен SCHEDULE REFRESH.

    Расписание можно указать как часть CREATE команды. Используйте команду ALTER STREAMING TABLE или выполните команду CREATE OR REFRESH с условием SCHEDULE, чтобы изменить расписание потоковой таблицы после ее создания.

  • с ROW FILTER предложением

    Внимание

    Эта функция предоставляется в режиме общедоступной предварительной версии.

    Добавляет функцию фильтра строк в таблицу. Все последующие запросы из этой таблицы получают подмножество строк, где функция принимает значение истинно. Это может быть полезно для точного контроля доступа, где функция может проверить удостоверение или членство в группах вызывающего пользователя, чтобы решить, следует ли фильтровать определенные строки.

  • AS query

    Это предложение заполняет таблицу с помощью данных из query. Этот запрос должен быть потоковым запросом. Это можно сделать, добавив ключевое STREAM слово в любое отношение, которое требуется обработать постепенно. При указании query и table_specification вместе схема таблицы, указанной в table_specification, должна содержать все столбцы, возвращаемые query, в противном случае возникает ошибка. Все столбцы, указанные в table_specification, но не возвращенные query, при запросе возвращают значения null.

Различия между таблицами потоковой передачи и другими таблицами

Потоковые таблицы — это таблицы с состоянием, предназначенные для обработки каждой строки только один раз при обработке растущего набора данных. Поскольку большинство наборов данных постоянно увеличиваются, потоковые таблицы хорошо подходят для большинства рабочих нагрузок загрузки данных. Таблицы потоковой передачи оптимально подходят для конвейеров, требующих свежести данных и низкой задержки. Потоковые таблицы также могут быть полезны для крупномасштабных преобразований, так как результаты могут вычисляться поэтапно по мере поступления новых данных, сохраняя актуальными без необходимости полностью пересчитывать все исходные данные с каждым обновлением. Потоковые таблицы предназначены для источников данных, в которые можно только добавлять данные.

Потоковые таблицы принимают дополнительные команды, такие как REFRESH, которые обрабатывают последние данные, доступные в источниках, предоставленных в запросе. Изменения предоставленного запроса отражаются только на новых данных путем вызова REFRESH, а не обработанных ранее данных. Чтобы применить изменения к существующим данным, необходимо выполнить REFRESH TABLE <table_name> FULL для проведения FULL REFRESH. Полные обновления повторно обрабатывают все данные, доступные в источнике с помощью последнего определения. Не рекомендуется вызывать полные обновления на источниках, которые не содержат всю историю данных или имеют короткие периоды хранения, таких как Kafka, поскольку полное обновление приводит к усечению существующих данных. Возможно, вы не сможете восстановить старые данные, если данные больше не доступны в источнике.

Фильтры строк и маски столбцов

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

Фильтры строк позволяют указать функцию, которая применяется в качестве фильтра всякий раз, когда таблица сканирует строки. Эти фильтры гарантируют, что последующие запросы возвращают только строки, для которых предикат фильтра оценивается как true.

Маски столбцов позволяют маскировать значения столбца всякий раз, когда таблица сканирует строки. Все будущие запросы, связанные с этим столбцом, получат результат оценки функции по столбцу, заменив исходное значение столбца.

Дополнительные сведения о том, как использовать фильтры строк и маски столбцов, см. в разделе Фильтрация конфиденциальных данных таблицы с помощью фильтров строк и маски столбцов.

Управление фильтрами строк и масками столбцов

Фильтры строк и маски столбцов в таблицах потоковой передачи должны быть добавлены, обновлены или удалены с помощью инструкции CREATE OR REFRESH.

Поведение

  • Обновление в качестве определителя: когда операторы CREATE OR REFRESH или REFRESH обновляют потоковую таблицу, функции фильтрации строк выполняются с правами определителя (как владельца таблицы). Это означает, что обновление таблицы использует контекст безопасности пользователя, создавшего потоковую таблицу.
  • Запрос. Хотя большинство фильтров выполняются с правами определителя, функции, которые проверяют контекст пользователя (например CURRENT_USER , и IS_MEMBER) являются исключениями. Эти функции выполняются как вызыватель. Этот подход применяет элементы управления безопасностью и доступом для определенных пользователей на основе контекста текущего пользователя.

Наблюдаемость

Используйте DESCRIBE EXTENDED, INFORMATION_SCHEMAили обозреватель каталогов, чтобы проверить существующие фильтры строк и маски столбцов, применяемые к данной потоковой таблице. Эта функция позволяет пользователям проводить аудит и проверку доступа к данным и мер защиты в потоковых таблицах.

Ограничения

  • Только владельцы таблиц могут обновлять потоковые таблицы, чтобы получить последние данные.
  • Использование команд ALTER TABLE запрещено в потоковых таблицах. Определение и свойства таблицы должны быть изменены с помощью инструкции CREATE OR REFRESH или ALTER STREAMING TABLE.
  • Эволюционирование схемы таблицы с помощью команд DML, таких как INSERT INTO, и MERGE не поддерживается.
  • Следующие команды не поддерживаются в таблицах потоковой передачи:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Delta Sharing не поддерживается.
  • Переименование таблицы или изменение владельца не поддерживается.
  • Ограничения таблиц, такие как PRIMARY KEY и FOREIGN KEY, не поддерживаются.
  • Сгенерированные столбцы, идентификационные столбцы и столбцы по умолчанию не поддерживаются.

Примеры

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM RANGE(10)

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')