Потоковая передача из Apache Pulsar
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
В Databricks Runtime 14.1 и более поздних версиях можно использовать структурированную потоковую передачу для потоковой передачи данных из Apache Pulsar в Azure Databricks.
Структурированная потоковая передача обеспечивает точно один раз семантику обработки для чтения данных из источников Pulsar.
Пример синтаксиса
Ниже приведен базовый пример использования структурированной потоковой передачи для чтения из Pulsar:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Для указания разделов всегда необходимо указать service.url
один из следующих вариантов:
topic
topics
topicsPattern
Для полного списка параметров см. раздел Настройка параметров потоковой передачи Pulsar.
Проверка подлинности в Pulsar
Azure Databricks поддерживает проверку подлинности truststore и хранилища ключей в Pulsar. Databricks рекомендует использовать секреты при хранении сведений о конфигурации.
Во время настройки потока можно задать следующие параметры:
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
Если поток использует PulsarAdmin
, также задайте следующие параметры:
pulsar.admin.authPluginClassName
pulsar.admin.authParams
В следующем примере демонстрируется настройка параметров проверки подлинности:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Схема Pulsar
Схема записей, считываемых из Pulsar, зависит от того, как разделы кодируют свои схемы.
- Для разделов с схемой Avro или JSON имена полей и типы полей сохраняются в результирующем кадре данных Spark.
- Для разделов без схемы или с простым типом данных в Pulsar полезные данные загружаются в столбец
value
. - Если средство чтения настроено для чтения нескольких разделов с различными схемами, задайте
allowDifferentTopicSchemas
для загрузки необработанного содержимого в столбецvalue
.
Записи Pulsar имеют следующие поля метаданных:
Столбец | Тип |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Настройка параметров потоковой передачи Pulsar
Все параметры настраиваются как часть структурированной потоковой передачи с помощью .option("<optionName>", "<optionValue>")
синтаксиса. Можно также настроить проверку подлинности с помощью параметров. См . проверку подлинности в Pulsar.
В следующей таблице описаны необходимые конфигурации для Pulsar. Необходимо указать только один из параметров topic
topics
или topicsPattern
.
Вариант | Значение по умолчанию | Description |
---|---|---|
service.url |
ничего | Конфигурация Pulsar для службы Pulsar serviceUrl . |
topic |
ничего | Строка имени раздела для использования темы. |
topics |
ничего | Разделенный запятыми список разделов, которые нужно использовать. |
topicsPattern |
ничего | Строка регулярных выражений Java, соответствующая темам для использования. |
В следующей таблице описаны другие параметры, поддерживаемые для Pulsar:
Вариант | Значение по умолчанию | Description |
---|---|---|
predefinedSubscription |
ничего | Предопределенное имя подписки, используемое соединителем для отслеживания хода выполнения приложения Spark. |
subscriptionPrefix |
ничего | Префикс, используемый соединителем для создания случайной подписки для отслеживания хода выполнения приложения Spark. |
pollTimeoutMs |
120000 | Время ожидания для чтения сообщений из Pulsar в миллисекундах. |
waitingForNonExistedTopic |
false |
Следует ли соединителю ожидать создания нужных разделов. |
failOnDataLoss |
true |
Определяет, следует ли завершать запрос при потере данных (например, разделы удаляются или сообщения удаляются из-за политики хранения). |
allowDifferentTopicSchemas |
false |
Если считываются несколько разделов с разными схемами, используйте этот параметр, чтобы отключить автоматическую десериализацию значений раздела на основе схем. Возвращаются только необработанные значения, если это true . |
startingOffsets |
latest |
Если latest средство чтения считывает самые новые записи после запуска. Если earliest , средство чтения считывает с самого раннего смещения. Пользователь также может указать строку JSON, указывающую определенное смещение. |
maxBytesPerTrigger |
ничего | Мягкое ограничение на максимальное количество байтов, которые мы хотим обработать за микробатч. Если это указано, admin.url необходимо также указать. |
admin.url |
ничего | Конфигурация Pulsar serviceHttpUrl . Требуется только при maxBytesPerTrigger указании. |
Вы также можете указать любые конфигурации клиента, администратора и читателя Pulsar, используя следующие шаблоны:
Расписание | Ссылка на параметры конигурации |
---|---|
pulsar.client.* |
Конфигурация клиента Pulsar |
pulsar.admin.* |
Конфигурация администратора Pulsar |
pulsar.reader.* |
Конфигурация средства чтения Pulsar |
Создание начальных смещения JSON
Можно вручную создать идентификатор сообщения, чтобы указать определенное смещение и передать его в формате JSON в параметр startingOffsets
. В следующем примере кода показан этот синтаксис:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()