read_pubsub
funkce streamování table-valued
Platí pro: Databricks SQL
Databricks Runtime 13.3 LTS a vyšší
Vrátí table se záznamy přečtenými z Pub/Sub z určitého tématu. Podporuje pouze dotazy streamování.
Syntaxe
read_pubsub( { parameter => value } [, ...])
Argumenty
read_pubsub
vyžaduje vyvolání pojmenovaného parametru.
Jedinými povinnými argumenty jsou subscriptionId
, projectId
a topicId
. Všechny ostatní argumenty jsou volitelné.
Úplný popis argumentu najdete v tématu Konfigurace možností čtení pub/sub streamingu.
Databricks doporučuje používat tajné kódy při poskytování možností autorizace. Viz funkce tajného kódu.
Podrobnosti o konfiguraci přístupu k pub/sub naleznete v tématu Konfigurace přístupu k Pub/Sub.
Parametr | Typ | Popis |
---|---|---|
subscriptionId |
STRING |
Vyžaduje se jedinečné identifier přiřazené k předplatnému Pub/Sub. |
projectId |
STRING |
Povinné je ID projektu Google Cloud přidružené k tématu Pub/Sub. |
topicId |
STRING |
Povinné, ID nebo název tématu Pub/Sub pro přihlášení k odběru. |
clientEmail |
STRING |
E-mailová adresa přidružená k účtu služby pro ověřování. |
clientId |
STRING |
ID klienta přidružené k účtu služby pro ověřování. |
privateKeyId |
STRING |
ID privátního klíče přidruženého k účtu služby. |
privateKey |
STRING |
Privátní klíč přidružený k účtu služby pro ověřování. |
Tyto argumenty se používají k dalšímu vyladění při čtení z pub/sub:
Parametr | Typ | Popis |
---|---|---|
numFetchPartitions |
STRING |
Volitelné s výchozím počtem exekutorů. Početparalelních |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Volitelné s výchozím nastavením false . Pokud je set nastavena na true, odběr předaný do datového proudu se po skončení streamovací úlohy odstraní. |
maxBytesPerTrigger |
STRING |
Měkká limit pro velikost dávky, která se má zpracovat během každé zahájené mikrodávky. Výchozí hodnota je žádná. |
maxRecordsPerFetch |
STRING |
Počet záznamů, které se mají načíst na každou úlohu před zpracováním záznamů. Výchozí hodnota je 1000. |
maxFetchPeriod |
STRING |
Doba trvání každého úkolu, která se má načíst před zpracováním záznamů. Výchozí hodnota je 10s. |
Návraty
table záznamů Pub/Sub s následujícím schema. Atributy column mohou mít hodnotu null, ale všechny ostatní columns nemají hodnotu null.
Name | Datový typ | Vynulovatelné | Standard | Popis |
---|---|---|---|---|
messageId |
STRING |
No | Jedinečné identifier pro zprávu Pub/Sub. | |
payload |
BINARY |
No | Obsah zprávy Pub/Sub | |
attributes |
STRING |
Ano | Páry klíč-hodnota představující atributy zprávy Pub/Sub Jedná se o řetězec kódovaný ve formátu JSON. | |
publishTimestampInMillis |
BIGINT |
No | Časové razítko při publikování zprávy v milisekundách. | |
sequenceNumber |
BIGINT |
No | Jedinečný identifier záznamu v rámci jeho fragmentu. |
Příklady
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
Data by se teď museli dotazovat z testing.streaming_table
další analýzy.
Chybné dotazy:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);