funkcja read_pulsar
przesyłania strumieniowego o wartości tabeli
Dotyczy: Databricks SQL
Databricks Runtime 14.1 i nowsze
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Zwraca tabelę z rekordami odczytanymi z pulsaru.
Ta funkcja z wartością tabeli obsługuje tylko przesyłanie strumieniowe, a nie zapytanie wsadowe.
Składnia
read_pulsar ( { option_key => option_value } [, ...] )
Argumenty
Ta funkcja wymaga wywołania nazwanego parametru dla kluczy opcji.
Opcje serviceUrl
i topic
są obowiązkowe.
Opisy argumentów są krótkie tutaj. Aby uzyskać rozszerzone opisy, zobacz ustrukturyzowaną dokumentację pulsaru przesyłania strumieniowego.
Opcja | Typ | Domyślny | opis |
---|---|---|---|
serviceUrl | STRUNA | Obowiązkowy | Identyfikator URI usługi Pulsar. |
topic | STRUNA | Obowiązkowy | Temat do odczytania. |
wstępnie zdefiniowanesubskrypcja | STRUNA | Brak | Wstępnie zdefiniowana nazwa subskrypcji używana przez łącznik do śledzenia postępu aplikacji platformy Spark. |
subscriptionPrefix | STRUNA | Brak | Prefiks używany przez łącznik do generowania losowej subskrypcji do śledzenia postępu aplikacji platformy Spark. |
pollTimeoutMs | DŁUGI | 120000 | Limit czasu odczytywania komunikatów z Pulsar w milisekundach. |
failOnDataLoss | BOOLOWSKI | prawda | Określa, czy zapytanie nie powiodło się w przypadku utraty danych (na przykład tematy są usuwane lub komunikaty są usuwane z powodu zasad przechowywania). |
startOffsets | STRUNA | latest | Początek, kiedy zapytanie jest uruchamiane, może być zdefiniowany jako najwcześniejszy, najnowszy lub jako ciąg JSON określający określone przesunięcie. Jeśli jest najnowsza, czytelnik odczytuje najnowsze rekordy po uruchomieniu. Jeśli najwcześniej, czytnik odczytuje od najwcześniejszego przesunięcia. Użytkownik może również określić ciąg JSON, który określa określone przesunięcie. |
startTime | STRUNA | Brak | Po określeniu źródło Pulsar odczytuje komunikaty rozpoczynające się od pozycji określonego czasu rozpoczęcia. |
Następujące argumenty są używane do uwierzytelniania klienta pulsar:
Opcja | Typ | Domyślny | opis |
---|---|---|---|
pulsarClientAuthPluginClassName | STRUNA | Brak | Nazwa wtyczki uwierzytelniania. |
pulsarClientAuthParams | STRUNA | Brak | Parametry wtyczki uwierzytelniania. |
pulsarClientUseKeyStoreTls | STRUNA | Brak | Czy używać magazynu kluczy do uwierzytelniania tls. |
pulsarClientTlsTrustStoreType | STRUNA | Brak | Typ pliku TrustStore na potrzeby uwierzytelniania tls. |
pulsarClientTlsTrustStorePath | STRUNA | Brak | Ścieżka pliku TrustStore na potrzeby uwierzytelniania tls. |
pulsarClientTlsTrustStorePassword | STRUNA | Brak | Hasło trustStore na potrzeby uwierzytelniania tls. |
Te argumenty są używane do konfiguracji i uwierzytelniania kontroli przyjęć Pulsar, konfiguracja administracyjna Pulsar jest wymagana tylko w przypadku włączenia kontroli przyjęć (gdy parametr maxBytesPerTrigger jest ustawiony).
Opcja | Typ | Domyślny | opis |
---|---|---|---|
maxBytesPerTrigger | BIGINT | Brak | Miękki limit maksymalnej liczby bajtów, które chcemy przetworzyć na mikropartię. Jeśli jest to określone, należy również określić adres admin.url. |
adminUrl | STRUNA | Brak | Konfiguracja pulsar serviceHttpUrl. Wymagane tylko w przypadku określenia parametru maxBytesPerTrigger. |
pulsarAdminAuthPlugin | STRUNA | Brak | Nazwa wtyczki uwierzytelniania. |
pulsarAdminAuthParams | STRUNA | Brak | Parametry wtyczki uwierzytelniania. |
pulsarClientUseKeyStoreTls | STRUNA | Brak | Czy używać magazynu kluczy do uwierzytelniania tls. |
pulsarAdminTlsTrustStoreType | STRUNA | Brak | Typ pliku TrustStore na potrzeby uwierzytelniania tls. |
pulsarAdminTlsTrustStorePath | STRUNA | Brak | Ścieżka pliku TrustStore na potrzeby uwierzytelniania tls. |
pulsarAdminTlsTrustStorePassword | STRUNA | Brak | Hasło trustStore na potrzeby uwierzytelniania tls. |
Zwraca
Tabela rekordów pulsarnych z następującym schematem.
__key STRING NOT NULL
: Klucz komunikatu pulsar.value BINARY NOT NULL
: wartość komunikatu pulsu.Uwaga: w przypadku tematów ze schematem Avro lub JSON zamiast ładowania zawartości do pola wartości binarnej zawartość zostanie rozszerzona w celu zachowania nazw pól i typów pól tematu Pulsar.
__topic STRING NOT NULL
: Pulsar nazwa tematu.__messageId BINARY NOT NULL
: identyfikator komunikatu pulsu.__publishTime TIMESTAMP NOT NULL
: Czas publikowania komunikatu pulsar.__eventTime TIMESTAMP NOT NULL
: czas zdarzenia komunikatu pulsu.__messageProperties MAP<STRING, STRING>
: właściwości komunikatu pulsu.
Przykłady
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.
Powiązane artykuły
- CREATE STREAMING TABLE
- funkcja read_files zwracająca tabelę
- funkcja read_kafka zwracająca tabelę
- funkcja przesyłania strumieniowego read_kinesis zwracająca wartości tabeli