CREATE STREAMING TABLE
Область применения: Databricks SQL
Создает потоковую передачу table, Delta table с дополнительной поддержкой потоковой или инкрементной обработки данных.
Поддержка потоков tables осуществляется только в Delta Live Tables и в Databricks SQL с Unity Catalog. При выполнении этой команды в поддерживаемой среде выполнения Databricks вычисляется только синтаксический анализ. См. статью "Разработка кода конвейера с помощью SQL".
Синтаксис
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] schedule_clause |
WITH { ROW FILTER clause } } [...]
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ] }
Parameters
REFRESH
Если указано, обновляет table последними данными, доступными из источников, которые определены в запросе. Обрабатываются только новые данные, поступающие до запуска запроса. Новые данные, добавляемые в источники во время выполнения команды, игнорируются до следующей refresh. Операция refresh из CREATE OR REFRESH полностью декларативна. Если команда refresh не указывает все метаданные из исходной инструкции создания table, удаляются неопределенные метаданные.
IF NOT EXISTS
Создает поток table, если он не существует. Если table с таким именем уже существует, оператор
CREATE STREAMING TABLE
игнорируется.Можно указать не более одного предложения из числа
IF NOT EXISTS
иOR REFRESH
.-
Имя создаваемого table. Имя не должно включать темпоральную спецификацию или спецификацию параметров. Если имя не квалифицировано, table создается в текущем schema.
table_specification
Это необязательное предложение определяет listcolumns, их типы, свойства, описания и ограничения column.
Если в columnstable не определено schema, необходимо указать
AS query
.-
Уникальное имя для column.
-
Указывает тип данных для из column.
NOT NULL
Если column указан, он не принимает
NULL
values.COMMENT column_comment
Строковый литерал для описания column.
-
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Добавляет первичный ключ или внешний ключ constraint к column в потоковом table. Ограничения не поддерживаются для tables в
hive_metastore
catalog. -
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Добавляет функцию маски column для анонимизации конфиденциальных данных. Все последующие запросы из этого column получают результат оценки этой функции по column вместо исходного значения column. Это может быть полезно для досконального контроля доступа, where функция может проверить удостоверение или членство в группах вызывающего пользователя, чтобы решить, следует ли скрыть значение.
CONSTRAINT EXPECTATION_NAME ОЖИДАТЬ (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]
Добавляет ожидания качества данных в table. Эти ожидания качества данных можно отслеживать с течением времени и получать доступ через поток событий tableжурнала . Ожидание
FAIL UPDATE
приводит к сбою обработки как при создании table, так и при обновлении table. ОжиданиеDROP ROW
приводит к тому, что вся строка будет удалена, если ожидание не выполнено.expectation_expr
могут состоять из литералов, column идентификаторов в table, а также детерминированных, встроенных функций ИЛИ операторов SQL, кроме следующих:-
Агрегатные функции
- Аналитические window функции
- функции ранжирования window
- Table ценностные функции генератора
Кроме того,
expr
не должен содержать какой-либо вложенный запрос.-
Агрегатные функции
-
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Добавляет в потоковую tableинформационный первичный ключ или информационные ограничения внешнего ключа. Ограничения ключей не поддерживаются для tables в
hive_metastore
catalog.
-
-
table_clauses
При необходимости укажите разбиение на разделы, комментарии, пользовательские свойства и refresh расписание для нового table. Каждое вложенное предложение может быть указано только один раз.
-
Необязательный list из columnstable для partitiontable.
COMMENT table_comment
Литерал
STRING
для описания table.-
При необходимости задает одно или несколько свойств, определяемых пользователем.
Используйте этот параметр, чтобы указать канал среды выполнения Delta Live Tables, используемый для выполнения этой инструкции. Set значение свойства
pipelines.channel
для"PREVIEW"
или"CURRENT"
. Значение по умолчанию —"CURRENT"
. Дополнительные сведения о каналах delta Live Tables см. в Tablesканалах выполнения Delta Live . ГРАФИК [ REFRESH ] пункт графика
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }
Чтобы запланировать периодические refresh, используйте синтаксис
EVERY
. Если указан синтаксисEVERY
, потоковый table или материализованное представление обновляется регулярно с интервалом, зависящим от указанного значения, напримерHOUR
,HOURS
,DAY
,DAYS
,WEEK
илиWEEKS
. Следующие table перечисляют принятые целочисленные values дляnumber
.Time unit Целое значение HOUR or HOURS
1 <= H <= 72 DAY or DAYS
1 <= D <= 31 WEEK or WEEKS
1 <= W <= 8 Примечание.
Семантические и множественные формы включенной единицы времени семантики.
CRON cron_string [ AT TIME ZONE timezone_id ]
Чтобы запланировать refresh, используя кварцевое кронное значение . Допустимые time_zone_values принимаются. Функция
AT TIME ZONE LOCAL
не поддерживается.Если
AT TIME ZONE
нет, используется часовой пояс сеанса. ЕслиAT TIME ZONE
отсутствует, а часовой пояс сеанса не set, возникает ошибка.SCHEDULE
семантически эквивалентенSCHEDULE REFRESH
.
Расписание можно указать как часть
CREATE
команды. Используйте ALTER STREAMING TABLE или выполните командуCREATE OR REFRESH
с предложениемSCHEDULE
, чтобы изменить расписание для потокового ресурса table после его создания.-
-
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Добавляет функцию фильтра строк в table. Все последующие запросы из этого table получают подмножество строк where, для которых функция возвращает логическое TRUE. Это может быть полезно для точного контроля доступа, where функция может проверить удостоверение или членство в группах вызывающего пользователя, чтобы решить, следует ли фильтровать определенные строки.
AS query
Это предложение заполняет table с помощью данных из
query
. Этот запрос должен быть потоковым запросом. Это можно сделать, добавив ключевоеSTREAM
слово в любое отношение, которое требуется обработать постепенно. Если указатьquery
иtable_specification
вместе, tableschema, указанные вtable_specification
, должны содержать все columns, возвращаемыеquery
, в противном случае get ошибку. Любые columns, указанные вtable_specification
, но не возвращаемыеquery
, возвращаютnull
values при запросе.
Различия между стримингом tables и другими tables
Потоковая tables — это tablesс отслеживанием состояния, предназначенная для обработки каждой строки только один раз при обработке растущего набора данных. Поскольку большинство наборов данных со временем постоянно увеличиваются, потоковая передача tables хорошо подходит для большинства задач по приему данных. Потоковая передача tables является оптимальной для конвейеров, требующих свежести данных и низкой задержки. Потоковая tables также может быть полезной для крупномасштабных преобразований, так как результаты могут постепенно вычисляться по мере поступления новых данных, обновляя результаты до актуального состояния без необходимости полностью пересчитывать все исходные данные с каждым update. Потоки tables предназначены для источников данных, предназначенных только для добавления.
Потоковая tables принимает дополнительные команды, такие как REFRESH
, которые обрабатывают последние данные, доступные в источниках, указанных в запросе. Изменения в предоставленном запросе, такие как get, влияют только на новые данные путем вызова REFRESH
, но не на уже обработанные данные. Чтобы применить изменения к существующим данным, необходимо выполнить REFRESH TABLE <table_name> FULL
его FULL REFRESH
. Полные обновления повторно обрабатывают все данные, доступные в источнике с помощью последнего определения. Не рекомендуется вызывать полные обновления в источниках, которые не хранят всю историю данных или имеют короткие периоды хранения, например Kafka, так как полная refresh усечение существующих данных. Возможно, вы не сможете восстановить старые данные, если данные больше не доступны в источнике.
Фильтры строк и маски column
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Фильтры строк позволяют указать функцию, которая применяется в качестве фильтра всякий раз, когда table сканирует строки. Эти фильтры гарантируют, что последующие запросы возвращают только строки, для которых предикат фильтра оценивается как true.
Маски Column позволяют вам маскировать column в valuesвсякий раз, когда сканирование table извлекает строки. Все будущие запросы, связанные с этим column, получат результат оценки функции по column, заменив исходное значение column.
Дополнительные сведения о том, как использовать фильтры строк и маски column, см. в статье "Фильтрация конфиденциальных данных table с использованием фильтров строк и масок column".
Управление фильтрами строк и масками Column
Фильтры строк и маски column на стриминге tables должны быть добавлены, обновлены или удалены с использованием команды CREATE OR REFRESH
.
Поведение
-
Refresh в качестве определителя: когда операторы
CREATE OR REFRESH
илиREFRESH
refresh потоковой table, функции фильтрации строк выполняются с правами определителя (владелец table). Это означает, что tablerefresh использует контекст безопасности пользователя, который создал потоковую table. -
Запрос. Хотя большинство фильтров выполняются с правами определителя, функции, которые проверяют контекст пользователя (например
CURRENT_USER
, иIS_MEMBER
) являются исключениями. Эти функции выполняются в качестве вызывающего средства. Этот подход применяет элементы управления безопасностью и доступом для определенных пользователей на основе контекста текущего пользователя.
Наблюдаемость
Используйте DESCRIBE EXTENDED
, INFORMATION_SCHEMA
или проводник Catalog для проверки существующих фильтров строк и масок column, которые применяются к tableпотоковой передачи. **
Эта функция позволяет пользователям проводить аудит и анализировать меры доступа к данным и защиты для потокового контента tables.
Ограничения
- Только владельцы table могут refresh транслировать tables последние данные на get.
- команды
ALTER TABLE
запрещены в потоковом режиме tables. Определение и свойства table должны быть изменены с помощью инструкцииCREATE OR REFRESH
или ALTER STREAMING TABLE. - Развитие tableschema с помощью команд DML, таких как
INSERT INTO
, иMERGE
не поддерживается. - Следующие команды не поддерживаются в потоковом режиме tables:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
- Разностный общий доступ не поддерживается.
- Переименование table или изменение владельца не поддерживается.
-
Table ограничения, такие как
PRIMARY KEY
иFOREIGN KEY
, не поддерживаются. - Созданные columns, идентификатор columnsи columns по умолчанию не поддерживаются.
Примеры
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM RANGE(10)
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE EVERY 1 HOUR
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')