Поделиться через


Потоковая обработка с помощью Apache Kafka и Azure Databricks

В этой статье описывается, как использовать Apache Kafka в качестве источника или приемника при выполнении рабочих нагрузок структурированной потоковой передачи в Azure Databricks.

Дополнительные сведения о Kafka см. в документации по Kafka.

Чтение данных из Kafka

Ниже приведен пример потокового чтения из Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Azure Databricks также поддерживает пакетную семантику чтения для источников данных Kafka, как показано в следующем примере:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Для добавочной пакетной загрузки Databricks рекомендует использовать Kafka с Trigger.AvailableNow. См. инструкции по настройке добавочной пакетной обработки.

В Databricks Runtime 13.3 LTS и более поздних версиях Azure Databricks предоставляет функцию SQL для чтения данных Kafka. Стриминг с использованием SQL поддерживается только в Delta Live Tables или с потоком tables в Databricks SQL. См. функциюсо значением из read_kafka .

Настройка средства чтения структурированной потоковой передачи Kafka

Azure Databricks предоставляет ключевое слово kafka в качестве формата данных для настройки connections в Kafka 0.10+.

Ниже приведены наиболее распространенные конфигурации для Kafka:

Есть несколько способов указания разделов для подписки. Необходимо предоставить лишь один из этих parameters:

Вариант значение Описание
подписка Список из list тем, разделённых запятыми. Тема list для подписки.
subscribePattern Строка регулярного выражения Java. Шаблон, используемый для подписки на разделы.
назначить Строка в формате JSON {"topicA":[0,1],"topic":[2,4]}. Конкретные разделы topicPartitions для использования.

Другие важные конфигурации:

Вариант значение Значение по умолчанию Описание
kafka.bootstrap.servers Запятыми разделённый список list хоста:порта. empty [Обязательно] Конфигурация Kafka bootstrap.servers. Если данные из Kafka отсутствуют, сначала проверьте адрес брокера list. Если адрес брокера list неверный, ошибки могут не возникнуть. Это обусловлено тем, что клиент Kafka предполагает, что брокеры будут доступны в конечном итоге и в случае ошибок сети повторные попытки можно будет выполнять без ограничений.
failOnDataLoss true или false. true [Необязательно] Указывает, следует ли завершать запрос, если возможна потеря данных. Запросы могут постоянно завершаться сбоем считывания данных из Kafka во многих сценариях, таких как удаление разделов, усечение разделов перед обработкой и т. д. Мы пытаемся с применением консервативного подхода определить, могут ли данные быть потеряны. Иногда это может привести к ложным сигналам. Set этот параметр на false, если он не работает как ожидалось, или вы хотите, чтобы запрос продолжил обработку, несмотря на потерю данных.
minPartitions Целое число >= 0, 0 = отключено. 0 (отключено) [Необязательно] Минимальное число разделов для считывания из Kafka. С помощью minPartitions параметра Spark можно настроить произвольный минимум секций для чтения из Kafka. Обычно в Spark реализовано сопоставление разделов Kafka topicPartitions Kafka с разделами Spark, использующими данные из Kafka, в соотношении1-1. Если вы set параметр minPartitions значением, превышающим параметры Kafka topicPartitions, Spark развернет большие секции Kafka до небольших частей. Этот параметр может быть set во время пиковых нагрузок, отклонений данных и когда поток отстает для увеличения скорости обработки. Это происходит за счет инициализации потребителей Kafka для каждого триггера, который может повлиять на производительность, если при подключении к Kafka используется SSL.
kafka.group.id Идентификатор группы потребителей Kafka. не set [Необязательно] Идентификатор группы для использования при чтении из Kafka. Следует использовать с осторожностью. По умолчанию каждый запрос формирует уникальный идентификатор группы для чтения данных. Это гарантирует, что каждый запрос будет иметь собственную группу потребителей, которая не испытывает помех от любого другого потребителя, и, следовательно, может считать все разделы соответствующих подписок. В некоторых сценариях (например, при авторизации на основе группы Kafka) возможно, потребуется использовать определенные идентификаторы авторизованных групп для чтения данных. При необходимости можно set указать идентификатор группы. Но это нужно сделать с особой осторожностью, так как это может привести к непредвиденному поведению.

