Condividi tramite


Eseguire query sui dati di streaming

È possibile usare Azure Databricks per eseguire query sulle origini dati di streaming usando Structured Streaming. Azure Databricks offre un supporto completo per i carichi di lavoro di streaming in Python e Scala e supporta la maggior parte delle funzionalità di streaming strutturato con SQL.

Gli esempi seguenti illustrano l'uso di un sink di memoria per l'ispezione manuale dei dati di streaming durante lo sviluppo interattivo nei notebook. A causa dei limiti di output delle righe nell'interfaccia utente del notebook, è possibile che non si osservino tutti i dati letti dalle query di streaming. Nelle operazioni di produzione, dovresti attivare le query di streaming solo scrivendole a un sistema di destinazione table o esterno.

Nota

Il supporto SQL per le query interattive sui dati di streaming è limitato ai notebook in esecuzione in calcolo a tutti gli scopi. È anche possibile usare SQL quando dichiari i tables di streaming in Databricks SQL o Delta Live Tables. Vedere Caricare dati usando tables di streaming in Databricks SQL e Che cos'è delta Live Tables?.

Eseguire query sui dati dai sistemi di streaming

Azure Databricks offre lettori di dati di streaming per i sistemi di streaming seguenti:

  • Kafka
  • Cinesi
  • PubSub
  • Pulsar

È necessario specificare i dettagli di configurazione quando si inizializzano le query su questi sistemi, che variano a seconda dell'ambiente configurato e del sistema da cui si sceglie di leggere. Vedere Configurare le origini dei dati di streaming.

I carichi di lavoro comuni che coinvolgono i sistemi di streaming includono l'inserimento di dati nella lakehouse e l'elaborazione dei flussi per il sink dei dati in sistemi esterni. Per altre informazioni sui carichi di lavoro di streaming, vedere Streaming in Azure Databricks.

Gli esempi seguenti illustrano un flusso interattivo letto da Kafka:

Python

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

Eseguire una query su un table in modalità di lettura in streaming

Azure Databricks crea tutte le tables usando Delta Lake di default. Quando si esegue una query di streaming su un tableDelta, la query preleva automaticamente nuovi record non appena viene effettuato il commit di una versione del table. Per impostazione predefinita, le query di streaming si aspettano che l'origine tables contenga solo record aggiunti. Se è necessario usare i dati di streaming che contengono aggiornamenti ed eliminazioni, Databricks consiglia di usare Delta Live Tables e APPLY CHANGES INTO. Consulta LE API PER APPLICARE LE MODIFICHE: Semplifica l'acquisizione di dati modificati con Delta Live Tables.

Gli esempi seguenti illustrano come effettuare una lettura interattiva in streaming da un table:

Python

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

Eseguire query sui dati nell'archiviazione di oggetti cloud con il caricatore automatico

È possibile trasmettere i dati dall'archiviazione di oggetti cloud usando il caricatore automatico, il connettore dati cloud di Azure Databricks. È possibile usare il connettore con i file archiviati in Unity Catalogvolumes o in altri percorsi di archiviazione di oggetti cloud. Databricks consiglia di usare volumes per gestire l'accesso ai dati nell'archiviazione di oggetti cloud. Vedere Connettersi alle origini dati.

Azure Databricks ottimizza questo connettore per l'inserimento in streaming dei dati nell'archiviazione di oggetti cloud archiviato in formati strutturati, semistrutturati e non strutturati più diffusi. Databricks consiglia di archiviare i dati inseriti in un formato quasi grezzo per ottimizzare le prestazioni e ridurre al minimo la potenziale perdita di dati a causa di registrazioni corrotte o modifiche schema.

Per altre raccomandazioni sull'inserimento di dati dall'archiviazione di oggetti cloud, vedere Inserire dati in un lakehouse di Databricks.

Gli esempi seguenti illustrano un flusso interattivo letto da una directory di file JSON in un volume:

Python

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')