Ćwiczenie — tworzenie producenta platformy Kafka

Ukończone

Po wdrożeniu klastrów Kafka i Spark można dodać producenta platformy Kafka do węzła głównego platformy Kafka. Ten producent jest stymulatorem cen akcji, który produkuje sztuczne ceny akcji.

Pobierz przykład

  1. W przeglądarce internetowej przejdź do https://github.com/Azure/hdinsight-mslearn strony i pobierz lub sklonuj przykład lokalnie, jeśli jeszcze tego nie zrobiono w poprzednim module.
  2. Otwórz lokalnie plik Spark Structured Streaming\python-producer-simulator-template.py.

Pobieranie adresów URL brokera platformy Kafka

Następnie należy pobrać adresy URL brokera platformy Kafka przy użyciu protokołu SSH w węźle głównym i dodać adresy URL do pliku python.

  1. Aby nawiązać połączenie z podstawowym węzłem głównym klastra platformy Apache Kafka, musisz połączyć się z węzłem za pomocą protokołu SSH. Usługa Azure Cloud Shell w witrynie Azure Portal jest zalecanym sposobem nawiązywania połączenia. W witrynie Azure Portal kliknij przycisk Azure Cloud Shell na górnym pasku narzędzi i wybierz pozycję Bash. Możesz również użyć wiersza polecenia z obsługą protokołu SSH, takiego jak Git Bash.

  2. Jeśli wcześniej nie użyto usługi Azure Cloud Shell, zostanie wyświetlone powiadomienie z informacją, że nie masz zainstalowanego magazynu. Wybierz subskrypcję platformy Azure w polu Subskrypcja, a następnie kliknij pozycję Utwórz magazyn.

  3. W wierszu polecenia chmury wklej następujące polecenie. Zastąp wartość sshuser nazwą użytkownika protokołu SSH. Zastąp kafka-mslearn-stock ciąg nazwą klastra apache Kafka i pamiętaj, że po nazwie klastra należy uwzględnić ciąg -ssh.

    ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
    
  4. Po pierwszym połączeniu z klastrem Twój klient SSH może wyświetlić ostrzeżenie, że nie można ustalić autentyczności hosta. Po wyświetleniu monitu wpisz wartość yes i naciśnij klawisz Enter, aby dodać hosta do listy zaufanych serwerów klienta SSH.

  5. Po wyświetleniu monitu wprowadź hasło użytkownika SSH.

    Po nawiązaniu połączenia zostanie wyświetlona informacja podobna do następującej:

        Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64)
    
        * Documentation:  https://help.ubuntu.com
        * Management:     https://landscape.canonical.com
        * Support:        https://ubuntu.com/advantage
    
        * Overheard at KubeCon: "microk8s.status just blew my mind".
    
            https://microk8s.io/docs/commands#microk8s.status
    
        0 packages can be updated.
        0 updates are security updates.
    
    
    
        Welcome to Kafka on HDInsight.
    
    
        The programs included with the Ubuntu system are free software;
        the exact distribution terms for each program are described in the
        individual files in /usr/share/doc/*/copyright.
    
        Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
        applicable law.
    
        To run a command as administrator (user "root"), use "sudo <command>".
        See "man sudo_root" for details.
    
  6. Zainstaluj procesor jq, wiersz polecenia JSON. To narzędzie służy do analizowania dokumentów JSON i jest przydatne podczas analizowania informacji o hoście. W otwartym połączeniu SSH wprowadź następujące polecenie, aby zainstalować program jq:

    sudo apt -y install jq
    
  7. Konfigurowanie zmiennej hasła. Zastąp PASSWORD ciąg hasłem logowania klastra, a następnie wprowadź polecenie:

    export password='PASSWORD'
    
  8. Wyodrębnij poprawnie nazwa klastra o wielkości liter. Rzeczywista wielkość liter nazwy klastra może być inna niż oczekiwano, w zależności od sposobu utworzenia klastra. To polecenie uzyska rzeczywistą wielkość liter, a następnie zapisze ją w zmiennej. Podaj następujące polecenie:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    To polecenie nie ma odpowiedzi.

  9. Aby ustawić zmienną środowiskową z informacjami o hoście usługi Zookeeper, użyj poniższego polecenia. Polecenie pobiera wszystkie hosty zookeeper, a następnie zwraca tylko dwa pierwsze wpisy. Taka nadmiarowość jest wymagana, jeśli jeden z hostów będzie nieosiągalny.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    

    Uwaga

    To polecenie wymaga dostępu systemu Ambari. Jeśli klaster znajduje się za sieciową grupą zabezpieczeń, uruchom to polecenie z maszyny, która może uzyskać dostęp do systemu Ambari.

    To polecenie również nie ma odpowiedzi.

  10. Aby sprawdzić, czy zmienna środowiskowa jest poprawnie ustawiona, użyj następującego polecenia:

    echo $KAFKAZKHOSTS
    

    To polecenie zwraca informacje podobne do następującego tekstu:

    zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

  11. Aby ustawić zmienną środowiskową na informacje hosta brokera platformy Apache Kafka, użyj następującego polecenia:

    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Uwaga

    To polecenie wymaga dostępu systemu Ambari. Jeśli klaster znajduje się za sieciową grupą zabezpieczeń, uruchom to polecenie z maszyny, która może uzyskać dostęp do systemu Ambari.

    To polecenie nie zawiera danych wyjściowych.

  12. Aby sprawdzić, czy zmienna środowiskowa jest poprawnie ustawiona, użyj następującego polecenia:

    echo $KAFKABROKERS
    

    To polecenie zwraca informacje podobne do następującego tekstu:

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

  13. Skopiuj jedną z wartości brokera platformy Kafka zwróconą w poprzednim kroku do pliku python-producer-simulator-template.py w wierszu 19 i dołącz pojedyncze cudzysłowy wokół wartości, na przykład:

    kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
    
  14. Zapisz plik python-producer-simulator-template-simulator-template.py.

  15. W oknie połączenia SSH użyj następującego polecenia, aby utworzyć temat.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
    

To polecenie nawiązuje połączenie z usługą Zookeeper przy użyciu informacji o hoście przechowywanych w $KAFKAZKHOSTS. Następnie tworzy temat platformy Apache Kafka o nazwie stockVals, aby był zgodny z nazwą tematu w python-producer-simulator-template.py.

Skopiuj plik python do węzła głównego i uruchom plik w celu przesyłania strumieniowego danych

  1. W nowym oknie usługi Git przejdź do lokalizacji pliku python-producer-simulator-template.py i skopiuj plik z komputera lokalnego do węzła głównego przy użyciu następującego polecenia. Zastąp kafka-mslearn-stock ciąg nazwą klastra apache Kafka i pamiętaj, że po nazwie klastra należy uwzględnić ciąg -ssh.

    scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
    

    Po wyświetleniu monitu o kontynuowanie nawiązywania połączenia wpisz tak. Następnie po wyświetleniu monitu wprowadź hasło dla klastra. Po przeniesieniu plików zostaną wyświetlone następujące dane wyjściowe.

    python-producer-simulator-template.py    100% 1896    71.9KB/s   00:00
    
  2. Teraz wróć do wiersza polecenia platformy Azure, w którym pobrano informacje o brokerze i uruchom następujące polecenie, aby zainstalować platformę Kafka:

    sudo pip install kafka-python
    

    Po pomyślnym zainstalowaniu platformy Kafka zostaną wyświetlone następujące dane wyjściowe.

    Installing collected packages: kafka-python
    Successfully installed kafka-python-1.4.7
    
  3. W tym samym oknie zainstaluj żądania przy użyciu następującego polecenia:

    sudo apt-get install python-requests
    
  4. Na pytanie "Po tej operacji zostanie użytych 4327 kB dodatkowego miejsca na dysku. Czy chcesz kontynuować? [Y/n]" wpisz y.

    Gdy żądania zostaną pomyślnie zainstalowane, zostaną wyświetlone dane wyjściowe podobne do poniższych.

    Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ...
    Setting up python-requests (2.9.1-3ubuntu0.1) ...
    
  5. W tym samym oknie użyj następującego polecenia, aby uruchomić plik python

    python python-producer-simulator-template.py
    

    Powinny zostać wyświetlone dane wyjściowe podobne do następujących:

    No loops argument provided. Default loops are 1000
    Running in simulated mode
    [
    {
        "symbol": "MSFT",
        "size": 355,
        "price": 147.205,
        "time": 1578029521022
    },
    {
        "symbol": "BA",
        "size": 345,
        "price": 352.607,
        "time": 1578029521022
    },
    {
        "symbol": "JNJ",
        "size": 58,
        "price": 142.043,
        "time": 1578029521022
    },
    {
        "symbol": "F",
        "size": 380,
        "price": 8.545,
        "time": 1578029521022
    },
    {
        "symbol": "TSLA",
        "size": 442,
        "price": 329.342,
        "time": 1578029521022
    },
    {
        "symbol": "BAC",
        "size": 167,
        "price": 32.921,
        "time": 1578029521022
    },
    {
        "symbol": "GE",
        "size": 222,
        "price": 11.115,
        "time": 1578029521022
    },
    {
        "symbol": "MMM",
        "size": 312,
        "price": 174.643,
        "time": 1578029521022
    },
    {
        "symbol": "INTC",
        "size": 483,
        "price": 54.978,
        "time": 1578029521022
    },
    {
        "symbol": "WMT",
        "size": 387,
        "price": 120.355,
        "time": 1578029521022
    }
    ]
    stockVals
    2
    0
    stockVals
    1
    0
    stockVals
    3
    0
    stockVals
    2
    1
    stockVals
    7
    0
    stockVals
    7
    1
    stockVals
    1
    1
    stockVals
    4
    0
    stockVals
    4
    1
    stockVals
    1
    2
    

Te dane wyjściowe zawierają symulowane ceny akcji dla akcji wymienionych w pliku python-producer-simulated-template.py, a następnie temat, partycje i przesunięcie komunikatu w temacie. Widać, że za każdym razem, gdy producent jest wyzwalany (co sekundę), jest generowana nowa partia cen akcji, a każdy nowy komunikat jest dodawany do partycji z pewnym przesunięciem.