Condividi tramite


Monitoraggio delle query di Structured Streaming in Azure Databricks

Azure Databricks offre il monitoraggio predefinito per le applicazioni Structured Streaming tramite l'interfaccia utente spark nella scheda Streaming .

Distinguere le query structured streaming nell'interfaccia utente di Spark

Fornire ai flussi un nome di query univoco aggiungendo .queryName(<query-name>) al writeStream codice per distinguere facilmente le metriche che appartengono al flusso nell'interfaccia utente di Spark.

Eseguire il push delle metriche di Structured Streaming a servizi esterni

È possibile eseguire il push delle metriche di streaming a servizi esterni per gli avvisi o i casi d'uso del dashboard tramite l'interfaccia listener di query di streaming di Apache Spark. In Databricks Runtime 11.3 LTS e versioni successive, il listener di query di streaming è disponibile in Python e Scala.

Importante

Le credenziali e gli oggetti gestiti da Unity Catalog non possono essere usati nella StreamingQueryListener logica.

Nota

La latenza di elaborazione con listener può influire significativamente sulla velocità di elaborazione delle query. È consigliabile limitare la logica di elaborazione in questi listener e optare per la scrittura in sistemi di risposta rapida come Kafka per l'efficienza.

Il codice seguente fornisce esempi di base della sintassi per l'implementazione di un listener:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definizione di metriche osservabili in Structured Streaming

Le metriche osservabili sono funzioni di aggregazione arbitrarie che possono essere definite in una query (DataFrame). Non appena l'esecuzione di un dataframe raggiunge un punto di completamento (ovvero completa una query batch o raggiunge un periodo di streaming), viene generato un evento denominato che contiene le metriche per i dati elaborati dall'ultimo punto di completamento.

È possibile osservare queste metriche collegando un listener alla sessione spark. Il listener dipende dalla modalità di esecuzione:

  • Modalità batch: usare QueryExecutionListener.

    QueryExecutionListener viene chiamato al termine della query. Accedere alle metriche usando la QueryExecution.observedMetrics mappa.

  • Streaming o microbatch: usare StreamingQueryListener.

    StreamingQueryListener viene chiamato quando la query di streaming completa un periodo. Accedere alle metriche usando la StreamingQueryProgress.observedMetrics mappa. Azure Databricks non supporta il flusso di esecuzione continua.

Ad esempio:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Metriche dell'oggetto StreamingQueryListener

Metrico Descrizione
id ID di query univoco che persiste tra i riavvii.
runId ID di query univoco per ogni avvio/riavvio. Vedere StreamingQuery.runId().
name Nome specificato dall'utente della query. Il nome è Null se non viene specificato alcun nome.
timestamp Timestamp per l'esecuzione del microbatch.
batchId ID univoco per il batch di dati corrente da elaborare. In caso di tentativi dopo un errore, un ID batch specificato può essere eseguito più volte. Analogamente, quando non sono presenti dati da elaborare, l'ID batch non viene incrementato.
numInputRows Numero aggregato (in tutte le origini) di record elaborati in un trigger.
inputRowsPerSecond Frequenza di aggregazione (in tutte le origini) dei dati in arrivo.
processedRowsPerSecond Frequenza di aggregazione (in tutte le origini) con cui Spark elabora i dati.

oggetto durationMs

Informazioni sul tempo necessario per completare varie fasi del processo di esecuzione del microbatch.

Metrico Descrizione
durationMs.addBatch Tempo impiegato per eseguire il microbatch. Ciò esclude il tempo impiegato da Spark per pianificare il microbatch.
durationMs.getBatch Tempo necessario per recuperare i metadati relativi agli offset dall'origine.
durationMs.latestOffset Offset più recente utilizzato per il microbatch. Questo oggetto di stato fa riferimento al tempo impiegato per recuperare l'offset più recente dalle origini.
durationMs.queryPlanning Tempo impiegato per generare il piano di esecuzione.
durationMs.triggerExecution Tempo necessario per pianificare ed eseguire il microbatch.
durationMs.walCommit Tempo impiegato per eseguire il commit dei nuovi offset disponibili.

oggetto eventTime

Informazioni sul valore dell'ora dell'evento visualizzato all'interno dei dati elaborati nel microbatch. Questi dati vengono usati dalla filigrana per capire come tagliare lo stato per l'elaborazione di aggregazioni con stato definite nel processo Structured Streaming.

Metrico Descrizione
eventTime.avg Tempo medio dell'evento visualizzato in tale trigger.
eventTime.max Tempo massimo di evento visualizzato in tale trigger.
eventTime.min Ora minima dell'evento visualizzata in tale trigger.
eventTime.watermark Valore della filigrana utilizzata in tale trigger.

oggetto stateOperators

Informazioni sulle operazioni con stato definite nel processo Structured Streaming e sulle aggregazioni generate da tali operazioni.

