Kurz: Použití rozhraní Apache Kafka Producer and Consumer API
Zjistěte, jak používat rozhraní Apache Kafka Producer and Consumer API se systémem Kafka ve službě HDInsight.
Rozhraní Kafka Producer API umožňuje aplikacím odesílat datové proudy do clusteru Kafka. Rozhraní Kafka Consumer API umožňuje aplikacím číst datové proudy z clusteru.
V tomto kurzu se naučíte:
- Požadavky
- Vysvětlení kódu
- Sestavení a nasazení aplikace
- Spuštění aplikace v clusteru
Další informace o rozhraních API najdete v dokumentaci k rozhraní Producer API a Consumer API na webu Apache.
Požadavky
- Apache Kafka v clusteru HDInsight. Informace o vytvoření clusteru najdete v tématu Začínáme s Apache Kafka ve službě HDInsight.
- Sada Java Developer Kit (JDK) verze 8 nebo ekvivalentní sada, například OpenJDK.
- Apache Maven je správně nainstalovaný podle Apache. Maven je systém sestavování projektů pro projekty Java.
- Klient SSH, jako je Putty. Další informace najdete v tématu Připojení ke službě HDInsight (Apache Hadoop) pomocí SSH.
Vysvětlení kódu
Ukázková aplikace se nachází na adrese https://github.com/Azure-Samples/hdinsight-kafka-java-get-started v podadresáři Producer-Consumer
. Pokud používáte cluster Kafka s podporou balíčku enterprise security Package (ESP), měli byste použít verzi aplikace umístěnou DomainJoined-Producer-Consumer
v podadresáři.
Aplikace se skládá primárně ze čtyř souborů:
-
pom.xml
: Tento soubor definuje závislosti projektu, verzi Javy a metody balení. -
Producer.java
: Tento soubor pomocí rozhraní Producer API odesílá do systému Kafka náhodné věty. -
Consumer.java
: Tento soubor pomocí rozhraní Consumer API čte data ze systému Kafka a posílá je do výstupu STDOUT. -
AdminClientWrapper.java
: Tento soubor používá rozhraní API pro správu k vytváření, popisu a odstraňování témat Kafka. -
Run.java
: Rozhraní příkazového řádku, které slouží ke spuštění kódu producenta a konzumenta.
Pom.xml
V souboru pom.xml
je důležité porozumět následujícímu:
Závislosti: Tento projekt spoléhá na rozhraní Kafka Producer and Consumer API, která jsou součástí balíčku
kafka-clients
. Tuto závislost definuje následující kód XML:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
Položka
${kafka.version}
se deklaruje v části<properties>..</properties>
souborupom.xml
a je nakonfigurovaná na verzi systému Kafka v clusteru HDInsight.Moduly plug-in: Moduly plug-in Mavenu poskytují různé funkce. V tomto projektu se používají následující moduly plug-in:
-
maven-compiler-plugin
: Slouží k nastavení verze Javy, kterou projekt používá, na 8. To je verze Javy, kterou používá HDInsight 3.6. -
maven-shade-plugin
: Slouží k vygenerování souboru JAR, který obsahuje tuto aplikaci i všechny závislosti. Používá se také k nastavení vstupního bodu aplikace, abyste mohli přímo spustit soubor JAR bez nutnosti zadávat hlavní třídu.
-
Producer.java
Producent komunikuje s hostiteli zprostředkovatelů Kafka (pracovní uzly) a odesílá data do tématu Kafka. Následující fragment kódu pochází ze souboru Producer.java z úložiště GitHub a ukazuje, jak nastavit vlastnosti producenta. Pro clustery s povoleným zabezpečením podniku je nutné přidat další vlastnost properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Consumer.java
Konzument komunikuje s hostiteli zprostředkovatelů Kafka (pracovní uzly) a ve smyčce čte záznamy. Následující fragment kódu ze souboru Consumer.java nastaví vlastnosti příjemce. Pro clustery s povoleným zabezpečením podniku je nutné přidat další vlastnost properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
V tomto kódu je konzument nakonfigurovaný tak, aby četl od začátku tématu (hodnota auto.offset.reset
je nastavená na earliest
).
Run.java
Soubor Run.java poskytuje rozhraní příkazového řádku, které spouští kód producenta nebo příjemce. Jako parametr je potřeba zadat informace o hostiteli zprostředkovatele Kafka. Volitelně můžete zahrnout hodnotu ID skupiny, kterou používá proces příjemce. Pokud vytvoříte více instancí příjemců pomocí stejného ID skupiny, budou vyrovnávat zatížení čtení z tématu.
Sestavení a nasazení příkladu
Použití předdefinovaných souborů JAR
Stáhněte si soubory JAR z ukázky Kafka Začínáme s Azure. Pokud je u vašeho clusteru povolený balíček zabezpečení podniku (ESP), použijte kafka-producer-consumer-esp.jar. Pomocí následujícího příkazu zkopírujte soubory JAR do clusteru.
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Sestavení souborů JAR z kódu
Pokud chcete tento krok přeskočit, můžete si z Prebuilt-Jars
podadresáře stáhnout předem připravené soubory JAR. Stáhněte soubor kafka-producer-consumer.jar. Pokud je u vašeho clusteru povolený balíček zabezpečení podniku (ESP), použijte kafka-producer-consumer-esp.jar. Spuštěním kroku 3 zkopírujte soubor JAR do clusteru HDInsight.
Stáhněte a extrahujte příklady z https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.
Nastavte aktuální adresář na umístění
hdinsight-kafka-java-get-started\Producer-Consumer
adresáře. Pokud používáte cluster Kafka s podporou enterprise security package (ESP), měli byste umístění nastavit naDomainJoined-Producer-Consumer
podadresář. K sestavení aplikace použijte následující příkaz:mvn clean package
Tento příkaz vytvoří adresář s názvem
target
, který bude obsahovat soubor s názvemkafka-producer-consumer-1.0-SNAPSHOT.jar
. V případě clusterů ESP bude souborkafka-producer-consumer-esp-1.0-SNAPSHOT.jar
Místo
sshuser
použijte jméno uživatele SSH pro váš cluster a místoCLUSTERNAME
zadejte název clusteru. Zadáním následujícího příkazu zkopírujte soubor do clusterukafka-producer-consumer-1.0-SNAPSHOT.jar
HDInsight. Po zobrazení výzvy zadejte heslo uživatele SSH.scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Spuštění ukázky
Místo
sshuser
použijte jméno uživatele SSH pro váš cluster a místoCLUSTERNAME
zadejte název clusteru. Zadáním následujícího příkazu otevřete připojení SSH ke clusteru. Pokud se zobrazí výzva, zadejte heslo uživatelského účtu SSH.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Pokud chcete získat hostitele zprostředkovatele Kafka, v následujícím příkazu nahraďte hodnoty a
<clustername>
<password>
a spusťte je. Pro použijte stejná písmena<clustername>
, jako je znázorněno na Azure Portal. Nahraďte<password>
přihlašovacím heslem clusteru a pak spusťte:sudo apt -y install jq export CLUSTER_NAME='<clustername>' export PASSWORD='<password>' export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Poznámka
Tento příkaz vyžaduje přístup Ambari. Pokud je váš cluster za skupinou zabezpečení sítě, spusťte tento příkaz z počítače, který má přístup k Ambari.
Zadáním následujícího příkazu vytvořte téma
myTest
Kafka:java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
Pokud chcete spustit producenta a zapsat data do tématu, použijte následující příkaz:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
Jakmile bude producent hotový, pomocí následujícího příkazu zahajte čtení z tématu:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Zobrazí se počet načtených záznamů spolu s celkovým počtem.
Konzumenta ukončíte stisknutím Ctrl+C.
Víc současných konzumentů
Konzumenti Kafka při čtení záznamů používají skupiny konzumentů. Výsledkem použití skupiny s více konzumenty je vyvážení zatížení při čtení záznamů z tématu. Každý konzument ze skupiny obdrží určitou část záznamů.
Aplikace konzumenta přijímá parametr, který se použije jako ID skupiny. Například následující příkaz spustí konzumenta s použitím ID skupiny myGroup
:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
Konzumenta ukončíte stisknutím Ctrl+C.
Pokud chcete vidět tento proces v akci, použijte následující příkaz:
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
Tento příkaz pomocí tmux
rozdělí terminál do dvou sloupců. V obou sloupcích je spuštěný konzument se stejnou hodnotou ID skupiny. Jakmile konzumenti dokončí čtení, všimněte si, že oba přečetli pouze část záznamů. Stisknutím kombinace kláves Ctrl+C ukončete tmux
.
Konzumace klienty ze stejné skupiny se realizuje rozdělením tématu na oddíly. V tomto vzorovém kódu má dříve vytvořené téma test
osm oddílů. Pokud spustíte osm konzumentů, každý z nich bude číst záznamy z jednoho oddílu tématu.
Důležité
Ve skupině příjemců nemůže být víc instancí konzumentů než má téma oddílů. V tomto příkladu může skupina konzumentů obsahovat až osm konzumentů, protože to je počet oddílů tématu. Nebo můžete mít více skupin konzumentů, každou s maximálně osmi konzumenty.
Záznamy uložené v systému Kafka se ukládají v pořadí, v jakém jsou přijaty v rámci oddílu. Pro dosažení doručování záznamů ve správném pořadí v rámci oddílu vytvořte skupinu příjemců, ve které bude počet instancí konzumentů odpovídat počtu oddílů. Pro dosažení doručování záznamů ve správném pořadí v rámci tématu vytvořte skupinu obsahující pouze jednu instanci konzumenta.
Běžné problémy
Selhání vytváření tématu Pokud je v clusteru povolená sada Zabezpečení podniku, použijte předem připravené soubory JAR pro producenta a příjemce. Soubor JAR ESP lze sestavit z kódu v
DomainJoined-Producer-Consumer
podadresáři. Vlastnosti producenta a příjemce mají pro clustery s podporou ESP další vlastnostCommonClientConfigs.SECURITY_PROTOCOL_CONFIG
.Selhání v clusterech s podporou ESP: Pokud operace vytváření a využívání selžou a používáte cluster s podporou ESP, zkontrolujte, jestli je uživatel
kafka
přítomný ve všech zásadách Ranger. Pokud není k dispozici, přidejte ji do všech zásad Ranger.
Vyčištění prostředků
Pokud chcete vyčistit prostředky vytvořené v tomto kurzu, můžete odstranit skupinu prostředků. Odstraněním skupiny prostředků odstraníte také přidružený cluster HDInsight a všechny další prostředky, které jsou k příslušné skupině prostředků přidružené.
Odebrání skupiny prostředků pomocí webu Azure Portal:
- Na webu Azure Portal rozbalením nabídky na levé straně otevřete nabídku služeb a pak zvolte Skupiny prostředků. Zobrazí se seznam skupin prostředků.
- Vyhledejte skupinu prostředků, kterou chcete odstranit, a klikněte pravým tlačítkem na tlačítko Další (...) na pravé straně seznamu.
- Vyberte Odstranit skupinu prostředků a potvrďte tuto akci.
Další kroky
V tomto dokumentu jste zjistili, jak používat rozhraní Apache Kafka Producer and Consumer API se systémem Kafka ve službě HDInsight. Další informace o práci s platformou Kafka najdete v těchto zdrojích: