Sdílet prostřednictvím


Monitorování dotazů strukturovaného streamování v Azure Databricks

Azure Databricks poskytuje integrované monitorování pro aplikace strukturovaného streamování prostřednictvím uživatelského rozhraní Sparku na kartě Streamování .

Rozlišení dotazů strukturovaného streamování v uživatelském rozhraní Sparku

Zadejte jedinečný název dotazu přidáním .queryName(<query-name>) do writeStream kódu, abyste snadno rozlišili metriky, ke kterým streamům patří v uživatelském rozhraní Sparku.

Nabízení metrik strukturovaného streamování do externích služeb

Metriky streamování je možné odeslat do externích služeb pro účely upozorňování nebo řízení pomocí rozhraní Streaming Query Listener od Apache Spark. V Databricks Runtime 11.3 LTS a novějších je StreamingQueryListener k dispozici v Pythonu a Scala.

Důležité

Pro úlohy používající režimy výpočetního přístupu s podporou katalogu Unity platí následující omezení:

  • StreamingQueryListener vyžaduje, aby Databricks Runtime 15.1 nebo novější používal přihlašovací údaje nebo pracoval s objekty spravovanými katalogem Unity na výpočetních prostředcích s vyhrazeným režimem přístupu.
  • StreamingQueryListener vyžaduje Databricks Runtime 16.1 nebo novější pro úlohy Scala nakonfigurované pomocí standardního režimu přístupu (dříve režim sdíleného přístupu).

Poznámka:

Latence zpracování s posluchači může výrazně ovlivnit rychlost zpracování dotazů. Je doporučeno omezit logiku zpracování v těchto nasloucháčích a využívat systémy rychlé odezvy, jako je Kafka, pro efektivitu.

Následující kód obsahuje základní příklady syntaxe pro implementaci naslouchacího procesu:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definování pozorovatelných metrik ve strukturovaném streamování

Pozorovatelné metriky se nazývají libovolné agregační funkce, které lze definovat v dotazu (DataFrame). Jakmile provádění datového rámce dosáhne bodu dokončení (tj. dokončí dávkové dotaz nebo dosáhne epochy streamování), vygeneruje se pojmenovaná událost obsahující metriky pro data zpracovávaná od posledního bodu dokončení.

Tyto metriky můžete sledovat připojením posluchače k relaci Spark. Posluchač závisí na módu spuštění:

  • Dávkový režim: Použijte QueryExecutionListener.

    QueryExecutionListener je volán po dokončení dotazu. Získejte přístup k metrikám pomocí QueryExecution.observedMetrics mapy.

  • Streamování nebo mikrobatch: Použijte StreamingQueryListener.

    StreamingQueryListener se zavolá, když streamovací dotaz dokončí jednu epochu. Získejte přístup k metrikám pomocí StreamingQueryProgress.observedMetrics mapy. Azure Databricks nepodporuje režim triggeru continuous pro streamování.

Příklad:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Mapa k:[UC], k:[Delta] a k:[SS] identifikátory v tabulce metrik

Re:[SS] Metriky používají pole reservoirId na několika místech pro jedinečný identifikátor tabulky Delta používaný jako zdroj pro streamingový dotaz.

Pole reservoirId mapuje jedinečný identifikátor uložený tabulkou Delta v transakčním protokolu Delta. Toto ID není namapováno na hodnotu tableId přiřazenou re:[UC] a zobrazenou v Průzkumníku katalogu.

Pomocí následující syntaxe zkontrolujte identifikátor tabulky Delta. Funguje to u spravovaných tabulek Katalogu Unity, externích tabulek Katalogu Unity a všech tabulek Delta metastoru Hive:

DESCRIBE DETAIL <table-name>

Pole id zobrazené ve výsledcích je identifikátor, který se mapuje na reservoirId v streamovacích metrikách.

Metriky objektů StreamingQueryListener

Metrický Popis
id Jedinečné ID dotazu, které se zachová při restartování.
runId ID dotazu, které je jedinečné pro každé spuštění/restartování. Viz StreamingQuery.runId().
name Uživatelem zadaný název dotazu. Název má hodnotu null, pokud není zadán žádný název.
timestamp Časové razítko pro spuštění mikrodávky.
batchId Jedinečné ID pro aktuální dávku zpracovávaných dat. V případě opakování po selhání může být dané ID dávky provedeno vícekrát. Podobně platí, že pokud nejsou žádná data ke zpracování, ID dávky se nezvýší.
numInputRows Agregace (napříč všemi zdroji) počtu záznamů zpracovaných v triggeru
inputRowsPerSecond Agregovaná míra příchozích dat napříč všemi zdroji.
processedRowsPerSecond Celková rychlost, ze všech zdrojů, s jakou Spark zpracovává data.

