Monitorování dotazů strukturovaného streamování v Azure Databricks
Azure Databricks poskytuje integrované monitorování pro aplikace strukturovaného streamování prostřednictvím uživatelského rozhraní Sparku na kartě Streamování .
Rozlišení dotazů strukturovaného streamování v uživatelském rozhraní Sparku
Zadejte jedinečný název dotazu přidáním .queryName(<query-name>)
do writeStream
kódu, abyste snadno rozlišili metriky, ke kterým streamům patří v uživatelském rozhraní Sparku.
Nabízení metrik strukturovaného streamování do externích služeb
Metriky streamování je možné odeslat do externích služeb pro účely upozorňování nebo řízení pomocí rozhraní Streaming Query Listener od Apache Spark. V Databricks Runtime 11.3 LTS a novějších je StreamingQueryListener
k dispozici v Pythonu a Scala.
Důležité
Pro úlohy používající režimy výpočetního přístupu s podporou katalogu Unity platí následující omezení:
-
StreamingQueryListener
vyžaduje, aby Databricks Runtime 15.1 nebo novější používal přihlašovací údaje nebo pracoval s objekty spravovanými katalogem Unity na výpočetních prostředcích s vyhrazeným režimem přístupu. -
StreamingQueryListener
vyžaduje Databricks Runtime 16.1 nebo novější pro úlohy Scala nakonfigurované pomocí standardního režimu přístupu (dříve režim sdíleného přístupu).
Poznámka:
Latence zpracování s posluchači může výrazně ovlivnit rychlost zpracování dotazů. Je doporučeno omezit logiku zpracování v těchto nasloucháčích a využívat systémy rychlé odezvy, jako je Kafka, pro efektivitu.
Následující kód obsahuje základní příklady syntaxe pro implementaci naslouchacího procesu:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Definování pozorovatelných metrik ve strukturovaném streamování
Pozorovatelné metriky se nazývají libovolné agregační funkce, které lze definovat v dotazu (DataFrame). Jakmile provádění datového rámce dosáhne bodu dokončení (tj. dokončí dávkové dotaz nebo dosáhne epochy streamování), vygeneruje se pojmenovaná událost obsahující metriky pro data zpracovávaná od posledního bodu dokončení.
Tyto metriky můžete sledovat připojením posluchače k relaci Spark. Posluchač závisí na módu spuštění:
Dávkový režim: Použijte
QueryExecutionListener
.QueryExecutionListener
je volán po dokončení dotazu. Získejte přístup k metrikám pomocíQueryExecution.observedMetrics
mapy.Streamování nebo mikrobatch: Použijte
StreamingQueryListener
.StreamingQueryListener
se zavolá, když streamovací dotaz dokončí jednu epochu. Získejte přístup k metrikám pomocíStreamingQueryProgress.observedMetrics
mapy. Azure Databricks nepodporuje režim triggerucontinuous
pro streamování.
Příklad:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Mapa k:[UC], k:[Delta] a k:[SS] identifikátory v tabulce metrik
Re:[SS] Metriky používají pole reservoirId
na několika místech pro jedinečný identifikátor tabulky Delta používaný jako zdroj pro streamingový dotaz.
Pole reservoirId
mapuje jedinečný identifikátor uložený tabulkou Delta v transakčním protokolu Delta. Toto ID není namapováno na hodnotu tableId
přiřazenou re:[UC] a zobrazenou v Průzkumníku katalogu.
Pomocí následující syntaxe zkontrolujte identifikátor tabulky Delta. Funguje to u spravovaných tabulek Katalogu Unity, externích tabulek Katalogu Unity a všech tabulek Delta metastoru Hive:
DESCRIBE DETAIL <table-name>
Pole id
zobrazené ve výsledcích je identifikátor, který se mapuje na reservoirId
v streamovacích metrikách.
Metriky objektů StreamingQueryListener
Metrický | Popis |
---|---|
id |
Jedinečné ID dotazu, které se zachová při restartování. |
runId |
ID dotazu, které je jedinečné pro každé spuštění/restartování. Viz StreamingQuery.runId(). |
name |
Uživatelem zadaný název dotazu. Název má hodnotu null, pokud není zadán žádný název. |
timestamp |
Časové razítko pro spuštění mikrodávky. |
batchId |
Jedinečné ID pro aktuální dávku zpracovávaných dat. V případě opakování po selhání může být dané ID dávky provedeno vícekrát. Podobně platí, že pokud nejsou žádná data ke zpracování, ID dávky se nezvýší. |
numInputRows |
Agregace (napříč všemi zdroji) počtu záznamů zpracovaných v triggeru |
inputRowsPerSecond |
Agregovaná míra příchozích dat napříč všemi zdroji. |
processedRowsPerSecond |
Celková rychlost, ze všech zdrojů, s jakou Spark zpracovává data. |
DurationMs – objekt
Informace o době potřebnou k dokončení různých fází procesu provádění mikrobatchu.
Metrický | Popis |
---|---|
durationMs.addBatch |
Čas potřebný k provedení mikrodávky. Tím se vyloučí čas, který Spark potřebuje k naplánování mikrodávky. |
durationMs.getBatch |
Doba potřebná k načtení metadat o posunech z zdroje. |
durationMs.latestOffset |
Nejnovější odsazení spotřebované pro mikrodávku. Tento objekt průběhu odkazuje na čas potřebný k načtení nejnovějšího posunu ze zdrojů. |
durationMs.queryPlanning |
Doba potřebná ke generování plánu provádění. |
durationMs.triggerExecution |
Doba, která trvá naplánování a spuštění mikrobatchu. |
durationMs.walCommit |
Doba potřebná k potvrzení nových dostupných offsetů. |
eventTime – objekt
Informace o hodnotě času události, která se vyskytuje v datech zpracovávaných v mikrodávkových procesech. Tato data jsou používána vodoznakem k zjištění, jak oříznout stav pro zpracování stavových agregací definovaných v úloze strukturovaného streamování.
Metrický | Popis |
---|---|
eventTime.avg |
Průměrná doba trvání události zobrazená v tomto spouštěči. |
eventTime.max |
Maximální doba události zobrazená v daném triggeru. |
eventTime.min |
Minimální doba události viděná v tomto spouštěči. |
eventTime.watermark |
Hodnota vodoznaku použitého v této aktivační události. |
stateOperators – objekt
Informace o stavových operacích, které jsou definovány v úloze strukturovaného streamování a agregace vytvořené z nich.
Metrický | Popis |
---|---|
stateOperators.operatorName |
Název stavového operátoru, ke kterému se metriky vztahují, například symmetricHashJoin , dedupe . stateStoreSave |
stateOperators.numRowsTotal |
Celkový počet řádků v rámci stavu v důsledku stavového operátoru nebo agregace. |
stateOperators.numRowsUpdated |
Celkový počet řádků aktualizovaných ve stavu v důsledku stavového operátoru nebo agregace. |
stateOperators.allUpdatesTimeMs |
Tato metrika v současné době není měřitelná sparkem a plánuje se odebrat v budoucích aktualizacích. |
stateOperators.numRowsRemoved |
Celkový počet řádků odebraných ze stavu v důsledku stavového operátoru nebo agregace. |
stateOperators.allRemovalsTimeMs |
Tato metrika v současné době není měřitelná sparkem a plánuje se odebrat v budoucích aktualizacích. |
stateOperators.commitTimeMs |
Doba potřebná k potvrzení všech aktualizací (vložení a odebrání) a vrácení nové verze. |
stateOperators.memoryUsedBytes |
Paměť používaná úložištěm stavů. |
stateOperators.numRowsDroppedByWatermark |
Počet řádků, které jsou považovány za příliš pozdě, aby se zahrnuly do stavové agregace. Pouze streamovací agregace: Počet řádků vynechaných po agregaci (ne surových vstupních řádků). Toto číslo není přesné, ale naznačuje, že jsou vyřazována opožděná data. |
stateOperators.numShufflePartitions |
Počet oddílů pro zamíchání tohoto stavového operátoru. |
stateOperators.numStateStoreInstances |
Skutečná instance úložiště stavu, kterou operátor inicializoval a udržuje. U mnoha stavových operátorů je to stejné jako počet particí. Spojení stream-stream inicializují pro každý oddíl čtyři instance stavového úložiště. |
stateOperators.customMetrics – objekt
Informace shromážděné ze služby RocksDB zachycující metriky týkající se výkonu a operací s ohledem na stavové hodnoty, které udržuje pro úlohu strukturovaného streamování. Další informace najdete v tématu Konfigurace úložiště stavů RocksDB v Azure Databricks.
Metrický | Popis |
---|---|
customMetrics.rocksdbBytesCopied |
Počet zkopírovaných bajtů, jak sleduje správce souborů RocksDB. |
customMetrics.rocksdbCommitCheckpointLatency |
Doba v milisekundách potřebná k pořízení snímku nativní databáze RocksDB a jeho zápisu do místního adresáře. |
customMetrics.rocksdbCompactLatency |
Doba komprimování v milisekundách během potvrzení kontrolního bodu (volitelné). |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Čas synchronizace nativního snímku RocksDB s externím úložištěm (umístění kontrolního bodu) v milisekundách |
customMetrics.rocksdbCommitFlushLatency |
Čas v milisekundách vyprázdnění změn v paměti RocksDB na místní disk. |
customMetrics.rocksdbCommitPauseLatency |
Doba v milisekundách potřebná k zastavení pracovních vláken na pozadí jako součást potvrzení kontrolního bodu, například při komprimaci. |
customMetrics.rocksdbCommitWriteBatchLatency |
Čas v milisekundách potřebný pro aplikaci fázovaných zápisů ve struktuře v paměti (WriteBatch ) na nativní RocksDB. |
customMetrics.rocksdbFilesCopied |
Počet souborů zkopírovaných podle sledování správcem souborů RocksDB. |
customMetrics.rocksdbFilesReused |
Počet znovupoužitých souborů sledovaných Správcem souborů RocksDB. |
customMetrics.rocksdbGetCount |
Počet get volání do databáze (nezahrnuje gets dávku WriteBatch v paměti použitou pro přípravné zápisy). |
customMetrics.rocksdbGetLatency |
Průměrná doba v nanosekundách základního nativního RocksDB::Get volání. |
customMetrics.rocksdbReadBlockCacheHitCount |
Počet zásahů do mezipaměti bloků v RocksDB, které jsou užitečné při zamezení čtení z místního disku. |
customMetrics.rocksdbReadBlockCacheMissCount |
Počet blokové mezipaměti v RocksDB není užitečný při zamezení čtení z místního disku. |
customMetrics.rocksdbSstFileSize |
Velikost všech souborů se statickou seřazenou tabulkou (SST) – tabulková struktura RocksDB používá k ukládání dat. |
customMetrics.rocksdbTotalBytesRead |
Počet nekomprimovaných bajtů přečtených operacemi get . |
customMetrics.rocksdbTotalBytesReadByCompaction |
Počet bajtů, které proces komprimace načítá z disku. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Celkový počet bajtů nekomprimovaných dat přečtených pomocí iterátoru. Některé stavové operace (například zpracování časového limitu v FlatMapGroupsWithState a vodoznaku) vyžadují čtení dat v DB prostřednictvím iterátoru. |
customMetrics.rocksdbTotalBytesWritten |
Celkový počet nekomprimovaných bajtů zapsaných operacemi put . |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Celkový počet bajtů, které proces komprimace zapisuje na disk. |
customMetrics.rocksdbTotalCompactionLatencyMs |
Čas v milisekundách pro komprimace RocksDB, včetně komprimací pozadí a volitelné komprimace zahájené během potvrzení. |
customMetrics.rocksdbTotalFlushLatencyMs |
Celková doba proplachování, včetně proplachování na pozadí. Operace vyprázdnění jsou procesy, kterými MemTable se vyprázdní do úložiště, jakmile je zaplněno.
MemTables jsou první úrovní, ve které jsou data uložená ve službě RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
Velikost v bajtech nekomprimovaných souborů ZIP hlášených správcem souborů. Správce souborů spravuje využití a odstranění fyzického místa na disku se soubory SST. |
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> |
Nejnovější verze snímku RocksDB uložená do kontrolního bodu. Hodnota -1 značí, že se nikdy neuložil žádný snímek. Vzhledem k tomu, že snímky jsou specifické pro každou instanci úložiště stavů, tato metrika se vztahuje na konkrétní ID oddílu a název úložiště stavů. |
source – objekt (Kafka)
Metrický | Popis |
---|---|
sources.description |
Podrobný popis zdroje Kafka, který určuje přesné téma Kafka, ze kterého čteme. Například: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” . |
sources.startOffset objekt |
Počáteční číslo posunu v tématu Kafka, kde začala streamovací úloha. |
sources.endOffset objekt |
Poslední offset zpracovaný mikrodávkou. To se může rovnat latestOffset během probíhajícího provádění mikrošarže. |
sources.latestOffset objekt |
Poslední posunutí určené microbatchem. Proces mikrobatchingu nemusí zpracovat všechny offsety, když dochází k omezování, což způsobuje rozdíly mezi endOffset a latestOffset . |
sources.numInputRows |
Počet vstupních řádků zpracovaných z tohoto zdroje |
sources.inputRowsPerSecond |
Rychlost, s jakou data přicházejí ke zpracování z tohoto zdroje. |
sources.processedRowsPerSecond |
Rychlost, s jakou Spark zpracovává data z tohoto zdroje. |
sources.metrics – objekt (Kafka)
Metrický | Popis |
---|---|
sources.metrics.avgOffsetsBehindLatest |
Průměrný počet offsetů, o které je streamovací dotaz pozadu za nejnovějším dostupným offsetem mezi všemi předplacenými tématy. |
sources.metrics.estimatedTotalBytesBehindLatest |
Odhadovaný počet bajtů, které dotazovací proces dosud nespotřeboval ze sledovaných témat. |
sources.metrics.maxOffsetsBehindLatest |
Maximální počet posunů, o které streamovací dotaz zaostává za nejnovějším dostupným posunem mezi všemi odebíranými tématy. |
sources.metrics.minOffsetsBehindLatest |
Minimální počet posunů, o něž streamovací dotaz zaostává za nejnovějším dostupným offsetem mezi všemi sledovanými topiky. |
objekt jímky (Kafka)
Metrický | Popis |
---|---|
sink.description |
Popis Kafka sinku, do kterého zapisuje streamovací dotaz, včetně podrobností o konkrétní použité implementaci Kafka sinku. Například: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” . |
sink.numOutputRows |
Počet řádků, které byly zapsány do výstupní tabulky nebo výstupního úložiště jako součást mikrodávky. V některých situacích může být tato hodnota -1 a obecně ji lze interpretovat jako "neznámá". |
sources – objekt (Delta Lake)
Metrický | Popis |
---|---|
sources.description |
Popis zdroje, ze kterého streamovací dotaz čte. Například: “DeltaSource[table]” . |
sources.[startOffset/endOffset].sourceVersion |
Verze serializace, se kterou je toto posunutí kódováno. |
sources.[startOffset/endOffset].reservoirId |
ID přečtené tabulky. Používá se k detekci chybné konfigurace při restartování dotazu. Viz. mapu a identifikátory tabulky metrik re:[UC], re:[Delta] a re:[SS]. |
sources.[startOffset/endOffset].reservoirVersion |
Verze tabulky, která se právě zpracovává |
sources.[startOffset/endOffset].index |
Index v posloupnosti AddFiles této verze. Slouží k rozdělení velkých potvrzení do více dávek. Tento index je vytvořen řazením modificationTimestamp a path . |
sources.[startOffset/endOffset].isStartingVersion |
Určuje, zda aktuální posun označuje začátek nového streamovacího dotazu místo zpracování změn, ke kterým došlo po počátečním zpracování dat. Při spuštění nového dotazu se nejprve zpracuje všechna data, která jsou v tabulce na začátku, a pak všechna nová data, která dorazí. |
sources.latestOffset |
Nejnovější posun zpracovaný dotazem microbatch. |
sources.numInputRows |
Počet vstupních řádků zpracovaných z tohoto zdroje |
sources.inputRowsPerSecond |
Rychlost, s jakou data přicházejí ke zpracování z tohoto zdroje. |
sources.processedRowsPerSecond |
Rychlost, s jakou Spark zpracovává data z tohoto zdroje. |
sources.metrics.numBytesOutstanding |
Kombinovaná velikost nevyřízených souborů (soubory sledované rocksDB) Jedná se o metriku backlogu pro Delta a Auto Loader jako zdroj pro streamování. |
sources.metrics.numFilesOutstanding |
Počet nevyřízených souborů, které se mají zpracovat. Jedná se o metriku backlogu pro Delta a Auto Loader jako zdroje streamování. |
objekt jímky (Delta Lake)
Metrický | Popis |
---|---|
sink.description |
Popis jímky Delta podrobně popisuje konkrétní použitou implementaci jímky Delta. Například: “DeltaSink[table]” . |
sink.numOutputRows |
Počet řádků je vždy -1, protože Spark nemůže odvodit výstupní řádky pro jímky DSv1, což je klasifikace jímky Delta Lake. |
Příklady
Příklad události Kafka-to-Kafka StreamingQueryListener
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1370,
"SnapshotLastUploaded.partition_1_default" : 1370,
"SnapshotLastUploaded.partition_2_default" : 1362,
"SnapshotLastUploaded.partition_3_default" : 1370,
"SnapshotLastUploaded.partition_4_default" : 1356,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1360,
"SnapshotLastUploaded.partition_1_default" : 1360,
"SnapshotLastUploaded.partition_2_default" : 1352,
"SnapshotLastUploaded.partition_3_default" : 1360,
"SnapshotLastUploaded.partition_4_default" : 1346,
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
"SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
"SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
"SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
"SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Příklad události Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Příklad události Kinesis-to-Delta Lake StreamingQueryListener
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Příklad události Kafka+Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Příklad zdroje rychlosti pro událost typu StreamingQueryListener v Delta Lake
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}