CREATE STREAMING TABLE
Gäller för: Databricks SQL
Skapar en strömmande table, en Delta-table med extra stöd för direktuppspelning eller inkrementell databearbetning.
Strömmande tables stöds endast i Delta Live Tables och i Databricks SQL med Unity Catalog. Om du kör det här kommandot på Databricks Runtime-beräkning som stöds parsas endast syntaxen. Se Utveckla pipelinekod med SQL.
Syntax
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] schedule_clause |
WITH { ROW FILTER clause } } [...]
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ] }
Parameters
REFRESH
Om det är angivet, uppdaterar table med de senaste tillgängliga data från de källor som definierats i frågan. Endast nya data som tas emot innan frågan startar bearbetas. Nya data som läggs till i källorna under körningen av kommandot ignoreras tills nästa refresh. Åtgärden refresh från CREATE OR REFRESH är helt deklarativ. Om ett refresh-kommando inte anger alla metadata från den ursprungliga table skapande-instruktionen tas de ospecificerade metadata bort.
OM INTE FINNS
Skapar strömmen table om den inte finns. Om det redan finns en table med det här namnet ignoreras
CREATE STREAMING TABLE
-instruktionen.Du kan ange högst en av
IF NOT EXISTS
ellerOR REFRESH
.-
Namnet på table som ska skapas. Namnet får inte innehålla en temporal specifikation eller alternativspecifikation. Om namnet inte är kvalificerat skapas table i den aktuella schema.
table_specification
Den här valfria satsen definierar list för columns, deras typer, egenskaper, beskrivningar och column begränsningar.
Om du inte definierar columns i tableschema måste du ange
AS query
.-
Ett unikt namn för column.
-
Anger datatyp för column.
INTE NULL
Om det anges accepterar column inte
NULL
values.KOMMENTAR column_comment
En textsträng som beskriver column.
-
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Lägger till en primärnyckel eller främmande nyckel constraint till column i en strömmande table. Begränsningar stöds inte för tables i
hive_metastore
catalog. -
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Lägger till en column maskfunktion för att anonymisera känsliga data. Alla efterföljande frågor från den column får resultatet av att utvärdera funktionen över column, istället för column:s ursprungliga värde. Detta kan vara användbart i detaljerade åtkomstkontrollsyften where funktionen kan kontrollera identitets- eller gruppmedlemskapen för den anropande användaren för att avgöra om värdet ska redigeras.
CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | TA BORT RAD } ]
Lägger till datakvalitetsförväntningar i table. Dessa förväntningar på datakvalitet kan spåras över tid och nås via strömningens table:s -händelselogg. En
FAIL UPDATE
förväntan gör att bearbetningen misslyckas när både table skapas och tableuppdateras. EnDROP ROW
förväntan gör att hela raden tas bort om förväntningarna inte uppfylls.expectation_expr
kan bestå av literaler, column identifierare inom tableoch deterministiska, inbyggda SQL-funktioner eller operatorer förutom:-
Mängdfunktioner
- analysfunktioner window
- Ranking window funktioner
- Table värderade generatorfunktioner
Får inte heller
expr
innehålla någon underfråga.-
Mängdfunktioner
-
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Lägger till en informativ primärnyckel eller informativa främmande nyckelbegränsningar till en strömmande table. Viktiga begränsningar stöds inte för tables i
hive_metastore
catalog.
-
-
table_clauses
Du kan också ange partitionering, kommentarer, användardefinierade egenskaper och ett refresh schema för den nya table. Varje undersats kan bara anges en gång.
-
En valfri list av columns av table att partitiontable av.
KOMMENTAR table_comment
En
STRING
literal som beskriver table.-
Du kan också ange en eller flera användardefinierade egenskaper.
Använd den här inställningen om du vill ange delta live-Tables-körningskanal som används för att köra den här instruktionen. Ange Set värdet på egenskapen
pipelines.channel
till"PREVIEW"
eller"CURRENT"
. Standardvärdet är"CURRENT"
. Mer information om Delta Live Tables kanaler finns i Delta Live Tables runtime-kanaler. SCHEDULE [ REFRESH ] schema_klausul
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }
För att schemalägga en refresh som inträffar regelbundet, använd syntaxen
EVERY
. OmEVERY
syntax anges uppdateras den strömmande table- eller materialiserade vyn regelbundet med det angivna intervallet baserat på det angivna värdet, till exempelHOUR
,HOURS
,DAY
,DAYS
,WEEK
ellerWEEKS
. Följande table visar godkända heltal values förnumber
.Time unit Heltalsvärde HOUR or HOURS
1 <= H <= 72 DAY or DAYS
1 <= D <= 31 WEEK or WEEKS
1 <= W <= 8 Kommentar
Singular- och pluralformerna i den inkluderade tidsenheten är semantiskt likvärdiga.
CRON cron_string [ AT TIME ZONE timezone_id ]
Så här schemalägger du en refresh med hjälp av ett kvarts cron- värde. Giltiga time_zone_values accepteras.
AT TIME ZONE LOCAL
stöds inte.Om
AT TIME ZONE
den saknas används tidszonen för sessionen. OmAT TIME ZONE
saknas och sessionens tidszon inte är setutlöses ett fel.SCHEDULE
är semantiskt likvärdigt medSCHEDULE REFRESH
.
Schemat kan anges som en del av
CREATE
kommandot. Använd kommandot ALTER STREAMING TABLE eller körCREATE OR REFRESH
medSCHEDULE
-satsen för att ändra schemat för en strömmande table efter skapandet.-
WITH ROW FILTER-klausul
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Lägger till en radfilterfunktion i table. Alla efterföljande frågor från den table tar emot en delmängd av raderna where som utvärderas av funktionen till boolesk TRUE. Detta kan vara användbart för detaljerad åtkomstkontroll where funktionen kan inspektera identitets- eller gruppmedlemskap för den anropande användaren för att avgöra om vissa rader ska filtreras.
-
Den här satsen fyller i table med hjälp av data från
query
. Den här frågan måste vara en direktuppspelningsfråga . Detta kan uppnås genom att lägga till nyckelordet iSTREAM
valfri relation som du vill bearbeta stegvis. När du anger enquery
och entable_specification
tillsammans måste tableschema som anges itable_specification
innehålla alla columns som returneras avquery
, annars get du ett fel. Alla columns som anges itable_specification
men som inte returneras avquery
returnerarnull
values när de efterfrågas.
Skillnader mellan strömmande tables och andra tables
Strömmande tables är tillståndsorienterade tables, utformade för att endast bearbeta varje rad en gång när du hanterar ett expanderande dataset. Eftersom de flesta datauppsättningar växer kontinuerligt över tid är strömmande tables bra för de flesta inmatningsarbetsbelastningar. Strömmande tables är optimala för pipelines som kräver data färskhet och låg svarstid. Streaming tables kan också vara användbart för omfattande transformationer i stor skala, eftersom resultaten kan beräknas inkrementellt när nya data anländer, vilket håller resultaten uppdaterade utan att behöva beräkna om alla källdata med varje update. Strömmande tables är utformade för datakällor som endast läggs till.
Strömmande tables accepterar ytterligare kommandon, till exempel REFRESH
, som bearbetar de senaste tillgängliga data i källorna i frågan. Ändringar i den angivna frågan get återspeglas på nya data genom att anropa REFRESH
som inte tidigare bearbetats. Om du även vill tillämpa ändringarna på befintliga data måste du köra REFRESH TABLE <table_name> FULL
för att utföra en FULL REFRESH
. Fullständiga uppdateringar bearbetar om alla data som är tillgängliga i källan med den senaste definitionen. Vi rekommenderar inte att du utför total uppdatering på källor som inte bevarar hela datahistoriken eller har korta kvarhållningsperioder, till exempel Kafka, eftersom hela refresh kan radera befintliga data. Du kanske inte kan återställa gamla data om data inte längre är tillgängliga i källan.
Radfilter och column-maskerna
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Med radfilter kan du ange en funktion som används som ett filter när en table genomsökning hämtar rader. Dessa filter säkerställer att efterföljande frågor endast returnerar rader som filterpredikatet utvärderas till sant för.
Column masker låter dig maskera en column:s values när en table genomsökning hämtar rader. Alla framtida frågor som rör den column får resultatet av utvärderingen av funktionen via columnoch ersätter columnursprungliga värde.
Mer information om hur du använder radfilter och column masker finns i Filtrera känsliga table data med hjälp av radfilter och column masker.
Hantera radfilter och Column masker
Radfilter och column-masker på direktuppspelning tables ska läggas till, uppdateras eller tas bort via CREATE OR REFRESH
-instruktionen.
Funktionssätt
-
Refresh som Definer: När
CREATE OR REFRESH
- ellerREFRESH
-uttrycken refresh en strömmande table, körs radfilterfunktionerna med definierarens rättigheter (som table-ägare). Det innebär att tablerefresh använder säkerhetskontexten för den användare som skapade det strömmande table. -
Fråga: De flesta filter körs med definierarens rättigheter, men funktioner som kontrollerar användarkontexten (till exempel
CURRENT_USER
ochIS_MEMBER
) är undantag. Dessa funktioner körs som anropare. Den här metoden tillämpar användarspecifika datasäkerhets- och åtkomstkontroller baserat på den aktuella användarens kontext.
Överskådlighet
Använd DESCRIBE EXTENDED
, INFORMATION_SCHEMA
eller Catalog Explorer för att undersöka befintliga radfilter och column-masker som gäller för en viss strömmande table. Den här funktionen gör det möjligt för användare att kontrollera och granska dataåtkomst och skyddsåtgärder för strömning tables.
Begränsningar
- Endast table innehavare kan refresh strömma tables till get de senaste uppgifterna.
-
ALTER TABLE
-kommandon är inte tillåtna för direktuppspelning tables. Definitionen och egenskaperna för table bör ändras via instruktionenCREATE OR REFRESH
eller ALTER STREAMING TABLE. - Utveckling av tableschema via DML-kommandon som
INSERT INTO
ochMERGE
stöds inte. - Följande kommandon stöds inte vid direktuppspelning tables:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
- Deltadelning stöds inte.
- Det går inte att byta namn på table eller ändra ägaren.
-
Table begränsningar som
PRIMARY KEY
ochFOREIGN KEY
stöds inte. - Genererade columns, identitet columnsoch standard columns stöds inte.
Exempel
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM RANGE(10)
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE EVERY 1 HOUR
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')