Oefening: de Kafka-producent maken
Nu de Kafka- en Spark-clusters zijn geïmplementeerd, kunnen we een Kafka-producent toevoegen aan het Kafka-hoofdknooppunt. Deze producent is een aandelenkoersstimulator, die kunstmatige aandelenprijzen produceert.
Het voorbeeld downloaden
- Ga in uw internetbrowser naar https://github.com/Azure/hdinsight-mslearn het voorbeeld en download of kloon het lokaal als u dit nog niet in een vorige module hebt gedaan.
- Open het Spark Structured Streaming\python-producer-simulator-template.py-bestand lokaal.
De Kafka-broker-URL's ophalen
Vervolgens moet u de Kafka-broker-URL's ophalen met behulp van ssh op het hoofdknooppunt en de URL's toevoegen aan het Python-bestand.
Als u verbinding wilt maken met het primaire hoofdknooppunt van het Apache Kafka-cluster, moet u ssh uitvoeren in het knooppunt. Azure Cloud Shell in Azure Portal is de aanbevolen manier om verbinding te maken. Klik in Azure Portal op de knop Azure Cloud Shell in de bovenste werkbalk en selecteer Bash. U kunt ook een opdrachtprompt met ssh gebruiken, zoals Git Bash.
Als u de Azure Cloud Shell nog niet eerder hebt gebruikt, wordt er een melding weergegeven dat er geen opslag is gekoppeld. Selecteer uw Azure-abonnement in het vak Abonnement en klik op Opslag maken.
Plak de volgende opdracht bij de cloudprompt. Vervang
sshuser
door de SSH-gebruikersnaam. Vervangkafka-mslearn-stock
door de naam van uw Apache Kafka-cluster en houd er rekening mee dat u -ssh moet opnemen na de clusternaam.ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
Wanneer u voor het eerst verbinding maakt met het cluster, wordt in de SSH-client mogelijk de waarschuwing weergegeven dat de authenticiteit van de host niet kan worden vastgesteld. Wanneer u wordt gevraagd de host te bevestigen, typt u yes en drukt u vervolgens op Enter om de host toe te voegen aan de lijst met vertrouwde servers van uw SSH-client.
Voer het wachtwoord voor de SSH-gebruiker in wanneer hierom wordt gevraagd.
Zodra er verbinding is gemaakt, ziet u informatie die er ongeveer als volgt uitziet:
Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64) * Documentation: https://help.ubuntu.com * Management: https://landscape.canonical.com * Support: https://ubuntu.com/advantage * Overheard at KubeCon: "microk8s.status just blew my mind". https://microk8s.io/docs/commands#microk8s.status 0 packages can be updated. 0 updates are security updates. Welcome to Kafka on HDInsight. The programs included with the Ubuntu system are free software; the exact distribution terms for each program are described in the individual files in /usr/share/doc/*/copyright. Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by applicable law. To run a command as administrator (user "root"), use "sudo <command>". See "man sudo_root" for details.
Installeer jq, een opdrachtregel-JSON-processor. Dit hulpprogramma wordt gebruikt om JSON-documenten te parseren en is handig bij het parseren van de hostgegevens. Voer in de open SSH-verbinding de volgende opdracht in om
jq
te installeren:sudo apt -y install jq
wachtwoordvariabele instellen. Vervang
PASSWORD
door het aanmeldwachtwoord voor het cluster en voer de volgende opdracht in:export password='PASSWORD'
Extraheer de clusternaam met de juiste letters. De daadwerkelijke lettergrootte van de clusternaam kan anders zijn dan verwacht, afhankelijk van hoe het cluster is gemaakt. Met deze opdracht wordt de daadwerkelijke lettergrootte opgehaald en opgeslagen in een variabele. Voer de volgende opdracht in:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Deze opdracht heeft geen antwoord.
Gebruik de onderstaande opdracht om een omgevingsvariabele in te stellen met hostinformatie van Zookeeper. Met de opdracht worden alle Zookeeper-hosts opgehaald, waarna alleen de eerste twee vermeldingen worden geretourneerd. De reden hiervoor is dat u een bepaalde mate van redundantie wilt voor het geval één host onbereikbaar is.
export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
Notitie
Voor deze opdracht is toegang tot Ambari vereist. Als uw cluster zich achter een NSG bevindt, voert u deze opdracht uit vanaf een computer die toegang heeft tot Ambari.
Deze opdracht heeft ook geen antwoord.
Gebruik de volgende opdracht om te controleren of de omgevingsvariabele juist is ingesteld:
echo $KAFKAZKHOSTS
Met deze opdracht wordt informatie geretourneerd die lijkt op de volgende tekst:
zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181
Gebruik de volgende opdracht om een omgevingsvariabele in te stellen met brokerhostinformatie van Apache Kafka:
export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Notitie
Voor deze opdracht is toegang tot Ambari vereist. Als uw cluster zich achter een NSG bevindt, voert u deze opdracht uit vanaf een computer die toegang heeft tot Ambari.
Deze opdracht heeft geen uitvoer.
Gebruik de volgende opdracht om te controleren of de omgevingsvariabele juist is ingesteld:
echo $KAFKABROKERS
Met deze opdracht wordt informatie geretourneerd die lijkt op de volgende tekst:
wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
Kopieer een van de Kafka-brokerwaarden die in de vorige stap zijn geretourneerd naar het python-producer-simulator-template.py-bestand op regel 19, en neem enkele aanhalingstekens rond de waarde op, bijvoorbeeld:
kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
Sla het python-producer-simulator-template-simulator-template.py bestand op.
Gebruik in het ssh-verbindingsvenster de volgende opdracht om een onderwerp te maken.
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
Met deze opdracht maakt u verbinding met Zookeeper met behulp van de hostgegevens die zijn opgeslagen in $KAFKAZKHOSTS. Vervolgens wordt er een Apache Kafka-onderwerp gemaakt met de naam stockVals, zodat deze overeenkomt met de onderwerpnaam in python-producer-simulator-template.py.
Kopieer het Python-bestand naar het hoofdknooppunt en voer het bestand uit om gegevens te streamen
Navigeer in een nieuw Git-venster naar de locatie van het python-producer-simulator-template.py-bestand en kopieer het bestand van uw lokale computer naar het primaire hoofdknooppunt met behulp van de volgende opdracht. Vervang
kafka-mslearn-stock
door de naam van uw Apache Kafka-cluster en houd er rekening mee dat u -ssh moet opnemen na de clusternaam.scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
Wanneer u wordt gevraagd of u wilt doorgaan met verbinding maken, typt u ja. Voer vervolgens bij de prompt het wachtwoord voor het cluster in. Nadat het bestand is overgedragen, wordt de volgende uitvoer weergegeven.
python-producer-simulator-template.py 100% 1896 71.9KB/s 00:00
Ga nu terug naar de Azure-opdrachtprompt waar u de brokergegevens hebt opgehaald en voer de volgende opdracht uit om Kafka te installeren:
sudo pip install kafka-python
Nadat de Kafka is geïnstalleerd, wordt de volgende uitvoer weergegeven.
Installing collected packages: kafka-python Successfully installed kafka-python-1.4.7
Installeer aanvragen in hetzelfde venster met behulp van de volgende opdracht:
sudo apt-get install python-requests
Wanneer u wordt gevraagd "Na deze bewerking wordt 4.327 kB extra schijfruimte gebruikt. Wilt u doorgaan? [Y/n]" type y.
Wanneer aanvragen zijn geïnstalleerd, wordt uitvoer weergegeven die lijkt op het volgende.
Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ... Setting up python-requests (2.9.1-3ubuntu0.1) ...
Gebruik in hetzelfde venster de volgende opdracht om het Python-bestand uit te voeren
python python-producer-simulator-template.py
De uitvoer ziet er als volgt uit:
No loops argument provided. Default loops are 1000 Running in simulated mode [ { "symbol": "MSFT", "size": 355, "price": 147.205, "time": 1578029521022 }, { "symbol": "BA", "size": 345, "price": 352.607, "time": 1578029521022 }, { "symbol": "JNJ", "size": 58, "price": 142.043, "time": 1578029521022 }, { "symbol": "F", "size": 380, "price": 8.545, "time": 1578029521022 }, { "symbol": "TSLA", "size": 442, "price": 329.342, "time": 1578029521022 }, { "symbol": "BAC", "size": 167, "price": 32.921, "time": 1578029521022 }, { "symbol": "GE", "size": 222, "price": 11.115, "time": 1578029521022 }, { "symbol": "MMM", "size": 312, "price": 174.643, "time": 1578029521022 }, { "symbol": "INTC", "size": 483, "price": 54.978, "time": 1578029521022 }, { "symbol": "WMT", "size": 387, "price": 120.355, "time": 1578029521022 } ] stockVals 2 0 stockVals 1 0 stockVals 3 0 stockVals 2 1 stockVals 7 0 stockVals 7 1 stockVals 1 1 stockVals 4 0 stockVals 4 1 stockVals 1 2
Deze uitvoer bevat de gesimuleerde aandelenkoersen voor de aandelen die worden vermeld in het python-producer-simulated-template.py-bestand, gevolgd door het onderwerp, de partitie en de offset van het bericht in het onderwerp. U kunt zien dat elke keer dat de producent wordt geactiveerd (elke seconde), een nieuwe batch aandelenprijzen wordt gegenereerd en elk nieuw bericht wordt toegevoegd aan een partitie op een bepaalde offset.