Esercitazione: Usare l'API di flussi Apache Kafka in Azure HDInsight
Informazioni su come creare un'applicazione che usa l'API Apache Kafka Streams ed eseguirla con Kafka in HDInsight.
In questa esercitazione viene usata un'applicazione di conteggio delle parole. L'applicazione legge i dati di testo da un argomento Kafka, estrae singole parole e quindi archivia il conteggio delle parole in un altro argomento Kafka.
L'elaborazione del flusso Kafka viene spesso eseguita usando Apache Spark. Kafka versione 2.1.1 e 2.4.1 (in HDInsight 4.0 e 5.0) supporta l'API di Flussi Kafka. che consente di trasformare i flussi di dati tra argomenti di input e argomenti di output.
Per altre informazioni su Kafka Streams, vedere la documentazione di introduzione ai flussi su Apache.org.
In questa esercitazione apprenderai a:
- Informazioni sul codice
- Compilare e distribuire l'applicazione
- Configurare gli argomenti Kafka
- Eseguire il codice
Prerequisiti
Un cluster Kafka in HDInsight 4.0 o 5.0. Per informazioni su come creare un cluster Kafka in HDInsight, vedere il documento su come iniziare a usare Apache Kafka in HDInsight.
Completare i passaggi descritti nel documento API Apache Kafka Producer e Consumer. La procedura descritta in questo documento usa l'applicazione e gli argomenti di esempio creati in questa esercitazione.
Java Developer Kit (JDK) versione 8 o equivalente, ad esempio OpenJDK.
Apache Maven correttamente installato in base alle indicazioni di Apache. Maven è un sistema di compilazione per progetti Java.
Un client SSH. Per altre informazioni, vedere Connettersi a HDInsight (Apache Hadoop) con SSH.
Informazioni sul codice
L'applicazione di esempio si trova in https://github.com/Azure-Samples/hdinsight-kafka-java-get-started nella sottodirectory Streaming
. L'applicazione è costituita da due file:
pom.xml
: definisce le dipendenze di progetto, la versione di Java e i metodi di creazione dei pacchetti.Stream.java
: implementa la logica di flusso.
Pom.xml
Gli aspetti importanti da comprendere nel file pom.xml
sono:
Dipendenze: questo progetto si basa sull'API Kafka Streams, che è disponibile nel pacchetto
kafka-clients
. Il codice XML seguente definisce questa dipendenza:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
La voce
${kafka.version}
viene dichiarata nella sezione<properties>..</properties>
dipom.xml
ed è configurata per la versione Kafka del cluster HDInsight.Plug-in: i plug-in Maven offrono varie funzionalità. In questo progetto vengono usati i plug-in seguenti:
maven-compiler-plugin
: usato per impostare su 8 la versione di Java usata dal progetto. HDInsight 4.0 e 5.0 richiede Java 8.maven-shade-plugin
: usato per generare un file uber jar contenente questa applicazione e tutte le dipendenze. Viene inoltre usato per impostare il punto di ingresso dell'applicazione, in modo che sia possibile eseguire il file Jar direttamente senza dover specificare la classe principale.
Stream.Java
Il file Stream.java usa l'API Streams per implementare un'applicazione di conteggio delle parole. Legge i dati da un argomento Kafka denominato test
e scrive il conteggio delle parole in un argomento denominato wordcounts
.
Il codice seguente descrive l'applicazione di conteggio delle parole:
package com.microsoft.example;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Arrays;
import java.util.Properties;
public class Stream
{
public static void main( String[] args ) {
Properties streamsConfig = new Properties();
// The name must be unique on the Kafka cluster
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
// Brokers
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
// SerDes for key and values
streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Serdes for the word and count
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
KStream<String, Long> wordCounts = sentences
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Counts")
.toStream();
wordCounts.to(stringSerde, longSerde, "wordcounts");
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Creare e distribuire l'esempio
Per creare e distribuire il progetto in un cluster Kafka in HDInsight, seguire questa procedura:
Impostare la directory corrente sul percorso della directory
hdinsight-kafka-java-get-started-master\Streaming
e quindi usare il comando seguente per creare un pacchetto JAR:mvn clean package
Questo comando crea il pacchetto in
target/kafka-streaming-1.0-SNAPSHOT.jar
.Sostituire
sshuser
con il nome utente SSH del cluster e sostituireclustername
con il nome del cluster. Usare il comando seguente per copiare il filekafka-streaming-1.0-SNAPSHOT.jar
nel cluster HDInsight. Quando richiesto, immettere la password per l'account utente SSH.scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
Creare gli argomenti di Apache Kafka
Sostituire
sshuser
con il nome utente SSH del cluster e sostituireCLUSTERNAME
con il nome del cluster. Per aprire una connessione SSH al cluster, immettere il comando seguente. Quando richiesto, immettere la password per l'account utente SSH.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Installare jq, un processore JSON da riga di comando. Dalla connessione SSH aperta, immettere il comando seguente per installare
jq
:sudo apt -y install jq
Configurare la variabile di password. Sostituire
PASSWORD
con la password di accesso al cluster e quindi immettere il comando:export PASSWORD='PASSWORD'
Estrarre il nome del cluster con l'uso corretto di maiuscole e minuscole. L'uso effettivo di maiuscole e minuscole nel nome del cluster può differire dal previsto, a seconda della modalità di creazione del cluster. Questo comando ottiene l'effettiva combinazione di maiuscole e minuscole e quindi la archivia in una variabile. Immettere il comando seguente:
export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Nota
Se si esegue questo processo dall'esterno del cluster, è disponibile una procedura diversa per l'archiviazione del nome del cluster. Recuperare il nome del cluster in lettere minuscole dal portale di Azure. Sostituire quindi
<clustername>
con il nome del cluster nel comando seguente ed eseguire il comando:export clusterName='<clustername>'
.Per ottenere gli host del broker Kafka e gli host Apache Zookeeper, usare i comandi seguenti. Quando richiesto, immettere la password dell'account (admin) di accesso al cluster.
export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2); export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Nota
Questi comandi richiedono l'accesso a Ambari. Se il cluster è protetto da un gruppo NSG, eseguire questi comandi da un computer in grado di accedere ad Ambari.
Per creare gli argomenti usati dall'operazione di streaming, usare i comandi seguenti:
Nota
È possibile che venga visualizzato un errore che indica che l'argomento
test
esiste già. Non è un problema poiché potrebbe essere stato creato nell'esercitazione dell'API Producer e Consumer./usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Gli argomenti vengono usati per gli scopi seguenti:
test
: in questo argomento vengono ricevuti i record. Applicazione di streaming legge i dati da questo argomento.wordcounts
: in questo argomento l'applicazione di streaming archivia l'output.RekeyedIntermediateTopic
: questo argomento viene usato per partizionare nuovamente i dati mentre il conteggio viene aggiornato dall'operatorecountByKey
.wordcount-example-Counts-changelog
: questo argomento è un archivio di stati usato dall'operazionecountByKey
.
È possibile configurare Kafka in HDInsight anche in modo che gli argomenti vengano creati automaticamente. Per altre informazioni, vedere il documento Configure automatic topic creation (Configurare la creazione automatica degli argomenti).
Eseguire il codice
Per avviare l'applicazione di streaming come processo in background, usare il comando seguente:
java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
È possibile che venga visualizzato un avviso su Apache
log4j
. È possibile ignorare questo avviso.Per inviare i record all'argomento
test
, usare il comando seguente per avviare l'applicazione producer:java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
Al termine dell'elaborazione del producer, usare il comando seguente per visualizzare le informazioni archiviate nell'argomento
wordcounts
:/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
I parametri
--property
indicano al consumer di console di stampare sia la chiave (parola) sia il numero (valore). Questo parametro configura anche il deserializzatore da usare durante la lettura dei valori da Kafka.L'output è simile al testo seguente:
dwarfs 13635 ago 13664 snow 13636 dwarfs 13636 ago 13665 a 13803 ago 13666 a 13804 ago 13667 ago 13668 jumped 13640 jumped 13641
Il parametro
--from-beginning
configura il consumer in modo che venga avviato all'inizio dei record archiviati nell'argomento. Il conteggio viene incrementato ogni volta che viene rilevata una parola, pertanto l'argomento contiene più voci per ogni parola, con un numero crescente.Usare Ctrl + C per chiudere il producer. Usare ancora Ctrl + C per chiudere l'applicazione e il consumer.
Per eliminare gli argomenti usati dall'operazione di streaming, eseguire questi comandi:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Pulire le risorse
Per pulire le risorse create da questa esercitazione, eliminare il gruppo di risorse. Se si elimina il gruppo di risorse, vengono eliminati anche il cluster HDInsight associato e tutte le altre risorse correlate al gruppo di risorse.
Per rimuovere il gruppo di risorse usando il portale di Azure:
- Nel portale di Azure espandere il menu a sinistra per aprire il menu dei servizi e quindi scegliere Gruppi di risorse per visualizzare l'elenco dei gruppi di risorse.
- Individuare il gruppo di risorse da eliminare e quindi fare clic con il pulsante destro del mouse su Altro (...) a destra dell'elenco.
- Scegliere Elimina gruppo di risorse e quindi confermare.
Passaggi successivi
In questo documento si è appreso come usare l'API Apache Kafka Streams con Kafka in HDInsight. Per altre informazioni sull'uso di Kafka, vedere: