Udostępnij za pośrednictwem


Monitorowanie zapytań strukturalnego przesyłania strumieniowego w usłudze Azure Databricks

Usługa Azure Databricks zapewnia wbudowane monitorowanie aplikacji do przesyłania strumieniowego ze strukturą za pośrednictwem interfejsu użytkownika platformy Spark na karcie Przesyłanie strumieniowe .

Rozróżnianie zapytań przesyłania strumieniowego ze strukturą w interfejsie użytkownika platformy Spark

Podaj unikatową nazwę zapytania, dodając .queryName(<query-name>) do writeStream kodu, aby łatwo odróżnić metryki, do których należy strumień w interfejsie użytkownika platformy Spark.

Wypychanie metryk przesyłania strumieniowego ze strukturą do usług zewnętrznych

Metryki przesyłania strumieniowego można wypychać do usług zewnętrznych na potrzeby zgłaszania alertów lub pulpitów nawigacyjnych przy użyciu interfejsu odbiornika zapytań przesyłania strumieniowego platformy Apache Spark. W środowisku Databricks Runtime 11.3 LTS i nowszym odbiornik zapytań przesyłania strumieniowego jest dostępny w językach Python i Scala.

Ważne

Nie można używać poświadczeń i obiektów zarządzanych przez wykaz aparatu Unity w StreamingQueryListener logice.

Uwaga

Opóźnienie przetwarzania z odbiornikami może znacząco wpływać na szybkość przetwarzania zapytań. Zaleca się ograniczenie logiki przetwarzania w tych odbiornikach i wybranie pisania do systemów szybkiego reagowania, takich jak Kafka, w celu zwiększenia wydajności.

Poniższy kod zawiera podstawowe przykłady składni implementowania odbiornika:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definiowanie obserwowalnych metryk w strumieniu ze strukturą

Obserwowane metryki są nazywane dowolnymi funkcjami agregacji, które można zdefiniować w zapytaniu (DataFrame). Gdy tylko wykonanie ramki danych osiągnie punkt ukończenia (czyli kończy zapytanie wsadowe lub osiąga epokę przesyłania strumieniowego), emitowane jest nazwane zdarzenie zawierające metryki dla danych przetworzonych od ostatniego punktu ukończenia.

Możesz obserwować te metryki, dołączając odbiornik do sesji platformy Spark. Odbiornik zależy od trybu wykonywania:

  • Tryb wsadowy: użyj polecenia QueryExecutionListener.

    QueryExecutionListener jest wywoływana po zakończeniu zapytania. Uzyskaj dostęp do metryk przy użyciu QueryExecution.observedMetrics mapy.

  • Przesyłanie strumieniowe lub mikrobajt: użyj polecenia StreamingQueryListener.

    StreamingQueryListener jest wywoływana, gdy zapytanie przesyłane strumieniowo kończy epokę. Uzyskaj dostęp do metryk przy użyciu StreamingQueryProgress.observedMetrics mapy. Usługa Azure Databricks nie obsługuje przesyłania strumieniowego ciągłego wykonywania.

Na przykład:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Metryki obiektu StreamingQueryListener

Metryczne opis
id Unikatowy identyfikator zapytania, który utrzymuje się podczas ponownego uruchamiania.
runId Identyfikator zapytania, który jest unikatowy dla każdego uruchomienia/ponownego uruchomienia. Zobacz StreamingQuery.runId().
name Określona przez użytkownika nazwa zapytania. Nazwa ma wartość null, jeśli nie określono żadnej nazwy.
timestamp Sygnatura czasowa wykonania mikrobajtu.
batchId Unikatowy identyfikator bieżącej partii przetwarzanych danych. W przypadku ponownych prób po awarii dany identyfikator partii może zostać wykonany więcej niż raz. Podobnie, gdy nie ma danych do przetworzenia, identyfikator partii nie jest zwiększany.
numInputRows Zagregowana (we wszystkich źródłach) liczba rekordów przetworzonych w wyzwalaczu.
inputRowsPerSecond Zagregowany (we wszystkich źródłach) współczynnik przychodzących danych.
processedRowsPerSecond Szybkość agregacji (we wszystkich źródłach), z jaką platforma Spark przetwarza dane.

durationMs, obiekt

Informacje o czasie potrzebnym na ukończenie różnych etapów procesu wykonywania mikrobajtów.

