Übung: Erstellen des Kafka-Producers

Abgeschlossen

Nachdem nun der Kafka- und der Spark-Cluster bereitgestellt wurden, werden Sie dem Kafka-Hauptknoten einen Kafka-Producer hinzufügen. Bei diesem Producer handelt es sich um einen Simulator für Aktienkurse, der künstliche Aktienpreise generiert.

Herunterladen des Beispiels

  1. Navigieren Sie in Ihrem Internetbrowser zu https://github.com/Azure/hdinsight-mslearn, und laden Sie das Beispiel herunter, oder klonen Sie es lokal, wenn Sie dies noch nicht in einem vorherigen Modul getan haben.
  2. Öffnen Sie die Datei „Spark Structured Streaming\python-producer-simulator-template.py“ lokal.

Abrufen der Kafka-Broker-URLs

Als Nächstes müssen Sie die Kafka-Broker-URLs abrufen, indem Sie eine SSH-Verbindung zum Hauptknoten herstellen und die URLs zur Python-Datei hinzufügen.

  1. Zur Verbindungsherstellung zum primären Hauptknoten des Apache Kafka-Clusters müssen Sie eine SSH-Verbindung zum Knoten herstellen. Azure Cloud Shell im Azure-Portal ist die empfohlene Methode für diesen Vorgang. Klicken Sie dazu im Azure-Portal in der oberen Symbolleiste auf die Schaltfläche „Azure Cloud Shell“, und wählen Sie „Bash“ aus. Sie können stattdessen auch eine SSH-fähige Eingabeaufforderung wie Git Bash verwenden.

  2. Wenn Sie Azure Cloud Shell noch nie verwendet haben, wird eine Benachrichtigung mit dem Hinweis angezeigt, dass Sie keinen Speicher eingebunden haben. Wählen Sie im Feld „Abonnement“ Ihr Azure-Abonnement aus, und klicken Sie auf „Create Storage“ (Speicher erstellen).

  3. Fügen Sie den folgenden Befehl in die Cloudeingabeaufforderung ein. Ersetzen Sie sshuser durch den SSH-Benutzernamen. Ersetzen Sie kafka-mslearn-stock durch den Namen Ihres Apache Kafka-Clusters. Beachten Sie, dass Sie -ssh nach dem Clusternamen angeben müssen.

    ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
    
  4. Wenn Sie zum ersten Mal eine Verbindung mit dem Cluster herstellen, zeigt der SSH-Client unter Umständen eine Warnung mit dem Hinweis an, dass die Echtheit des Hosts nicht bestätigt werden kann. Geben Sie in diesem Fall Ja ein, und drücken Sie dann die EINGABETASTE, um den Host der Liste mit den vertrauenswürdigen Servern des SSH-Clients hinzuzufügen.

  5. Geben Sie nach Aufforderung das Kennwort für den SSH-Benutzer ein.

    Nach der Verbindungsherstellung sehen die angezeigten Informationen in etwa wie folgt aus:

        Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64)
    
        * Documentation:  https://help.ubuntu.com
        * Management:     https://landscape.canonical.com
        * Support:        https://ubuntu.com/advantage
    
        * Overheard at KubeCon: "microk8s.status just blew my mind".
    
            https://microk8s.io/docs/commands#microk8s.status
    
        0 packages can be updated.
        0 updates are security updates.
    
    
    
        Welcome to Kafka on HDInsight.
    
    
        The programs included with the Ubuntu system are free software;
        the exact distribution terms for each program are described in the
        individual files in /usr/share/doc/*/copyright.
    
        Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
        applicable law.
    
        To run a command as administrator (user "root"), use "sudo <command>".
        See "man sudo_root" for details.
    
  6. Installieren Sie den JSON-Befehlszeilenprozessor jq. Dieses Hilfsprogramm wird verwendet, um JSON-Dokumente zu analysieren, und es ist beim Analysieren der Hostinformationen hilfreich. Geben Sie über die geöffnete SSH-Verbindung den folgenden Befehl ein, um jq zu installieren:

    sudo apt -y install jq
    
  7. Richten Sie eine Kennwortvariable ein. Ersetzen Sie PASSWORD durch das Kennwort für die Clusteranmeldung, und geben Sie dann den folgenden Befehl ein:

    export password='PASSWORD'
    
  8. Extrahieren Sie den Clusternamen mit korrekter Groß-/Kleinschreibung. Die tatsächliche Schreibweise des Clusternamens kann je nach Clustererstellung anders sein als erwartet. Mit diesem Befehl wird die tatsächliche Schreibweise abgerufen und in einer Variable gespeichert. Geben Sie den folgenden Befehl ein:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Dieser Befehl hat keine Antwort.

  9. Verwenden Sie den folgenden Befehl, um eine Umgebungsvariable mit Zookeeper-Hostinformationen festzulegen. Dieser Befehl ruft alle Zookeeper-Hosts ab und gibt dann nur die ersten beiden Einträge zurück. Diese Redundanz ist hilfreich, wenn ein Host nicht erreichbar ist.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    

    Hinweis

    Für diesen Befehl ist Zugriff auf Ambari erforderlich. Wird Ihr Cluster durch eine NSG geschützt, führen Sie diesen Befehl auf einem Computer aus, über den auf Ambari zugegriffen werden kann.

    Dieser Befehl hat ebenfalls keine Antwort.

  10. Vergewissern Sie sich mithilfe des folgenden Befehls, dass die Umgebungsvariable korrekt festgelegt ist:

    echo $KAFKAZKHOSTS
    

    Die Ausgabe dieses Befehls sieht in etwa wie folgt aus:

    zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

  11. Verwenden Sie den folgenden Befehl, um eine Umgebungsvariable mit Apache Kafka-Brokerhostinformationen festzulegen:

    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Hinweis

    Für diesen Befehl ist Zugriff auf Ambari erforderlich. Wird Ihr Cluster durch eine NSG geschützt, führen Sie diesen Befehl auf einem Computer aus, über den auf Ambari zugegriffen werden kann.

    Dieser Befehl hat keine Ausgabe.

  12. Vergewissern Sie sich mithilfe des folgenden Befehls, dass die Umgebungsvariable korrekt festgelegt ist:

    echo $KAFKABROKERS
    

    Die Ausgabe dieses Befehls sieht in etwa wie folgt aus:

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

  13. Kopieren Sie einen der im vorherigen Schritt zurückgegebenen Werte des Kafka-Brokers in Zeile 19 der Datei „python-producer-simulator-template.py“, und umschließen Sie den Wert mit einfachen Anführungszeichen, z. B.:

    kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
    
  14. Speichern Sie die Datei „python-producer-simulator-template-simulator-template.py“.

  15. Führen Sie im Fenster der SSH-Verbindung den folgenden Befehl aus, um ein Thema zu erstellen.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
    

Mit diesem Befehl wird mithilfe der in $KAFKAZKHOSTS gespeicherten Informationen eine Verbindung mit Zookeeper hergestellt. Anschließend wird ein Apache Kafka-Thema namens stockVals erstellt, um den Themennamen in python-producer-simulator-template.py abzugleichen.

Kopieren Sie die Python-Datei in den Hauptknoten, und führen Sie sie aus, um Daten zu streamen.

  1. Navigieren Sie in einem neuen git-Fenster zum Speicherort der Datei „python-producer-simulator-template.py“, und kopieren Sie die Datei mithilfe des folgenden Befehls von Ihrem lokalen Computer in den primären Hauptknoten. Ersetzen Sie kafka-mslearn-stock durch den Namen Ihres Apache Kafka-Clusters. Beachten Sie, dass Sie -ssh nach dem Clusternamen angeben müssen.

    scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
    

    Wenn Sie gefragt werden, ob die Verbindungsherstellung fortgesetzt werden soll, geben Sie Ja ein. Geben Sie dann das Kennwort für den Cluster in die Eingabeaufforderung ein. Nachdem die Datei übertragen wurde, wird die folgende Ausgabe angezeigt.

    python-producer-simulator-template.py    100% 1896    71.9KB/s   00:00
    
  2. Wechseln Sie nun zurück zur Azure-Eingabeaufforderung, in der Sie die Brokerinformationen abgerufen haben, und führen Sie den folgenden Befehl aus, um Kafka zu installieren:

    sudo pip install kafka-python
    

    Nachdem Kafka erfolgreich installiert wurde, wird die folgende Ausgabe angezeigt.

    Installing collected packages: kafka-python
    Successfully installed kafka-python-1.4.7
    
  3. Installieren Sie „Requests“ im gleichen Fenster mit dem folgenden Befehl:

    sudo apt-get install python-requests
    
  4. Geben Sie bei der Frage „After this operation, 4,327 kB of additional disk space will be used.“ (Nach diesem Vorgang werden 4.327 KB zusätzlicher Speicher belegt.) Möchten Sie fortfahren? „[Y/n]“ ([J/N]) „Y“ für Ja an.

    Wenn „Requests“ erfolgreich installiert wurde, wird eine Ausgabe ähnlich der folgenden angezeigt.

    Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ...
    Setting up python-requests (2.9.1-3ubuntu0.1) ...
    
  5. Führen Sie im gleichen Fenster den folgenden Befehl aus, um die Python-Datei auszuführen.

    python python-producer-simulator-template.py
    

    Die Ausgabe sollte etwa folgendermaßen aussehen:

    No loops argument provided. Default loops are 1000
    Running in simulated mode
    [
    {
        "symbol": "MSFT",
        "size": 355,
        "price": 147.205,
        "time": 1578029521022
    },
    {
        "symbol": "BA",
        "size": 345,
        "price": 352.607,
        "time": 1578029521022
    },
    {
        "symbol": "JNJ",
        "size": 58,
        "price": 142.043,
        "time": 1578029521022
    },
    {
        "symbol": "F",
        "size": 380,
        "price": 8.545,
        "time": 1578029521022
    },
    {
        "symbol": "TSLA",
        "size": 442,
        "price": 329.342,
        "time": 1578029521022
    },
    {
        "symbol": "BAC",
        "size": 167,
        "price": 32.921,
        "time": 1578029521022
    },
    {
        "symbol": "GE",
        "size": 222,
        "price": 11.115,
        "time": 1578029521022
    },
    {
        "symbol": "MMM",
        "size": 312,
        "price": 174.643,
        "time": 1578029521022
    },
    {
        "symbol": "INTC",
        "size": 483,
        "price": 54.978,
        "time": 1578029521022
    },
    {
        "symbol": "WMT",
        "size": 387,
        "price": 120.355,
        "time": 1578029521022
    }
    ]
    stockVals
    2
    0
    stockVals
    1
    0
    stockVals
    3
    0
    stockVals
    2
    1
    stockVals
    7
    0
    stockVals
    7
    1
    stockVals
    1
    1
    stockVals
    4
    0
    stockVals
    4
    1
    stockVals
    1
    2
    

Diese Ausgabe enthält die simulierten Aktienpreise für die in der Datei „python-producer-simulated-template.py“ aufgeführten Aktien, gefolgt vom Thema, der Partition und dem Offset der Nachricht im Thema. Wie Sie sehen, wird bei jedem Auslösen des Producers (also jede Sekunde) eine neue Serie von Aktienkursen generiert, und jede neue Nachricht wird einer Partition an einem bestimmten Offset hinzugefügt.