Condividi tramite


Connettere l'applicazione Apache Spark con Hub eventi di Azure

Questa esercitazione illustra come connettere l'applicazione Spark a Hub eventi per lo streaming in tempo reale. Questa integrazione consente lo streaming senza dover modificare i client di protocollo o eseguire cluster Kafka o Zookeeper personalizzati. Questa esercitazione richiede Apache Spark v2.4+ e Apache Kafka v2.0+.

Nota

Questo esempio è disponibile su GitHub

In questa esercitazione verranno illustrate le procedure per:

  • Creare uno spazio dei nomi di Hub eventi
  • Clonare il progetto di esempio
  • Eseguire Spark
  • Leggere dati da Hub eventi per Kafka
  • Scrivere dati in Hub eventi per Kafka

Prerequisiti

Prima di iniziare questa esercitazione, assicurarsi di avere:

Nota

L'adapter Spark-Kafka è stato aggiornato per supportare Kafka v2.0 a partire da Spark v2.4. Nelle versioni precedenti di Spark l'adapter supportava Kafka v0.10 e versioni successive ma dipendeva specificamente dalle API di Kafka v0.10. Poiché Hub eventi per Kafka non supporta Kafka v0.10, gli adapter Spark-Kafka di versioni di Spark precedenti alla v2.4 non sono supportati da Hub eventi per gli ecosistemi Kafka.

Creare uno spazio dei nomi di Hub eventi

Per l'invio e la ricezione da qualsiasi servizio Hub eventi è richiesto uno spazio dei nomi di Hub eventi. Per istruzioni su come creare uno spazio dei nomi e un hub eventi, vedere Creazione di un hub eventi . Ottenere la stringa di connessione di Hub eventi e il nome di dominio completo (FQDN) da usare successivamente. Per istruzioni, vedere Ottenere una stringa di connessione ad Hub eventi.

Clonare il progetto di esempio

Clonare il repository di Hub eventi di Azure e passare alla sottocartella tutorials/spark:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Leggere dati da Hub eventi per Kafka

Con poche modifiche alla configurazione è possibile iniziare a leggere i dati da Hub eventi per Kafka. Dopo aver aggiornato BOOTSTRAP_SERVERS e EH_SASL con i dettagli dello spazio dei nomi, è possibile avviare lo streaming con Hub eventi allo stesso modo di Kafka. Per il codice di esempio completo, vedere il file sparkConsumer.scala su GitHub.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

Se viene visualizzato un errore simile all'errore seguente, aggiungere .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") alla spark.readStream chiamata e riprovare.

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Scrivere dati in Hub eventi per Kafka

È anche possibile scrivere in Hub eventi allo stesso modo in cui si scrive in Kafka. Non dimenticare di aggiornare la configurazione in modo da modificare BOOTSTRAP_SERVERS e EH_SASL con le informazioni dello spazio dei nomi di Hub eventi. Per il codice di esempio completo, vedere il file sparkProducer.scala su GitHub.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

Passaggi successivi

Per altre informazioni su Hub eventi e Hub eventi per Kafka, vedere gli articoli seguenti: