Condividi tramite


read_pubsub funzione con valori di tabella di streaming

Si applica a: segno di spunta sì Databricks SQL segno di spunta sì Databricks Runtime 13.3 LTS e versioni successive

Restituisce una tabella con record letti da Pub/Sub da un argomento. Supporta solo le query di streaming.

Sintassi

read_pubsub( { parameter => value } [, ...])

Argomenti

read_pubsub richiede la chiamata al parametro denominato.

Gli unici argomenti obbligatori sono subscriptionId, projectIde topicId. Tutti gli altri argomenti sono facoltativi.

Per le descrizioni complete degli argomenti, vedere Configurare le opzioni per la lettura in streaming pub/sub.

Databricks consiglia di usare i segreti quando si forniscono opzioni di autorizzazione. Vedere funzione privata.

Per informazioni dettagliate sulla configurazione dell'accesso a Pub/Sub, vedere Configurare l'accesso a Pub/Sub.

Parametro Tipo Descrizione
subscriptionId STRING Obbligatorio, identificatore univoco assegnato a una sottoscrizione Pub/Sub.
projectId STRING Obbligatorio, ID progetto Google Cloud associato all'argomento Pub/Sub.
topicId STRING Obbligatorio, ID o nome dell'argomento Pub/Sub a cui eseguire la sottoscrizione.
clientEmail STRING Indirizzo di posta elettronica associato a un account del servizio per l'autenticazione.
clientId STRING ID client associato all'account del servizio per l'autenticazione.
privateKeyId STRING ID della chiave privata associata all'account del servizio.
privateKey STRING Chiave privata associata all'account del servizio per l'autenticazione.

Questi argomenti vengono usati per ottimizzare ulteriormente la lettura da Pub/Sub:

Parametro Tipo Descrizione
numFetchPartitions STRING Facoltativo con il numero predefinito di executor. Numero di attività Spark parallele che recuperano record da una sottoscrizione.
deleteSubscriptionOnStreamStop BOOLEAN Facoltativo con il valore predefinito false. Se impostato su true, la sottoscrizione passata al flusso viene eliminata al termine del processo di streaming.
maxBytesPerTrigger STRING Limite flessibile per l'elaborazione delle dimensioni del batch durante ogni micro batch attivato. Il valore predefinito è 'none'.
maxRecordsPerFetch STRING Numero di record da recuperare per ogni attività prima dell'elaborazione dei record. Il valore predefinito è '1000'.
maxFetchPeriod STRING Durata del recupero di ogni attività prima dell'elaborazione dei record. Il valore predefinito è '10s'.

Valori restituiti

Tabella di record Pub/Sub con lo schema seguente. La colonna degli attributi può essere Null, ma tutte le altre colonne non sono Null.

Nome Tipo di dati Nullable Standard Descrizione
messageId STRING No Identificatore univoco per il messaggio Pub/Sub.
payload BINARY No Contenuto del messaggio Pub/Sub.
attributes STRING Coppie chiave-valore che rappresentano gli attributi del messaggio Pub/Sub. Si tratta di una stringa con codifica JSON.
publishTimestampInMillis BIGINT No Timestamp di pubblicazione del messaggio, espresso in millisecondi.
sequenceNumber BIGINT No Identificatore univoco del record all'interno della partizione.

Esempi

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

I dati dovranno ora essere sottoposti a query da per un'ulteriore testing.streaming_table analisi.

Query errate:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);