Metrico Descrizione
stateOperators.operatorName Nome dell'operatore con stato a cui sono correlate le metriche, ad esempio symmetricHashJoin, dedupe, stateStoreSave.
stateOperators.numRowsTotal Numero totale di righe nello stato risultante da un operatore o un'aggregazione con stato.
stateOperators.numRowsUpdated Numero totale di righe aggiornate nello stato in seguito a un operatore o un'aggregazione con stato.
stateOperators.allUpdatesTimeMs Questa metrica non è attualmente misurabile da Spark e deve essere rimossa negli aggiornamenti futuri.
stateOperators.numRowsRemoved Numero totale di righe rimosse dallo stato in seguito a un operatore o un'aggregazione con stato.
stateOperators.allRemovalsTimeMs Questa metrica non è attualmente misurabile da Spark e deve essere rimossa negli aggiornamenti futuri.
stateOperators.commitTimeMs Tempo impiegato per eseguire il commit di tutti gli aggiornamenti (inserisce e rimuove) e restituire una nuova versione.
stateOperators.memoryUsedBytes Memoria utilizzata dall'archivio stati.
stateOperators.numRowsDroppedByWatermark Numero di righe considerate troppo tardi per essere incluse in un'aggregazione con stato. Solo aggregazioni di streaming: numero di righe eliminate dopo l'aggregazione (non righe di input non elaborate). Questo numero non è preciso, ma fornisce un'indicazione che i dati in ritardo vengono eliminati.
stateOperators.numShufflePartitions Numero di partizioni casuali per questo operatore con stato.
stateOperators.numStateStoreInstances Istanza effettiva dell'archivio di stati inizializzata e gestita dall'operatore. Per molti operatori con stato, corrisponde al numero di partizioni. Tuttavia, i join stream-stream inizializzano quattro istanze dell'archivio stati per partizione.

oggetto stateOperators.customMetrics

Informazioni raccolte da RocksDB che acquisisce le metriche relative alle prestazioni e alle operazioni relative ai valori con stato che mantiene per il processo Structured Streaming. Per altre informazioni, vedere Configurare l'archivio stati di RocksDB in Azure Databricks.

