Ćwiczenie — tworzenie producenta platformy Kafka
Po wdrożeniu klastrów Kafka i Spark można dodać producenta platformy Kafka do węzła głównego platformy Kafka. Ten producent jest stymulatorem cen akcji, który produkuje sztuczne ceny akcji.
Pobierz przykład
- W przeglądarce internetowej przejdź do https://github.com/Azure/hdinsight-mslearn strony i pobierz lub sklonuj przykład lokalnie, jeśli jeszcze tego nie zrobiono w poprzednim module.
- Otwórz lokalnie plik Spark Structured Streaming\python-producer-simulator-template.py.
Pobieranie adresów URL brokera platformy Kafka
Następnie należy pobrać adresy URL brokera platformy Kafka przy użyciu protokołu SSH w węźle głównym i dodać adresy URL do pliku python.
Aby nawiązać połączenie z podstawowym węzłem głównym klastra platformy Apache Kafka, musisz połączyć się z węzłem za pomocą protokołu SSH. Usługa Azure Cloud Shell w witrynie Azure Portal jest zalecanym sposobem nawiązywania połączenia. W witrynie Azure Portal kliknij przycisk Azure Cloud Shell na górnym pasku narzędzi i wybierz pozycję Bash. Możesz również użyć wiersza polecenia z obsługą protokołu SSH, takiego jak Git Bash.
Jeśli wcześniej nie użyto usługi Azure Cloud Shell, zostanie wyświetlone powiadomienie z informacją, że nie masz zainstalowanego magazynu. Wybierz subskrypcję platformy Azure w polu Subskrypcja, a następnie kliknij pozycję Utwórz magazyn.
W wierszu polecenia chmury wklej następujące polecenie. Zastąp wartość
sshuser
nazwą użytkownika protokołu SSH. Zastąpkafka-mslearn-stock
ciąg nazwą klastra apache Kafka i pamiętaj, że po nazwie klastra należy uwzględnić ciąg -ssh.ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
Po pierwszym połączeniu z klastrem Twój klient SSH może wyświetlić ostrzeżenie, że nie można ustalić autentyczności hosta. Po wyświetleniu monitu wpisz wartość yes i naciśnij klawisz Enter, aby dodać hosta do listy zaufanych serwerów klienta SSH.
Po wyświetleniu monitu wprowadź hasło użytkownika SSH.
Po nawiązaniu połączenia zostanie wyświetlona informacja podobna do następującej:
Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64) * Documentation: https://help.ubuntu.com * Management: https://landscape.canonical.com * Support: https://ubuntu.com/advantage * Overheard at KubeCon: "microk8s.status just blew my mind". https://microk8s.io/docs/commands#microk8s.status 0 packages can be updated. 0 updates are security updates. Welcome to Kafka on HDInsight. The programs included with the Ubuntu system are free software; the exact distribution terms for each program are described in the individual files in /usr/share/doc/*/copyright. Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by applicable law. To run a command as administrator (user "root"), use "sudo <command>". See "man sudo_root" for details.
Zainstaluj procesor jq, wiersz polecenia JSON. To narzędzie służy do analizowania dokumentów JSON i jest przydatne podczas analizowania informacji o hoście. W otwartym połączeniu SSH wprowadź następujące polecenie, aby zainstalować program
jq
:sudo apt -y install jq
Konfigurowanie zmiennej hasła. Zastąp
PASSWORD
ciąg hasłem logowania klastra, a następnie wprowadź polecenie:export password='PASSWORD'
Wyodrębnij poprawnie nazwa klastra o wielkości liter. Rzeczywista wielkość liter nazwy klastra może być inna niż oczekiwano, w zależności od sposobu utworzenia klastra. To polecenie uzyska rzeczywistą wielkość liter, a następnie zapisze ją w zmiennej. Podaj następujące polecenie:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
To polecenie nie ma odpowiedzi.
Aby ustawić zmienną środowiskową z informacjami o hoście usługi Zookeeper, użyj poniższego polecenia. Polecenie pobiera wszystkie hosty zookeeper, a następnie zwraca tylko dwa pierwsze wpisy. Taka nadmiarowość jest wymagana, jeśli jeden z hostów będzie nieosiągalny.
export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
Uwaga
To polecenie wymaga dostępu systemu Ambari. Jeśli klaster znajduje się za sieciową grupą zabezpieczeń, uruchom to polecenie z maszyny, która może uzyskać dostęp do systemu Ambari.
To polecenie również nie ma odpowiedzi.
Aby sprawdzić, czy zmienna środowiskowa jest poprawnie ustawiona, użyj następującego polecenia:
echo $KAFKAZKHOSTS
To polecenie zwraca informacje podobne do następującego tekstu:
zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181
Aby ustawić zmienną środowiskową na informacje hosta brokera platformy Apache Kafka, użyj następującego polecenia:
export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Uwaga
To polecenie wymaga dostępu systemu Ambari. Jeśli klaster znajduje się za sieciową grupą zabezpieczeń, uruchom to polecenie z maszyny, która może uzyskać dostęp do systemu Ambari.
To polecenie nie zawiera danych wyjściowych.
Aby sprawdzić, czy zmienna środowiskowa jest poprawnie ustawiona, użyj następującego polecenia:
echo $KAFKABROKERS
To polecenie zwraca informacje podobne do następującego tekstu:
wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
Skopiuj jedną z wartości brokera platformy Kafka zwróconą w poprzednim kroku do pliku python-producer-simulator-template.py w wierszu 19 i dołącz pojedyncze cudzysłowy wokół wartości, na przykład:
kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
Zapisz plik python-producer-simulator-template-simulator-template.py.
W oknie połączenia SSH użyj następującego polecenia, aby utworzyć temat.
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
To polecenie nawiązuje połączenie z usługą Zookeeper przy użyciu informacji o hoście przechowywanych w $KAFKAZKHOSTS. Następnie tworzy temat platformy Apache Kafka o nazwie stockVals, aby był zgodny z nazwą tematu w python-producer-simulator-template.py.
Skopiuj plik python do węzła głównego i uruchom plik w celu przesyłania strumieniowego danych
W nowym oknie usługi Git przejdź do lokalizacji pliku python-producer-simulator-template.py i skopiuj plik z komputera lokalnego do węzła głównego przy użyciu następującego polecenia. Zastąp
kafka-mslearn-stock
ciąg nazwą klastra apache Kafka i pamiętaj, że po nazwie klastra należy uwzględnić ciąg -ssh.scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
Po wyświetleniu monitu o kontynuowanie nawiązywania połączenia wpisz tak. Następnie po wyświetleniu monitu wprowadź hasło dla klastra. Po przeniesieniu plików zostaną wyświetlone następujące dane wyjściowe.
python-producer-simulator-template.py 100% 1896 71.9KB/s 00:00
Teraz wróć do wiersza polecenia platformy Azure, w którym pobrano informacje o brokerze i uruchom następujące polecenie, aby zainstalować platformę Kafka:
sudo pip install kafka-python
Po pomyślnym zainstalowaniu platformy Kafka zostaną wyświetlone następujące dane wyjściowe.
Installing collected packages: kafka-python Successfully installed kafka-python-1.4.7
W tym samym oknie zainstaluj żądania przy użyciu następującego polecenia:
sudo apt-get install python-requests
Na pytanie "Po tej operacji zostanie użytych 4327 kB dodatkowego miejsca na dysku. Czy chcesz kontynuować? [Y/n]" wpisz y.
Gdy żądania zostaną pomyślnie zainstalowane, zostaną wyświetlone dane wyjściowe podobne do poniższych.
Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ... Setting up python-requests (2.9.1-3ubuntu0.1) ...
W tym samym oknie użyj następującego polecenia, aby uruchomić plik python
python python-producer-simulator-template.py
Powinny zostać wyświetlone dane wyjściowe podobne do następujących:
No loops argument provided. Default loops are 1000 Running in simulated mode [ { "symbol": "MSFT", "size": 355, "price": 147.205, "time": 1578029521022 }, { "symbol": "BA", "size": 345, "price": 352.607, "time": 1578029521022 }, { "symbol": "JNJ", "size": 58, "price": 142.043, "time": 1578029521022 }, { "symbol": "F", "size": 380, "price": 8.545, "time": 1578029521022 }, { "symbol": "TSLA", "size": 442, "price": 329.342, "time": 1578029521022 }, { "symbol": "BAC", "size": 167, "price": 32.921, "time": 1578029521022 }, { "symbol": "GE", "size": 222, "price": 11.115, "time": 1578029521022 }, { "symbol": "MMM", "size": 312, "price": 174.643, "time": 1578029521022 }, { "symbol": "INTC", "size": 483, "price": 54.978, "time": 1578029521022 }, { "symbol": "WMT", "size": 387, "price": 120.355, "time": 1578029521022 } ] stockVals 2 0 stockVals 1 0 stockVals 3 0 stockVals 2 1 stockVals 7 0 stockVals 7 1 stockVals 1 1 stockVals 4 0 stockVals 4 1 stockVals 1 2
Te dane wyjściowe zawierają symulowane ceny akcji dla akcji wymienionych w pliku python-producer-simulated-template.py, a następnie temat, partycje i przesunięcie komunikatu w temacie. Widać, że za każdym razem, gdy producent jest wyzwalany (co sekundę), jest generowana nowa partia cen akcji, a każdy nowy komunikat jest dodawany do partycji z pewnym przesunięciem.