read_kafka
funkcja wartości tabeli
Dotyczy: Databricks SQL Databricks Runtime 13.3 LTS i nowsze
Odczytuje dane z klastra platformy Apache Kafka i zwraca dane w postaci tabelarycznej.
Może odczytywać dane z co najmniej jednego tematu platformy Kafka. Obsługuje zarówno zapytania wsadowe, jak i pozyskiwanie przesyłania strumieniowego.
Składnia
read_kafka([option_key => option_value ] [, ...])
Argumenty
Ta funkcja wymaga wywołania nazwanego parametru.
option_key
: nazwa opcji do skonfigurowania. Należy użyć backticks (') dla opcji zawierających kropki (.
).option_value
: wyrażenie stałe do ustawienia opcji. Akceptuje literały i funkcje skalarne.
Zwraca
Rekordy odczytane z klastra platformy Apache Kafka z następującym schematem:
key BINARY
: klucz rekordu platformy Kafka.value BINARY NOT NULL
: wartość rekordu platformy Kafka.topic STRING NOT NULL
: nazwa tematu platformy Kafka, z których jest odczytywany rekord.partition INT NOT NULL
: identyfikator partycji platformy Kafka, z których jest odczytywany rekord.offset BIGINT NOT NULL
: numer przesunięcia rekordu na platformie KafkaTopicPartition
.timestamp TIMESTAMP NOT NULL
: wartość znacznika czasu dla rekordu. KolumnatimestampType
definiuje, co odpowiada znacznikowi czasu.timestampType INTEGER NOT NULL
: typ znacznika czasu określonego w kolumnietimestamp
.headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: wartości nagłówka podane jako część rekordu (jeśli jest włączone).
Przykłady
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Opcje
Szczegółową listę opcji można znaleźć w dokumentacji platformy Apache Spark.
Wymagane opcje
Podaj poniższą opcję nawiązywania połączenia z klastrem platformy Kafka.
Opcja |
---|
bootstrapServers Typ: String Rozdzielona przecinkami lista par hostów/portów wskazująca klaster platformy Kafka. Wartość domyślna: Brak |
Podaj tylko jedną z poniższych opcji, aby skonfigurować tematy platformy Kafka do ściągania danych.
Opcja |
---|
assign Typ: String Ciąg JSON zawierający określone partycje tematu, z których mają być używane. Na przykład w przypadku '{"topicA":[0,1],"topicB":[2,4]}' partycji 0 i 1. tematu będą używane partycje.Wartość domyślna: Brak |
subscribe Typ: String Rozdzielona przecinkami lista tematów platformy Kafka do odczytania. Wartość domyślna: Brak |
subscribePattern Typ: String Wyrażenie regularne pasujące do tematów do subskrybowania. Wartość domyślna: Brak |
Różne opcje
read_kafka
można używać w zapytaniach wsadowych i w zapytaniach przesyłanych strumieniowo. Poniższe opcje określają, do którego typu zapytania mają zastosowanie.
Opcja |
---|
endingOffsets Typ: Typ zapytania: String tylko partiaPrzesunięcie do odczytu dla zapytania wsadowego, "latest" aby określić najnowsze rekordy lub ciąg JSON określający przesunięcie końcowe dla każdej części tematu. W formacie JSON jako -1 przesunięcie może służyć do odwoływania się do najnowszej wersji. -2 (najwcześniejsze) jako przesunięcie jest niedozwolone.Wartość domyślna: "latest" |
endingOffsetsByTimestamp Typ: Typ zapytania: String tylko partiaCiąg JSON określający znacznik czasu zakończenia do odczytania do każdego elementu TopicPartition. Znaczniki czasu muszą być podane jako długa wartość znacznika czasu w milisekundach od 1970-01-01 00:00:00 UTC , na przykład1686444353000 . Zobacz uwagę poniżej, aby uzyskać szczegółowe informacje o zachowaniu ze znacznikami czasu.endingOffsetsByTimestamp ma pierwszeństwo przed endingOffsets .Wartość domyślna: Brak |
endingTimestamp Typ: Typ zapytania: String tylko partiaWartość ciągu znacznika czasu w milisekundach od 1970-01-01 00:00:00 UTC , na przykład "1686444353000" . Jeśli platforma Kafka nie zwróci dopasowanego przesunięcia, przesunięcie zostanie ustawione na najnowsze. Zobacz uwagę poniżej, aby uzyskać szczegółowe informacje o zachowaniu ze znacznikami czasu. Uwaga: endingTimestamp ma pierwszeństwo przed endingOffsetsByTimestamp iendingOffsets .Wartość domyślna: Brak |
includeHeaders Typ: Typ zapytania: Boolean przesyłanie strumieniowe i wsadoweOkreśla, czy w wierszu mają być uwzględniane nagłówki platformy Kafka. Wartość domyślna: false |
kafka.<consumer_option> Typ: Typ zapytania: String przesyłanie strumieniowe i wsadoweWszystkie opcje specyficzne dla konsumentów platformy Kafka można przekazać za pomocą prefiksu kafka. . Te opcje muszą być otoczone przez backticks po podaniu, w przeciwnym razie zostanie wyświetlony błąd analizatora. Opcje można znaleźć w dokumentacji platformy Kafka.Uwaga: nie należy ustawiać następujących opcji za pomocą tej funkcji: key.deserializer , , value.deserializer , , bootstrap.servers group.id Wartość domyślna: Brak |
maxOffsetsPerTrigger Typ: Typ zapytania: Long tylko przesyłanie strumienioweLimit szybkości maksymalnej liczby przesunięć lub wierszy przetworzonych na interwał wyzwalacza. Określona łączna liczba przesunięć zostanie proporcjonalnie podzielona między elementy TopicPartitions. Wartość domyślna: Brak |
startingOffsets Typ: Typ zapytania: String przesyłanie strumieniowe i wsadowePunkt początkowy podczas uruchamiania zapytania, "earliest" czyli od najwcześniejszych przesunięć, "latest" czyli od najnowszych przesunięć, lub ciągu JSON określającego przesunięcie początkowe dla każdej części tematu. W formacie JSON jako -2 przesunięcie można odwoływać się najwcześniej do najnowszej -1 .Uwaga: w przypadku zapytań wsadowych najnowsza wersja (niejawnie lub przy użyciu -1 w formacie JSON) jest niedozwolona. W przypadku zapytań przesyłanych strumieniowo ma to zastosowanie tylko wtedy, gdy zostanie uruchomione nowe zapytanie. Ponownie uruchomione zapytania przesyłane strumieniowo będą kontynuowane z przesunięć zdefiniowanych w punkcie kontrolnym zapytania. Nowo odnalezione partycje podczas zapytania będą uruchamiane najwcześniej. Wartość domyślna: "latest" dla przesyłania strumieniowego dla "earliest" partii |
startingOffsetsByTimestamp Typ: Typ zapytania: String przesyłanie strumieniowe i wsadoweCiąg JSON określający znacznik czasu rozpoczęcia dla każdego elementu TopicPartition. Znaczniki czasu muszą być podane jako długa wartość znacznika czasu w milisekundach od 1970-01-01 00:00:00 UTC , na przykład 1686444353000 . Zobacz uwagę poniżej, aby uzyskać szczegółowe informacje o zachowaniu ze znacznikami czasu. Jeśli platforma Kafka nie zwróci dopasowanego przesunięcia, zachowanie będzie zgodne z wartością opcji startingOffsetsByTimestampStrategy .startingOffsetsByTimestamp ma pierwszeństwo przed startingOffsets .Uwaga: w przypadku zapytań przesyłanych strumieniowo ma to zastosowanie tylko wtedy, gdy zostanie uruchomione nowe zapytanie. Ponownie uruchomione zapytania przesyłane strumieniowo będą kontynuowane z przesunięć zdefiniowanych w punkcie kontrolnym zapytania. Nowo odnalezione partycje podczas zapytania będą uruchamiane najwcześniej. Wartość domyślna: Brak |
startingOffsetsByTimestampStrategy Typ: Typ zapytania: String przesyłanie strumieniowe i wsadoweTa strategia jest używana, gdy określone przesunięcie początkowe według sygnatury czasowej (globalnej lub na partycję) nie jest zgodne z zwróconym przesunięciem platformy Kafka. Dostępne strategie to: - "error" : niepowodzenie zapytania- "latest" : przypisuje najnowsze przesunięcie dla tych partycji, aby platforma Spark mogła odczytywać nowsze rekordy z tych partycji w kolejnych mikrosadach.Wartość domyślna: "error" |
startingTimestamp Typ: Typ zapytania: String przesyłanie strumieniowe i wsadoweWartość ciągu znacznika czasu w milisekundach od 1970-01-01 00:00:00 UTC , na przykład "1686444353000" . Zobacz uwagę poniżej, aby uzyskać szczegółowe informacje o zachowaniu ze znacznikami czasu. Jeśli platforma Kafka nie zwróci dopasowanego przesunięcia, zachowanie będzie zgodne z wartością opcji startingOffsetsByTimestampStrategy .startingTimestamp ma pierwszeństwo przed startingOffsetsByTimestamp i startingOffsets .Uwaga: w przypadku zapytań przesyłanych strumieniowo ma to zastosowanie tylko wtedy, gdy zostanie uruchomione nowe zapytanie. Ponownie uruchomione zapytania przesyłane strumieniowo będą kontynuowane z przesunięć zdefiniowanych w punkcie kontrolnym zapytania. Nowo odnalezione partycje podczas zapytania będą uruchamiane najwcześniej. Wartość domyślna: Brak |
Uwaga
Zwrócone przesunięcie dla każdej partycji jest najwcześniejszym przesunięciem, którego sygnatura czasowa jest większa lub równa podanemu znacznikowi czasu w odpowiedniej partycji. Zachowanie różni się w zależności od opcji, jeśli platforma Kafka nie zwraca dopasowanego przesunięcia — sprawdź opis każdej opcji.
Platforma Spark po prostu przekazuje informacje znacznika czasu do KafkaConsumer.offsetsForTimes
elementu i nie interpretuje ani nie rozumuje wartości. Aby uzyskać więcej informacji na temat KafkaConsumer.offsetsForTimes
usługi , zapoznaj się z dokumentacją. Ponadto znaczenie znacznika czasu w tym miejscu może się różnić w zależności od konfiguracji platformy Kafka (log.message.timestamp.type
). Aby uzyskać szczegółowe informacje, zobacz dokumentację platformy Apache Kafka.