функция, имеющая значения типа read_kafka
table
Область применения: Databricks SQL Databricks Runtime 13.3 LTS и выше
Считывает данные из кластера Apache Kafka и возвращает данные в табличной форме.
Может считывать данные из одной или нескольких тем Kafka. Он поддерживает как пакетные запросы, так и прием потоковой передачи.
Синтаксис
read_kafka([option_key => option_value ] [, ...])
Аргументы
Для этой функции требуется вызов именованного параметра.
-
option_key
: имя параметра для настройки. Для параметров, содержащих точки (), необходимо использовать обратные знаки (.
'). -
option_value
: константное выражение для set опции. Принимает литералы и скалярные функции.
Возвраты
Записи считываются из кластера Apache Kafka со следующими schema:
-
key BINARY
: ключ записи Kafka. -
value BINARY NOT NULL
: значение записи Kafka. -
topic STRING NOT NULL
: имя раздела Kafka, из нее считывается запись. -
partition INT NOT NULL
: идентификатор kafka partition запись считывается. -
offset BIGINT NOT NULL
: Номер записи offset вTopicPartition
Kafka. -
timestamp TIMESTAMP NOT NULL
: значение метки времени для записи.timestampType
column определяет, к чему соответствует эта метка времени. -
timestampType INTEGER NOT NULL
: тип метки времени, указанной вtimestamp
column. -
headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: Заголовок values, предоставленный как часть записи (если это включено).
Примеры
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Параметры
Подробные варианты list можно найти в документации Apache Spark.
Обязательные параметры
Укажите приведенный ниже вариант для подключения к кластеру Kafka.
Вариант |
---|
bootstrapServers Тип: String Разделенный запятыми list пар узлов и портов, указывающих на кластер Kafka. Значение по умолчанию: нет |
Укажите только один из приведенных ниже вариантов, чтобы настроить, какие разделы Kafka будут извлекать данные из.
Вариант |
---|
assign Тип: String Строка JSON, содержащая определенные секции раздела для использования. Например, для '{"topicA":[0,1],"topicB":[2,4]}' раздела 0'й и 1-й секций будут использоваться разделы.Значение по умолчанию: нет |
subscribe Тип: String Разделяемая запятыми list тем Kafka для чтения. Значение по умолчанию: нет |
subscribePattern Тип: String Регулярное выражение, соответствующее темам для подписки. Значение по умолчанию: нет |
Прочие параметры
read_kafka
можно использовать в пакетных запросах и в потоковых запросах. Приведенные ниже параметры указывают тип запроса, к которому они применяются.
Вариант |
---|
endingOffsets Тип запроса: String только пакетСмещения для чтения вплоть до определенного момента в пакетном запросе: либо "latest" , чтобы указать последние записи, либо строку JSON, задающую конечное смещение offset для каждого TopicPartition. В формате JSON -1 в качестве offset можно использовать для ссылки на последние версии.
-2 (ранний) в качестве offset не допускается.Значение по умолчанию: "latest" |
endingOffsetsByTimestamp Тип запроса: String только пакетСтрока JSON, указывающая конечную метку времени для чтения до каждого раздела. Метки времени должны быть предоставлены в виде длинного значения метки времени в миллисекундах, так как 1970-01-01 00:00:00 UTC , например,1686444353000 . Дополнительные сведения о поведении с метками времени см . ниже .endingOffsetsByTimestamp имеет больший приоритет, чем endingOffsets .Значение по умолчанию: нет |
endingTimestamp Тип запроса: String только пакетСтроковое значение метки времени в миллисекундах с тех пор 1970-01-01 00:00:00 UTC например "1686444353000" . Если Kafka не возвращает соответствующий offset, offset будет set до последней. Дополнительные сведения о поведении с метками времени см . ниже . Примечание. endingTimestamp Имеет приоритет над endingOffsetsByTimestamp иendingOffsets .Значение по умолчанию: нет |
includeHeaders Тип запроса: Boolean потоковая передача и пакетСледует ли включать заголовки Kafka в строку. Значение по умолчанию: false |
kafka.<consumer_option> Тип запроса: String потоковая передача и пакетЛюбые параметры для конкретного потребителя Kafka можно передать с kafka. префиксом. Эти параметры должны быть окружены обратными знаками при указании, в противном случае вы get ошибку синтаксического анализа. Параметры можно найти в документации Kafka.Примечание. Вы не должны set следующие параметры с этой функцией: key.deserializer , , value.deserializer bootstrap.servers group.id Значение по умолчанию: нет |
maxOffsetsPerTrigger Тип запроса: Long только потоковая передачаОцените limit по максимальному количеству смещений или строк, обработанных за интервал триггера. Указанное общее количество смещения будет пропорционально разделено по разделам. Значение по умолчанию: нет |
startingOffsets Тип запроса: String потоковая передача и пакетНачальная точка при запуске запроса может быть либо "earliest" , который начинается с самых ранних смещений, "latest" , который начинается только с последних смещений, либо JSON-строка, указывающая начальный offset для каждой партиции темы (TopicPartition). В формате JSON -2 как offset можно использовать для обозначения самого раннего, -1 как самого позднего.Примечание. Для пакетных запросов последние (неявно или с помощью -1 в JSON) запрещены. Для потоковых запросов это применяется только при запуске нового запроса. Перезапущенные запросы потоковой передачи будут продолжаться со смещений, определенных в контрольной точке запроса. Недавно обнаруженные секции во время запроса будут начинаться в самое ближайшее время. Значение по умолчанию: "latest" для потоковой передачи "earliest" для пакетной службы |
startingOffsetsByTimestamp Тип запроса: String потоковая передача и пакетСтрока JSON, указывающая начальную метку времени для каждой темыPartition. Метки времени должны быть предоставлены в виде длинного значения метки времени в миллисекундах, так как 1970-01-01 00:00:00 UTC , например 1686444353000 . Дополнительные сведения о поведении с метками времени см . ниже . Если Kafka не возвращает соответствующий offset, поведение будет соответствовать значению параметра startingOffsetsByTimestampStrategy .startingOffsetsByTimestamp имеет больший приоритет, чем startingOffsets .Примечание. Для потоковых запросов это применяется только при запуске нового запроса. Перезапущенные запросы потоковой передачи будут продолжаться со смещений, определенных в контрольной точке запроса. Недавно обнаруженные секции во время запроса будут начинаться в самое ближайшее время. Значение по умолчанию: нет |
startingOffsetsByTimestampStrategy Тип запроса: String потоковая передача и пакетЭта стратегия используется, если указанная начальная offset по метке времени (либо глобальной, либо на partition) не соответствует метке времени, возвращенной Kafka для offset. Доступные стратегии: - "error" : сбой запроса- "latest" : назначает последние offset для этих разделов, чтобы Spark мог считывать новые записи из этих разделов в последующих микропакетах.Значение по умолчанию: "error" |
startingTimestamp Тип запроса: String потоковая передача и пакетСтроковое значение метки времени в миллисекундах с тех пор 1970-01-01 00:00:00 UTC например "1686444353000" . Дополнительные сведения о поведении с метками времени см . ниже . Если Kafka не возвращает соответствующий offset, поведение будет соответствовать значению параметра startingOffsetsByTimestampStrategy .startingTimestamp имеет приоритет над startingOffsetsByTimestamp и startingOffsets .Примечание. Для потоковых запросов это применяется только при запуске нового запроса. Перезапущенные запросы потоковой передачи будут продолжаться со смещений, определенных в контрольной точке запроса. Недавно обнаруженные секции во время запроса начнутся самым ранним. Значение по умолчанию: нет |
Примечание.
Возвращаемый offset для каждого partition является самым ранним offset, метка времени которого больше или равна заданной метке времени в соответствующем partition. Поведение зависит от параметров, если Kafka не возвращает соответствующий offset - проверьте описание каждого параметра.
Spark просто передает сведения о KafkaConsumer.offsetsForTimes
метке времени и не интерпретирует или не определяет значение. Дополнительные сведения KafkaConsumer.offsetsForTimes
см. в документации. Кроме того, значение метки времени здесь может отличаться в зависимости от конфигурации Kafka (log.message.timestamp.type
). Дополнительные сведения см. в документации по Apache Kafka.