Condividi tramite


Streaming e inserimento incrementale

Azure Databricks usa Apache Spark Structured Streaming per eseguire il backup di numerosi prodotti associati ai carichi di lavoro di inserimento, tra cui:

  • Autoloader
  • COPY INTO
  • Pipeline di Delta Live Tables
  • Viste materializzate e tabelle di streaming in Databricks SQL

Questo articolo illustra alcune delle differenze tra la semantica di elaborazione batch incrementale e di streaming e offre una panoramica generale della configurazione dei carichi di lavoro di inserimento per la semantica desiderata in Databricks.

Qual è la differenza tra il flusso e l'inserimento batch incrementale?

Le possibili configurazioni del flusso di lavoro di inserimento vanno dall'elaborazione quasi in tempo reale all'elaborazione batch incrementale rara. Entrambi i modelli usano Apache Spark Structured Streaming per alimentare l'elaborazione incrementale, ma hanno semantica diversa. Per semplicità, questo articolo si riferisce all'inserimento quasi in tempo reale come inserimento in streaming e all'elaborazione incrementale più rara come inserimento batch incrementale.

Inserimento del flusso

Streaming, nel contesto dell'inserimento dati e degli aggiornamenti di tabelle, si riferisce all’elaborazione dei dati quasi in tempo reale, dove Azure Databricks elabora i record dalla fonte al sink in microbatches utilizzando un'infrastruttura sempre attiva. Un carico di lavoro di streaming inserisce continuamente gli aggiornamenti da origini dati configurate, a meno che non si verifichi un errore che arresta l'inserimento.

Inserimento batch incrementale

L'inserimento batch incrementale fa riferimento a un modello in cui tutti i nuovi record vengono elaborati da un'origine dati in un processo di breve durata. L'inserimento batch incrementale si verifica spesso in base a una pianificazione, ma può anche essere attivato manualmente o in base all'arrivo dei file.

L'inserimento batch incrementale è diverso dall'inserimento in batch in quanto rileva automaticamente nuovi record nell'origine dati e ignora i record già inseriti.

Inserimento con processi

L'uso delle funzionalità Databricks Jobs consente di orchestrare flussi di lavoro e pianificare attività che includono notebook, librerie, pipeline di Delta Live Tables e query SQL di Databricks.

Nota

È possibile usare tutti i tipi di calcolo e i tipi di attività di Azure Databricks per configurare l'inserimento batch incrementale. L'inserimento in streaming è supportato solo nell'ambiente di produzione nei processi classici di calcolo e nelle tabelle Live Delta.

I processi hanno due modalità principali di funzionamento:

  • I processi continui riprovano automaticamente in caso di errore. Questa modalità è destinata all'inserimento in streaming.
  • I processi attivati eseguono attività quando vengono attivate. I trigger includono:
    • Trigger basati sul tempo che eseguono processi in base a una pianificazione specificata.
    • Trigger basati su file che eseguono processi quando i file vengono inseriti in una posizione specificata.
    • Altri trigger, ad esempio chiamate API REST, esecuzione dei comandi dell'interfaccia della riga di comando di Azure Databricks o fare clic sul pulsante Esegui ora nell'interfaccia utente dell'area di lavoro.

Per i carichi di lavoro batch incrementali, configurare i processi usando la AvailableNow modalità trigger, come indicato di seguito:

Python

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("table_name")

Per i carichi di lavoro di streaming, l'intervallo di trigger predefinito è processingTime ="500ms". L'esempio seguente illustra come elaborare un micro batch ogni 5 secondi:

Python

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(processingTime="5 seconds")
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.ProcessingTime, "5 seconds")
  .toTable("table_name")

Importante

I processi serverless non supportano intervalli di trigger scala, modalità continua o basati sul tempo per Structured Streaming. Usare i processi classici se è necessaria una semantica di inserimento quasi in tempo reale.

Ingestione con Tabelle Live Delta

Analogamente ai Job, le pipeline di tabelle Live Delta possono funzionare in modalità attivata o continua. Per la semantica di streaming quasi in tempo reale con le tabelle di streaming, usare la modalità continua.

Usare le tabelle di streaming per configurare l'inserimento in batch o incrementale in streaming dall'archiviazione di oggetti nel cloud, Apache Kafka, Amazon Kinesis, Google Pub/Sub o Apache Pulsar.

LakeFlow Connect utilizza Delta Live Tables per configurare pipeline di ingestione dai sistemi connessi. Consultare LakeFlow Connect.

Le viste materializzate garantiscono la semantica operativa equivalente ai carichi di lavoro batch, ma possono ottimizzare molte operazioni per calcolare i risultati in modo incrementale. Vedere aggiornamento incrementale per le viste materializzate.

Inserimento con Databricks SQL

Si possono utilizzare le tabelle di streaming per configurare l'inserimento batch incrementale dall'archiviazione di oggetti cloud, Apache Kafka, Amazon Kinesis, Google Pub/Sub o Apache Pulsar.

È possibile usare viste materializzate per configurare l'elaborazione batch incrementale da origini Delta. Vedere aggiornamento incrementale per le viste materializzate.

COPY INTO fornisce una sintassi SQL familiare per l'elaborazione batch incrementale per i file di dati nell'archiviazione di oggetti cloud. COPY INTO comportamento è simile ai modelli supportati dalle tabelle di streaming per l'archiviazione di oggetti cloud, ma non tutte le impostazioni predefinite sono equivalenti per tutti i formati di file supportati.