Tutorial: Uso de Streams API de Apache Kafka en Azure HDInsight
Aprenda a crear una aplicación que use Apache Kafka Streams API y ejecútela con Kafka en HDInsight.
La aplicación que se usa en este tutorial es un recuento de palabras de streaming. Lee datos de texto de un tema de Kafka, extrae las palabras individuales y, a continuación, almacena el recuento de palabras en otro tema de Kafka.
El procesamiento de flujos de Kafka se suele hacer con Apache Spark. Kafka versión 2.1.1 y 2.4.1 (en HDInsight 4.0 y 5.0) admite Kafka Streams API. Esta API le permite transformar flujos de datos entre los temas de entrada y de salida.
Para más información sobre Kafka Streams, consulte la documentación de introducción a Kafka Streams en Apache.org.
En este tutorial, aprenderá a:
- Comprendiendo el código
- Compilar e implementar la aplicación
- Configurar temas de Kafka
- Ejecución del código
Prerrequisitos
Un clúster de Kafka en HDInsight 4.0 o 5.0. Para aprender a crear un clúster de Kafka en HDInsight, consulte el documento Inicio de Apache Kafka en HDInsight.
Complete los pasos que se indican en el documento Producer API y Consumer API de Apache Kafka. Los pasos de este documento utilizan la aplicación y los temas de ejemplo que creó en este tutorial.
Java Developer Kit (JDK) versión 8, o un equivalente, como OpenJDK.
Apache Maven correctamente instalado según Apache. Maven es un sistema de compilación de proyectos de Java.
Un cliente SSH. Para más información, consulte Conexión a través de SSH con HDInsight (Apache Hadoop).
Comprendiendo el código
La aplicación de ejemplo se encuentra en https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, en el subdirectorio Streaming
. La aplicación consta de dos archivos:
pom.xml
: este archivo define las dependencias del proyecto, la versión de Java y los métodos de empaquetado.Stream.java
: este archivo implementa la lógica de streaming.
Pom.xml
Esto es lo más importante que hay que saber del archivo pom.xml
:
Dependencias: este proyecto utiliza Streams API de Kafka que la proporciona el paquete
kafka-clients
. El siguiente código XML define esta dependencia:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
La entrada
${kafka.version}
se declara en la sección<properties>..</properties>
depom.xml
y está configurada para la versión Kafka del clúster de HDInsight.Complementos: los complementos de Maven proporcionan varias funcionalidades. En este proyecto, se utilizan los siguientes complementos:
maven-compiler-plugin
: se utiliza para establecer que el proyecto utiliza la versión 8 de Java. HDInsight 4.0 y 5.0 requiere Java 8.maven-shade-plugin
: se utiliza para generar un archivo jar conjunto que contiene esta aplicación y todas las dependencias. También se usa para establecer el punto de entrada de la aplicación, con el fin de que pueda ejecutar directamente el archivo Jar sin tener que especificar la clase principal.
Stream.java
El archivo Stream.java utiliza Streams API para implementar una aplicación de recuento de palabras. Lee datos de un tema de Kafka llamado test
y escribe los recuentos de palabras en un tema llamado wordcounts
.
El código siguiente define la aplicación de recuento de palabras:
package com.microsoft.example;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Arrays;
import java.util.Properties;
public class Stream
{
public static void main( String[] args ) {
Properties streamsConfig = new Properties();
// The name must be unique on the Kafka cluster
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
// Brokers
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
// SerDes for key and values
streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Serdes for the word and count
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
KStream<String, Long> wordCounts = sentences
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Counts")
.toStream();
wordCounts.to(stringSerde, longSerde, "wordcounts");
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Compilación e implementación del ejemplo
Use los siguientes pasos para compilar e implementar el proyecto en el clúster de Kafka en HDInsight:
Establezca el directorio actual en la ubicación del directorio
hdinsight-kafka-java-get-started-master\Streaming
y luego use el siguiente comando para crear un paquete jar:mvn clean package
Este comando crea el paquete en
target/kafka-streaming-1.0-SNAPSHOT.jar
.Reemplace
sshuser
por el usuario de SSH del clúster yclustername
por el nombre de su clúster. Use el siguiente comando para copiar el archivokafka-streaming-1.0-SNAPSHOT.jar
en el clúster de HDInsight. Si se le solicita, escriba la contraseña de la cuenta de usuario de SSH.scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
Creación de temas de Apache Kafka
Reemplace
sshuser
por el usuario de SSH del clúster yCLUSTERNAME
por el nombre de su clúster. Abra una conexión de SSH con el clúster, para lo que debe escribir el siguiente comando. Si se le solicita, escriba la contraseña de la cuenta de usuario de SSH.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Instale jq, un procesador JSON de línea de comandos. En la conexión SSH abierta, escriba el siguiente comando para instalar
jq
:sudo apt -y install jq
Configure una variable de contraseña. Reemplace
PASSWORD
por la contraseña de inicio de sesión del clúster y, después, escriba el comando:export PASSWORD='PASSWORD'
Extraiga el nombre del clúster con las mayúsculas y minúsculas correctas. Las mayúsculas y minúsculas reales del nombre del clúster pueden no ser como cabría esperar, dependen de la forma en que se haya creado el clúster. Este comando obtiene el uso real de mayúsculas y minúsculas y, a continuación, lo almacena en una variable. Escriba el comando siguiente:
export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Nota:
Si está realizando este proceso desde fuera del clúster, hay un procedimiento diferente para almacenar el nombre del clúster. Obtenga el nombre del clúster en minúsculas desde Azure Portal. A continuación, sustituya el nombre del clúster por
<clustername>
en el siguiente comando y ejecútelo:export clusterName='<clustername>'
.Para obtener tanto los hosts del agente de Kafka como los hosts de Apache Zookeeper, use los siguientes comandos. Cuando se le solicite, escriba la contraseña de administrador del clúster.
export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2); export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Nota:
Estos comandos requieren acceso a Ambari. Si el clúster se encuentra detrás de un grupo de seguridad de red, ejecute estos comandos desde una máquina que pueda acceder a Ambari.
Para crear los temas que emplea la operación de streaming, use los siguientes comandos:
Nota
Puede recibir un error que indica que el tema
test
ya existe. Esto es normal, ya que puede que lo haya creado en el tutorial sobre Producer y Consumer API./usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Los temas se utilizan para los siguientes fines:
test
: este es el tema en el que se reciben los registros. La aplicación de streaming los lee de aquí.wordcounts
: este es el tema en el que la aplicación de streaming almacena su salida.RekeyedIntermediateTopic
: este tema se usa para volver a particionar los datos a medida que el operadorcountByKey
actualiza el recuento.wordcount-example-Counts-changelog
: este tema es un almacén de estados que usa la operacióncountByKey
Kafka en HDInsight se puede también configurar para que cree temas automáticamente. Para más información, consulte el documento Configure automatic topic creation (Configuración de la creación automática de temas).
Ejecución del código
Para iniciar la aplicación de streaming como un proceso en segundo plano, use el comando siguiente:
java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
Puede recibir una advertencia sobre Apache
log4j
. Puede pasarlos por alto.Para enviar registros al tema
test
, use el comando siguiente para iniciar la aplicación de producción:java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
Una vez finalizada la producción, use el siguiente comando para ver la información almacenada en el tema
wordcounts
:/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
Los parámetros
--property
indican al consumidor de consola que imprima la clave (palabra) junto con el número (valor). Este parámetro también configura el deserializador que se utilizará al leer estos valores de Kafka.La salida será similar al siguiente texto:
dwarfs 13635 ago 13664 snow 13636 dwarfs 13636 ago 13665 a 13803 ago 13666 a 13804 ago 13667 ago 13668 jumped 13640 jumped 13641
El parámetro
--from-beginning
configura el consumidor para que empiece por el primero de los registros almacenados en el tema. El recuento se incrementa cada vez que se encuentra una palabra, por lo que el tema contiene varias entradas para cada palabra, con un recuento creciente.Use Ctrl + C para salir del productor. Siga usando Ctrl + C para salir de la aplicación y del consumidor.
Para eliminar los temas que emplea la operación de streaming, use los siguientes comandos:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Limpieza de recursos
Para limpiar los recursos creados por este tutorial puede eliminar el grupo de recursos. Al eliminar el grupo de recursos, también se elimina el clúster de HDInsight asociado y otros recursos asociados al grupo.
Para quitar el grupo de recursos mediante Azure Portal:
- En Azure Portal, expanda el menú en el lado izquierdo para abrir el menú de servicios y elija Grupos de recursos para mostrar la lista de sus grupos de recursos.
- Busque el grupo de recursos que desea eliminar y haga clic con el botón derecho en Más (...) en el lado derecho de la lista.
- Seleccione Eliminar grupo de recursos y confirme la elección.
Pasos siguientes
En este documento, ha aprendido a usar Streams API de Apache Kafka con Kafka en HDInsight. Use los recursos siguientes para obtener más información sobre cómo trabajar con Kafka.