共用方式為


監視 Azure Databricks 上的結構化串流查詢

Azure Databricks 透過 [串流] 索引卷標下的 Spark UI,為結構化串流 應用程式提供內建監視。

區分 Spark UI 中的結構化串流查詢

藉由將 新增 .queryName(<query-name>) 至程式 writeStream 代碼,輕鬆地區分哪些計量屬於Spark UI中的哪個數據流,以提供唯一的查詢名稱。

將結構化串流計量推送至外部服務

串流計量可以推送至外部服務,以使用 Apache Spark 的串流查詢接聽程式介面來警示或儀錶板使用案例。 在 Databricks Runtime 11.3 LTS 和更新版本中,串流查詢接聽程式可在 Python 和 Scala 中使用。

重要

Unity 目錄所管理的認證和物件不能用於邏輯中 StreamingQueryListener

注意

使用接聽程式處理延遲可能會大幅影響查詢處理速度。 建議您限制這些接聽程式中的處理邏輯,並選擇寫入 Kafka 等快速響應系統以提高效率。

下列程式代碼提供實作接聽程式之語法的基本範例:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

在結構化串流中定義可觀察的計量

可觀察的計量會命名為可在查詢上定義的任意聚合函數(DataFrame)。 一旦 DataFrame 執行到達完成點(也就是完成批次查詢或到達串流 epoch),就會發出具名事件,其中包含自上次完成點之後所處理數據的計量。

您可以將接聽程式附加至Spark工作階段,以觀察這些計量。 接聽程式取決於執行模式:

  • 批次模式:使用 QueryExecutionListener

    QueryExecutionListener 會在查詢完成時呼叫。 使用 QueryExecution.observedMetrics 地圖存取計量。

  • 串流或微批次:使用 StreamingQueryListener

    StreamingQueryListener 當串流查詢完成 epoch 時呼叫。 使用 StreamingQueryProgress.observedMetrics 地圖存取計量。 Azure Databricks 不支持連續執行串流。

例如:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

StreamingQueryListener 物件計量

計量 描述
id 在重新啟動時保存的唯一查詢標識碼。
runId 每個啟動/重新啟動都是唯一的查詢標識碼。 請參閱 StreamingQuery.runId()。
name 使用者指定的查詢名稱。 如果未指定名稱,則 Name 為 null。
timestamp microbatch 執行的時間戳。
batchId 目前正在處理之數據批次的唯一標識符。 在失敗后重試的情況下,可能會多次執行指定的批次標識碼。 同樣地,如果沒有要處理的數據,批次標識符就不會遞增。
numInputRows 在觸發程式中處理的記錄總數(跨所有來源)。
inputRowsPerSecond 抵達數據的匯總(跨所有來源)速率。
processedRowsPerSecond Spark 正在處理數據的匯總 (跨所有來源) 速率。

durationMs 物件

完成微批次執行程式各個階段所花費時間的相關信息。

計量 描述
durationMs.addBatch 執行 microbatch 所需的時間。 這不包括Spark規劃 microbatch 所需的時間。
durationMs.getBatch 從來源擷取有關位移的元數據所花費的時間。
durationMs.latestOffset microbatch 所耗用的最新位移。 此進度物件是指從來源擷取最新位移所花費的時間。
durationMs.queryPlanning 產生執行計劃所花費的時間。
durationMs.triggerExecution 規劃和執行 microbatch 所需的時間。
durationMs.walCommit 認可新可用位移所花費的時間。

eventTime 物件

有關在 microbatch 中處理之數據內所見事件時間值的資訊。 浮浮水印會使用此資料來瞭解如何修剪狀態,以處理結構化串流作業中定義的具狀態匯總。

計量 描述
eventTime.avg 在該觸發程式中看到的平均事件時間。
eventTime.max 在該觸發程式中看到的最大事件時間。
eventTime.min 在該觸發程式中看到的最小事件時間。
eventTime.watermark 該觸發程式中使用的浮浮水印值。

stateOperators 物件

結構化串流作業中定義之具狀態作業的相關信息,以及從中產生的匯總。

計量 描述
stateOperators.operatorName 計量關聯之具狀態運算子的名稱,例如symmetricHashJoindedupestateStoreSave
stateOperators.numRowsTotal 狀態中的數據列總數,因為具狀態運算符或匯總。
stateOperators.numRowsUpdated 狀態中因具狀態運算符或匯總而更新的數據列總數。
stateOperators.allUpdatesTimeMs 此計量目前無法由Spark測量,並計劃在未來的更新中移除。
stateOperators.numRowsRemoved 因具狀態運算符或匯總而移除狀態的數據列總數。
stateOperators.allRemovalsTimeMs 此計量目前無法由Spark測量,並計劃在未來的更新中移除。
stateOperators.commitTimeMs 認可所有更新所需的時間(放置和移除),並傳回新版本。
stateOperators.memoryUsedBytes 狀態存放區所使用的記憶體。
stateOperators.numRowsDroppedByWatermark 被視為太晚而無法包含在具狀態匯總中的數據列數目。 串流匯總:卸除匯總后的數據列數目(而非原始輸入數據列)。 這個數位不精確,但會指出已卸除延遲的數據。
stateOperators.numShufflePartitions 這個具狀態運算子的隨機分割區數目。
stateOperators.numStateStoreInstances 運算子已初始化和維護的實際狀態存放區實例。 對於許多具狀態運算符,這與分割區數目相同。 不過,數據流串流聯結會初始化每個分割區的四個狀態存放區實例。

stateOperators.customMetrics 物件

從 RocksDB 收集的資訊,擷取其效能和作業的相關計量,以及針對結構化串流作業所維護的具狀態值。 如需詳細資訊,請參閱 在 Azure Databricks 上設定 RocksDB 狀態存放區。

計量 描述
customMetrics.rocksdbBytesCopied RocksDB 檔案管理員所追蹤的位元元組數目。
customMetrics.rocksdbCommitCheckpointLatency 擷取原生 RocksDB 快照集的時間,並將其寫入本機目錄。
customMetrics.rocksdbCompactLatency 在檢查點認可期間壓縮以毫秒為單位的時間(選擇性)。
customMetrics.rocksdbCommitFileSyncLatencyMs 將原生 RocksDB 快照集同步至外部記憶體的時間以毫秒為單位(檢查點位置)。
customMetrics.rocksdbCommitFlushLatency 排清 RocksDB 記憶體內部變更至本機磁碟的時間,以毫秒為單位。
customMetrics.rocksdbCommitPauseLatency 停止背景背景背景工作線程作為檢查點認可一部分的時間,例如壓縮的時間。
customMetrics.rocksdbCommitWriteBatchLatency 將分段WriteBatch寫入套用至原生 RocksDB 的毫秒數。
customMetrics.rocksdbFilesCopied RocksDB 檔案管理員所追蹤的檔案數目。
customMetrics.rocksdbFilesReused RocksDB 檔案管理員所追蹤的重複使用檔案數目。
customMetrics.rocksdbGetCount 對 DB 的 get 呼叫數目(不包括 gets 用於 WriteBatch 暫存寫入的記憶體內部批次)。
customMetrics.rocksdbGetLatency 基礎原生 RocksDB::Get 呼叫之 nanoseconds 的平均時間。
customMetrics.rocksdbReadBlockCacheHitCount 從 RocksDB 中的區塊快取叫用快取計數,有助於避免本機磁碟讀取。
customMetrics.rocksdbReadBlockCacheMissCount RocksDB 中的區塊快取計數在避免本機磁碟讀取方面沒有用處。
customMetrics.rocksdbSstFileSize 所有靜態排序數據表 (SST) 檔案的大小 - 表格式結構 RocksDB 會使用 來儲存數據。
customMetrics.rocksdbTotalBytesRead 作業讀取 get 的未壓縮位元組數目。
customMetrics.rocksdbTotalBytesReadByCompaction 壓縮程式從磁碟讀取的位元組數目。
customMetrics.rocksdbTotalBytesReadThroughIterator 使用反覆運算器讀取未壓縮數據的位元組總數。 某些具狀態作業(例如,逾 FlatMapGroupsWithState 時處理和浮水印)需要透過反覆運算器在 DB 中讀取數據。
customMetrics.rocksdbTotalBytesWritten 作業寫入 put 的未壓縮位元組總數。
customMetrics.rocksdbTotalBytesWrittenByCompaction 壓縮程式寫入磁碟的位元組總數。
customMetrics.rocksdbTotalCompactionLatencyMs RocksDB 壓縮的時間以毫秒為單位,包括背景壓縮和認可期間起始的選擇性壓縮。
customMetrics.rocksdbTotalFlushLatencyMs 排清時間總計,包括背景排清。 排清作業是一 MemTable 旦滿時,排清至記憶體的進程。 MemTables 是數據儲存在 RocksDB 的第一個層級。
customMetrics.rocksdbZipFileBytesUncompressed 檔案管理員所報告的未壓縮 zip 檔案大小,以位元組為單位。 檔案管理員會管理實體 SST 檔案磁碟空間使用率和刪除。

