Esercitazione: Usare le API Apache Kafka Producer e Consumer
Informazioni su come usare l'API Apache Kafka Producer e Consumer con Kafka in HDInsight.
L'API Kafka Producer consente alle applicazioni di inviare flussi di dati al cluster Kafka. L'API Kafka Consumer consente alle applicazioni di leggere flussi di dati dal cluster Kafka.
In questa esercitazione verranno illustrate le procedure per:
- Prerequisiti
- Informazioni sul codice
- Compilare e distribuire l'applicazione
- Eseguire l'applicazione nel cluster
Per altre informazioni sulle API, vedere la documentazione Apache sull'API Producer e l'API Consumer.
Prerequisiti
- Cluster Apache Kafka in HDInsight. Per informazioni su come creare il cluster, vedere Iniziare a usare Apache Kafka in HDInsight.
- Java Developer Kit (JDK) versione 8 o equivalente, ad esempio OpenJDK.
- Apache Maven correttamente installato in base alle indicazioni di Apache. Maven è un sistema di compilazione per progetti Java.
- Un client SSH come Putty. Per altre informazioni, vedere Connettersi a HDInsight (Apache Hadoop) con SSH.
Informazioni sul codice
L'applicazione di esempio si trova in https://github.com/Azure-Samples/hdinsight-kafka-java-get-started nella sottodirectory Producer-Consumer
. Se si usa il cluster Kafka abilitato per Enterprise Security Package (ESP) , è necessario usare la versione dell'applicazione che si trova nella sottodirectory DomainJoined-Producer-Consumer
.
L'applicazione è costituita principalmente da quattro file:
-
pom.xml
: Questo file definisce le dipendenze progetto, la versione Java e i metodi di creazione pacchetti. -
Producer.java
: Questo file invia frasi casuali a Kafka usando l'API Producer. -
Consumer.java
: Questo file usa l'API Consumer per leggere i dati da Kafka ed esportarli in STDOUT. -
AdminClientWrapper.java
: Questo file usa l'API di amministrazione per creare, descrivere ed eliminare gli argomenti di Kafka. -
Run.java
: L'interfaccia della riga di comando usata per eseguire il codice producer e consumer.
Pom.xml
Gli aspetti importanti da comprendere nel file pom.xml
sono:
Dipendenze: Questo progetto si basa sulle API Producer e Consumer Kafka, che sono fornite dal pacchetto
kafka-clients
. Il codice XML seguente definisce questa dipendenza:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
La voce
${kafka.version}
viene dichiarata nella sezione<properties>..</properties>
dipom.xml
ed è configurata per la versione Kafka del cluster HDInsight.Plug-in: I plug-in Maven offrono varie funzionalità. In questo progetto vengono usati i plug-in seguenti:
-
maven-compiler-plugin
: Usato per impostare su 8 la versione Java usata dal progetto. Si tratta della versione di Java usata da HDInsight 3.6. -
maven-shade-plugin
: Usato per generare un file uberjar che contiene questa applicazione, nonché eventuali dipendenze. Viene inoltre usato per impostare il punto di ingresso dell'applicazione, in modo che sia possibile eseguire il file con estensione jar direttamente senza dover specificare la classe principale.
-
Producer.java
Il producer comunica con gli host broker di Kafka (nodi di lavoro) e invia i dati a un argomento Kafka. Il frammento di codice seguente è tratto dal file Producer.java del repository GitHub e mostra come impostare le proprietà del producer. Per i cluster abilitati per Enterprise Security è necessario aggiungere un'ulteriore proprietà "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Consumer.java
Il consumer comunica con gli host broker Kafka (nodi di lavoro) e legge i record in un ciclo. Il frammento di codice seguente tratto dal file Consumer.java imposta le proprietà del consumer. Per i cluster abilitati per Enterprise Security è necessario aggiungere un'ulteriore proprietà "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
In questo codice, il consumer è configurato per la lettura dall'inizio dell'argomento (auto.offset.reset
è impostato su earliest
.)
Run.java
Il file Run.java fornisce un'interfaccia della riga di comando che esegue il codice del producer o del consumer. È necessario fornire le informazioni sull'host broker di Kafka come parametro. È facoltativamente possibile includere un valore di ID gruppo che viene usato dal processo consumer. Se si creano più istanze di consumer usando lo stesso ID gruppo, esse bilanceranno il carico durante la lettura dall'argomento.
Creare e distribuire l'esempio
Usare i file JAR predefiniti
Scaricare i file JAR dall'esempio di introduzione ad Azure per Kafka. Se il cluster è abilitato per Enterprise Security Package (ESP) , usare kafka-producer-consumer-esp.jar. Per copiare i file JAR nel cluster, usare il comando seguente.
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Compilare i file JAR dal codice
Se si vuole ignorare questo passaggio, è possibile scaricare i file jar predefiniti dalla sottodirectory Prebuilt-Jars
. Scaricare il file kafka-producer-consumer.jar. Se il cluster è abilitato per Enterprise Security Package (ESP) , usare kafka-producer-consumer-esp.jar. Eseguire il passaggio 3 per copiare il file jar nel cluster HDInsight.
Scaricare ed estrarre gli esempi da https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.
Impostare la directory corrente sulla posizione della directory
hdinsight-kafka-java-get-started\Producer-Consumer
. Se si usa il cluster Kafka abilitato per Enterprise Security Package (ESP) , è necessario impostare la posizione sulla sottodirectoryDomainJoined-Producer-Consumer
. Usare il comando seguente per compilare l'applicazione:mvn clean package
Questo comando crea una directory denominata
target
, che contiene un file denominatokafka-producer-consumer-1.0-SNAPSHOT.jar
. Per i cluster ESP il file saràkafka-producer-consumer-esp-1.0-SNAPSHOT.jar
Sostituire
sshuser
con il nome utente SSH del cluster e sostituireCLUSTERNAME
con il nome del cluster. Immettere il comando seguente per copiare il filekafka-producer-consumer-1.0-SNAPSHOT.jar
nel cluster HDInsight. Quando richiesto, immettere la password per l'utente SSH.scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Eseguire l'esempio
Sostituire
sshuser
con il nome utente SSH del cluster e sostituireCLUSTERNAME
con il nome del cluster. Per aprire una connessione SSH al cluster, immettere il comando seguente. Quando richiesto, immettere la password per l'account utente SSH.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Per ottenere gli host del broker Kafka, sostituire i valori per
<clustername>
e<password>
nel comando seguente ed eseguirlo. Usare la stessa combinazione di maiuscole e minuscole per<clustername>
come illustrato nel portale di Azure. Sostituire<password>
con la password di accesso al cluster, quindi eseguire il comando:sudo apt -y install jq export CLUSTER_NAME='<clustername>' export PASSWORD='<password>' export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Nota
Questo comando richiede l'accesso ad Ambari. Se il cluster è protetto da un gruppo NSG, eseguire questo comando da un computer in grado di accedere ad Ambari.
Per creare l'argomento Kafka
myTest
, immettere il comando seguente:java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
Per eseguire il producer e scrivere i dati nell'argomento, usare il comando seguente:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
Quando l'esecuzione del producer è terminata, usare il comando seguente per leggere dall'argomento:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Verranno visualizzati i record letti e il numero di record.
Usare Ctrl + C per uscire dal consumer.
Consumer multipli
I consumer Kafka usano un gruppo di consumer durante la lettura dei record. L'uso dello stesso gruppo con più consumer consente di ottenere operazioni di lettura con bilanciamento del carico da un argomento. Ogni consumer nel gruppo riceve una parte dei record.
L'applicazione Consumer accetta un parametro che viene usato come ID del gruppo. Ad esempio, il comando seguente avvia un consumer usando un ID gruppo denominato myGroup
:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
Usare Ctrl + C per uscire dal consumer.
Per verificare il funzionamento del processo, usare il comando seguente:
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
Questo comando usa tmux
per suddividere il terminale in due colonne. Viene avviato un consumer in ogni colonna, con lo stesso valore di ID di gruppo. Dopo che i consumer hanno completato la lettura, si noti che ognuno di essi ha letto solo una parte dei record. Usare Ctrl + C due volte per uscire da tmux
.
Il consumo da parte dei client dello stesso gruppo viene gestito tramite le partizioni dell'argomento. In questo esempio di codice per l'argomento test
creato in precedenza sono presenti otto partizioni. Se si avviano otto consumer, ognuno di essi legge i record da una singola partizione dell'argomento.
Importante
Un gruppo di consumer non può contenere un numero di istanze di consumer maggiore del numero di partizioni. In questo esempio, un gruppo di consumer può contenere fino a otto consumer perché è questo il numero di partizioni nell'argomento. In alternativa è possibile avere più gruppi di consumer, ognuno con al massimo otto consumer.
I record vengono archiviati in Kafka nell'ordine in cui vengono ricevuti in una partizione. Per ottenere il recapito dei record nell'ordine all'interno di una partizione, creare un gruppo di consumer con un numero di istanze corrispondente al numero di partizioni. Per ottenere il recapito dei record nell'ordine all'interno dell'argomento, creare un gruppo di consumer con una sola istanza.
Problemi comuni rilevati
La creazione dell'argomento non riesce Se il cluster è abilitato per Enterprise Security Pack, usare i file JAR predefiniti per producer e consumer. Il file JAR ESP può essere compilato dal codice nella sottodirectory
DomainJoined-Producer-Consumer
. Le proprietà producer e consumer hanno una proprietà aggiuntivaCommonClientConfigs.SECURITY_PROTOCOL_CONFIG
per i cluster abilitati per ESP.Errore nei cluster abilitati per ESP: se le operazioni di producer e consumer non riescono e si usa un cluster abilitato per ESP, verificare che l'utente
kafka
sia presente in tutti i criteri di Ranger. Se non è presente, aggiungerlo a tutti i criteri di Ranger.
Pulire le risorse
Per pulire le risorse create da questa esercitazione, eliminare il gruppo di risorse. Se si elimina il gruppo di risorse, vengono eliminati anche il cluster HDInsight associato e tutte le altre risorse correlate al gruppo di risorse.
Per rimuovere il gruppo di risorse usando il portale di Azure:
- Nel portale di Azure espandere il menu a sinistra per aprire il menu dei servizi e quindi scegliere Gruppi di risorse per visualizzare l'elenco dei gruppi di risorse.
- Individuare il gruppo di risorse da eliminare e quindi fare clic con il pulsante destro del mouse su Altro (...) a destra dell'elenco.
- Scegliere Elimina gruppo di risorse e quindi confermare.
Passaggi successivi
In questo documento si è appreso come usare le API Apache Kafka Producer e Consumer con Kafka in HDInsight. Per altre informazioni sull'uso di Kafka, vedere: