Zelfstudie: Apache Kafka streams-API gebruiken in Azure HDInsight
Leer hoe u een toepassing maakt die gebruikmaakt van de Streams-API van Apache Kafka en hoe u deze uitvoert met Kafka in HDInsight.
De voorbeeldtoepassing die wordt gebruikt in deze zelfstudie, is een app voor het tellen van woorden die via een stream worden aangeboden. Eerst worden er tekstgegevens gelezen uit een onderwerp van Kafka, vervolgens worden afzonderlijke woorden uitgepakt en ten slotte worden de woorden en het aantal woorden opgeslagen in een ander Kafka-onderwerp.
Kafka-stroomverwerking wordt vaak uitgevoerd met Behulp van Apache Spark. Kafka versie 2.1.1 en 2.4.1 (in HDInsight 4.0 en 5.0) ondersteunt de Kafka Streams-API. Met deze API kunt u gegevensstromen transformeren tussen invoer- en uitvoeronderwerpen.
Meer informatie over de Streams-API van Kafka vindt u in het Engelstalige artikel Intro to Streams op Apache.org.
In deze zelfstudie leert u het volgende:
- De code begrijpen
- De toepassing compileren en implementeren
- Kafka onderwerpen configureren
- De code uitvoeren
Vereisten
Een Kafka-cluster in HDInsight 4.0 of 5.0. Zie Aan de slag met Apache Kafka in HDInsight voor informatie over het maken van een Kafka-cluster in HDInsight.
Voer de stappen in het document Consumer- en Producer-API's van Apache Kafka uit. In de stappen in dit document worden de voorbeeldtoepassing en onderwerpen gebruikt die in deze zelfstudie zijn gemaakt.
JDK-versie 8 (Java Developer Kit) of een equivalent, zoals OpenJDK.
Apache Maven correct geïnstalleerd volgens Apache. Maven is een systeem voor het bouwen van Java-projecten.
Een SSH-client. Zie voor meer informatie Verbinding maken met HDInsight (Apache Hadoop) via SSH.
De code begrijpen
De voorbeeldtoepassing bevindt zich op https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, in de submap Streaming
. De toepassing bestaat uit twee bestanden:
pom.xml
: dit bestand definieert de projectafhankelijkheden, de Java-versie en de pakketmethoden.Stream.java
: dit bestand implementeert de streaming-logica.
Pom.xml
Belangrijke aandachtspunten voor het bestand pom.xml
:
Afhankelijkheden: dit project is afhankelijk van de Kafka-API Streams, die wordt geleverd door het pakket
kafka-clients
. Deze afhankelijkheid wordt gedefinieerd met de volgende XML-code:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
De vermelding
${kafka.version}
wordt gedeclareerd in de sectie<properties>..</properties>
vanpom.xml
, en wordt geconfigureerd voor de Kafka-versie van het HDInsight-cluster.Plugins: de Maven-plugins bieden diverse mogelijkheden. In dit project worden de volgende plugins of invoegtoepassingen gebruikt:
maven-compiler-plugin
: wordt gebruikt om de Java-versie die wordt gebruikt door het project in te stellen op 8. HDInsight 4.0 en 5.0 vereist Java 8.maven-shade-plugin
: Wordt gebruikt voor het genereren van een uber jar die deze toepassing en eventuele afhankelijkheden bevat. Dit bestand wordt ook gebruikt om het toegangspunt van de toepassing in te stellen, zodat u het Jar-bestand rechtstreeks kunt uitvoeren, dus zonder de hoofdklasse op te geven.
Stream.java
Het bestand Stream.java gebruikt de Streams-API voor het implementeren van een toepassing voor het tellen van woorden. De toepassing leest gegevens uit een Kafka-onderwerp met de naam test
en schrijft het gelezen aantal woorden naar een onderwerp met de naam wordcounts
.
Met de volgende code wordt de toepassing gedefinieerd:
package com.microsoft.example;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Arrays;
import java.util.Properties;
public class Stream
{
public static void main( String[] args ) {
Properties streamsConfig = new Properties();
// The name must be unique on the Kafka cluster
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
// Brokers
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
// SerDes for key and values
streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Serdes for the word and count
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
KStream<String, Long> wordCounts = sentences
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Counts")
.toStream();
wordCounts.to(stringSerde, longSerde, "wordcounts");
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Het voorbeeld compileren en implementeren
Als u het project wilt implementeren in het Kafka-cluster in HDInsight, voert u de volgende stappen uit:
Stel uw huidige mappen in op de locatie van de map
hdinsight-kafka-java-get-started-master\Streaming
en gebruik vervolgens deze opdracht om een JAR-pakket te maken:mvn clean package
Met deze opdracht maakt u het pakket op
target/kafka-streaming-1.0-SNAPSHOT.jar
.Vervang
sshuser
door de SSH-gebruiker voor uw cluster enclustername
door de naam van het cluster. Gebruik de volgende opdracht om het bestandkafka-streaming-1.0-SNAPSHOT.jar
naar uw HDInsight-cluster te kopiëren. Voer het wachtwoord voor het SSH-gebruikersaccount in wanneer hierom wordt gevraagd.scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
Apache Kafka-onderwerpen maken
Vervang
sshuser
door de SSH-gebruiker voor uw cluster enCLUSTERNAME
door de naam van het cluster. Gebruik de volgende opdracht om een SSH-verbinding naar het cluster te openen. Voer het wachtwoord voor het SSH-gebruikersaccount in wanneer hierom wordt gevraagd.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Installeer jq, een opdrachtregel-JSON-processor. Voer in de open SSH-verbinding de volgende opdracht in om
jq
te installeren:sudo apt -y install jq
wachtwoordvariabele instellen. Vervang
PASSWORD
door het aanmeldwachtwoord voor het cluster en voer de volgende opdracht in:export PASSWORD='PASSWORD'
extraheer de clusternaam met de juiste letters. De daadwerkelijke lettergrootte van de clusternaam kan anders zijn dan verwacht, afhankelijk van hoe het cluster is gemaakt. Met deze opdracht wordt de werkelijke behuizing verkregen en vervolgens opgeslagen in een variabele. Voer de volgende opdracht in:
export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Notitie
Als u dit proces van buiten het cluster uitvoert, is er een andere procedure voor het opslaan van de clusternaam. Haal de clusternaam op in kleine letters uit de Azure-portal. Vervang vervolgens de clusternaam voor
<clustername>
in de volgende opdracht en voer deze uit:export clusterName='<clustername>'
.Gebruik de volgende opdrachten als u de Kafka-brokerhosts en de Apache Zookeeper-hosts wilt opvragen. Voer desgevraagd het wachtwoord voor het account voor clusteraanmelding (admin).
export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2); export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Notitie
Voor deze opdrachten is toegang tot Ambari vereist. Als uw cluster zich achter een NSG bevindt, voert u deze opdrachten uit vanaf een computer die toegang heeft tot Ambari.
Gebruik de volgende opdrachten om de onderwerpen te maken die worden gebruikt door de streaming-bewerking:
Notitie
Er kan een foutbericht worden weergegeven dat het onderwerp
test
al bestaat. Dit is geen probleem omdat het onderwerp mogelijk in de zelfstudie over de Producer- en Consumer-API's van Apache Kafka is gemaakt./usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
De onderwerpen worden gebruikt voor de volgende doeleinden:
test
: dit is het onderwerp waarin records worden ontvangen. De streaming-toepassing leest uit dit onderwerp.wordcounts
: dit is het onderwerp waarin de uitvoer van de streaming-toepassing wordt opgeslagen.RekeyedIntermediateTopic
: dit onderwerp wordt gebruikt voor het opnieuw partitioneren van gegevens wanneer het aantal woorden wordt bijgewerkt door de operatorcountByKey
.wordcount-example-Counts-changelog
: in dit onderwerp worden statuswaarden opgeslagen die worden gebruikt door de bewerkingcountByKey
.
Kafka in HDInsight kan ook worden geconfigureerd voor het automatisch maken van onderwerpen. Zie How to configure Apache Kafka on HDInsight to automatically create topics (Apache Kafka in HDInsight configureren voor het automatisch maken van onderwerpen) voor meer informatie.
De code uitvoeren
Gebruik de volgende opdracht om de streaming-toepassing te starten als een achtergrondproces:
java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
Mogelijk krijgt u een waarschuwing over Apache
log4j
. U kunt deze waarschuwing negeren.Als u records wilt verzenden naar het onderwerp
test
, gebruikt u de volgende opdracht gebruiken om de Producer-toepassing te starten:java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
Zodra de toepassing is voltooid, gebruikt u de volgende opdracht om de informatie weer te geven die is opgeslagen in het onderwerp
wordcounts
:/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
De parameters
--property
geven de Consumer-API opdracht om de key (het woord) samen met de count (waarde) weer te geven. Deze parameter configureert ook welke deserializer moet worden gebruikt bij het lezen van deze waarden uit Kafka.De uitvoer lijkt op het volgende:
dwarfs 13635 ago 13664 snow 13636 dwarfs 13636 ago 13665 a 13803 ago 13666 a 13804 ago 13667 ago 13668 jumped 13640 jumped 13641
De parameter
--from-beginning
configureert de Consumer om te beginnen bij het begin van de records die zijn opgeslagen in het onderwerp. Het aantal wordt met elk ontvangen woord opgehoogd, zodat het onderwerp meerdere vermeldingen voor elk woord bevat, met een oplopend aantal.Gebruik Ctrl+C om de Producer af te sluiten. Blijf op Ctrl+C drukken om de toepassing en de Consumer af te sluiten.
Gebruik de volgende opdrachten om de onderwerpen te verwijderen die worden gebruikt door de streaming-bewerking:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Resources opschonen
Als u de in deze zelfstudie gemaakte resources wilt opschonen, kunt u de resourcegroep verwijderen. Als u de resourcegroep verwijdert, worden ook het bijbehorende HDInsight-cluster en eventuele andere resources die aan de resourcegroep zijn gekoppeld, verwijderd.
Ga als volgt te werk om de resourcegroep te verwijderen in Azure Portal:
- Vouw het menu aan de linkerkant in Azure Portal uit om het menu met services te openen en kies Resourcegroepen om de lijst met resourcegroepen weer te geven.
- Zoek de resourcegroep die u wilt verwijderen en klik met de rechtermuisknop op de knop Meer (... ) aan de rechterkant van de vermelding.
- Selecteer Resourcegroep verwijderen en bevestig dit.
Volgende stappen
In dit document hebt u geleerd hoe u de Streams-API van Apache Kafka gebruikt met Kafka in HDInsight. Gebruik de volgende documenten voor meer informatie over het werken met Kafka.