Freigeben über


Überwachung strukturierter Streaming-Abfragen auf Azure Databricks

Azure Databricks bietet über die Spark-Benutzeroberfläche auf der Registerkarte Streaming eine integrierte Überwachung für strukturierte Streaming-Anwendungen.

Unterscheiden Sie strukturierte Streaming-Abfragen in der Spark-Benutzeroberfläche

Geben Sie Ihren Streams einen eindeutigen Abfragenamen, indem Sie .queryName(<query-name>) zu Ihrem writeStream-Code hinzufügen, damit Sie in der Spark-Benutzeroberfläche leicht unterscheiden können, welche Metriken zu welchem Stream gehören.

Pushen Sie strukturierte Streaming-Metriken an externe Dienste

Streamingmetriken können unter Verwendung der Streamingabfragelistener-Schnittstelle von Apache Spark an externe Dienste für Warnungs- oder Dashboardanwendungsfälle übertragen werden. Ab Databricks Runtime 11.3 LTS ist der Streamingabfragelistener in Python und Scala verfügbar.

Wichtig

Anmeldeinformationen und Objekte, die von Unity Catalog verwaltet werden, können nicht in der StreamingQueryListener-Logik verwendet werden.

Hinweis

Die Verarbeitungslatenz mit Listenern kann sich erheblich auf die Abfrageverarbeitungsgeschwindigkeiten auswirken. Es wird empfohlen, die Verarbeitungslogik in diesen Listenern einzuschränken und sich für das Schreiben in Fast-Response-Systeme wie Kafka zur Effizienz zu entscheiden.

Der folgende Code enthält einfache Syntaxbeispiele für die Implementierung eines Listeners:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definieren von beobachtbaren Metriken in strukturiertem Streaming

Beobachtbare Metriken werden als beliebige Aggregatfunktionen bezeichnet, die in einer Abfrage (DataFrame) definiert werden können. Sobald die Ausführung eines DataFrames einen Abschlusspunkt erreicht (d. h. eine Batchabfrage beendet oder eine Streamingepoche erreicht), wird ein benanntes Ereignis ausgegeben, das die Metriken für die seit dem letzten Abschlusspunkt verarbeiteten Daten enthält.

Sie können diese Metriken beobachten, indem Sie einen Listener an die Spark-Sitzung anfügen. Der Listener hängt vom Ausführungsmodus ab:

  • Batchmodus: Verwenden Sie QueryExecutionListener.

    QueryExecutionListener wird aufgerufen, wenn die Abfrage abgeschlossen ist. Greifen Sie mithilfe der QueryExecution.observedMetrics-Karte auf die Metriken zu.

  • Streaming oder Microbatch: Verwenden Sie StreamingQueryListener.

    StreamingQueryListener wird aufgerufen, wenn die Streamingabfrage eine Epoche abgeschlossen hat. Greifen Sie mithilfe der StreamingQueryProgress.observedMetrics-Karte auf die Metriken zu. Azure Databricks unterstützt kein fortlaufendes Ausführungsstreaming.

Beispiel:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

StreamingQueryListener-Objektmetriken

Metrik Beschreibung
id Eine eindeutige Abfrage-ID, die über Neustarts hinweg bestehen bleibt.
runId Eine Abfrage-ID, die für jeden Start/Neustart eindeutig ist. Siehe StreamingQuery.runId().
name Die benutzerdefinierter Name der Abfrage. Der Name ist NULL, wenn kein Name angegeben wird.
timestamp Der Zeitstempel für die Ausführung des Micro-Batch.
batchId Ein eindeutige ID für den aktuellen Datenbatch der verarbeiteten Daten. Im Falle von Wiederholungsversuchen nach einem Fehler kann eine bestimmte Batch-ID mehr als einmal ausgeführt werden. Wenn keine Daten zu verarbeiten sind, wird die Batch-ID nicht erhöht.
numInputRows Die Gesamtzahl der in einem Trigger verarbeiteten Datensätze (für alle Quellen hinweg).
inputRowsPerSecond Die aggregierte Rate der eingehenden Daten (über alle Quellen hinweg).
processedRowsPerSecond Die aggregierte Geschwindigkeit, mit der Spark Daten verarbeitet (über alle Quellen hinweg).

