Tutorial: Utilizar as APIs de Produtor e de Consumidor de Apache Kafka
Saiba como utilizar as APIs de Produtor e de Consumidor de Apache Kafka com o Kafka no HDInsight.
A API de Produtor de Kafka permite que as aplicações enviem fluxos de dados para o cluster de Kafka. A API de Consumidor de Kafka permite que as aplicações leiam fluxos de dados a partir do cluster.
Neste tutorial, ficará a saber como:
- Pré-requisitos
- Compreender o código
- Criar e implementar a aplicação
- Executar a aplicação no cluster
Para obter mais informações sobre as APIs, veja a documentação do Apache dedicada à API de Produtor e à API de Consumidor.
Pré-requisitos
- Apache Kafka no cluster do HDInsight. Para saber como criar o cluster, veja Começar com o Apache Kafka no HDInsight.
- Java Developer Kit (JDK) versão 8 ou equivalente, como OpenJDK.
- O Apache Maven foi instalado corretamente de acordo com o Apache. O Maven é um sistema de compilação de projetos para projetos Java.
- Um cliente SSH como o Putty. Para obter mais informações, veja Ligar ao HDInsight (Apache Hadoop) através de SSH.
Compreender o código
A aplicação de exemplo está localizada em https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, no subdiretório Producer-Consumer
. Se estiver a utilizar um cluster do Kafka com o Pacote de Segurança Enterprise (ESP), deve utilizar a versão da aplicação localizada no DomainJoined-Producer-Consumer
subdiretório.
Essencialmente, a aplicação é composta por quatro ficheiros:
-
pom.xml
: este ficheiro define as dependências do projeto, a versão de Java e os métodos de empacotamento. -
Producer.java
: este ficheiro envia frases aleatórias para o Kafka através da API de produtor. -
Consumer.java
: este ficheiro utiliza a API de consumidor para ler os dados a partir de Kafka e emiti-los para STDOUT. -
AdminClientWrapper.java
: este ficheiro utiliza a API de administração para criar, descrever e eliminar tópicos do Kafka. -
Run.java
: a interface de linha de comandos utilizada para executar o código do produtor e do consumidor.
Pom.xml
Seguem-se os aspetos importantes a compreender em relação ao ficheiro pom.xml
:
Dependências: este projeto depende das APIs de produtor e de consumidor de Kafka, fornecidas pelo pacote
kafka-clients
. Esta dependência é definida pelo seguinte código XML:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
A entrada
${kafka.version}
é declarada na secção<properties>..</properties>
depom.xml
e está configurada para a versão de Kafka do cluster do HDInsight.Plug-ins: os plug-ins de Maven proporcionam diversas funcionalidades. Neste projeto, são utilizados os seguintes plug-ins:
-
maven-compiler-plugin
: utilizado para definir a versão de Java utilizada pelo projeto para a versão 8. Esta é a versão de Java utilizada pelo HDInsight 3.6. -
maven-shade-plugin
: utilizado para gerar um JAR com dependências, que contém precisamente não só esta aplicação como todas as dependências. Também é utilizado para definir o ponto de entrada da aplicação, para que possa executar diretamente o ficheiro JAR sem ter de especificar a classe principal.
-
Producer.Java
O produtor comunica com os anfitriões de mediador (nós de trabalho) de Kafka e envia os dados para um tópico do Kafka. O fragmento de código seguinte é do ficheiro Producer.java do repositório do GitHub e mostra como definir as propriedades do produtor. Para clusters Com Segurança Empresarial Ativada, tem de ser adicionada uma propriedade adicional "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Consumer.Java
O consumidor comunica com os anfitriões de mediador (nós de trabalho) de Kafka e lê os registos de forma cíclica. O fragmento de código seguinte do ficheiro Consumer.java define as propriedades do consumidor. Para clusters Com Segurança Empresarial Ativada, tem de ser adicionada uma propriedade adicional "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
Neste código, o consumidor está configurado para ler a partir do início do tópico (auto.offset.reset
está definido como earliest
).
Run.Java
O ficheiro Run.java fornece uma interface de linha de comandos que executa o código de produtor ou de consumidor. Tem de fornecer as informações do anfitrião de mediador de Kafka como um parâmetro. Opcionalmente, pode incluir um valor de ID de grupo, que é utilizado pelo processo de consumidor. Se criar várias instâncias de consumidor com o mesmo ID de grupo, estas irão fazer o balanceamento de carga da leitura do tópico.
Criar e implementar o exemplo
Utilizar ficheiros JAR pré-criados
Transfira os jars a partir do exemplo de Introdução ao Azure do Kafka. Se o cluster tiver o Pacote de Segurança Enterprise (ESP) ativado, utilize kafka-producer-consumer-esp.jar. Utilize o comando abaixo para copiar os jars para o cluster.
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Criar os ficheiros JAR a partir do código
Se quiser ignorar este passo, os jars pré-criados podem ser transferidos a Prebuilt-Jars
partir do subdiretório. Transfira kafka-producer-consumer.jar. Se o cluster tiver o Pacote de Segurança Enterprise (ESP) ativado, utilize kafka-producer-consumer-esp.jar. Execute o passo 3 para copiar o jar para o cluster do HDInsight.
Transfira e extraia os exemplos de https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.
Defina o diretório atual para a localização do
hdinsight-kafka-java-get-started\Producer-Consumer
diretório. Se estiver a utilizar o cluster do Kafka ativado pelo Pacote de Segurança Enterprise (ESP), deve definir a localização comoDomainJoined-Producer-Consumer
subdiretório. Utilize o seguinte comando para criar a aplicação:mvn clean package
Este comando cria um diretório com o nome
target
, que contém um ficheiro com o nomekafka-producer-consumer-1.0-SNAPSHOT.jar
. Para clusters ESP, o ficheiro serákafka-producer-consumer-esp-1.0-SNAPSHOT.jar
Substitua
sshuser
pelo utilizador SSH do seu cluster eCLUSTERNAME
pelo nome do seu cluster. Introduza o seguinte comando para copiar o ficheiro para okafka-producer-consumer-1.0-SNAPSHOT.jar
cluster do HDInsight. Quando lhe for pedido, introduza a palavra-passe do utilizador SSH.scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Executar o exemplo
Substitua
sshuser
pelo utilizador SSH do seu cluster eCLUSTERNAME
pelo nome do seu cluster. Abra uma ligação SSH ao cluster ao introduzir o seguinte comando. Se tal lhe for pedido, introduza a palavra-passe da conta de utilizador SSH.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Para obter os anfitriões do mediador kafka, substitua os valores por
<clustername>
e<password>
no seguinte comando e execute-os. Utilize a mesma caixa para<clustername>
conforme mostrado na portal do Azure. Substitua<password>
pela palavra-passe de início de sessão do cluster e, em seguida, execute:sudo apt -y install jq export CLUSTER_NAME='<clustername>' export PASSWORD='<password>' export KAFKABROKERS=$(curl -sS -u admin:$PASSWORD -G https://$CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/$CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Nota
Este comando requer acesso ao Ambari. Se o cluster estiver protegido por um NSG, execute este comando a partir de um computador que possa aceder ao Ambari.
Crie o tópico do Kafka,
myTest
, ao introduzir o seguinte comando:java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
Para executar o produtor e escrever dados para o tópico, utilize o seguinte comando:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
Quando o produtor tiver terminado, utilize o seguinte comando para ler a partir do tópico:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
A leitura dos registos, juntamente com uma contagem de registos, é apresentada.
Utilize Ctrl + C para sair do consumidor.
Vários consumidores
Os consumidores de Kafka utilizam um grupo de consumidores quando leem os registos. Utilizar o mesmo grupo com vários consumidores resulta em leituras com balanceamento de carga de um tópico. Cada consumidor no grupo recebe uma parte dos registos.
A aplicação de consumidor aceita um parâmetro que é utilizado como o ID de grupo. Por exemplo, o seguinte comando inicia um consumidor através de um ID de grupo de myGroup
:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
Utilize Ctrl + C para sair do consumidor.
Para ver este processo em ação, utilize o seguinte comando:
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
Este comando utiliza tmux
para dividir o terminal em duas colunas. É iniciado um consumidor em cada coluna, com o mesmo valor de ID de grupo. Após os consumidores concluírem a leitura, tenha em atenção que cada um deles lê apenas uma parte dos registos. Utilize Ctrl + C duas vezes para sair tmux
de .
O consumo pelos clientes dentro do mesmo grupo é processado pelas partições do tópico. Neste exemplo de tópico, o tópico test
criado anteriormente, tem oito partições. Se iniciar oito consumidores, cada consumidor lê os registos de uma única partição do tópico.
Importante
Não podem existir mais instâncias de consumidor num grupo de consumidores do que partições. Neste exemplo, um grupo de consumidores pode incluir até oito consumidores, pois esse é o número de partições no tópico. Também pode ter vários grupos de consumidores, em que cada grupo não tem mais do que oito consumidores.
Os registos armazenados no Kafka são armazenados pela ordem em que são recebidos numa partição. Para obter uma entrega por ordem dos registos dentro de uma partição, crie um grupo de consumidores em que o número de instâncias de consumidor corresponde ao número de partições. Para obter uma entrega por ordem dos registos dentro do tópico, crie um grupo de consumidores com apenas uma instância de consumidor.
Problemas Comuns enfrentados
Falha na criação do tópico Se o cluster tiver o Enterprise Security Pack ativado, utilize os ficheiros JAR pré-criados para produtor e consumidor. O jar esp pode ser criado a partir do código no
DomainJoined-Producer-Consumer
subdiretório. As propriedades de produtor e consumidor têm uma propriedadeCommonClientConfigs.SECURITY_PROTOCOL_CONFIG
adicional para clusters preparados para ESP.Falha nos clusters preparados para ESP: se as operações de produção e consumo falharem e estiver a utilizar um cluster preparado para ESP, verifique se o utilizador
kafka
está presente em todas as políticas do Ranger. Se não estiver presente, adicione-o a todas as políticas do Ranger.
Limpar os recursos
Para limpar os recursos criados por este tutorial, pode eliminar o grupo de recursos. Ao eliminar o grupo de recursos também elimina o cluster do HDInsight associado e quaisquer outros recursos associados ao grupo de recursos.
Para remover o grupo de recursos através do Portal do Azure:
- No Portal do Azure, expanda o menu no lado esquerdo para abrir o menu de serviços e, em seguida, escolha Grupos de Recursos, para apresentar a lista dos seus grupos de recursos.
- Encontre o grupo de recursos a eliminar e, em seguida, clique com o botão direito do rato em Mais (...) no lado direito da lista.
- Selecione Eliminar grupo de recursos e, em seguida, confirme.
Passos seguintes
Neste documento, aprendeu a utilizar a API de Produtor e Consumidor do Apache Kafka com o Kafka no HDInsight. Utilize o seguinte para obter mais informações sobre como trabalhar com o Kafka: