Поделиться через


CREATE STREAMING TABLE

Область применения:флажок Databricks SQL

Создает потоковую передачу table, Delta table с дополнительной поддержкой потоковой или инкрементной обработки данных.

Поддержка потоков tables осуществляется только в Delta Live Tables и в Databricks SQL с Unity Catalog. При выполнении этой команды в поддерживаемой среде выполнения Databricks вычисляется только синтаксический анализ. См. статью "Разработка кода конвейера с помощью SQL".

Синтаксис

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] schedule_clause |
    WITH { ROW FILTER clause } } [...]

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ] }

Parameters

  • REFRESH

    Если указано, обновляет table последними данными, доступными из источников, которые определены в запросе. Обрабатываются только новые данные, поступающие до запуска запроса. Новые данные, добавляемые в источники во время выполнения команды, игнорируются до следующей refresh. Операция refresh из CREATE OR REFRESH полностью декларативна. Если команда refresh не указывает все метаданные из исходной инструкции создания table, удаляются неопределенные метаданные.

  • IF NOT EXISTS

    Создает поток table, если он не существует. Если table с таким именем уже существует, оператор CREATE STREAMING TABLE игнорируется.

    Можно указать не более одного предложения из числа IF NOT EXISTS и OR REFRESH.

  • table_name

    Имя создаваемого table. Имя не должно включать темпоральную спецификацию или спецификацию параметров. Если имя не квалифицировано, table создается в текущем schema.

  • table_specification

    Это необязательное предложение определяет listcolumns, их типы, свойства, описания и ограничения column.

    Если в columnstable не определено schema, необходимо указать AS query.

    • column_identifier

      Уникальное имя для column.

      • column_type

        Указывает тип данных для из column.

      • NOT NULL

        Если column указан, он не принимает NULLvalues.

      • COMMENT column_comment

        Строковый литерал для описания column.

      • column_constraint

        Внимание

        Эта функция предоставляется в режиме общедоступной предварительной версии.

        Добавляет первичный ключ или внешний ключ constraint к column в потоковом table. Ограничения не поддерживаются для tables в hive_metastorecatalog.

      • Предложение MASK

        Внимание

        Эта функция предоставляется в режиме общедоступной предварительной версии.

        Добавляет функцию маски column для анонимизации конфиденциальных данных. Все последующие запросы из этого column получают результат оценки этой функции по column вместо исходного значения column. Это может быть полезно для досконального контроля доступа, where функция может проверить удостоверение или членство в группах вызывающего пользователя, чтобы решить, следует ли скрыть значение.

      • CONSTRAINT EXPECTATION_NAME ОЖИДАТЬ (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]

        Добавляет ожидания качества данных в table. Эти ожидания качества данных можно отслеживать с течением времени и получать доступ через поток событий tableжурнала . Ожидание FAIL UPDATE приводит к сбою обработки как при создании table, так и при обновлении table. Ожидание DROP ROW приводит к тому, что вся строка будет удалена, если ожидание не выполнено.

        expectation_expr могут состоять из литералов, column идентификаторов в table, а также детерминированных, встроенных функций ИЛИ операторов SQL, кроме следующих:

        Кроме того, expr не должен содержать какой-либо вложенный запрос.

      • table_constraint

        Внимание

        Эта функция предоставляется в режиме общедоступной предварительной версии.

        Добавляет в потоковую tableинформационный первичный ключ или информационные ограничения внешнего ключа. Ограничения ключей не поддерживаются для tables в hive_metastorecatalog.

  • table_clauses

    При необходимости укажите разбиение на разделы, комментарии, пользовательские свойства и refresh расписание для нового table. Каждое вложенное предложение может быть указано только один раз.

    • PARTITIONED BY

      Необязательный list из columnstable для partitiontable.

    • COMMENT table_comment

      Литерал STRING для описания table.

    • TBLPROPERTIES

      При необходимости задает одно или несколько свойств, определяемых пользователем.

      Используйте этот параметр, чтобы указать канал среды выполнения Delta Live Tables, используемый для выполнения этой инструкции. Set значение свойства pipelines.channel для "PREVIEW" или "CURRENT". Значение по умолчанию — "CURRENT". Дополнительные сведения о каналах delta Live Tables см. в Tablesканалах выполнения Delta Live .

    • ГРАФИК [ REFRESH ] пункт графика

    • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

      Чтобы запланировать периодические refresh, используйте синтаксис EVERY. Если указан синтаксис EVERY, потоковый table или материализованное представление обновляется регулярно с интервалом, зависящим от указанного значения, например HOUR, HOURS, DAY, DAYS, WEEKили WEEKS. Следующие table перечисляют принятые целочисленные values для number.

      Time unit Целое значение
      HOUR or HOURS 1 <= H <= 72
      DAY or DAYS 1 <= D <= 31
      WEEK or WEEKS 1 <= W <= 8

      Примечание.

      Семантические и множественные формы включенной единицы времени семантики.

    • CRON cron_string [ AT TIME ZONE timezone_id ]

      Чтобы запланировать refresh, используя кварцевое кронное значение . Допустимые time_zone_values принимаются. Функция AT TIME ZONE LOCAL не поддерживается.

      Если AT TIME ZONE нет, используется часовой пояс сеанса. Если AT TIME ZONE отсутствует, а часовой пояс сеанса не set, возникает ошибка. SCHEDULE семантически эквивалентен SCHEDULE REFRESH.

    Расписание можно указать как часть CREATE команды. Используйте ALTER STREAMING TABLE или выполните команду CREATE OR REFRESH с предложением SCHEDULE, чтобы изменить расписание для потокового ресурса table после его создания.

  • с ROW FILTER предложение

    Внимание

    Эта функция предоставляется в режиме общедоступной предварительной версии.

    Добавляет функцию фильтра строк в table. Все последующие запросы из этого table получают подмножество строк where, для которых функция возвращает логическое TRUE. Это может быть полезно для точного контроля доступа, where функция может проверить удостоверение или членство в группах вызывающего пользователя, чтобы решить, следует ли фильтровать определенные строки.

  • AS query

    Это предложение заполняет table с помощью данных из query. Этот запрос должен быть потоковым запросом. Это можно сделать, добавив ключевое STREAM слово в любое отношение, которое требуется обработать постепенно. Если указать query и table_specification вместе, tableschema, указанные в table_specification, должны содержать все columns, возвращаемые query, в противном случае get ошибку. Любые columns, указанные в table_specification, но не возвращаемые query, возвращают nullvalues при запросе.

