read_pulsar
strömmande tabellvärdesfunktion
Gäller för: Databricks SQL Databricks Runtime 14.1 och senare
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Returnerar en tabell med poster som avlästa från Pulsar.
Den här tabellvärdesfunktionen stöder endast direktuppspelning och inte batchfråga.
Syntax
read_pulsar ( { option_key => option_value } [, ...] )
Argument
Den här funktionen kräver namngivna parameteranrop för alternativnycklarna.
Alternativen serviceUrl
och topic
är obligatoriska.
Beskrivningarna av argumenten är korta här. Mer information finns i dokumentationen om strukturerade pulsarströmmar .
Alternativ | Typ | Standardvärde | beskrivning |
---|---|---|---|
serviceUrl | STRÄNG | Obligatorisk | URI:n för Pulsar-tjänsten. |
Avsnitt | STRÄNG | Obligatorisk | Ämnet att läsa från. |
predefinedSubscription | STRÄNG | Ingen | Det fördefinierade prenumerationsnamnet som används av anslutningsappen för att spåra spark-programmets förlopp. |
subscriptionPrefix | STRÄNG | Ingen | Ett prefix som används av kopplingen för att generera en slumpmässig prenumeration för att spåra spark-applikationens framsteg. |
pollTimeoutMs | LÅNG | 120000 | Tidsgränsen för att läsa meddelanden från Pulsar i millisekunder. |
failOnDataLoss | BOOLESK | true | Styr om en fråga ska misslyckas när data går förlorade (till exempel ämnen tas bort eller meddelanden tas bort på grund av kvarhållningsprincip). |
startingOffsets | STRÄNG | senaste | Startpunkten när en fråga startas, antingen tidigaste, senaste eller en JSON-sträng som anger en specifik förskjutning. Om den senaste är läser läsaren de senaste posterna när den börjar köras. Om det är den tidigaste, läser läsaren från den tidigaste offset. Användaren kan också ange en JSON-sträng som anger en specifik förskjutning. |
startingTime | STRÄNG | Ingen | När det anges läser Pulsar-källan meddelanden från positionen för den angivna startingTime. |
Följande argument används för autentisering av pulsar-klienten:
Alternativ | Typ | Standardvärde | beskrivning |
---|---|---|---|
pulsarClientAuthPluginClassName | STRÄNG | Ingen | Namnet på plugin-programmet för autentisering. |
pulsarClientAuthParams | STRÄNG | Ingen | Parametrar för plugin-programmet för autentisering. |
pulsarClientUseKeyStoreTls | STRÄNG | Ingen | Om du vill använda KeyStore för tls-autentisering. |
pulsarClientTlsTrustStoreType | STRÄNG | Ingen | TrustStore-filtyp för tls-autentisering. |
pulsarClientTlsTrustStorePath | STRÄNG | Ingen | TrustStore-filsökväg för tls-autentisering. |
pulsarClientTlsTrustStorePassword | STRÄNG | Ingen | TrustStore-lösenord för tls-autentisering. |
Dessa argument används för konfiguration och autentisering av pulsar-antagningskontroll, pulsaradministratörskonfiguration krävs endast när antagningskontroll är aktiverad (när maxBytesPerTrigger har angetts)
Alternativ | Typ | Standardvärde | beskrivning |
---|---|---|---|
maxBytesPerTrigger | BIGINT | Ingen | En mjuk gräns för det maximala antalet byte som vi vill bearbeta per mikrobatch. Om detta anges måste även admin.url anges. |
adminUrl | STRÄNG | Ingen | Pulsar-tjänstenHttpUrl-konfigurationen. Behövs bara när maxBytesPerTrigger har angetts. |
pulsarAdminAuthPlugin | STRÄNG | Ingen | Namnet på plugin-programmet för autentisering. |
pulsarAdminAuthParams | STRÄNG | Ingen | Parametrar för plugin-programmet för autentisering. |
pulsarClientUseKeyStoreTls | STRÄNG | Ingen | Om du vill använda KeyStore för tls-autentisering. |
pulsarAdminTlsTrustStoreType | STRÄNG | Ingen | TrustStore-filtyp för tls-autentisering. |
pulsarAdminTlsTrustStorePath | STRÄNG | Ingen | TrustStore-filsökväg för tls-autentisering. |
pulsarAdminTlsTrustStorePassword | STRÄNG | Ingen | TrustStore-lösenord för tls-autentisering. |
Returer
En tabell med pulsarposter med följande schema.
__key STRING NOT NULL
: Pulsar-meddelandenyckel.value BINARY NOT NULL
: Pulsar-meddelandevärde.Obs! För ämnen med Avro- eller JSON-schema expanderas innehållet i stället för att läsa in innehåll i ett binärt värdefält för att bevara fältnamnen och fälttyperna för Pulsar-ämnet.
__topic STRING NOT NULL
: Pulsar ämnesnamn.__messageId BINARY NOT NULL
: Pulsar-meddelande-ID.__publishTime TIMESTAMP NOT NULL
: Publiceringstid för Pulsar-meddelande.__eventTime TIMESTAMP NOT NULL
: Pulsar meddelandehändelsetid.__messageProperties MAP<STRING, STRING>
: Pulsar-meddelandeegenskaper.
Exempel
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.