read_pulsar
функция потоковой передачи табличного значения
Область применения: Databricks SQL Databricks Runtime 14.1 и более поздних версий
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Возвращает таблицу с записями, считываемыми из Pulsar.
Эта табличная функция поддерживает только потоковые запросы и не поддерживает пакетные запросы.
Синтаксис
read_pulsar ( { option_key => option_value } [, ...] )
Аргументы
Для этой функции требуется вызов именованного параметра для ключей параметров.
Параметры serviceUrl
и topic
обязательны.
Ниже приведены краткие описания аргументов. Дополнительные описания см . в структурированной документации по потоковой передаче Pulsar .
Вариант | Тип | По умолчанию. | Description |
---|---|---|---|
serviceUrl | STRING | Обязательно | Универсальный код ресурса (URI) службы Pulsar. |
topic | STRING | Обязательно | Раздел, из который следует прочитать. |
predefinedSubscription | STRING | нет | Предопределенное имя подписки, используемое соединителем для отслеживания хода выполнения приложения Spark. |
subscriptionPrefix | STRING | нет | Префикс, используемый соединителем для создания случайной подписки для отслеживания хода выполнения приложения Spark. |
pollTimeoutMs | LONG | 120000 | Время ожидания для чтения сообщений из Pulsar в миллисекундах. |
failOnDataLoss | BOOLEAN | true | Определяет, следует ли завершать запрос при потере данных (например, разделы удаляются или сообщения удаляются из-за политики хранения). |
startingOffsets | STRING | latest | Начальная точка при запуске запроса ( самая ранняя, последняя или строка JSON), указывающая определенное смещение. Если последняя версия, средство чтения считывает самые новые записи после запуска. Если раньше, средство чтения считывает с самого раннего смещения. Пользователь также может указать строку JSON, указывающую определенное смещение. |
startingTime | STRING | нет | При указании источник Pulsar считывает сообщения, начиная с позиции указанного начального времени. |
Следующие аргументы используются для проверки подлинности клиента pulsar:
Вариант | Тип | По умолчанию. | Description |
---|---|---|---|
pulsarClientAuthPluginClassName | STRING | нет | Имя подключаемого модуля проверки подлинности. |
pulsarClientAuthParams | STRING | нет | Параметры плагина аутентификации. |
pulsarClientUseKeyStoreTls | STRING | нет | Следует ли использовать KeyStore для проверки подлинности tls. |
pulsarClientTlsTrustStoreType | STRING | нет | Тип файла TrustStore для проверки подлинности tls. |
pulsarClientTlsTrustStorePath | STRING | нет | Путь к файлу TrustStore для проверки подлинности tls. |
pulsarClientTlsTrustStorePassword | STRING | нет | Пароль TrustStore для проверки подлинности tls. |
Эти аргументы используются для настройки и проверки подлинности управления приемом пульсара, конфигурация администратора pulsar требуется только в том случае, если включен контроль допуска (если задан параметр maxBytesPerTrigger).
Вариант | Тип | По умолчанию. | Description |
---|---|---|---|
maxBytesPerTrigger | BIGINT | нет | Мягкое ограничение на максимальное количество байт, которые мы хотим обработать на микробатч. Если это указано, необходимо также указать admin.url. |
adminUrl | STRING | нет | Конфигурация Pulsar serviceHttpUrl. Требуется только при указании maxBytesPerTrigger. |
pulsarAdminAuthPlugin | STRING | нет | Имя подключаемого модуля проверки подлинности. |
pulsarAdminAuthParams | STRING | нет | Параметры подключаемого модуля аутентификации. |
pulsarClientUseKeyStoreTls | STRING | нет | Следует ли использовать KeyStore для проверки подлинности tls. |
pulsarAdminTlsTrustStoreType | STRING | нет | Тип файла TrustStore для проверки подлинности tls. |
pulsarAdminTlsTrustStorePath | STRING | нет | Путь к файлу TrustStore для проверки подлинности tls. |
pulsarAdminTlsTrustStorePassword | STRING | нет | Пароль TrustStore для проверки подлинности tls. |
Возвраты
Таблица записей пульсара со следующей схемой.
__key STRING NOT NULL
: ключ сообщения Pulsar.value BINARY NOT NULL
: значение сообщения Pulsar.Примечание. Для разделов с схемой Avro или JSON вместо загрузки содержимого в поле двоичного значения содержимое будет развернуто, чтобы сохранить имена полей и типы полей раздела Pulsar.
__topic STRING NOT NULL
: имя раздела Pulsar.__messageId BINARY NOT NULL
: идентификатор сообщения Pulsar.__publishTime TIMESTAMP NOT NULL
: время публикации сообщения Pulsar.__eventTime TIMESTAMP NOT NULL
: время события сообщения Pulsar.__messageProperties MAP<STRING, STRING>
: свойства сообщения Pulsar.
Примеры
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.