Condividi tramite


CREATE STREAMING TABLE

Si applica a:segno di spunta sì Databricks SQL

Crea uno streaming table, Delta table con supporto aggiuntivo per lo streaming o l'elaborazione incrementale dei dati.

Streaming tables è supportato solo in Delta Live Tables e su Databricks SQL con Unity Catalog. L'esecuzione di questo comando nel calcolo di Databricks Runtime supportato analizza solo la sintassi. Vedere Sviluppare codice della pipeline con SQL.

Sintassi

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] schedule_clause |
    WITH { ROW FILTER clause } } [...]

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ] }

Parameters

  • REFRESH

    Se viene specificato, aggiorna il table con i dati più recenti disponibili dalle fonti definite nella query. Vengono elaborati solo i nuovi dati che arrivano prima dell'avvio della query. Durante l'esecuzione del comando, i nuovi dati che vengono aggiunti alle origini sono ignorati fino al successivo refresh. L'operazione refresh di CREATE OR REFRESH è completamente dichiarativa. Se un comando refresh non specifica tutti i metadati dell'istruzione di creazione table originale, i metadati non specificati vengono eliminati.

  • SE NON ESISTE

    Crea lo streaming table se non esiste già. Se esiste già un table con questo nome, l'istruzione CREATE STREAMING TABLE viene ignorata.

    È possibile specificare al massimo uno di IF NOT EXISTS o OR REFRESH.

  • table_name

    Nome del table da creare. Il nome non deve includere una specifica temporale o una specifica delle opzioni. Se il nome non è qualificato, il table viene creato nel schemacorrente.

  • table_specification

    Questa clausola facoltativa definisce il list di columns, i relativi tipi, proprietà, descrizioni e vincoli column.

    Se non si definiscono columns nel tableschema è necessario specificare AS query.

    • column_identifier

      Nome univoco per il column.

      • column_type

        Specifica il tipo di dati del column.

      • NOT NULL

        Se specificato il column non accetta NULLvalues.

      • COMMENT column_comment

        Stringa letterale per descrivere il column.

      • column_constraint

        Importante

        Questa funzionalità è disponibile in anteprima pubblica.

        Aggiunge una chiave primaria o una chiave esterna constraint al column in un flusso table. Vincoli non sono supportati per tables nel hive_metastorecatalog.

      • Clausola MASK

        Importante

        Questa funzionalità è disponibile in anteprima pubblica.

        Aggiunge una funzione maschera column per rendere anonimi i dati sensibili. Tutte le query successive da tale column ricevono il risultato della valutazione di tale funzione sul column al posto del valore originale del column. Ciò può essere utile per scopi di controllo degli accessi con granularità fine. where La funzione può esaminare l'identità o le appartenenze ai gruppi dell'utente che la richiama per decidere se oscurare il valore.

      • CONSTRAINT nome_attesa ATTENDI (espressione_attesa) [ SU VIOLAZIONE { FALLIMENTO UPDATE | RIDUCI RIGA } ]

        Aggiunge requisiti di qualità dei dati al table. Queste aspettative sulla qualità dei dati possono essere monitorate nel tempo e possono essere accessibili tramite il registro eventi di streaming tabledi . Una previsione di FAIL UPDATE causa l'esito negativo dell'elaborazione durante la creazione del table e l'aggiornamento del table. Se l'aspettativa DROP ROW non viene soddisfatta, l'intera riga viene eliminata.

        expectation_expr può essere costituito da valori letterali, identificatori column all'interno di tablee da funzioni o operatori SQL deterministici e integrati, ad eccezione di:

        Inoltre expr, non deve contenere alcuna sottoquery.

      • table_constraint

        Importante

        Questa funzionalità è disponibile in anteprima pubblica.

        Aggiunge una chiave primaria informativa o vincoli informativi di chiave esterna a uno streaming table. I vincoli di chiave non sono supportati per tables nel hive_metastorecatalog.

  • table_clauses

    Se lo desideri, specifica il partizionamento, i commenti, le proprietà definite dall'utente e una pianificazione refresh per il nuovo table. Ogni clausola secondaria può essere specificata una sola volta.

    • PARTIZIONATO DA

      Un list facoltativo di columns del table per partition il table.

    • COMMENT table_comment

      Valore letterale STRING per descrivere il table.

    • TBLPROPERTIES

      Facoltativamente, imposta una o più proprietà definite dall'utente.

      Usare questa impostazione per specificare il canale di runtime Delta Live Tables utilizzato per eseguire questa istruzione. Set il valore della proprietà pipelines.channel per "PREVIEW" o "CURRENT". Il valore predefinito è "CURRENT". Per ulteriori informazioni sui canali Delta Live Tables, vedere i canali runtime Delta Live Tables.

    • SCHEDULE [ REFRESH ] schedule_clause

    • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

      Per pianificare un refresh che si verifica periodicamente, utilizzare la sintassi EVERY. Se si specifica la sintassi EVERY, lo streaming table o la vista materializzata viene aggiornata periodicamente secondo l'intervallo specificato basato sul valore fornito, ad esempio HOUR, HOURS, DAY, DAYS, WEEKo WEEKS. Nell'table seguente sono elencati i values integer accettati per number.

      Time unit Valore intero
      HOUR or HOURS 1 <= H <= 72
      DAY or DAYS 1 <= D <= 31
      WEEK or WEEKS 1 <= W <= 8

      Nota

      Le forme singolari e plurali dell'unità temporale inclusa sono semanticamente equivalenti.

    • CRON cron_string [ AT TIME ZONE timezone_id ]

      Per pianificare un refresh utilizzando un valore cron . Vengono accettati time_zone_values validi. AT TIME ZONE LOCAL non è supportata.

      Se AT TIME ZONE è assente, viene usato il fuso orario della sessione. Se AT TIME ZONE è assente e il fuso orario della sessione non è set, viene generato un errore. SCHEDULE è semanticamente equivalente a SCHEDULE REFRESH.

    La pianificazione può essere specificata come parte del comando CREATE. Usare ALTER STREAMING TABLE o eseguire il comando CREATE OR REFRESH con la clausola SCHEDULE per modificare la pianificazione di un table di streaming dopo la creazione.

  • CON ROW FILTER clausola

    Importante

    Questa funzionalità è disponibile in anteprima pubblica.

    Aggiunge una funzione di filtro di riga al table. Tutte le query successive da tale table ricevono un subset delle righe where per cui la funzione è valutata come TRUE booleano. Ciò può essere utile per scopi di controllo di accesso con granularità fine where la funzione può controllare l'identità o le appartenenze ai gruppi dell'utente richiamando per decidere se filtrare determinate righe.

  • Query AS

    Questa clausola popola il table usando i dati di query. Questa query deve essere una query di streaming. A tale scopo, è possibile aggiungere la parola chiave STREAM a qualsiasi relazione da elaborare in modo incrementale. Quando si specifica un query e un table_specification insieme, il tableschema specificato in table_specification deve contenere tutte le columns restituite dal query, altrimenti si get un errore. Qualsiasi columns specificato in table_specification ma non restituito da query restituisce nullvalues quando si effettua una query.

