Zelfstudie: Werken met de Producer- en Consumer-API's van Apache Kafka
Leer hoe u de Producer- en Consumer-API's van Apache Kafka gebruikt met Kafka in HDInsight.
Met de Producer-API van Kafka kunnen toepassingen gegevensstromen naar het Kafka-cluster verzenden. Met de Consumer-API van Kafka kunnen toepassingen gegevensstromen uit het cluster lezen.
In deze zelfstudie leert u het volgende:
- Vereisten
- De code begrijpen
- De toepassing compileren en implementeren
- De toepassing uitvoeren in het cluster
Meer informatie over de Producer-API en de Consumer-API kunt u lezen in de Apache-documentatie (Engelstalig).
Vereisten
- Apache Kafka-cluster in HDInsight. Zie Aan de slag met Apache Kafka in HDInsight voor informatie over het maken van een cluster.
- JDK-versie 8 (Java Developer Kit) of een equivalent, zoals OpenJDK.
- Apache Maven correct geïnstalleerd volgens Apache. Maven is een systeem voor het bouwen van Java-projecten.
- Een SSH-client, zoals Putty. Zie voor meer informatie Verbinding maken met HDInsight (Apache Hadoop) via SSH.
De code begrijpen
De voorbeeldtoepassing bevindt zich op https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, in de submap Producer-Consumer
. Als u een Kafka-cluster met Enterprise Security Package (ESP) gebruikt, moet u de toepassingsversie gebruiken die zich in de DomainJoined-Producer-Consumer
-submap bevindt.
De toepassing bestaat hoofdzakelijk uit vier bestanden:
-
pom.xml
: met dit bestand worden de projectafhankelijkheden, de Java-versie en de pakketmethoden gedefinieerd. -
Producer.java
: met dit bestand worden willekeurige zinnen naar Kafka verzonden met behulp van de Producer-API. -
Consumer.java
: dit bestand gebruikt de Consumer-API om gegevens te lezen uit Kafka en deze te verzenden naar STDOUT. -
AdminClientWrapper.java
: dit bestand gebruikt de beheer-API voor het maken, beschrijven en verwijderen van Kafka-onderwerpen. -
Run.java
: de opdrachtregelinterface die wordt gebruikt voor het uitvoeren van de Producer- en Consumer-code.
Pom.xml
Belangrijke aandachtspunten voor het bestand pom.xml
:
Afhankelijkheden: dit project is afhankelijk van de Kafka-API's Producer en Consumer, die worden geleverd door het pakket
kafka-clients
. Deze afhankelijkheid wordt gedefinieerd met de volgende XML-code:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
De vermelding
${kafka.version}
wordt gedeclareerd in de sectie<properties>..</properties>
vanpom.xml
, en wordt geconfigureerd voor de Kafka-versie van het HDInsight-cluster.Invoegtoepassingen: de Maven-invoegtoepassingen bieden diverse mogelijkheden. In dit project worden de volgende plugins of invoegtoepassingen gebruikt:
-
maven-compiler-plugin
: wordt gebruikt om de Java-versie die wordt gebruikt door het project in te stellen op 8. Dit is de versie van Java die door HDInsight 3.6 wordt gebruikt. -
maven-shade-plugin
: wordt gebruikt voor het genereren van een uber jar die deze toepassing bevat, evenals eventuele afhankelijkheden. Dit bestand wordt ook gebruikt om het toegangspunt van de toepassing in te stellen, zodat u het Jar-bestand rechtstreeks kunt uitvoeren, dus zonder de hoofdklasse op te geven.
-
Producer.java
De producer communiceert met de Kafka-brokerhosts (werkknooppunten) en verzendt gegevens naar een Kafka-onderwerp. Het volgende codefragment is afkomstig van het bestand Producer.java in de GitHub-opslagplaats en laat zien hoe de Producer-eigenschappen moeten worden ingesteld. Voor clusters waarbij Enterprise Security is ingeschakeld, moet er een extra eigenschap worden toegevoegd: "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Consumer.java
De Consumer-API communiceert met de Kafka-brokerhosts (werkknooppunten) en leest records in een lus. Met het volgende codefragment van het bestand Consumer.java worden de Consumer-eigenschappen ingesteld. Voor clusters waarbij Enterprise Security is ingeschakeld, moet er een extra eigenschap worden toegevoegd: "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
In deze code is de Consumer-API geconfigureerd voor het lezen vanaf het begin van het onderwerp (auto.offset.reset
is ingesteld op earliest
.)
Run.java
Het bestand Run.java biedt een opdrachtregelinterface voor het uitvoeren van code van de Producer- of Consumer-API. U moet de gegevens van de Kafka-brokerhost opgeven als een parameter. U kunt eventueel een groeps-id opgeven, die wordt gebruikt door het Consumer-proces. Als u meerdere Consumer-exemplaren met dezelfde groeps-id maakt, wordt het lezen verdeeld over de exemplaren.
Het voorbeeld compileren en implementeren
Vooraf ontwikkelde JAR-bestanden gebruiken
Download de JAR-bestanden uit het Azure-voorbeeld voor aan de slag met Kafka. Als voor uw cluster Enterprise Security Package (ESP) is ingeschakeld, gebruikt u kafka-producer-consumer-esp.jar. Gebruik de onderstaande opdracht om de JAR-bestanden naar uw cluster te kopiëren.
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
De JAR-bestanden vanuit code maken
Als u deze stap wilt overslaan, kunt u ook vooraf ontwikkelde JAR-bestanden downloaden uit de Prebuilt-Jars
-submap. Download het bestand kafka-producer-consumer.jar. Als voor uw cluster Enterprise Security Package (ESP) is ingeschakeld, gebruikt u kafka-producer-consumer-esp.jar. Voer stap 3 uit om het JAR-bestand naar uw HDInsight-cluster te kopiëren.
Download de voorbeelden van https://github.com/Azure-Samples/hdinsight-kafka-java-get-started en pak ze uit.
Stel de huidige map in op de locatie van de
hdinsight-kafka-java-get-started\Producer-Consumer
-map. Als u een Kafka-cluster met Enterprise Security Package (ESP) gebruikt, moet u de locatie instellen op deDomainJoined-Producer-Consumer
-submap. Gebruik de volgende opdracht om de toepassing te maken:mvn clean package
Met deze opdracht maakt u een directory met de naam
target
, die een bestand met de naamkafka-producer-consumer-1.0-SNAPSHOT.jar
bevat. Voor ESP-clusters is het bestandkafka-producer-consumer-esp-1.0-SNAPSHOT.jar
Vervang
sshuser
door de SSH-gebruiker voor uw cluster enCLUSTERNAME
door de naam van het cluster. Voer de volgende opdracht in om het bestandkafka-producer-consumer-1.0-SNAPSHOT.jar
naar uw HDInsight-cluster te kopiëren. Voer het wachtwoord van de SSH-gebruiker in wanneer hierom wordt gevraagd.scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Het voorbeeld uitvoeren
Vervang
sshuser
door de SSH-gebruiker voor uw cluster enCLUSTERNAME
door de naam van het cluster. Gebruik de volgende opdracht om een SSH-verbinding naar het cluster te openen. Voer het wachtwoord voor het SSH-gebruikersaccount in wanneer hierom wordt gevraagd.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Als u de Kafka-brokerhosts wilt ophalen, vervangt u de waarden door
<clustername>
en<password>
in de volgende opdracht en voert u deze uit. Gebruik dezelfde methode voor<clustername>
, zoals wordt weer gegeven in de Azure-portal. Vervang<password>
door het aanmeldwachtwoord voor het cluster en voer het volgende uit:sudo apt -y install jq export CLUSTER_NAME='<clustername>' export PASSWORD='<password>' export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Notitie
Voor deze opdracht is toegang tot Ambari vereist. Als uw cluster zich achter een NSG bevindt, voert u deze opdracht uit vanaf een computer die toegang heeft tot Ambari.
Voer de volgende opdracht in om het Kafka-onderwerp
myTest
te maken:java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
Gebruik de volgende opdracht om de Producer-API uit te voeren en gegevens te schrijven naar het onderwerp:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
Als dit proces is voltooid, gebruikt u de volgende opdracht om gegevens uit het onderwerp te lezen:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
De gelezen records worden weergegeven, samen met een telling van de records.
Gebruik Ctrl + C om de consument af te sluiten.
Meerdere consumenten
Kafka-consumenten gebruiken een consumentengroep bij het lezen van records. Door dezelfde groep voor meerdere consumenten te gebruiken, worden leestaken voor onderwerpen gelijk verdeeld. Elke consument in de groep ontvangt een deel van de records.
De Consumer-toepassing accepteert een parameter die wordt gebruikt als de groeps-id. Met de volgende opdracht start u bijvoorbeeld een Consumer met behulp van de groeps-id myGroup
:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
Gebruik Ctrl + C om de consument af te sluiten.
Om dit proces in actie te zien, gebruikt u de volgende opdracht:
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
Deze opdracht gebruikt tmux
om de terminal op te splitsen in twee kolommen. In elke kolom wordt een Consumer gestart, met dezelfde waarde voor de groeps-id. Als de Consumers klaar zijn met lezen, ziet u dat ieder Consumer slechts een deel van de records heeft gelezen. Druk tweemaal op Ctrl + C om tmux
af te sluiten.
Gebruik door clients binnen dezelfde groep wordt verwerkt door de partities voor het onderwerp. Het eerder gemaakte onderwerp test
uit dit codevoorbeeld heeft acht partities. Als u acht Consumers start, leest elke Consumer records uit één partitie van het onderwerp.
Belangrijk
Een consumentengroep kan niet meer consumentexemplaren dan partities bevatten. In dit voorbeeld kan één consumentengroep maximaal acht consumenten bevatten, omdat het onderwerp dit aantal partities heeft. U kunt ook meerdere consumentengroepen hebben, waarvan elke groep niet meer dan acht consumenten bevat.
Records worden in Kafka opgeslagen in de volgorde waarin deze worden ontvangen binnen een partitie. Als u records binnen een partitie op volgorde wilt leveren, maakt u een consumentengroep waarvan het aantal consumentexemplaren gelijk is aan het aantal partities. Als u records binnen het onderwerp op volgorde wilt leveren, maakt u een consumentengroep met slechts één consumentexemplaar.
Veelvoorkomende problemen
Het maken van een onderwerp mislukt Als voor uw cluster Enterprise Security Pack is ingeschakeld, gebruikt u de vooraf ontwikkelde JAR-bestanden voor Producer en Consumer. Het JAR-bestand met ESP kan worden gemaakt op basis van de code in de
DomainJoined-Producer-Consumer
-submap. De eigenschappen van Producer en Consumer hebben een extra eigenschapCommonClientConfigs.SECURITY_PROTOCOL_CONFIG
voor clusters met ESP.Fout in clusters met ESP: Als er een fout optreedt in de Produce- en Consume-bewerkingen en u een cluster met ESP gebruikt, controleert u of gebruiker
kafka
aanwezig is in alle Ranger-beleidsregels. Als deze niet aanwezig is, voegt u deze toe aan alle Ranger-beleidsregels.
Resources opschonen
Als u de in deze zelfstudie gemaakte resources wilt opschonen, kunt u de resourcegroep verwijderen. Als u de resourcegroep verwijdert, worden ook het bijbehorende HDInsight-cluster en eventuele andere resources die aan de resourcegroep zijn gekoppeld, verwijderd.
Ga als volgt te werk om de resourcegroep te verwijderen in Azure Portal:
- Vouw het menu aan de linkerkant in Azure Portal uit om het menu met services te openen en kies Resourcegroepen om de lijst met resourcegroepen weer te geven.
- Zoek de resourcegroep die u wilt verwijderen en klik met de rechtermuisknop op de knop Meer (... ) aan de rechterkant van de vermelding.
- Selecteer Resourcegroep verwijderen en bevestig dit.
Volgende stappen
In dit document hebt u geleerd hoe u de Producer- en Consumer-API's van Apache Kafka gebruikt met Kafka in HDInsight. Gebruik de volgende documenten voor meer informatie over het werken met Kafka: