Monitoring Structured Streaming queries on Azure Databricks
Azure Databricks provides built-in monitoring for Structured Streaming applications through the Spark UI under the Streaming tab.
Distinguish Structured Streaming queries in the Spark UI
Provide your streams a unique query name by adding .queryName(<query-name>) to your writeStream code to easily distinguish which metrics belong to which stream in the Spark UI.
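For example, a minimal sketch using a rate source and an in-memory sink (the query name, source, and sink here are illustrative):
Python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative streaming source; any streaming DataFrame works the same way
df = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

query = (
    df.writeStream
      .queryName("rate_counts_demo")  # this name labels the query in the Spark UI
      .format("memory")               # the memory sink also uses the query name as its table name
      .outputMode("append")
      .start()
)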
Push Structured Streaming metrics to external services
Streaming metrics can be pushed to external services for alerting or dashboarding use cases by using Apache Spark’s Streaming Query Listener interface. In Databricks Runtime 11.3 LTS and above, StreamingQueryListener is available in Python and Scala.
Important
The following limitations apply to workloads using Unity Catalog-enabled compute access modes:
- StreamingQueryListener requires Databricks Runtime 15.1 or above to use credentials or interact with objects managed by Unity Catalog on single user compute.
- StreamingQueryListener requires Databricks Runtime 16.1 or above for Scala workloads configured with shared access mode.
Note
Processing latency in listeners can significantly affect query processing speeds. Limit the processing logic in these listeners and write to fast-response systems, such as Kafka, for efficiency.
The following code provides basic examples of the syntax for implementing a listener:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
   * Called when a query is started.
   * @note This is called synchronously with
   *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]],
   *       that is, `onQueryStarted` is called on all listeners before
   *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
   *       Do not block this method, as it blocks your query.
   */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
   * Called when there is some status update (ingestion rate updated, etc.)
   *
   * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
   *       latest status, regardless of when this method is called. The status of
   *       [[StreamingQuery]] may change before or when you process the event.
   *       For example, you may find [[StreamingQuery]] terminates when processing
   *       `QueryProgressEvent`.
   */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
   * Called when the query is idle and waiting for new data to process.
   */
  override def onQueryIdle(event: QueryIdleEvent): Unit = {}

  /**
   * Called when a query is stopped, with or without error.
   */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        :meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery` may change before or
        when you process the event. For example, you may find
        :class:`StreamingQuery` terminates when processing ``QueryProgressEvent``.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()
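A listener receives events only after you register it with the streaming query manager of the active SparkSession. A minimal sketch, using the my_listener object defined above:
Python
# Attach the listener so it receives events from all streaming queries on this SparkSession
spark.streams.addListener(my_listener)

# ... start and run streaming queries ...

# Detach the listener when it is no longer needed
spark.streams.removeListener(my_listener)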
Defining observable metrics in Structured Streaming
Observable metrics are named arbitrary aggregate functions that can be defined on a query (DataFrame). As soon as the execution of a DataFrame reaches a completion point (that is, finishes a batch query or reaches a streaming epoch), a named event is emitted that contains the metrics for the data processed since the last completion point.
You can observe these metrics by attaching a listener to the Spark session. The listener depends on the execution mode:
- Batch mode: Use QueryExecutionListener. QueryExecutionListener is called when the query completes. Access the metrics using the QueryExecution.observedMetrics map. (For reading observed metrics from Python in batch mode, see the sketch after this list.)
- Streaming, or microbatch: Use StreamingQueryListener. StreamingQueryListener is called when the streaming query completes an epoch. Access the metrics using the StreamingQueryProgress.observedMetrics map. Azure Databricks does not support continuous execution streaming.
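QueryExecutionListener is a JVM (Scala and Java) interface that you register with spark.listenerManager. For reading batch observed metrics from Python, pyspark.sql.Observation is a convenient alternative; the following is a minimal sketch in which the DataFrame, metric names, and noop sink are illustrative:
Python
from pyspark.sql import Observation, SparkSession
from pyspark.sql.functions import col, count, lit

spark = SparkSession.builder.getOrCreate()

# Illustrative batch DataFrame with a nullable "error" column
df = spark.createDataFrame([(1, None), (2, "boom")], "id INT, error STRING")

# Named observation; the metrics are computed while the action below runs
observation = Observation("my_batch_event")
observed_df = df.observe(observation, count(lit(1)).alias("rc"), count(col("error")).alias("erc"))

observed_df.write.format("noop").mode("overwrite").save()  # any action completes the metrics

print(observation.get)  # for example: {'rc': 2, 'erc': 1}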
For example, for a streaming query:
Scala
import org.apache.spark.sql.functions.{count, lit}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import spark.implicits._  // provides the $"error" column syntax

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})
Python
from pyspark.sql.functions import col, count, lit
from pyspark.sql.streaming import StreamingQueryListener

# Observe metric
observed_df = df.observe("metric", count(lit(1)).alias("cnt"), count(col("error")).alias("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")

    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")

    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())
StreamingQueryListener object metrics
Metric | Description |
---|---|
id | A unique query ID that persists across restarts. |
runId | A query ID that is unique for every start/restart. See StreamingQuery.runId(). |
name | The user-specified name of the query. Name is null if no name is specified. |
timestamp | The timestamp for the execution of the microbatch. |
batchId | A unique ID for the current batch of data being processed. In the case of retries after a failure, a given batch ID may be executed more than once. Similarly, when there is no data to be processed, the batch ID is not incremented. |
numInputRows | The aggregate (across all sources) number of records processed in a trigger. |
inputRowsPerSecond | The aggregate (across all sources) rate of arriving data. |
processedRowsPerSecond | The aggregate (across all sources) rate at which Spark is processing data. |
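Each of these fields is available on the progress object that is passed to onQueryProgress, along with the nested objects described in the following tables. A minimal sketch (the listener name is illustrative) that prints a few top-level fields and the full event payload, similar to the example events later in this article:
Python
from pyspark.sql.streaming import StreamingQueryListener

class ProgressLogger(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        progress = event.progress
        # Top-level fields described in the table above
        print(progress.name, progress.batchId, progress.numInputRows)
        # The full event as JSON, including durationMs, eventTime, stateOperators, sources, and sink
        print(progress.prettyJson)

    def onQueryTerminated(self, event):
        pass

spark.streams.addListener(ProgressLogger())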
durationMs object
Information about the time it takes to complete various stages of the microbatch execution process.
Metric | Description |
---|---|
durationMs.addBatch | The time taken to execute the microbatch. This excludes the time Spark takes to plan the microbatch. |
durationMs.getBatch | The time it takes to retrieve the metadata about the offsets from the source. |
durationMs.latestOffset | The latest offset consumed for the microbatch. This progress object refers to the time taken to retrieve the latest offset from sources. |
durationMs.queryPlanning | The time taken to generate the execution plan. |
durationMs.triggerExecution | The time it takes to plan and execute the microbatch. |
durationMs.walCommit | The time taken to commit the new available offsets. |
eventTime object
Information about the event time value seen within the data being processed in the microbatch. This data is used by the watermark to figure out how to trim the state for processing stateful aggregations defined in the Structured Streaming job.
Metric | Description |
---|---|
eventTime.avg | The average event time seen in that trigger. |
eventTime.max | The maximum event time seen in that trigger. |
eventTime.min | The minimum event time seen in that trigger. |
eventTime.watermark | The value of the watermark used in that trigger. |
stateOperators object
Information about the stateful operations that are defined in the Structured Streaming job and the aggregations that are produced from them.
Metric | Description |
---|---|
stateOperators.operatorName | The name of the stateful operator to which the metrics relate, such as symmetricHashJoin, dedupe, stateStoreSave. |
stateOperators.numRowsTotal | The total number of rows in state as a result of a stateful operator or aggregation. |
stateOperators.numRowsUpdated | The total number of rows updated in state as a result of a stateful operator or aggregation. |
stateOperators.allUpdatesTimeMs | This metric is currently not measurable by Spark and is planned to be removed in future updates. |
stateOperators.numRowsRemoved | The total number of rows removed from state as a result of a stateful operator or aggregation. |
stateOperators.allRemovalsTimeMs | This metric is currently not measurable by Spark and is planned to be removed in future updates. |
stateOperators.commitTimeMs | The time taken to commit all updates (puts and removes) and return a new version. |
stateOperators.memoryUsedBytes | Memory used by the state store. |
stateOperators.numRowsDroppedByWatermark | The number of rows that are considered too late to be included in a stateful aggregation. Streaming aggregations only: The number of rows dropped post-aggregation (not raw input rows). This number is not precise, but provides an indication that there is late data being dropped. |
stateOperators.numShufflePartitions | The number of shuffle partitions for this stateful operator. |
stateOperators.numStateStoreInstances | The number of state store instances that the operator has initialized and maintains. For many stateful operators, this is the same as the number of partitions. However, stream-stream joins initialize four state store instances per partition. |
stateOperators.customMetrics object
Information collected from RocksDB capturing metrics about its performance and operations with respect to the stateful values it maintains for the Structured Streaming job. For more information, see Configure RocksDB state store on Azure Databricks.
Metric | Description |
---|---|
customMetrics.rocksdbBytesCopied | The number of bytes copied as tracked by the RocksDB File Manager. |
customMetrics.rocksdbCommitCheckpointLatency | The time in milliseconds to take a snapshot of native RocksDB and write it to a local directory. |
customMetrics.rocksdbCommitCompactLatency | The time in milliseconds for the (optional) compaction during the checkpoint commit. |
customMetrics.rocksdbCommitFileSyncLatencyMs | The time in milliseconds syncing the native RocksDB snapshot to external storage (the checkpoint location). |
customMetrics.rocksdbCommitFlushLatency | The time in milliseconds flushing the RocksDB in-memory changes to the local disk. |
customMetrics.rocksdbCommitPauseLatency | The time in milliseconds stopping the background worker threads as part of the checkpoint commit, such as for compaction. |
customMetrics.rocksdbCommitWriteBatchLatency | The time in milliseconds applying the staged writes in the in-memory structure (WriteBatch) to native RocksDB. |
customMetrics.rocksdbFilesCopied | The number of files copied as tracked by the RocksDB File Manager. |
customMetrics.rocksdbFilesReused | The number of files reused as tracked by the RocksDB File Manager. |
customMetrics.rocksdbGetCount | The number of get calls to the DB (does not include gets from WriteBatch, the in-memory batch used for staging writes). |
customMetrics.rocksdbGetLatency | The average time in nanoseconds for the underlying native RocksDB::Get call. |
customMetrics.rocksdbReadBlockCacheHitCount | The count of cache hits from the block cache in RocksDB that are useful in avoiding local disk reads. |
customMetrics.rocksdbReadBlockCacheMissCount | The count of misses from the block cache in RocksDB that required local disk reads. |
customMetrics.rocksdbSstFileSize | The size of all Static Sorted Table (SST) files, the tabular structure RocksDB uses to store data. |
customMetrics.rocksdbTotalBytesRead | The number of uncompressed bytes read by get operations. |
customMetrics.rocksdbTotalBytesReadByCompaction | The number of bytes that the compaction process reads from the disk. |
customMetrics.rocksdbTotalBytesReadThroughIterator | The total number of bytes of uncompressed data read using an iterator. Some stateful operations (for example, timeout processing in FlatMapGroupsWithState and watermarking) require reading data in DB through an iterator. |
customMetrics.rocksdbTotalBytesWritten | The total number of uncompressed bytes written by put operations. |
customMetrics.rocksdbTotalBytesWrittenByCompaction | The total number of bytes the compaction process writes to the disk. |
customMetrics.rocksdbTotalCompactionLatencyMs | The time in milliseconds for RocksDB compactions, including background compactions and the optional compaction initiated during the commit. |
customMetrics.rocksdbTotalFlushLatencyMs | The total flush time, including background flushing. Flush operations are processes by which the MemTable is flushed to storage once it’s full. MemTables are the first level where data is stored in RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed | The size in bytes of the uncompressed zip files as reported by the File Manager. The File Manager manages the physical SST file disk space utilization and deletion. |
sources object (Kafka)
Metric | Description |
---|---|
sources.description | A detailed description of the Kafka source, specifying the exact Kafka topic being read from. For example: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”. |
sources.startOffset object | The starting offset number within the Kafka topic at which the streaming job started. |
sources.endOffset object | The last offset processed by the microbatch. This could be equal to latestOffset for an ongoing microbatch execution. |
sources.latestOffset object | The latest offset determined by the microbatch. The microbatching process might not process all offsets when there is throttling, which results in endOffset and latestOffset differing. |
sources.numInputRows | The number of input rows processed from this source. |
sources.inputRowsPerSecond | The rate at which data is arriving for processing from this source. |
sources.processedRowsPerSecond | The rate at which Spark is processing data from this source. |
sources.metrics object (Kafka)
Metric | Description |
---|---|
sources.metrics.avgOffsetsBehindLatest | The average number of offsets that the streaming query is behind the latest available offset among all the subscribed topics. |
sources.metrics.estimatedTotalBytesBehindLatest | The estimated number of bytes that the query process has not consumed from the subscribed topics. |
sources.metrics.maxOffsetsBehindLatest | The maximum number of offsets that the streaming query is behind the latest available offset among all the subscribed topics. |
sources.metrics.minOffsetsBehindLatest | The minimum number of offsets that the streaming query is behind the latest available offset among all the subscribed topics. |
sink object (Kafka)
Metric | Description |
---|---|
sink.description | The description of the Kafka sink to which the streaming query is writing, detailing the specific Kafka sink implementation being used. For example: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”. |
sink.numOutputRows | The number of rows that were written to the output table or sink as part of the microbatch. For some situations, this value can be “-1” and generally can be interpreted as “unknown”. |
sources object (Delta Lake)
Metric | Description |
---|---|
sources.description | The description of the source from which the streaming query is reading. For example: “DeltaSource[table]”. |
sources.[startOffset/endOffset].sourceVersion | The version of serialization with which this offset is encoded. |
sources.[startOffset/endOffset].reservoirId | The ID of the table being read. This is used to detect misconfiguration when restarting a query. |
sources.[startOffset/endOffset].reservoirVersion | The version of the table that is currently being processed. |
sources.[startOffset/endOffset].index | The index in the sequence of AddFiles in this version. This is used to break large commits into multiple batches. This index is created by sorting on modificationTimestamp and path. |
sources.[startOffset/endOffset].isStartingVersion | Identifies whether the current offset marks the start of a new streaming query rather than the processing of changes that occurred after the initial data was processed. When starting a new query, all data present in the table at the start is processed first, and then any new data that arrives. |
sources.latestOffset | The latest offset processed by the microbatch query. |
sources.numInputRows | The number of input rows processed from this source. |
sources.inputRowsPerSecond | The rate at which data is arriving for processing from this source. |
sources.processedRowsPerSecond | The rate at which Spark is processing data from this source. |
sources.metrics.numBytesOutstanding | The combined size of the outstanding files (files tracked by RocksDB). This is the backlog metric for Delta and Auto Loader as the streaming source. |
sources.metrics.numFilesOutstanding | The number of outstanding files to be processed. This is the backlog metric for Delta and Auto Loader as the streaming source. |
sink object (Delta Lake)
Metric | Description |
---|---|
sink.description | The description of the Delta sink, detailing the specific Delta sink implementation being used. For example: “DeltaSink[table]”. |
sink.numOutputRows | The number of rows is always “-1” because Spark can’t infer output rows for DSv1 sinks, which is the classification for the Delta Lake sink. |
Examples
Example Kafka-to-Kafka StreamingQueryListener event
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Example Delta Lake-to-Delta Lake StreamingQueryListener event
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Example Kinesis-to-Delta Lake StreamingQueryListener event
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Example Kafka+Delta Lake-to-Delta Lake StreamingQueryListener event
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Example rate source to Delta Lake StreamingQueryListener event
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}