durationMs-Objekt

Informationen über die Zeit, die benötigt wird, um die verschiedenen Phasen der Ausführung des Micro-Batch-Prozesses abzuschließen.

Metrik Beschreibung
durationMs.addBatch Die Zeit, die für die Ausführung des Mikro-Batch benötigt wird. Dies schließt die Zeit aus, die Spark für die Planung des Mikro-Batch benötigt.
durationMs.getBatch Die Zeit, die benötigt wird, um die Metadaten zu den Offsets von der Quelle abzurufen.
durationMs.latestOffset Zuletzt verbrauchter Offset für das Mikro-Batch. Dieses Fortschrittsobjekt bezieht sich auf die Zeit, die benötigt wird, um den letzten Offset aus den Quellen abzurufen.
durationMs.queryPlanning Die Zeit, die für die Erstellung des Ausführungsplans benötigt wird.
durationMs.triggerExecution Die Zeit, die zum Planen und Ausführen des Mikro-Batch benötigt wird.
durationMs.walCommit Die Zeit, die für die Übertragung der neuen verfügbaren Offsets benötigt wird.

eventTime-Objekt

Informationen über den Zeitwert des Ereignisses in den Daten, die im Micro-Batch verarbeitet werden. Diese Daten werden vom Wasserzeichen verwendet, um herauszufinden, wie der Zustand für die Verarbeitung von zustandsbehafteten Aggregationen, die im Auftrag für strukturiertes Streaming definiert sind, gekürzt werden kann.

Metrik Beschreibung
eventTime.avg Die durchschnittliche Ereigniszeit, die in diesem Trigger zu sehen ist.
eventTime.max Die maximale Ereigniszeit, die in diesem Trigger angezeigt wird.
eventTime.min Die minimale Ereigniszeit, die in diesem Trigger zu sehen ist.
eventTime.watermark Der Wert des Wasserzeichens, das im Auslöser verwendet wird.

stateOperators-Objekt

Informationen über die zustandsbehafteten Vorgänge, die im Auftrag für strukturiertes Streaming definiert sind, und die daraus resultierenden Aggregationen.

Metrik Beschreibung
stateOperators.operatorName Der Name des zustandsbehafteten Operators, auf den sich die Metriken beziehen, z. B. symmetricHashJoin, dedupe. stateStoreSave.
stateOperators.numRowsTotal Die Gesamtanzahl der Zeilen im Zustand als Ergebnis des zustandsbehafteten Operators oder der Aggregation.
stateOperators.numRowsUpdated Die Gesamtanzahl der Zeilen, die als Ergebnis des zustandsbehafteten Operators oder der Aggregation im Zustand aktualisiert wurden.
stateOperators.allUpdatesTimeMs Diese Metrik ist derzeit von Spark nicht messbar und soll in zukünftigen Updates entfernt werden.
stateOperators.numRowsRemoved Die Gesamtanzahl der Zeilen, die als Ergebnis des zustandsbehafteten Operators oder der Aggregation aus dem Zustand entfernt wurden.
stateOperators.allRemovalsTimeMs Diese Metrik ist derzeit von Spark nicht messbar und soll in zukünftigen Updates entfernt werden.
stateOperators.commitTimeMs Die Zeit, die benötigt wird, um alle Aktualisierungen (hinzugefügt und entfernt) zu übertragen und eine neue Version zu erstellen.
stateOperators.memoryUsedBytes Vom Zustandsspeicher verwendeter Speicher.
stateOperators.numRowsDroppedByWatermark Die Anzahl der Zeilen, die als zu spät angesehen werden, um in die zustandsbehaftete Aggregation einbezogen zu werden. Gilt nur für Streamingaggregationen: Die Anzahl der Zeilen, die nach der Aggregation verworfen wurden, und keine unformatierten Eingabezeilen. Diese Zahl ist nicht genau, gibt aber einen Hinweis darauf, dass verspätete Daten verworfen werden.
stateOperators.numShufflePartitions Die Anzahl der Shuffle-Partitionen für diesen zustandsbehafteten Operator.
stateOperators.numStateStoreInstances Die tatsächliche Zustandsspeicherinstanz, die der Operator initialisiert und verwaltet hat. Bei vielen zustandsbehafteten Operatoren entspricht dies der Anzahl der Partitionen. Stream-Stream-Verknüpfungen initialisieren jedoch vier Zustandsspeicherinstanzen pro Partition.