Metryczne opis
durationMs.addBatch Czas potrzebny na wykonanie mikrobajtu. Wyklucza to czas, przez który platforma Spark planuje mikrobajt.
durationMs.getBatch Czas potrzebny na pobranie metadanych dotyczących przesunięć ze źródła.
durationMs.latestOffset Najnowsze przesunięcie używane dla mikrobabajtu. Ten obiekt postępu odnosi się do czasu potrzebnego na pobranie najnowszego przesunięcia ze źródeł.
durationMs.queryPlanning Czas potrzebny na wygenerowanie planu wykonania.
durationMs.triggerExecution Czas potrzebny na zaplanowanie i wykonanie mikrobajtu.
durationMs.walCommit Czas potrzebny na zatwierdzenie nowych dostępnych przesunięć.

eventTime, obiekt

Informacje o wartości czasu zdarzenia widocznej w danych przetwarzanych w mikrobajtach. Te dane są używane przez znak wodny, aby dowiedzieć się, jak przyciąć stan przetwarzania agregacji stanowych zdefiniowanych w zadaniu przesyłania strumieniowego ze strukturą.

Metryczne opis
eventTime.avg Średni czas zdarzenia widoczny w tym wyzwalaczu.
eventTime.max Maksymalny czas zdarzenia widoczny w tym wyzwalaczu.
eventTime.min Minimalny czas zdarzenia widoczny w tym wyzwalaczu.
eventTime.watermark Wartość znaku wodnego używanego w tym wyzwalaczu.

stateOperators, obiekt

Informacje o operacjach stanowych zdefiniowanych w zadaniu przesyłania strumieniowego ze strukturą i agregacjach generowanych z nich.

Metryczne opis
stateOperators.operatorName Nazwa operatora stanowego, do którego odnoszą się metryki, takie jak symmetricHashJoin, dedupe, . stateStoreSave
stateOperators.numRowsTotal Całkowita liczba wierszy w stanie w wyniku operatora stanowego lub agregacji.
stateOperators.numRowsUpdated Całkowita liczba wierszy zaktualizowanych w stanie w wyniku operatora stanowego lub agregacji.
stateOperators.allUpdatesTimeMs Ta metryka nie jest obecnie wymierna przez platformę Spark i planuje zostać usunięta w przyszłych aktualizacjach.
stateOperators.numRowsRemoved Całkowita liczba wierszy usuniętych ze stanu w wyniku operatora stanowego lub agregacji.
stateOperators.allRemovalsTimeMs Ta metryka nie jest obecnie wymierna przez platformę Spark i planuje zostać usunięta w przyszłych aktualizacjach.
stateOperators.commitTimeMs Czas potrzebny na zatwierdzenie wszystkich aktualizacji (umieszcza i usuwa) i zwraca nową wersję.
stateOperators.memoryUsedBytes Pamięć używana przez magazyn stanów.
stateOperators.numRowsDroppedByWatermark Liczba wierszy, które są uznawane za za późno, aby zostały uwzględnione w agregacji stanowej. Tylko agregacje przesyłania strumieniowego: liczba wierszy porzuconych po agregacji (nie nieprzetworzonych wierszy wejściowych). Ta liczba nie jest dokładna, ale wskazuje, że są porzucane opóźnione dane.
stateOperators.numShufflePartitions Liczba partycji mieszania dla tego operatora stanowego.
stateOperators.numStateStoreInstances Rzeczywiste wystąpienie magazynu stanów, które operator zainicjował i konserwował. W przypadku wielu operatorów stanowych jest to samo co liczba partycji. Jednak sprzężenia strumienia strumienia inicjują cztery wystąpienia magazynu stanów na partycję.

stateOperators.customMetrics, obiekt

Informacje zebrane z bazy danych RocksDB przechwytujące metryki dotyczące wydajności i operacji w odniesieniu do wartości stanowych, które są przechowywane dla zadania przesyłania strumieniowego ze strukturą. Aby uzyskać więcej informacji, zobacz Configure RocksDB state store on Azure Databricks (Konfigurowanie magazynu stanów bazy danych RocksDB w usłudze Azure Databricks).

