Поделиться через


Подключение приложения Apache Spark с помощью Центров событий Azure

В этом руководстве описывается, как подключить приложение Spark к Центрам событий для потоковой передачи в режиме реального времени. Такая интеграция обеспечивает потоковую передачу без необходимости изменять клиенты протокола или запускать собственные кластеры Kafka или Zookeeper. Для работы с этим руководством требуется Apache Spark v2.4+ и Apache Kafka v2.0+.

Примечание

Этот пример можно найти на сайте GitHub.

В этом руководстве описано следующее:

  • Создание пространства имен в Центрах событий
  • Клонирование примера проекта
  • Запуск Spark.
  • Чтение из Центров событий для Kafka.
  • Запись в Центры событий для Kafka.

Предварительные требования

Перед началом работы с данным руководством необходимо выполнить описанные ниже условия.

Примечание

Адаптер Spark-Kafka был обновлен для поддержки Kafka v2.0, начиная со Spark v2.4. В предыдущих выпусках Spark адаптер поддерживал Kafka v0.10 и более поздние версии, но в основном как база использовались API Kafka v0.10. Так как Центры событий для Kafka не поддерживают Kafka v0.10, адаптеры Spark-Kafka от версий Spark до версии 2.4 не поддерживаются Центрами событий для экосистем Kafka.

Создание пространства имен Центров событий

Для отправки и получения данных из любой службы Центров событий требуется пространство имен Центров событий. См. раздел Создание концентратора событий для получения инструкций по созданию пространства имен и концентратора событий. Получите строку подключения Центров событий и полное доменное имя (FQDN) для последующего использования. Инструкции см. в статье Get an Event Hubs connection string (Получение строки подключения для Центров событий).

Клонирование примера проекта

Клонируйте репозиторий Центров событий Azure и перейдите к вложенной папке tutorials/spark:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Чтение из Центров событий для Kafka.

Внеся несколько изменений в конфигурации, вы сможете запустить чтение из Центров событий для Kafka. Обновите BOOTSTRAP_SERVERS и EH_SASL сведениями о пространстве имен, чтобы запустить потоковую передачу с помощью Центров событий так же, как с Kafka. Чтобы ознакомиться с полным примером кода, просмотрите файл sparkConsumer.scala на сайте GitHub.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

Если появится сообщение об ошибке, аналогичное приведенной ниже, добавьте .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") в вызов и повторите попытку spark.readStream .

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Запись в Центры событий для Kafka.

Вы также можете выполнить запись в Центры событий таким же образом, как и в Kafka. Обновите конфигурацию, чтобы заменить сведения в BOOTSTRAP_SERVERS и EH_SASL сведениями о пространстве имен в Центре событий. Чтобы ознакомиться с полным примером кода, просмотрите файл sparkProducer.scala на сайте GitHub.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

Дальнейшие действия

Дополнительные сведения о Центрах событий и Центрах событий для Kafka см. в следующих статьях: