read_pulsar
-Streamingtabellenwertfunktionen
Gilt für: Databricks SQL Databricks Runtime 14.1 und höher
Wichtig
Dieses Feature befindet sich in der Public Preview.
Gibt eine Tabelle mit aus Pulsar gelesenen Datensätzen zurück.
Diese Tabellenwertfunktion unterstützt nur Streaming und keine Batchabfragen.
Syntax
read_pulsar ( { option_key => option_value } [, ...] )
Argumente
Diese Funktion erfordert einen Aufruf benannter Parameter für die Optionsschlüssel.
Die Optionen serviceUrl
und topic
sind obligatorisch.
Hier werden die Argumente nur kurz beschrieben. Ausführliche Beschreibungen finden Sie in der Dokumentation zu strukturiertem Streaming mit Pulsar.
Option | Type | Standard | Beschreibung |
---|---|---|---|
Dienst-URL | STRING | Obligatorisch. | Der URI des Pulsar-Dienstes. |
topic | STRING | Obligatorisch. | Das Thema, aus dem gelesen werden soll. |
predefinedSubscription | STRING | Keine | Der vordefinierte Abonnementname, der vom Connector verwendet wird, um den Spark-Anwendungsstatus nachzuverfolgen. |
subscriptionPrefix | STRING | Keine | Ein Präfix, das vom Connector verwendet wird, um ein zufälliges Abonnement zu generieren, um den Spark-Anwendungsstatus nachzuverfolgen. |
pollTimeoutMs | LONG | 120000 | Das Timeout zum Lesen von Nachrichten von Pulsar in Millisekunden. |
failOnDataLoss | BOOLEAN | true | Steuert, ob eine Abfrage fehlschlägt, wenn Daten verloren gehen (z. B. wenn Themen gelöscht werden oder Nachrichten aufgrund einer Aufbewahrungsrichtlinie gelöscht werden). |
startingOffsets | STRING | latest | Der Startpunkt, an dem eine Abfrage gestartet wird, entweder „earliest“ (frühester) oder „latest“ (letzter) oder JSON-Zeichenfolge, die einen bestimmten Offset angibt. Wenn „latest“, liest der Leser die neuesten Datensätze, nachdem die Ausführung gestartet wurde. Wenn „earliest“, liest der Leser ab dem frühesten Offset. Der Benutzer kann auch eine JSON-Zeichenfolge angeben, die einen bestimmten Offset festlegt. |
startingTime | STRING | Keine | Bei Angabe dieser Option liest die Pulsar-Quelle Nachrichten ab der Position der angegebenen Startzeit. |
Die folgenden Argumente werden für die Authentifizierung des Pulsar-Clients verwendet:
Option | Type | Standard | Beschreibung |
---|---|---|---|
pulsarClientAuthPluginClassName | STRING | Keine | Name des Authentifizierungs-Plug-Ins |
pulsarClientAuthParams | STRING | Keine | Parameter für das Authentifizierungs-Plug-In |
pulsarClientUseKeyStoreTls | STRING | Keine | Gibt an, ob KeyStore für die TLS-Authentifizierung verwendet werden soll |
pulsarClientTlsTrustStoreType | STRING | Keine | TrustStore-Dateityp für TLS-Authentifizierung |
pulsarClientTlsTrustStorePath | STRING | Keine | TrustStore-Dateipfad für TLS-Authentifizierung |
pulsarClientTlsTrustStorePassword | STRING | Keine | TrustStore-Kennwort für TLS-Authentifizierung |
Diese Argumente werden für die Konfiguration und Authentifizierung der Pulsar-Zulassungssteuerung verwendet. Die Pulsar-Administratorkonfiguration ist nur erforderlich, wenn die Zulassungssteuerung aktiviert ist(wenn maxBytesPerTrigger festgelegt wird).
Option | Type | Standard | Beschreibung |
---|---|---|---|
maxBytesPerTrigger | BIGINT | Keine | Eine weiche Grenze der maximalen Anzahl von Bytes, die pro Mikrobatch verarbeitet werden sollen. Wenn dies angegeben wird, muss auch „admin.url“ angegeben werden. |
adminUrl | STRING | Keine | Die Pulsar-serviceHttpUrl-Konfiguration. Nur erforderlich, wenn maxBytesPerTrigger angegeben wird. |
pulsarAdminAuthPlugin | STRING | Keine | Name des Authentifizierungs-Plug-Ins |
pulsarAdminAuthParams | STRING | Keine | Parameter für das Authentifizierungs-Plug-In |
pulsarClientUseKeyStoreTls | STRING | Keine | Gibt an, ob KeyStore für die TLS-Authentifizierung verwendet werden soll |
pulsarAdminTlsTrustStoreType | STRING | Keine | TrustStore-Dateityp für TLS-Authentifizierung |
pulsarAdminTlsTrustStorePath | STRING | Keine | TrustStore-Dateipfad für TLS-Authentifizierung |
pulsarAdminTlsTrustStorePassword | STRING | Keine | TrustStore-Kennwort für TLS-Authentifizierung |
Gibt zurück
Eine Tabelle mit Pulsar-Datensätzen mit dem folgenden Schema:
__key STRING NOT NULL
: Pulsar-Nachrichtenschlüsselvalue BINARY NOT NULL
: Pulsar-NachrichtenwertHinweis: Bei Themen mit Avro- oder JSON-Schema wird der Inhalt nicht in ein Binärwertfeld geladen, sondern erweitert, um die Feldnamen und Feldtypen des Pulsar-Themas beizubehalten.
__topic STRING NOT NULL
: Pulsar-Themenname__messageId BINARY NOT NULL
: Pulsar-Nachrichten-ID__publishTime TIMESTAMP NOT NULL
: Veröffentlichungszeitpunkt der Pulsar-Nachricht__eventTime TIMESTAMP NOT NULL
: Ereigniszeit der Pulsar-Nachricht__messageProperties MAP<STRING, STRING>
: Pulsar-Nachrichteneigenschaften
Beispiele
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.