read_pubsub
funkcja przesyłania strumieniowego o wartości tabeli
Dotyczy: Databricks SQL Databricks Runtime 13.3 LTS i nowsze
Zwraca tabelę z rekordami odczytanymi z pubu/pod tematu. Obsługuje tylko zapytania przesyłane strumieniowo.
Składnia
read_pubsub( { parameter => value } [, ...])
Argumenty
read_pubsub
wymaga wywołania nazwanego parametru.
Jedynymi wymaganymi argumentami są subscriptionId
, projectId
i topicId
. Wszystkie inne argumenty są opcjonalne.
Aby uzyskać pełne opisy argumentów, zobacz Configure options for Pub/Sub streaming read (Konfigurowanie opcji dla odczytu przesyłania strumieniowego pub/sub).
Usługa Databricks zaleca używanie wpisów tajnych podczas udostępniania opcji autoryzacji. Zobacz funkcję wpisu tajnego.
Aby uzyskać szczegółowe informacje na temat konfigurowania dostępu do pubu/podsieci, zobacz Konfigurowanie dostępu do pubu/subskrypcji.
Parametr | Type | Opis |
---|---|---|
subscriptionId |
STRING |
Wymagany unikatowy identyfikator przypisany do subskrypcji Pub/Sub. |
projectId |
STRING |
Wymagany identyfikator projektu Google Cloud skojarzony z tematem Pub/Sub. |
topicId |
STRING |
Wymagane, identyfikator lub nazwa tematu Pub/Sub do zasubskrybowania. |
clientEmail |
STRING |
Adres e-mail skojarzony z kontem usługi na potrzeby uwierzytelniania. |
clientId |
STRING |
Identyfikator klienta skojarzony z kontem usługi na potrzeby uwierzytelniania. |
privateKeyId |
STRING |
Identyfikator klucza prywatnego skojarzonego z kontem usługi. |
privateKey |
STRING |
Klucz prywatny skojarzony z kontem usługi na potrzeby uwierzytelniania. |
Te argumenty są używane do dalszego dostrajania podczas odczytywania z Pub/Sub:
Parametr | Type | Opis |
---|---|---|
numFetchPartitions |
STRING |
Opcjonalnie z domyślną liczbą funkcji wykonawczych. Liczba równoległych zadań platformy Spark, które pobierają rekordy z subskrypcji. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Opcjonalnie z domyślnym false . Jeśli ustawiono wartość true, subskrypcja przekazana do strumienia zostanie usunięta po zakończeniu zadania przesyłania strumieniowego. |
maxBytesPerTrigger |
STRING |
Miękki limit rozmiaru partii, który ma być przetwarzany podczas każdej wyzwalanej mikrosadowej partii. Wartość domyślna to "none". |
maxRecordsPerFetch |
STRING |
Liczba rekordów do pobrania na zadanie przed przetworzeniem rekordów. Wartość domyślna to "1000". |
maxFetchPeriod |
STRING |
Czas trwania każdego zadania do pobrania przed przetworzeniem rekordów. Wartość domyślna to "10s". |
Zwraca
Tabela rekordów Pub/Sub z następującym schematem. Kolumna atrybutów może mieć wartość null, ale wszystkie inne kolumny nie mają wartości null.
Nazwisko | Typ danych | Dopuszczający wartość null | Standardowa | opis |
---|---|---|---|---|
messageId |
STRING |
Nie. | Unikatowy identyfikator komunikatu Pub/Sub. | |
payload |
BINARY |
Nie. | Zawartość komunikatu Pub/Sub. | |
attributes |
STRING |
Tak | Pary klucz-wartość reprezentujące atrybuty komunikatu Pub/Sub. Jest to ciąg zakodowany w formacie JSON. | |
publishTimestampInMillis |
BIGINT |
Nie. | Sygnatura czasowa opublikowania wiadomości w milisekundach. | |
sequenceNumber |
BIGINT |
Nie. | Unikatowy identyfikator rekordu w ramach jego fragmentu. |
Przykłady
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
Teraz należy wykonać zapytanie dotyczące danych z elementu testing.streaming_table
w celu dalszej analizy.
Błędne zapytania:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);