read_kafka
табличное значение функции
Область применения: Databricks SQL
Databricks Runtime 13.3 LTS и выше
Считывает данные из кластера Apache Kafka и возвращает данные в табличной форме.
Может считывать данные из одной или нескольких тем Kafka. Он поддерживает как пакетные запросы, так и прием потоковой передачи.
Синтаксис
read_kafka([option_key => option_value ] [, ...])
Аргументы
Для этой функции требуется вызов именованного параметра.
-
option_key
: имя параметра для настройки. Для параметров, содержащих точки (), необходимо использовать обратные знаки (.
'). -
option_value
: константное выражение для задания параметра. Принимает литералы и скалярные функции.
Возвраты
Записи считываются из кластера Apache Kafka со следующей схемой:
-
key BINARY
: ключ записи Kafka. -
value BINARY NOT NULL
: значение записи Kafka. -
topic STRING NOT NULL
: имя раздела Kafka, из нее считывается запись. -
partition INT NOT NULL
: идентификатор секции Kafka, из нее считывается запись. -
offset BIGINT NOT NULL
: номер смещения записи вTopicPartition
Kafka. -
timestamp TIMESTAMP NOT NULL
: значение метки времени для записи. СтолбецtimestampType
определяет, что соответствует этой метке времени. -
timestampType INTEGER NOT NULL
: тип метки времени, указанной в столбцеtimestamp
. -
headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: значения заголовков, предоставленные как часть записи (если включено).
Примеры
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Параметры
Подробный список параметров можно найти в документации по Apache Spark .
Обязательные параметры
Укажите приведенный ниже вариант для подключения к кластеру Kafka.
Вариант |
---|
bootstrapServers Тип: String Список пар узлов и портов, разделенных запятыми, указывающих на кластер Kafka. Значение по умолчанию: нет |
Укажите только один из приведенных ниже вариантов, чтобы настроить, какие разделы Kafka будут извлекать данные из.
Вариант |
---|
assign Тип: String Строка JSON, содержащая определенные секции раздела для использования. Например, для '{"topicA":[0,1],"topicB":[2,4]}' раздела 0'й и 1-й секций будут использоваться разделы.Значение по умолчанию: нет |
subscribe Тип: String Разделенный запятыми список разделов Kafka для чтения. Значение по умолчанию: нет |
subscribePattern Тип: String Регулярное выражение, соответствующее темам для подписки. Значение по умолчанию: нет |
Прочие параметры
read_kafka
можно использовать в пакетных запросах и в потоковых запросах. Приведенные ниже параметры указывают тип запроса, к которому они применяются.
Вариант |
---|
endingOffsets Тип запроса: String только пакетСмещения для чтения до определённой точки в пакетном запросе: либо "latest" для указания последних записей, либо строка JSON, задающая конечное смещение для каждого TopicPartition. В формате JSON можно использовать -1 в качестве смещения для ссылки на последнее.
-2 (самый ранний) в качестве смещения не допускается.Значение по умолчанию: "latest" |
endingOffsetsByTimestamp Тип запроса: String только пакетСтрока JSON, указывающая конечную метку времени для чтения до каждого раздела. Метки времени должны быть предоставлены в виде длинного значения метки времени в миллисекундах, так как 1970-01-01 00:00:00 UTC , например,1686444353000 . Дополнительные сведения о поведении с метками времени см . ниже .endingOffsetsByTimestamp имеет больший приоритет, чем endingOffsets .Значение по умолчанию: нет |
endingTimestamp Тип запроса: String только пакетСтроковое значение метки времени в миллисекундах с тех пор 1970-01-01 00:00:00 UTC например "1686444353000" . Если Kafka не возвращает соответствующее смещение, то для смещения будет задано значение latest. Дополнительные сведения о поведении с метками времени см . ниже . Примечание. endingTimestamp Имеет приоритет над endingOffsetsByTimestamp иendingOffsets .Значение по умолчанию: нет |
includeHeaders Тип запроса: Boolean потоковая передача и пакетСледует ли включать заголовки Kafka в строку. Значение по умолчанию: false |
kafka.<consumer_option> Тип запроса: String потоковая передача и пакетЛюбые параметры для конкретного потребителя Kafka можно передать с kafka. префиксом. Эти параметры должны быть окружены обратными знаками при указании, в противном случае вы получите ошибку синтаксического анализа. Параметры можно найти в документации Kafka.Примечание. Для этой функции не следует задавать следующие параметры: key.deserializer , , value.deserializer bootstrap.servers group.id Значение по умолчанию: нет |
maxOffsetsPerTrigger Тип запроса: Long только потоковая передачаОграничение скорости на максимальное количество смещений или строк, обработанных за каждый интервал триггера. Указанное общее количество смещения будет пропорционально разделено по разделам. Значение по умолчанию: нет |
startingOffsets Тип запроса: String потоковая передача и пакетНачальная точка при запуске запроса может быть либо "earliest" , что начинается с самых ранних смещений, либо "latest" , что используется только для последних смещений, либо строка JSON, указывающая начальное смещение для каждого TopicPartition. В формате JSON смещение -2 можно использовать для указания на самый ранний момент, а -1 — на самый последний.Примечание. Для пакетных запросов последние (неявно или с помощью -1 в JSON) запрещены. Для потоковых запросов это применяется только при запуске нового запроса. Перезапущенные запросы потоковой передачи будут продолжаться со смещений, определенных в контрольной точке запроса. Недавно обнаруженные секции во время запроса будут начинаться в самое ближайшее время. Значение по умолчанию: "latest" для потоковой передачи "earliest" для пакетной службы |
startingOffsetsByTimestamp Тип запроса: String потоковая передача и пакетСтрока JSON, указывающая начальную метку времени для каждой темыPartition. Метки времени должны быть предоставлены в виде длинного значения метки времени в миллисекундах, так как 1970-01-01 00:00:00 UTC , например 1686444353000 . Дополнительные сведения о поведении с метками времени см . ниже . Если Kafka не возвращает соответствующее смещение, поведение будет соответствовать значению параметра startingOffsetsByTimestampStrategy .startingOffsetsByTimestamp имеет больший приоритет, чем startingOffsets .Примечание. Для потоковых запросов это применяется только при запуске нового запроса. Перезапущенные запросы потоковой передачи будут продолжаться со смещений, определенных в контрольной точке запроса. Недавно обнаруженные секции во время запроса будут начинаться в самое ближайшее время. Значение по умолчанию: нет |
startingOffsetsByTimestampStrategy Тип запроса: String потоковая передача и пакетЭта стратегия используется, если указанное начальное смещение по метке времени (либо глобальной, либо для каждого раздела) не соответствует возвращаемому смещению Kafka. Доступные стратегии: - "error" : сбой запроса- "latest" : назначает последнее смещение для этих разделов, чтобы Spark мог считывать новые записи из этих разделов в последующих микропакетах.Значение по умолчанию: "error" |
startingTimestamp Тип запроса: String потоковая передача и пакетСтроковое значение метки времени в миллисекундах с тех пор 1970-01-01 00:00:00 UTC например "1686444353000" . Дополнительные сведения о поведении с метками времени см . ниже . Если Kafka не возвращает совпадающее смещение, поведение будет подчиняться значению параметра startingOffsetsByTimestampStrategy .startingTimestamp имеет приоритет над startingOffsetsByTimestamp и startingOffsets .Примечание. Для потоковых запросов это применяется только при запуске нового запроса. Перезапущенные запросы потоковой передачи будут продолжаться со смещений, определенных в контрольной точке запроса. Недавно обнаруженные секции во время запроса начнутся самым ранним. Значение по умолчанию: нет |
Примечание.
Возвращаемое смещение для каждой секции является самым ранним смещением, метка времени которого больше или равна заданной метке времени в соответствующей секции. Поведение зависит от параметров, если Kafka не возвращает соответствующее смещение, проверьте описание каждого параметра.
Spark просто передает сведения о KafkaConsumer.offsetsForTimes
метке времени и не интерпретирует или не определяет значение. Дополнительные сведения KafkaConsumer.offsetsForTimes
см. в документации. Кроме того, значение метки времени здесь может отличаться в зависимости от конфигурации Kafka (log.message.timestamp.type
). Дополнительные сведения см. в документации по Apache Kafka.