Подписка на Google Pub/Sub
Azure Databricks предоставляет встроенный соединитель для подписки на Google Pub/Sub в Databricks Runtime 13.3 LTS и выше. Этот соединитель предоставляет точно один раз семантику обработки записей от подписчика.
Примечание.
Pub/Sub может публиковать повторяющиеся записи, и записи могут поступать подписчику вне порядка. Для обработки повторяющихся и устаревших записей необходимо написать код Azure Databricks.
Пример синтаксиса
В следующем примере кода показан базовый синтаксис для настройки структурированной потоковой передачи из Pub/Sub:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
Дополнительные параметры конфигурации см. в разделе "Настройка параметров потоковой передачи pub/sub".
Настройка доступа к Pub/Sub
Databricks рекомендует использовать секреты при предоставлении параметров авторизации. Для авторизации подключения требуются следующие параметры:
clientEmail
clientId
privateKey
privateKeyId
В следующей таблице описаны роли, необходимые для настроенных учетных данных:
Роли | Обязательно или необязательно | Использование |
---|---|---|
roles/pubsub.viewer или roles/viewer |
Обязательное поле | Проверьте, существует ли подписка и получить подписку |
roles/pubsub.subscriber |
Обязательное поле | Получение данных из подписки |
roles/pubsub.editor или roles/editor |
Необязательно | Включает создание подписки, если она не существует, а также позволяет использовать deleteSubscriptionOnStreamStop ее для удаления подписок при завершении потока. |
Схема Pub/Sub
Схема потока соответствует записям, которые извлекаются из Pub/Sub, как описано в следующей таблице:
Поле | Тип |
---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Настройка параметров для чтения потоковой передачи pub/sub
В следующей таблице описаны параметры, поддерживаемые для Pub/Sub. Все параметры настраиваются как часть структурированной потоковой передачи с помощью .option("<optionName>", "<optionValue>")
синтаксиса.
Примечание.
Некоторые параметры конфигурации Pub/Sub используют концепцию получения вместо микропакетов. Это отражает сведения о внутренней реализации, а параметры работают аналогично совместному выполнению в других соединителях структурированной потоковой передачи, за исключением того, что записи извлекаются и обрабатываются.
Вариант | Значение по умолчанию | Description |
---|---|---|
numFetchPartitions |
При инициализации потока задается значение 1 половины числа исполнителей, присутствующих в потоке. | Количество параллельных задач Spark, которые извлекает записи из подписки. |
deleteSubscriptionOnStreamStop |
false |
Если true подписка, переданная потоку, удаляется при завершении задания потоковой передачи. |
maxBytesPerTrigger |
ничего | Обратимое ограничение для обработки размера пакета во время каждого запускаемого микропакета. |
maxRecordsPerFetch |
1000 | Количество записей для получения каждой задачи перед обработкой записей. |
maxFetchPeriod |
10 seconds | Длительность времени для каждой задачи, извлекаемой перед обработкой записей. Databricks рекомендует использовать значение по умолчанию. |
Добавочная семантика пакетной обработки для Pub/Sub
Можно использовать Trigger.AvailableNow
для использования доступных записей из источников Pub/Sub добавочного пакета.
Azure Databricks записывает метку времени при начале чтения с параметром Trigger.AvailableNow
. Записи, обработанные пакетом, включают все ранее извлекаемые данные и все недавно опубликованные записи с меткой времени меньше, чем метка времени начала записанного потока.
См. инструкции по настройке добавочной пакетной обработки.
Мониторинг метрик потоковой передачи
Структурированные метрики хода выполнения потоковой передачи сообщают о количестве записей, извлекаемых и готовых к обработке, размере записей, которые будут возвращены и готовы к обработке, и количеству повторяющихся записей, которые отображаются с момента запуска потока. Ниже приведен пример этих метрик:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Ограничения
Спекулятивное выполнение (spark.speculation
) не поддерживается в Pub/Sub.