TWORZENIE TABELI PRZESYŁANIA STRUMIENIOWEGO
Dotyczy: Databricks SQL
Tworzy tabelę przesyłania strumieniowego, tabelę delty z dodatkową obsługą przesyłania strumieniowego lub przyrostowego przetwarzania danych.
Tabele przesyłania strumieniowego są obsługiwane tylko w tabelach delta live i w usłudze Databricks SQL z wykazem aparatu Unity. Uruchomienie tego polecenia w obsługiwanym środowisku Databricks Runtime oblicza tylko składnię. Zobacz Tworzenie kodu potoku przy użyciu języka SQL.
Składnia
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] schedule_clause |
WITH { ROW FILTER clause } } [...]
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ] }
Parametry
REFRESH
Jeśli zostanie określona, odświeża tabelę z najnowszymi danymi dostępnymi ze źródeł zdefiniowanych w zapytaniu. Tylko nowe dane, które docierają przed rozpoczęciem zapytania, są przetwarzane. Nowe dane dodawane do źródeł podczas wykonywania polecenia są ignorowane do następnego odświeżania. Operacja odświeżania z polecenia CREATE OR REFRESH jest w pełni deklaratywna. Jeśli polecenie odświeżania nie określa wszystkich metadanych z oryginalnej instrukcji tworzenia tabeli, nieokreślone metadane zostaną usunięte.
JEŚLI NIE ISTNIEJE
Tworzy tabelę przesyłania strumieniowego, jeśli nie istnieje. Jeśli tabela o tej nazwie już istnieje,
CREATE STREAMING TABLE
instrukcja jest ignorowana.Możesz określić co najwyżej jeden z
IF NOT EXISTS
elementów lubOR REFRESH
.-
Nazwa tabeli do utworzenia. Nazwa nie może zawierać specyfikacji czasowej ani specyfikacji opcji. Jeśli nazwa nie jest kwalifikowana, tabela zostanie utworzona w bieżącym schemacie.
table_specification
Ta klauzula opcjonalna definiuje listę kolumn, ich typów, właściwości, opisów i ograniczeń kolumn.
Jeśli nie zdefiniujesz kolumn w schemacie tabeli, musisz określić wartość
AS query
.-
Unikatowa nazwa kolumny.
-
Określa typ danych kolumny.
NOT NULL
Jeśli określono kolumnę, nie akceptuje
NULL
wartości.COLUMN_COMMENT KOMENTARZ
Literał ciągu opisujący kolumnę.
-
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Dodaje ograniczenie klucza podstawowego lub klucza obcego do kolumny w tabeli przesyłania strumieniowego. Ograniczenia nie są obsługiwane w przypadku tabel w wykazie
hive_metastore
. -
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Dodaje funkcję maski kolumn do anonimowości poufnych danych. Wszystkie kolejne zapytania z tej kolumny otrzymują wynik oceny tej funkcji w kolumnie zamiast oryginalnej wartości kolumny. Może to być przydatne w celach szczegółowej kontroli dostępu, w których funkcja może sprawdzić tożsamość lub członkostwo w grupach użytkownika wywołującego, aby zdecydować, czy zredagować wartość.
OGRANICZENIE EXPECTATION_NAME OCZEKIWANO (expectation_expr) [ PRZY NARUSZENIU { NIEPOWODZENIE AKTUALIZACJI | DROP ROW } ]
Dodaje oczekiwania dotyczące jakości danych do tabeli. Te oczekiwania dotyczące jakości danych można śledzić w czasie i uzyskiwać do nich dostęp za pośrednictwem dziennika zdarzeń tabeli przesyłania strumieniowego. Oczekiwanie
FAIL UPDATE
powoduje niepowodzenie przetwarzania podczas tworzenia tabeli, a także odświeżania tabeli. OczekiwanieDROP ROW
powoduje porzucenie całego wiersza, jeśli oczekiwanie nie zostanie spełnione.expectation_expr
Może składać się z literałów, identyfikatorów kolumn w tabeli oraz deterministycznych, wbudowanych funkcji LUB operatorów SQL, z wyjątkiem:- Agregujących
- Funkcje okna analitycznego
- Funkcje okna klasyfikacji
- Funkcje generatora wartości tabeli
Ponadto
expr
nie może zawierać żadnego podzapytania.- Agregujących
-
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Dodaje informacyjne podstawowe lub informacyjne ograniczenia klucza obcego do tabeli przesyłania strumieniowego. Ograniczenia klucza nie są obsługiwane w przypadku tabel w wykazie
hive_metastore
.
-
-
table_clauses
Opcjonalnie określ partycjonowanie, komentarze, właściwości zdefiniowane przez użytkownika i harmonogram odświeżania nowej tabeli. Każda klauzula podrzędna może być określona tylko raz.
-
Opcjonalna lista kolumn tabeli do partycjonowania tabeli według.
TABLE_COMMENT KOMENTARZ
Literał
STRING
opisujący tabelę.-
Opcjonalnie ustawia co najmniej jedną właściwość zdefiniowaną przez użytkownika.
Użyj tego ustawienia, aby określić kanał środowiska uruchomieniowego delta Live Tables używany do uruchamiania tej instrukcji. Ustaw wartość
pipelines.channel
właściwości na"PREVIEW"
lub"CURRENT"
. Domyślna wartość to"CURRENT"
. Aby uzyskać więcej informacji na temat kanałów delta live tables, zobacz Delta Live Tables runtime channels (Kanały środowiska uruchomieniowego delta Live Tables). SCHEDULE [ REFRESH ] schedule_clause
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Aby zaplanować odświeżanie okresowo, użyj
EVERY
składni. JeśliEVERY
określono składnię, tabela przesyłania strumieniowego lub zmaterializowany widok jest okresowo odświeżany w określonym interwale na podstawie podanej wartości, takiej jakHOUR
, ,HOURS
DAY
,DAYS
WEEK
lubWEEKS
. W poniższej tabeli wymieniono zaakceptowane wartości całkowite dla elementunumber
.Time unit Wartość całkowita HOUR or HOURS
1 <= H <= 72 DAY or DAYS
1 <= D <= 31 WEEK or WEEKS
1 <= W <= 8 Uwaga
Liczba pojedyncza i mnoga dołączonej jednostki czasowej są semantycznie równoważne.
CRON cron_string [ AT TIME ZONE timezone_id ]
Aby zaplanować odświeżanie przy użyciu wartości kronu kwarcowego. Akceptowane są prawidłowe time_zone_values .
AT TIME ZONE LOCAL
nie jest obsługiwana.Jeśli
AT TIME ZONE
jest nieobecny, używana jest strefa czasowa sesji. JeśliAT TIME ZONE
jest nieobecny, a strefa czasowa sesji nie jest ustawiona, zostanie zgłoszony błąd.SCHEDULE
jest semantycznie równoważne .SCHEDULE REFRESH
Harmonogram można podać w ramach
CREATE
polecenia . Użyj polecenia ALTER STREAMING TABLE lub uruchomCREATE OR REFRESH
polecenie z klauzulą ,SCHEDULE
aby zmienić harmonogram tabeli przesyłania strumieniowego po utworzeniu.WITH ROW FILTER, klauzula
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Dodaje do tabeli funkcję filtru wierszy. Wszystkie kolejne zapytania z tej tabeli otrzymują podzbiór wierszy, w których funkcja oblicza wartość logiczną TRUE. Może to być przydatne w celach szczegółowej kontroli dostępu, w których funkcja może sprawdzić tożsamość lub członkostwo w grupach użytkownika wywołującego, aby zdecydować, czy filtrować niektóre wiersze.
-
-
Ta klauzula wypełnia tabelę przy użyciu danych z
query
. To zapytanie musi być zapytaniem przesyłanym strumieniowo. Można to osiągnąć, dodającSTREAM
słowo kluczowe do dowolnej relacji, którą chcesz przetwarzać przyrostowo. Po określeniuquery
elementu itable_specification
razem schemat tabeli określony wtable_specification
elemecie musi zawierać wszystkie kolumny zwrócone przezquery
element , w przeciwnym razie zostanie wyświetlony błąd. Wszystkie kolumny określone w elemecietable_specification
, ale nie są zwracane przezquery
wartości zwracanenull
podczas wykonywania zapytania.
Różnice między tabelami przesyłania strumieniowego a innymi tabelami
Tabele przesyłania strumieniowego to tabele stanowe, przeznaczone do obsługi każdego wiersza tylko raz podczas przetwarzania rosnącego zestawu danych. Ponieważ większość zestawów danych stale rośnie wraz z upływem czasu, tabele przesyłania strumieniowego są dobre dla większości obciążeń pozyskiwania. Tabele przesyłania strumieniowego są optymalne dla potoków, które wymagają świeżości danych i małych opóźnień. Tabele przesyłania strumieniowego mogą być również przydatne w przypadku transformacji na dużą skalę, ponieważ wyniki mogą być obliczane przyrostowo w miarę nadejścia nowych danych, zapewniając aktualność wyników bez konieczności pełnej ponownej kompilacji wszystkich danych źródłowych przy każdej aktualizacji. Tabele przesyłania strumieniowego są przeznaczone dla źródeł danych, które są tylko dołączane.
Tabele przesyłania strumieniowego akceptują dodatkowe polecenia, takie jak REFRESH
, które przetwarzają najnowsze dane dostępne w źródłach podanych w zapytaniu. Zmiany w podanym zapytaniu są odzwierciedlane tylko na nowych danych przez wywołanie REFRESH
elementu , który nie został wcześniej przetworzony. Aby zastosować zmiany w istniejących danych, należy wykonać REFRESH TABLE <table_name> FULL
polecenie , aby wykonać polecenie FULL REFRESH
. Pełne odświeżenia ponownie przetwarzają wszystkie dane dostępne w źródle przy użyciu najnowszej definicji. Nie zaleca się wywoływania pełnych odświeżeń w źródłach, które nie przechowują całej historii danych lub mają krótkie okresy przechowywania, takie jak Kafka, ponieważ pełne odświeżanie obcina istniejące dane. Odzyskanie starych danych może nie być możliwe, jeśli dane nie są już dostępne w źródle.
Filtry wierszy i maski kolumn
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Filtry wierszy umożliwiają określenie funkcji, która ma zastosowanie jako filtr za każdym razem, gdy skanowanie tabeli pobiera wiersze. Te filtry zapewniają, że kolejne zapytania zwracają tylko wiersze, dla których predykat filtru daje wartość true.
Maski kolumn umożliwiają maskowanie wartości kolumny za każdym razem, gdy skanowanie tabeli pobiera wiersze. Wszystkie przyszłe zapytania obejmujące tę kolumnę otrzymają wynik oceny funkcji w kolumnie, zastępując oryginalną wartość kolumny.
Aby uzyskać więcej informacji na temat używania filtrów wierszy i masek kolumn, zobacz Filtrowanie poufnych danych tabeli przy użyciu filtrów wierszy i masek kolumn.
Zarządzanie filtrami wierszy i maskami kolumn
Filtry wierszy i maski kolumn w tabelach przesyłania strumieniowego powinny być dodawane, aktualizowane lub porzucane za pomocą instrukcji CREATE OR REFRESH
.
Zachowanie
- Odśwież jako definiuj: gdy
CREATE OR REFRESH
instrukcje lubREFRESH
odświeżą tabelę strumieniową, funkcje filtrowania wierszy są uruchamiane z prawami definiowanego (jako właściciel tabeli). Oznacza to, że odświeżanie tabeli używa kontekstu zabezpieczeń użytkownika, który utworzył tabelę przesyłania strumieniowego. - Zapytanie: Podczas gdy większość filtrów jest uruchamiana z prawami definiowanego, funkcje sprawdzające kontekst użytkownika (takie jak
CURRENT_USER
iIS_MEMBER
) są wyjątkami. Te funkcje działają jako wywołanie. Takie podejście wymusza zabezpieczenia danych specyficzne dla użytkownika i mechanizmy kontroli dostępu na podstawie kontekstu bieżącego użytkownika.
Wgląd w informacje
INFORMATION_SCHEMA
Użyj DESCRIBE EXTENDED
, lub Eksploratora wykazu, aby zbadać istniejące filtry wierszy i maski kolumn, które mają zastosowanie do danej tabeli przesyłania strumieniowego. Ta funkcja umożliwia użytkownikom przeprowadzanie inspekcji i przeglądania środków dostępu do danych i ochrony w tabelach przesyłania strumieniowego.
Ograniczenia
Tylko właściciele tabel mogą odświeżać tabele przesyłania strumieniowego, aby uzyskać najnowsze dane.
ALTER TABLE
polecenia są niedozwolone w tabelach przesyłania strumieniowego. Definicja i właściwości tabeli powinny zostać zmienione za pomocąCREATE OR REFRESH
instrukcji ALTER STREAMING TABLE .Zapytania dotyczące podróży w czasie nie są obsługiwane.
Ewolucja schematu tabeli za pomocą poleceń DML, takich jak
INSERT INTO
, iMERGE
nie jest obsługiwana.Następujące polecenia nie są obsługiwane w tabelach przesyłania strumieniowego:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
Udostępnianie różnicowe nie jest obsługiwane.
Zmiana nazwy tabeli lub zmiana właściciela nie jest obsługiwana.
Ograniczenia tabeli, takie jak
PRIMARY KEY
iFOREIGN KEY
nie są obsługiwane.Wygenerowane kolumny, kolumny tożsamości i kolumny domyślne nie są obsługiwane.
Przykłady
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM RANGE(10)
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')