DurationMs – objekt

Informace o době potřebnou k dokončení různých fází procesu provádění mikrobatchu.

Metrický Popis
durationMs.addBatch Čas potřebný k provedení mikrodávky. Tím se vyloučí čas, který Spark potřebuje k naplánování mikrodávky.
durationMs.getBatch Doba potřebná k načtení metadat o posunech z zdroje.
durationMs.latestOffset Nejnovější odsazení spotřebované pro mikrodávku. Tento objekt průběhu odkazuje na čas potřebný k načtení nejnovějšího posunu ze zdrojů.
durationMs.queryPlanning Doba potřebná ke generování plánu provádění.
durationMs.triggerExecution Doba, která trvá naplánování a spuštění mikrobatchu.
durationMs.walCommit Doba potřebná k potvrzení nových dostupných offsetů.

eventTime – objekt

Informace o hodnotě času události, která se vyskytuje v datech zpracovávaných v mikrodávkových procesech. Tato data jsou používána vodoznakem k zjištění, jak oříznout stav pro zpracování stavových agregací definovaných v úloze strukturovaného streamování.

Metrický Popis
eventTime.avg Průměrná doba trvání události zobrazená v tomto spouštěči.
eventTime.max Maximální doba události zobrazená v daném triggeru.
eventTime.min Minimální doba události viděná v tomto spouštěči.
eventTime.watermark Hodnota vodoznaku použitého v této aktivační události.

stateOperators – objekt

Informace o stavových operacích, které jsou definovány v úloze strukturovaného streamování a agregace vytvořené z nich.

Metrický Popis
stateOperators.operatorName Název stavového operátoru, ke kterému se metriky vztahují, například symmetricHashJoin, dedupe. stateStoreSave
stateOperators.numRowsTotal Celkový počet řádků v rámci stavu v důsledku stavového operátoru nebo agregace.
stateOperators.numRowsUpdated Celkový počet řádků aktualizovaných ve stavu v důsledku stavového operátoru nebo agregace.
stateOperators.allUpdatesTimeMs Tato metrika v současné době není měřitelná sparkem a plánuje se odebrat v budoucích aktualizacích.
stateOperators.numRowsRemoved Celkový počet řádků odebraných ze stavu v důsledku stavového operátoru nebo agregace.
stateOperators.allRemovalsTimeMs Tato metrika v současné době není měřitelná sparkem a plánuje se odebrat v budoucích aktualizacích.
stateOperators.commitTimeMs Doba potřebná k potvrzení všech aktualizací (vložení a odebrání) a vrácení nové verze.
stateOperators.memoryUsedBytes Paměť používaná úložištěm stavů.
stateOperators.numRowsDroppedByWatermark Počet řádků, které jsou považovány za příliš pozdě, aby se zahrnuly do stavové agregace. Pouze streamovací agregace: Počet řádků vynechaných po agregaci (ne surových vstupních řádků). Toto číslo není přesné, ale naznačuje, že jsou vyřazována opožděná data.
stateOperators.numShufflePartitions Počet oddílů pro zamíchání tohoto stavového operátoru.
stateOperators.numStateStoreInstances Skutečná instance úložiště stavu, kterou operátor inicializoval a udržuje. U mnoha stavových operátorů je to stejné jako počet particí. Spojení stream-stream inicializují pro každý oddíl čtyři instance stavového úložiště.

stateOperators.customMetrics – objekt

Informace shromážděné ze služby RocksDB zachycující metriky týkající se výkonu a operací s ohledem na stavové hodnoty, které udržuje pro úlohu strukturovaného streamování. Další informace najdete v tématu Konfigurace úložiště stavů RocksDB v Azure Databricks.

