Sdílet prostřednictvím


read_pubsub funkce streamování table-valued

Platí pro:zaškrtnutí označeného ano Databricks SQL zaškrtnutí označeného ano Databricks Runtime 13.3 LTS a vyšší

Vrátí table se záznamy přečtenými z Pub/Sub z určitého tématu. Podporuje pouze dotazy streamování.

Syntaxe

read_pubsub( { parameter => value } [, ...])

Argumenty

read_pubsub vyžaduje vyvolání pojmenovaného parametru.

Jedinými povinnými argumenty jsou subscriptionId, projectIda topicId. Všechny ostatní argumenty jsou volitelné.

Úplný popis argumentu najdete v tématu Konfigurace možností čtení pub/sub streamingu.

Databricks doporučuje používat tajné kódy při poskytování možností autorizace. Viz funkce tajného kódu.

Podrobnosti o konfiguraci přístupu k pub/sub naleznete v tématu Konfigurace přístupu k Pub/Sub.

Parametr Typ Popis
subscriptionId STRING Vyžaduje se jedinečné identifier přiřazené k předplatnému Pub/Sub.
projectId STRING Povinné je ID projektu Google Cloud přidružené k tématu Pub/Sub.
topicId STRING Povinné, ID nebo název tématu Pub/Sub pro přihlášení k odběru.
clientEmail STRING E-mailová adresa přidružená k účtu služby pro ověřování.
clientId STRING ID klienta přidružené k účtu služby pro ověřování.
privateKeyId STRING ID privátního klíče přidruženého k účtu služby.
privateKey STRING Privátní klíč přidružený k účtu služby pro ověřování.

Tyto argumenty se používají k dalšímu vyladění při čtení z pub/sub:

Parametr Typ Popis
numFetchPartitions STRING Volitelné s výchozím počtem exekutorů. Početparalelních
deleteSubscriptionOnStreamStop BOOLEAN Volitelné s výchozím nastavením false. Pokud je set nastavena na true, odběr předaný do datového proudu se po skončení streamovací úlohy odstraní.
maxBytesPerTrigger STRING Měkká limit pro velikost dávky, která se má zpracovat během každé zahájené mikrodávky. Výchozí hodnota je žádná.
maxRecordsPerFetch STRING Počet záznamů, které se mají načíst na každou úlohu před zpracováním záznamů. Výchozí hodnota je 1000.
maxFetchPeriod STRING Doba trvání každého úkolu, která se má načíst před zpracováním záznamů. Výchozí hodnota je 10s.

Návraty

table záznamů Pub/Sub s následujícím schema. Atributy column mohou mít hodnotu null, ale všechny ostatní columns nemají hodnotu null.

Name Datový typ Vynulovatelné Standard Popis
messageId STRING No Jedinečné identifier pro zprávu Pub/Sub.
payload BINARY No Obsah zprávy Pub/Sub
attributes STRING Ano Páry klíč-hodnota představující atributy zprávy Pub/Sub Jedná se o řetězec kódovaný ve formátu JSON.
publishTimestampInMillis BIGINT No Časové razítko při publikování zprávy v milisekundách.
sequenceNumber BIGINT No Jedinečný identifier záznamu v rámci jeho fragmentu.

Příklady

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

Data by se teď museli dotazovat z testing.streaming_table další analýzy.

Chybné dotazy:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);