Metrico Descrizione
customMetrics.rocksdbBytesCopied Numero di byte copiati come rilevato da Gestione file RocksDB.
customMetrics.rocksdbCommitCheckpointLatency Tempo in millisecondi durante l'acquisizione di uno snapshot di RocksDB nativo e la scrittura in una directory locale.
customMetrics.rocksdbCompactLatency Tempo in millisecondi di compattazione (facoltativo) durante il commit del checkpoint.
customMetrics.rocksdbCommitFileSyncLatencyMs Tempo in millisecondi durante la sincronizzazione dello snapshot RocksDB nativo con l'archiviazione esterna (posizione del checkpoint).
customMetrics.rocksdbCommitFlushLatency Tempo in millisecondi di scaricamento delle modifiche in memoria di RocksDB nel disco locale.
customMetrics.rocksdbCommitPauseLatency Tempo in millisecondi che arresta i thread di lavoro in background come parte del commit del checkpoint, ad esempio per la compattazione.
customMetrics.rocksdbCommitWriteBatchLatency Tempo in millisecondi durante l'applicazione delle scritture di staging nella struttura in memoria (WriteBatch) a RocksDB nativo.
customMetrics.rocksdbFilesCopied Numero di file copiati come rilevato da Gestione file RocksDB.
customMetrics.rocksdbFilesReused Numero di file riutilizzati come rilevato da Gestione file RocksDB.
customMetrics.rocksdbGetCount Numero di get chiamate al database (non include gets il WriteBatch batch in memoria usato per le scritture di staging).
customMetrics.rocksdbGetLatency Tempo medio in nanosecondi per la chiamata nativa RocksDB::Get sottostante.
customMetrics.rocksdbReadBlockCacheHitCount Numero di riscontri nella cache dei blocchi in RocksDB utili per evitare letture su disco locale.
customMetrics.rocksdbReadBlockCacheMissCount Il conteggio della cache dei blocchi in RocksDB non è utile per evitare letture su disco locale.
customMetrics.rocksdbSstFileSize Dimensioni di tutti i file SST (Static Sorted Table): la struttura tabulare RocksDB usa per archiviare i dati.
customMetrics.rocksdbTotalBytesRead Numero di byte non compressi letti dalle get operazioni.
customMetrics.rocksdbTotalBytesReadByCompaction Numero di byte letti dal processo di compattazione dal disco.
customMetrics.rocksdbTotalBytesReadThroughIterator Numero totale di byte di dati non compressi letti usando un iteratore. Alcune operazioni con stato (ad esempio, l'elaborazione del timeout in FlatMapGroupsWithState e la filigrana) richiedono la lettura dei dati nel database tramite un iteratore.
customMetrics.rocksdbTotalBytesWritten Numero totale di byte non compressi scritti dalle put operazioni.
customMetrics.rocksdbTotalBytesWrittenByCompaction Numero totale di byte scritti dal processo di compattazione nel disco.
customMetrics.rocksdbTotalCompactionLatencyMs Tempo in millisecondi per le compattazioni di RocksDB, incluse le compattazioni in background e la compattazione facoltativa avviata durante il commit.
customMetrics.rocksdbTotalFlushLatencyMs Tempo totale di scaricamento, incluso lo scaricamento dello sfondo. Le operazioni di scaricamento sono processi in base ai quali l'oggetto MemTable viene scaricato nell'archiviazione una volta che è pieno. MemTables sono il primo livello in cui i dati vengono archiviati in RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed Dimensioni in byte dei file ZIP non compressi come indicato da Gestione file. Gestione file gestisce l'utilizzo e l'eliminazione dello spazio su disco del file SST fisico.

oggetto sources (Kafka)

Metrico Descrizione
sources.description Descrizione dettagliata dell'origine Kafka, specificando l'argomento Kafka esatto da cui leggere. Ad esempio: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
Oggetto sources.startOffset Numero di offset iniziale all'interno dell'argomento Kafka in cui è stato avviato il processo di streaming.
Oggetto sources.endOffset Ultimo offset elaborato dal microbatch. Potrebbe essere uguale a latestOffset per un'esecuzione di microbatch in corso.
Oggetto sources.latestOffset L'offset più recente è stato determinato dal microbatch. Il processo di microbatching potrebbe non elaborare tutti gli offset quando è presente una limitazione, che comporta endOffset e latestOffset differenze.
sources.numInputRows Numero di righe di input elaborate dall'origine.
sources.inputRowsPerSecond Frequenza con cui arrivano i dati per l'elaborazione da questa origine.
sources.processedRowsPerSecond Frequenza con cui Spark elabora i dati da questa origine.

Oggetto sources.metrics (Kafka)

Metrico Descrizione
sources.metrics.avgOffsetsBehindLatest Numero medio di offset che la query di streaming è dietro l'offset disponibile più recente tra tutti gli argomenti sottoscritti.
sources.metrics.estimatedTotalBytesBehindLatest Numero stimato di byte non utilizzati dal processo di query dagli argomenti sottoscritti.
sources.metrics.maxOffsetsBehindLatest Numero massimo di offset che la query di streaming è dietro l'offset disponibile più recente tra tutti gli argomenti sottoscritti.
sources.metrics.minOffsetsBehindLatest Numero minimo di offset che la query di streaming è dietro l'offset disponibile più recente tra tutti gli argomenti sottoscritti.

oggetto sink (Kafka)

Metrico Descrizione
sink.description Descrizione del sink Kafka in cui viene scritta la query di streaming, che descrive in dettaglio l'implementazione del sink Kafka specifica in uso. Ad esempio: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows Numero di righe scritte nella tabella di output o nel sink come parte del microbatch. Per alcune situazioni, questo valore può essere "-1" e in genere può essere interpretato come "sconosciuto".

oggetto sources (Delta Lake)

Metrico Descrizione
sources.description Descrizione dell'origine da cui la query di streaming sta leggendo. Ad esempio: “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion Versione della serializzazione con cui viene codificato questo offset.
sources.[startOffset/endOffset].reservoirId ID della tabella da leggere. Viene usato per rilevare errori di configurazione durante il riavvio di una query.
sources.[startOffset/endOffset].reservoirVersion Versione della tabella attualmente in fase di elaborazione.
sources.[startOffset/endOffset].index Indice nella sequenza di AddFiles in questa versione. Viene usato per suddividere i commit di grandi dimensioni in più batch. Questo indice viene creato ordinando su modificationTimestamp e path.
sources.[startOffset/endOffset].isStartingVersion Identifica se l'offset corrente contrassegna l'inizio di una nuova query di streaming anziché l'elaborazione delle modifiche apportate dopo l'elaborazione dei dati iniziali. Quando si avvia una nuova query, tutti i dati presenti nella tabella all'inizio vengono elaborati per primi e quindi tutti i nuovi dati che arrivano.
sources.latestOffset Offset più recente elaborato dalla query di microbatch.
sources.numInputRows Numero di righe di input elaborate dall'origine.
sources.inputRowsPerSecond Frequenza con cui arrivano i dati per l'elaborazione da questa origine.
sources.processedRowsPerSecond Frequenza con cui Spark elabora i dati da questa origine.
sources.metrics.numBytesOutstanding Dimensioni combinate dei file in sospeso (file rilevati da RocksDB). Si tratta della metrica di backlog per Delta e Auto Loader come origine di streaming.
sources.metrics.numFilesOutstanding Numero di file in sospeso da elaborare. Si tratta della metrica di backlog per Delta e Auto Loader come origine di streaming.

oggetto sink (Delta Lake)

Metrico Descrizione
sink.description Descrizione del sink Delta, che descrive in dettaglio l'implementazione specifica del sink Delta in uso. Ad esempio: “DeltaSink[table]”.
sink.numOutputRows Il numero di righe è sempre "-1" perché Spark non può dedurre le righe di output per i sink DSv1, ovvero la classificazione per il sink Delta Lake.

Esempi

Esempio di evento Kafka-to-Kafka StreamingQueryListener

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Esempio di evento Delta Lake StreamingQueryListener da Delta Lake a Delta Lake

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Esempio di evento Verso Delta Lake StreamingQueryListener

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Esempio di evento Kafka+Delta Lake StreamingQueryListener da Kafka+Delta Lake

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Esempio di origine della frequenza con l'evento Delta Lake StreamingQueryListener

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}