Metryczne opis
customMetrics.rocksdbBytesCopied Liczba bajtów skopiowanych przez Menedżera plików Bazy danych RocksDB.
customMetrics.rocksdbCommitCheckpointLatency Czas w milisekundach tworzenia migawki natywnej bazy danych RocksDB i zapisywania go w katalogu lokalnym.
customMetrics.rocksdbCompactLatency Czas w milisekundach kompaktowanie (opcjonalnie) podczas zatwierdzania punktu kontrolnego.
customMetrics.rocksdbCommitFileSyncLatencyMs Czas w milisekundach synchronizowania natywnej migawki bazy danych RocksDB z magazynem zewnętrznym (lokalizacja punktu kontrolnego).
customMetrics.rocksdbCommitFlushLatency Czas w milisekundach opróżniania danych RocksDB w pamięci zmienia się na dysk lokalny.
customMetrics.rocksdbCommitPauseLatency Czas w milisekundach zatrzymywania wątków procesu roboczego w tle w ramach zatwierdzenia punktu kontrolnego, takiego jak kompaktowanie.
customMetrics.rocksdbCommitWriteBatchLatency Czas w milisekundach stosowania przygotowanych zapisów w strukturze w pamięci (WriteBatch) do natywnej bazy danych RocksDB.
customMetrics.rocksdbFilesCopied Liczba plików skopiowanych zgodnie z śledzeniem przez Menedżera plików Bazy danych RocksDB.
customMetrics.rocksdbFilesReused Liczba ponownie użytych plików, które są śledzone przez Menedżera plików Bazy danych RocksDB.
customMetrics.rocksdbGetCount Liczba wywołań get bazy danych (nie obejmuje gets WriteBatch operacji wsadowych w pamięci używanych do przejściowych zapisów).
customMetrics.rocksdbGetLatency Średni czas w nanosekundach dla bazowego wywołania natywnego RocksDB::Get .
customMetrics.rocksdbReadBlockCacheHitCount Liczba trafień pamięci podręcznej z pamięci podręcznej bloku w bazie danych RocksDB, które są przydatne w unikaniu odczytów dysku lokalnego.
customMetrics.rocksdbReadBlockCacheMissCount Liczba blokowej pamięci podręcznej w bazie danych RocksDB nie jest przydatna w unikaniu odczytów dysku lokalnego.
customMetrics.rocksdbSstFileSize Rozmiar wszystkich plików tabeli sortowanej statycznej (SST) — struktura tabelaryczna RocksDB używa do przechowywania danych.
customMetrics.rocksdbTotalBytesRead Liczba nieskompresowanych bajtów odczytanych według get operacji.
customMetrics.rocksdbTotalBytesReadByCompaction Liczba bajtów odczytywanych przez proces kompaktowania z dysku.
customMetrics.rocksdbTotalBytesReadThroughIterator Całkowita liczba bajtów nieskompresowanych danych odczytanych przy użyciu iteratora. Niektóre operacje stanowe (na przykład przetwarzanie limitu czasu i FlatMapGroupsWithState znakowanie wodne) wymagają odczytywania danych w bazie danych za pośrednictwem iteratora.
customMetrics.rocksdbTotalBytesWritten Całkowita liczba nieskompresowanych bajtów zapisanych przez put operacje.
customMetrics.rocksdbTotalBytesWrittenByCompaction Całkowita liczba bajtów zapisu w procesie kompaktowania na dysku.
customMetrics.rocksdbTotalCompactionLatencyMs Czas w milisekundach kompaktowania bazy danych RocksDB, w tym kompaktowanie w tle i opcjonalna kompaktowanie zainicjowane podczas zatwierdzania.
customMetrics.rocksdbTotalFlushLatencyMs Łączny czas opróżniania, w tym opróżnianie tła. Operacje opróżniania to procesy, za pomocą których MemTable opróżniane są do magazynu po ich zapełnieniu. MemTables to pierwszy poziom, na którym dane są przechowywane w bazie danych RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed Rozmiar w bajtach nieskompresowanych plików zip zgłoszony przez Menedżera plików. Menedżer plików zarządza fizycznym wykorzystaniem i usunięciem miejsca na dysku pliku SST.

sources object (Kafka)

Metryczne opis
sources.description Szczegółowy opis źródła platformy Kafka określający dokładny temat platformy Kafka odczytywany. Na przykład: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
sources.startOffset sprzeciwiać się Numer przesunięcia początkowego w temacie platformy Kafka, w którym uruchomiono zadanie przesyłania strumieniowego.
sources.endOffset sprzeciwiać się Ostatnie przesunięcie przetworzone przez mikrobajt. Może to być równe latestOffset dla trwającego wykonywania mikrobajta.
sources.latestOffset sprzeciwiać się Najnowsze przesunięcie obliczone przez mikrobabajt. Proces mikrobatchingu może nie przetwarzać wszystkich przesunięć, gdy występuje ograniczanie przepustowości, co skutkuje endOffset różnicą i latestOffset różnicą.
sources.numInputRows Liczba wierszy wejściowych przetworzonych z tego źródła.
sources.inputRowsPerSecond Szybkość, z jaką dane docierają do przetwarzania z tego źródła.
sources.processedRowsPerSecond Szybkość przetwarzania danych z tego źródła przez platformę Spark.