Metrický Popis
customMetrics.rocksdbBytesCopied Počet zkopírovaných bajtů, jak sleduje správce souborů RocksDB.
customMetrics.rocksdbCommitCheckpointLatency Doba v milisekundách potřebná k pořízení snímku nativní databáze RocksDB a jeho zápisu do místního adresáře.
customMetrics.rocksdbCompactLatency Doba komprimování v milisekundách během potvrzení kontrolního bodu (volitelné).
customMetrics.rocksdbCommitFileSyncLatencyMs Čas synchronizace nativního snímku RocksDB s externím úložištěm (umístění kontrolního bodu) v milisekundách
customMetrics.rocksdbCommitFlushLatency Čas v milisekundách vyprázdnění změn v paměti RocksDB na místní disk.
customMetrics.rocksdbCommitPauseLatency Doba v milisekundách potřebná k zastavení pracovních vláken na pozadí jako součást potvrzení kontrolního bodu, například při komprimaci.
customMetrics.rocksdbCommitWriteBatchLatency Čas v milisekundách potřebný pro aplikaci fázovaných zápisů ve struktuře v paměti (WriteBatch) na nativní RocksDB.
customMetrics.rocksdbFilesCopied Počet souborů zkopírovaných podle sledování správcem souborů RocksDB.
customMetrics.rocksdbFilesReused Počet znovupoužitých souborů sledovaných Správcem souborů RocksDB.
customMetrics.rocksdbGetCount Počet get volání do databáze (nezahrnuje gets dávku WriteBatch v paměti použitou pro přípravné zápisy).
customMetrics.rocksdbGetLatency Průměrná doba v nanosekundách základního nativního RocksDB::Get volání.
customMetrics.rocksdbReadBlockCacheHitCount Počet zásahů do mezipaměti bloků v RocksDB, které jsou užitečné při zamezení čtení z místního disku.
customMetrics.rocksdbReadBlockCacheMissCount Počet blokové mezipaměti v RocksDB není užitečný při zamezení čtení z místního disku.
customMetrics.rocksdbSstFileSize Velikost všech souborů se statickou seřazenou tabulkou (SST) – tabulková struktura RocksDB používá k ukládání dat.
customMetrics.rocksdbTotalBytesRead Počet nekomprimovaných bajtů přečtených operacemi get .
customMetrics.rocksdbTotalBytesReadByCompaction Počet bajtů, které proces komprimace načítá z disku.
customMetrics.rocksdbTotalBytesReadThroughIterator Celkový počet bajtů nekomprimovaných dat přečtených pomocí iterátoru. Některé stavové operace (například zpracování časového limitu v FlatMapGroupsWithState a vodoznaku) vyžadují čtení dat v DB prostřednictvím iterátoru.
customMetrics.rocksdbTotalBytesWritten Celkový počet nekomprimovaných bajtů zapsaných operacemi put .
customMetrics.rocksdbTotalBytesWrittenByCompaction Celkový počet bajtů, které proces komprimace zapisuje na disk.
customMetrics.rocksdbTotalCompactionLatencyMs Čas v milisekundách pro komprimace RocksDB, včetně komprimací pozadí a volitelné komprimace zahájené během potvrzení.
customMetrics.rocksdbTotalFlushLatencyMs Celková doba proplachování, včetně proplachování na pozadí. Operace vyprázdnění jsou procesy, kterými MemTable se vyprázdní do úložiště, jakmile je zaplněno. MemTables jsou první úrovní, ve které jsou data uložená ve službě RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed Velikost v bajtech nekomprimovaných souborů ZIP hlášených správcem souborů. Správce souborů spravuje využití a odstranění fyzického místa na disku se soubory SST.
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> Nejnovější verze snímku RocksDB uložená do kontrolního bodu. Hodnota -1 značí, že se nikdy neuložil žádný snímek. Vzhledem k tomu, že snímky jsou specifické pro každou instanci úložiště stavů, tato metrika se vztahuje na konkrétní ID oddílu a název úložiště stavů.

source – objekt (Kafka)

Metrický Popis
sources.description Podrobný popis zdroje Kafka, který určuje přesné téma Kafka, ze kterého čteme. Například: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
sources.startOffset objekt Počáteční číslo posunu v tématu Kafka, kde začala streamovací úloha.
sources.endOffset objekt Poslední offset zpracovaný mikrodávkou. To se může rovnat latestOffset během probíhajícího provádění mikrošarže.
sources.latestOffset objekt Poslední posunutí určené microbatchem. Proces mikrobatchingu nemusí zpracovat všechny offsety, když dochází k omezování, což způsobuje rozdíly mezi endOffset a latestOffset.
sources.numInputRows Počet vstupních řádků zpracovaných z tohoto zdroje
sources.inputRowsPerSecond Rychlost, s jakou data přicházejí ke zpracování z tohoto zdroje.
sources.processedRowsPerSecond Rychlost, s jakou Spark zpracovává data z tohoto zdroje.

sources.metrics – objekt (Kafka)

