Мониторинг запросов структурированной потоковой передачи в Azure Databricks
Azure Databricks предоставляет встроенный мониторинг для структурированных приложений потоковой передачи с помощью пользовательского интерфейса Spark на вкладке "Потоковая передача ".
Различение запросов структурированной потоковой передачи в пользовательском интерфейсе Spark
Задайте для потоков уникальное имя запроса, добавив .queryName(<query-name>)
в код writeStream
, чтобы легко различать, какие метрики принадлежат определенному потоку в пользовательском интерфейсе Spark.
Отправка метрик структурированной потоковой передачи во внешние службы
Метрики потоковой передачи можно отправить во внешние службы для оповещения или использования панелей мониторинга с помощью интерфейса прослушивателя потоковых запросов Apache Spark. В Databricks Runtime 11.3 LTS и более поздних версиях StreamingQueryListener
доступен в Python и Scala.
Внимание
Следующие ограничения применяются для рабочих нагрузок, использующих режимы доступа к вычислительным ресурсам с поддержкой каталога Unity:
-
StreamingQueryListener
требуется Databricks Runtime 15.1 или более поздней версии, чтобы использовать учетные данные или взаимодействовать с объектами, управляемыми каталогом Unity для вычислений одного пользователя. -
StreamingQueryListener
требуется Databricks Runtime 16.1 или более поздней версии для рабочих нагрузок Scala, настроенных в режиме общего доступа.
Примечание.
Задержка обработки с прослушивателями может значительно повлиять на скорость обработки запросов. Рекомендуется ограничить логику обработки в этих прослушивателях и выбрать запись в системы быстрого реагирования, такие как Kafka для повышения эффективности.
В следующем коде приведены основные примеры синтаксиса для реализации прослушивателя:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Определение наблюдаемых метрик в структурированной потоковой передаче
Метрики наблюдения именуются произвольными агрегатными функциями, которые можно определить в запросе (DataFrame). Как только выполнение DataFrame завершается (то есть завершает пакетный запрос или начинает потоковую передачу), запускается именованное событие, которое содержит метрики для данных, обработанных с момента последнего завершения.
Можно включить наблюдение за этими метриками, подключив прослушиватель к сеансу Spark. Прослушиватель зависит от режима выполнения:
Пакетный режим: используйте
QueryExecutionListener
.QueryExecutionListener
вызывается при завершении запроса. Доступ к метрикам осуществляется с помощью сопоставленияQueryExecution.observedMetrics
.Потоковая передача или микробатч: используйте
StreamingQueryListener
.StreamingQueryListener
вызывается при завершении эпохи потоковым запросом. Доступ к метрикам осуществляется с помощью сопоставленияStreamingQueryProgress.observedMetrics
. В Azure Databricks не поддерживается потоковая передача непрерывного выполнения.
Например:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Метрики объектов StreamingQueryListener
Метрическая | Description |
---|---|
id |
Уникальный идентификатор запроса, который сохраняется во время перезапуска. |
runId |
Идентификатор запроса, уникальный для каждого запуска или перезапуска. См. раздел StreamingQuery.runId(). |
name |
Указанное пользователем имя запроса. Имя равно NULL, если имя не указано. |
timestamp |
Метка времени для выполнения микробатча. |
batchId |
Уникальный идентификатор для текущего пакета обрабатываемых данных. В случае повторных попыток после сбоя указанный идентификатор пакетной службы может выполняться несколько раз. Аналогичным образом, если нет данных для обработки, идентификатор пакетной службы не увеличивается. |
numInputRows |
Совокупное (во всех источниках) количество записей, обрабатываемых в триггере. |
inputRowsPerSecond |
Совокупная скорость (во всех источниках) поступающих данных. |
processedRowsPerSecond |
Совокупная скорость (во всех источниках), с которой Spark обрабатывает данные. |
Объект durationMs
Сведения о времени, необходимого для выполнения различных этапов процесса выполнения микробатча.
Метрическая | Description |
---|---|
durationMs.addBatch |
Время выполнения микробатча. Это исключает время, необходимое Spark для планирования микробатча. |
durationMs.getBatch |
Время, необходимое для получения метаданных о смещениях из источника. |
durationMs.latestOffset |
Последнее смещение, используемое для микробатча. Этот объект хода выполнения относится к времени, затраченного на получение последнего смещения из источников. |
durationMs.queryPlanning |
Время, затраченное на создание плана выполнения. |
durationMs.triggerExecution |
Время, необходимое для планирования и выполнения микробатча. |
durationMs.walCommit |
Время фиксации новых доступных смещения. |
объект eventTime
Сведения о значении времени события, которое видно в данных, обрабатываемых в микробатче. Эти данные используются подложкой, чтобы выяснить, как обрезать состояние для обработки агрегатов с отслеживанием состояния, определенных в задании структурированной потоковой передачи.
Метрическая | Description |
---|---|
eventTime.avg |
Среднее время события, наблюдаемое в этом триггере. |
eventTime.max |
Максимальное время события, наблюдаемое в этом триггере. |
eventTime.min |
Минимальное время события, наблюдаемое в этом триггере. |
eventTime.watermark |
Значение водяного знака, используемого в этом триггере. |
Объект stateOperator
Сведения о операциях с отслеживанием состояния, определенных в задании структурированной потоковой передачи, и агрегатах, созданных из них.
Метрическая | Description |
---|---|
stateOperators.operatorName |
Имя оператора с отслеживанием состояния, к которому относятся метрики, например symmetricHashJoin , dedupe stateStoreSave . |
stateOperators.numRowsTotal |
Общее количество строк в состоянии в результате оператора с отслеживанием состояния или агрегирования. |
stateOperators.numRowsUpdated |
Общее количество строк, обновленных в состоянии в результате оператора с отслеживанием состояния или агрегирования. |
stateOperators.allUpdatesTimeMs |
Эта метрика в настоящее время не измерима с помощью Spark и планируется удалить в будущих обновлениях. |
stateOperators.numRowsRemoved |
Общее количество строк, удаленных из состояния в результате оператора с отслеживанием состояния или агрегирования. |
stateOperators.allRemovalsTimeMs |
Эта метрика в настоящее время не измерима с помощью Spark и планируется удалить в будущих обновлениях. |
stateOperators.commitTimeMs |
Время фиксации всех обновлений (помещает и удаляет) и возвращает новую версию. |
stateOperators.memoryUsedBytes |
Память, используемая хранилищем состояний. |
stateOperators.numRowsDroppedByWatermark |
Количество строк, которые считаются слишком поздно, которые должны быть включены в агрегирование с отслеживанием состояния. Только агрегаты потоковой передачи: количество строк, удаленных после агрегирования (не необработанных входных строк). Это число не является точным, но дает указание на то, что удалены поздние данные. |
stateOperators.numShufflePartitions |
Количество секций перетасовки для этого оператора с отслеживанием состояния. |
stateOperators.numStateStoreInstances |
Фактический экземпляр хранилища состояний, который оператор инициализировал и поддерживал. Для многих операторов с отслеживанием состояния это то же самое, что и количество секций. Однако поток-поток инициализирует четыре экземпляра хранилища состояний на секцию. |
объект stateOperator.customMetrics
Сведения, собранные из RocksDB, записывая метрики о производительности и операциях с учетом значений с отслеживанием состояния, которые он поддерживает для задания структурированной потоковой передачи. Дополнительные сведения см. в статье Настройка хранилища состояний RocksDB в Azure Databricks.
Метрическая | Description |
---|---|
customMetrics.rocksdbBytesCopied |
Количество байтов, скопированных в виде отслеживаемого диспетчером файлов RocksDB. |
customMetrics.rocksdbCommitCheckpointLatency |
Время в миллисекундах, задав моментальный снимок собственного RocksDB и напишите его в локальный каталог. |
customMetrics.rocksdbCompactLatency |
Время сжатия (необязательно) в миллисекундах во время фиксации контрольной точки. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Время в миллисекундах синхронизации собственного моментального снимка RocksDB с внешним хранилищем (расположение контрольной точки). |
customMetrics.rocksdbCommitFlushLatency |
Время в миллисекундах сброса изменений в памяти RocksDB на локальный диск. |
customMetrics.rocksdbCommitPauseLatency |
Время в миллисекундах останавливает фоновые рабочие потоки в рамках фиксации контрольной точки, например для сжатия. |
customMetrics.rocksdbCommitWriteBatchLatency |
Время в миллисекундах при применении поэтапной записи в структуре памяти (WriteBatch ) к native RocksDB. |
customMetrics.rocksdbFilesCopied |
Количество файлов, скопированных в виде отслеживаемого диспетчером файлов RocksDB. |
customMetrics.rocksdbFilesReused |
Количество файлов, повторно используемых диспетчером файлов RocksDB. |
customMetrics.rocksdbGetCount |
Количество вызовов к базе данных (не включается get из gets пакета в памяти, используемого WriteBatch для промежуточной записи). |
customMetrics.rocksdbGetLatency |
Среднее время в наносекундах для базового собственного RocksDB::Get вызова. |
customMetrics.rocksdbReadBlockCacheHitCount |
Количество попаданий кэша из кэша блоков в RocksDB, полезное для предотвращения операций чтения локального диска. |
customMetrics.rocksdbReadBlockCacheMissCount |
Количество кэша блоков в RocksDB не полезно для предотвращения чтения локального диска. |
customMetrics.rocksdbSstFileSize |
Размер всех файлов статической отсортированных таблиц (SST) — табличной структуры RocksDB используется для хранения данных. |
customMetrics.rocksdbTotalBytesRead |
Количество несжатых байтов, считываемых операциями get . |
customMetrics.rocksdbTotalBytesReadByCompaction |
Количество байтов, считываемых процессом сжатия с диска. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Общее количество байтов несжатых данных, считываемых с помощью итератора. Для некоторых операций с отслеживанием состояния (например, обработка времени ожидания и FlatMapGroupsWithState подложки) требуется чтение данных в базе данных с помощью итератора. |
customMetrics.rocksdbTotalBytesWritten |
Общее количество несжатых байтов, put записанных операциями. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Общее количество байтов процесса сжатия записывается на диск. |
customMetrics.rocksdbTotalCompactionLatencyMs |
Время в миллисекундах для уплотнений RocksDB, включая фоновые сжатия и необязательное сжатие, инициированное во время фиксации. |
customMetrics.rocksdbTotalFlushLatencyMs |
Общее время очистки, включая фоновую очистку. Операции очистки — это процессы, с помощью которых MemTable выполняется очистка хранилища после его полного завершения.
MemTables — это первый уровень хранения данных в RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
Размер в байтах несжатых ZIP-файлов, как сообщает диспетчер файлов. Диспетчер файлов управляет использованием и удалением физического места на диске SST. |
объект sources (Kafka)
Метрическая | Description |
---|---|
sources.description |
Подробное описание источника Kafka, указывающее точный раздел Kafka, считываемый из. Например: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” . |
Объект sources.startOffset . |
Начальный номер смещения в разделе Kafka, в котором запущено задание потоковой передачи. |
Объект sources.endOffset . |
Последнее смещение, обработанное микробатчом. Это может быть равно текущему выполнению latestOffset микробатч. |
Объект sources.latestOffset . |
Последнее смещение, рисуемое микробатчом. Процесс микробатчинга может не обрабатывать все смещения при наличии регулирования, что приводит к endOffset разности.latestOffset |
sources.numInputRows |
Количество входных строк, обработанных из этого источника. |
sources.inputRowsPerSecond |
Скорость, с которой данные приходят для обработки из этого источника. |
sources.processedRowsPerSecond |
Скорость обработки данных spark из этого источника. |
объект sources.metrics (Kafka)
Метрическая | Description |
---|---|
sources.metrics.avgOffsetsBehindLatest |
Среднее количество смещения, которое потоковый запрос находится за последним доступным смещением среди всех подписанных разделов. |
sources.metrics.estimatedTotalBytesBehindLatest |
Предполагаемое количество байтов, которые процесс запроса не использовал из подписанных разделов. |
sources.metrics.maxOffsetsBehindLatest |
Максимальное количество смещения, которое запрос потоковой передачи находится за последним доступным смещением среди всех подписанных разделов. |
sources.metrics.minOffsetsBehindLatest |
Минимальное количество смещения, которое запрос потоковой передачи находится за последним доступным смещением среди всех подписанных разделов. |
объект приемника (Kafka)
Метрическая | Description |
---|---|
sink.description |
Описание приемника Kafka, в который записывается потоковый запрос, подробное описание используемой реализации приемника Kafka. Например: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” . |
sink.numOutputRows |
Количество строк, записанных в выходную таблицу или приемник в рамках микробатча. В некоторых ситуациях это значение может быть "-1" и, как правило, можно интерпретировать как "неизвестно". |
объект sources (Delta Lake)
Метрическая | Description |
---|---|
sources.description |
Описание источника, из которого выполняется потоковой запрос. Например: “DeltaSource[table]” . |
sources.[startOffset/endOffset].sourceVersion |
Версия сериализации, с которой закодировано это смещение. |
sources.[startOffset/endOffset].reservoirId |
Идентификатор считываемой таблицы. Это используется для обнаружения неправильной настройки при перезапуске запроса. |
sources.[startOffset/endOffset].reservoirVersion |
Версия таблицы, которая в настоящее время обрабатывается. |
sources.[startOffset/endOffset].index |
Индекс в последовательности AddFiles этой версии. Используется для разбиения больших фиксаций в несколько пакетов. Этот индекс создается путем сортировки modificationTimestamp и path . |
sources.[startOffset/endOffset].isStartingVersion |
Определяет, отмечает ли текущее смещение начало нового потокового запроса, а не обработку изменений, произошедших после обработки исходных данных. При запуске нового запроса все данные, присутствующие в таблице в начале, обрабатываются сначала, а затем все новые данные, поступающие. |
sources.latestOffset |
Последнее смещение, обработанное запросом microbatch. |
sources.numInputRows |
Количество входных строк, обработанных из этого источника. |
sources.inputRowsPerSecond |
Скорость, с которой данные приходят для обработки из этого источника. |
sources.processedRowsPerSecond |
Скорость обработки данных spark из этого источника. |
sources.metrics.numBytesOutstanding |
Объединенный размер невыполненных файлов (файлы, отслеживаемые RocksDB). Это метрика невыполненной работы для Delta и автозагрузчика в качестве источника потоковой передачи. |
sources.metrics.numFilesOutstanding |
Количество необработанных файлов. Это метрика невыполненной работы для Delta и автозагрузчика в качестве источника потоковой передачи. |
объект приемника (Delta Lake)
Метрическая | Description |
---|---|
sink.description |
Описание приемника Delta, подробное описание используемой реализации приемника Delta. Например: “DeltaSink[table]” . |
sink.numOutputRows |
Количество строк всегда равно "-1", так как Spark не может выводить выходные строки для приемников DSv1, что является классификацией приемника Delta Lake. |
Примеры
Пример события Kafka to-Kafka StreamingQueryListener
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Пример события Delta Lake to-Delta Lake StreamingQueryListener
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Пример события Kinesis to-Delta Lake StreamingQueryListener
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Пример события Kafka+Delta Lake to-Delta Lake StreamingQueryListener
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Пример источника скорости в событие Delta Lake StreamingQueryListener
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}