Fonctions table de diffusion en continu read_pulsar
S’applique à : Databricks SQL Databricks Runtime 14.1 et versions ultérieures
Important
Cette fonctionnalité est disponible en préversion publique.
Retourne une table avec des enregistrements lus depuis Pulsar.
Cette fonction table ne prend en charge que la diffusion en continu et non la requête par lots.
Syntaxe
read_pulsar ( { option_key => option_value } [, ...] )
Arguments
Cette fonction nécessite un appel de paramètre nommé pour les clés d’option.
Les options serviceUrl
et topic
sont obligatoires.
Les descriptions des arguments sont brèves ici. Consultez la documentation flux structuré Pulsar pour obtenir des descriptions étendues.
Option | Type | Default | Description |
---|---|---|---|
serviceUrl | STRING | Obligatoire | URI du service Pulsar. |
topic | STRING | Obligatoire | Rubrique depuis laquelle lire. |
predefinedSubscription | STRING | Aucun | Nom d’abonnement prédéfini utilisé par le connecteur pour suivre la progression de l’application Spark. |
subscriptionPrefix | STRING | Aucun | Préfixe utilisé par le connecteur pour générer un abonnement aléatoire pour suivre la progression de l’application Spark. |
pollTimeoutMs | LONG | 120 000 | Délai d’attente de lecture des messages à partir de Pulsar en millisecondes. |
failOnDataLoss | BOOLEAN | true | Contrôle l’échec d’une requête lorsque les données sont perdues (par exemple, les rubriques sont supprimées ou les messages sont supprimés en raison d’une stratégie de rétention). |
startingOffsets | STRING | latest | Point de départ lorsqu’une requête est démarrée, soit la plus ancienne, la plus récente, ou une chaîne JSON qui spécifie un décalage spécifique. S’il s’agit de la plus récente, le lecteur lit les enregistrements les plus récents après le début de son exécution. S’il s’agit de la plus ancienne, le lecteur lit à partir du décalage le plus ancien. L’utilisateur peut également spécifier une chaîne JSON qui spécifie un décalage spécifique. |
startingTime | STRING | Aucun | Quand elle est spécifiée, la source Pulsar lit les messages depuis la position du startingTime spécifiée. |
Les arguments suivants sont utilisés pour l’authentification du client pulsar :
Option | Type | Default | Description |
---|---|---|---|
pulsarClientAuthPluginClassName | STRING | Aucun | Nom du plug-in d’authentification. |
pulsarClientAuthParams | STRING | Aucun | Paramètres du plug-in d’authentification. |
pulsarClientUseKeyStoreTls | STRING | Aucun | Indique s’il faut utiliser KeyStore pour l’authentification tls. |
pulsarClientTlsTrustStoreType | STRING | Aucun | Type de fichier TrustStore pour l’authentification tls. |
pulsarClientTlsTrustStorePath | STRING | Aucun | Chemin d’accès du fichier TrustStore pour l’authentification tls. |
pulsarClientTlsTrustStorePassword | STRING | Aucun | Mot de passe TrustStore pour l’authentification tls. |
Ces arguments sont utilisés pour la configuration et l’authentification du contrôle d’admission pulsar. La configuration d’administration pulsar est requise uniquement lorsque le contrôle d’admission est activé (lorsque maxBytesPerTrigger est défini)
Option | Type | Default | Description |
---|---|---|---|
maxBytesPerTrigger | BIGINT | Aucun | Limite logicielle du nombre maximal d’octets que nous voulons traiter par microbatch. Si cette valeur est spécifiée, la valeur admin.url doit également être spécifiée. |
adminUrl | STRING | Aucun | Configuration de serviceHttpUrl Pulsar. Nécessaire uniquement lorsque maxBytesPerTrigger est spécifié. |
pulsarAdminAuthPlugin | STRING | Aucun | Nom du plug-in d’authentification. |
pulsarAdminAuthParams | STRING | Aucun | Paramètres du plug-in d’authentification. |
pulsarClientUseKeyStoreTls | STRING | Aucun | Indique s’il faut utiliser KeyStore pour l’authentification tls. |
pulsarAdminTlsTrustStoreType | STRING | Aucun | Type de fichier TrustStore pour l’authentification tls. |
pulsarAdminTlsTrustStorePath | STRING | Aucun | Chemin d’accès du fichier TrustStore pour l’authentification tls. |
pulsarAdminTlsTrustStorePassword | STRING | Aucun | Mot de passe TrustStore pour l’authentification tls. |
Retours
Table des enregistrements pulsar avec le schéma suivant.
__key STRING NOT NULL
: clé de message Pulsar.value BINARY NOT NULL
: valeur du message Pulsar.Remarque : pour les rubriques avec un schéma Avro ou JSON, au lieu de charger du contenu dans un champ de valeur binaire, le contenu sera développé pour conserver les noms de champ et les types de champ de la rubrique Pulsar.
__topic STRING NOT NULL
: nom de la rubrique Pulsar.__messageId BINARY NOT NULL
: ID du message Pulsar.__publishTime TIMESTAMP NOT NULL
: heure de publication du message Pulsar.__eventTime TIMESTAMP NOT NULL
: heure de l’évènement de message Pulsar.__messageProperties MAP<STRING, STRING>
: propriétés du message Pulsar.
Exemples
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.