— Одновременный запуск запросов (как пакет, так и потоковая передача) с одинаковым идентификатором группы, скорее всего, вмешивается друг в друга, что приводит к тому, что каждый запрос считывает только часть данных.
— Это также может произойти при запуске или перезапуске запросов в быстром успешном выполнении. Чтобы свести к минимуму такие проблемы, set конфигурации потребителя Kafka session.timeout.ms быть очень маленькими.
startingOffsets первое, последнее latest [Необязательно] Начальная точка при запуске запроса — либо «earliest», что означает с самых ранних смещений, либо json-строка, указывающая начальный offset для каждого раздела темы. В json -2 в качестве offset можно использовать для ссылки на самые ранние, -1 последние. Примечание. Для пакетных запросов последнее смещение (заданное неявно или с использованием -1 в json) запрещено. Для потоковых запросов это применяется только при запуске нового запроса, и возобновление всегда будет начинаться с того места where, где запрос был остановлен. Недавно обнаруженные секции во время запроса будут начинаться в самое ближайшее время.

Сведения о других необязательных конфигурациях см. в статье Руководство по интеграции структурированной потоковой передачи Kafka.

Schema для записей системы Kafka

schema записей Kafka составляет:

Column Тип
key binary
значение binary
topic строка
partition INT
offset длинный
TIMESTAMP длинный
timestampType INT

key и value всегда десериализуются как массивы байтов с ByteArrayDeserializer. Используйте операции DataFrame (например, cast("string")) для явной десериализации ключей и values.

Запись данных в Kafka

Ниже приведен пример потоковой записи в Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Azure Databricks также поддерживает семантику пакетной записи в приемники данных Kafka, как показано в следующем примере:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Настройка модуля записи структурированной потоковой передачи Kafka

Внимание

Databricks Runtime 13.3 LTS и выше включает более новую версию библиотеки kafka-clients , которая позволяет идемпотентной записи по умолчанию. Если приемник Kafka использует версию 2.8.0 или ниже с настроенными списками управления доступом, но без IDEMPOTENT_WRITE включения запись завершается ошибкой org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.

Чтобы устранить эту ошибку, обновите ее до Kafka версии 2.8.0 или более поздней, или установив параметр .option(“kafka.enable.idempotence”, “false”) при настройке модуля записи структурированной потоковой передачи.

schema, предоставленный DataStreamWriter, взаимодействует с приемником Kafka. Можно использовать следующие поля:

имя Column Обязательно или необязательно Тип
key необязательно STRING или BINARY
value обязательно STRING или BINARY
headers необязательно ARRAY
topic необязательный (игнорируется, если topic = set в качестве опции записи) STRING
partition необязательно INT

Ниже приведены распространенные варианты set при написании в Kafka:

Вариант значение Значение по умолчанию Описание
kafka.boostrap.servers Разделенный запятыми list<host:port> ничего [Обязательно] Конфигурация Kafka bootstrap.servers.
topic STRING не set [Необязательно] Задает раздел для записи всех строк. Этот параметр переопределяет любой раздел column, который существует в наборе данных.
includeHeaders BOOLEAN false [Необязательно] Следует ли включать заголовки Kafka в строку.

Сведения о других необязательных конфигурациях см. в статье Руководство по интеграции структурированной потоковой передачи Kafka.

Получение метрик Kafka

Вы можете get среднее, минимальное и максимальное количества отставания потокового запроса от последних доступных offset по всем подписанным топикам, используя метрики avgOffsetsBehindLatest, maxOffsetsBehindLatestи minOffsetsBehindLatest. См. статью Чтение метрик в интерактивном режиме.

Примечание.

Доступно в Databricks Runtime 9.1 и более поздних версий.

Get предполагаемое общее количество байтов, которые процесс запроса не использовал из подписанных разделов, проверив значение estimatedTotalBytesBehindLatest. Эта оценка основана на пакетах, которые были обработаны за последние 300 секунд. Период, на котором основана оценка, можно изменить, задав для параметра bytesEstimateWindowLength другое значение. Например, чтобы set его до 10 минут:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Если вы выполняете поток в записной книжке, эти метрики можно просмотреть на вкладке "Необработанные данные " на панели мониторинга хода выполнения потокового запроса:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

Подключение Azure Databricks к Kafka с помощью SSL

Чтобы включить SSL-connections в Kafka, следуйте инструкциям в документации Confluent шифрования и проверки подлинности с помощью SSL-. Вы можете указать описанные здесь конфигурации с префиксом с kafka. как параметры. Например, вы можете указать расположение хранилища доверия в свойстве kafka.ssl.truststore.location.

Databricks рекомендует:

В следующем примере используются расположения хранилища объектов и секреты Databricks для включения SSL-подключения:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Подключение Kafka в HDInsight к Azure Databricks

  1. Создайте кластер Kafka HDInsight.

    Инструкции см. в статье Подключение к Kafka в HDInsight с помощью виртуальной сети Azure.

  2. Настройте брокеры Kafka для объявления правильного адреса.

    Следуйте инструкциям из раздела Настройка Kafka для объявления IP-адресов. Если вы выполняете настройки Kafka самостоятельно на виртуальных машинах Azure, убедитесь, что конфигурация advertised.listeners брокеров set на внутренний IP-адрес узлов.

  3. Создайте кластер Azure Databricks.

  4. Установите пиринг между кластером Kafka и кластером Azure Databricks.

    Следуйте инструкциям из раздела пиринг между одноранговыми виртуальными сетями.

