Cvičení – vytvoření producenta Kafka
Teď, když jsou clustery Kafka a Spark nasazené, umožňuje přidat do hlavního uzlu Kafka producenta Kafka. Tento producent je akciový stimulátor, který vytváří ceny umělých akcií.
Stažení ukázky
- Pokud jste to ještě neudělali v předchozím modulu, přejděte do https://github.com/Azure/hdinsight-mslearn internetového prohlížeče a stáhněte nebo naklonujte ukázku místně.
- Místně otevřete soubor Spark Structured Streaming\python-producer-simulator-template.py.
Načtení adres URL zprostředkovatele Kafka
Dále je potřeba načíst adresy URL zprostředkovatele Kafka pomocí ssh na hlavním uzlu a přidáním adres URL do souboru Python.
Pokud se chcete připojit k primárnímu hlavnímu uzlu clusteru Apache Kafka, musíte se k uzlu připojit pomocí SSH. Azure Cloud Shell na webu Azure Portal se doporučuje připojit. Na webu Azure Portal klikněte na tlačítko Azure Cloud Shellu na horním panelu nástrojů a vyberte Bash. Můžete také použít příkazový řádek ssh s povoleným příkazem, jako je Git Bash.
Pokud jste azure Cloud Shell ještě nepoužívali, zobrazí se oznámení, že nemáte připojené úložiště. V poli Předplatné vyberte své předplatné Azure a klikněte na Vytvořit úložiště.
Na příkazovém řádku cloudu vložte následující příkaz. Nahraďte
sshuser
uživatelským jménem SSH. Nahraďtekafka-mslearn-stock
názvem clusteru Apache Kafka a všimněte si, že za název clusteru musíte zahrnout -ssh.ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
Když se ke clusteru poprvé připojíte, ve vašem klientovi SSH se může zobrazit upozornění na nemožnost potvrzení pravosti hostitele. Po zobrazení výzvy zadejte yes (ano) a pak stisknutím klávesy Enter přidejte hostitele na seznam důvěryhodných serverů vašeho klienta SSH.
Po zobrazení výzvy zadejte heslo uživatele SSH.
Po připojení se zobrazí informace podobné tomuto textu:
Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64) * Documentation: https://help.ubuntu.com * Management: https://landscape.canonical.com * Support: https://ubuntu.com/advantage * Overheard at KubeCon: "microk8s.status just blew my mind". https://microk8s.io/docs/commands#microk8s.status 0 packages can be updated. 0 updates are security updates. Welcome to Kafka on HDInsight. The programs included with the Ubuntu system are free software; the exact distribution terms for each program are described in the individual files in /usr/share/doc/*/copyright. Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by applicable law. To run a command as administrator (user "root"), use "sudo <command>". See "man sudo_root" for details.
Nainstalujte jq, procesor JSON příkazového řádku. Tento nástroj slouží k analýze dokumentů JSON a je užitečný při analýze informací o hostiteli. Z otevřeného připojení SSH zadejte následující příkaz, který chcete nainstalovat
jq
:sudo apt -y install jq
Nastavte proměnnou hesla. Nahraďte
PASSWORD
přihlašovacím heslem clusteru a pak zadejte příkaz:export password='PASSWORD'
Extrahujte název clusteru se správnými písmeny. Skutečná velikost výskytu názvu clusteru se může lišit od očekávání podle toho, jak byl cluster vytvořen. Tento příkaz získá skutečné velikostí a uloží ho do proměnné. Zadejte tento příkaz:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Tento příkaz nemá odpověď.
Pokud chcete nastavit proměnnou prostředí s informacemi o hostiteli Zookeeper, použijte následující příkaz. Příkaz načte všechny hostitele Zookeeper a pak vrátí pouze první dvě položky. Je to proto, že chcete určitou redundanci pro případ, že jeden hostitel bude nedosažitelný.
export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
Poznámka:
Tento příkaz vyžaduje přístup k Ambari. Pokud je váš cluster za skupinou zabezpečení sítě, spusťte tento příkaz z počítače, který má přístup k Ambari.
Tento příkaz také nemá žádnou odpověď.
Pokud chcete ověřit správné nastavení proměnné prostředí, použijte následující příkaz:
echo $KAFKAZKHOSTS
Tento příkaz by měl vrátit informace podobné následujícímu textu:
zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181
K nastavení proměnné prostředí s použitím informací o hostiteli zprostředkovatele Apache Kafka použijte následující příkaz:
export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Poznámka:
Tento příkaz vyžaduje přístup k Ambari. Pokud je váš cluster za skupinou zabezpečení sítě, spusťte tento příkaz z počítače, který má přístup k Ambari.
Tento příkaz nemá žádný výstup.
Pokud chcete ověřit správné nastavení proměnné prostředí, použijte následující příkaz:
echo $KAFKABROKERS
Tento příkaz by měl vrátit informace podobné následujícímu textu:
wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
Zkopírujte jednu z hodnot zprostředkovatele Kafka vrácenou v předchozím kroku do souboru python-producer-simulator-template.py na řádku 19 a uveďte jednoduché uvozovky kolem hodnoty, například:
kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
Uložte soubor python-producer-simulator-template-simulator-template.py.
V okně připojení SSH vytvořte téma pomocí následujícího příkazu.
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
Tento příkaz se připojí k Zookeeperu pomocí informací o hostiteli uložených v $KAFKAZKHOSTS. Potom vytvoří téma Apache Kafka s názvem stockVals, které bude odpovídat názvu tématu v python-producer-simulator-template.py.
Zkopírujte soubor Pythonu do hlavního uzlu a spusťte soubor pro streamování dat.
V novém okně Git přejděte do umístění souboru python-producer-simulator-template.py a pomocí následujícího příkazu zkopírujte soubor z místního počítače do primárního hlavního uzlu. Nahraďte
kafka-mslearn-stock
názvem clusteru Apache Kafka a všimněte si, že za název clusteru musíte zahrnout -ssh.scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
Když se zobrazí dotaz, jestli chcete pokračovat v připojování, zadejte ano. Pak na příkazovém řádku zadejte heslo clusteru. Po přenosu souboru se zobrazí následující výstup.
python-producer-simulator-template.py 100% 1896 71.9KB/s 00:00
Teď přepněte zpět na příkazový řádek Azure, kde jste načetli informace o zprostředkovatele, a spuštěním následujícího příkazu nainstalujte Kafka:
sudo pip install kafka-python
Po úspěšné instalaci Kafka se zobrazí následující výstup.
Installing collected packages: kafka-python Successfully installed kafka-python-1.4.7
Ve stejném okně nainstalujte požadavky pomocí následujícího příkazu:
sudo apt-get install python-requests
Po zobrazení výzvy "Po této operaci se použije 4 327 kB dalšího místa na disku. Chcete pokračovat? [Y/n]" zadejte y.
Když se požadavky úspěšně nainstalují, zobrazí se výstup podobný následujícímu.
Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ... Setting up python-requests (2.9.1-3ubuntu0.1) ...
Ve stejném okně spusťte soubor Python pomocí následujícího příkazu.
python python-producer-simulator-template.py
Zobrazený výstup by měl vypadat přibližně takto:
No loops argument provided. Default loops are 1000 Running in simulated mode [ { "symbol": "MSFT", "size": 355, "price": 147.205, "time": 1578029521022 }, { "symbol": "BA", "size": 345, "price": 352.607, "time": 1578029521022 }, { "symbol": "JNJ", "size": 58, "price": 142.043, "time": 1578029521022 }, { "symbol": "F", "size": 380, "price": 8.545, "time": 1578029521022 }, { "symbol": "TSLA", "size": 442, "price": 329.342, "time": 1578029521022 }, { "symbol": "BAC", "size": 167, "price": 32.921, "time": 1578029521022 }, { "symbol": "GE", "size": 222, "price": 11.115, "time": 1578029521022 }, { "symbol": "MMM", "size": 312, "price": 174.643, "time": 1578029521022 }, { "symbol": "INTC", "size": 483, "price": 54.978, "time": 1578029521022 }, { "symbol": "WMT", "size": 387, "price": 120.355, "time": 1578029521022 } ] stockVals 2 0 stockVals 1 0 stockVals 3 0 stockVals 2 1 stockVals 7 0 stockVals 7 1 stockVals 1 1 stockVals 4 0 stockVals 4 1 stockVals 1 2
Tento výstup poskytuje simulované ceny akcií pro akcie uvedené v souboru python-producer-simulated-template.py následované tématem, oddílem a posunem zprávy v tématu. Uvidíte, že pokaždé, když se producent aktivuje (každou sekundu), se vygeneruje nová dávka cen akcií a každá nová zpráva se přidá do oddílu v určitém posunu.