Потоковая обработка с помощью Apache Kafka и Azure Databricks
В этой статье описывается, как использовать Apache Kafka в качестве источника или приемника при выполнении рабочих нагрузок структурированной потоковой передачи в Azure Databricks.
Дополнительные сведения о Kafka см. в документации по Kafka.
Чтение данных из Kafka
Ниже приведен пример потокового чтения из Kafka:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Azure Databricks также поддерживает пакетную семантику чтения для источников данных Kafka, как показано в следующем примере:
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Для добавочной пакетной загрузки Databricks рекомендует использовать Kafka с Trigger.AvailableNow
. См. инструкции по настройке добавочной пакетной обработки.
В Databricks Runtime 13.3 LTS и более поздних версиях Azure Databricks предоставляет функцию SQL для чтения данных Kafka. Стриминг с использованием SQL поддерживается только в Delta Live Tables или с потоком tables в Databricks SQL. См. функциюсо значением из
Настройка средства чтения структурированной потоковой передачи Kafka
Azure Databricks предоставляет ключевое слово kafka
в качестве формата данных для настройки connections в Kafka 0.10+.
Ниже приведены наиболее распространенные конфигурации для Kafka:
Есть несколько способов указания разделов для подписки. Необходимо предоставить лишь один из этих parameters:
Вариант | значение | Описание |
---|---|---|
подписка | Список из list тем, разделённых запятыми. | Тема list для подписки. |
subscribePattern | Строка регулярного выражения Java. | Шаблон, используемый для подписки на разделы. |
назначить | Строка в формате JSON {"topicA":[0,1],"topic":[2,4]} . |
Конкретные разделы topicPartitions для использования. |
Другие важные конфигурации:
Вариант | значение | Значение по умолчанию | Описание |
---|---|---|---|
kafka.bootstrap.servers | Запятыми разделённый список list хоста:порта. | empty | [Обязательно] Конфигурация Kafka bootstrap.servers . Если данные из Kafka отсутствуют, сначала проверьте адрес брокера list. Если адрес брокера list неверный, ошибки могут не возникнуть. Это обусловлено тем, что клиент Kafka предполагает, что брокеры будут доступны в конечном итоге и в случае ошибок сети повторные попытки можно будет выполнять без ограничений. |
failOnDataLoss |
true или false . |
true |
[Необязательно] Указывает, следует ли завершать запрос, если возможна потеря данных. Запросы могут постоянно завершаться сбоем считывания данных из Kafka во многих сценариях, таких как удаление разделов, усечение разделов перед обработкой и т. д. Мы пытаемся с применением консервативного подхода определить, могут ли данные быть потеряны. Иногда это может привести к ложным сигналам.
Set этот параметр на false , если он не работает как ожидалось, или вы хотите, чтобы запрос продолжил обработку, несмотря на потерю данных. |
minPartitions | Целое число >= 0, 0 = отключено. | 0 (отключено) | [Необязательно] Минимальное число разделов для считывания из Kafka. С помощью minPartitions параметра Spark можно настроить произвольный минимум секций для чтения из Kafka. Обычно в Spark реализовано сопоставление разделов Kafka topicPartitions Kafka с разделами Spark, использующими данные из Kafka, в соотношении1-1. Если вы set параметр minPartitions значением, превышающим параметры Kafka topicPartitions, Spark развернет большие секции Kafka до небольших частей. Этот параметр может быть set во время пиковых нагрузок, отклонений данных и когда поток отстает для увеличения скорости обработки. Это происходит за счет инициализации потребителей Kafka для каждого триггера, который может повлиять на производительность, если при подключении к Kafka используется SSL. |
kafka.group.id | Идентификатор группы потребителей Kafka. | не set | [Необязательно] Идентификатор группы для использования при чтении из Kafka. Следует использовать с осторожностью. По умолчанию каждый запрос формирует уникальный идентификатор группы для чтения данных. Это гарантирует, что каждый запрос будет иметь собственную группу потребителей, которая не испытывает помех от любого другого потребителя, и, следовательно, может считать все разделы соответствующих подписок. В некоторых сценариях (например, при авторизации на основе группы Kafka) возможно, потребуется использовать определенные идентификаторы авторизованных групп для чтения данных. При необходимости можно set указать идентификатор группы. Но это нужно сделать с особой осторожностью, так как это может привести к непредвиденному поведению. — Одновременный запуск запросов (как пакет, так и потоковая передача) с одинаковым идентификатором группы, скорее всего, вмешивается друг в друга, что приводит к тому, что каждый запрос считывает только часть данных. — Это также может произойти при запуске или перезапуске запросов в быстром успешном выполнении. Чтобы свести к минимуму такие проблемы, set конфигурации потребителя Kafka session.timeout.ms быть очень маленькими. |
startingOffsets | первое, последнее | latest | [Необязательно] Начальная точка при запуске запроса — либо «earliest», что означает с самых ранних смещений, либо json-строка, указывающая начальный offset для каждого раздела темы. В json -2 в качестве offset можно использовать для ссылки на самые ранние, -1 последние. Примечание. Для пакетных запросов последнее смещение (заданное неявно или с использованием -1 в json) запрещено. Для потоковых запросов это применяется только при запуске нового запроса, и возобновление всегда будет начинаться с того места where, где запрос был остановлен. Недавно обнаруженные секции во время запроса будут начинаться в самое ближайшее время. |
Сведения о других необязательных конфигурациях см. в статье Руководство по интеграции структурированной потоковой передачи Kafka.
Schema для записей системы Kafka
schema записей Kafka составляет:
Column | Тип |
---|---|
key | binary |
значение | binary |
topic | строка |
partition | INT |
offset | длинный |
TIMESTAMP | длинный |
timestampType | INT |
key
и value
всегда десериализуются как массивы байтов с ByteArrayDeserializer
. Используйте операции DataFrame (например, cast("string")
) для явной десериализации ключей и values.
Запись данных в Kafka
Ниже приведен пример потоковой записи в Kafka:
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Azure Databricks также поддерживает семантику пакетной записи в приемники данных Kafka, как показано в следующем примере:
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Настройка модуля записи структурированной потоковой передачи Kafka
Внимание
Databricks Runtime 13.3 LTS и выше включает более новую версию библиотеки kafka-clients
, которая позволяет идемпотентной записи по умолчанию. Если приемник Kafka использует версию 2.8.0 или ниже с настроенными списками управления доступом, но без IDEMPOTENT_WRITE
включения запись завершается ошибкой org.apache.kafka.common.KafkaException:
Cannot execute transactional method because we are in an error state
.
Чтобы устранить эту ошибку, обновите ее до Kafka версии 2.8.0 или более поздней, или установив параметр .option(“kafka.enable.idempotence”, “false”)
при настройке модуля записи структурированной потоковой передачи.
schema, предоставленный DataStreamWriter, взаимодействует с приемником Kafka. Можно использовать следующие поля:
имя Column | Обязательно или необязательно | Тип |
---|---|---|
key |
необязательно |
STRING или BINARY |
value |
обязательно |
STRING или BINARY |
headers |
необязательно | ARRAY |
topic |
необязательный (игнорируется, если topic = set в качестве опции записи) |
STRING |
partition |
необязательно | INT |
Ниже приведены распространенные варианты set при написании в Kafka:
Вариант | значение | Значение по умолчанию | Описание |
---|---|---|---|
kafka.boostrap.servers |
Разделенный запятыми list<host:port> |
ничего | [Обязательно] Конфигурация Kafka bootstrap.servers . |
topic |
STRING |
не set | [Необязательно] Задает раздел для записи всех строк. Этот параметр переопределяет любой раздел column, который существует в наборе данных. |
includeHeaders |
BOOLEAN |
false |
[Необязательно] Следует ли включать заголовки Kafka в строку. |
Сведения о других необязательных конфигурациях см. в статье Руководство по интеграции структурированной потоковой передачи Kafka.
Получение метрик Kafka
Вы можете get среднее, минимальное и максимальное количества отставания потокового запроса от последних доступных offset по всем подписанным топикам, используя метрики avgOffsetsBehindLatest
, maxOffsetsBehindLatest
и minOffsetsBehindLatest
. См. статью Чтение метрик в интерактивном режиме.
Примечание.
Доступно в Databricks Runtime 9.1 и более поздних версий.
Get предполагаемое общее количество байтов, которые процесс запроса не использовал из подписанных разделов, проверив значение estimatedTotalBytesBehindLatest
. Эта оценка основана на пакетах, которые были обработаны за последние 300 секунд. Период, на котором основана оценка, можно изменить, задав для параметра bytesEstimateWindowLength
другое значение. Например, чтобы set его до 10 минут:
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Если вы выполняете поток в записной книжке, эти метрики можно просмотреть на вкладке "Необработанные данные " на панели мониторинга хода выполнения потокового запроса:
{
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic]]",
"metrics" : {
"avgOffsetsBehindLatest" : "4.0",
"maxOffsetsBehindLatest" : "4",
"minOffsetsBehindLatest" : "4",
"estimatedTotalBytesBehindLatest" : "80.0"
},
} ]
}
Подключение Azure Databricks к Kafka с помощью SSL
Чтобы включить SSL-connections в Kafka, следуйте инструкциям в документации Confluent шифрования и проверки подлинности с помощью SSL-. Вы можете указать описанные здесь конфигурации с префиксом с kafka.
как параметры. Например, вы можете указать расположение хранилища доверия в свойстве kafka.ssl.truststore.location
.
Databricks рекомендует:
- Храните сертификаты в облачном хранилище объектов. Доступ к сертификатам можно ограничить только кластерами, которые могут получить доступ к Kafka. См. управление данными в Unity Catalog.
- Храните пароли сертификатов как секреты в области секретов.
В следующем примере используются расположения хранилища объектов и секреты Databricks для включения SSL-подключения:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Подключение Kafka в HDInsight к Azure Databricks
Создайте кластер Kafka HDInsight.
Инструкции см. в статье Подключение к Kafka в HDInsight с помощью виртуальной сети Azure.
Настройте брокеры Kafka для объявления правильного адреса.
Следуйте инструкциям из раздела Настройка Kafka для объявления IP-адресов. Если вы выполняете настройки Kafka самостоятельно на виртуальных машинах Azure, убедитесь, что конфигурация
advertised.listeners
брокеров set на внутренний IP-адрес узлов.Создайте кластер Azure Databricks.
Установите пиринг между кластером Kafka и кластером Azure Databricks.
Следуйте инструкциям из раздела пиринг между одноранговыми виртуальными сетями.
Проверка подлинности субъекта-службы с помощью идентификатора Microsoft Entra и Центры событий Azure
Azure Databricks поддерживает проверку подлинности заданий Spark со службами Центров событий. Эта проверка подлинности выполняется с помощью OAuth с идентификатором Microsoft Entra.
Azure Databricks поддерживает проверку подлинности Идентификатора Microsoft Entra с идентификатором клиента и секретом в следующих вычислительных средах:
- Databricks Runtime 12.2 LTS и более поздних версий для вычислений, настроенных в режиме доступа к одному пользователю.
- Databricks Runtime 14.3 LTS и более поздних версий для вычислений, настроенных в режиме общего доступа.
- Конвейеры Delta Live Tables, конфигурированные без Unity Catalog.
Azure Databricks не поддерживает проверку подлинности Microsoft Entra ID с сертификатом в любой вычислительной среде или в конвейерах Delta Live Tables, настроенных с использованием Unity Catalog.
Эта проверка подлинности не работает в общих кластерах или в Unity Catalog Delta Live Tables.
Настройка соединителя Структурированной потоковой передачи Kafka
Чтобы выполнить проверку подлинности с помощью идентификатора Microsoft Entra, вам потребуется следующее values:
Идентификатор клиента. Это можно найти на вкладке служб идентификатора Microsoft Entra ID .
Идентификатор клиента (также известный как идентификатор приложения).
Секрет клиента. После этого необходимо добавить его в качестве секрета в рабочую область Databricks. Чтобы добавить этот секрет, см. раздел "Управление секретами".
Раздел EventHubs. Вы можете найти
разделов в разделе Центров событий в разделеСущности на определенной странице пространства имен центров событий. Для работы с несколькими разделами можно set роль IAM на уровне Центров событий. Сервер EventHubs. Это можно найти на странице обзора определенного пространства имен Центров событий:
Кроме того, чтобы использовать идентификатор Записи, необходимо сообщить Kafka использовать механизм OAuth SASL (SASL является универсальным протоколом, и OAuth является типом SASL "механизм"):
-
kafka.security.protocol
должно бытьSASL_SSL
-
kafka.sasl.mechanism
должно бытьOAUTHBEARER
-
kafka.sasl.login.callback.handler.class
должно быть полным именем класса Java со значениемkafkashaded
обработчика обратного вызова для входа класса Kafka. См. следующий пример для точного класса.
Пример
Далее рассмотрим работающий пример:
Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
Обработка потенциальных ошибок
Параметры потоковой передачи не поддерживаются.
Если вы пытаетесь использовать этот механизм проверки подлинности в конвейере Delta Live Live Tables, настроенном с помощью Unity Catalog, может появиться следующая ошибка:
Чтобы устранить эту ошибку, используйте поддерживаемую конфигурацию вычислений. См. проверку подлинности субъекта-службы с помощью идентификатора Microsoft Entra и Центры событий Azure.
Не удалось создать новый
KafkaAdminClient
объект.Это внутренняя ошибка, вызываемая Kafka, если какие-либо из следующих параметров проверки подлинности неверны:
- Идентификатор клиента (также известный как идентификатор приложения)
- Идентификатор клиента
- Сервер EventHubs
Чтобы устранить ошибку, убедитесь, что values соответствуют этим параметрам.
Кроме того, эта ошибка может появиться при изменении параметров конфигурации, предоставленных по умолчанию в примере (которые вы попросили не изменять), например
kafka.security.protocol
.Возвращаемые записи отсутствуют
Если вы пытаетесь отобразить или обработать кадр данных, но не получаете результаты, вы увидите следующее в пользовательском интерфейсе.
Это сообщение означает, что проверка подлинности прошла успешно, но EventHubs не возвращала никаких данных. Некоторые возможные причины (хотя и не являются исчерпывающими) являются:
- Вы указали неправильный раздел EventHubs .
- Для параметра конфигурации Kafka по умолчанию используется
startingOffsets
параметр конфигурацииlatest
Kafka, и вы пока не получаете никаких данных через раздел. Вы можете начать чтение данных setstartingOffsetstoearliest
с самых ранних смещений Kafka.