Kurz: Použití rozhraní API streamů Apache Kafka ve službě Azure HDInsight
Zjistěte, jak vytvořit aplikaci, která používá rozhraní Apache Kafka Toky API, a spustit ji se systémem Kafka ve službě HDInsight.
Aplikace použitá v tomto kurzu počítá slova v datovém proudu. Přečte textová data z tématu Kafka, extrahuje jednotlivá slova a pak uloží slova a jejich počet do jiného tématu Kafka.
Zpracování datových proudů Kafka se často provádí pomocí Apache Sparku. Kafka verze 2.1.1 a 2.4.1 (v HDInsight 4.0 a 5.0) podporuje rozhraní Kafka Toky API. Toto rozhraní API umožňuje transformovat datové proudy mezi vstupními a výstupními tématy.
Další informace o datových proudech Kafka najdete v úvodní dokumentaci k datovým proudům na webu Apache.org.
V tomto kurzu se naučíte:
- Vysvětlení kódu
- Sestavení a nasazení aplikace
- Konfigurace témat Kafka
- Spuštění kódu
Požadavky
Kafka v clusteru HDInsight 4.0 nebo 5.0 Informace o vytvoření Kafka v clusteru HDInsight najdete v dokumentu Začínáme s Apache Kafka ve službě HDInsight .
Dokončete kroky v dokumentu rozhraní Apache Kafka Consumer and Producer API . Kroky v tomto dokumentu používají ukázkovou aplikaci a témata vytvořená v tomto kurzu.
Java Developer Kit (JDK) verze 8 nebo ekvivalentní, například OpenJDK.
Apache Maven je správně nainstalovaný podle Apache. Maven je systém sestavení projektu pro projekty Java.
Klient SSH. Další informace najdete v tématu Připojení ke službě HDInsight (Apache Hadoop) pomocí SSH.
Vysvětlení kódu
Ukázková aplikace se nachází na adrese https://github.com/Azure-Samples/hdinsight-kafka-java-get-started v podadresáři Streaming
. Aplikace se skládá ze dvou souborů:
pom.xml
: Tento soubor definuje závislosti projektu, verzi Javy a metody balení.Stream.java
: Tento soubor implementuje logiku streamování.
Pom.xml
V souboru pom.xml
je důležité porozumět následujícímu:
Závislosti: Tento projekt spoléhá na rozhraní Kafka Streams API, které je součástí balíčku
kafka-clients
. Tuto závislost definuje následující kód XML:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
Položka
${kafka.version}
se deklaruje v části<properties>..</properties>
souborupom.xml
a je nakonfigurovaná na verzi systému Kafka v clusteru HDInsight.Moduly plug-in: Moduly plug-in Mavenu poskytují různé funkce. V tomto projektu se používají následující moduly plug-in:
maven-compiler-plugin
: Slouží k nastavení verze Javy, kterou projekt používá, na 8. HDInsight 4.0 a 5.0 vyžadují Javu 8.maven-shade-plugin
: Slouží k vygenerování souboru JAR uber, který obsahuje tuto aplikaci, a všech závislostí. Používá se také k nastavení vstupního bodu aplikace, abyste mohli přímo spustit soubor Jar, aniž byste museli zadávat hlavní třídu.
Stream.java
Soubor Stream.java pomocí rozhraní Streams API implementuje aplikaci počítání slov. Čte data z tématu Kafka test
a zapisuje počty slov do tématu wordcounts
.
Následující kód definuje aplikaci počítání slov:
package com.microsoft.example;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Arrays;
import java.util.Properties;
public class Stream
{
public static void main( String[] args ) {
Properties streamsConfig = new Properties();
// The name must be unique on the Kafka cluster
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
// Brokers
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
// SerDes for key and values
streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Serdes for the word and count
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
KStream<String, Long> wordCounts = sentences
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Counts")
.toStream();
wordCounts.to(stringSerde, longSerde, "wordcounts");
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Sestavení a nasazení příkladu
Pokud chcete sestavit a nasadit projekt do clusteru Kafka ve službě HDInsight, postupujte následovně:
Nastavte aktuální adresář na umístění
hdinsight-kafka-java-get-started-master\Streaming
adresáře a pak pomocí následujícího příkazu vytvořte balíček JAR:mvn clean package
Tento příkaz vytvoří balíček v umístění
target/kafka-streaming-1.0-SNAPSHOT.jar
.Místo
sshuser
použijte jméno uživatele SSH pro váš cluster a místoclustername
zadejte název clusteru. Pomocí následujícího příkazu zkopírujtekafka-streaming-1.0-SNAPSHOT.jar
soubor do clusteru HDInsight. Pokud se zobrazí výzva, zadejte heslo uživatelského účtu SSH.scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
Témata týkající se vytváření Apache Kafka
Místo
sshuser
použijte jméno uživatele SSH pro váš cluster a místoCLUSTERNAME
zadejte název clusteru. Otevřete připojení SSH ke clusteru zadáním následujícího příkazu. Pokud se zobrazí výzva, zadejte heslo uživatelského účtu SSH.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Nainstalujte jq, procesor JSON příkazového řádku. Z otevřeného připojení SSH zadejte následující příkaz, který chcete nainstalovat
jq
:sudo apt -y install jq
Nastavte proměnnou hesla. Nahraďte
PASSWORD
přihlašovacím heslem clusteru a pak zadejte příkaz:export PASSWORD='PASSWORD'
Extrahujte název clusteru se správnými písmeny. Skutečná velikost výskytu názvu clusteru se může lišit od očekávání podle toho, jak byl cluster vytvořen. Tento příkaz získá skutečné velikostí a pak ho uloží do proměnné. Zadejte tento příkaz:
export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Poznámka:
Pokud tento proces provádíte mimo cluster, existuje jiný postup pro uložení názvu clusteru. Název clusteru získáte v malých písmenech z webu Azure Portal. Potom nahraďte název
<clustername>
clusteru následujícím příkazem a spusťte ho:export clusterName='<clustername>'
.K získání hostitelů zprostředkovatele Kafka a hostitelů Apache Zookeeper použijte následující příkazy. Po zobrazení výzvy zadejte heslo pro účet přihlášení clusteru (admin).
export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2); export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Poznámka:
Tyto příkazy vyžadují přístup Ambari. Pokud je váš cluster za skupinou zabezpečení sítě, spusťte tyto příkazy z počítače, který má přístup k Ambari.
K vytvoření témat, která používá operace streamování, použijte následující příkazy:
Poznámka:
Možná se zobrazí chyba, protože téma
test
již existuje. To je v pořádku, protože se mohlo vytvořit v kurzu k rozhraní Producer and Consumer API./usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Témata slouží k následujícím účelům:
test
: V tomto tématu se přijímají záznamy. Odtud čte data aplikace streamování.wordcounts
: Do tohoto tématu aplikace streamování ukládá výstup.RekeyedIntermediateTopic
: Toto téma slouží k opětovnému rozdělení dat při aktualizaci počtu pomocí operátorucountByKey
.wordcount-example-Counts-changelog
: Toto téma používá operacecountByKey
jako úložiště stavu.
Systém Kafka ve službě HDInsight je také možné nakonfigurovat tak, aby vytvářel témata automaticky. Další informace najdete v dokumentu Konfigurace automatického vytváření témat.
Spuštění kódu
Pokud chcete aplikaci streamování spustit jako proces na pozadí, použijte následující příkaz:
java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
Může se zobrazit upozornění na Apache
log4j
. Toto upozornění můžete ignorovat.K odesílání záznamů do tématu
test
použijte následující příkaz, který spustí aplikaci producenta:java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
Jakmile bude producent hotový, pomocí následujícího příkazu zobrazte informace uložené v tématu
wordcounts
:/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
Parametry
--property
říkají konzole konzumenta, aby vytiskla klíč (slovo) společně s počtem (hodnota). Tento parametr také konfiguruje deserializátor, který se použije při čtení těchto hodnot ze systému Kafka.Výstup se bude podobat následujícímu:
dwarfs 13635 ago 13664 snow 13636 dwarfs 13636 ago 13665 a 13803 ago 13666 a 13804 ago 13667 ago 13668 jumped 13640 jumped 13641
Parametr
--from-beginning
konfiguruje konzumenta tak, aby začal číst záznamy uložené v tématu od začátku. Počet se zvýší při každém zjištění slova, takže téma obsahuje pro každé slovo několik záznamů s rostoucím počtem.Producenta ukončíte stisknutím Ctrl+C. Pokračujte a pomocí Ctrl + C ukončete aplikaci i konzumenta.
Pokud chcete odstranit témata používaná operací streamování, použijte následující příkazy:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Vyčištění prostředků
Pokud chcete vyčistit prostředky vytvořené v tomto kurzu, můžete odstranit skupinu prostředků. Odstraněním skupiny prostředků odstraníte také přidružený cluster HDInsight a všechny další prostředky, které jsou k příslušné skupině prostředků přidružené.
Odebrání skupiny prostředků pomocí webu Azure Portal:
- Na webu Azure Portal rozbalením nabídky na levé straně otevřete nabídku služeb a pak zvolte Skupiny prostředků. Zobrazí se seznam skupin prostředků.
- Vyhledejte skupinu prostředků, kterou chcete odstranit, a klikněte pravým tlačítkem na tlačítko Další (...) na pravé straně seznamu.
- Vyberte Odstranit skupinu prostředků a potvrďte tuto akci.
Další kroky
V tomto dokumentu jste zjistili, jak používat rozhraní Apache Kafka Toky API se systémem Kafka ve službě HDInsight. Další informace o práci se systémem Kafka najdete v následujícím tématu.