Delen via


read_pubsub functie met streamingtabelwaarde

Van toepassing op: vinkje als ja aan Databricks SQL vinkje als ja aan Databricks Runtime 13.3 LTS en hoger

Retourneert een tabel met records die zijn gelezen uit Pub/Sub uit een onderwerp. Ondersteunt alleen streamingquery's.

Syntaxis

read_pubsub( { parameter => value } [, ...])

Argumenten

read_pubsubvereist aanroepen van benoemde parameters.

De enige vereiste argumenten zijn subscriptionId, projectIden topicId. Alle andere argumenten zijn optioneel.

Zie Opties configureren voor pub-/substreaming voor volledige beschrijvingen van argumenten.

Databricks raadt het gebruik van geheimen aan bij het leveren van autorisatieopties. Zie de geheime functie.

Zie Toegang tot Pub/Sub configureren voor meer informatie over het configureren van toegang tot Pub/Sub.

Parameter Type Description
subscriptionId STRING Vereist, de unieke id die is toegewezen aan een Pub/Sub-abonnement.
projectId STRING Vereist, de Google Cloud-project-id die is gekoppeld aan het onderwerp Pub/Sub.
topicId STRING Vereist, de id of naam van het onderwerp Pub/Sub waarop u zich wilt abonneren.
clientEmail STRING Het e-mailadres dat is gekoppeld aan een serviceaccount voor verificatie.
clientId STRING De client-id die is gekoppeld aan het serviceaccount voor verificatie.
privateKeyId STRING De id van de persoonlijke sleutel die is gekoppeld aan het serviceaccount.
privateKey STRING De persoonlijke sleutel die is gekoppeld aan het serviceaccount voor verificatie.

Deze argumenten worden gebruikt voor verdere afstemming bij het lezen vanuit Pub/Sub:

Parameter Type Description
numFetchPartitions STRING Optioneel met het standaardaantal uitvoerders. Het aantal parallelle Spark-taken waarmee records uit een abonnement worden opgehaald.
deleteSubscriptionOnStreamStop BOOLEAN Optioneel met standaardwaarde false. Als dit is ingesteld op true, wordt het abonnement dat aan de stream is doorgegeven, verwijderd wanneer de streamingtaak afloopt.
maxBytesPerTrigger STRING Een zachte limiet voor de batchgrootte die moet worden verwerkt tijdens elke geactiveerde microbatch. De standaardwaarde is 'none'.
maxRecordsPerFetch STRING Het aantal records dat per taak moet worden opgehaald voordat records worden verwerkt. De standaardwaarde is '1000'.
maxFetchPeriod STRING De tijdsduur voor elke taak die moet worden opgehaald voordat records worden verwerkt. De standaardwaarde is '10s'.

Retouren

Een tabel met Pub/Sub-records met het volgende schema. De kolom kenmerken kan null zijn, maar alle andere kolommen zijn niet null.

Naam Gegevenstype Null-waarde toegestaan Standaard Beschrijving
messageId STRING Nee Unieke id voor het pub-/subbericht.
payload BINARY Nee De inhoud van het pub-/subbericht.
attributes STRING Ja Sleutel-waardeparen die de kenmerken van het pub-/subbericht vertegenwoordigen. Dit is een json-gecodeerde tekenreeks.
publishTimestampInMillis BIGINT Nee De tijdstempel toen het bericht werd gepubliceerd, in milliseconden.
sequenceNumber BIGINT Nee De unieke id van de record in de shard.

Voorbeelden

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

De gegevens moeten nu worden opgevraagd uit de testing.streaming_table voor verdere analyse.

Onjuiste query's:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);