Metrický Popis
sources.metrics.avgOffsetsBehindLatest Průměrný počet offsetů, o které je streamovací dotaz pozadu za nejnovějším dostupným offsetem mezi všemi předplacenými tématy.
sources.metrics.estimatedTotalBytesBehindLatest Odhadovaný počet bajtů, které dotazovací proces dosud nespotřeboval ze sledovaných témat.
sources.metrics.maxOffsetsBehindLatest Maximální počet posunů, o které streamovací dotaz zaostává za nejnovějším dostupným posunem mezi všemi odebíranými tématy.
sources.metrics.minOffsetsBehindLatest Minimální počet posunů, o něž streamovací dotaz zaostává za nejnovějším dostupným offsetem mezi všemi sledovanými topiky.

objekt jímky (Kafka)

Metrický Popis
sink.description Popis Kafka sinku, do kterého zapisuje streamovací dotaz, včetně podrobností o konkrétní použité implementaci Kafka sinku. Například: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows Počet řádků, které byly zapsány do výstupní tabulky nebo výstupního úložiště jako součást mikrodávky. V některých situacích může být tato hodnota -1 a obecně ji lze interpretovat jako "neznámá".

sources – objekt (Delta Lake)

Metrický Popis
sources.description Popis zdroje, ze kterého streamovací dotaz čte. Například: “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion Verze serializace, se kterou je toto posunutí kódováno.
sources.[startOffset/endOffset].reservoirId ID přečtené tabulky. Používá se k detekci chybné konfigurace při restartování dotazu. Viz. mapu a identifikátory tabulky metrik re:[UC], re:[Delta] a re:[SS].
sources.[startOffset/endOffset].reservoirVersion Verze tabulky, která se právě zpracovává
sources.[startOffset/endOffset].index Index v posloupnosti AddFiles této verze. Slouží k rozdělení velkých potvrzení do více dávek. Tento index je vytvořen řazením modificationTimestamp a path.
sources.[startOffset/endOffset].isStartingVersion Určuje, zda aktuální posun označuje začátek nového streamovacího dotazu místo zpracování změn, ke kterým došlo po počátečním zpracování dat. Při spuštění nového dotazu se nejprve zpracuje všechna data, která jsou v tabulce na začátku, a pak všechna nová data, která dorazí.
sources.latestOffset Nejnovější posun zpracovaný dotazem microbatch.
sources.numInputRows Počet vstupních řádků zpracovaných z tohoto zdroje
sources.inputRowsPerSecond Rychlost, s jakou data přicházejí ke zpracování z tohoto zdroje.
sources.processedRowsPerSecond Rychlost, s jakou Spark zpracovává data z tohoto zdroje.
sources.metrics.numBytesOutstanding Kombinovaná velikost nevyřízených souborů (soubory sledované rocksDB) Jedná se o metriku backlogu pro Delta a Auto Loader jako zdroj pro streamování.
sources.metrics.numFilesOutstanding Počet nevyřízených souborů, které se mají zpracovat. Jedná se o metriku backlogu pro Delta a Auto Loader jako zdroje streamování.

objekt jímky (Delta Lake)

Metrický Popis
sink.description Popis jímky Delta podrobně popisuje konkrétní použitou implementaci jímky Delta. Například: “DeltaSink[table]”.
sink.numOutputRows Počet řádků je vždy -1, protože Spark nemůže odvodit výstupní řádky pro jímky DSv1, což je klasifikace jímky Delta Lake.

Příklady

Příklad události Kafka-to-Kafka StreamingQueryListener

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_default" : 1370,
      "SnapshotLastUploaded.partition_1_default" : 1370,
      "SnapshotLastUploaded.partition_2_default" : 1362,
      "SnapshotLastUploaded.partition_3_default" : 1370,
      "SnapshotLastUploaded.partition_4_default" : 1356,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_default" : 1360,
      "SnapshotLastUploaded.partition_1_default" : 1360,
      "SnapshotLastUploaded.partition_2_default" : 1352,
      "SnapshotLastUploaded.partition_3_default" : 1360,
      "SnapshotLastUploaded.partition_4_default" : 1346,
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
      "SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
      "SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
      "SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
      "SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Příklad události Delta Lake-to-Delta Lake StreamingQueryListener

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Příklad události Kinesis-to-Delta Lake StreamingQueryListener

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Příklad události Kafka+Delta Lake-to-Delta Lake StreamingQueryListener

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Příklad zdroje rychlosti pro událost typu StreamingQueryListener v Delta Lake

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}