stateOperators.customMetrics-Objekt

Von RocksDB gesammelte Informationen, die Metriken über die Leistung und die Vorgänge in Bezug auf die zustandsbehafteten Werte erfassen, die für den strukturierten Streaming-Auftrag verwaltet werden. Weitere Informationen finden Sie unter Konfigurieren Sie den RocksDB-Statusspeicher auf Azure Databricks.

Metrik Beschreibung
customMetrics.rocksdbBytesCopied Die Anzahl der kopierten Bytes, die vom RocksDB File Manager nachverfolgt werden.
customMetrics.rocksdbCommitCheckpointLatency Die Zeit in Millisekunden, um eine Momentaufnahme der nativen RocksDB zu machen und sie in ein lokales Verzeichnis zu schreiben.
customMetrics.rocksdbCompactLatency Die Zeit in Millisekunden für die Komprimierung (optional) während des Prüfpunktcommits.
customMetrics.rocksdbCommitFileSyncLatencyMs Die Zeit in Millisekunden, die die native RocksDB-Momentaufnahme mit externem Speicher synchronisiert (der Prüfpunktstandort).
customMetrics.rocksdbCommitFlushLatency Die Zeit in Millisekunden beim Leeren der RocksDB-Speicheränderungen auf dem lokalen Datenträger.
customMetrics.rocksdbCommitPauseLatency Die Zeit in Millisekunden, die die Hintergrundarbeitsthreads als Teil des Prüfpunkt-Commits beendet, z. B. zur Komprimierung.
customMetrics.rocksdbCommitWriteBatchLatency Die Zeit in Millisekunden, um die gestaffelten Schreibvorgänge in der In-Memory-Struktur (WriteBatch) auf die native RocksDB-Instanz anzuwenden.
customMetrics.rocksdbFilesCopied Die Anzahl der kopierten Dateien, die vom RocksDB File Manager nachverfolgt werden.
customMetrics.rocksdbFilesReused Die Anzahl der wiederverwendeten Dateien, die vom RocksDB File Manager nachverfolgt werden.
customMetrics.rocksdbGetCount Die Anzahl der get-Aufrufe an die DB (beinhaltet nicht gets von WriteBatch In-Memory-Batch, der für Staging-Schreibvorgänge verwendet wird).
customMetrics.rocksdbGetLatency Die durchschnittliche Zeit in Nanosekunden pro zugrunde liegendem nativen RocksDB::Get-Aufruf.
customMetrics.rocksdbReadBlockCacheHitCount Die Anzahl der Cachetreffer aus dem Blockcache in RocksDB, die nützlich sind, um lokale Datenträgerlesevorgänge zu vermeiden.
customMetrics.rocksdbReadBlockCacheMissCount Die Anzahl des Blockcaches in RocksDB ist nicht hilfreich, um lokale Datenträgerlesevorgänge zu vermeiden.
customMetrics.rocksdbSstFileSize Die Größe aller SST-Datei (Static Sorted Table) – die tabellarische Struktur RocksDB verwendet, um Daten zu speichern.
customMetrics.rocksdbTotalBytesRead Die Anzahl unkomprimierter Bytes, die durch get-Vorgänge gelesen wurden.
customMetrics.rocksdbTotalBytesReadByCompaction Die Anzahl der Bytes, die der Komprimierungsprozess vom Datenträger liest.
customMetrics.rocksdbTotalBytesReadThroughIterator Die Gesamtzahl der Bytes von nicht komprimierten Daten, die mit einem Iterator gelesen werden. Einige der zustandsbehafteten Vorgänge (z. B. Timeout-Verarbeitung in FlatMapGroupsWithState und Wasserzeichen) erfordern das Lesen von Daten in der DB über einen Iterator.
customMetrics.rocksdbTotalBytesWritten Die Gesamtanzahl der unkomprimierten Bytes, die durch put-Vorgänge geschrieben wurden.
customMetrics.rocksdbTotalBytesWrittenByCompaction Die Gesamtanzahl der Bytes, die der Komprimierungsprozess auf den Datenträger schreibt.
customMetrics.rocksdbTotalCompactionLatencyMs Die Zeit in Millisekunden für die Komprimierung von RocksDB, einschließlich der Komprimierung im Hintergrund und der optionalen Komprimierung, die während des Commits eingeleitet wird.
customMetrics.rocksdbTotalFlushLatencyMs Die Gesamtleerungsdauer, einschließlich Hintergrundleerung. Leerungsvorgänge sind Vorgänge, mit denen die MemTable in den Speicher geleert wird, sobald sie voll ist. MemTables sind die erste Ebene, auf der Daten in RocksDB gespeichert werden.
customMetrics.rocksdbZipFileBytesUncompressed Die Größe in Byte der nicht komprimierten ZIP-Dateien, wie vom Datei-Manager angegeben. Der File Manager verwaltet die Nutzung und Löschung des physischen SST-Dateispeichers.

