Monitorowanie zapytań strukturalnego przesyłania strumieniowego w usłudze Azure Databricks
Usługa Azure Databricks zapewnia wbudowane monitorowanie aplikacji do przesyłania strumieniowego ze strukturą za pośrednictwem interfejsu użytkownika platformy Spark na karcie Przesyłanie strumieniowe .
Rozróżnianie zapytań przesyłania strumieniowego ze strukturą w interfejsie użytkownika platformy Spark
Podaj unikatową nazwę zapytania, dodając .queryName(<query-name>)
do writeStream
kodu, aby łatwo odróżnić metryki, do których należy strumień w interfejsie użytkownika platformy Spark.
Wypychanie metryk przesyłania strumieniowego ze strukturą do usług zewnętrznych
Metryki przesyłania strumieniowego można wypychać do usług zewnętrznych na potrzeby zgłaszania alertów lub pulpitów nawigacyjnych przy użyciu interfejsu odbiornika zapytań przesyłania strumieniowego platformy Apache Spark. W środowisku Databricks Runtime 11.3 LTS i nowszym odbiornik zapytań przesyłania strumieniowego jest dostępny w językach Python i Scala.
Ważne
Nie można używać poświadczeń i obiektów zarządzanych przez wykaz aparatu Unity w StreamingQueryListener
logice.
Uwaga
Opóźnienie przetwarzania z odbiornikami może znacząco wpływać na szybkość przetwarzania zapytań. Zaleca się ograniczenie logiki przetwarzania w tych odbiornikach i wybranie pisania do systemów szybkiego reagowania, takich jak Kafka, w celu zwiększenia wydajności.
Poniższy kod zawiera podstawowe przykłady składni implementowania odbiornika:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Definiowanie obserwowalnych metryk w strumieniu ze strukturą
Obserwowane metryki są nazywane dowolnymi funkcjami agregacji, które można zdefiniować w zapytaniu (DataFrame). Gdy tylko wykonanie ramki danych osiągnie punkt ukończenia (czyli kończy zapytanie wsadowe lub osiąga epokę przesyłania strumieniowego), emitowane jest nazwane zdarzenie zawierające metryki dla danych przetworzonych od ostatniego punktu ukończenia.
Możesz obserwować te metryki, dołączając odbiornik do sesji platformy Spark. Odbiornik zależy od trybu wykonywania:
Tryb wsadowy: użyj polecenia
QueryExecutionListener
.QueryExecutionListener
jest wywoływana po zakończeniu zapytania. Uzyskaj dostęp do metryk przy użyciuQueryExecution.observedMetrics
mapy.Przesyłanie strumieniowe lub mikrobajt: użyj polecenia
StreamingQueryListener
.StreamingQueryListener
jest wywoływana, gdy zapytanie przesyłane strumieniowo kończy epokę. Uzyskaj dostęp do metryk przy użyciuStreamingQueryProgress.observedMetrics
mapy. Usługa Azure Databricks nie obsługuje przesyłania strumieniowego ciągłego wykonywania.
Na przykład:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Metryki obiektu StreamingQueryListener
Metryczne | opis |
---|---|
id |
Unikatowy identyfikator zapytania, który utrzymuje się podczas ponownego uruchamiania. |
runId |
Identyfikator zapytania, który jest unikatowy dla każdego uruchomienia/ponownego uruchomienia. Zobacz StreamingQuery.runId(). |
name |
Określona przez użytkownika nazwa zapytania. Nazwa ma wartość null, jeśli nie określono żadnej nazwy. |
timestamp |
Sygnatura czasowa wykonania mikrobajtu. |
batchId |
Unikatowy identyfikator bieżącej partii przetwarzanych danych. W przypadku ponownych prób po awarii dany identyfikator partii może zostać wykonany więcej niż raz. Podobnie, gdy nie ma danych do przetworzenia, identyfikator partii nie jest zwiększany. |
numInputRows |
Zagregowana (we wszystkich źródłach) liczba rekordów przetworzonych w wyzwalaczu. |
inputRowsPerSecond |
Zagregowany (we wszystkich źródłach) współczynnik przychodzących danych. |
processedRowsPerSecond |
Szybkość agregacji (we wszystkich źródłach), z jaką platforma Spark przetwarza dane. |
durationMs, obiekt
Informacje o czasie potrzebnym na ukończenie różnych etapów procesu wykonywania mikrobajtów.
Metryczne | opis |
---|---|
durationMs.addBatch |
Czas potrzebny na wykonanie mikrobajtu. Wyklucza to czas, przez który platforma Spark planuje mikrobajt. |
durationMs.getBatch |
Czas potrzebny na pobranie metadanych dotyczących przesunięć ze źródła. |
durationMs.latestOffset |
Najnowsze przesunięcie używane dla mikrobabajtu. Ten obiekt postępu odnosi się do czasu potrzebnego na pobranie najnowszego przesunięcia ze źródeł. |
durationMs.queryPlanning |
Czas potrzebny na wygenerowanie planu wykonania. |
durationMs.triggerExecution |
Czas potrzebny na zaplanowanie i wykonanie mikrobajtu. |
durationMs.walCommit |
Czas potrzebny na zatwierdzenie nowych dostępnych przesunięć. |
eventTime, obiekt
Informacje o wartości czasu zdarzenia widocznej w danych przetwarzanych w mikrobajtach. Te dane są używane przez znak wodny, aby dowiedzieć się, jak przyciąć stan przetwarzania agregacji stanowych zdefiniowanych w zadaniu przesyłania strumieniowego ze strukturą.
Metryczne | opis |
---|---|
eventTime.avg |
Średni czas zdarzenia widoczny w tym wyzwalaczu. |
eventTime.max |
Maksymalny czas zdarzenia widoczny w tym wyzwalaczu. |
eventTime.min |
Minimalny czas zdarzenia widoczny w tym wyzwalaczu. |
eventTime.watermark |
Wartość znaku wodnego używanego w tym wyzwalaczu. |
stateOperators, obiekt
Informacje o operacjach stanowych zdefiniowanych w zadaniu przesyłania strumieniowego ze strukturą i agregacjach generowanych z nich.
Metryczne | opis |
---|---|
stateOperators.operatorName |
Nazwa operatora stanowego, do którego odnoszą się metryki, takie jak symmetricHashJoin , dedupe , . stateStoreSave |
stateOperators.numRowsTotal |
Całkowita liczba wierszy w stanie w wyniku operatora stanowego lub agregacji. |
stateOperators.numRowsUpdated |
Całkowita liczba wierszy zaktualizowanych w stanie w wyniku operatora stanowego lub agregacji. |
stateOperators.allUpdatesTimeMs |
Ta metryka nie jest obecnie wymierna przez platformę Spark i planuje zostać usunięta w przyszłych aktualizacjach. |
stateOperators.numRowsRemoved |
Całkowita liczba wierszy usuniętych ze stanu w wyniku operatora stanowego lub agregacji. |
stateOperators.allRemovalsTimeMs |
Ta metryka nie jest obecnie wymierna przez platformę Spark i planuje zostać usunięta w przyszłych aktualizacjach. |
stateOperators.commitTimeMs |
Czas potrzebny na zatwierdzenie wszystkich aktualizacji (umieszcza i usuwa) i zwraca nową wersję. |
stateOperators.memoryUsedBytes |
Pamięć używana przez magazyn stanów. |
stateOperators.numRowsDroppedByWatermark |
Liczba wierszy, które są uznawane za za późno, aby zostały uwzględnione w agregacji stanowej. Tylko agregacje przesyłania strumieniowego: liczba wierszy porzuconych po agregacji (nie nieprzetworzonych wierszy wejściowych). Ta liczba nie jest dokładna, ale wskazuje, że są porzucane opóźnione dane. |
stateOperators.numShufflePartitions |
Liczba partycji mieszania dla tego operatora stanowego. |
stateOperators.numStateStoreInstances |
Rzeczywiste wystąpienie magazynu stanów, które operator zainicjował i konserwował. W przypadku wielu operatorów stanowych jest to samo co liczba partycji. Jednak sprzężenia strumienia strumienia inicjują cztery wystąpienia magazynu stanów na partycję. |
stateOperators.customMetrics, obiekt
Informacje zebrane z bazy danych RocksDB przechwytujące metryki dotyczące wydajności i operacji w odniesieniu do wartości stanowych, które są przechowywane dla zadania przesyłania strumieniowego ze strukturą. Aby uzyskać więcej informacji, zobacz Configure RocksDB state store on Azure Databricks (Konfigurowanie magazynu stanów bazy danych RocksDB w usłudze Azure Databricks).
Metryczne | opis |
---|---|
customMetrics.rocksdbBytesCopied |
Liczba bajtów skopiowanych przez Menedżera plików Bazy danych RocksDB. |
customMetrics.rocksdbCommitCheckpointLatency |
Czas w milisekundach tworzenia migawki natywnej bazy danych RocksDB i zapisywania go w katalogu lokalnym. |
customMetrics.rocksdbCompactLatency |
Czas w milisekundach kompaktowanie (opcjonalnie) podczas zatwierdzania punktu kontrolnego. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Czas w milisekundach synchronizowania natywnej migawki bazy danych RocksDB z magazynem zewnętrznym (lokalizacja punktu kontrolnego). |
customMetrics.rocksdbCommitFlushLatency |
Czas w milisekundach opróżniania danych RocksDB w pamięci zmienia się na dysk lokalny. |
customMetrics.rocksdbCommitPauseLatency |
Czas w milisekundach zatrzymywania wątków procesu roboczego w tle w ramach zatwierdzenia punktu kontrolnego, takiego jak kompaktowanie. |
customMetrics.rocksdbCommitWriteBatchLatency |
Czas w milisekundach stosowania przygotowanych zapisów w strukturze w pamięci (WriteBatch ) do natywnej bazy danych RocksDB. |
customMetrics.rocksdbFilesCopied |
Liczba plików skopiowanych zgodnie z śledzeniem przez Menedżera plików Bazy danych RocksDB. |
customMetrics.rocksdbFilesReused |
Liczba ponownie użytych plików, które są śledzone przez Menedżera plików Bazy danych RocksDB. |
customMetrics.rocksdbGetCount |
Liczba wywołań get bazy danych (nie obejmuje gets WriteBatch operacji wsadowych w pamięci używanych do przejściowych zapisów). |
customMetrics.rocksdbGetLatency |
Średni czas w nanosekundach dla bazowego wywołania natywnego RocksDB::Get . |
customMetrics.rocksdbReadBlockCacheHitCount |
Liczba trafień pamięci podręcznej z pamięci podręcznej bloku w bazie danych RocksDB, które są przydatne w unikaniu odczytów dysku lokalnego. |
customMetrics.rocksdbReadBlockCacheMissCount |
Liczba blokowej pamięci podręcznej w bazie danych RocksDB nie jest przydatna w unikaniu odczytów dysku lokalnego. |
customMetrics.rocksdbSstFileSize |
Rozmiar wszystkich plików tabeli sortowanej statycznej (SST) — struktura tabelaryczna RocksDB używa do przechowywania danych. |
customMetrics.rocksdbTotalBytesRead |
Liczba nieskompresowanych bajtów odczytanych według get operacji. |
customMetrics.rocksdbTotalBytesReadByCompaction |
Liczba bajtów odczytywanych przez proces kompaktowania z dysku. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Całkowita liczba bajtów nieskompresowanych danych odczytanych przy użyciu iteratora. Niektóre operacje stanowe (na przykład przetwarzanie limitu czasu i FlatMapGroupsWithState znakowanie wodne) wymagają odczytywania danych w bazie danych za pośrednictwem iteratora. |
customMetrics.rocksdbTotalBytesWritten |
Całkowita liczba nieskompresowanych bajtów zapisanych przez put operacje. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Całkowita liczba bajtów zapisu w procesie kompaktowania na dysku. |
customMetrics.rocksdbTotalCompactionLatencyMs |
Czas w milisekundach kompaktowania bazy danych RocksDB, w tym kompaktowanie w tle i opcjonalna kompaktowanie zainicjowane podczas zatwierdzania. |
customMetrics.rocksdbTotalFlushLatencyMs |
Łączny czas opróżniania, w tym opróżnianie tła. Operacje opróżniania to procesy, za pomocą których MemTable opróżniane są do magazynu po ich zapełnieniu. MemTables to pierwszy poziom, na którym dane są przechowywane w bazie danych RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
Rozmiar w bajtach nieskompresowanych plików zip zgłoszony przez Menedżera plików. Menedżer plików zarządza fizycznym wykorzystaniem i usunięciem miejsca na dysku pliku SST. |
sources object (Kafka)
Metryczne | opis |
---|---|
sources.description |
Szczegółowy opis źródła platformy Kafka określający dokładny temat platformy Kafka odczytywany. Na przykład: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” . |
sources.startOffset sprzeciwiać się |
Numer przesunięcia początkowego w temacie platformy Kafka, w którym uruchomiono zadanie przesyłania strumieniowego. |
sources.endOffset sprzeciwiać się |
Ostatnie przesunięcie przetworzone przez mikrobajt. Może to być równe latestOffset dla trwającego wykonywania mikrobajta. |
sources.latestOffset sprzeciwiać się |
Najnowsze przesunięcie obliczone przez mikrobabajt. Proces mikrobatchingu może nie przetwarzać wszystkich przesunięć, gdy występuje ograniczanie przepustowości, co skutkuje endOffset różnicą i latestOffset różnicą. |
sources.numInputRows |
Liczba wierszy wejściowych przetworzonych z tego źródła. |
sources.inputRowsPerSecond |
Szybkość, z jaką dane docierają do przetwarzania z tego źródła. |
sources.processedRowsPerSecond |
Szybkość przetwarzania danych z tego źródła przez platformę Spark. |
sources.metrics, obiekt (Kafka)
Metryczne | opis |
---|---|
sources.metrics.avgOffsetsBehindLatest |
Średnia liczba przesunięć, które zapytanie przesyłane strumieniowo znajduje się za najnowszym dostępnym przesunięciem wśród wszystkich subskrybowanych tematów. |
sources.metrics.estimatedTotalBytesBehindLatest |
Szacowana liczba bajtów, z których proces zapytania nie korzysta z subskrybowanych tematów. |
sources.metrics.maxOffsetsBehindLatest |
Maksymalna liczba przesunięć, które zapytanie przesyłane strumieniowo znajduje się za najnowszym dostępnym przesunięciem wśród wszystkich subskrybowanych tematów. |
sources.metrics.minOffsetsBehindLatest |
Minimalna liczba przesunięć, które zapytanie przesyłane strumieniowo znajduje się za najnowszym dostępnym przesunięciem wśród wszystkich subskrybowanych tematów. |
obiekt ujścia (Kafka)
Metryczne | opis |
---|---|
sink.description |
Opis ujścia platformy Kafka, do którego pisze zapytanie przesyłane strumieniowo, szczegółowo opis używanej implementacji ujścia platformy Kafka. Na przykład: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” . |
sink.numOutputRows |
Liczba wierszy zapisanych w tabeli wyjściowej lub ujściu w ramach mikrobatchu. W niektórych sytuacjach ta wartość może mieć wartość "-1" i ogólnie może być interpretowana jako "nieznana". |
obiekt sources (Delta Lake)
Metryczne | opis |
---|---|
sources.description |
Opis źródła, z którego odczytuje zapytanie przesyłane strumieniowo. Na przykład: “DeltaSource[table]” . |
sources.[startOffset/endOffset].sourceVersion |
Wersja serializacji, za pomocą której to przesunięcie jest zakodowane. |
sources.[startOffset/endOffset].reservoirId |
Identyfikator odczytywanej tabeli. Służy do wykrywania błędnej konfiguracji podczas ponownego uruchamiania zapytania. |
sources.[startOffset/endOffset].reservoirVersion |
Wersja tabeli, która jest obecnie przetwarzana. |
sources.[startOffset/endOffset].index |
Indeks w sekwencji AddFiles w tej wersji. Służy do dzielenia dużych zatwierdzeń na wiele partii. Ten indeks jest tworzony przez sortowanie według modificationTimestamp i path . |
sources.[startOffset/endOffset].isStartingVersion |
Określa, czy bieżące przesunięcie oznacza początek nowego zapytania przesyłania strumieniowego, a nie przetwarzanie zmian, które wystąpiły po przetworzeniu początkowych danych. Podczas uruchamiania nowego zapytania wszystkie dane obecne w tabeli na początku są najpierw przetwarzane, a następnie wszelkie nowe dane, które docierają. |
sources.latestOffset |
Najnowsze przesunięcie przetworzone przez zapytanie mikrobatch. |
sources.numInputRows |
Liczba wierszy wejściowych przetworzonych z tego źródła. |
sources.inputRowsPerSecond |
Szybkość, z jaką dane docierają do przetwarzania z tego źródła. |
sources.processedRowsPerSecond |
Szybkość przetwarzania danych z tego źródła przez platformę Spark. |
sources.metrics.numBytesOutstanding |
Łączny rozmiar zaległych plików (pliki śledzone przez bazę danych RocksDB). Jest to metryka listy prac dla funkcji delta i automatycznego modułu ładującego jako źródło przesyłania strumieniowego. |
sources.metrics.numFilesOutstanding |
Liczba zaległych plików do przetworzenia. Jest to metryka listy prac dla funkcji delta i automatycznego modułu ładującego jako źródło przesyłania strumieniowego. |
obiekt ujścia (Delta Lake)
Metryczne | opis |
---|---|
sink.description |
Opis ujścia delty, szczegółowo opis używanej implementacji ujścia delty. Na przykład: “DeltaSink[table]” . |
sink.numOutputRows |
Liczba wierszy jest zawsze "-1", ponieważ platforma Spark nie może wywnioskować wierszy wyjściowych dla ujścia DSv1, czyli klasyfikacji ujścia usługi Delta Lake. |
Przykłady
Przykładowe zdarzenie Kafka-to-Kafka StreamingQueryListener
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Przykładowe zdarzenie Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Przykładowe zdarzenie Kinesis-to-Delta Lake StreamingQueryListener
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Przykładowe zdarzenie Platformy Kafka+usługa Delta Lake do usługi Delta Lake StreamingQueryListener
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Przykładowe źródło współczynnika dla zdarzenia Delta Lake StreamingQueryListener
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}