Tutorial: Usar a API de streams do Apache Kafka no Azure HDInsight
Saiba como criar um aplicativo que usa a API de Streams do Apache Kafka e executá-lo com o Kafka no HDInsight.
O aplicativo usado neste tutorial é uma contagem de palavras de streaming. Ele lê dados de texto de um tópico Kafka, extrai palavras individuais e, em seguida, armazena a palavra e a contagem em outro tópico Kafka.
O processamento de fluxo do Kafka normalmente é feito usando o Apache Spark. O Kafka versão 2.1.1 e 2.4.1 (no HDInsight 4.0 e 5.0) dá suporte à API de Streams do Kafka. Essa API permite transformar fluxos de dados entre tópicos de entrada e saída.
Para obter mais informações sobre Streams do Kafka, consulte a documentaçãoIntrodução a Streams em Apache.org.
Neste tutorial, você aprenderá como:
- Compreender o código
- Compilar e implantar o aplicativo
- Configurar tópicos Kafka
- Executar o código
Pré-requisitos
Um cluster do Kafka no HDInsight 4.0 ou 5.0. Para saber como criar um Kafka no cluster HDInsight, consulte o documento Iniciar com Apache Kafka no HDInsight.
Conclua as etapas no documento API de Produtor e Consumidor do Apache Kafka. As etapas neste documento usam o aplicativo de exemplo e os tópicos criados neste tutorial.
JDK (Java Developer Kit) versão 8 ou um equivalente, como o OpenJDK.
Apache Maven corretamente instalado de acordo com o Apache. O Maven é um sistema de construção de projetos para projetos Java.
Um cliente SSH. Para saber mais, confira Conectar-se ao HDInsight (Apache Hadoop) usando SSH.
Compreender o código
O aplicativo de exemplo está localizado em https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, além do subdiretório Streaming
. O aplicativo consiste de dois arquivos:
pom.xml
: este arquivo define as dependências do projeto, versão do Java e os métodos de empacotamento.Stream.java
: Esse arquivo implementa a lógica de streaming.
Pom.xml
As coisas importantes para entender no arquivo pom.xml
são:
Dependências: Este projeto depende da API do Kafka Streams, que é fornecida pelo pacote
kafka-clients
. O seguinte código XML define essa dependência:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
A entrada
${kafka.version}
é declarada na seção<properties>..</properties>
depom.xml
, e está configurada para a versão Kafka do cluster HDInsight.Plug-ins: os plug-ins do Maven oferecem várias funcionalidades. Neste projeto, são usados os seguintes plug-ins:
maven-compiler-plugin
: usado para definir a versão do Java usada pelo projeto como 8. O HDInsight 4.0 e 5.0 requer Java 8.maven-shade-plugin
: usado para gerar um uber jar que contém esse aplicativo, bem como eventuais dependências. Também é usado para definir o ponto de entrada do aplicativo, para que você possa executar diretamente o arquivo Jar sem a necessidade de especificar a classe principal.
Stream.java
O arquivo Stream.java usa a API do Streams para implementar um aplicativo de contagem de palavras. Ele lê dados de um tópico Kafka chamado test
e grava as contagens de palavras em um tópico chamado wordcounts
.
O código a seguir define o aplicativo de contagem de palavras:
package com.microsoft.example;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Arrays;
import java.util.Properties;
public class Stream
{
public static void main( String[] args ) {
Properties streamsConfig = new Properties();
// The name must be unique on the Kafka cluster
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
// Brokers
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
// SerDes for key and values
streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Serdes for the word and count
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
KStream<String, Long> wordCounts = sentences
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Counts")
.toStream();
wordCounts.to(stringSerde, longSerde, "wordcounts");
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Compilar e implantar o exemplo
Para criar e implantar o projeto para o Kafka no Cluster HDInsight, utilize as seguintes etapas:
Defina o diretório atual para o local do diretório
hdinsight-kafka-java-get-started-master\Streaming
e, em seguida, use o seguinte comando para criar um pacote jar:mvn clean package
Este comando cria o pacote em
target/kafka-streaming-1.0-SNAPSHOT.jar
.Substitua
sshuser
pelo usuário do SSH do cluster e substituaclustername
pelo nome do cluster. Use o seguinte comando para copiar o arquivokafka-streaming-1.0-SNAPSHOT.jar
para o Cluster HDInsight. Se solicitado, insira a senha para a conta de usuário SSH.scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
Criar tópicos do Apache Kafka
Substitua
sshuser
pelo usuário do SSH do cluster e substituaCLUSTERNAME
pelo nome do cluster. Abra uma conexão SSH para o cluster inserindo o seguinte comando. Se solicitado, insira a senha para a conta de usuário SSH.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Instale jq, um processador JSON de linha de comando. Na conexão SSH aberta, digite o seguinte comando para instalar o
jq
:sudo apt -y install jq
Configurar variável de senha. Substitua
PASSWORD
pela senha de logon do cluster e insira o comando:export PASSWORD='PASSWORD'
Extraia o nome do cluster com grafia correta de maiúsculas e minúsculas. A grafia de maiúsculas e minúsculas real do nome do cluster pode ser diferente do esperado, dependendo de como o cluster foi criado. Esse comando obtém o invólucro real e, em seguida, armazena-o em uma variável. Insira o seguinte comando:
export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Observação
Se você estiver realizando esse processo de fora do cluster, haverá um procedimento diferente para armazenar o nome do cluster. Obtenha o nome do cluster em letras minúsculas do portal do Azure. Em seguida, substitua o nome do cluster por
<clustername>
no comando a seguir e execute-o:export clusterName='<clustername>'
.Para obter os hosts de broker Kafka e os hosts Apache Zookeeper, use os comandos a seguir. Quando solicitado, insira a senha para a conta de logon do cluster (admin).
export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2); export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Observação
Esses comandos exigem acesso ao Ambari. Se o cluster estiver atrás de um NSG, execute esses comandos em um computador que possa acessar o Ambari.
Para criar os tópicos usados pela operação de streaming, use os seguintes comandos:
Observação
Você pode receber um erro que o tópico
test
já existe. Não há problema nisso, pois ele pode ter sido criado no tutorial de API de produtor e consumidor./usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Os tópicos são usados para as seguintes finalidades:
test
: Este tópico é onde os registros são recebidos. O aplicativo de streaming faz a leitura daqui.wordcounts
: Este tópico é onde o aplicativo de transmissão armazena sua saída.RekeyedIntermediateTopic
: Este tópico é usado para reparticionar dados como a contagem é atualizada pelo operadorcountByKey
.wordcount-example-Counts-changelog
: Este tópico é um armazenamento de estado usado pela operaçãocountByKey
O Kafka no HDInsight também pode ser configurado para criar tópicos automaticamente. Para obter mais informações, consulte o documento Configurar a criação automática de tópicos.
Executar o código
Para iniciar o aplicativo de streaming como processo em segundo plano, use o seguinte comando:
java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
Você pode receber um aviso sobre o Apache
log4j
. Você pode ignorar esse aviso.Para enviar registros para o tópico
test
, use o seguinte comando para iniciar o aplicativo produtor:java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
Após a conclusão do produtor, use o seguinte comando para exibir as informações armazenadas no tópico
wordcounts
:/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
Os parâmetros
--property
informam ao consumidor do console para imprimir a chave (palavra) juntamente com a contagem (valor). Esses parâmetros também configuram o desserializador a ser usado ao fazer a leitura desses valores do Kafka.A saída é semelhante ao texto a seguir:
dwarfs 13635 ago 13664 snow 13636 dwarfs 13636 ago 13665 a 13803 ago 13666 a 13804 ago 13667 ago 13668 jumped 13640 jumped 13641
O parâmetro
--from-beginning
configura o consumidor para começar do início dos registros armazenados no tópico. A contagem aumenta sempre que uma palavra é encontrada, logo o tópico contém várias entradas para cada palavra, com uma contagem crescente.Use Ctrl + C para sair do produtor. Continue usando Ctrl + C para sair do aplicativo e do consumidor.
Para excluir os tópicos usados pela operação de streaming, use os seguintes comandos:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Limpar os recursos
Para limpar os recursos criados por este tutorial, você pode excluir o grupo de recursos. A exclusão do grupo de recursos também exclui o cluster HDInsight associado e todos os outros recursos associados ao grupo de recursos.
Para remover o grupo de recursos usando o portal do Azure:
- No portal do Azure, expanda o menu à esquerda para abrir o menu de serviços e escolha Grupo de Recursos para exibir a lista dos seus grupos de recursos.
- Localize o grupo de recursos a ser excluído e clique com o botão direito do mouse no botão Mais (...) do lado direito da lista.
- Selecione Excluir grupo de recursos e confirme.
Próximas etapas
Neste documento, você aprendeu a usar a API de Streams do Apache Kafka com Kafka no HDInsight. Confira o seguinte para saber mais sobre como trabalhar com o Kafka.