Поделиться через


функция, имеющая значения типа read_kafkatable

Область применения:флажок Databricks SQL флажок Databricks Runtime 13.3 LTS и выше

Считывает данные из кластера Apache Kafka и возвращает данные в табличной форме.

Может считывать данные из одной или нескольких тем Kafka. Он поддерживает как пакетные запросы, так и прием потоковой передачи.

Синтаксис

read_kafka([option_key => option_value ] [, ...])

Аргументы

Для этой функции требуется вызов именованного параметра.

  • option_key: имя параметра для настройки. Для параметров, содержащих точки (), необходимо использовать обратные знаки (.').
  • option_value: константное выражение для set опции. Принимает литералы и скалярные функции.

Возвраты

Записи считываются из кластера Apache Kafka со следующими schema:

  • key BINARY: ключ записи Kafka.
  • value BINARY NOT NULL: значение записи Kafka.
  • topic STRING NOT NULL: имя раздела Kafka, из нее считывается запись.
  • partition INT NOT NULL: идентификатор kafka partition запись считывается.
  • offset BIGINT NOT NULL: Номер записи offset в TopicPartitionKafka.
  • timestamp TIMESTAMP NOT NULL: значение метки времени для записи. timestampType column определяет, к чему соответствует эта метка времени.
  • timestampType INTEGER NOT NULL: тип метки времени, указанной в timestampcolumn.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: Заголовок values, предоставленный как часть записи (если это включено).

Примеры

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Параметры

Подробные варианты list можно найти в документации Apache Spark.

Обязательные параметры

Укажите приведенный ниже вариант для подключения к кластеру Kafka.

Вариант
bootstrapServers

Тип: String

Разделенный запятыми list пар узлов и портов, указывающих на кластер Kafka.

Значение по умолчанию: нет

Укажите только один из приведенных ниже вариантов, чтобы настроить, какие разделы Kafka будут извлекать данные из.

Вариант
assign

Тип: String

Строка JSON, содержащая определенные секции раздела для использования. Например, для '{"topicA":[0,1],"topicB":[2,4]}'раздела 0'й и 1-й секций будут использоваться разделы.

Значение по умолчанию: нет
subscribe

Тип: String

Разделяемая запятыми list тем Kafka для чтения.

Значение по умолчанию: нет
subscribePattern

Тип: String

Регулярное выражение, соответствующее темам для подписки.

Значение по умолчанию: нет

Прочие параметры

read_kafka можно использовать в пакетных запросах и в потоковых запросах. Приведенные ниже параметры указывают тип запроса, к которому они применяются.

Вариант
endingOffsets

Тип запроса: String только пакет

Смещения для чтения вплоть до определенного момента в пакетном запросе: либо "latest", чтобы указать последние записи, либо строку JSON, задающую конечное смещение offset для каждого TopicPartition. В формате JSON -1 в качестве offset можно использовать для ссылки на последние версии. -2 (ранний) в качестве offset не допускается.

Значение по умолчанию: "latest"
endingOffsetsByTimestamp

Тип запроса: String только пакет

Строка JSON, указывающая конечную метку времени для чтения до каждого раздела. Метки времени должны быть предоставлены в виде длинного значения метки времени в миллисекундах, так как 1970-01-01 00:00:00 UTC, например,
1686444353000. Дополнительные сведения о поведении с метками времени см . ниже .
endingOffsetsByTimestamp имеет больший приоритет, чем endingOffsets.

Значение по умолчанию: нет
endingTimestamp

Тип запроса: String только пакет

Строковое значение метки времени в миллисекундах с тех пор
1970-01-01 00:00:00 UTCнапример "1686444353000". Если Kafka не возвращает соответствующий offset, offset будет set до последней. Дополнительные сведения о поведении с метками времени см . ниже . Примечание. endingTimestamp Имеет приоритет над endingOffsetsByTimestamp и
endingOffsets.

Значение по умолчанию: нет
includeHeaders

Тип запроса: Boolean потоковая передача и пакет

Следует ли включать заголовки Kafka в строку.

Значение по умолчанию: false
kafka.<consumer_option>

Тип запроса: String потоковая передача и пакет

Любые параметры для конкретного потребителя Kafka можно передать с kafka. префиксом. Эти параметры должны быть окружены обратными знаками при указании, в противном случае вы get ошибку синтаксического анализа. Параметры можно найти в документации Kafka.

Примечание. Вы не должны set следующие параметры с этой функцией:
key.deserializer, , value.deserializerbootstrap.serversgroup.id

Значение по умолчанию: нет
maxOffsetsPerTrigger

Тип запроса: Long только потоковая передача

Оцените limit по максимальному количеству смещений или строк, обработанных за интервал триггера. Указанное общее количество смещения будет пропорционально разделено по разделам.

Значение по умолчанию: нет
startingOffsets

Тип запроса: String потоковая передача и пакет

Начальная точка при запуске запроса может быть либо "earliest", который начинается с самых ранних смещений, "latest", который начинается только с последних смещений, либо JSON-строка, указывающая начальный offset для каждой партиции темы (TopicPartition). В формате JSON -2 как offset можно использовать для обозначения самого раннего, -1 как самого позднего.

Примечание. Для пакетных запросов последние (неявно или с помощью -1 в JSON) запрещены. Для потоковых запросов это применяется только при запуске нового запроса. Перезапущенные запросы потоковой передачи будут продолжаться со смещений, определенных в контрольной точке запроса. Недавно обнаруженные секции во время запроса будут начинаться в самое ближайшее время.

Значение по умолчанию: "latest" для потоковой передачи "earliest" для пакетной службы
startingOffsetsByTimestamp

Тип запроса: String потоковая передача и пакет

Строка JSON, указывающая начальную метку времени для каждой темыPartition. Метки времени должны быть предоставлены в виде длинного значения метки времени в миллисекундах, так как 1970-01-01 00:00:00 UTC, например 1686444353000. Дополнительные сведения о поведении с метками времени см . ниже . Если Kafka не возвращает соответствующий offset, поведение будет соответствовать значению параметра startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp имеет больший приоритет, чем startingOffsets.

Примечание. Для потоковых запросов это применяется только при запуске нового запроса. Перезапущенные запросы потоковой передачи будут продолжаться со смещений, определенных в контрольной точке запроса. Недавно обнаруженные секции во время запроса будут начинаться в самое ближайшее время.

Значение по умолчанию: нет
startingOffsetsByTimestampStrategy

Тип запроса: String потоковая передача и пакет

Эта стратегия используется, если указанная начальная offset по метке времени (либо глобальной, либо на partition) не соответствует метке времени, возвращенной Kafka для offset. Доступные стратегии:

- "error": сбой запроса
- "latest": назначает последние offset для этих разделов, чтобы Spark мог считывать новые записи из этих разделов в последующих микропакетах.

Значение по умолчанию: "error"
startingTimestamp

Тип запроса: String потоковая передача и пакет

Строковое значение метки времени в миллисекундах с тех пор
1970-01-01 00:00:00 UTCнапример "1686444353000". Дополнительные сведения о поведении с метками времени см . ниже . Если Kafka не возвращает соответствующий offset, поведение будет соответствовать значению параметра startingOffsetsByTimestampStrategy.
startingTimestamp имеет приоритет над startingOffsetsByTimestamp и startingOffsets.

Примечание. Для потоковых запросов это применяется только при запуске нового запроса. Перезапущенные запросы потоковой передачи будут продолжаться со смещений, определенных в контрольной точке запроса. Недавно обнаруженные секции во время запроса начнутся самым ранним.

Значение по умолчанию: нет

Примечание.

Возвращаемый offset для каждого partition является самым ранним offset, метка времени которого больше или равна заданной метке времени в соответствующем partition. Поведение зависит от параметров, если Kafka не возвращает соответствующий offset - проверьте описание каждого параметра.

Spark просто передает сведения о KafkaConsumer.offsetsForTimesметке времени и не интерпретирует или не определяет значение. Дополнительные сведения KafkaConsumer.offsetsForTimesсм. в документации. Кроме того, значение метки времени здесь может отличаться в зависимости от конфигурации Kafka (log.message.timestamp.type). Дополнительные сведения см. в документации по Apache Kafka.