Tutorial: Usar a API de fluxos do Apache Kafka no Azure HDInsight
Saiba como criar um aplicativo que usa a API Apache Kafka Streams e executá-lo com o Kafka no HDInsight.
A aplicação utilizada neste tutorial é uma contagem de palavras de transmissão em fluxo. Lê os dados do texto de um tópico do Kafka, extrai palavras individuais e, em seguida, armazena as palavras e as contagens noutro tópico.
O processamento de fluxo Kafka é frequentemente feito usando o Apache Spark. As versões 2.1.1 e 2.4.1 do Kafka (no HDInsight 4.0 e 5.0) suportam a API do Kafka Streams. Esta API permite-lhe transformar fluxos de dados entre tópicos de entrada e de saída.
Para obter mais informações sobre as Transmissões em Fluxo do Kafka, veja a documentação Intro to Streams (Introdução às Transmissões em Fluxo) em Apache.org.
Neste tutorial, irá aprender a:
- Compreender o código
- Criar e implementar a aplicação
- Configurar tópicos do Kafka
- Executar o código
Pré-requisitos
Um Kafka no cluster HDInsight 4.0 ou 5.0. Para saber como criar um cluster Kafka no HDInsight, consulte o documento Iniciar com o Apache Kafka no HDInsight .
Conclua as etapas no documento Apache Kafka Consumer and Producer API . Os passos neste documento utilizam a aplicação de exemplo e os tópicos que criou neste tutorial.
Java Developer Kit (JDK) versão 8 ou equivalente, como OpenJDK.
Apache Maven instalado corretamente de acordo com o Apache. Maven é um sistema de construção de projetos para projetos Java.
Um cliente SSH. Para obter mais informações, veja Ligar ao HDInsight (Apache Hadoop) através de SSH.
Compreender o código
A aplicação de exemplo está localizada em https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, no subdiretório Streaming
. A aplicação é composta por dois ficheiros:
pom.xml
: este ficheiro define as dependências do projeto, a versão de Java e os métodos de empacotamento.Stream.java
: este ficheiro implementa a lógica de transmissão em fluxo.
Pom.xml
Seguem-se os aspetos importantes a compreender em relação ao ficheiro pom.xml
:
Dependências: este projeto depende das APIs de Streams do Kafka, que são disponibilizadas pelo pacote
kafka-clients
. Esta dependência é definida pelo seguinte código XML:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
A entrada
${kafka.version}
é declarada na secção<properties>..</properties>
depom.xml
e está configurada para a versão de Kafka do cluster do HDInsight.Plug-ins: os plug-ins de Maven proporcionam diversas funcionalidades. Neste projeto, são utilizados os seguintes plug-ins:
maven-compiler-plugin
: utilizado para definir a versão de Java utilizada pelo projeto para a versão 8. O HDInsight 4.0 e 5.0 requer Java 8.maven-shade-plugin
: Usado para gerar um uber jar que contém este aplicativo e quaisquer dependências. Ele também é usado para definir o ponto de entrada do aplicativo, para que você possa executar diretamente o arquivo Jar sem ter que especificar a classe principal.
Stream.java
O ficheiro Stream.java utiliza a API Streams para implementar uma aplicação de contagem de palavras. Lê os dados de um tópico do Kafka, denominado test
, e escreve as contagens de palavras num tópico com o nome wordcounts
.
O código seguinte define a aplicação de contagem de palavras:
package com.microsoft.example;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Arrays;
import java.util.Properties;
public class Stream
{
public static void main( String[] args ) {
Properties streamsConfig = new Properties();
// The name must be unique on the Kafka cluster
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
// Brokers
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
// SerDes for key and values
streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Serdes for the word and count
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
KStream<String, Long> wordCounts = sentences
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey("Counts")
.toStream();
wordCounts.to(stringSerde, longSerde, "wordcounts");
KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Criar e implementar o exemplo
Para compilar e implementar o projeto no cluster do Kafka no HDInsight, siga os passos abaixo:
Defina seu diretório atual para o local do
hdinsight-kafka-java-get-started-master\Streaming
diretório e, em seguida, use o seguinte comando para criar um pacote jar:mvn clean package
Este comando cria o pacote em
target/kafka-streaming-1.0-SNAPSHOT.jar
.Substitua
sshuser
pelo utilizador SSH do seu cluster eclustername
pelo nome do seu cluster. Use o comando a seguir para copiar o arquivo para okafka-streaming-1.0-SNAPSHOT.jar
cluster HDInsight. Se tal lhe for pedido, introduza a palavra-passe da conta de utilizador SSH.scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
Criar tópicos do Apache Kafka
Substitua
sshuser
pelo utilizador SSH do seu cluster eCLUSTERNAME
pelo nome do seu cluster. Abra uma conexão SSH com o cluster, inserindo o seguinte comando. Se tal lhe for pedido, introduza a palavra-passe da conta de utilizador SSH.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Instale jq, um processador JSON de linha de comando. A partir da conexão SSH aberta, digite o seguinte comando para instalar
jq
:sudo apt -y install jq
Configure a variável de senha. Substitua
PASSWORD
pela senha de login do cluster e digite o comando:export PASSWORD='PASSWORD'
Extraia o nome do cluster com caixa correta. O invólucro real do nome do cluster pode ser diferente do esperado, dependendo de como o cluster foi criado. Este comando obtém o invólucro real e, em seguida, armazena-o em uma variável. Introduza o seguinte comando:
export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Nota
Se você estiver fazendo esse processo de fora do cluster, há um procedimento diferente para armazenar o nome do cluster. Obtenha o nome do cluster em minúsculas no portal do Azure. Em seguida, substitua o nome do cluster pelo
<clustername>
comando a seguir e execute-o:export clusterName='<clustername>'
.Para obter os hosts do broker Kafka e os hosts do Apache Zookeeper, use os seguintes comandos. Quando lhe for pedido, introduza a palavra-passe para o início de sessão (administrador) do cluster.
export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2); export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Nota
Esses comandos exigem acesso ao Ambari. Se o cluster estiver atrás de um NSG, execute esses comandos a partir de uma máquina que possa acessar o Ambari.
Para criar os tópicos que a operação de transmissão em fluxo vai utilizar, utilize os seguintes comandos:
Nota
Poderá receber um erro que diz que o tópico
test
já existe. Esse erro não é problemático, porque o tópico pode já ter sido criado no tutorial da API Producer and Consumer./usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Os tópicos são utilizados para os seguintes fins:
test
: é neste tópico onde os registos são recebidos. A aplicação de transmissão em fluxo lê a partir deste tópico.wordcounts
: é neste tópico que a aplicação de transmissão em fluxo armazena a respetiva saída.RekeyedIntermediateTopic
: este tópico é utilizado para reparticionar os dados à medida que a contagem é atualizada pelo operadorcountByKey
.wordcount-example-Counts-changelog
: este tópico é um arquivo de estado que a operaçãocountByKey
utiliza.
O Kafka no HDInsight também pode ser configurado para criar tópicos automaticamente. Para obter mais informações, veja o documento Configure automatic topic creation (Configurar a criação automática de tópicos).
Executar o código
Para iniciar a aplicação de transmissão em fluxo como um processo em segundo plano, utilize o seguinte comando:
java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
Você pode receber um aviso sobre o Apache
log4j
. Pode ignorar este aviso.Para enviar os registos para o tópico
test
, utilize o seguinte comando para iniciar a aplicação de produtor:java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
Após a conclusão do produtor, utilize o seguinte comando par ver as informações armazenada no tópico
wordcounts
:/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
Os parâmetros
--property
dizem ao consumidor da consola para imprimir a chave (palavra), juntamente com a contagem (valor). Este parâmetro também configura o desserializador que vai ser utilizado para ler esses valores a partir do Kafka.O resultado é semelhante ao seguinte texto:
dwarfs 13635 ago 13664 snow 13636 dwarfs 13636 ago 13665 a 13803 ago 13666 a 13804 ago 13667 ago 13668 jumped 13640 jumped 13641
O parâmetro
--from-beginning
configura o consumidor para começar no início dos registos armazenados no tópico. A contagem aumenta sempre que for encontrada uma palavra, de modo que o tópico contém múltiplas entradas para cada palavra, com uma contagem progressiva.Utilize Ctrl + C para sair do produtor. Continue a utilizar Ctrl + C para sair da aplicação e do consumidor.
Para excluir os tópicos usados pela operação de streaming, use os seguintes comandos:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
Clean up resources (Limpar recursos)
Para limpar os recursos criados por este tutorial, pode eliminar o grupo de recursos. Ao eliminar o grupo de recursos também elimina o cluster do HDInsight associado e quaisquer outros recursos associados ao grupo de recursos.
Para remover o grupo de recursos através do Portal do Azure:
- No Portal do Azure, expanda o menu no lado esquerdo para abrir o menu de serviços e, em seguida, escolha Grupos de Recursos, para apresentar a lista dos seus grupos de recursos.
- Encontre o grupo de recursos a eliminar e, em seguida, clique com o botão direito do rato em Mais (...) no lado direito da lista.
- Selecione Eliminar grupo de recursos e, em seguida, confirme.
Próximos passos
Neste documento, você aprendeu como usar a API Apache Kafka Streams com o Kafka no HDInsight. Use o seguinte para saber mais sobre como trabalhar com Kafka.