Differenze tra lo streaming di tables e altri tables

Le tables di streaming sono a stato tables, progettate per elaborare ogni riga una sola volta durante l'elaborazione di un set di dati che cresce. Poiché la maggior parte dei set di dati aumenta continuamente nel tempo, le tables di streaming sono valide per la maggior parte dei carichi di lavoro di inserimento. I flussi di streaming tables sono ottimali per le pipeline che richiedono freschezza dei dati e bassa latenza. Lo streaming tables può essere utile anche per le trasformazioni su scala massiva, poiché i risultati possono essere calcolati in modo incrementale man mano che arrivano nuovi dati, mantenendo i risultati aggiornati senza dover ricalcolare completamente tutti i dati di origine con ogni update. I tables di streaming sono progettati per le origini dati che possono solo essere accodate.

Lo streaming tables accetta comandi aggiuntivi, ad esempio REFRESH, che elabora i dati più recenti disponibili nelle origini fornite nella query. Le modifiche apportate alla query fornita get vengono riflesse solo sui nuovi dati chiamati tramite un REFRESH, non sui dati elaborati in precedenza. Per applicare le modifiche anche ai dati esistenti, è necessario eseguire REFRESH TABLE <table_name> FULL per eseguire FULL REFRESH. L'aggiornamento completo rielabora tutti i dati disponibili nell'origine usando la definizione più recente. Non è consigliabile eseguire aggiornamenti completi sulle origini che non mantengono l'intera cronologia dei dati o hanno brevi periodi di conservazione, come Kafka, perché l'aggiornamento completo refresh tronca i dati esistenti. Potrebbe non essere possibile recuperare i dati obsoleti se i dati non sono più disponibili nell'origine.

Filtri di riga e maschere di column

Importante

Questa funzionalità è disponibile in anteprima pubblica.

I filtri di riga consentono di specificare una funzione che si applica come filtro ogni volta che una scansione table recupera righe. Questi filtri assicurano che le query successive restituiscano solo righe per le quali il predicato di filtro restituisce TRUE.

Le maschere Column consentono di mascherare il column di un valuesogni volta che una scansione table recupera delle righe. Tutte le query future che coinvolgono tale column riceveranno il risultato della valutazione della funzione sulla base di column, sostituendo il valore originale del column.

Per ulteriori informazioni su come utilizzare i filtri di riga e le maschere column, vedere Filtrare i dati sensibili table utilizzando i filtri di riga e le maschere column.

Gestione dei filtri delle righe e delle maschere Column

I filtri delle righe e le maschere column nello streaming tables devono venire aggiunti, aggiornati o rimossi utilizzando l'istruzione CREATE OR REFRESH.

Comportamento

  • Refresh come definente: quando le istruzioni CREATE OR REFRESH o REFRESHrefresh un flusso in streaming table, le funzioni di filtro di riga vengono eseguite con i diritti del definente (come proprietario del table). Ciò significa che il tablerefresh usa il contesto di sicurezza dell'utente che ha creato lo streaming table.
  • Query: mentre la maggior parte dei filtri viene eseguita con i diritti del definer, le funzioni che controllano il contesto utente (ad esempio CURRENT_USER e IS_MEMBER) costituiscono un’eccezione. Queste funzioni vengono eseguite come invoker. Questo approccio applica controlli di accesso e sicurezza dei dati specifici dell'utente in base al contesto dell'utente corrente.

Osservabilità

Usare DESCRIBE EXTENDED, INFORMATION_SCHEMAo il Catalog Explorer per esaminare i filtri di riga esistenti e le maschere column che si applicano a un dato flusso table. Questa funzionalità consente agli utenti di controllare ed esaminare le misure di accesso ai dati e protezione sullo streaming tables.

Limiti

  • Solo i proprietari table possono refreshtables di streaming per get i dati più recenti.
  • I comandi ALTER TABLE non sono consentiti su streaming tables. La definizione e le proprietà del table devono essere modificate tramite l'istruzione CREATE OR REFRESH o ALTER STREAMING TABLE.
  • L'evoluzione del tableschema tramite comandi DML come INSERT INTOe MERGE non è supportata.
  • I comandi seguenti non sono supportati in streaming tables:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • La condivisione Delta non è supportata.
  • Rinominare il table o cambiare il proprietario non è supportato.
  • Table vincoli come PRIMARY KEY e FOREIGN KEY non sono supportati.
  • I valori columnsgenerati, le identità columnse i valori columns predefiniti non sono supportati.

Esempi

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM RANGE(10)

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')