Funzionamento delle tabelle di streaming
Una tabella di streaming è una normale tabella Delta con supporto aggiuntivo per lo streaming o l'elaborazione incrementale dei dati.
Le tabelle di streaming sono una scelta ottimale per l'inserimento dei dati per i motivi seguenti:
- Ogni riga di input viene gestita una sola volta, un approccio che rispecchia la stragrande maggioranza dei carichi di lavoro di inserimento, ovvero l'aggiunta o l'aggiornamento delle righe in una tabella.
- Possono gestire grandi volumi di dati solo accodabili.
Le tabelle di streaming sono anche una scelta ottimale per le trasformazioni di streaming a bassa latenza per i motivi seguenti:
- Motivo delle righe e delle finestre di tempo
- Gestire volumi elevati di dati
- Bassa latenza
Il diagramma seguente illustra il funzionamento delle tabelle di streaming.
Le tabelle di streaming vengono definite e aggiornate da una singola pipeline DLT. Quando si crea una pipeline DLT, è possibile definire in modo esplicito le tabelle di streaming nel codice sorgente della pipeline. Queste tabelle vengono quindi definite da questa pipeline e non possono essere modificate o aggiornate da altre pipeline. Quando si crea una tabella di streaming in Databricks SQL, Databricks crea una pipeline DLT usata per aggiornare questa tabella.
Tabelle di streaming per l'ingestione
Le tabelle di streaming sono progettate per origini dati di sola aggiunta e processano gli input una sola volta.
L'aggiornamento completo fa sì che le tabelle di streaming rielaborino i dati già elaborati. L'azione di aggiornamento completo consente a una tabella di streaming di rielaborare tutti gli input, inclusi quelli già elaborati in precedenza.
L'esempio seguente illustra come usare una tabella di streaming per inserire nuovi file dall'archiviazione cloud. Quando si usa una o più chiamate spark.readStream
in una definizione di set di dati, il DLT considera il set di dati come una tabella di streaming anziché una vista materializzata.
import dlt
@dlt.table
def raw_customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load("/Volumes/path/to/files")
)
Il diagramma seguente illustra il funzionamento delle tabelle di streaming solo append.
Streaming di tabelle e streaming a bassa latenza
Le tabelle di streaming sono progettate per lo streaming a bassa latenza con uno stato delimitato. Le tabelle di streaming usano RocksDB per la gestione dei checkpoint, che li rende particolarmente adatti per lo streaming a bassa latenza. Tuttavia, si aspettano flussi che sono naturalmente delimitati o delimitati con una filigrana.
Un flusso naturalmente delimitato viene prodotto da una sorgente di dati di streaming che ha un inizio e una fine ben definiti. Un esempio di flusso delimitato naturalmente è la lettura dei dati da una directory di file in cui non vengono aggiunti nuovi file dopo l'inserimento di un batch iniziale di file. Il flusso viene considerato delimitato perché il numero di file è finito e quindi il flusso termina dopo l'elaborazione di tutti i file.
È anche possibile usare una filigrana per delimitare un flusso. Una filigrana in Spark Structured Streaming è un meccanismo che consente di gestire i dati in ritardo specificando per quanto tempo il sistema deve attendere gli eventi ritardati prima di considerare l'intervallo di tempo come completato. Un flusso illimitato che non dispone di una filigrana può far fallire una pipeline DLT a causa della pressione sulla memoria.
Join tra flusso e snapshot
I join di snapshot di flusso sono unione tra un flusso e una dimensione acquisita come snapshot all'avvio dello streaming. Questi join non vengono ricomputati se la dimensione cambia dopo l'avvio del flusso, perché la tabella delle dimensioni viene considerata come snapshot nel tempo e le modifiche apportate alla tabella delle dimensioni dopo l'avvio del flusso non vengono riflesse a meno che non si ricarica o non si aggiorni la tabella delle dimensioni. Questo comportamento è ragionevole se è possibile accettare piccole discrepanze in un join. Ad esempio, un join approssimativo è accettabile quando il numero di transazioni supera di molti ordini di grandezza quello dei clienti.
Nell'esempio di codice seguente, uniamo la tabella dimensionale 'clienti' con due righe a un dataset in continua crescita, 'transazioni'. Viene materializzato un join tra questi due set di dati in una tabella denominata sales_report
. Si noti che se un processo esterno aggiorna la tabella customers aggiungendo una nuova riga (customer_id=3, name=Zoya
), questa nuova riga non sarà presente nel join perché la tabella delle dimensioni statiche è stata creata tramite snapshot al momento dell'avvio dei flussi.
import dlt
@dlt.view
# assume this table contains an append-only stream of rows about transactions
# (customer_id=1, value=100)
# (customer_id=2, value=150)
# (customer_id=3, value=299)
# ... <and so on> ...
def v_transactions():
return spark.readStream.table("transactions")
# assume this table contains only these two rows about customers
# (customer_id=1, name=Bilal)
# (customer_id=2, name=Olga)
@dlt.view
def v_customers():
return spark.read.table("customers")
@dlt.table
def sales_report():
facts = spark.readStream.table("v_transactions")
dims = spark.read.table("v_customers")
return (
facts.join(dims, on="customer_id", how="inner"
)
Limitazioni delle tabelle di streaming
Le tabelle di streaming presentano le limitazioni seguenti:
- Evoluzione limitata: è possibile modificare la query senza ricompilare l'intero set di dati. Poiché una tabella di streaming vede una riga una sola volta, è possibile avere query diverse che operano su righe diverse. Ciò significa che è necessario essere consapevoli di tutte le versioni precedenti della query in esecuzione nel set di dati. È necessario un aggiornamento completo affinché la tabella di streaming possa riconoscere i dati che sono stati già visti.
- Gestione dello stato: le tabelle di streaming sono a bassa latenza, quindi è necessario assicurarsi che i flussi su cui operano siano naturalmente delimitati o delimitati con filigrana.
- I join non vengono ricompiuti: a differenza delle viste materializzate i cui risultati sono sempre corretti perché ricompilano automaticamente i join nelle tabelle di streaming non ricompilano quando le dimensioni cambiano. Questa caratteristica può essere utile per scenari "veloci ma non corretti".