sources-Objekt (Kafka)

Metrik Beschreibung
sources.description Eine detaillierte Beschreibung der Kafka-Quelle, die das genaue Kafka-Thema angibt, aus dem gelesen wird. Beispiel: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”
sources.startOffset-Objekt Die Start-Offset-Nummer innerhalb des Kafka-Themas, mit dem der Streaming-Auftrag begonnen hat.
sources.endOffset-Objekt Der letzte vom Mikro-Batch verarbeiteter Offset. Bei einer laufenden Mikro-Batch-Ausführung könnte dies gleich latestOffset sein.
sources.latestOffset-Objekt Der letzte vom Mikrobatch ermittelte Wert. Der Mikrobatchingsprozess verarbeitet möglicherweise nicht alle Offsets, wenn eine Drosselung vorhanden ist, was zu unterschiedlichen Ergebnissen von endOffset und latestOffset führt.
sources.numInputRows Die Anzahl der verarbeiteten Eingabezeilen aus dieser Quelle.
sources.inputRowsPerSecond Die Rate, mit der die Daten von dieser Quelle zur Verarbeitung ankommen.
sources.processedRowsPerSecond Die Rate, mit der Spark die Daten von dieser Quelle verarbeitet.

sources.metrics-Objekt (Kafka)

Metrik Beschreibung
sources.metrics.avgOffsetsBehindLatest Die durchschnittliche Anzahl der Offsets, um die die Streaming-Anfrage unter allen abonnierten Themen hinter dem letzten verfügbaren Offset zurückliegt.
sources.metrics.estimatedTotalBytesBehindLatest Die geschätzte Anzahl der Bytes, die der Abfrageprozess nicht von den abonnierten Themen verbraucht hat.
sources.metrics.maxOffsetsBehindLatest Die maximale Anzahl der Offsets, um die die Streaming-Anfrage hinter dem letzten verfügbaren Offset aller abonnierten Themen zurückliegt.
sources.metrics.minOffsetsBehindLatest Die Mindestanzahl der Offsets, um die die Streaming-Anfrage hinter dem letzten verfügbaren Offset aller abonnierten Themen zurückliegt.

