Delen via


read_pulsar streaming-table-waardefunctie

Van toepassing op:vinkje als ja aan Databricks SQL vinkje als ja aan Databricks Runtime 14.1 en hoger

Belangrijk

Deze functie is beschikbaar als openbare preview.

Retourneert een table met records die zijn gelezen uit Pulsar.

Deze table-waardefunctie ondersteunt alleen streaming en geen batchquery.

Syntaxis

read_pulsar ( { option_key => option_value } [, ...] )

Argumenten

Voor deze functie is aanroepen van benoemde parameters vereist voor de optiesleutels.

De opties serviceUrl en topic zijn verplicht.

De beschrijvingen van de argumenten zijn hier kort. Zie de documentatie voor gestructureerd streamen van Pulsar voor uitgebreide beschrijvingen.

Optie Type Default Beschrijving
serviceUrl STRING Verplicht De URI van de Pulsar-service.
onderwerp STRING Verplicht Het onderwerp waaruit u wilt lezen.
predefinedSubscription STRING Geen De vooraf gedefinieerde abonnementsnaam die door de connector wordt gebruikt om de voortgang van spark-toepassingen bij te houden.
subscriptionPrefix STRING Geen Een voorvoegsel dat door de connector wordt gebruikt om een willekeurig abonnement te generate om de voortgang van Spark-toepassingen te volgen.
pollTimeoutMs LANG 120.000 De time-out voor het lezen van berichten van Pulsar in milliseconden.
failOnDataLoss BOOLEAN true Hiermee bepaalt u of een query mislukt wanneer gegevens verloren gaan (bijvoorbeeld onderwerpen worden verwijderd of berichten worden verwijderd vanwege bewaarbeleid).
startingOffsets STRING nieuwste Het beginpunt waarop een query wordt gestart, ofwel de vroegste, meest recente of een JSON-tekenreeks die een specifieke offsetaangeeft. Als de laatste is, leest de lezer de nieuwste records nadat deze is gestart. Indien de vroegst, leest de lezer uit de vroegste offset. De gebruiker kan ook een JSON-tekenreeks opgeven waarmee een specifieke offsetwordt opgegeven.
startingTime STRING Geen Wanneer dit is opgegeven, leest de Pulsar-bron berichten vanaf de positie van de opgegeven startTime.

De volgende argumenten worden gebruikt voor verificatie van de pulsar-client:

Optie Type Default Beschrijving
pulsarClientAuthPluginClassName STRING Geen Naam van de verificatieinvoegtoepassing.
pulsarClientAuthParams STRING Geen Parameters voor de authenticatie-plug-in.
pulsarClientUseKeyStoreTls STRING Geen Of keystore moet worden gebruikt voor tls-verificatie.
pulsarClientTlsTrustStoreType STRING Geen TrustStore-bestandstype voor tls-verificatie.
pulsarClientTlsTrustStorePath STRING Geen TrustStore-bestandspad voor tls-verificatie.
pulsarClientTlsTrustStorePassword STRING Geen TrustStore-wachtwoord voor tls-verificatie.

Deze argumenten worden gebruikt voor configuratie en verificatie van pulsar-toegangsbeheer, pulsar-beheerconfiguratie is alleen vereist wanneer toegangsbeheer is ingeschakeld (wanneer maxBytesPerTrigger is set)

Optie Type Default Beschrijving
maxBytesPerTrigger BIGINT Geen Een zachte limit van het maximum aantal bytes dat we per microbatch willen verwerken. Als dit is opgegeven, moet admin.url ook worden opgegeven.
adminUrl STRING Geen De Pulsar-serviceHttpUrl-configuratie. Alleen nodig wanneer maxBytesPerTrigger is opgegeven.
pulsarAdminAuthPlugin STRING Geen Naam van de verificatieinvoegtoepassing.
pulsarAdminAuthParams STRING Geen Parameters voor de authenticatieplugin.
pulsarClientUseKeyStoreTls STRING Geen Of keystore moet worden gebruikt voor tls-verificatie.
pulsarAdminTlsTrustStoreType STRING Geen TrustStore-bestandstype voor tls-verificatie.
pulsarAdminTlsTrustStorePath STRING Geen TrustStore-bestandspad voor tls-verificatie.
pulsarAdminTlsTrustStorePassword STRING Geen TrustStore-wachtwoord voor tls-verificatie.

Retouren

Een table van pulsar records met de volgende schema.

  • __key STRING NOT NULL: Pulsar message key.

  • value BINARY NOT NULL: Pulsar berichtwaarde.

    Opmerking: Voor onderwerpen met Avro of JSON schema, in plaats van inhoud te laden in een binair waardeveld, wordt de inhoud uitgebreid om de veldnamen en veldtypen van het Pulsar-onderwerp te behouden.

  • __topic STRING NOT NULL: Naam van Pulsar-onderwerp.

  • __messageId BINARY NOT NULL: Pulsar bericht-id.

  • __publishTime TIMESTAMP NOT NULL: Publicatietijd van Pulsar-bericht.

  • __eventTime TIMESTAMP NOT NULL: Pulsar bericht gebeurtenis tijd.

  • __messageProperties MAP<STRING, STRING>: Pulsar berichteigenschappen.

Voorbeelden

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.