STREAMINGTABEL MAKEN
Van toepassing op: Databricks SQL
Hiermee maakt u een streamingtabel, een Delta-tabel met extra ondersteuning voor streaming of incrementele gegevensverwerking.
Streamingtabellen worden alleen ondersteund in Delta Live Tables en in Databricks SQL met Unity Catalog. Als u deze opdracht uitvoert op ondersteunde Databricks Runtime-berekening, wordt de syntaxis alleen geparseerd. Zie Pijplijncode ontwikkelen met SQL.
Syntaxis
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] schedule_clause |
WITH { ROW FILTER clause } } [...]
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ] }
Parameters
OPFRISSEN
Indien opgegeven, vernieuwt u de tabel met de meest recente gegevens die beschikbaar zijn vanuit de bronnen die in de query zijn gedefinieerd. Alleen nieuwe gegevens die binnenkomen voordat de query wordt gestart, worden verwerkt. Nieuwe gegevens die worden toegevoegd aan de bronnen tijdens de uitvoering van de opdracht, worden genegeerd tot de volgende vernieuwing. De vernieuwingsbewerking van CREATE OR REFRESH is volledig declaratief. Als met een vernieuwingsopdracht niet alle metagegevens uit de oorspronkelijke instructie voor het maken van tabellen worden opgegeven, worden de niet-opgegeven metagegevens verwijderd.
ALS DEZE NIET BESTAAT
Hiermee maakt u de streamingtabel als deze niet bestaat. Als er al een tabel met deze naam bestaat, wordt de
CREATE STREAMING TABLE
instructie genegeerd.U kunt maximaal één van
IF NOT EXISTS
ofOR REFRESH
.-
De naam van de tabel die moet worden gemaakt. De naam mag geen tijdelijke specificatie of optiesspecificatie bevatten. Als de naam niet is gekwalificeerd, wordt de tabel gemaakt in het huidige schema.
table_specification
Deze optionele component definieert de lijst met kolommen, hun typen, eigenschappen, beschrijvingen en kolombeperkingen.
Als u geen kolommen in het tabelschema definieert, moet u opgeven
AS query
.-
Een unieke naam voor de kolom.
-
Hiermee geeft u het gegevenstype van de kolom.
NIET NULL
Als de kolom is opgegeven, worden geen waarden geaccepteerd
NULL
.OPMERKING column_comment
Een letterlijke tekenreeks om de kolom te beschrijven.
-
Belangrijk
Deze functie is beschikbaar als openbare preview.
Hiermee voegt u een primaire sleutel of refererende sleutelbeperking toe aan de kolom in een streamingtabel. Beperkingen worden niet ondersteund voor tabellen in de
hive_metastore
catalogus. -
Belangrijk
Deze functie is beschikbaar als openbare preview.
Voegt een kolommaskerfunctie toe om gevoelige gegevens anoniem te maken. Alle volgende query's van die kolom ontvangen het resultaat van het evalueren van die functie ten opzichte van de kolom in plaats van de oorspronkelijke waarde van de kolom. Dit kan handig zijn voor verfijnde toegangsbeheerdoeleinden, waarbij de functie de identiteit of groepslidmaatschappen van de aanroepende gebruiker kan inspecteren om te bepalen of de waarde moet worden bewerkt.
CONSTRAINT expectation_name EXPECT (expectation_expr) [ BIJ SCHENDING { FAIL UPDATE | DROP ROW } ]
Voegt verwachtingen voor gegevenskwaliteit toe aan de tabel. Deze verwachtingen voor gegevenskwaliteit kunnen in de loop van de tijd worden bijgehouden en worden geopend via het gebeurtenislogboek van de streamingtabel. Een
FAIL UPDATE
verwachting zorgt ervoor dat de verwerking mislukt bij het maken van de tabel en het vernieuwen van de tabel. EenDROP ROW
verwachting zorgt ervoor dat de hele rij wordt verwijderd als niet aan de verwachting wordt voldaan.expectation_expr
kan bestaan uit letterlijke waarden, kolom-id's in de tabel en deterministische, ingebouwde SQL-functies of -operators, met uitzondering van:- Statistische functies
- Analytische vensterfuncties
- Classificatievensterfuncties
- Generatorfuncties met tabelwaarde
Mag ook
expr
geen subquery bevatten.- Statistische functies
-
Belangrijk
Deze functie is beschikbaar als openbare preview.
Voegt een informatieve primaire sleutel of informatieve refererende sleutelbeperkingen toe aan een streamingtabel. Sleutelbeperkingen worden niet ondersteund voor tabellen in de
hive_metastore
catalogus.
-
-
table_clauses
Geef desgewenst partitionering, opmerkingen, door de gebruiker gedefinieerde eigenschappen en een vernieuwingsschema voor de nieuwe tabel op. Elke subcomponent mag slechts eenmaal worden opgegeven.
-
Een optionele lijst met kolommen van de tabel om de tabel te partitioneren op.
OPMERKING table_comment
Een
STRING
letterlijke om de tabel te beschrijven.-
U kunt desgewenst een of meer door de gebruiker gedefinieerde eigenschappen instellen.
Gebruik deze instelling om het runtimekanaal van Delta Live Tables op te geven dat wordt gebruikt om deze instructie uit te voeren. Stel de waarde van de
pipelines.channel
eigenschap in op"PREVIEW"
of"CURRENT"
. De standaardwaarde is"CURRENT"
. Zie Delta Live Tables Runtime-kanalen voor meer informatie over Delta Live Tables-kanalen. PLANNING [ VERNIEUWEN ] schedule_clause
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }
Belangrijk
Deze functie is beschikbaar als openbare preview.
Als u een vernieuwing wilt plannen die periodiek plaatsvindt, gebruikt
EVERY
u de syntaxis. AlsEVERY
de syntaxis is opgegeven, wordt de streamingtabel of gerealiseerde weergave periodiek vernieuwd met het opgegeven interval op basis van de opgegeven waarde, zoalsHOUR
,HOURS
,DAY
,DAYS
, , ofWEEKS
WEEK
. De volgende tabel bevat geaccepteerde gehele getallen voornumber
.Time unit Integerwaarde HOUR or HOURS
1 <= H <= 72 DAY or DAYS
1 <= D <= 31 WEEK or WEEKS
1 <= W <= 8 Notitie
De enkelvoudige en meervoudvormen van de opgenomen tijdseenheid zijn semantisch gelijkwaardig.
CRON cron_string [ AT TIME ZONE timezone_id ]
Een vernieuwing plannen met een kwarts cron-waarde . Geldige time_zone_values worden geaccepteerd.
AT TIME ZONE LOCAL
wordt niet ondersteund.Als
AT TIME ZONE
deze afwezig is, wordt de sessietijdzone gebruikt. AlsAT TIME ZONE
deze afwezig is en de sessietijdzone niet is ingesteld, wordt er een fout gegenereerd.SCHEDULE
is semantisch gelijk aanSCHEDULE REFRESH
.
De planning kan worden opgegeven als onderdeel van de
CREATE
opdracht. Gebruik ALTER STREAMING TABLE of voerCREATE OR REFRESH
een opdracht uit met eenSCHEDULE
component om het schema van een streamingtabel te wijzigen nadat deze is gemaakt.WITH ROW FILTER component
Belangrijk
Deze functie is beschikbaar als openbare preview.
Hiermee voegt u een rijfilterfunctie toe aan de tabel. Alle volgende query's uit die tabel ontvangen een subset van de rijen waarin de functie booleaanse WAAR oplevert. Dit kan handig zijn voor verfijnde toegangsbeheerdoeleinden, waarbij de functie de identiteit of groepslidmaatschappen van de aanroepende gebruiker kan inspecteren om te bepalen of bepaalde rijen moeten worden gefilterd.
-
-
Met deze component wordt de tabel gevuld met behulp van de gegevens uit
query
. Deze query moet een streamingquery zijn. Dit kan worden bereikt door hetSTREAM
trefwoord toe te voegen aan elke relatie die u incrementeel wilt verwerken. Wanneer u eenquery
en eentable_specification
samen opgeeft, moet het opgegeventable_specification
tabelschema alle kolommen bevatten die worden geretourneerd door dequery
, anders krijgt u een foutmelding. Kolommen die zijn opgegeven intable_specification
maar niet worden geretourneerd doorquery
retourwaardennull
wanneer er query's worden uitgevoerd.
Verschillen tussen streamingtabellen en andere tabellen
Streamingtabellen zijn stateful tabellen, ontworpen om elke rij slechts eenmaal te verwerken wanneer u een groeiende gegevensset verwerkt. Omdat de meeste gegevenssets in de loop van de tijd continu groeien, zijn streamingtabellen geschikt voor de meeste opnameworkloads. Streamingtabellen zijn optimaal voor pijplijnen waarvoor nieuwe gegevens en lage latentie nodig zijn. Streamingtabellen kunnen ook handig zijn voor grootschalige transformaties, omdat resultaten incrementeel kunnen worden berekend wanneer nieuwe gegevens binnenkomen, zodat de resultaten up-to-date blijven zonder dat alle brongegevens volledig hoeven te worden gecomputeerd met elke update. Streamingtabellen zijn ontworpen voor gegevensbronnen die alleen worden toegevoegd.
Streamingtabellen accepteren aanvullende opdrachten, zoals REFRESH
, waarmee de meest recente gegevens worden verwerkt die beschikbaar zijn in de bronnen die in de query zijn opgegeven. Wijzigingen in de opgegeven query worden alleen doorgevoerd in nieuwe gegevens door een REFRESH
, niet eerder verwerkte gegevens aan te roepen. Als u ook de wijzigingen op bestaande gegevens wilt toepassen, moet u uitvoeren REFRESH TABLE <table_name> FULL
om een FULL REFRESH
. Alle gegevens die beschikbaar zijn in de bron, worden opnieuw verwerkt met de meest recente definitie. Het is niet raadzaam om volledige vernieuwingen aan te roepen voor bronnen die de volledige geschiedenis van de gegevens niet behouden of korte bewaarperioden hebben, zoals Kafka, omdat de volledige vernieuwing de bestaande gegevens afkapt. Mogelijk kunt u oude gegevens niet herstellen als de gegevens niet meer beschikbaar zijn in de bron.
Rijfilters en kolommaskers
Belangrijk
Deze functie is beschikbaar als openbare preview.
Met rijfilters kunt u een functie opgeven die als filter wordt toegepast wanneer een tabelscan rijen ophaalt. Deze filters zorgen ervoor dat volgende query's alleen rijen retourneren waarvoor het filterpredicaat waar wordt geëvalueerd.
Met kolommaskers kunt u de waarden van een kolom maskeren wanneer een tabelscan rijen ophaalt. Alle toekomstige query's met betrekking tot die kolom ontvangen het resultaat van de evaluatie van de functie over de kolom, waarbij de oorspronkelijke waarde van de kolom wordt vervangen.
Rijfilters en kolommaskers beheren
Rijfilters en kolommaskers in streamingtabellen moeten worden toegevoegd, bijgewerkt of verwijderd door de CREATE OR REFRESH
instructie.
Gedrag
- Vernieuwen als definitie: wanneer de
CREATE OR REFRESH
ofREFRESH
instructies een streamingtabel vernieuwen, worden rijfilterfuncties uitgevoerd met de rechten van de definitier (als eigenaar van de tabel). Dit betekent dat de tabelvernieuwing gebruikmaakt van de beveiligingscontext van de gebruiker die de streamingtabel heeft gemaakt. - Query: Terwijl de meeste filters worden uitgevoerd met de rechten van de definieerer, zijn functies die gebruikerscontext controleren (zoals
CURRENT_USER
enIS_MEMBER
) uitzonderingen. Deze functies worden uitgevoerd als de aanroeper. Deze aanpak dwingt gebruikersspecifieke gegevensbeveiliging en toegangsbeheer af op basis van de context van de huidige gebruiker.
Waarneembaarheid
Gebruik DESCRIBE EXTENDED
, INFORMATION_SCHEMA
of Catalog Explorer om de bestaande rijfilters en kolommaskers te onderzoeken die van toepassing zijn op een bepaalde streamingtabel. Met deze functionaliteit kunnen gebruikers gegevenstoegang en beveiligingsmaatregelen voor streamingtabellen controleren en controleren.
Beperkingen
Alleen eigenaren van tabellen kunnen streamingtabellen vernieuwen om de meest recente gegevens op te halen.
ALTER TABLE
opdrachten zijn niet toegestaan voor streamingtabellen. De definitie en eigenschappen van de tabel moeten worden gewijzigd via deCREATE OR REFRESH
instructie ALTER STREAMING TABLE .Query's voor tijdreizen worden niet ondersteund.
Het tabelschema ontwikkelen via DML-opdrachten zoals
INSERT INTO
enMERGE
wordt niet ondersteund.De volgende opdrachten worden niet ondersteund voor streamingtabellen:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
Delta Sharing wordt niet ondersteund.
Het wijzigen van de naam van de tabel of het wijzigen van de eigenaar wordt niet ondersteund.
Tabelbeperkingen zoals
PRIMARY KEY
enFOREIGN KEY
worden niet ondersteund.Gegenereerde kolommen, identiteitskolommen en standaardkolommen worden niet ondersteund.
Voorbeelden
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM RANGE(10)
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')