Surveillance des requêtes de Structured Streaming sur Azure Databricks
Azure Databricks fournit une surveillance intégrée pour les applications de Structured Streaming via l’interface utilisateur de Spark sous l’onglet Streaming.
Distinguer les requêtes de Structured Streaming dans l’interface utilisateur de Spark
Donnez à vos flux un nom de requête unique en ajoutant .queryName(<query-name>)
à votre code writeStream
pour distinguer facilement quelles métriques appartiennent à quel flux dans l’interface utilisateur de Spark.
Envoyer (push) des métriques de Structured Streaming à des services externes
Les métriques de streaming peuvent être poussées vers des services externes pour des cas d'alerte ou d'utilisation de tableaux de bord à l’aide de l’interface de listener de requêtes de streaming d’Apache Spark. Dans Databricks Runtime 11.3 LTS et versions ultérieures, StreamingQueryListener
est disponible en Python et Scala.
Important
Les limitations suivantes s’appliquent aux charges de travail à l’aide des modes d’accès au calcul compatibles avec le catalogue Unity :
-
StreamingQueryListener
nécessite Databricks Runtime 15.1 ou une version ultérieure pour utiliser des informations d’identification ou interagir avec des objets gérés par le Catalogue Unity en mode d'accès dédié sur le calcul. -
StreamingQueryListener
nécessite Databricks Runtime 16.1 ou ultérieur pour les charges de travail Scala configurées avec le mode d’accès standard (anciennement mode d’accès partagé).
Remarque
La latence de traitement avec les écouteurs peut affecter considérablement les vitesses de traitement des requêtes. Il est conseillé de limiter la logique de traitement dans ces écouteurs et d’opter pour l’écriture dans des systèmes de réponse rapide comme Kafka pour une efficacité.
Le code suivant fournit des exemples simples de la syntaxe à utiliser pour implémenter un écouteur :
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Définition de métriques observables dans Structured Streaming
Les métriques observables sont des fonctions d’agrégation arbitraires nommées qui peuvent être définies sur une requête (DataFrame). Dès que l’exécution d’un DataFrame atteint un point d’achèvement (c’est-à-dire termine une requête par lot ou atteint une époque de streaming), un événement nommé est émis, qui contient les métriques pour les données traitées depuis le dernier point d’achèvement.
Vous pouvez observer ces métriques en attachant un écouteur à la session Spark. L’écouteur dépend du mode d’exécution :
Mode par lot : Utilisez
QueryExecutionListener
.QueryExecutionListener
est appelé quand la requête est terminée. Accédez aux métriques en utilisant la carteQueryExecution.observedMetrics
.Diffusion en continu ou microbatch : Utiliser
StreamingQueryListener
.StreamingQueryListener
est appelé quand la requête de streaming termine une époque. Accédez aux métriques en utilisant la carteStreamingQueryProgress.observedMetrics
. Azure Databricks ne prend pas en charge le mode déclencheurcontinuous
pour la diffusion en continu.
Par exemple :
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Carte des identifiants de table de mesures re :[UC], re :[Delta], et re :[SS]
les métriques re :[SS] utilisent le champ reservoirId
à plusieurs endroits pour l’identité unique d’une table Delta utilisée comme source pour une requête de diffusion en continu.
Le champ reservoirId
mappe l’identificateur unique stocké par la table Delta dans le journal des transactions Delta. Cet ID ne correspond pas à la valeur tableId
attribuée par re :[UC] et affichée dans Catalog Explorer.
Utilisez la syntaxe suivante pour passer en revue l’identificateur de table d’une table Delta. Cela fonctionne pour les tables gérées par le catalogue Unity, les tables externes du catalogue Unity et toutes les tables Delta du metastore Hive :
DESCRIBE DETAIL <table-name>
Le champ id
affiché dans les résultats est l’identificateur qui correspond au reservoirId
dans les métriques de streaming.
Métriques d’objet StreamingQueryListener
Métrique | Description |
---|---|
id |
ID de requête unique qui persiste entre les redémarrages. |
runId |
ID de requête unique pour chaque démarrage/redémarrage. Voir StreamingQuery.runId(). |
name |
Nom spécifié par l’utilisateur de la requête. Le nom est nul si aucun nom n’est spécifié. |
timestamp |
Horodatage pour l’exécution du microbatch. |
batchId |
ID unique pour le lot actuel de données en cours de traitement. Dans le cas de nouvelles tentatives après un échec, un ID de lot donné peut être exécuté plusieurs fois. De même, lorsqu’il n’y a pas de données à traiter, l’ID de lot n’est pas incrémenté. |
numInputRows |
Nombre d’enregistrements agrégés (sur toutes les sources) traités dans un déclencheur. |
inputRowsPerSecond |
Taux d’agrégation (sur toutes les sources) des données arrivantes. |
processedRowsPerSecond |
Taux d’agrégation (sur toutes les sources) auquel Spark traite les données. |
Objet durationMs
Informations sur le temps nécessaire pour effectuer différentes étapes du processus d’exécution de microbatch.
Métrique | Description |
---|---|
durationMs.addBatch |
Temps nécessaire pour exécuter le microbatch. Cela exclut le temps nécessaire à Spark pour planifier le micro-lot. |
durationMs.getBatch |
Temps nécessaire pour récupérer les métadonnées sur les décalages de la source. |
durationMs.latestOffset |
Le décalage le plus récent consommé pour le micro-lot. Cet objet de progression fait référence au temps nécessaire pour récupérer le dernier offset des sources. |
durationMs.queryPlanning |
Temps nécessaire pour générer le plan d’exécution. |
durationMs.triggerExecution |
Temps nécessaire pour planifier et exécuter le microbatch. |
durationMs.walCommit |
Temps nécessaire pour valider les nouveaux décalages disponibles. |
Objet eventTime
Informations sur la valeur d’heure de l’événement observée dans les données traitées dans le microbatch. Ces données sont utilisées par le filigrane pour déterminer comment découper l’état pour le traitement des agrégations d'état définies dans la tâche Structured Streaming.
Métrique | Description |
---|---|
eventTime.avg |
Temps moyen d’événement observé dans ce déclencheur. |
eventTime.max |
Durée maximale de l’événement observée dans ce déclencheur. |
eventTime.min |
Durée minimale de l’événement observée dans ce déclencheur. |
eventTime.watermark |
Valeur du filigrane utilisé dans ce déclencheur. |
Objet stateOperators
Informations sur les opérations avec état définies dans le travail Structured Streaming et les agrégations produites à partir de celles-ci.
Métrique | Description |
---|---|
stateOperators.operatorName |
Nom de l’opérateur avec état auquel les métriques sont liées, telles que symmetricHashJoin , dedupe , stateStoreSave . |
stateOperators.numRowsTotal |
Nombre total de lignes dans l’état à la suite d’un opérateur ou d’une agrégation avec état. |
stateOperators.numRowsUpdated |
Nombre total de lignes mises à jour dans l’état à la suite d’un opérateur ou d’une agrégation avec état. |
stateOperators.allUpdatesTimeMs |
Cette métrique n’est actuellement pas mesurable par Spark et est prévue pour être supprimée dans les prochaines mises à jour. |
stateOperators.numRowsRemoved |
Nombre total de lignes supprimées de l’état à la suite d’un opérateur ou d’une agrégation avec état. |
stateOperators.allRemovalsTimeMs |
Cette métrique n’est actuellement pas mesurable par Spark et est prévue pour être supprimée dans les prochaines mises à jour. |
stateOperators.commitTimeMs |
Temps nécessaire pour valider toutes les mises à jour (place et suppressions) et retourner une nouvelle version. |
stateOperators.memoryUsedBytes |
Mémoire utilisée par le magasin d’état. |
stateOperators.numRowsDroppedByWatermark |
Nombre de lignes considérées comme trop tardives à inclure dans une agrégation avec état. Agrégations de diffusion en continu uniquement : nombre de lignes supprimées après l’agrégation (pas de lignes d’entrée brutes). Ce nombre n’est pas précis, mais indique qu’il existe des données tardives supprimées. |
stateOperators.numShufflePartitions |
Nombre de partitions aléatoires pour cet opérateur avec état. |
stateOperators.numStateStoreInstances |
Instance réelle du magasin d’états que l’opérateur a initialisée et conservée. Pour de nombreux opérateurs avec état, il s’agit du même que le nombre de partitions. Toutefois, les jointures stream-stream initialisent quatre instances de magasin d’état par partition. |
Objet stateOperators.customMetrics
Information collectée à partir de la capture de métriques de RocksDB concernant ses performances et ses opérations par rapport aux valeurs d'état qu'il gère pour la tâche de Structured Streaming. Pour plus d’informations, consultez Configurer le magasin d’état RocksDB sur Azure Databricks.
Métrique | Description |
---|---|
customMetrics.rocksdbBytesCopied |
Le nombre d’octets copiés comme suivi par le gestionnaire de fichiers RocksDB. |
customMetrics.rocksdbCommitCheckpointLatency |
Le temps en millisecondes prenant un instantané de RocksDB natif et l’écrivez dans un répertoire local. |
customMetrics.rocksdbCompactLatency |
Le temps de compactage en millisecondes (facultatif) pendant la validation du point de contrôle. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Le temps en millisecondes de synchronisation de l’instantané RocksDB natif vers le stockage externe (emplacement de point de contrôle). |
customMetrics.rocksdbCommitFlushLatency |
Le temps en millisecondes de vidage des modifications en mémoire de la base de données RocksDB sur le disque local. |
customMetrics.rocksdbCommitPauseLatency |
La durée en millisecondes d’arrêt des threads de travail en arrière-plan dans le cadre de la validation de point de contrôle, par exemple pour le compactage. |
customMetrics.rocksdbCommitWriteBatchLatency |
Le temps en millisecondes appliquant les écritures intermédiaires dans la structure en mémoire (WriteBatch ) dans la base de données rocksDB native. |
customMetrics.rocksdbFilesCopied |
Le nombre de fichiers copiés comme suivi par le gestionnaire de fichiers RocksDB. |
customMetrics.rocksdbFilesReused |
Le nombre de fichiers réutilisés d’après le suivi du gestionnaire de fichiers RocksDB. |
customMetrics.rocksdbGetCount |
Le nombre d’appels get à la base de données (n’inclut pas gets de WriteBatch - lot en mémoire utilisé pour les écritures intermédiaires). |
customMetrics.rocksdbGetLatency |
Le délai moyen en nanosecondes pour l’appel natif RocksDB::Get sous-jacent. |
customMetrics.rocksdbReadBlockCacheHitCount |
Le nombre d’accès au cache à partir du cache de blocs dans RocksDB qui sont utiles pour éviter les lectures de disque local. |
customMetrics.rocksdbReadBlockCacheMissCount |
Le nombre de caches de blocs dans RocksDB n’est pas utile pour éviter les lectures de disque local. |
customMetrics.rocksdbSstFileSize |
Taille de tous les fichiers SST (Static Sorted Table) - la structure tabulaire que RocksDB utilise pour stocker des données. |
customMetrics.rocksdbTotalBytesRead |
Le nombre d’octets non compressés lus par les opérations get . |
customMetrics.rocksdbTotalBytesReadByCompaction |
Nombre d’octets lus par le processus de compactage à partir du disque. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Le nombre total d’octets de données non compressées lues à l’aide d’un itérateur. Certaines opérations avec état (par exemple, le traitement du délai d’attente dans FlatMapGroupsWithState et le filigrane) nécessitent la lecture de données dans la base de données via un itérateur. |
customMetrics.rocksdbTotalBytesWritten |
Le nombre total d’octets non compressés écrits par les opérations put . |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Le nombre total d’octets que le processus de compactage écrit sur le disque. |
customMetrics.rocksdbTotalCompactionLatencyMs |
Le temps nécessaire pour les compactages par RocksDB, y compris les compactages d’arrière-plan et le compactage facultatif lancé pendant la validation. (en millisecondes) |
customMetrics.rocksdbTotalFlushLatencyMs |
Le temps de vidage, y compris le vidage en arrière-plan. Les opérations de vidage sont des processus par lesquels le MemTable est vidé vers le stockage une fois qu’il’est plein.
MemTables sont le premier niveau où les données sont stockées dans RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
La taille en octets des fichiers zip non compressés, comme indiqué par le gestionnaire de fichiers. Le gestionnaire de fichiers gère l’utilisation et la suppression de l’espace disque du fichier SST physiques. |
Objet sources (Kafka)
Métrique | Description |
---|---|
sources.description |
Une description détaillée de la source Kafka, en spécifiant la rubrique Kafka exacte à partir de laquelle il est lu. Par exemple : “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” . |
l'objet sources.startOffset |
Numéro de décalage de départ dans la rubrique Kafka à laquelle la tâche de diffusion en continu a démarré. |
l'objet sources.endOffset |
Dernier décalage traité par le microbatch. Cela peut être identique à latestOffset pour une exécution de micro-lot en cours. |
l'objet sources.latestOffset |
Le dernier décalage calculé par le microbatch. Le processus de microbatching peut ne pas traiter tous les décalages en cas de limitation, ce qui entraîne une différenciation entre endOffset et latestOffset . |
sources.numInputRows |
Le nombre de lignes d’entrée traitées à partir de cette source. |
sources.inputRowsPerSecond |
Le taux auquel les données arrivent pour le traitement à partir de cette source. |
sources.processedRowsPerSecond |
Le taux auquel Spark traite les données de cette source. |
Objet sources.metrics (Kafka)
Métrique | Description |
---|---|
sources.metrics.avgOffsetsBehindLatest |
Nombre moyen de décalages par lesquels la requête en streaming est en retard par rapport au dernier décalage disponible de tous les sujets abonnés. |
sources.metrics.estimatedTotalBytesBehindLatest |
Le nombre estimé d’octets que le processus de requête n’a pas consommés à partir des rubriques souscrites. |
sources.metrics.maxOffsetsBehindLatest |
Nombre maximal de décalages que la requête de diffusion en continu est en retard par rapport au dernier décalage disponible parmi tous les sujets abonnés. |
sources.metrics.minOffsetsBehindLatest |
Nombre minimal de décalages que la requête de diffusion en continu est derrière le dernier décalage disponible parmi toutes les rubriques abonnées. |
Objet sink (Kafka)
Métrique | Description |
---|---|
sink.description |
La description du récepteur Kafka dans lequel la requête de diffusion en continu écrit, détaillant l’implémentation spécifique du récepteur Kafka utilisée. Par exemple : “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” . |
sink.numOutputRows |
Nombre de lignes écrites dans la table de sortie ou le récepteur dans le cadre du microbatch. Dans certaines situations, cette valeur peut être « -1 » et peut généralement être interprétée comme « inconnue ». |
Objet sources (Delta Lake)
Métrique | Description |
---|---|
sources.description |
La description de la source à partir de laquelle la requête de streaming lit. Par exemple : “DeltaSource[table]” . |
sources.[startOffset/endOffset].sourceVersion |
La version de la sérialisation avec laquelle ce décalage est encodé. |
sources.[startOffset/endOffset].reservoirId |
ID de la table en cours de lecture. Cela permet de détecter une configuration incorrecte lors du redémarrage d’une requête. Consultez la carte pour les identificateurs de tableau des métriques re :[UC], re :[Delta], et re :[SS]. |
sources.[startOffset/endOffset].reservoirVersion |
Version de la table en cours de traitement. |
sources.[startOffset/endOffset].index |
L’index dans la séquence de AddFiles dans cette version. Cela permet de décomposer les validations volumineuses en plusieurs lots. Cet index est créé en triant à l’aide de modificationTimestamp et path . |
sources.[startOffset/endOffset].isStartingVersion |
Identifie si le décalage actuel marque le début d’une nouvelle requête de streaming plutôt que le traitement des modifications qui se sont produites après le traitement des données initiales. Lors du démarrage d’une nouvelle requête, toutes les données présentes dans la table au début sont traitées en premier, puis toutes les nouvelles données qui arrivent. |
sources.latestOffset |
Décalage le plus récent traité par la requête de microbatch. |
sources.numInputRows |
Le nombre de lignes d’entrée traitées à partir de cette source. |
sources.inputRowsPerSecond |
Le taux auquel les données arrivent pour le traitement à partir de cette source. |
sources.processedRowsPerSecond |
Le taux auquel Spark traite les données de cette source. |
sources.metrics.numBytesOutstanding |
La taille combinée des fichiers en attente (fichiers suivis par RocksDB). Il s’agit de la métrique du backlog lorsque Delta et Auto Loader sont la source de streaming. |
sources.metrics.numFilesOutstanding |
Le nombre de fichiers en attente à traiter. Il s’agit de la métrique du backlog lorsque Delta et Auto Loader sont la source de streaming. |
Objet sink (Delta Lake)
Métrique | Description |
---|---|
sink.description |
La description du récepteur Delta, détaillant l’implémentation spécifique du récepteur Delta utilisée. Par exemple : “DeltaSink[table]” . |
sink.numOutputRows |
Le nombre de lignes est toujours « -1 », car Spark ne peut pas déduire les lignes de sortie pour les récepteurs DSv1, qui est la classification du récepteur Delta Lake. |
Exemples
Exemple d’événement Kafka-to-Kafka avec StreamingQueryListener
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Exemple d’événement Delta Lake-to-Delta Lake avec StreamingQueryListener
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Exemple d’événement Delta Lake StreamingQueryListener
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Exemple d’événement Kafka+Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Exemple de mesures pour un événement source vers Delta Lake avec StreamingQueryListener
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}