Überwachung strukturierter Streaming-Abfragen auf Azure Databricks
Azure Databricks bietet über die Spark-Benutzeroberfläche auf der Registerkarte Streaming eine integrierte Überwachung für strukturierte Streaming-Anwendungen.
Unterscheiden Sie strukturierte Streaming-Abfragen in der Spark-Benutzeroberfläche
Geben Sie Ihren Streams einen eindeutigen Abfragenamen, indem Sie .queryName(<query-name>)
zu Ihrem writeStream
-Code hinzufügen, damit Sie in der Spark-Benutzeroberfläche leicht unterscheiden können, welche Metriken zu welchem Stream gehören.
Pushen Sie strukturierte Streaming-Metriken an externe Dienste
Streamingmetriken können unter Verwendung der Streamingabfragelistener-Schnittstelle von Apache Spark an externe Dienste für Warnungs- oder Dashboardanwendungsfälle übertragen werden. Ab Databricks Runtime 11.3 LTS ist der Streamingabfragelistener in Python und Scala verfügbar.
Wichtig
Anmeldeinformationen und Objekte, die von Unity Catalog verwaltet werden, können nicht in der StreamingQueryListener
-Logik verwendet werden.
Hinweis
Die Verarbeitungslatenz mit Listenern kann sich erheblich auf die Abfrageverarbeitungsgeschwindigkeiten auswirken. Es wird empfohlen, die Verarbeitungslogik in diesen Listenern einzuschränken und sich für das Schreiben in Fast-Response-Systeme wie Kafka zur Effizienz zu entscheiden.
Der folgende Code enthält einfache Syntaxbeispiele für die Implementierung eines Listeners:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Definieren von beobachtbaren Metriken in strukturiertem Streaming
Beobachtbare Metriken werden als beliebige Aggregatfunktionen bezeichnet, die in einer Abfrage (DataFrame) definiert werden können. Sobald die Ausführung eines DataFrames einen Abschlusspunkt erreicht (d. h. eine Batchabfrage beendet oder eine Streamingepoche erreicht), wird ein benanntes Ereignis ausgegeben, das die Metriken für die seit dem letzten Abschlusspunkt verarbeiteten Daten enthält.
Sie können diese Metriken beobachten, indem Sie einen Listener an die Spark-Sitzung anfügen. Der Listener hängt vom Ausführungsmodus ab:
Batchmodus: Verwenden Sie
QueryExecutionListener
.QueryExecutionListener
wird aufgerufen, wenn die Abfrage abgeschlossen ist. Greifen Sie mithilfe derQueryExecution.observedMetrics
-Karte auf die Metriken zu.Streaming oder Microbatch: Verwenden Sie
StreamingQueryListener
.StreamingQueryListener
wird aufgerufen, wenn die Streamingabfrage eine Epoche abgeschlossen hat. Greifen Sie mithilfe derStreamingQueryProgress.observedMetrics
-Karte auf die Metriken zu. Azure Databricks unterstützt kein fortlaufendes Ausführungsstreaming.
Beispiel:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
StreamingQueryListener-Objektmetriken
Metrik | Beschreibung |
---|---|
id |
Eine eindeutige Abfrage-ID, die über Neustarts hinweg bestehen bleibt. |
runId |
Eine Abfrage-ID, die für jeden Start/Neustart eindeutig ist. Siehe StreamingQuery.runId(). |
name |
Die benutzerdefinierter Name der Abfrage. Der Name ist NULL, wenn kein Name angegeben wird. |
timestamp |
Der Zeitstempel für die Ausführung des Micro-Batch. |
batchId |
Ein eindeutige ID für den aktuellen Datenbatch der verarbeiteten Daten. Im Falle von Wiederholungsversuchen nach einem Fehler kann eine bestimmte Batch-ID mehr als einmal ausgeführt werden. Wenn keine Daten zu verarbeiten sind, wird die Batch-ID nicht erhöht. |
numInputRows |
Die Gesamtzahl der in einem Trigger verarbeiteten Datensätze (für alle Quellen hinweg). |
inputRowsPerSecond |
Die aggregierte Rate der eingehenden Daten (über alle Quellen hinweg). |
processedRowsPerSecond |
Die aggregierte Geschwindigkeit, mit der Spark Daten verarbeitet (über alle Quellen hinweg). |
durationMs-Objekt
Informationen über die Zeit, die benötigt wird, um die verschiedenen Phasen der Ausführung des Micro-Batch-Prozesses abzuschließen.
Metrik | Beschreibung |
---|---|
durationMs.addBatch |
Die Zeit, die für die Ausführung des Mikro-Batch benötigt wird. Dies schließt die Zeit aus, die Spark für die Planung des Mikro-Batch benötigt. |
durationMs.getBatch |
Die Zeit, die benötigt wird, um die Metadaten zu den Offsets von der Quelle abzurufen. |
durationMs.latestOffset |
Zuletzt verbrauchter Offset für das Mikro-Batch. Dieses Fortschrittsobjekt bezieht sich auf die Zeit, die benötigt wird, um den letzten Offset aus den Quellen abzurufen. |
durationMs.queryPlanning |
Die Zeit, die für die Erstellung des Ausführungsplans benötigt wird. |
durationMs.triggerExecution |
Die Zeit, die zum Planen und Ausführen des Mikro-Batch benötigt wird. |
durationMs.walCommit |
Die Zeit, die für die Übertragung der neuen verfügbaren Offsets benötigt wird. |
eventTime-Objekt
Informationen über den Zeitwert des Ereignisses in den Daten, die im Micro-Batch verarbeitet werden. Diese Daten werden vom Wasserzeichen verwendet, um herauszufinden, wie der Zustand für die Verarbeitung von zustandsbehafteten Aggregationen, die im Auftrag für strukturiertes Streaming definiert sind, gekürzt werden kann.
Metrik | Beschreibung |
---|---|
eventTime.avg |
Die durchschnittliche Ereigniszeit, die in diesem Trigger zu sehen ist. |
eventTime.max |
Die maximale Ereigniszeit, die in diesem Trigger angezeigt wird. |
eventTime.min |
Die minimale Ereigniszeit, die in diesem Trigger zu sehen ist. |
eventTime.watermark |
Der Wert des Wasserzeichens, das im Auslöser verwendet wird. |
stateOperators-Objekt
Informationen über die zustandsbehafteten Vorgänge, die im Auftrag für strukturiertes Streaming definiert sind, und die daraus resultierenden Aggregationen.
Metrik | Beschreibung |
---|---|
stateOperators.operatorName |
Der Name des zustandsbehafteten Operators, auf den sich die Metriken beziehen, z. B. symmetricHashJoin , dedupe . stateStoreSave . |
stateOperators.numRowsTotal |
Die Gesamtanzahl der Zeilen im Zustand als Ergebnis des zustandsbehafteten Operators oder der Aggregation. |
stateOperators.numRowsUpdated |
Die Gesamtanzahl der Zeilen, die als Ergebnis des zustandsbehafteten Operators oder der Aggregation im Zustand aktualisiert wurden. |
stateOperators.allUpdatesTimeMs |
Diese Metrik ist derzeit von Spark nicht messbar und soll in zukünftigen Updates entfernt werden. |
stateOperators.numRowsRemoved |
Die Gesamtanzahl der Zeilen, die als Ergebnis des zustandsbehafteten Operators oder der Aggregation aus dem Zustand entfernt wurden. |
stateOperators.allRemovalsTimeMs |
Diese Metrik ist derzeit von Spark nicht messbar und soll in zukünftigen Updates entfernt werden. |
stateOperators.commitTimeMs |
Die Zeit, die benötigt wird, um alle Aktualisierungen (hinzugefügt und entfernt) zu übertragen und eine neue Version zu erstellen. |
stateOperators.memoryUsedBytes |
Vom Zustandsspeicher verwendeter Speicher. |
stateOperators.numRowsDroppedByWatermark |
Die Anzahl der Zeilen, die als zu spät angesehen werden, um in die zustandsbehaftete Aggregation einbezogen zu werden. Gilt nur für Streamingaggregationen: Die Anzahl der Zeilen, die nach der Aggregation verworfen wurden, und keine unformatierten Eingabezeilen. Diese Zahl ist nicht genau, gibt aber einen Hinweis darauf, dass verspätete Daten verworfen werden. |
stateOperators.numShufflePartitions |
Die Anzahl der Shuffle-Partitionen für diesen zustandsbehafteten Operator. |
stateOperators.numStateStoreInstances |
Die tatsächliche Zustandsspeicherinstanz, die der Operator initialisiert und verwaltet hat. Bei vielen zustandsbehafteten Operatoren entspricht dies der Anzahl der Partitionen. Stream-Stream-Verknüpfungen initialisieren jedoch vier Zustandsspeicherinstanzen pro Partition. |
stateOperators.customMetrics-Objekt
Von RocksDB gesammelte Informationen, die Metriken über die Leistung und die Vorgänge in Bezug auf die zustandsbehafteten Werte erfassen, die für den strukturierten Streaming-Auftrag verwaltet werden. Weitere Informationen finden Sie unter Konfigurieren Sie den RocksDB-Statusspeicher auf Azure Databricks.
Metrik | Beschreibung |
---|---|
customMetrics.rocksdbBytesCopied |
Die Anzahl der kopierten Bytes, die vom RocksDB File Manager nachverfolgt werden. |
customMetrics.rocksdbCommitCheckpointLatency |
Die Zeit in Millisekunden, um eine Momentaufnahme der nativen RocksDB zu machen und sie in ein lokales Verzeichnis zu schreiben. |
customMetrics.rocksdbCompactLatency |
Die Zeit in Millisekunden für die Komprimierung (optional) während des Prüfpunktcommits. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Die Zeit in Millisekunden, die die native RocksDB-Momentaufnahme mit externem Speicher synchronisiert (der Prüfpunktstandort). |
customMetrics.rocksdbCommitFlushLatency |
Die Zeit in Millisekunden beim Leeren der RocksDB-Speicheränderungen auf dem lokalen Datenträger. |
customMetrics.rocksdbCommitPauseLatency |
Die Zeit in Millisekunden, die die Hintergrundarbeitsthreads als Teil des Prüfpunkt-Commits beendet, z. B. zur Komprimierung. |
customMetrics.rocksdbCommitWriteBatchLatency |
Die Zeit in Millisekunden, um die gestaffelten Schreibvorgänge in der In-Memory-Struktur (WriteBatch ) auf die native RocksDB-Instanz anzuwenden. |
customMetrics.rocksdbFilesCopied |
Die Anzahl der kopierten Dateien, die vom RocksDB File Manager nachverfolgt werden. |
customMetrics.rocksdbFilesReused |
Die Anzahl der wiederverwendeten Dateien, die vom RocksDB File Manager nachverfolgt werden. |
customMetrics.rocksdbGetCount |
Die Anzahl der get -Aufrufe an die DB (beinhaltet nicht gets von WriteBatch In-Memory-Batch, der für Staging-Schreibvorgänge verwendet wird). |
customMetrics.rocksdbGetLatency |
Die durchschnittliche Zeit in Nanosekunden pro zugrunde liegendem nativen RocksDB::Get -Aufruf. |
customMetrics.rocksdbReadBlockCacheHitCount |
Die Anzahl der Cachetreffer aus dem Blockcache in RocksDB, die nützlich sind, um lokale Datenträgerlesevorgänge zu vermeiden. |
customMetrics.rocksdbReadBlockCacheMissCount |
Die Anzahl des Blockcaches in RocksDB ist nicht hilfreich, um lokale Datenträgerlesevorgänge zu vermeiden. |
customMetrics.rocksdbSstFileSize |
Die Größe aller SST-Datei (Static Sorted Table) – die tabellarische Struktur RocksDB verwendet, um Daten zu speichern. |
customMetrics.rocksdbTotalBytesRead |
Die Anzahl unkomprimierter Bytes, die durch get -Vorgänge gelesen wurden. |
customMetrics.rocksdbTotalBytesReadByCompaction |
Die Anzahl der Bytes, die der Komprimierungsprozess vom Datenträger liest. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Die Gesamtzahl der Bytes von nicht komprimierten Daten, die mit einem Iterator gelesen werden. Einige der zustandsbehafteten Vorgänge (z. B. Timeout-Verarbeitung in FlatMapGroupsWithState und Wasserzeichen) erfordern das Lesen von Daten in der DB über einen Iterator. |
customMetrics.rocksdbTotalBytesWritten |
Die Gesamtanzahl der unkomprimierten Bytes, die durch put -Vorgänge geschrieben wurden. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Die Gesamtanzahl der Bytes, die der Komprimierungsprozess auf den Datenträger schreibt. |
customMetrics.rocksdbTotalCompactionLatencyMs |
Die Zeit in Millisekunden für die Komprimierung von RocksDB, einschließlich der Komprimierung im Hintergrund und der optionalen Komprimierung, die während des Commits eingeleitet wird. |
customMetrics.rocksdbTotalFlushLatencyMs |
Die Gesamtleerungsdauer, einschließlich Hintergrundleerung. Leerungsvorgänge sind Vorgänge, mit denen die MemTable in den Speicher geleert wird, sobald sie voll ist. MemTables sind die erste Ebene, auf der Daten in RocksDB gespeichert werden. |
customMetrics.rocksdbZipFileBytesUncompressed |
Die Größe in Byte der nicht komprimierten ZIP-Dateien, wie vom Datei-Manager angegeben. Der File Manager verwaltet die Nutzung und Löschung des physischen SST-Dateispeichers. |
sources-Objekt (Kafka)
Metrik | Beschreibung |
---|---|
sources.description |
Eine detaillierte Beschreibung der Kafka-Quelle, die das genaue Kafka-Thema angibt, aus dem gelesen wird. Beispiel: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” |
sources.startOffset -Objekt |
Die Start-Offset-Nummer innerhalb des Kafka-Themas, mit dem der Streaming-Auftrag begonnen hat. |
sources.endOffset -Objekt |
Der letzte vom Mikro-Batch verarbeiteter Offset. Bei einer laufenden Mikro-Batch-Ausführung könnte dies gleich latestOffset sein. |
sources.latestOffset -Objekt |
Der letzte vom Mikrobatch ermittelte Wert. Der Mikrobatchingsprozess verarbeitet möglicherweise nicht alle Offsets, wenn eine Drosselung vorhanden ist, was zu unterschiedlichen Ergebnissen von endOffset und latestOffset führt. |
sources.numInputRows |
Die Anzahl der verarbeiteten Eingabezeilen aus dieser Quelle. |
sources.inputRowsPerSecond |
Die Rate, mit der die Daten von dieser Quelle zur Verarbeitung ankommen. |
sources.processedRowsPerSecond |
Die Rate, mit der Spark die Daten von dieser Quelle verarbeitet. |
sources.metrics-Objekt (Kafka)
Metrik | Beschreibung |
---|---|
sources.metrics.avgOffsetsBehindLatest |
Die durchschnittliche Anzahl der Offsets, um die die Streaming-Anfrage unter allen abonnierten Themen hinter dem letzten verfügbaren Offset zurückliegt. |
sources.metrics.estimatedTotalBytesBehindLatest |
Die geschätzte Anzahl der Bytes, die der Abfrageprozess nicht von den abonnierten Themen verbraucht hat. |
sources.metrics.maxOffsetsBehindLatest |
Die maximale Anzahl der Offsets, um die die Streaming-Anfrage hinter dem letzten verfügbaren Offset aller abonnierten Themen zurückliegt. |
sources.metrics.minOffsetsBehindLatest |
Die Mindestanzahl der Offsets, um die die Streaming-Anfrage hinter dem letzten verfügbaren Offset aller abonnierten Themen zurückliegt. |
sink-Objekt (Kafka)
Metrik | Beschreibung |
---|---|
sink.description |
Die Beschreibung der Kafka-Spüle, in die die Streamingabfrage geschrieben wird, und die spezifische Kafka-Senkenimplementierung wird verwendet. Beispiel: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” |
sink.numOutputRows |
Die Anzahl der Zeilen, die als Teil des Mikro-Batch in die Ausgabetabelle oder Senke geschrieben wurden. In einigen Situationen kann dieser Wert „-1“ sein und kann im Allgemeinen als „unbekannt“ interpretiert werden. |
sources-Objekt (Delta Lake)
Metrik | Beschreibung |
---|---|
sources.description |
Die Beschreibung der Quelle, aus der die Streamingabfrage gelesen wird. Beispiel: “DeltaSource[table]” |
sources.[startOffset/endOffset].sourceVersion |
Die Version der Serialisierung, mit der dieser Offset codiert ist. |
sources.[startOffset/endOffset].reservoirId |
Die ID der Tabelle, die gelesen wird. Dies wird verwendet, um Fehlkonfigurationen beim Neustart einer Abfrage zu erkennen. |
sources.[startOffset/endOffset].reservoirVersion |
Die Version der Tabelle, die gerade bearbeitet. |
sources.[startOffset/endOffset].index |
Der Index in der Sequenz von AddFiles in dieser Version. Dies wird verwendet, um große Commits in mehrere Batches aufzuteilen. Dieser Index wird durch Sortieren nach modificationTimestamp und path erstellt. |
sources.[startOffset/endOffset].isStartingVersion |
Gibt an, ob der aktuelle Offset den Anfang einer neuen Streamingabfrage markiert, anstatt die Verarbeitung von Änderungen, die nach der Verarbeitung der ursprünglichen Daten aufgetreten sind. Wenn Sie eine neue Abfrage starten, werden zunächst alle Daten verarbeitet, die sich zu Beginn in der Tabelle befinden, und dann die neu hinzugekommenen Daten. |
sources.latestOffset |
Der letzte vom Mikro-Batch verarbeitete Offset. |
sources.numInputRows |
Die Anzahl der verarbeiteten Eingabezeilen aus dieser Quelle. |
sources.inputRowsPerSecond |
Die Rate, mit der die Daten von dieser Quelle zur Verarbeitung ankommen. |
sources.processedRowsPerSecond |
Die Rate, mit der Spark die Daten von dieser Quelle verarbeitet. |
sources.metrics.numBytesOutstanding |
Die Gesamtgröße der ausstehenden Dateien (von RocksDB nachverfolgte Dateien). Dies ist die Backlog-Metrik für Delta und Autoloader als Streaming-Quelle. |
sources.metrics.numFilesOutstanding |
Die Anzahl der ausstehenden Dateien, die verarbeitet werden müssen. Dies ist die Backlog-Metrik für Delta und Autoloader als Streaming-Quelle. |
sink-Objekt (Delta Lake)
Metrik | Beschreibung |
---|---|
sink.description |
Die Beschreibung der Delta-Senke, in der die spezifische Delta-Senkenimplementierung verwendet wird. Beispiel: “DeltaSink[table]” |
sink.numOutputRows |
Die Anzahl der Zeilen ist immer „-1“, da Spark keine Ausgabezeilen für DSv1-Senken ableiten kann, was die Klassifizierung für die Delta Lake-Senke ist. |
Beispiele
Beispielereignis Kafka-zu-Kafka StreamingQueryListener
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Beispielereignis Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Beispielereignis Kinesis-to-Delta Lake StreamingQueryListener
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Beispielereignis Kafka+Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Beispielereignis Raten-Quelle zu Delta Lake StreamingQueryListener
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}