sources 物件 (Kafka)

計量 描述
sources.description Kafka 來源的詳細描述,指定要從中讀取的確切 Kafka 主題。 例如: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”
sources.startOffset 物件 Kafka 主題中啟動串流作業的起始位移號碼。
sources.endOffset 物件 microbatch 處理的最後一個位移。 這可能等於 latestOffset 進行中的微批次執行。
sources.latestOffset 物件 microbatch 所計算的最新位移。 當發生節流時,微批次程式可能不會處理所有位移,這會導致 endOffsetlatestOffset 差異。
sources.numInputRows 從這個來源處理的輸入數據列數目。
sources.inputRowsPerSecond 數據從這個來源抵達的速率。
sources.processedRowsPerSecond Spark 正在處理此來源數據的速率。

sources.metrics 物件 (Kafka)

計量 描述
sources.metrics.avgOffsetsBehindLatest 串流查詢的平均位移數是所有已訂閱主題中最新的可用位移後方。
sources.metrics.estimatedTotalBytesBehindLatest 查詢進程未從訂閱的主題取用的估計位元組數目。
sources.metrics.maxOffsetsBehindLatest 串流查詢在所有已訂閱主題中最新的可用位移後方的最大位移數目。
sources.metrics.minOffsetsBehindLatest 串流查詢所落後所有訂閱主題中最新可用位移的最小位移數。

sink 物件 (Kafka)

計量 描述
sink.description 串流查詢正在寫入的 Kafka 接收描述,詳細說明所使用的特定 Kafka 接收實作。 例如: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”
sink.numOutputRows 寫入輸出數據表或接收作為 microbatch 一部分的數據列數目。 在某些情況下,這個值可以是 “-1”,通常可以解譯為「未知」。

sources 物件 (Delta Lake)

計量 描述
sources.description 串流查詢從中讀取的來源描述。 例如: “DeltaSource[table]”
sources.[startOffset/endOffset].sourceVersion 此位移編碼的串行化版本。
sources.[startOffset/endOffset].reservoirId 正在讀取之數據表的標識碼。 這會用來偵測重新啟動查詢時設定錯誤。
sources.[startOffset/endOffset].reservoirVersion 目前正在處理的數據表版本。
sources.[startOffset/endOffset].index 此版本中序列中的索引 AddFiles 。 這是用來將大型認可分成多個批次。 此索引是藉由排序和 modificationTimestamp path來建立。
sources.[startOffset/endOffset].isStartingVersion 識別目前的位移是否標記新串流查詢的開頭,而不是處理處理初始數據之後發生的變更。 啟動新的查詢時,會先處理數據表中的所有數據,然後再處理任何抵達的新數據。
sources.latestOffset microbatch 查詢所處理的最新位移。
sources.numInputRows 從這個來源處理的輸入數據列數目。
sources.inputRowsPerSecond 數據從這個來源抵達的速率。
sources.processedRowsPerSecond Spark 正在處理此來源數據的速率。
sources.metrics.numBytesOutstanding 未處理檔案的合併大小(由 RocksDB 追蹤的檔案)。 這是 Delta 和自動載入器作為串流來源的待辦專案計量。
sources.metrics.numFilesOutstanding 要處理的未處理檔案數目。 這是 Delta 和自動載入器作為串流來源的待辦專案計量。

接收物件 (Delta Lake)

計量 描述
sink.description Delta 接收的描述,詳細說明所使用的特定 Delta 接收實作。 例如: “DeltaSink[table]”
sink.numOutputRows 數據列數目一律為 “-1”,因為 Spark 無法推斷 DSv1 接收的輸出數據列,這是 Delta Lake 接收的分類。

範例

範例 Kafka-to-Kafka StreamingQueryListener 事件

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

範例 Delta Lake to-Delta Lake StreamingQueryListener 事件

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

範例 Kinesis-to-Delta Lake StreamingQueryListener 事件

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

範例 Kafka+Delta Lake to-Delta Lake StreamingQueryListener 事件

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Delta Lake StreamingQueryListener 事件的範例速率來源

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}