Различия между стримингом tables и другими tables

Потоковая tables — это tablesс отслеживанием состояния, предназначенная для обработки каждой строки только один раз при обработке растущего набора данных. Поскольку большинство наборов данных со временем постоянно увеличиваются, потоковая передача tables хорошо подходит для большинства задач по приему данных. Потоковая передача tables является оптимальной для конвейеров, требующих свежести данных и низкой задержки. Потоковая tables также может быть полезной для крупномасштабных преобразований, так как результаты могут постепенно вычисляться по мере поступления новых данных, обновляя результаты до актуального состояния без необходимости полностью пересчитывать все исходные данные с каждым update. Потоки tables предназначены для источников данных, предназначенных только для добавления.

Потоковая tables принимает дополнительные команды, такие как REFRESH, которые обрабатывают последние данные, доступные в источниках, указанных в запросе. Изменения в предоставленном запросе, такие как get, влияют только на новые данные путем вызова REFRESH, но не на уже обработанные данные. Чтобы применить изменения к существующим данным, необходимо выполнить REFRESH TABLE <table_name> FULL его FULL REFRESH. Полные обновления повторно обрабатывают все данные, доступные в источнике с помощью последнего определения. Не рекомендуется вызывать полные обновления в источниках, которые не хранят всю историю данных или имеют короткие периоды хранения, например Kafka, так как полная refresh усечение существующих данных. Возможно, вы не сможете восстановить старые данные, если данные больше не доступны в источнике.

Фильтры строк и маски column

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

Фильтры строк позволяют указать функцию, которая применяется в качестве фильтра всякий раз, когда table сканирует строки. Эти фильтры гарантируют, что последующие запросы возвращают только строки, для которых предикат фильтра оценивается как true.

Маски Column позволяют вам маскировать column в valuesвсякий раз, когда сканирование table извлекает строки. Все будущие запросы, связанные с этим column, получат результат оценки функции по column, заменив исходное значение column.

Дополнительные сведения о том, как использовать фильтры строк и маски column, см. в статье "Фильтрация конфиденциальных данных table с использованием фильтров строк и масок column".

Управление фильтрами строк и масками Column

Фильтры строк и маски column на стриминге tables должны быть добавлены, обновлены или удалены с использованием команды CREATE OR REFRESH.

Поведение

  • Refresh в качестве определителя: когда операторы CREATE OR REFRESH или REFRESHrefresh потоковой table, функции фильтрации строк выполняются с правами определителя (владелец table). Это означает, что tablerefresh использует контекст безопасности пользователя, который создал потоковую table.
  • Запрос. Хотя большинство фильтров выполняются с правами определителя, функции, которые проверяют контекст пользователя (например CURRENT_USER , и IS_MEMBER) являются исключениями. Эти функции выполняются в качестве вызывающего средства. Этот подход применяет элементы управления безопасностью и доступом для определенных пользователей на основе контекста текущего пользователя.

Наблюдаемость

Используйте DESCRIBE EXTENDED, INFORMATION_SCHEMAили проводник Catalog для проверки существующих фильтров строк и масок column, которые применяются к tableпотоковой передачи. ** Эта функция позволяет пользователям проводить аудит и анализировать меры доступа к данным и защиты для потокового контента tables.

Ограничения

  • Только владельцы table могут refresh транслировать tables последние данные на get.
  • команды ALTER TABLE запрещены в потоковом режиме tables. Определение и свойства table должны быть изменены с помощью инструкции CREATE OR REFRESH или ALTER STREAMING TABLE.
  • Развитие tableschema с помощью команд DML, таких как INSERT INTO, и MERGE не поддерживается.
  • Следующие команды не поддерживаются в потоковом режиме tables:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Разностный общий доступ не поддерживается.
  • Переименование table или изменение владельца не поддерживается.
  • Table ограничения, такие как PRIMARY KEY и FOREIGN KEY, не поддерживаются.
  • Созданные columns, идентификатор columnsи columns по умолчанию не поддерживаются.

Примеры

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM RANGE(10)

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')