SKAPA DIREKTUPPSPELNINGSTABELL
Gäller för: Databricks SQL
Skapar en strömmande tabell, en Delta-tabell med extra stöd för direktuppspelning eller inkrementell databearbetning.
Direktuppspelningstabeller stöds endast i Delta Live Tables och i Databricks SQL med Unity Catalog. Om du kör det här kommandot på Databricks Runtime-beräkning som stöds parsas endast syntaxen. Se Utveckla pipelinekod med SQL.
Syntax
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] schedule_clause |
WITH { ROW FILTER clause } } [...]
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ] }
Parametrar
SVALKA
Om det anges uppdaterar tabellen med de senaste tillgängliga data från källorna som definierats i frågan. Endast nya data som tas emot innan frågan startar bearbetas. Nya data som läggs till i källorna under körningen av kommandot ignoreras till nästa uppdatering. Uppdateringsåtgärden från CREATE OR REFRESH är helt deklarativ. Om ett uppdateringskommando inte anger alla metadata från den ursprungliga tabellskapandet tas de ospecificerade metadata bort.
OM INTE FINNS
Skapar strömningstabellen om den inte finns. Om det redan finns en tabell med det här namnet ignoreras -instruktionen
CREATE STREAMING TABLE
.Du kan ange högst en av
IF NOT EXISTS
ellerOR REFRESH
.-
Namnet på tabellen som ska skapas. Namnet får inte innehålla en temporal specifikation eller alternativspecifikation. Om namnet inte är kvalificerat skapas tabellen i det aktuella schemat.
table_specification
Den här valfria satsen definierar listan över kolumner, deras typer, egenskaper, beskrivningar och kolumnbegränsningar.
Om du inte definierar kolumner i tabellschemat måste du ange
AS query
.-
Ett unikt namn för kolumnen.
-
Anger kolumnens datatyp .
INTE NULL
Om det anges accepterar
NULL
kolumnen inte värden.KOMMENTAR column_comment
En strängliteral som beskriver kolumnen.
-
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Lägger till en primärnyckel eller sekundärnyckelbegränsning i kolumnen i en strömmande tabell. Begränsningar stöds inte för tabeller i
hive_metastore
katalogen. -
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Lägger till en kolumnmaskfunktion för att anonymisera känsliga data. Alla efterföljande frågor från den kolumnen får resultatet av utvärderingen av funktionen över kolumnen i stället för kolumnens ursprungliga värde. Detta kan vara användbart för detaljerad åtkomstkontroll där funktionen kan kontrollera identitets- eller gruppmedlemskapen för den anropande användaren för att avgöra om värdet ska redigeras.
BEGRÄNSNING expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | SLÄPP RAD } ]
Lägger till datakvalitetsförväntningar i tabellen. Dessa förväntningar på datakvalitet kan spåras över tid och nås via strömningstabellens händelselogg. En
FAIL UPDATE
förväntan gör att bearbetningen misslyckas när både tabellen skapas och tabellen uppdateras. EnDROP ROW
förväntan gör att hela raden tas bort om förväntningarna inte uppfylls.expectation_expr
kan bestå av literaler, kolumnidentifierare i tabellen och deterministiska, inbyggda SQL-funktioner eller operatorer förutom:- Mängdfunktioner
- Analysfönsterfunktioner
- Funktioner för rangordningsfönster
- Generatorfunktioner för tabellvärde
Får inte heller
expr
innehålla någon underfråga.- Mängdfunktioner
-
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Lägger till en informations primärnyckel eller informationsmässiga begränsningar för sekundärnyckel i en strömmande tabell. Viktiga begränsningar stöds inte för tabeller i
hive_metastore
katalogen.
-
-
table_clauses
Du kan också ange partitionering, kommentarer, användardefinierade egenskaper och ett uppdateringsschema för den nya tabellen. Varje undersats kan bara anges en gång.
-
En valfri lista över kolumner i tabellen som tabellen ska partitioneras efter.
KOMMENTAR table_comment
En
STRING
literal för att beskriva tabellen.-
Du kan också ange en eller flera användardefinierade egenskaper.
Använd den här inställningen om du vill ange den Delta Live Tables-körningskanal som används för att köra den här instruktionen. Ange värdet för
pipelines.channel
egenskapen till"PREVIEW"
eller"CURRENT"
. Standardvärdet är"CURRENT"
. Mer information om Delta Live Tables-kanaler finns i Delta Live Tables-körningskanaler. SCHEMA [ UPPDATERA ] schedule_clause
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Om du vill schemalägga en uppdatering som sker regelbundet använder du
EVERY
syntax. OmEVERY
syntax anges uppdateras strömningstabellen eller den materialiserade vyn regelbundet med det angivna intervallet baserat på det angivna värdet, till exempelHOUR
,HOURS
,DAY
,DAYS
,WEEK
ellerWEEKS
. I följande tabell visas godkända heltalsvärden förnumber
.Time unit Heltalsvärde HOUR or HOURS
1 <= H <= 72 DAY or DAYS
1 <= D <= 31 WEEK or WEEKS
1 <= W <= 8 Kommentar
Singular- och pluralformerna i den inkluderade tidsenheten är semantiskt likvärdiga.
CRON cron_string [ AT TIME ZONE timezone_id ]
Så här schemalägger du en uppdatering med hjälp av ett quartz cron-värde . Giltiga time_zone_values accepteras.
AT TIME ZONE LOCAL
stöds inte.Om
AT TIME ZONE
den saknas används tidszonen för sessionen. OmAT TIME ZONE
är frånvarande och sessionens tidszon inte har angetts utlöses ett fel.SCHEDULE
är semantiskt likvärdigt medSCHEDULE REFRESH
.
Schemat kan anges som en del av
CREATE
kommandot. Använd ALTER STREAMING TABLE eller körCREATE OR REFRESH
kommandot medSCHEDULE
-satsen för att ändra schemat för en strömmande tabell när du har skapat den.MED ROW FILTER-sats
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Lägger till en radfilterfunktion i tabellen. Alla efterföljande frågor från tabellen tar emot en delmängd av de rader där funktionen utvärderas till boolesk TRUE. Detta kan vara användbart för detaljerad åtkomstkontroll där funktionen kan kontrollera identitets- eller gruppmedlemskapen för den anropande användaren för att avgöra om vissa rader ska filtreras.
-
-
Den här satsen fyller i tabellen med hjälp av data från
query
. Den här frågan måste vara en direktuppspelningsfråga . Detta kan uppnås genom att lägga till nyckelordet iSTREAM
valfri relation som du vill bearbeta stegvis. När du anger enquery
och entable_specification
tillsammans måste tabellschemat som anges itable_specification
innehålla alla kolumner som returneras avquery
, annars får du ett fel. Alla kolumner som anges itable_specification
men inte returneras avquery
returvärdennull
när du frågar.
Skillnader mellan strömmande tabeller och andra tabeller
Direktuppspelningstabeller är tillståndskänsliga tabeller som endast är utformade för att hantera varje rad en gång när du bearbetar en växande datauppsättning. Eftersom de flesta datauppsättningar växer kontinuerligt över tid är strömmande tabeller bra för de flesta inmatningsarbetsbelastningar. Strömmande tabeller är optimala för pipelines som kräver data färskhet och låg svarstid. Strömningstabeller kan också vara användbara för omfattande skalningstransformeringar, eftersom resultaten kan beräknas stegvis när nya data anländer, vilket håller resultaten uppdaterade utan att helt omkomplera alla källdata med varje uppdatering. Strömmande tabeller är utformade för datakällor som endast är tilläggstabeller.
Strömmande tabeller accepterar ytterligare kommandon, till exempel REFRESH
, som bearbetar de senaste data som är tillgängliga i de källor som anges i frågan. Ändringar i den angivna frågan återspeglas bara på nya data genom att anropa en REFRESH
, inte tidigare bearbetade data. Om du även vill tillämpa ändringarna på befintliga data måste du köra REFRESH TABLE <table_name> FULL
för att utföra en FULL REFRESH
. Fullständiga uppdateringar bearbetar om alla data som är tillgängliga i källan med den senaste definitionen. Vi rekommenderar inte att du anropar fullständiga uppdateringar på källor som inte behåller hela datahistoriken eller har korta kvarhållningsperioder, till exempel Kafka, eftersom den fullständiga uppdateringen trunkerar befintliga data. Du kanske inte kan återställa gamla data om data inte längre är tillgängliga i källan.
Radfilter och kolumnmasker
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Med radfilter kan du ange en funktion som tillämpas som ett filter när en tabellgenomsökning hämtar rader. Dessa filter säkerställer att efterföljande frågor endast returnerar rader som filterpredikatet utvärderas till sant för.
Med kolumnmasker kan du maskera en kolumns värden när en tabellgenomsökning hämtar rader. Alla framtida frågor som rör den kolumnen får resultatet av utvärderingen av funktionen över kolumnen och ersätter kolumnens ursprungliga värde.
Mer information om hur du använder radfilter och kolumnmasker finns i Filtrera känsliga tabelldata med hjälp av radfilter och kolumnmasker.
Hantera radfilter och kolumnmasker
Radfilter och kolumnmasker i strömmande tabeller ska läggas till, uppdateras eller tas bort via -instruktionen CREATE OR REFRESH
.
Funktionssätt
- Uppdatera som definierare: När -instruktionen
CREATE OR REFRESH
ellerREFRESH
uppdaterar en strömmande tabell körs radfilterfunktionerna med definierarens rättigheter (som tabellägare). Det innebär att tabelluppdateringen använder säkerhetskontexten för den användare som skapade strömningstabellen. - Fråga: De flesta filter körs med definierarens rättigheter, men funktioner som kontrollerar användarkontexten (till exempel
CURRENT_USER
ochIS_MEMBER
) är undantag. Dessa funktioner körs som anropare. Den här metoden tillämpar användarspecifika datasäkerhets- och åtkomstkontroller baserat på den aktuella användarens kontext.
Överskådlighet
Använd DESCRIBE EXTENDED
, INFORMATION_SCHEMA
eller Katalogutforskaren för att undersöka befintliga radfilter och kolumnmasker som gäller för en viss strömmande tabell. Med den här funktionen kan användare granska och granska dataåtkomst och skyddsåtgärder för strömmande tabeller.
Begränsningar
Endast tabellägare kan uppdatera strömmande tabeller för att hämta de senaste data.
ALTER TABLE
kommandon tillåts inte för strömmande tabeller. Tabellens definition och egenskaper bör ändras via instruktionenCREATE OR REFRESH
ELLER ALTER STREAMING TABLE .Frågor om tidsresor stöds inte.
Utveckla tabellschemat via DML-kommandon som
INSERT INTO
, ochMERGE
stöds inte.Följande kommandon stöds inte i strömmande tabeller:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
Deltadelning stöds inte.
Det går inte att byta namn på tabellen eller ändra ägaren.
Tabellbegränsningar som
PRIMARY KEY
ochFOREIGN KEY
stöds inte.Genererade kolumner, identitetskolumner och standardkolumner stöds inte.
Exempel
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM RANGE(10)
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')