Monitoraggio delle query di Structured Streaming in Azure Databricks
Azure Databricks offre il monitoraggio predefinito per le applicazioni Structured Streaming tramite l'interfaccia utente spark nella scheda Streaming .
Distinguere le query structured streaming nell'interfaccia utente di Spark
Fornire ai flussi un nome di query univoco aggiungendo .queryName(<query-name>)
al writeStream
codice per distinguere facilmente le metriche che appartengono al flusso nell'interfaccia utente di Spark.
Eseguire il push delle metriche di Structured Streaming a servizi esterni
È possibile eseguire il push delle metriche di streaming a servizi esterni per gli avvisi o i casi d'uso del dashboard tramite l'interfaccia listener di query di streaming di Apache Spark. In Databricks Runtime 11.3 LTS e versioni successive, il listener di query di streaming è disponibile in Python e Scala.
Importante
Le credenziali e gli oggetti gestiti da Unity Catalog non possono essere usati nella StreamingQueryListener
logica.
Nota
La latenza di elaborazione con listener può influire significativamente sulla velocità di elaborazione delle query. È consigliabile limitare la logica di elaborazione in questi listener e optare per la scrittura in sistemi di risposta rapida come Kafka per l'efficienza.
Il codice seguente fornisce esempi di base della sintassi per l'implementazione di un listener:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Definizione di metriche osservabili in Structured Streaming
Le metriche osservabili sono funzioni di aggregazione arbitrarie che possono essere definite in una query (DataFrame). Non appena l'esecuzione di un dataframe raggiunge un punto di completamento (ovvero completa una query batch o raggiunge un periodo di streaming), viene generato un evento denominato che contiene le metriche per i dati elaborati dall'ultimo punto di completamento.
È possibile osservare queste metriche collegando un listener alla sessione spark. Il listener dipende dalla modalità di esecuzione:
Modalità batch: usare
QueryExecutionListener
.QueryExecutionListener
viene chiamato al termine della query. Accedere alle metriche usando laQueryExecution.observedMetrics
mappa.Streaming o microbatch: usare
StreamingQueryListener
.StreamingQueryListener
viene chiamato quando la query di streaming completa un periodo. Accedere alle metriche usando laStreamingQueryProgress.observedMetrics
mappa. Azure Databricks non supporta il flusso di esecuzione continua.
Ad esempio:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Metriche dell'oggetto StreamingQueryListener
Metrico | Descrizione |
---|---|
id |
ID di query univoco che persiste tra i riavvii. |
runId |
ID di query univoco per ogni avvio/riavvio. Vedere StreamingQuery.runId(). |
name |
Nome specificato dall'utente della query. Il nome è Null se non viene specificato alcun nome. |
timestamp |
Timestamp per l'esecuzione del microbatch. |
batchId |
ID univoco per il batch di dati corrente da elaborare. In caso di tentativi dopo un errore, un ID batch specificato può essere eseguito più volte. Analogamente, quando non sono presenti dati da elaborare, l'ID batch non viene incrementato. |
numInputRows |
Numero aggregato (in tutte le origini) di record elaborati in un trigger. |
inputRowsPerSecond |
Frequenza di aggregazione (in tutte le origini) dei dati in arrivo. |
processedRowsPerSecond |
Frequenza di aggregazione (in tutte le origini) con cui Spark elabora i dati. |
oggetto durationMs
Informazioni sul tempo necessario per completare varie fasi del processo di esecuzione del microbatch.
Metrico | Descrizione |
---|---|
durationMs.addBatch |
Tempo impiegato per eseguire il microbatch. Ciò esclude il tempo impiegato da Spark per pianificare il microbatch. |
durationMs.getBatch |
Tempo necessario per recuperare i metadati relativi agli offset dall'origine. |
durationMs.latestOffset |
Offset più recente utilizzato per il microbatch. Questo oggetto di stato fa riferimento al tempo impiegato per recuperare l'offset più recente dalle origini. |
durationMs.queryPlanning |
Tempo impiegato per generare il piano di esecuzione. |
durationMs.triggerExecution |
Tempo necessario per pianificare ed eseguire il microbatch. |
durationMs.walCommit |
Tempo impiegato per eseguire il commit dei nuovi offset disponibili. |
oggetto eventTime
Informazioni sul valore dell'ora dell'evento visualizzato all'interno dei dati elaborati nel microbatch. Questi dati vengono usati dalla filigrana per capire come tagliare lo stato per l'elaborazione di aggregazioni con stato definite nel processo Structured Streaming.
Metrico | Descrizione |
---|---|
eventTime.avg |
Tempo medio dell'evento visualizzato in tale trigger. |
eventTime.max |
Tempo massimo di evento visualizzato in tale trigger. |
eventTime.min |
Ora minima dell'evento visualizzata in tale trigger. |
eventTime.watermark |
Valore della filigrana utilizzata in tale trigger. |
oggetto stateOperators
Informazioni sulle operazioni con stato definite nel processo Structured Streaming e sulle aggregazioni generate da tali operazioni.
Metrico | Descrizione |
---|---|
stateOperators.operatorName |
Nome dell'operatore con stato a cui sono correlate le metriche, ad esempio symmetricHashJoin , dedupe , stateStoreSave . |
stateOperators.numRowsTotal |
Numero totale di righe nello stato risultante da un operatore o un'aggregazione con stato. |
stateOperators.numRowsUpdated |
Numero totale di righe aggiornate nello stato in seguito a un operatore o un'aggregazione con stato. |
stateOperators.allUpdatesTimeMs |
Questa metrica non è attualmente misurabile da Spark e deve essere rimossa negli aggiornamenti futuri. |
stateOperators.numRowsRemoved |
Numero totale di righe rimosse dallo stato in seguito a un operatore o un'aggregazione con stato. |
stateOperators.allRemovalsTimeMs |
Questa metrica non è attualmente misurabile da Spark e deve essere rimossa negli aggiornamenti futuri. |
stateOperators.commitTimeMs |
Tempo impiegato per eseguire il commit di tutti gli aggiornamenti (inserisce e rimuove) e restituire una nuova versione. |
stateOperators.memoryUsedBytes |
Memoria utilizzata dall'archivio stati. |
stateOperators.numRowsDroppedByWatermark |
Numero di righe considerate troppo tardi per essere incluse in un'aggregazione con stato. Solo aggregazioni di streaming: numero di righe eliminate dopo l'aggregazione (non righe di input non elaborate). Questo numero non è preciso, ma fornisce un'indicazione che i dati in ritardo vengono eliminati. |
stateOperators.numShufflePartitions |
Numero di partizioni casuali per questo operatore con stato. |
stateOperators.numStateStoreInstances |
Istanza effettiva dell'archivio di stati inizializzata e gestita dall'operatore. Per molti operatori con stato, corrisponde al numero di partizioni. Tuttavia, i join stream-stream inizializzano quattro istanze dell'archivio stati per partizione. |
oggetto stateOperators.customMetrics
Informazioni raccolte da RocksDB che acquisisce le metriche relative alle prestazioni e alle operazioni relative ai valori con stato che mantiene per il processo Structured Streaming. Per altre informazioni, vedere Configurare l'archivio stati di RocksDB in Azure Databricks.
Metrico | Descrizione |
---|---|
customMetrics.rocksdbBytesCopied |
Numero di byte copiati come rilevato da Gestione file RocksDB. |
customMetrics.rocksdbCommitCheckpointLatency |
Tempo in millisecondi durante l'acquisizione di uno snapshot di RocksDB nativo e la scrittura in una directory locale. |
customMetrics.rocksdbCompactLatency |
Tempo in millisecondi di compattazione (facoltativo) durante il commit del checkpoint. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Tempo in millisecondi durante la sincronizzazione dello snapshot RocksDB nativo con l'archiviazione esterna (posizione del checkpoint). |
customMetrics.rocksdbCommitFlushLatency |
Tempo in millisecondi di scaricamento delle modifiche in memoria di RocksDB nel disco locale. |
customMetrics.rocksdbCommitPauseLatency |
Tempo in millisecondi che arresta i thread di lavoro in background come parte del commit del checkpoint, ad esempio per la compattazione. |
customMetrics.rocksdbCommitWriteBatchLatency |
Tempo in millisecondi durante l'applicazione delle scritture di staging nella struttura in memoria (WriteBatch ) a RocksDB nativo. |
customMetrics.rocksdbFilesCopied |
Numero di file copiati come rilevato da Gestione file RocksDB. |
customMetrics.rocksdbFilesReused |
Numero di file riutilizzati come rilevato da Gestione file RocksDB. |
customMetrics.rocksdbGetCount |
Numero di get chiamate al database (non include gets il WriteBatch batch in memoria usato per le scritture di staging). |
customMetrics.rocksdbGetLatency |
Tempo medio in nanosecondi per la chiamata nativa RocksDB::Get sottostante. |
customMetrics.rocksdbReadBlockCacheHitCount |
Numero di riscontri nella cache dei blocchi in RocksDB utili per evitare letture su disco locale. |
customMetrics.rocksdbReadBlockCacheMissCount |
Il conteggio della cache dei blocchi in RocksDB non è utile per evitare letture su disco locale. |
customMetrics.rocksdbSstFileSize |
Dimensioni di tutti i file SST (Static Sorted Table): la struttura tabulare RocksDB usa per archiviare i dati. |
customMetrics.rocksdbTotalBytesRead |
Numero di byte non compressi letti dalle get operazioni. |
customMetrics.rocksdbTotalBytesReadByCompaction |
Numero di byte letti dal processo di compattazione dal disco. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Numero totale di byte di dati non compressi letti usando un iteratore. Alcune operazioni con stato (ad esempio, l'elaborazione del timeout in FlatMapGroupsWithState e la filigrana) richiedono la lettura dei dati nel database tramite un iteratore. |
customMetrics.rocksdbTotalBytesWritten |
Numero totale di byte non compressi scritti dalle put operazioni. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Numero totale di byte scritti dal processo di compattazione nel disco. |
customMetrics.rocksdbTotalCompactionLatencyMs |
Tempo in millisecondi per le compattazioni di RocksDB, incluse le compattazioni in background e la compattazione facoltativa avviata durante il commit. |
customMetrics.rocksdbTotalFlushLatencyMs |
Tempo totale di scaricamento, incluso lo scaricamento dello sfondo. Le operazioni di scaricamento sono processi in base ai quali l'oggetto MemTable viene scaricato nell'archiviazione una volta che è pieno. MemTables sono il primo livello in cui i dati vengono archiviati in RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
Dimensioni in byte dei file ZIP non compressi come indicato da Gestione file. Gestione file gestisce l'utilizzo e l'eliminazione dello spazio su disco del file SST fisico. |
oggetto sources (Kafka)
Metrico | Descrizione |
---|---|
sources.description |
Descrizione dettagliata dell'origine Kafka, specificando l'argomento Kafka esatto da cui leggere. Ad esempio: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” . |
Oggetto sources.startOffset |
Numero di offset iniziale all'interno dell'argomento Kafka in cui è stato avviato il processo di streaming. |
Oggetto sources.endOffset |
Ultimo offset elaborato dal microbatch. Potrebbe essere uguale a latestOffset per un'esecuzione di microbatch in corso. |
Oggetto sources.latestOffset |
L'offset più recente è stato determinato dal microbatch. Il processo di microbatching potrebbe non elaborare tutti gli offset quando è presente una limitazione, che comporta endOffset e latestOffset differenze. |
sources.numInputRows |
Numero di righe di input elaborate dall'origine. |
sources.inputRowsPerSecond |
Frequenza con cui arrivano i dati per l'elaborazione da questa origine. |
sources.processedRowsPerSecond |
Frequenza con cui Spark elabora i dati da questa origine. |
Oggetto sources.metrics (Kafka)
Metrico | Descrizione |
---|---|
sources.metrics.avgOffsetsBehindLatest |
Numero medio di offset che la query di streaming è dietro l'offset disponibile più recente tra tutti gli argomenti sottoscritti. |
sources.metrics.estimatedTotalBytesBehindLatest |
Numero stimato di byte non utilizzati dal processo di query dagli argomenti sottoscritti. |
sources.metrics.maxOffsetsBehindLatest |
Numero massimo di offset che la query di streaming è dietro l'offset disponibile più recente tra tutti gli argomenti sottoscritti. |
sources.metrics.minOffsetsBehindLatest |
Numero minimo di offset che la query di streaming è dietro l'offset disponibile più recente tra tutti gli argomenti sottoscritti. |
oggetto sink (Kafka)
Metrico | Descrizione |
---|---|
sink.description |
Descrizione del sink Kafka in cui viene scritta la query di streaming, che descrive in dettaglio l'implementazione del sink Kafka specifica in uso. Ad esempio: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” . |
sink.numOutputRows |
Numero di righe scritte nella tabella di output o nel sink come parte del microbatch. Per alcune situazioni, questo valore può essere "-1" e in genere può essere interpretato come "sconosciuto". |
oggetto sources (Delta Lake)
Metrico | Descrizione |
---|---|
sources.description |
Descrizione dell'origine da cui la query di streaming sta leggendo. Ad esempio: “DeltaSource[table]” . |
sources.[startOffset/endOffset].sourceVersion |
Versione della serializzazione con cui viene codificato questo offset. |
sources.[startOffset/endOffset].reservoirId |
ID della tabella da leggere. Viene usato per rilevare errori di configurazione durante il riavvio di una query. |
sources.[startOffset/endOffset].reservoirVersion |
Versione della tabella attualmente in fase di elaborazione. |
sources.[startOffset/endOffset].index |
Indice nella sequenza di AddFiles in questa versione. Viene usato per suddividere i commit di grandi dimensioni in più batch. Questo indice viene creato ordinando su modificationTimestamp e path . |
sources.[startOffset/endOffset].isStartingVersion |
Identifica se l'offset corrente contrassegna l'inizio di una nuova query di streaming anziché l'elaborazione delle modifiche apportate dopo l'elaborazione dei dati iniziali. Quando si avvia una nuova query, tutti i dati presenti nella tabella all'inizio vengono elaborati per primi e quindi tutti i nuovi dati che arrivano. |
sources.latestOffset |
Offset più recente elaborato dalla query di microbatch. |
sources.numInputRows |
Numero di righe di input elaborate dall'origine. |
sources.inputRowsPerSecond |
Frequenza con cui arrivano i dati per l'elaborazione da questa origine. |
sources.processedRowsPerSecond |
Frequenza con cui Spark elabora i dati da questa origine. |
sources.metrics.numBytesOutstanding |
Dimensioni combinate dei file in sospeso (file rilevati da RocksDB). Si tratta della metrica di backlog per Delta e Auto Loader come origine di streaming. |
sources.metrics.numFilesOutstanding |
Numero di file in sospeso da elaborare. Si tratta della metrica di backlog per Delta e Auto Loader come origine di streaming. |
oggetto sink (Delta Lake)
Metrico | Descrizione |
---|---|
sink.description |
Descrizione del sink Delta, che descrive in dettaglio l'implementazione specifica del sink Delta in uso. Ad esempio: “DeltaSink[table]” . |
sink.numOutputRows |
Il numero di righe è sempre "-1" perché Spark non può dedurre le righe di output per i sink DSv1, ovvero la classificazione per il sink Delta Lake. |
Esempi
Esempio di evento Kafka-to-Kafka StreamingQueryListener
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Esempio di evento Delta Lake StreamingQueryListener da Delta Lake a Delta Lake
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Esempio di evento Verso Delta Lake StreamingQueryListener
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Esempio di evento Kafka+Delta Lake StreamingQueryListener da Kafka+Delta Lake
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Esempio di origine della frequenza con l'evento Delta Lake StreamingQueryListener
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}