VYTVOŘENÍ TABULKY STREAMOVÁNÍ
Platí pro: Databricks SQL
Vytvoří streamovací tabulku, tabulku Delta s dodatečnou podporou streamování nebo přírůstkového zpracování dat.
Streamované tabulky jsou podporovány pouze v rozdílových živých tabulkách a v Databricks SQL s katalogem Unity. Spuštěním tohoto příkazu na podporovaných výpočetních prostředcích Databricks Runtime se analyzuje pouze syntaxe. Viz Vývoj kódu kanálu pomocí SQL.
Syntaxe
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] schedule_clause |
WITH { ROW FILTER clause } } [...]
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ] }
Parametry
REFRESH
Pokud je zadáno, aktualizuje tabulku nejnovějšími daty dostupnými ze zdrojů definovaných v dotazu. Pouze nová data, která přicházejí před zahájením dotazu, budou zpracována. Nová data, která se přidají do zdrojů během provádění příkazu, se ignorují až do další aktualizace. Operace aktualizace z příkazu CREATE OR REFRESH je plně deklarativní. Pokud příkaz aktualizace nezadá všechna metadata z původního příkazu pro vytvoření tabulky, odstraní se nezadaná metadata.
POKUD NEEXISTUJE
Vytvoří tabulku streamování, pokud neexistuje. Pokud tabulka s tímto názvem již existuje,
CREATE STREAMING TABLE
příkaz se ignoruje.Můžete zadat nejvýše jednu z nebo
IF NOT EXISTS
OR REFRESH
.-
Název tabulky, kterou chcete vytvořit. Název nesmí obsahovat dočasnou specifikaci ani specifikaci možností. Pokud název není kvalifikovaný, vytvoří se tabulka v aktuálním schématu.
table_specification
Tato volitelná klauzule definuje seznam sloupců, jejich typů, vlastností, popisů a omezení sloupců.
Pokud nedefinujete sloupce ve schématu tabulky, je nutné zadat
AS query
.-
Jedinečný název sloupce
-
Určuje datový typ sloupce.
NOT NULL
Pokud je zadaný sloupec nepřijme
NULL
hodnoty.COLUMN_COMMENT KOMENTÁŘE
Řetězcový literál, který popisuje sloupec.
-
Důležité
Tato funkce je ve verzi Public Preview.
Přidá omezení primárního klíče nebo cizího klíče do sloupce v tabulce streamování. Omezení nejsou podporována pro tabulky v
hive_metastore
katalogu. -
Důležité
Tato funkce je ve verzi Public Preview.
Přidá funkci masky sloupce pro anonymizaci citlivých dat. Všechny následné dotazy z tohoto sloupce obdrží výsledek vyhodnocení této funkce místo původní hodnoty sloupce. To může být užitečné pro jemně odstupňované účely řízení přístupu, ve kterých může funkce zkontrolovat členství v identitě nebo skupině vyvolání uživatele a rozhodnout se, jestli se má hodnota redigovat.
CONSTRAINT expectation_name EXPECT (expectation_expr) [ PŘI PORUŠENÍ { FAIL UPDATE | DROP ROW } ]
Přidá do tabulky očekávání kvality dat. Tato očekávání kvality dat je možné sledovat v průběhu času a přistupovat k němu prostřednictvím protokolu událostí streamované tabulky. Očekávání
FAIL UPDATE
způsobí selhání zpracování při vytváření tabulky i při aktualizaci tabulky. OčekáváníDROP ROW
způsobí, že se celý řádek zahodí, pokud se očekávání nesplní.expectation_expr
mohou se skládat z literálů, identifikátorů sloupců v tabulce a deterministické předdefinované funkce nebo operátory SQL s výjimkou:- Agregační funkce
- Analytické funkce oken
- Funkce okna řazení
- Funkce generátoru hodnotných tabulek
- Agregační funkce
-
Důležité
Tato funkce je ve verzi Public Preview.
Přidá do tabulky streamování omezení informačního primárního klíče nebo informačního cizího klíče. Pro tabulky v
hive_metastore
katalogu nejsou podporována klíčová omezení.
-
-
table_clauses
Volitelně můžete zadat dělení, komentáře, uživatelem definované vlastnosti a plán aktualizace nové tabulky. Každou dílčí klauzuli lze zadat pouze jednou.
-
Volitelný seznam sloupců tabulky, podle kterých chcete tabulku rozdělit.
TABLE_COMMENT KOMENTÁŘE
Literál
STRING
, který popisuje tabulku.-
Volitelně nastaví jednu nebo více uživatelem definovaných vlastností.
Pomocí tohoto nastavení můžete určit kanál modulu runtime Delta Live Tables použitý ke spuštění tohoto příkazu. Nastavte hodnotu
pipelines.channel
vlastnosti na"PREVIEW"
hodnotu nebo"CURRENT"
. Výchozí hodnota je"CURRENT"
. Další informace o kanálech Delta Live Tables naleznete v tématu Kanály modulu runtime Delta Live Tables. SCHEDULE [ REFRESH ] schedule_clause
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }
Důležité
Tato funkce je ve verzi Public Preview.
Pokud chcete naplánovat aktualizaci, která se pravidelně provádí, použijte
EVERY
syntaxi. PokudEVERY
je zadaná syntaxe, je streamovaná tabulka nebo materializované zobrazení pravidelně aktualizována v zadaném intervalu na základě zadané hodnoty, napříkladHOUR
, ,HOURS
,DAY
DAYS
,WEEK
neboWEEKS
. Následující tabulka uvádí přijaté celočíselné hodnoty pronumber
.Časová jednotka Celočíselná hodnota HOUR or HOURS
1 <= H <= 72 DAY or DAYS
1 <= D <= 31 WEEK or WEEKS
1 <= W <= 8 Poznámka:
Jednotné a množné číslo zahrnuté časové jednotky jsou sémanticky ekvivalentní.
CRON cron_string [ AT TIME ZONE timezone_id ]
Naplánování aktualizace pomocí hodnoty quartz cron . Jsou přijímány platné time_zone_values .
AT TIME ZONE LOCAL
není podporováno.Pokud
AT TIME ZONE
chybí, použije se časové pásmo relace. PokudAT TIME ZONE
chybí a časové pásmo relace není nastavené, vyvolá se chyba.SCHEDULE
je sémanticky ekvivalentníSCHEDULE REFRESH
.
Plán lze zadat jako součást
CREATE
příkazu. Pomocí příkazu ALTER STREAMING TABLE nebo spuštěnímCREATE OR REFRESH
příkazu sSCHEDULE
klauzulí můžete po vytvoření změnit plán tabulky streamování.KLAUZULE WITH ROW FILTER
Důležité
Tato funkce je ve verzi Public Preview.
Přidá do tabulky funkci filtru řádků. Všechny následné dotazy z této tabulky obdrží podmnožinu řádků, ve kterých se funkce vyhodnotí jako logická hodnota PRAVDA. To může být užitečné pro jemně odstupňované účely řízení přístupu, kdy může funkce zkontrolovat členství v identitě nebo skupině vyvolání uživatele a rozhodnout se, jestli se mají určité řádky filtrovat.
-
-
Tato klauzule naplní tabulku pomocí dat z
query
. Tento dotaz musí být streamovaným dotazem. Toho lze dosáhnout přidáním klíčovéhoSTREAM
slova do libovolného vztahu, který chcete zpracovat přírůstkově. Když zadáte aquery
a společnětable_specification
, schéma tabulky zadané vtable_specification
musí obsahovat všechny sloupce vrácené adresouquery
, jinak se zobrazí chyba. Všechny sloupce zadané vtable_specification
vrácené hodnotě, které nejsou vrácenyquery
null
při dotazech.
Rozdíly mezi streamovanými tabulkami a jinými tabulkami
Streamované tabulky jsou stavové tabulky navržené tak, aby zpracovávaly každý řádek pouze jednou při zpracování rostoucí datové sady. Vzhledem k tomu, že většina datových sad v průběhu času roste, jsou streamované tabulky vhodné pro většinu úloh příjmu dat. Tabulky streamování jsou optimální pro kanály, které vyžadují aktuálnost dat a nízkou latenci. Streamované tabulky můžou být také užitečné pro masivní transformace škálování, protože výsledky se dají postupně vypočítat při příchodu nových dat, přičemž výsledky budou aktuální, aniž by bylo nutné plně překompilovat všechna zdrojová data s každou aktualizací. Streamované tabulky jsou navržené pro zdroje dat, které jsou jen pro připojení.
Streamované tabulky přijímají další příkazy, například REFRESH
, které zpracovávají nejnovější data dostupná ve zdrojích poskytovaných v dotazu. Změny zadaného dotazu se projeví jenom na nových datech voláním REFRESH
dříve nezpracovaných dat. Pokud chcete změny použít i u existujících dat, musíte REFRESH TABLE <table_name> FULL
provést provedení příkazu FULL REFRESH
. Úplné aktualizace znovu zpracovávají všechna data dostupná ve zdroji s nejnovější definicí. Nedoporučuje se volat úplné aktualizace zdrojů, které nezachovají celou historii dat nebo mají krátké doby uchovávání, například Kafka, protože úplná aktualizace zkracuje stávající data. Pokud už data nejsou ve zdroji dostupná, možná nebudete moct obnovit stará data.
Filtry řádků a masky sloupců
Důležité
Tato funkce je ve verzi Public Preview.
Filtry řádků umožňují zadat funkci, která se použije jako filtr při každém načtení řádků v tabulce. Tyto filtry zajišťují, aby následné dotazy vracely pouze řádky, pro které se predikát filtru vyhodnotí jako true.
Masky sloupců umožňují maskovat hodnoty sloupce pokaždé, když tabulka načte řádky. Všechny budoucí dotazy týkající se tohoto sloupce obdrží výsledek vyhodnocení funkce nad sloupcem a nahrazení původní hodnoty sloupce.
Další informace o tom, jak používat filtry řádků a masky sloupců, najdete v tématu Filtrování citlivých dat tabulky pomocí filtrů řádků a mask sloupců.
Správa filtrů řádků a masek sloupců
Filtry řádků a masky sloupců u streamovaných tabulek by měly být přidány, aktualizovány nebo vynechány příkazem CREATE OR REFRESH
.
Chování
- Refresh as Definer: Když
CREATE OR REFRESH
příkazy aktualizujíREFRESH
streamovací tabulku, spustí se funkce filtru řádků s právy definovatele (jako vlastník tabulky). To znamená, že aktualizace tabulky používá kontext zabezpečení uživatele, který vytvořil streamovací tabulku. - Dotaz: Zatímco většina filtrů běží s právy defineru, funkce, které kontrolují kontext uživatele (například
CURRENT_USER
aIS_MEMBER
) jsou výjimky. Tyto funkce se spouští jako invoker. Tento přístup vynucuje zabezpečení dat a řízení přístupu specifické pro uživatele na základě kontextu aktuálního uživatele.
Pozorovatelnost
Pomocí funkce DESCRIBE EXTENDED
, INFORMATION_SCHEMA
nebo Průzkumníka katalogu můžete prozkoumat existující filtry řádků a masky sloupců, které platí pro danou streamovací tabulku. Tato funkce umožňuje uživatelům auditovat a kontrolovat přístup k datům a míry ochrany u streamovaných tabulek.
Omezení
Nejnovější data můžou získat jenom vlastníci tabulek, kteří můžou aktualizovat streamované tabulky.
ALTER TABLE
příkazy jsou u streamovaných tabulek zakázány. Definice a vlastnosti tabulky by měly být změněny pomocíCREATE OR REFRESH
příkazu ALTER STREAMING TABLE .Dotazy na časovou cestu se nepodporují.
Vývoj schématu tabulky pomocí příkazů DML, jako je
INSERT INTO
, aMERGE
není podporován.U streamovaných tabulek se nepodporují následující příkazy:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
Rozdílové sdílení se nepodporuje.
Přejmenování tabulky nebo změna vlastníka se nepodporuje.
Omezení tabulek, jako
PRIMARY KEY
jsou aFOREIGN KEY
nejsou podporována.Vygenerované sloupce, sloupce identit a výchozí sloupce se nepodporují.
Příklady
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM RANGE(10)
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')