Condividi tramite


Caricamento di dati tramite streaming tables in Databricks SQL

Databricks consiglia di usare lo streaming tables per inserire dati usando Databricks SQL. Un streaming table è un table registrato a Unity Catalog con supporto aggiuntivo per l'elaborazione dati in streaming o incrementali. Viene creata automaticamente una pipeline Delta Live di Tables per ogni tabledi streaming. È possibile utilizzare lo streaming tables per il caricamento incrementale dei dati da Kafka e dall'archiviazione cloud di oggetti.

Questo articolo illustra l'utilizzo dello streaming tables per caricare dati dall'archiviazione a oggetti nel cloud configurata come volume Unity Catalog (scelta consigliata) o posizione esterna.

Nota

Per informazioni su come usare Delta Lake tables come origini e destinazioni di streaming, vedere letture e scritture di streaming Delta table.

Importante

La streaming tables creata in Databricks SQL è sostenuta da una pipeline serverless di Delta Live Tables. L'area di lavoro deve supportare le pipeline serverless per usare questa funzionalità.

Prima di iniziare

Prima di iniziare, è necessario soddisfare i requisiti seguenti:

Requisiti dell'area di lavoro:

Requisiti di calcolo:

È necessario usare uno dei seguenti elementi:

  • Un'istanza di SQL Warehouse che usa il canale Current.

  • Calcolo con modalità di accesso condiviso in Databricks Runtime 13.3 LTS o versione successiva.

  • Calcolo con modalità di accesso utente singolo in Databricks Runtime 15.4 LTS o versione successiva.

    Nei Databricks Runtime 15.3 e versioni inferiori, non puoi usare il calcolo per utente singolo per interrogare i tables di streaming di proprietà di altri utenti. È possibile usare il calcolo utente singolo in Databricks Runtime 15.3 e versioni precedenti solo se si è proprietari dello streaming table. L'ideatore del table è il proprietario.

    Databricks Runtime 15.4 LTS e versioni successive supportano le query su Delta Live Tablesgenerate tables su un singolo computer utente, indipendentemente dalla proprietà table. Per sfruttare i vantaggi del filtro dei dati fornito in Databricks Runtime 15.4 LTS e versioni successive, è necessario verificare che 'area di lavoro sia abilitata per l'elaborazione serverless perché la funzionalità di filtro dei dati che supporta Delta Live Tables-generate tables viene eseguita nell'ambiente di calcolo serverless. È possibile che vengano addebitati costi per le risorse di calcolo serverless quando si usa il calcolo utente singolo per eseguire operazioni di filtro dei dati. Vedere Controllo di accesso con granularità fine per il calcolo di un singolo utente.

Requisiti di autorizzazione:

Altri requisiti:

  • Il percorso dei dati di origine.

    Esempio di percorso del volume: /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    Esempio di percorso esterno: abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis

    Nota

    Questo articolo presuppone che i dati da caricare si trovino in un percorso di archiviazione cloud corrispondente a un volume unity Catalog o a una posizione esterna a cui si ha accesso.

Individuare e visualizzare in anteprima i dati di origine

  1. Nella barra laterale dell'area di lavoro cliccare su Query e quindi su Crea query.

  2. Nell'editor di query, select un SQL Warehouse che utilizza il canale Current dall'elenco a discesa list.

  3. Incollare il codice seguente nell'editor, sostituendo values tra parentesi angolari (<>) per le informazioni che identificano i dati di origine e quindi fare clic su Esegui.

    Nota

    Potresti incontrare errori di inferenza schema durante l'esecuzione della funzione con valori read_filestable se le impostazioni predefinite della funzione non riescono a elaborare i tuoi dati. Ad esempio, potrebbe essere necessario configurare la modalità a più righe per i file CSV o JSON su più righe. Per un elenco list di opzioni del parser, vedere la funzione read_files tablea valori.

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
    

Caricare dati in un table di streaming

Per creare un table di streaming dai dati nell'archiviazione di oggetti cloud, incolla il seguente nell'editor delle query e quindi fai clic su Esegui:

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')

Set il canale di runtime

I flussi di streaming tables creati usando i magazzini SQL vengono aggiornati automaticamente tramite una pipeline Delta Live Tables. Per impostazione predefinita, le pipeline di Tables Delta Live usano il runtime nel canale current. Per informazioni sul processo di rilascio, vedere note sulla versione di Delta Live Tables e il processo di aggiornamento della versione.

Databricks consiglia di usare il current canale per i carichi di lavoro di produzione. Le nuove funzionalità vengono rilasciate per la prima volta al preview canale. È possibile set una pipeline al canale di anteprima Delta Live Tables per testare nuove funzionalità specificando preview come proprietà table. È possibile specificare questa proprietà quando si crea il table o dopo la creazione del table usando un'istruzione ALTER.

L'esempio di codice seguente illustra come set il canale da visualizzare in anteprima in un'istruzione CREATE:

CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
  *
FROM
  range(5)

Refresh un table di streaming usando una pipeline DLT

Questa sezione descrive i modelli per aggiornare un flusso di streaming table con i dati più recenti disponibili dalle origini definite nella query.

