Självstudie: Använda Apache Kafka-producenten och konsument-API:er
Lär dig att använda Apache Kafka-producenten och konsument-API:er med Kafka i HDInsight.
Kafka-producentens API tillåter att program skickar dataströmmar till Kafka-klustret. Kafka-konsumentens API tillåter att program läser dataströmmar från klustret.
I den här guiden får du lära dig att:
- Krav
- Förstå koden
- Skapa och distribuera programmet
- Köra programmet på klustret
Mer information om API:er finns i Apache-dokumentationen i Producent-API och Konsument-API.
Förutsättningar
- Apache Kafka på HDInsight-kluster. Information om hur du skapar klustret finns i Starta med Apache Kafka i HDInsight.
- Java Developer Kit (JDK) version 8 eller motsvarande, till exempel OpenJDK.
- Apache Mavenhar installerats korrekt enligt Apache. Maven är ett projektbyggsystem för Java-projekt.
- En SSH-klient som Putty. Mer information finns i Ansluta till HDInsight (Apache Hadoop) med hjälp av SSH.
Förstå koden
Exempelprogrammet finns på https://github.com/Azure-Samples/hdinsight-kafka-java-get-started i underkatalogen Producer-Consumer
. Om du använder Ett Kafka-kluster (Enterprise Security Package) bör du använda programversionen som finns i underkatalogen DomainJoined-Producer-Consumer
.
Programmet består i huvudsak av fyra filer:
-
pom.xml
: Den här filen definierar projektberoenden, Java-version och paketeringsmetoder. -
Producer.java
: Den här filen skickar slumpmässiga meningar till Kafka med producent-API:et. -
Consumer.java
: Den här filen använder konsument-API:n till att läsa data från Kafka och generera den till STDOUT. -
AdminClientWrapper.java
: Den här filen använder administratörs-API:et för att skapa, beskriva och ta bort Kafka-ämnen. -
Run.java
: Kommandoradsgränssnittet används för att köra producent- och konsumentkoden.
Pom.xml
Viktiga saker att förstå i pom.xml
-filen är:
Beroenden: Det här projektet använder producent- och konsument-API:er i Kafka, som tillhandahålls av
kafka-clients
-paketet. Följande XML-kod definierar detta beroende:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
${kafka.version}
-posten har deklarerats i<properties>..</properties>
-avsnittet ipom.xml
och är konfigurerad till Kafka-versionen av HDInsight-klustret.Plugin-program: Plugin-programmet Maven innehåller olika funktioner. I det här projektet används följande plugin-program:
-
maven-compiler-plugin
: Används för att ange den Java-version som används av projektet till 8. Detta är den version av Java som används av HDInsight 3.6. -
maven-shade-plugin
: Används för att generera en Uber-jar som innehåller det här programmet, samt eventuella beroenden. Det används också att ange startpunkt för programmet, så att du kan köra Jar-filen direkt utan att behöva ange huvudklassen.
-
Producer.java
Producenten kommunicerar med värdar för Kafka-meddelandeköer (arbetarnoder) och skickar data till ett Kafka-ämne. Följande kodfragment kommer från filen Producer.java från GitHub-lagringsplatsen och visar hur du anger producentegenskaperna. För Enterprise Security-aktiverade kluster måste ytterligare en egenskap läggas till "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Consumer.java
Konsumenten kommunicerar med värdar för Kafka-meddelandeköer (arbetarnoder) och läser posterna i en loop. Följande kodfragment från filen Consumer.java anger konsumentegenskaperna. För Enterprise Security-aktiverade kluster måste ytterligare en egenskap läggas till "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
I den här koden är konsumenten konfigurerad att läsa från början av ämnet (auto.offset.reset
är inställd på earliest
.)
Run.java
Filen Run.java innehåller ett kommandoradsgränssnitt som kör antingen producent- eller konsumentkoden. Du måste ange värdinformationen om Kafka-meddelandeköerna som en parameter. Du kan också inkludera ett grupp-ID-värde som används av konsumentprocessen. Om du skapar flera konsumentinstanser med samma grupp-ID belastningsutjämningsläsning från ämnet.
Skapa och distribuera exemplet
Använda färdiga JAR-filer
Ladda ned jar-filerna från Azure-exemplet Kafka Kom igång. Om ditt kluster är aktiverat för Enterprise Security Package (ESP) använder du kafka-producer-consumer-esp.jar. Använd kommandot nedan för att kopiera jar-filerna till klustret.
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Skapa JAR-filerna från kod
Om du vill hoppa över det här steget kan fördefinierade jar-filer laddas ned från underkatalogen Prebuilt-Jars
. Ladda ned kafka-producer-consumer.jar. Om ditt kluster är aktiverat för Enterprise Security Package (ESP) använder du kafka-producer-consumer-esp.jar. Kör steg 3 för att kopiera jar-filen till HDInsight-klustret.
Ladda ned och extrahera exemplen från https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.
Ange den aktuella katalogen till platsen för
hdinsight-kafka-java-get-started\Producer-Consumer
katalogen. Om du använder Ett Esp-aktiverat Kafka-kluster (Enterprise Security Package) bör du ange platsen somDomainJoined-Producer-Consumer
underkatalog. Använd följande kommando för att skapa programmet:mvn clean package
Det här kommandot skapar en katalog med namnet
target
, som innehåller en fil med namnetkafka-producer-consumer-1.0-SNAPSHOT.jar
. För ESP-kluster kommer filen attkafka-producer-consumer-esp-1.0-SNAPSHOT.jar
Ersätt
sshuser
med SSH-användare för klustret och ersättCLUSTERNAME
med namnet på klustret. Ange följande kommando för att kopierakafka-producer-consumer-1.0-SNAPSHOT.jar
filen till HDInsight-klustret. Ange lösenordet för SSH-användaren när du uppmanas till det.scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Kör exemplet
Ersätt
sshuser
med SSH-användare för klustret och ersättCLUSTERNAME
med namnet på klustret. Öppna en SSH-anslutning till klustret genom att ange följande kommando. Ange lösenordet för SSH-användarkontot om du uppmanas till det.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Om du vill hämta Kafka Broker-värdarna ersätter du värdena för
<clustername>
och<password>
i följande kommando och kör det. Använd samma hölje för<clustername>
som i Azure Portal. Ersätt<password>
med lösenordet för klusterinloggning och kör sedan:sudo apt -y install jq export CLUSTER_NAME='<clustername>' export PASSWORD='<password>' export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Anteckning
Det här kommandot kräver Ambari-åtkomst. Om klustret ligger bakom en NSG kör du det här kommandot från en dator som har åtkomst till Ambari.
Skapa Kafka-ämne,
myTest
, genom att ange följande kommando:java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
Om du vill köra producenten och skriva data till ämnet, använder du följande kommando:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
När producenten är klar kan du använda följande kommando för att läsa från ämnet:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
De lästa posterna, tillsammans med antalet poster, visas.
Använd Ctrl + C om du vill avsluta konsumenten.
Flera konsumenter
Kafka-konsumenter använder en konsumentgrupp vid läsning av poster. Att använda samma grupp med flera konsumenter resulterar i belastningsutjämnaravläsningar från ett ämne. Varje konsument i gruppen tar emot en del av posterna.
Konsumentprogrammet accepterar en parameter som används som grupp-ID. Exempelvis startar följande kommando en konsument med hjälp av ett grupp-ID i myGroup
:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
Använd Ctrl + C om du vill avsluta konsumenten.
Använd följande kommando om du vill se hur det fungerar:
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
Detta kommando använder tmux
till att dela terminalen i två kolumner. En konsument startas i varje kolumn med samma ID-värde för gruppen. När konsumenterna har läst färdigt kan du se att de bara läst en del av posterna. Använd Ctrl + C två gånger för att avsluta tmux
.
Förbrukning av klienter i samma grupp hanteras via partitionerna för ämnet. I det här kodexemplet har test
-ämnet som skapades tidigare åtta partitioner. Om du startar åtta konsumenter läser varje konsument poster från en enda partition i ämnet.
Viktigt
Det får inte finnas flera instanser av konsumenten i en konsumentgrupp än partitioner. I det här exemplet kan en konsumentgrupp innehålla upp till åtta konsumenter, eftersom det är antalet partitioner i ämnet. Du kan även ha flera konsumentgrupper med högst åtta konsumenter vardera.
Poster som lagras i Kafka lagras i den ordning de tas emot i en partition. För att uppnå sorterad leverans av poster inom en partition skapar du en konsumentgrupp där antalet konsumentinstanser matchar antalet partitioner. För att uppnå sorterad leverans av poster i ämnet skapar du en konsumentgrupp med bara en konsumentinstans.
Vanliga problem som uppstår
Det går inte att skapa ämnet Om ditt kluster är Enterprise Security Pack aktiverat använder du de färdiga JAR-filerna för producent och konsument. ESP-jar-filen kan skapas från koden i underkatalogen
DomainJoined-Producer-Consumer
. Producent- och konsumentegenskaperna har ytterligare en egenskapCommonClientConfigs.SECURITY_PROTOCOL_CONFIG
för ESP-aktiverade kluster.Fel i ESP-aktiverade kluster: Om genererar och förbrukar åtgärder misslyckas och du använder ett ESP-aktiverat kluster kontrollerar du att användaren
kafka
finns i alla Ranger-principer. Om den inte finns lägger du till den i alla Ranger-principer.
Rensa resurser
Om du vill rensa resurserna som har skapats med den här självstudien kan du ta bort resursgruppen. När du tar bort resursgruppen raderas även det kopplade HDInsight-klustret och eventuella andra resurser som är associerade med resursgruppen.
Ta bort en resursgrupp med Azure Portal:
- I Azure Portal expanderar du menyn på vänster sida för att öppna tjänstemenyn och väljer sedan Resursgrupper för att visa listan med dina resursgrupper.
- Leta reda på den resursgrupp du vill ta bort och högerklicka på knappen Mer (...) till höger om listan.
- Välj Ta bort resursgrupp och bekräfta.
Nästa steg
I det här dokumentet har du lärt dig att använda Apache Kafka-producent- och konsument-API:et med Kafka i HDInsight. Använd följande för att lära dig mer om att arbeta med Kafka: