Partilhar via


Tutorial: Usar a API de fluxos do Apache Kafka no Azure HDInsight

Saiba como criar um aplicativo que usa a API Apache Kafka Streams e executá-lo com o Kafka no HDInsight.

A aplicação utilizada neste tutorial é uma contagem de palavras de transmissão em fluxo. Lê os dados do texto de um tópico do Kafka, extrai palavras individuais e, em seguida, armazena as palavras e as contagens noutro tópico.

O processamento de fluxo Kafka é frequentemente feito usando o Apache Spark. As versões 2.1.1 e 2.4.1 do Kafka (no HDInsight 4.0 e 5.0) suportam a API do Kafka Streams. Esta API permite-lhe transformar fluxos de dados entre tópicos de entrada e de saída.

Para obter mais informações sobre as Transmissões em Fluxo do Kafka, veja a documentação Intro to Streams (Introdução às Transmissões em Fluxo) em Apache.org.

Neste tutorial, irá aprender a:

  • Compreender o código
  • Criar e implementar a aplicação
  • Configurar tópicos do Kafka
  • Executar o código

Pré-requisitos

Compreender o código

A aplicação de exemplo está localizada em https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, no subdiretório Streaming. A aplicação é composta por dois ficheiros:

  • pom.xml: este ficheiro define as dependências do projeto, a versão de Java e os métodos de empacotamento.
  • Stream.java: este ficheiro implementa a lógica de transmissão em fluxo.

Pom.xml

Seguem-se os aspetos importantes a compreender em relação ao ficheiro pom.xml:

  • Dependências: este projeto depende das APIs de Streams do Kafka, que são disponibilizadas pelo pacote kafka-clients. Esta dependência é definida pelo seguinte código XML:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    A entrada ${kafka.version} é declarada na secção <properties>..</properties> de pom.xml e está configurada para a versão de Kafka do cluster do HDInsight.

  • Plug-ins: os plug-ins de Maven proporcionam diversas funcionalidades. Neste projeto, são utilizados os seguintes plug-ins:

    • maven-compiler-plugin: utilizado para definir a versão de Java utilizada pelo projeto para a versão 8. O HDInsight 4.0 e 5.0 requer Java 8.
    • maven-shade-plugin: Usado para gerar um uber jar que contém este aplicativo e quaisquer dependências. Ele também é usado para definir o ponto de entrada do aplicativo, para que você possa executar diretamente o arquivo Jar sem ter que especificar a classe principal.

Stream.java

O ficheiro Stream.java utiliza a API Streams para implementar uma aplicação de contagem de palavras. Lê os dados de um tópico do Kafka, denominado test, e escreve as contagens de palavras num tópico com o nome wordcounts.

O código seguinte define a aplicação de contagem de palavras:

package com.microsoft.example;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Properties;

public class Stream
{
    public static void main( String[] args ) {
        Properties streamsConfig = new Properties();
        // The name must be unique on the Kafka cluster
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
        // Brokers
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        // SerDes for key and values
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Serdes for the word and count
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
        KStream<String, Long> wordCounts = sentences
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, word) -> new KeyValue<>(word, word))
                .countByKey("Counts")
                .toStream();
        wordCounts.to(stringSerde, longSerde, "wordcounts");

        KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Criar e implementar o exemplo

Para compilar e implementar o projeto no cluster do Kafka no HDInsight, siga os passos abaixo:

  1. Defina seu diretório atual para o local do hdinsight-kafka-java-get-started-master\Streaming diretório e, em seguida, use o seguinte comando para criar um pacote jar:

    mvn clean package
    

    Este comando cria o pacote em target/kafka-streaming-1.0-SNAPSHOT.jar.

  2. Substitua sshuser pelo utilizador SSH do seu cluster e clustername pelo nome do seu cluster. Use o comando a seguir para copiar o arquivo para o kafka-streaming-1.0-SNAPSHOT.jar cluster HDInsight. Se tal lhe for pedido, introduza a palavra-passe da conta de utilizador SSH.

    scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
    

Criar tópicos do Apache Kafka

  1. Substitua sshuser pelo utilizador SSH do seu cluster e CLUSTERNAME pelo nome do seu cluster. Abra uma conexão SSH com o cluster, inserindo o seguinte comando. Se tal lhe for pedido, introduza a palavra-passe da conta de utilizador SSH.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Instale jq, um processador JSON de linha de comando. A partir da conexão SSH aberta, digite o seguinte comando para instalar jq:

    sudo apt -y install jq
    
  3. Configure a variável de senha. Substitua PASSWORD pela senha de login do cluster e digite o comando:

    export PASSWORD='PASSWORD'
    
  4. Extraia o nome do cluster com caixa correta. O invólucro real do nome do cluster pode ser diferente do esperado, dependendo de como o cluster foi criado. Este comando obtém o invólucro real e, em seguida, armazena-o em uma variável. Introduza o seguinte comando:

    export CLUSTER_NAME=$(curl -u admin:$PASSWORD -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Nota

    Se você estiver fazendo esse processo de fora do cluster, há um procedimento diferente para armazenar o nome do cluster. Obtenha o nome do cluster em minúsculas no portal do Azure. Em seguida, substitua o nome do cluster pelo <clustername> comando a seguir e execute-o: export clusterName='<clustername>'.

  5. Para obter os hosts do broker Kafka e os hosts do Apache Zookeeper, use os seguintes comandos. Quando lhe for pedido, introduza a palavra-passe para o início de sessão (administrador) do cluster.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    
    export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Nota

    Esses comandos exigem acesso ao Ambari. Se o cluster estiver atrás de um NSG, execute esses comandos a partir de uma máquina que possa acessar o Ambari.

  6. Para criar os tópicos que a operação de transmissão em fluxo vai utilizar, utilize os seguintes comandos:

    Nota

    Poderá receber um erro que diz que o tópico test já existe. Esse erro não é problemático, porque o tópico pode já ter sido criado no tutorial da API Producer and Consumer.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

    Os tópicos são utilizados para os seguintes fins:

    • test: é neste tópico onde os registos são recebidos. A aplicação de transmissão em fluxo lê a partir deste tópico.
    • wordcounts: é neste tópico que a aplicação de transmissão em fluxo armazena a respetiva saída.
    • RekeyedIntermediateTopic: este tópico é utilizado para reparticionar os dados à medida que a contagem é atualizada pelo operador countByKey.
    • wordcount-example-Counts-changelog: este tópico é um arquivo de estado que a operação countByKey utiliza.

    O Kafka no HDInsight também pode ser configurado para criar tópicos automaticamente. Para obter mais informações, veja o documento Configure automatic topic creation (Configurar a criação automática de tópicos).

Executar o código

  1. Para iniciar a aplicação de transmissão em fluxo como um processo em segundo plano, utilize o seguinte comando:

    java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
    

    Você pode receber um aviso sobre o Apache log4j. Pode ignorar este aviso.

  2. Para enviar os registos para o tópico test, utilize o seguinte comando para iniciar a aplicação de produtor:

    java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
    
  3. Após a conclusão do produtor, utilize o seguinte comando par ver as informações armazenada no tópico wordcounts:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
    

    Os parâmetros --property dizem ao consumidor da consola para imprimir a chave (palavra), juntamente com a contagem (valor). Este parâmetro também configura o desserializador que vai ser utilizado para ler esses valores a partir do Kafka.

    O resultado é semelhante ao seguinte texto:

    dwarfs  13635
    ago     13664
    snow    13636
    dwarfs  13636
    ago     13665
    a       13803
    ago     13666
    a       13804
    ago     13667
    ago     13668
    jumped  13640
    jumped  13641
    

    O parâmetro --from-beginning configura o consumidor para começar no início dos registos armazenados no tópico. A contagem aumenta sempre que for encontrada uma palavra, de modo que o tópico contém múltiplas entradas para cada palavra, com uma contagem progressiva.

  4. Utilize Ctrl + C para sair do produtor. Continue a utilizar Ctrl + C para sair da aplicação e do consumidor.

  5. Para excluir os tópicos usados pela operação de streaming, use os seguintes comandos:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

Clean up resources (Limpar recursos)

Para limpar os recursos criados por este tutorial, pode eliminar o grupo de recursos. Ao eliminar o grupo de recursos também elimina o cluster do HDInsight associado e quaisquer outros recursos associados ao grupo de recursos.

Para remover o grupo de recursos através do Portal do Azure:

  1. No Portal do Azure, expanda o menu no lado esquerdo para abrir o menu de serviços e, em seguida, escolha Grupos de Recursos, para apresentar a lista dos seus grupos de recursos.
  2. Encontre o grupo de recursos a eliminar e, em seguida, clique com o botão direito do rato em Mais (...) no lado direito da lista.
  3. Selecione Eliminar grupo de recursos e, em seguida, confirme.

Próximos passos

Neste documento, você aprendeu como usar a API Apache Kafka Streams com o Kafka no HDInsight. Use o seguinte para saber mais sobre como trabalhar com Kafka.