Samouczek: używanie interfejsu API strumieni platformy Apache Kafka w usłudze Azure HDInsight
Dowiedz się, jak utworzyć aplikację, która używa interfejsu API strumieni platformy Apache Kafka, i uruchomić ją na platformie Kafka w usłudze HDInsight.
Aplikacja przedstawiona w tym samouczku zlicza przesyłane strumieniowo wyrazy. Odczytuje ona dane tekstowe z tematu platformy Kafka, wyodrębnia poszczególne wyrazy, a następnie zapisuje liczbę wyrazów w innym temacie platformy Kafka.
Przetwarzanie strumieni platformy Kafka jest często wykonywane przy użyciu platformy Apache Spark. Platforma Kafka w wersji 2.1.1 i 2.4.1 (w usługach HDInsight 4.0 i 5.0) obsługuje interfejs API platformy Kafka Strumienie. Ten interfejs API umożliwia przekształcanie strumieni danych między tematami wejściowymi i wyjściowymi.
Aby uzyskać więcej informacji o strumieniach platformy Kafka, zobacz dokumentację Intro to Streams (Wprowadzenie do strumieni) w serwisie Apache.org.
Z tego samouczka dowiesz się, jak wykonywać następujące czynności:
- Zrozumienie kodu
- Kompilowanie i wdrażanie aplikacji
- Konfigurowanie tematów platformy Kafka
- Uruchamianie kodu
Wymagania wstępne
Kafka w klastrze usługi HDInsight 4.0 lub 5.0. Aby dowiedzieć się, jak utworzyć platformę Kafka w klastrze usługi HDInsight, zobacz dokument Wprowadzenie do platformy Apache Kafka w usłudze HDInsight.
Wykonaj kroki opisane w dokumencie Apache Kafka Consumer and Producer API (Interfejs API odbiorcy i producenta na platformie Apache Kafka). Czynności opisane w tym dokumencie bazują na przykładowej aplikacji i tematach utworzonych w tym samouczku.
Zestaw Java Developer Kit (JDK) w wersji 8 lub odpowiednik, taki jak OpenJDK.
Narzędzie Apache Maven poprawnie zainstalowane zgodnie z apache. Maven to system kompilacji projektu dla projektów Java.
Klient SSH. Aby uzyskać więcej informacji, zobacz Łączenie się z usługą HDInsight (Apache Hadoop) przy użyciu protokołu SSH.
Zrozumienie kodu
Przykładowa aplikacja znajduje się pod adresem https://github.com/Azure-Samples/hdinsight-kafka-java-get-started w podkatalogu Streaming
. Aplikacja składa się z dwóch plików:
pom.xml
: w tym pliku są definiowane zależności projektu, wersja języka Java i metody pakowania.Stream.java
: ten plik implementuje logikę przesyłania strumieniowego.
Pom.xml
Należy zrozumieć następujące ważne kwestie dotyczące pliku pom.xml
:
Zależności: ten projekt bazuje na interfejsie API strumieni platformy Kafka, który jest udostępniany w pakiecie
kafka-clients
. Ta zależność jest definiowana przez następujący kod XML:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
Wpis
${kafka.version}
jest zadeklarowany w sekcji<properties>..</properties>
plikupom.xml
i jest skonfigurowany zgodnie z wersją platformy Kafka znajdującą się w klastrze usługi HDInsight.Wtyczki: wtyczki Maven zapewniają różne możliwości. W tym projekcie są używane następujące wtyczki:
maven-compiler-plugin
: służy do ustawiania wersji 8 języka Java używanej przez projekt. Usługi HDInsight 4.0 i 5.0 wymagają środowiska Java 8.maven-shade-plugin
: służy do generowania pliku jar uber zawierającego tę aplikację i wszelkich zależności. Służy również do ustawiania punktu wejścia aplikacji, dzięki czemu można bezpośrednio uruchomić plik Jar bez konieczności określania klasy głównej.
Stream.java
Plik Stream.java używa interfejsu API strumieni do zaimplementowania aplikacji liczącej wyrazy. Odczytuje on dane z tematu Kafka o nazwie test
i zapisuje liczbę wyrazów w temacie o nazwie wordcounts
.
Poniższy kod definiuje aplikację do zliczania wyrazów:
package com.microsoft.example;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Arrays;
import java.util.Properties;
public class Stream
{
public static void main( String[] args ) {
Properties streamsConfig = new Properties();
// The name must be unique on the Kafka cluster
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
// Brokers
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
// SerDes for key and values
streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Serdes for the word and count
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
KStream<String, Long> wordCounts = sentences
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Counts")
.toStream();
wordCounts.to(stringSerde, longSerde, "wordcounts");
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Kompilowanie i wdrażanie przykładu
Aby skompilować i wdrożyć projekt na platformie Kafka w klastrze usługi HDInsight, wykonaj następujące kroki:
Ustaw bieżący katalog na lokalizację
hdinsight-kafka-java-get-started-master\Streaming
katalogu, a następnie użyj następującego polecenia, aby utworzyć pakiet jar:mvn clean package
To polecenie tworzy pakiet w lokalizacji
target/kafka-streaming-1.0-SNAPSHOT.jar
.Zamień ciąg
sshuser
na nazwę użytkownika SSH klastra i zamień ciągclustername
na nazwę klastra. Użyj następującego polecenia, aby skopiować plik do klastrakafka-streaming-1.0-SNAPSHOT.jar
usługi HDInsight. Jeśli zostanie wyświetlony monit, wprowadź hasło konta użytkownika SSH.scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
Tworzenie tematów platformy Apache Kafka
Zamień ciąg
sshuser
na nazwę użytkownika SSH klastra i zamień ciągCLUSTERNAME
na nazwę klastra. Otwórz połączenie SSH z klastrem, wprowadzając następujące polecenie. Jeśli zostanie wyświetlony monit, wprowadź hasło konta użytkownika SSH.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Zainstaluj procesor jq, wiersz polecenia JSON. W otwartym połączeniu SSH wprowadź następujące polecenie, aby zainstalować program
jq
:sudo apt -y install jq
Konfigurowanie zmiennej hasła. Zastąp
PASSWORD
ciąg hasłem logowania klastra, a następnie wprowadź polecenie:export PASSWORD='PASSWORD'
Wyodrębnij poprawnie przypadek nazwy klastra. Rzeczywista wielkość liter nazwy klastra może być inna niż oczekiwano, w zależności od sposobu utworzenia klastra. To polecenie uzyskuje rzeczywistą wielkość liter, a następnie przechowuje ją w zmiennej. Podaj następujące polecenie:
export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Uwaga
Jeśli wykonujesz ten proces spoza klastra, istnieje inna procedura przechowywania nazwy klastra. Pobierz nazwę klastra w małym przypadku z witryny Azure Portal. Następnie zastąp nazwę klastra w
<clustername>
następującym poleceniu i wykonaj ją:export clusterName='<clustername>'
.Aby uzyskać hosty brokera platformy Kafka i hosty usługi Apache Zookeeper, użyj poniższych poleceń. Po wyświetleniu monitu wprowadź hasło dla konta logowania klastra (administratora).
export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2); export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Uwaga
Te polecenia wymagają dostępu systemu Ambari. Jeśli klaster znajduje się za sieciową grupą zabezpieczeń, uruchom te polecenia z maszyny, która może uzyskać dostęp do systemu Ambari.
Aby utworzyć tematy używane przez operację przesyłania strumieniowego, użyj następujących poleceń:
Uwaga
Może zostać wyświetlony błąd z informacją, że temat
test
już istnieje. Nie stanowi to problemu, ponieważ ten temat mógł zostać utworzony w samouczku dotyczącym interfejsu API producenta i odbiorcy./usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Tematy są używane do następujących celów:
test
: w tym temacie są odbierane rekordy. Aplikacja do przesyłania strumieniowego odczytuje dane z tego tematu.wordcounts
: w tym temacie aplikacja do przesyłania strumieniowego przechowuje swoje dane wyjściowe.RekeyedIntermediateTopic
: w tym temacie zachodzi ponowne dzielenie danych, ponieważ liczba wyrazów jest aktualizowana za pomocą operatoracountByKey
.wordcount-example-Counts-changelog
: ten temat jest magazynem stanów używanym przez operacjęcountByKey
Platformę Kafka w usłudze HDInsight można również skonfigurować w taki sposób, aby automatycznie tworzyła tematy. Aby uzyskać więcej informacji, zobacz dokument Configure automatic topic creation (Konfigurowanie automatycznego tworzenia tematów).
Uruchamianie kodu
Aby uruchomić aplikację do przesyłania strumieniowego jako proces w tle, użyj następującego polecenia:
java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
Może zostać wyświetlone ostrzeżenie dotyczące platformy Apache
log4j
. To ostrzeżenie można zignorować.Aby wysyłać rekordy do tematu
test
, użyj następującego polecenia w celu uruchomienia aplikacji producenta:java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
Po zakończeniu działania producenta użyj następującego polecenia, aby wyświetlić informacje przechowywane w temacie
wordcounts
:/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
Dzięki użyciu parametrów
--property
odbiorca konsoli drukuje zarówno klucz (wyraz), jak i liczbę (wartość). Ten parametr konfiguruje również deserializatora do użycia podczas odczytu tych wartości z platformy Kafka.Dane wyjściowe będą podobne do następującego tekstu:
dwarfs 13635 ago 13664 snow 13636 dwarfs 13636 ago 13665 a 13803 ago 13666 a 13804 ago 13667 ago 13668 jumped 13640 jumped 13641
Parametr
--from-beginning
konfiguruje odbiorcę, aby rozpoczął przetwarzanie od początku rekordów przechowywanych w temacie. Liczba wystąpień zwiększa się każdorazowo po napotkaniu wyrazu, dlatego temat zawiera wiele pozycji dla każdego wyrazu ze zwiększającą się liczbą wystąpień.Użyj klawiszy Ctrl + C, aby zakończyć działanie producenta. Podobnie użyj klawiszy Ctrl + C, aby zakończyć działanie aplikacji i odbiorcy.
Aby usunąć tematy używane przez operację przesyłania strumieniowego, użyj następujących poleceń:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Czyszczenie zasobów
Aby wyczyścić zasoby utworzone w tym samouczku, możesz usunąć grupę zasobów. Usunięcie grupy zasobów powoduje również usunięcie skojarzonego klastra usługi HDInsight i wszystkich innych zasobów skojarzonych z tą grupą zasobów.
Aby usunąć grupę zasobów za pomocą witryny Azure Portal:
- W witrynie Azure Portal rozwiń menu po lewej stronie, aby otworzyć menu usług, a następnie wybierz pozycję Grupy zasobów, aby wyświetlić listę grup zasobów.
- Znajdź grupę zasobów do usunięcia, a następnie kliknij prawym przyciskiem myszy przycisk Więcej (...) po prawej stronie listy.
- Wybierz pozycję Usuń grupę zasobów i potwierdź.
Następne kroki
W tym dokumencie zawarto informacje o sposobie korzystania z interfejsu API strumieni platformy Apache Kafka w usłudze HDInsight. Skorzystaj z poniższych informacji, aby dowiedzieć się więcej na temat pracy z platformą Kafka.