Freigeben über


read_pulsar-Streamingtabellenwertfunktionen

Gilt für: Häkchen ja Databricks SQL durch Häkchen mit „Ja“ markiert Databricks Runtime 14.1 und höher

Wichtig

Dieses Feature befindet sich in der Public Preview.

Gibt eine Tabelle mit aus Pulsar gelesenen Datensätzen zurück.

Diese Tabellenwertfunktion unterstützt nur Streaming und keine Batchabfragen.

Syntax

read_pulsar ( { option_key => option_value } [, ...] )

Argumente

Diese Funktion erfordert einen Aufruf benannter Parameter für die Optionsschlüssel.

Die Optionen serviceUrl und topic sind obligatorisch.

Hier werden die Argumente nur kurz beschrieben. Ausführliche Beschreibungen finden Sie in der Dokumentation zu strukturiertem Streaming mit Pulsar.

Option Type Standard Beschreibung
Dienst-URL STRING Obligatorisch. Der URI des Pulsar-Dienstes.
topic STRING Obligatorisch. Das Thema, aus dem gelesen werden soll.
predefinedSubscription STRING Keine Der vordefinierte Abonnementname, der vom Connector verwendet wird, um den Spark-Anwendungsstatus nachzuverfolgen.
subscriptionPrefix STRING Keine Ein Präfix, das vom Connector verwendet wird, um ein zufälliges Abonnement zu generieren, um den Spark-Anwendungsstatus nachzuverfolgen.
pollTimeoutMs LONG 120000 Das Timeout zum Lesen von Nachrichten von Pulsar in Millisekunden.
failOnDataLoss BOOLEAN true Steuert, ob eine Abfrage fehlschlägt, wenn Daten verloren gehen (z. B. wenn Themen gelöscht werden oder Nachrichten aufgrund einer Aufbewahrungsrichtlinie gelöscht werden).
startingOffsets STRING latest Der Startpunkt, an dem eine Abfrage gestartet wird, entweder „earliest“ (frühester) oder „latest“ (letzter) oder JSON-Zeichenfolge, die einen bestimmten Offset angibt. Wenn „latest“, liest der Leser die neuesten Datensätze, nachdem die Ausführung gestartet wurde. Wenn „earliest“, liest der Leser ab dem frühesten Offset. Der Benutzer kann auch eine JSON-Zeichenfolge angeben, die einen bestimmten Offset festlegt.
startingTime STRING Keine Bei Angabe dieser Option liest die Pulsar-Quelle Nachrichten ab der Position der angegebenen Startzeit.

Die folgenden Argumente werden für die Authentifizierung des Pulsar-Clients verwendet:

Option Type Standard Beschreibung
pulsarClientAuthPluginClassName STRING Keine Name des Authentifizierungs-Plug-Ins
pulsarClientAuthParams STRING Keine Parameter für das Authentifizierungs-Plug-In
pulsarClientUseKeyStoreTls STRING Keine Gibt an, ob KeyStore für die TLS-Authentifizierung verwendet werden soll
pulsarClientTlsTrustStoreType STRING Keine TrustStore-Dateityp für TLS-Authentifizierung
pulsarClientTlsTrustStorePath STRING Keine TrustStore-Dateipfad für TLS-Authentifizierung
pulsarClientTlsTrustStorePassword STRING Keine TrustStore-Kennwort für TLS-Authentifizierung

Diese Argumente werden für die Konfiguration und Authentifizierung der Pulsar-Zulassungssteuerung verwendet. Die Pulsar-Administratorkonfiguration ist nur erforderlich, wenn die Zulassungssteuerung aktiviert ist(wenn maxBytesPerTrigger festgelegt wird).

Option Type Standard Beschreibung
maxBytesPerTrigger BIGINT Keine Eine weiche Grenze der maximalen Anzahl von Bytes, die pro Mikrobatch verarbeitet werden sollen. Wenn dies angegeben wird, muss auch „admin.url“ angegeben werden.
adminUrl STRING Keine Die Pulsar-serviceHttpUrl-Konfiguration. Nur erforderlich, wenn maxBytesPerTrigger angegeben wird.
pulsarAdminAuthPlugin STRING Keine Name des Authentifizierungs-Plug-Ins
pulsarAdminAuthParams STRING Keine Parameter für das Authentifizierungs-Plug-In
pulsarClientUseKeyStoreTls STRING Keine Gibt an, ob KeyStore für die TLS-Authentifizierung verwendet werden soll
pulsarAdminTlsTrustStoreType STRING Keine TrustStore-Dateityp für TLS-Authentifizierung
pulsarAdminTlsTrustStorePath STRING Keine TrustStore-Dateipfad für TLS-Authentifizierung
pulsarAdminTlsTrustStorePassword STRING Keine TrustStore-Kennwort für TLS-Authentifizierung

Gibt zurück

Eine Tabelle mit Pulsar-Datensätzen mit dem folgenden Schema:

  • __key STRING NOT NULL: Pulsar-Nachrichtenschlüssel

  • value BINARY NOT NULL: Pulsar-Nachrichtenwert

    Hinweis: Bei Themen mit Avro- oder JSON-Schema wird der Inhalt nicht in ein Binärwertfeld geladen, sondern erweitert, um die Feldnamen und Feldtypen des Pulsar-Themas beizubehalten.

  • __topic STRING NOT NULL: Pulsar-Themenname

  • __messageId BINARY NOT NULL: Pulsar-Nachrichten-ID

  • __publishTime TIMESTAMP NOT NULL: Veröffentlichungszeitpunkt der Pulsar-Nachricht

  • __eventTime TIMESTAMP NOT NULL: Ereigniszeit der Pulsar-Nachricht

  • __messageProperties MAP<STRING, STRING>: Pulsar-Nachrichteneigenschaften

Beispiele

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.