sink-Objekt (Kafka)

Metrik Beschreibung
sink.description Die Beschreibung der Kafka-Spüle, in die die Streamingabfrage geschrieben wird, und die spezifische Kafka-Senkenimplementierung wird verwendet. Beispiel: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”
sink.numOutputRows Die Anzahl der Zeilen, die als Teil des Mikro-Batch in die Ausgabetabelle oder Senke geschrieben wurden. In einigen Situationen kann dieser Wert „-1“ sein und kann im Allgemeinen als „unbekannt“ interpretiert werden.

sources-Objekt (Delta Lake)

Metrik Beschreibung
sources.description Die Beschreibung der Quelle, aus der die Streamingabfrage gelesen wird. Beispiel: “DeltaSource[table]”
sources.[startOffset/endOffset].sourceVersion Die Version der Serialisierung, mit der dieser Offset codiert ist.
sources.[startOffset/endOffset].reservoirId Die ID der Tabelle, die gelesen wird. Dies wird verwendet, um Fehlkonfigurationen beim Neustart einer Abfrage zu erkennen.
sources.[startOffset/endOffset].reservoirVersion Die Version der Tabelle, die gerade bearbeitet.
sources.[startOffset/endOffset].index Der Index in der Sequenz von AddFiles in dieser Version. Dies wird verwendet, um große Commits in mehrere Batches aufzuteilen. Dieser Index wird durch Sortieren nach modificationTimestamp und path erstellt.
sources.[startOffset/endOffset].isStartingVersion Gibt an, ob der aktuelle Offset den Anfang einer neuen Streamingabfrage markiert, anstatt die Verarbeitung von Änderungen, die nach der Verarbeitung der ursprünglichen Daten aufgetreten sind. Wenn Sie eine neue Abfrage starten, werden zunächst alle Daten verarbeitet, die sich zu Beginn in der Tabelle befinden, und dann die neu hinzugekommenen Daten.
sources.latestOffset Der letzte vom Mikro-Batch verarbeitete Offset.
sources.numInputRows Die Anzahl der verarbeiteten Eingabezeilen aus dieser Quelle.
sources.inputRowsPerSecond Die Rate, mit der die Daten von dieser Quelle zur Verarbeitung ankommen.
sources.processedRowsPerSecond Die Rate, mit der Spark die Daten von dieser Quelle verarbeitet.
sources.metrics.numBytesOutstanding Die Gesamtgröße der ausstehenden Dateien (von RocksDB nachverfolgte Dateien). Dies ist die Backlog-Metrik für Delta und Autoloader als Streaming-Quelle.
sources.metrics.numFilesOutstanding Die Anzahl der ausstehenden Dateien, die verarbeitet werden müssen. Dies ist die Backlog-Metrik für Delta und Autoloader als Streaming-Quelle.

sink-Objekt (Delta Lake)

Metrik Beschreibung
sink.description Die Beschreibung der Delta-Senke, in der die spezifische Delta-Senkenimplementierung verwendet wird. Beispiel: “DeltaSink[table]”
sink.numOutputRows Die Anzahl der Zeilen ist immer „-1“, da Spark keine Ausgabezeilen für DSv1-Senken ableiten kann, was die Klassifizierung für die Delta Lake-Senke ist.

Beispiele

Beispielereignis Kafka-zu-Kafka StreamingQueryListener

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Beispielereignis Delta Lake-to-Delta Lake StreamingQueryListener

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Beispielereignis Kinesis-to-Delta Lake StreamingQueryListener

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Beispielereignis Kafka+Delta Lake-to-Delta Lake StreamingQueryListener

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Beispielereignis Raten-Quelle zu Delta Lake StreamingQueryListener

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}