Проверка подлинности субъекта-службы с помощью идентификатора Microsoft Entra и Центры событий Azure

Azure Databricks поддерживает проверку подлинности заданий Spark со службами Центров событий. Эта проверка подлинности выполняется с помощью OAuth с идентификатором Microsoft Entra.

Схема проверки подлинности AAD

Azure Databricks поддерживает проверку подлинности Идентификатора Microsoft Entra с идентификатором клиента и секретом в следующих вычислительных средах:

  • Databricks Runtime 12.2 LTS и более поздних версий для вычислений, настроенных в режиме доступа к одному пользователю.
  • Databricks Runtime 14.3 LTS и более поздних версий для вычислений, настроенных в режиме общего доступа.
  • Конвейеры Delta Live Tables, конфигурированные без Unity Catalog.

Azure Databricks не поддерживает проверку подлинности Microsoft Entra ID с сертификатом в любой вычислительной среде или в конвейерах Delta Live Tables, настроенных с использованием Unity Catalog.

Эта проверка подлинности не работает в общих кластерах или в Unity Catalog Delta Live Tables.

Настройка соединителя Структурированной потоковой передачи Kafka

Чтобы выполнить проверку подлинности с помощью идентификатора Microsoft Entra, вам потребуется следующее values:

  • Идентификатор клиента. Это можно найти на вкладке служб идентификатора Microsoft Entra ID .

  • Идентификатор клиента (также известный как идентификатор приложения).

  • Секрет клиента. После этого необходимо добавить его в качестве секрета в рабочую область Databricks. Чтобы добавить этот секрет, см. раздел "Управление секретами".

  • Раздел EventHubs. Вы можете найти разделов в разделе Центров событий в разделе Сущности на определенной странице пространства имен центров событий . Для работы с несколькими разделами можно set роль IAM на уровне Центров событий.

  • Сервер EventHubs. Это можно найти на странице обзора определенного пространства имен Центров событий:

    Пространство имен Центров событий

Кроме того, чтобы использовать идентификатор Записи, необходимо сообщить Kafka использовать механизм OAuth SASL (SASL является универсальным протоколом, и OAuth является типом SASL "механизм"):

  • kafka.security.protocol должно быть SASL_SSL
  • kafka.sasl.mechanism должно быть OAUTHBEARER
  • kafka.sasl.login.callback.handler.class должно быть полным именем класса Java со значением kafkashaded обработчика обратного вызова для входа класса Kafka. См. следующий пример для точного класса.

Пример

Далее рассмотрим работающий пример:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

Обработка потенциальных ошибок

  • Параметры потоковой передачи не поддерживаются.

    Если вы пытаетесь использовать этот механизм проверки подлинности в конвейере Delta Live Live Tables, настроенном с помощью Unity Catalog, может появиться следующая ошибка:

    Неподдерживаемая ошибка потоковой передачи

    Чтобы устранить эту ошибку, используйте поддерживаемую конфигурацию вычислений. См. проверку подлинности субъекта-службы с помощью идентификатора Microsoft Entra и Центры событий Azure.

  • Не удалось создать новый KafkaAdminClientобъект.

    Это внутренняя ошибка, вызываемая Kafka, если какие-либо из следующих параметров проверки подлинности неверны:

    • Идентификатор клиента (также известный как идентификатор приложения)
    • Идентификатор клиента
    • Сервер EventHubs

    Чтобы устранить ошибку, убедитесь, что values соответствуют этим параметрам.

    Кроме того, эта ошибка может появиться при изменении параметров конфигурации, предоставленных по умолчанию в примере (которые вы попросили не изменять), например kafka.security.protocol.

  • Возвращаемые записи отсутствуют

    Если вы пытаетесь отобразить или обработать кадр данных, но не получаете результаты, вы увидите следующее в пользовательском интерфейсе.

    Нет сообщения о результатах

    Это сообщение означает, что проверка подлинности прошла успешно, но EventHubs не возвращала никаких данных. Некоторые возможные причины (хотя и не являются исчерпывающими) являются:

    • Вы указали неправильный раздел EventHubs .
    • Для параметра конфигурации Kafka по умолчанию используется startingOffsetsпараметр конфигурации latest Kafka, и вы пока не получаете никаких данных через раздел. Вы можете начать чтение данных setstartingOffsetstoearliest с самых ранних смещений Kafka.