read_pulsar
streaming-table-waardefunctie
Van toepassing op: Databricks SQL Databricks Runtime 14.1 en hoger
Belangrijk
Deze functie is beschikbaar als openbare preview.
Retourneert een table met records die zijn gelezen uit Pulsar.
Deze table-waardefunctie ondersteunt alleen streaming en geen batchquery.
Syntaxis
read_pulsar ( { option_key => option_value } [, ...] )
Argumenten
Voor deze functie is aanroepen van benoemde parameters vereist voor de optiesleutels.
De opties serviceUrl
en topic
zijn verplicht.
De beschrijvingen van de argumenten zijn hier kort. Zie de documentatie voor gestructureerd streamen van Pulsar voor uitgebreide beschrijvingen.
Optie | Type | Default | Beschrijving |
---|---|---|---|
serviceUrl | STRING | Verplicht | De URI van de Pulsar-service. |
onderwerp | STRING | Verplicht | Het onderwerp waaruit u wilt lezen. |
predefinedSubscription | STRING | Geen | De vooraf gedefinieerde abonnementsnaam die door de connector wordt gebruikt om de voortgang van spark-toepassingen bij te houden. |
subscriptionPrefix | STRING | Geen | Een voorvoegsel dat door de connector wordt gebruikt om een willekeurig abonnement te generate om de voortgang van Spark-toepassingen te volgen. |
pollTimeoutMs | LANG | 120.000 | De time-out voor het lezen van berichten van Pulsar in milliseconden. |
failOnDataLoss | BOOLEAN | true | Hiermee bepaalt u of een query mislukt wanneer gegevens verloren gaan (bijvoorbeeld onderwerpen worden verwijderd of berichten worden verwijderd vanwege bewaarbeleid). |
startingOffsets | STRING | nieuwste | Het beginpunt waarop een query wordt gestart, ofwel de vroegste, meest recente of een JSON-tekenreeks die een specifieke offsetaangeeft. Als de laatste is, leest de lezer de nieuwste records nadat deze is gestart. Indien de vroegst, leest de lezer uit de vroegste offset. De gebruiker kan ook een JSON-tekenreeks opgeven waarmee een specifieke offsetwordt opgegeven. |
startingTime | STRING | Geen | Wanneer dit is opgegeven, leest de Pulsar-bron berichten vanaf de positie van de opgegeven startTime. |
De volgende argumenten worden gebruikt voor verificatie van de pulsar-client:
Optie | Type | Default | Beschrijving |
---|---|---|---|
pulsarClientAuthPluginClassName | STRING | Geen | Naam van de verificatieinvoegtoepassing. |
pulsarClientAuthParams | STRING | Geen | Parameters voor de authenticatie-plug-in. |
pulsarClientUseKeyStoreTls | STRING | Geen | Of keystore moet worden gebruikt voor tls-verificatie. |
pulsarClientTlsTrustStoreType | STRING | Geen | TrustStore-bestandstype voor tls-verificatie. |
pulsarClientTlsTrustStorePath | STRING | Geen | TrustStore-bestandspad voor tls-verificatie. |
pulsarClientTlsTrustStorePassword | STRING | Geen | TrustStore-wachtwoord voor tls-verificatie. |
Deze argumenten worden gebruikt voor configuratie en verificatie van pulsar-toegangsbeheer, pulsar-beheerconfiguratie is alleen vereist wanneer toegangsbeheer is ingeschakeld (wanneer maxBytesPerTrigger is set)
Optie | Type | Default | Beschrijving |
---|---|---|---|
maxBytesPerTrigger | BIGINT | Geen | Een zachte limit van het maximum aantal bytes dat we per microbatch willen verwerken. Als dit is opgegeven, moet admin.url ook worden opgegeven. |
adminUrl | STRING | Geen | De Pulsar-serviceHttpUrl-configuratie. Alleen nodig wanneer maxBytesPerTrigger is opgegeven. |
pulsarAdminAuthPlugin | STRING | Geen | Naam van de verificatieinvoegtoepassing. |
pulsarAdminAuthParams | STRING | Geen | Parameters voor de authenticatieplugin. |
pulsarClientUseKeyStoreTls | STRING | Geen | Of keystore moet worden gebruikt voor tls-verificatie. |
pulsarAdminTlsTrustStoreType | STRING | Geen | TrustStore-bestandstype voor tls-verificatie. |
pulsarAdminTlsTrustStorePath | STRING | Geen | TrustStore-bestandspad voor tls-verificatie. |
pulsarAdminTlsTrustStorePassword | STRING | Geen | TrustStore-wachtwoord voor tls-verificatie. |
Retouren
Een table van pulsar records met de volgende schema.
__key STRING NOT NULL
: Pulsar message key.value BINARY NOT NULL
: Pulsar berichtwaarde.Opmerking: Voor onderwerpen met Avro of JSON schema, in plaats van inhoud te laden in een binair waardeveld, wordt de inhoud uitgebreid om de veldnamen en veldtypen van het Pulsar-onderwerp te behouden.
__topic STRING NOT NULL
: Naam van Pulsar-onderwerp.__messageId BINARY NOT NULL
: Pulsar bericht-id.__publishTime TIMESTAMP NOT NULL
: Publicatietijd van Pulsar-bericht.__eventTime TIMESTAMP NOT NULL
: Pulsar bericht gebeurtenis tijd.__messageProperties MAP<STRING, STRING>
: Pulsar berichteigenschappen.
Voorbeelden
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.