read_pubsub
functie met streamingtabelwaarde
Van toepassing op: Databricks SQL Databricks Runtime 13.3 LTS en hoger
Retourneert een tabel met records die zijn gelezen uit Pub/Sub uit een onderwerp. Ondersteunt alleen streamingquery's.
Syntaxis
read_pubsub( { parameter => value } [, ...])
Argumenten
read_pubsub
vereist aanroepen van benoemde parameters.
De enige vereiste argumenten zijn subscriptionId
, projectId
en topicId
. Alle andere argumenten zijn optioneel.
Zie Opties configureren voor pub-/substreaming voor volledige beschrijvingen van argumenten.
Databricks raadt het gebruik van geheimen aan bij het leveren van autorisatieopties. Zie de geheime functie.
Zie Toegang tot Pub/Sub configureren voor meer informatie over het configureren van toegang tot Pub/Sub.
Parameter | Type | Description |
---|---|---|
subscriptionId |
STRING |
Vereist, de unieke id die is toegewezen aan een Pub/Sub-abonnement. |
projectId |
STRING |
Vereist, de Google Cloud-project-id die is gekoppeld aan het onderwerp Pub/Sub. |
topicId |
STRING |
Vereist, de id of naam van het onderwerp Pub/Sub waarop u zich wilt abonneren. |
clientEmail |
STRING |
Het e-mailadres dat is gekoppeld aan een serviceaccount voor verificatie. |
clientId |
STRING |
De client-id die is gekoppeld aan het serviceaccount voor verificatie. |
privateKeyId |
STRING |
De id van de persoonlijke sleutel die is gekoppeld aan het serviceaccount. |
privateKey |
STRING |
De persoonlijke sleutel die is gekoppeld aan het serviceaccount voor verificatie. |
Deze argumenten worden gebruikt voor verdere afstemming bij het lezen vanuit Pub/Sub:
Parameter | Type | Description |
---|---|---|
numFetchPartitions |
STRING |
Optioneel met het standaardaantal uitvoerders. Het aantal parallelle Spark-taken waarmee records uit een abonnement worden opgehaald. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Optioneel met standaardwaarde false . Als dit is ingesteld op true, wordt het abonnement dat aan de stream is doorgegeven, verwijderd wanneer de streamingtaak afloopt. |
maxBytesPerTrigger |
STRING |
Een zachte limiet voor de batchgrootte die moet worden verwerkt tijdens elke geactiveerde microbatch. De standaardwaarde is 'none'. |
maxRecordsPerFetch |
STRING |
Het aantal records dat per taak moet worden opgehaald voordat records worden verwerkt. De standaardwaarde is '1000'. |
maxFetchPeriod |
STRING |
De tijdsduur voor elke taak die moet worden opgehaald voordat records worden verwerkt. De standaardwaarde is '10s'. |
Retouren
Een tabel met Pub/Sub-records met het volgende schema. De kolom kenmerken kan null zijn, maar alle andere kolommen zijn niet null.
Naam | Gegevenstype | Null-waarde toegestaan | Standaard | Beschrijving |
---|---|---|---|---|
messageId |
STRING |
Nee | Unieke id voor het pub-/subbericht. | |
payload |
BINARY |
Nee | De inhoud van het pub-/subbericht. | |
attributes |
STRING |
Ja | Sleutel-waardeparen die de kenmerken van het pub-/subbericht vertegenwoordigen. Dit is een json-gecodeerde tekenreeks. | |
publishTimestampInMillis |
BIGINT |
Nee | De tijdstempel toen het bericht werd gepubliceerd, in milliseconden. | |
sequenceNumber |
BIGINT |
Nee | De unieke id van de record in de shard. |
Voorbeelden
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
De gegevens moeten nu worden opgevraagd uit de testing.streaming_table
voor verdere analyse.
Onjuiste query's:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);