Övervaka frågor om strukturerad direktuppspelning i Azure Databricks
Azure Databricks tillhandahåller inbyggd övervakning för program för strukturerad direktuppspelning via Spark-användargränssnittet under fliken Direktuppspelning .
Särskilja strukturerade strömningsfrågor i Spark-användargränssnittet
Ge dina strömmar ett unikt frågenamn genom att lägga .queryName(<query-name>)
till i koden writeStream
för att enkelt skilja vilka mått som tillhör vilken ström i Spark-användargränssnittet.
Skicka mått för strukturerad direktuppspelning till externa tjänster
Mått för direktuppspelning kan skickas till externa tjänster för användningsfall för aviseringar eller instrumentpaneler med hjälp av Apache Sparks gränssnitt för strömningsfrågaslyssnare. I Databricks Runtime 11.3 LTS och senare är StreamingQueryListener
tillgängligt i Python och Scala.
Viktigt!
Följande begränsningar gäller för arbetsbelastningar som använder Unity Catalog-aktiverade beräkningsåtkomstlägen:
-
StreamingQueryListener
kräver att Databricks Runtime 15.1 eller senare använder credentials eller interagerar med objekt som hanteras av Unity Catalog vid beräkning av en enda användare. -
StreamingQueryListener
kräver Databricks Runtime 16.1 eller senare för Scala-arbetsbelastningar som konfigurerats med läget för delad åtkomst.
Kommentar
Bearbetning av svarstider med lyssnare kan avsevärt påverka frågebearbetningshastigheten. Det rekommenderas att använda limit bearbetningslogik i dessa lyssnare och istället välja att skriva till snabbsvarssystem som Kafka för att öka effektiviteten.
Följande kod innehåller grundläggande exempel på syntaxen för att implementera en lyssnare:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Definiera observerbara mått i strukturerad direktuppspelning
Observerbara mått namnges godtyckliga aggregeringsfunktioner som kan definieras i en fråga (DataFrame). Så snart körningen av en DataFrame når en slutförandepunkt (dvs. slutför en batchfråga eller når en strömmande epok) genereras en namngiven händelse som innehåller måtten för de data som bearbetats sedan den senaste slutförandepunkten.
Du kan observera dessa mått genom att koppla en lyssnare till Spark-sessionen. Lyssnaren är beroende av körningsläget:
Batch-läge: Använd
QueryExecutionListener
.QueryExecutionListener
anropas när frågan är klar. Få åtkomst till måtten med hjälp av kartanQueryExecution.observedMetrics
.Direktuppspelning eller mikrobatch: Använd
StreamingQueryListener
.StreamingQueryListener
anropas när strömningsfrågan slutför en epok. Få åtkomst till måtten med hjälp av kartanStreamingQueryProgress.observedMetrics
. Azure Databricks stöder inte kontinuerlig körningsströmning.
Till exempel:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Mått för StreamingQueryListener-objekt
Mätvärde | Beskrivning |
---|---|
id |
Ett unikt fråge-ID som finns kvar i omstarter. |
runId |
Ett fråge-ID som är unikt för varje start/omstart. Se StreamingQuery.runId(). |
name |
Det användardefinierade namnet på frågan. Namnet är null om inget namn har angetts. |
timestamp |
Tidsstämpeln för körningen av mikrobatchen. |
batchId |
Ett unikt ID för den aktuella batchen med data som bearbetas. Vid återförsök efter ett fel kan ett visst batch-ID köras mer än en gång. På samma sätt ökar inte batch-ID:t när det inte finns några data som ska bearbetas. |
numInputRows |
Det aggregerade antalet poster (över alla källor) som bearbetas i en utlösare. |
inputRowsPerSecond |
Den aggregerade (över alla källor) frekvensen för ankommande data. |
processedRowsPerSecond |
Den aggregerade (över alla källor) hastighet med vilken Spark bearbetar data. |
durationMs-objekt
Information om den tid det tar att slutföra olika steg i mikrobatchkörningsprocessen.
Mätvärde | Beskrivning |
---|---|
durationMs.addBatch |
Den tid det tar att köra mikrobatchen. Detta utesluter den tid spark tar att planera mikrobatchen. |
durationMs.getBatch |
Den tid det tar att hämta metadata om förskjutningarna från källan. |
durationMs.latestOffset |
Den senaste offset förbrukades för mikrobatchen. Det här förloppsobjektet refererar till den tid det tar att hämta den senaste offset från källor. |
durationMs.queryPlanning |
Den tid det tar att generate körningsplanen. |
durationMs.triggerExecution |
Den tid det tar att planera och köra mikrobatchen. |
durationMs.walCommit |
Den tid det tar att checka in de nya tillgängliga förskjutningarna. |
eventTime-objekt
Information om händelsetidsvärdet som visas i de data som bearbetas i mikrobatchen. Detta data används av watermark för att ta reda på hur tillståndet ska justeras för bearbetning av de tillståndskänsliga aggregeringar som definierats i Structured Streaming-jobbet.
Mätvärde | Beskrivning |
---|---|
eventTime.avg |
Den genomsnittliga händelsetiden som visas i utlösaren. |
eventTime.max |
Den maximala händelsetiden som visas i utlösaren. |
eventTime.min |
Den minsta händelsetid som visas i utlösaren. |
eventTime.watermark |
Värdet för watermark som används i utlösaren. |
stateOperators-objekt
Information om tillståndskänsliga åtgärder som definieras i structured streaming-jobbet och de sammansättningar som skapas från dem.
Mätvärde | Beskrivning |
---|---|
stateOperators.operatorName |
Namnet på den tillståndskänsliga operator som måtten relaterar till, till exempel symmetricHashJoin , dedupe , stateStoreSave . |
stateOperators.numRowsTotal |
Det totala antalet rader i tillstånd som ett resultat av en tillståndskänslig operator eller aggregering. |
stateOperators.numRowsUpdated |
Det totala antalet rader som har uppdaterats i tillstånd till följd av en tillståndskänslig operator eller aggregering. |
stateOperators.allUpdatesTimeMs |
Det här måttet kan för närvarande inte mätas av Spark och planeras att tas bort i framtida uppdateringar. |
stateOperators.numRowsRemoved |
Det totala antalet rader som tagits bort från tillståndet till följd av en tillståndskänslig operator eller aggregering. |
stateOperators.allRemovalsTimeMs |
Det här måttet kan för närvarande inte mätas av Spark och planeras att tas bort i framtida uppdateringar. |
stateOperators.commitTimeMs |
Den tid det tar att checka in alla uppdateringar (placerar och tar bort) och returnerar en ny version. |
stateOperators.memoryUsedBytes |
Minne som används av tillståndsarkivet. |
stateOperators.numRowsDroppedByWatermark |
Antalet rader som anses vara för sena för att ingå i en tillståndskänslig aggregering. Endast strömmande aggregeringar: Antalet rader som släppts efter aggregering (inte råa indatarader). Det här talet är inte exakt, men ger en indikation på att sena data tas bort. |
stateOperators.numShufflePartitions |
Antalet shuffle-partitioner för den här tillståndskänsliga operatorn. |
stateOperators.numStateStoreInstances |
Den faktiska tillståndslagringsinstansen som operatorn har initierat och underhållit. För många tillståndskänsliga operatorer är detta samma som antalet partitioner. Stream-stream-kopplingar initierar dock fyra tillståndslagerinstanser per partition. |
stateOperators.customMetrics-objekt
Information som samlas in från RocksDB och innehåller mått om dess prestanda och åtgärder med avseende på statefula values den underhåller för Structured Streaming-jobbet. Mer information finns i Konfigurera RocksDB-tillståndslager på Azure Databricks.
Mätvärde | Beskrivning |
---|---|
customMetrics.rocksdbBytesCopied |
Antalet byte som kopierats enligt spårning av RocksDB-filhanteraren. |
customMetrics.rocksdbCommitCheckpointLatency |
Tiden i millisekunder som tar en ögonblicksbild av inbyggda RocksDB och skriver den till en lokal katalog. |
customMetrics.rocksdbCompactLatency |
Tiden i millisekunders komprimering (valfritt) under kontrollpunktsincheckningen. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Tiden i millisekunder som synkroniserar den interna RocksDB-ögonblicksbilden till extern lagring (kontrollpunktsplatsen). |
customMetrics.rocksdbCommitFlushLatency |
Tiden i millisekunder som rensar RocksDB-minnesinterna ändringar till den lokala disken. |
customMetrics.rocksdbCommitPauseLatency |
Tiden i millisekunder som stoppar bakgrundsarbetstrådarna som en del av kontrollpunktsincheckningen, till exempel för komprimering. |
customMetrics.rocksdbCommitWriteBatchLatency |
Tiden i millisekunder som tillämpar mellanlagrade skrivningar i minnesintern struktur (WriteBatch ) på interna RocksDB. |
customMetrics.rocksdbFilesCopied |
Antalet filer som kopierats som spårade av RocksDB-filhanteraren. |
customMetrics.rocksdbFilesReused |
Antalet filer som återanvänds enligt spåras av RocksDB-filhanteraren. |
customMetrics.rocksdbGetCount |
Antalet get anrop till databasen (inkluderar gets inte från WriteBatch - minnesintern batch som används för mellanlagring av skrivningar). |
customMetrics.rocksdbGetLatency |
Den genomsnittliga tiden i nanosekunder för det underliggande interna RocksDB::Get anropet. |
customMetrics.rocksdbReadBlockCacheHitCount |
Antalet cacheträffar från blockcacheminnet i RocksDB som är användbara för att undvika lokala diskläsningar. |
customMetrics.rocksdbReadBlockCacheMissCount |
Antalet blockcacheminnen i RocksDB är inte användbart för att undvika lokala diskläsningar. |
customMetrics.rocksdbSstFileSize |
Storleken på alla SST-filer (Static Sorted Table) – den tabellstruktur som RocksDB använder för att lagra data. |
customMetrics.rocksdbTotalBytesRead |
Antalet okomprimerade byte som lästs av get åtgärder. |
customMetrics.rocksdbTotalBytesReadByCompaction |
Antalet byte som komprimeringsprocessen läser från disken. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Det totala antalet byte av okomprimerade data som lästs med hjälp av en iterator. Vissa tillståndskänsliga åtgärder (till exempel timeoutbearbetning i FlatMapGroupsWithState och vattenstämpling) kräver läsning av data i DB via en iterator. |
customMetrics.rocksdbTotalBytesWritten |
Det totala antalet okomprimerade byte som skrivits av put åtgärder. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Det totala antalet byte som komprimeringsprocessen skriver till disken. |
customMetrics.rocksdbTotalCompactionLatencyMs |
Tiden i millisekunder för RocksDB-komprimering, inklusive bakgrundskomprimeringar och den valfria komprimering som initierades under incheckningen. |
customMetrics.rocksdbTotalFlushLatencyMs |
Den totala tömningstiden, inklusive bakgrundsspolning. Tömningsåtgärder är processer som MemTable töms till lagring när den är full.
MemTables är den första nivån where data lagras i RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
Storleken i byte för de okomprimerade zip-filerna enligt filhanterarens rapporter. Filhanteraren hanterar den fysiska SST-filens diskutrymmesanvändning och borttagning. |
källobjekt (Kafka)
Mätvärde | Beskrivning |
---|---|
sources.description |
En detaljerad beskrivning av Kafka-källan som anger det exakta Kafka-ämnet som läse från. Exempel: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” . |
sources.startOffset objekt |
Det startnummer offset inom Kafka-ämnet där strömningsjobbet startade. |
sources.endOffset objekt |
Den sista offset bearbetas av mikrobatchen. Detta kan vara lika med latestOffset för en pågående mikrobatchkörning. |
sources.latestOffset objekt |
Den senaste offset beräknad av mikrobatchmetoden. Mikrobatching-processen kanske inte bearbetar alla förskjutningar när det finns begränsningar, vilket resulterar i endOffset och latestOffset differentiering. |
sources.numInputRows |
Antalet indatarader som bearbetas från den här källan. |
sources.inputRowsPerSecond |
Den hastighet med vilken data anländer för bearbetning från den här källan. |
sources.processedRowsPerSecond |
Den hastighet med vilken Spark bearbetar data från den här källan. |
sources.metrics-objekt (Kafka)
Mätvärde | Beskrivning |
---|---|
sources.metrics.avgOffsetsBehindLatest |
Det genomsnittliga antalet förskjutningar som strömningsfrågan ligger bakom den senaste tillgängliga offset bland alla prenumerationsämnen. |
sources.metrics.estimatedTotalBytesBehindLatest |
Det uppskattade antalet byte som frågeprocessen inte har förbrukat från de prenumererade ämnena. |
sources.metrics.maxOffsetsBehindLatest |
Det maximala antalet förskjutningar som strömningsfrågan ligger bakom den senaste tillgängliga offset bland alla prenumerationsämnen. |
sources.metrics.minOffsetsBehindLatest |
Det minsta antalet förskjutningar som strömningsfrågan ligger bakom den senaste tillgängliga offset bland alla prenumerationsämnen. |
mottagarobjekt (Kafka)
Mätvärde | Beskrivning |
---|---|
sink.description |
Beskrivningen av Kafka-mottagaren som strömningsfrågan skriver till, som beskriver den specifika Kafka-mottagarimplementeringen som används. Exempel: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” . |
sink.numOutputRows |
Antalet rader som skrivits till utdata table eller mottagare som en del av mikrobatchen. I vissa situationer kan det här värdet vara "-1" och kan i allmänhet tolkas som "okänt". |
källobjekt (Delta Lake)
Mätvärde | Beskrivning |
---|---|
sources.description |
Beskrivningen av källan som strömningsfrågan läser från. Exempel: “DeltaSource[table]” . |
sources.[startOffset/endOffset].sourceVersion |
Den version av serialisering som detta offset är kodad med. |
sources.[startOffset/endOffset].reservoirId |
ID:t för table som läses. Detta används för att identifiera felkonfiguration vid omstart av en fråga. |
sources.[startOffset/endOffset].reservoirVersion |
Den version av table som bearbetas för närvarande. |
sources.[startOffset/endOffset].index |
Indexet i sekvensen AddFiles i den här versionen. Detta används för att dela upp stora incheckningar i flera batchar. Det här indexet skapas genom sortering på modificationTimestamp och path . |
sources.[startOffset/endOffset].isStartingVersion |
Identifierar om den aktuella offset markerar början på en ny strömförfrågan snarare än bearbetningen av ändringar som inträffade efter att de ursprungliga data bearbetades. När du startar en ny fråga bearbetas alla data som finns i table i början först och sedan alla nya data som tas emot. |
sources.latestOffset |
Den senaste offset bearbetas av mikrobatchfrågan. |
sources.numInputRows |
Antalet indatarader som bearbetas från den här källan. |
sources.inputRowsPerSecond |
Den hastighet med vilken data anländer för bearbetning från den här källan. |
sources.processedRowsPerSecond |
Den hastighet med vilken Spark bearbetar data från den här källan. |
sources.metrics.numBytesOutstanding |
Den kombinerade storleken på de utestående filerna (filer som spåras av RocksDB). Det här är måttet för kvarvarande uppgifter för Delta och Auto Loader som strömningskälla. |
sources.metrics.numFilesOutstanding |
Antalet utestående filer som ska bearbetas. Det här är måttet för kvarvarande uppgifter för Delta och Auto Loader som strömningskälla. |
mottagarobjekt (Delta Lake)
Mätvärde | Beskrivning |
---|---|
sink.description |
Beskrivningen av deltamottagaren, som beskriver den specifika deltamottagareimplementering som används. Exempel: “DeltaSink[table]” . |
sink.numOutputRows |
Antalet rader är alltid "-1" eftersom Spark inte kan härleda utdatarader för DSv1-mottagare, vilket är klassificeringen för Delta Lake-mottagare. |
Exempel
Exempel på Kafka-to-Kafka StreamingQueryListener-händelse
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Exempel på Delta Lake-to-Delta Lake StreamingQueryListener-händelse
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Exempel på Kinesis-to-Delta Lake StreamingQueryListener-händelse
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Exempel på Kafka+Delta Lake-to-Delta Lake StreamingQueryListener-händelse
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Exempel på frekvenskälla till Delta Lake StreamingQueryListener-händelse
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}