Поделиться через


read_pulsar функция потоковой передачи табличного значения

Область применения:флажок Databricks SQL флажок Databricks Runtime 14.1 и более поздних версий

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

Возвращает таблицу с записями, считываемыми из Pulsar.

Эта табличная функция поддерживает только потоковые запросы и не поддерживает пакетные запросы.

Синтаксис

read_pulsar ( { option_key => option_value } [, ...] )

Аргументы

Для этой функции требуется вызов именованного параметра для ключей параметров.

Параметры serviceUrl и topic обязательны.

Ниже приведены краткие описания аргументов. Дополнительные описания см . в структурированной документации по потоковой передаче Pulsar .

Вариант Тип По умолчанию. Description
serviceUrl STRING Обязательно Универсальный код ресурса (URI) службы Pulsar.
topic STRING Обязательно Раздел, из который следует прочитать.
predefinedSubscription STRING нет Предопределенное имя подписки, используемое соединителем для отслеживания хода выполнения приложения Spark.
subscriptionPrefix STRING нет Префикс, используемый соединителем для создания случайной подписки для отслеживания хода выполнения приложения Spark.
pollTimeoutMs LONG 120000 Время ожидания для чтения сообщений из Pulsar в миллисекундах.
failOnDataLoss BOOLEAN true Определяет, следует ли завершать запрос при потере данных (например, разделы удаляются или сообщения удаляются из-за политики хранения).
startingOffsets STRING latest Начальная точка при запуске запроса ( самая ранняя, последняя или строка JSON), указывающая определенное смещение. Если последняя версия, средство чтения считывает самые новые записи после запуска. Если раньше, средство чтения считывает с самого раннего смещения. Пользователь также может указать строку JSON, указывающую определенное смещение.
startingTime STRING нет При указании источник Pulsar считывает сообщения, начиная с позиции указанного начального времени.

Следующие аргументы используются для проверки подлинности клиента pulsar:

Вариант Тип По умолчанию. Description
pulsarClientAuthPluginClassName STRING нет Имя подключаемого модуля проверки подлинности.
pulsarClientAuthParams STRING нет Параметры плагина аутентификации.
pulsarClientUseKeyStoreTls STRING нет Следует ли использовать KeyStore для проверки подлинности tls.
pulsarClientTlsTrustStoreType STRING нет Тип файла TrustStore для проверки подлинности tls.
pulsarClientTlsTrustStorePath STRING нет Путь к файлу TrustStore для проверки подлинности tls.
pulsarClientTlsTrustStorePassword STRING нет Пароль TrustStore для проверки подлинности tls.

Эти аргументы используются для настройки и проверки подлинности управления приемом пульсара, конфигурация администратора pulsar требуется только в том случае, если включен контроль допуска (если задан параметр maxBytesPerTrigger).

Вариант Тип По умолчанию. Description
maxBytesPerTrigger BIGINT нет Мягкое ограничение на максимальное количество байт, которые мы хотим обработать на микробатч. Если это указано, необходимо также указать admin.url.
adminUrl STRING нет Конфигурация Pulsar serviceHttpUrl. Требуется только при указании maxBytesPerTrigger.
pulsarAdminAuthPlugin STRING нет Имя подключаемого модуля проверки подлинности.
pulsarAdminAuthParams STRING нет Параметры подключаемого модуля аутентификации.
pulsarClientUseKeyStoreTls STRING нет Следует ли использовать KeyStore для проверки подлинности tls.
pulsarAdminTlsTrustStoreType STRING нет Тип файла TrustStore для проверки подлинности tls.
pulsarAdminTlsTrustStorePath STRING нет Путь к файлу TrustStore для проверки подлинности tls.
pulsarAdminTlsTrustStorePassword STRING нет Пароль TrustStore для проверки подлинности tls.

Возвраты

Таблица записей пульсара со следующей схемой.

  • __key STRING NOT NULL: ключ сообщения Pulsar.

  • value BINARY NOT NULL: значение сообщения Pulsar.

    Примечание. Для разделов с схемой Avro или JSON вместо загрузки содержимого в поле двоичного значения содержимое будет развернуто, чтобы сохранить имена полей и типы полей раздела Pulsar.

  • __topic STRING NOT NULL: имя раздела Pulsar.

  • __messageId BINARY NOT NULL: идентификатор сообщения Pulsar.

  • __publishTime TIMESTAMP NOT NULL: время публикации сообщения Pulsar.

  • __eventTime TIMESTAMP NOT NULL: время события сообщения Pulsar.

  • __messageProperties MAP<STRING, STRING>: свойства сообщения Pulsar.

Примеры

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.