sources.metrics, obiekt (Kafka)

Metryczne opis
sources.metrics.avgOffsetsBehindLatest Średnia liczba przesunięć, które zapytanie przesyłane strumieniowo znajduje się za najnowszym dostępnym przesunięciem wśród wszystkich subskrybowanych tematów.
sources.metrics.estimatedTotalBytesBehindLatest Szacowana liczba bajtów, z których proces zapytania nie korzysta z subskrybowanych tematów.
sources.metrics.maxOffsetsBehindLatest Maksymalna liczba przesunięć, które zapytanie przesyłane strumieniowo znajduje się za najnowszym dostępnym przesunięciem wśród wszystkich subskrybowanych tematów.
sources.metrics.minOffsetsBehindLatest Minimalna liczba przesunięć, które zapytanie przesyłane strumieniowo znajduje się za najnowszym dostępnym przesunięciem wśród wszystkich subskrybowanych tematów.

obiekt ujścia (Kafka)

Metryczne opis
sink.description Opis ujścia platformy Kafka, do którego pisze zapytanie przesyłane strumieniowo, szczegółowo opis używanej implementacji ujścia platformy Kafka. Na przykład: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows Liczba wierszy zapisanych w tabeli wyjściowej lub ujściu w ramach mikrobatchu. W niektórych sytuacjach ta wartość może mieć wartość "-1" i ogólnie może być interpretowana jako "nieznana".

obiekt sources (Delta Lake)

Metryczne opis
sources.description Opis źródła, z którego odczytuje zapytanie przesyłane strumieniowo. Na przykład: “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion Wersja serializacji, za pomocą której to przesunięcie jest zakodowane.
sources.[startOffset/endOffset].reservoirId Identyfikator odczytywanej tabeli. Służy do wykrywania błędnej konfiguracji podczas ponownego uruchamiania zapytania.
sources.[startOffset/endOffset].reservoirVersion Wersja tabeli, która jest obecnie przetwarzana.
sources.[startOffset/endOffset].index Indeks w sekwencji AddFiles w tej wersji. Służy do dzielenia dużych zatwierdzeń na wiele partii. Ten indeks jest tworzony przez sortowanie według modificationTimestamp i path.
sources.[startOffset/endOffset].isStartingVersion Określa, czy bieżące przesunięcie oznacza początek nowego zapytania przesyłania strumieniowego, a nie przetwarzanie zmian, które wystąpiły po przetworzeniu początkowych danych. Podczas uruchamiania nowego zapytania wszystkie dane obecne w tabeli na początku są najpierw przetwarzane, a następnie wszelkie nowe dane, które docierają.
sources.latestOffset Najnowsze przesunięcie przetworzone przez zapytanie mikrobatch.
sources.numInputRows Liczba wierszy wejściowych przetworzonych z tego źródła.
sources.inputRowsPerSecond Szybkość, z jaką dane docierają do przetwarzania z tego źródła.
sources.processedRowsPerSecond Szybkość przetwarzania danych z tego źródła przez platformę Spark.
sources.metrics.numBytesOutstanding Łączny rozmiar zaległych plików (pliki śledzone przez bazę danych RocksDB). Jest to metryka listy prac dla funkcji delta i automatycznego modułu ładującego jako źródło przesyłania strumieniowego.
sources.metrics.numFilesOutstanding Liczba zaległych plików do przetworzenia. Jest to metryka listy prac dla funkcji delta i automatycznego modułu ładującego jako źródło przesyłania strumieniowego.

obiekt ujścia (Delta Lake)

Metryczne opis
sink.description Opis ujścia delty, szczegółowo opis używanej implementacji ujścia delty. Na przykład: “DeltaSink[table]”.
sink.numOutputRows Liczba wierszy jest zawsze "-1", ponieważ platforma Spark nie może wywnioskować wierszy wyjściowych dla ujścia DSv1, czyli klasyfikacji ujścia usługi Delta Lake.

Przykłady

Przykładowe zdarzenie Kafka-to-Kafka StreamingQueryListener

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Przykładowe zdarzenie Delta Lake-to-Delta Lake StreamingQueryListener

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Przykładowe zdarzenie Kinesis-to-Delta Lake StreamingQueryListener

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Przykładowe zdarzenie Platformy Kafka+usługa Delta Lake do usługi Delta Lake StreamingQueryListener

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Przykładowe źródło współczynnika dla zdarzenia Delta Lake StreamingQueryListener

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}