Condividi tramite


Streaming e inserimento incrementale

Azure Databricks usa Apache Spark Structured Streaming per eseguire il backup di numerosi prodotti associati ai carichi di lavoro di inserimento, tra cui:

  • Caricatore Automatico
  • COPY INTO
  • Pipeline DLT
  • Viste materializzate e tabelle di streaming in Databricks SQL

Questo articolo illustra alcune delle differenze tra la semantica di elaborazione batch incrementale e di streaming e offre una panoramica generale della configurazione dei carichi di lavoro di inserimento per la semantica desiderata in Databricks.

Qual è la differenza tra lo streaming e l'inserimento batch incrementale?

Le possibili configurazioni del flusso di lavoro di inserimento vanno dall'elaborazione quasi in tempo reale all'elaborazione batch incrementale rara. Entrambi i modelli usano Apache Spark Structured Streaming per alimentare l'elaborazione incrementale, ma hanno semantica diversa. Per semplicità, questo articolo si riferisce all'inserimento quasi in tempo reale come inserimento in streaming e all'elaborazione incrementale meno frequente come inserimento batch incrementale.

Inserimento in streaming

Streaming, nel contesto dell'inserimento dati e degli aggiornamenti di tabelle, si riferisce all’elaborazione dei dati quasi in tempo reale, dove Azure Databricks elabora i record dalla fonte al sink in microbatches utilizzando un'infrastruttura sempre attiva. Un carico di lavoro di streaming inserisce continuamente gli aggiornamenti da origini dati configurate, a meno che non si verifichi un errore che arresta l'inserimento.

Inserimento batch incrementale

L'ingestione batch incrementale si riferisce a un modello in cui tutti i nuovi record vengono processati a partire da una fonte di dati all'interno di un'operazione di breve durata. L'inserimento batch incrementale si verifica spesso in base a una pianificazione, ma può anche essere attivato manualmente o in base all'arrivo dei file.

L'inserimento batch incrementale è diverso dall'inserimento batch in quanto rileva automaticamente nuovi record nell'origine dati e ignora i record già inseriti.

Ingestione con Compiti

Databricks Jobs consente di pianificare e orchestrare flussi di lavoro e attività che includono notebook, librerie, pipeline DLT e query SQL di Databricks.

Nota

È possibile usare tutti i tipi di calcolo e i tipi di attività di Azure Databricks per configurare l'inserimento batch incrementale. L'inserimento in streaming è supportato solo nell'ambiente di produzione nei processi classici di calcolo e DLT.

I lavori hanno due modalità operative principali:

  • I processi continui riprovano automaticamente in caso di errore. Questa modalità è destinata all'inserimento in streaming.
  • I processi attivati eseguono attività quando vengono attivate. I trigger includono:
    • Trigger basati sul tempo che eseguono processi in base a una pianificazione specificata.
    • Trigger basati su file che eseguono processi quando i file vengono inseriti in una posizione specificata.
    • Altri trigger, ad esempio chiamate API REST, esecuzione dei comandi dell'interfaccia della riga di comando di Azure Databricks o fare clic sul pulsante Esegui ora nell'interfaccia utente dell'area di lavoro.

Per i carichi di lavoro batch incrementali, configurare i processi usando la AvailableNow modalità trigger, come indicato di seguito:

Python

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("table_name")

Per i carichi di lavoro di streaming, l'intervallo di trigger predefinito è processingTime ="500ms". L'esempio seguente illustra come elaborare un micro batch ogni 5 secondi:

Python

(df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(processingTime="5 seconds")
  .toTable("table_name")
)

Scala

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.ProcessingTime, "5 seconds")
  .toTable("table_name")

Importante

I processi serverless non supportano Scala, la modalità continua o gli intervalli di trigger basati sul tempo per lo Structured Streaming. Utilizzare i lavori classici se è necessaria una semantica di inserimento quasi in tempo reale.

Ingestione con DLT

Analogamente ai processi, le pipeline DLT possono essere eseguite in modalità attivata o continua. Per ottenere una semantica di streaming quasi in tempo reale con le tabelle di streaming, usare la modalità continua.

Utilizzare le tabelle di streaming per configurare l'ingestione in streaming o in batch incrementale dall'archiviazione di oggetti nel cloud, Apache Kafka, Amazon Kinesis, Google Pub/Sub o Apache Pulsar.

Lakeflow Connect usa DLT per configurare le pipeline di inserimento dai sistemi connessi. Vedere Lakeflow Connect.

Le viste materializzate garantiscono la semantica operativa equivalente ai carichi di lavoro batch, ma possono ottimizzare molte operazioni per calcolare i risultati in modo incrementale. Vedere aggiornamento incrementale per le viste materializzate.

Ingestione con Databricks SQL

Si possono utilizzare le tabelle di streaming per configurare l'inserimento batch incrementale dall'archiviazione di oggetti cloud, Apache Kafka, Amazon Kinesis, Google Pub/Sub o Apache Pulsar.

È possibile usare viste materializzate per configurare l'elaborazione batch incrementale da origini Delta. Vedere aggiornamento incrementale per le viste materializzate.

COPY INTO fornisce una sintassi SQL familiare per l'elaborazione batch incrementale per i file di dati nell'archiviazione di oggetti cloud. COPY INTO comportamento è simile ai modelli supportati dalle tabelle di streaming per l'archiviazione di oggetti cloud, ma non tutte le impostazioni predefinite sono equivalenti per tutti i formati di file supportati.