Övning – Skapa Kafka-producenten
Nu när Kafka- och Spark-kluster har distribuerats kan du lägga till en Kafka-producent till Kafka-huvudnoden. Denna producent är en aktiekursstimulator, som producerar konstgjorda aktiekurser.
Hämta exemplet
- I webbläsaren går du till https://github.com/Azure/hdinsight-mslearn och laddar ned eller klonar exemplet lokalt om du inte redan gjorde det i en tidigare modul.
- Öppna filen Spark Structured Streaming\python-producer-simulator-template.py lokalt.
Hämta URL:er för Kafka-koordinator
Därefter måste du hämta Url:erna för Kafka-koordinatorn med hjälp av ssh på huvudnoden och lägga till URL:erna i Python-filen.
Om du vill ansluta till den primära huvudnoden i Apache Kafka-klustret måste du ssh till noden. Azure Cloud Shell i Azure Portal är det rekommenderade sättet att ansluta. I Azure Portal klickar du på Azure Cloud Shell-knappen i det övre verktygsfältet och väljer Bash. Du kan också använda en ssh-aktiverad kommandotolk, till exempel Git Bash.
Om du inte har använt Azure Cloud Shell tidigare visas ett meddelande om att du inte har någon lagring monterad. Välj din Azure-prenumeration i rutan Prenumeration och klicka på Skapa lagring.
Klistra in följande kommando i molnprompten. Ersätt
sshuser
med SSH-användarnamnet. Ersättkafka-mslearn-stock
med namnet på apache Kafka-klustret och observera att du måste inkludera -ssh efter klusternamnet.ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
När du ansluter till HDInsight första gången kan SSH-klienten visa en varning om att värdens äkthet inte kan fastställas. Skriv ja när du uppmanas till detta och tryck sedan på Retur så läggs värden till i SSH-klientens lista över betrodda servrar.
Ange SSH-användarens lösenord när du uppmanas till detta.
När du är ansluten visas ett meddelande av följande slag:
Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64) * Documentation: https://help.ubuntu.com * Management: https://landscape.canonical.com * Support: https://ubuntu.com/advantage * Overheard at KubeCon: "microk8s.status just blew my mind". https://microk8s.io/docs/commands#microk8s.status 0 packages can be updated. 0 updates are security updates. Welcome to Kafka on HDInsight. The programs included with the Ubuntu system are free software; the exact distribution terms for each program are described in the individual files in /usr/share/doc/*/copyright. Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by applicable law. To run a command as administrator (user "root"), use "sudo <command>". See "man sudo_root" for details.
Installera jq, en JSON-processor på kommandoraden. Det här verktyget används för att parsa JSON-dokument och är användbart när du parsar värdinformationen. Från den öppna SSH-anslutningen anger du följande kommando för att installera
jq
:sudo apt -y install jq
Konfigurera lösenordsvariabel. Ersätt
PASSWORD
med lösenordet för klusterinloggning och ange sedan kommandot:export password='PASSWORD'
Extrahera det korrekt skiftlägesdelade klusternamnet. Det faktiska höljet för klusternamnet kan skilja sig från förväntat, beroende på hur klustret skapades. Det här kommandot hämtar det faktiska höljet och lagrar det sedan i en variabel. Ange följande kommando:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Det här kommandot har inget svar.
Om du vill ange en miljövariabel med Zookeeper-värdinformation använder du kommandot nedan. Kommandot hämtar alla Zookeeper-värdar och returnerar sedan endast de två första posterna. Det beror på att det är bra att ha viss redundans ifall en värd inte kan nås.
export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
Kommentar
Det här kommandot kräver Ambari-åtkomst. Om klustret ligger bakom en NSG kör du det här kommandot från en dator som har åtkomst till Ambari.
Det här kommandot har inte heller något svar.
Använd följande kommando om du vill kontrollera att miljövariabeln är korrekt:
echo $KAFKAZKHOSTS
Det här kommandot returnerar information liknande följande text:
zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181
Använd följande kommando om du vill ange en miljövariabel med värdinformation för Kafka-broker:
export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Kommentar
Det här kommandot kräver Ambari-åtkomst. Om klustret ligger bakom en NSG kör du det här kommandot från en dator som har åtkomst till Ambari.
Det här kommandot har inga utdata.
Använd följande kommando om du vill kontrollera att miljövariabeln är korrekt:
echo $KAFKABROKERS
Det här kommandot returnerar information liknande följande text:
wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
Kopiera ett av de Kafka-koordinatorvärden som returnerades i föregående steg till filen python-producer-simulator-template.py på rad 19 och inkludera enkla citattecken runt värdet, till exempel:
kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
Spara filen python-producer-simulator-template-simulator-template.py.
Tillbaka i ssh-anslutningsfönstret använder du följande kommando för att skapa ett ämne.
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
Det här kommandot ansluter till Zookeeper med hjälp av värdinformationen som lagras i $KAFKAZKHOSTS. Sedan skapas ett Apache Kafka-ämne med namnet stockVals för att matcha ämnesnamnet i python-producer-simulator-template.py.
Kopiera Python-filen till huvudnoden och kör filen för att strömma data
I ett nytt git-fönster navigerar du till platsen för den python-producer-simulator-template.py filen och kopierar filen från den lokala datorn till den primära huvudnoden med hjälp av följande kommando. Ersätt
kafka-mslearn-stock
med namnet på apache Kafka-klustret och observera att du måste inkludera -ssh efter klusternamnet.scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
När du tillfrågas om du vill fortsätta ansluta skriver du ja. Ange sedan lösenordet för klustret i kommandotolken. Efter filöverföringarna visas följande utdata.
python-producer-simulator-template.py 100% 1896 71.9KB/s 00:00
Växla nu tillbaka till Azure-kommandotolken där du hämtade koordinatorinformationen och kör följande kommando för att installera Kafka:
sudo pip install kafka-python
När Kafka har installerats visas följande utdata.
Installing collected packages: kafka-python Successfully installed kafka-python-1.4.7
I samma fönster installerar du begäranden med följande kommando:
sudo apt-get install python-requests
På frågan "Efter den här åtgärden används 4 327 kB extra diskutrymme. Vill du fortsätta? [Y/n]" typ y.
När begäranden installeras visas utdata som liknar följande.
Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ... Setting up python-requests (2.9.1-3ubuntu0.1) ...
I samma fönster använder du följande kommando för att köra Python-filen
python python-producer-simulator-template.py
Du bör se utdata som liknar följande:
No loops argument provided. Default loops are 1000 Running in simulated mode [ { "symbol": "MSFT", "size": 355, "price": 147.205, "time": 1578029521022 }, { "symbol": "BA", "size": 345, "price": 352.607, "time": 1578029521022 }, { "symbol": "JNJ", "size": 58, "price": 142.043, "time": 1578029521022 }, { "symbol": "F", "size": 380, "price": 8.545, "time": 1578029521022 }, { "symbol": "TSLA", "size": 442, "price": 329.342, "time": 1578029521022 }, { "symbol": "BAC", "size": 167, "price": 32.921, "time": 1578029521022 }, { "symbol": "GE", "size": 222, "price": 11.115, "time": 1578029521022 }, { "symbol": "MMM", "size": 312, "price": 174.643, "time": 1578029521022 }, { "symbol": "INTC", "size": 483, "price": 54.978, "time": 1578029521022 }, { "symbol": "WMT", "size": 387, "price": 120.355, "time": 1578029521022 } ] stockVals 2 0 stockVals 1 0 stockVals 3 0 stockVals 2 1 stockVals 7 0 stockVals 7 1 stockVals 1 1 stockVals 4 0 stockVals 4 1 stockVals 1 2
Den här utdatan innehåller de simulerade aktiekurserna för de aktier som anges i filen python-producer-simulated-template.py följt av ämnet, partitionen och förskjutningen av meddelandet i ämnet. Du kan se att varje gång producenten utlöses (varje sekund) genereras en ny batch med aktiekurser och varje nytt meddelande läggs till i en partition vid en viss förskjutning.