Exercício – Criar o produtor do Kafka
Agora que os clusters Kafka e Spark foram implantados, vamos adicionar um produtor do Kafka ao nó de cabeçalho do Kafka. Este produtor é um estimulador de preço de estoque, que produz preços de estoque artificiais.
Baixar o exemplo
- No navegador da Internet, acesse https://github.com/Azure/hdinsight-mslearn e baixe ou clone o exemplo localmente se você ainda não fez isso em um módulo anterior.
- Abra o arquivo Streaming\python-producer-simulator-template.py Estruturado do Spark localmente.
Recuperar as URLs do agente Kafka
A seguir, você precisa recuperar as URLs do agente Kafka usando o ssh no cabeçalho e adicionando as URLs ao arquivo Python.
Para conectar-se ao nó principal primário do cluster Apache Kafka, você precisará realizar ssh no nó. A Azure Cloud Shell no portal do Azure é a maneira recomendada de conectar-se. No portal do Azure, clique no botão Azure Cloud Shell na barra de ferramentas superior e selecione Bash. Você também pode usar um prompt de comando habilitado para ssh, como o Git Bash.
Se você não tiver usado o Azure Cloud Shell antes, será exibida uma notificação informando de que não há nenhum armazenamento montado. Selecione sua assinatura do Azure na caixa Assinatura e clique em Criar Armazenamento.
No prompt de nuvem, cole o comando a seguir. Substitua
sshuser
pelo nome de usuário SSH. Substituakafka-mslearn-stock
pelo nome do cluster de Apache Kafka e observe que você deve incluir -ssh após o nome do cluster.ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
Quando você se conectar pela primeira vez ao cluster, seu cliente SSH poderá exibir um aviso de que a autenticidade do host não pode ser estabelecida. Quando for solicitado, digite sim e pressione Enter para adicionar o host à lista de servidores confiáveis do cliente SSH.
Quando solicitado, insira a senha do usuário SSH.
Após a conexão, você verá informações semelhantes ao seguinte texto:
Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64) * Documentation: https://help.ubuntu.com * Management: https://landscape.canonical.com * Support: https://ubuntu.com/advantage * Overheard at KubeCon: "microk8s.status just blew my mind". https://microk8s.io/docs/commands#microk8s.status 0 packages can be updated. 0 updates are security updates. Welcome to Kafka on HDInsight. The programs included with the Ubuntu system are free software; the exact distribution terms for each program are described in the individual files in /usr/share/doc/*/copyright. Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by applicable law. To run a command as administrator (user "root"), use "sudo <command>". See "man sudo_root" for details.
Instale jq, um processador JSON de linha de comando. Esse utilitário é usado para analisar documentos JSON e é útil para analisar as informações do host. Na conexão SSH aberta, digite o seguinte comando para instalar o
jq
:sudo apt -y install jq
Configurar variável de senha. Substitua
PASSWORD
pela senha de logon do cluster e insira o comando:export password='PASSWORD'
Extraia o nome do cluster com grafia correta de maiúsculas e minúsculas. A grafia de maiúsculas e minúsculas real do nome do cluster pode ser diferente do esperado, dependendo de como o cluster foi criado. Esse comando obterá a grafia de maiúsculas e minúsculas real e a armazenará em uma variável. Insira o seguinte comando:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Esse comando não tem resposta.
Para definir uma variável de ambiente com informações de host Zookeeper, use o comando especificado abaixo. O comando recupera todos os hosts Zookeeper e retorna apenas as duas primeiras entradas. Isso ocorre porque você deseja certa redundância no caso de um host ficar inacessível.
export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
Observação
Este comando requer acesso do Ambari. Se o cluster estiver atrás de um NSG, execute esse comando em um computador que possa acessar o Ambari.
Esse comando também não tem resposta.
Para verificar se a variável de ambiente é definida corretamente, use o seguinte comando:
echo $KAFKAZKHOSTS
Esse comando retorna informações semelhantes ao seguinte texto:
zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181
Para definir uma variável de ambiente com informações de host agente do Apache Kafka, use o seguinte comando:
export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Observação
Este comando requer acesso do Ambari. Se o cluster estiver atrás de um NSG, execute esse comando em um computador que possa acessar o Ambari.
Esse comando não tem nenhuma saída.
Para verificar se a variável de ambiente é definida corretamente, use o seguinte comando:
echo $KAFKABROKERS
Esse comando retorna informações semelhantes ao seguinte texto:
wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
Copie um dos valores do agente Kafka retornados na etapa anterior para o arquivo python-producer-simulator-template.py na linha 19 e inclua aspas simples em torno do valor, por exemplo:
kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
Salve o arquivo python-producer-simulator-template-simulator-template.py.
De volta à janela conexão SSH, use o comando a seguir para criar um tópico.
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
Esse comando se conecta ao ZooKeeper usando as informações de host armazenadas em $KAFKAZKHOSTS. A seguir, ele cria um tópico de Apache Kafka chamado stockVals, para corresponder ao nome do tópico em python-producer-simulator-template.py.
Copie o arquivo do Python para o nó principal e execute o arquivo para transmitir dados
Em uma nova janela do Git, navegue até a localização do arquivo python-producer-simulator-template.py e copie-o do computador local para o nó principal primário usando o comando a seguir. Substitua
kafka-mslearn-stock
pelo nome do cluster de Apache Kafka e observe que você deve incluir -ssh após o nome do cluster.scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
Quando for perguntado se você deseja continuar a conexão, digite yes. No prompt, insira a senha do cluster. Depois que o arquivo é transferido, a saída a seguir é exibida.
python-producer-simulator-template.py 100% 1896 71.9KB/s 00:00
Agora, volte para o prompt de comando do Azure em que você recuperou as informações do agente e execute o seguinte comando para instalar o Kafka:
sudo pip install kafka-python
Depois que o Kafka for instalado com êxito, a saída a seguir será exibida.
Installing collected packages: kafka-python Successfully installed kafka-python-1.4.7
Na mesma janela, instale as solicitações usando o seguinte comando:
sudo apt-get install python-requests
Quando perguntado "Após esta operação, serão usados 4.327 kB de espaço em disco adicional. Deseja continuar? [Y/n]”, digite y.
Quando as solicitações estiverem instaladas com êxito, uma saída semelhante à mostrada abaixo será exibida.
Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ... Setting up python-requests (2.9.1-3ubuntu0.1) ...
Na mesma janela, use o comando a seguir para executar o arquivo do Python
python python-producer-simulator-template.py
Será exibida uma saída semelhante à seguinte:
No loops argument provided. Default loops are 1000 Running in simulated mode [ { "symbol": "MSFT", "size": 355, "price": 147.205, "time": 1578029521022 }, { "symbol": "BA", "size": 345, "price": 352.607, "time": 1578029521022 }, { "symbol": "JNJ", "size": 58, "price": 142.043, "time": 1578029521022 }, { "symbol": "F", "size": 380, "price": 8.545, "time": 1578029521022 }, { "symbol": "TSLA", "size": 442, "price": 329.342, "time": 1578029521022 }, { "symbol": "BAC", "size": 167, "price": 32.921, "time": 1578029521022 }, { "symbol": "GE", "size": 222, "price": 11.115, "time": 1578029521022 }, { "symbol": "MMM", "size": 312, "price": 174.643, "time": 1578029521022 }, { "symbol": "INTC", "size": 483, "price": 54.978, "time": 1578029521022 }, { "symbol": "WMT", "size": 387, "price": 120.355, "time": 1578029521022 } ] stockVals 2 0 stockVals 1 0 stockVals 3 0 stockVals 2 1 stockVals 7 0 stockVals 7 1 stockVals 1 1 stockVals 4 0 stockVals 4 1 stockVals 1 2
Essa saída fornece os preços de ações simulados para as ações listadas no arquivo python-producer-simulated-template.py seguidos do tópico, da partição e do deslocamento da mensagem no tópico. Você pode ver que sempre que o produtor é disparado (a cada segundo), um novo lote de preços de estoque é gerado e cada nova mensagem é adicionada a uma partição em um determinado deslocamento.