Quando si esegue CREATE o REFRESH un tabledi streaming, il update elabora tramite una pipeline Delta Live serverless di Tables. Ogni flusso table definito ha una Delta Live pipeline Tables associata.

Dopo aver eseguito il comando REFRESH, viene restituito il collegamento alla pipeline DLT. È possibile usare il collegamento alla pipeline DLT per controllare lo stato del refresh.

Nota

Solo il proprietario table può refresh un table di streaming per get i dati aggiornati. L'utente che crea il table è il proprietario e il proprietario non può essere modificato. Potrebbe essere necessario il di streaming prima di usare query di tempo di viaggio.

Vedi Che cos'è Delta Live Tables?.

Inserire solo nuovi dati

Per impostazione predefinita, la funzione read_files legge tutti i dati esistenti nella directory di origine durante la creazione di table e quindi elabora i record appena arrivati con ogni refresh.

Per evitare di ingestire dati già presenti nella directory di origine al momento della creazione di table, set l'opzione includeExistingFiles per false. Ciò significa che solo i dati che arrivano nella directory dopo la creazione di table vengono elaborati. Ad esempio:

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
  'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
  includeExistingFiles => false)

Completamente un refreshtable di streaming

L'aggiornamento completo rielabora tutti i dati disponibili nell'origine usando la definizione più recente. Non è consigliabile chiamare aggiornamenti completi sulle origini che non mantengono l'intera cronologia dei dati o hanno brevi periodi di conservazione, ad esempio Kafka, perché il refresh completo tronca i dati esistenti. Potrebbe non essere possibile recuperare i dati obsoleti se i dati non sono più disponibili nell'origine.

Ad esempio:

REFRESH STREAMING TABLE my_bronze_table FULL

Pianifica uno streaming table per l'esecuzione automatica refresh

Per configurare un table di streaming per refresh automaticamente in base a una pianificazione definita, incollare quanto segue nell'editor di query e quindi fare clic su Esegui:

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

Ad esempio, per le query di pianificazione refresh, vedere ALTER STREAMING TABLE.

Tenere traccia dello stato di un refresh

È possibile visualizzare lo stato di un tablerefresh di streaming visualizzando la pipeline che gestisce l'table di streaming nell'interfaccia utente Tables Delta Live o visualizzando le informazioni Refresh restituite dal comando DESCRIBE EXTENDED per lo streaming table.

DESCRIBE EXTENDED <table-name>

Inserimento in streaming da Kafka

Per un esempio di inserimento in streaming da Kafka, vedere read_kafka.

L'accesso a un table di streaming agli utenti Grant

Concedi agli utenti di grant il privilegio di SELECT sullo streaming table in modo che possano interrogarlo, incolla quanto segue nell'editor di query e quindi fai clic su Esegui:

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

Per altre informazioni sulla concessione dei privilegi per gli oggetti a protezione diretta di Unity Catalog, vedere i privilegi di Unity Catalog e gli oggetti a protezione diretta.

Monitorare le esecuzioni usando la cronologia delle query

È possibile utilizzare la pagina della cronologia delle query per accedere ai dettagli delle query e ai profili delle query, che possono aiutarti a identificare query con prestazioni scarse e colli di bottiglia nella pipeline Delta Live Tables utilizzata per eseguire gli aggiornamenti streaming table. Per una panoramica del tipo di informazioni disponibili nelle cronologie delle query e nei profili di query, vedere Cronologia query e Profilo di query.

Importante

Questa funzionalità è disponibile in anteprima pubblica. Gli amministratori dell'area di lavoro possono abilitare questa funzionalità dalla pagina Anteprime . Vedere Gestire le anteprime di Azure Databricks.

Tutte le dichiarazioni relative allo streaming tables vengono visualizzate nella cronologia delle query. È possibile utilizzare il filtro a discesa nella dichiarazione per select qualsiasi comando ed esaminare le query correlate. Tutte le istruzioni CREATE sono seguite da un'istruzione REFRESH eseguita in modo asincrono in una pipeline Delta Live Tables. Le REFRESH istruzioni includono in genere piani di query dettagliati che forniscono informazioni dettagliate sull'ottimizzazione delle prestazioni.

Per accedere alle REFRESH istruzioni nell'interfaccia utente della cronologia query, seguire questa procedura:

  1. Fare clic Icona Cronologia sulla barra laterale sinistra per aprire l'interfaccia utente cronologia query.
  2. la casella di controllo dal filtro a discesa Statement .
  3. Fare clic sul nome dell'istruzione query per visualizzare i dettagli di riepilogo, ad esempio la durata della query e le metriche aggregate.
  4. Fare clic su Visualizza profilo di query per aprire il profilo di query. Per informazioni dettagliate sull'esplorazione del profilo di query, vedere Profilo di query .
  5. Facoltativamente, è possibile usare i collegamenti nella sezione Origine query per aprire la query o la pipeline correlata.

È anche possibile accedere ai dettagli delle query usando collegamenti nell'editor SQL o da un notebook collegato a un'istanza di SQL Warehouse.

Nota

Lo streaming table deve essere configurato per funzionare utilizzando il canale di anteprima . Vedi Set il canale di runtime.

Risorse aggiuntive