read_pubsub
funzione con valori di tabella di streaming
Si applica a: Databricks SQL Databricks Runtime 13.3 LTS e versioni successive
Restituisce una tabella con record letti da Pub/Sub da un argomento. Supporta solo le query di streaming.
Sintassi
read_pubsub( { parameter => value } [, ...])
Argomenti
read_pubsub
richiede la chiamata al parametro denominato.
Gli unici argomenti obbligatori sono subscriptionId
, projectId
e topicId
. Tutti gli altri argomenti sono facoltativi.
Per le descrizioni complete degli argomenti, vedere Configurare le opzioni per la lettura in streaming pub/sub.
Databricks consiglia di usare i segreti quando si forniscono opzioni di autorizzazione. Vedere funzione privata.
Per informazioni dettagliate sulla configurazione dell'accesso a Pub/Sub, vedere Configurare l'accesso a Pub/Sub.
Parametro | Tipo | Descrizione |
---|---|---|
subscriptionId |
STRING |
Obbligatorio, identificatore univoco assegnato a una sottoscrizione Pub/Sub. |
projectId |
STRING |
Obbligatorio, ID progetto Google Cloud associato all'argomento Pub/Sub. |
topicId |
STRING |
Obbligatorio, ID o nome dell'argomento Pub/Sub a cui eseguire la sottoscrizione. |
clientEmail |
STRING |
Indirizzo di posta elettronica associato a un account del servizio per l'autenticazione. |
clientId |
STRING |
ID client associato all'account del servizio per l'autenticazione. |
privateKeyId |
STRING |
ID della chiave privata associata all'account del servizio. |
privateKey |
STRING |
Chiave privata associata all'account del servizio per l'autenticazione. |
Questi argomenti vengono usati per ottimizzare ulteriormente la lettura da Pub/Sub:
Parametro | Tipo | Descrizione |
---|---|---|
numFetchPartitions |
STRING |
Facoltativo con il numero predefinito di executor. Numero di attività Spark parallele che recuperano record da una sottoscrizione. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Facoltativo con il valore predefinito false . Se impostato su true, la sottoscrizione passata al flusso viene eliminata al termine del processo di streaming. |
maxBytesPerTrigger |
STRING |
Limite flessibile per l'elaborazione delle dimensioni del batch durante ogni micro batch attivato. Il valore predefinito è 'none'. |
maxRecordsPerFetch |
STRING |
Numero di record da recuperare per ogni attività prima dell'elaborazione dei record. Il valore predefinito è '1000'. |
maxFetchPeriod |
STRING |
Durata del recupero di ogni attività prima dell'elaborazione dei record. Il valore predefinito è '10s'. |
Valori restituiti
Tabella di record Pub/Sub con lo schema seguente. La colonna degli attributi può essere Null, ma tutte le altre colonne non sono Null.
Nome | Tipo di dati | Nullable | Standard | Descrizione |
---|---|---|---|---|
messageId |
STRING |
No | Identificatore univoco per il messaggio Pub/Sub. | |
payload |
BINARY |
No | Contenuto del messaggio Pub/Sub. | |
attributes |
STRING |
Sì | Coppie chiave-valore che rappresentano gli attributi del messaggio Pub/Sub. Si tratta di una stringa con codifica JSON. | |
publishTimestampInMillis |
BIGINT |
No | Timestamp di pubblicazione del messaggio, espresso in millisecondi. | |
sequenceNumber |
BIGINT |
No | Identificatore univoco del record all'interno della partizione. |
Esempi
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
I dati dovranno ora essere sottoposti a query da per un'ulteriore testing.streaming_table
analisi.
Query errate:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);