Připojení aplikace Apache Spark pomocí Azure Event Hubs
Tento kurz vás provede připojením aplikace Spark ke službě Event Hubs pro streamování v reálném čase. Tato integrace umožňuje streamování bez nutnosti měnit klienty protokolu nebo spouštět vlastní clustery Kafka nebo Zookeeper. Tento kurz vyžaduje Apache Spark verze 2.4 nebo novější a Apache Kafka verze 2.0 nebo novější.
Poznámka
Tato ukázka je dostupná na GitHubu.
V tomto kurzu se naučíte:
- Vytvoření oboru názvů služby Event Hubs
- Naklonování ukázkového projektu
- Spuštění Sparku
- Čtení ze služby Event Hubs pro ekosystém Kafka
- Zápis do služby Event Hubs pro ekosystém Kafka
Požadavky
Než začnete s tímto kurzem, ujistěte se, že máte následující:
- Předplatné Azure. Pokud žádné nemáte, vytvořte si bezplatný účet.
- Apache Spark verze 2.4
- Apache Kafka verze 2.0
- Git
Poznámka
Adaptér Spark-Kafka byl aktualizován tak, aby od Sparku v2.4 podporoval Kafka v2.0. V předchozích verzích Sparku adaptér podporoval Kafka v0.10 a novější, ale spoléhal se konkrétně na rozhraní API Kafka v0.10. Vzhledem k tomu, že Event Hubs pro ekosystém Kafka nepodporuje Kafka v0.10, nepodporuje ani adaptéry Spark-Kafka ze starších verzí Sparku než v2.4.
Vytvoření oboru názvů služby Event Hubs
K odesílání do jakékoli služby Event Hubs a příjmu z ní se vyžaduje obor názvů služby Event Hubs. Pokyny k vytvoření oboru názvů a centra událostí najdete v tématu Vytvoření centra událostí. Získejte plně kvalifikovaný název domény a připojovací řetězec služby Event Hubs pro pozdější použití. Pokyny najdete v tématu Získání připojovacího řetězce služby Event Hubs.
Naklonování ukázkového projektu
Naklonujte úložiště Azure Event Hubs a přejděte do podsložky tutorials/spark
:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark
Čtení ze služby Event Hubs pro ekosystém Kafka
Stačí několik změn konfigurace a můžete začít číst ze služby Event Hubs pro ekosystém Kafka. Aktualizujte hodnoty BOOTSTRAP_SERVERS a EH_SASL s použitím podrobností z vašeho oboru názvů a můžete začít streamovat se službou Event Hubs stejně jako s Kafka. Kompletní vzorový kód najdete v souboru sparkConsumer.scala na GitHubu.
//Read from your Event Hub!
val df = spark.readStream
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
.option("kafka.group.id", GROUP_ID)
.option("failOnDataLoss", "true")
.load()
//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
.outputMode("append")
.format("console")
.start()
Pokud se zobrazí chyba podobná následující chybě, přidejte .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
do spark.readStream
volání a zkuste to znovu.
IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
Zápis do služby Event Hubs pro ekosystém Kafka
Do služby Event Hubs můžete také zapisovat stejným způsobem jako do systému Kafka. Nezapomeňte aktualizovat konfiguraci a změnit hodnoty BOOTSTRAP_SERVERS a EH_SASL s použitím informací z vašeho oboru názvů služby Event Hubs. Kompletní vzorový kód najdete v souboru sparkProducer.scala na GitHubu.
df = /**Dataframe**/
//Write to your Event Hub!
df.writeStream
.format("kafka")
.option("topic", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("checkpointLocation", "./checkpoint")
.start()
Další kroky
Další informace o službě Event Hubs a Event Hubs pro Kafka najdete v následujících článcích: