Exercice - Créer le producteur Kafka

Effectué

Maintenant que les clusters Kafka et Spark sont déployés, vous pouvez ajouter un producteur Kafka au nœud principal Kafka. Ce producteur est un stimulateur de cours, qui produit des cours de bourse artificiels.

Télécharger l’exemple

  1. Dans votre navigateur Internet, accédez à https://github.com/Azure/hdinsight-mslearn et téléchargez ou clonez l’exemple localement si vous ne l’avez pas déjà fait dans un module précédent.
  2. Ouvrez le fichier Spark structuré Streaming\python-producer-simulator-template.py localement.

Récupérer les URL du répartiteur Kafka

Ensuite, vous devez récupérer les URL du répartiteur Kafka à l’aide de SSH sur le nœud principal et en ajoutant les URL au fichier Python.

  1. Pour vous connecter au nœud principal du cluster Apache Kafka, vous devez utiliser le protocole SSH dans le nœud. Azure Cloud Shell dans le Portail Azure est la méthode recommandée pour se connecter. Dans le portail Azure, cliquez dans la barre d’outils supérieure sur le bouton Azure Cloud Shell, puis sélectionnez Bash. Vous pouvez également utiliser une invite de commandes SSH, telle que Git Bash.

  2. Si vous n’avez pas utilisé Azure Cloud Shell avant, une notification indiquant que vous n’avez pas de stockage monté s’affiche. Sélectionnez votre abonnement Azure dans la zone abonnement, puis cliquez sur Créer un stockage.

  3. À l’invite du Cloud, collez la commande suivante. Remplacez sshuser par le nom d’utilisateur SSH. Remplacez kafka-mslearn-stock par le nom de votre cluster Apache Kafka, et notez que vous devez inclure-SSH après le nom du cluster.

    ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
    
  4. Lors de votre première connexion au cluster, il est possible que votre client SSH affiche un message d’avertissement, indiquant que l’authenticité de l’hôte n’a pas pu être établie. À l’invite, tapez yes, puis appuyez sur Entrée pour ajouter l’hôte à la liste de serveurs approuvés de votre client SSH.

  5. Lorsque vous y êtes invité, entrez le mot de passe de l’utilisateur SSH.

    Une fois la connexion établie, des informations de ce type s’affichent :

        Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64)
    
        * Documentation:  https://help.ubuntu.com
        * Management:     https://landscape.canonical.com
        * Support:        https://ubuntu.com/advantage
    
        * Overheard at KubeCon: "microk8s.status just blew my mind".
    
            https://microk8s.io/docs/commands#microk8s.status
    
        0 packages can be updated.
        0 updates are security updates.
    
    
    
        Welcome to Kafka on HDInsight.
    
    
        The programs included with the Ubuntu system are free software;
        the exact distribution terms for each program are described in the
        individual files in /usr/share/doc/*/copyright.
    
        Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
        applicable law.
    
        To run a command as administrator (user "root"), use "sudo <command>".
        See "man sudo_root" for details.
    
  6. Installez jq, un processeur JSON en ligne de commande. Il permet d’analyser des documents JSON, ce qui est utile pour analyser les informations sur l’hôte. À partir de la connexion SSH ouverte, entrez la commande suivante pour installer jq :

    sudo apt -y install jq
    
  7. Configurez une variable de mot de passe. Remplacez PASSWORD par le mot de passe de connexion du cluster, puis entrez la commande :

    export password='PASSWORD'
    
  8. Extrayez le nom du cluster avec la bonne casse. La casse réelle du nom du cluster peut être différente de la casse attendue, suivant la façon dont le cluster a été créé. Cette commande obtient la casse réelle, puis la stocke dans une variable. Entrez la commande suivante :

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Cette commande n’a pas de réponse.

  9. Pour définir une variable d’environnement avec les informations d’hôte Zookeeper, utilisez la commande ci-dessous. La commande récupère tous les hôtes ZooKeeper et retourne uniquement les deux premières entrées. ce qui assure une redondance au cas où l’un des hôtes serait inaccessible.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    

    Notes

    Cette commande nécessite un accès à Ambari. Si votre cluster se trouve derrière un groupe de sécurité réseau, exécutez cette commande à partir d’un ordinateur qui peut accéder à Ambari.

    Cette commande n’a pas non plus de réponse.

  10. Pour vérifier que la variable d’environnement est correctement définie, utilisez la commande suivante :

    echo $KAFKAZKHOSTS
    

    Cette commande retourne des informations semblables au texte suivant :

    zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

  11. Pour définir une variable d’environnement avec les informations de l’hôte broker Apache Kafka, utilisez la commande suivante :

    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Notes

    Cette commande nécessite un accès à Ambari. Si votre cluster se trouve derrière un groupe de sécurité réseau, exécutez cette commande à partir d’un ordinateur qui peut accéder à Ambari.

    Cette commande n’a pas de sortie.

  12. Pour vérifier que la variable d’environnement est correctement définie, utilisez la commande suivante :

    echo $KAFKABROKERS
    

    Cette commande retourne des informations semblables au texte suivant :

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

  13. Copiez l’une des valeurs Kafka Broker retournées à l’étape précédente dans le fichier python-producer-simulator-template.py à la ligne 19, et incluez des guillemets simples autour de la valeur, par exemple :

    kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
    
  14. Enregistrez le fichier python-producer-simulator-template-simulator-template.py.

  15. De retour dans la fenêtre de connexion SSH, utilisez la commande suivante pour créer une rubrique.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
    

Cette commande permet de se connecter à Zookeeper en utilisant les informations d’hôte stockées dans $KAFKAZKHOSTS. Il crée ensuite une rubrique Apache Kafka nommée stockVals, pour qu’elle corresponde au nom de la rubrique dans python-producer-simulator-template.py.

Copier le fichier python sur le nœud principal et exécuter le fichier pour diffuser des données en continu

  1. Dans une nouvelle fenêtre git, accédez à l’emplacement du fichier python-producer-simulator-template.py et copiez le fichier de votre ordinateur local vers le nœud principal à l’aide de la commande suivante. Remplacez kafka-mslearn-stock par le nom de votre cluster Apache Kafka, et notez que vous devez inclure-SSH après le nom du cluster.

    scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
    

    Lorsque vous y êtes invité, si vous souhaitez continuer à vous connecter, tapez Oui. Ensuite, à l’invite, entrez le mot de passe du cluster. Une fois le fichier transféré, la sortie suivante s’affiche.

    python-producer-simulator-template.py    100% 1896    71.9KB/s   00:00
    
  2. À présent, revenez à l’invite de commandes Azure où vous avez extrait les informations du Broker et exécutez la commande suivante pour installer Kafka :

    sudo pip install kafka-python
    

    Une fois le Kafka correctement installé, la sortie suivante s’affiche.

    Installing collected packages: kafka-python
    Successfully installed kafka-python-1.4.7
    
  3. Dans la même fenêtre, installez les demandes à l’aide de la commande suivante :

    sudo apt-get install python-requests
    
  4. À la demande « Après cette opération, 4 327 Ko d’espace disque supplémentaire seront utilisés. Do you want to continue? [Y/n]” saisissez y.

    Lorsque l’installation des demandes réussit, une sortie similaire à ce qui suit s’affiche.

    Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ...
    Setting up python-requests (2.9.1-3ubuntu0.1) ...
    
  5. Dans la même fenêtre, utilisez la commande suivante pour exécuter le fichier python.

    python python-producer-simulator-template.py
    

    Vous devez obtenir une sortie similaire à la suivante :

    No loops argument provided. Default loops are 1000
    Running in simulated mode
    [
    {
        "symbol": "MSFT",
        "size": 355,
        "price": 147.205,
        "time": 1578029521022
    },
    {
        "symbol": "BA",
        "size": 345,
        "price": 352.607,
        "time": 1578029521022
    },
    {
        "symbol": "JNJ",
        "size": 58,
        "price": 142.043,
        "time": 1578029521022
    },
    {
        "symbol": "F",
        "size": 380,
        "price": 8.545,
        "time": 1578029521022
    },
    {
        "symbol": "TSLA",
        "size": 442,
        "price": 329.342,
        "time": 1578029521022
    },
    {
        "symbol": "BAC",
        "size": 167,
        "price": 32.921,
        "time": 1578029521022
    },
    {
        "symbol": "GE",
        "size": 222,
        "price": 11.115,
        "time": 1578029521022
    },
    {
        "symbol": "MMM",
        "size": 312,
        "price": 174.643,
        "time": 1578029521022
    },
    {
        "symbol": "INTC",
        "size": 483,
        "price": 54.978,
        "time": 1578029521022
    },
    {
        "symbol": "WMT",
        "size": 387,
        "price": 120.355,
        "time": 1578029521022
    }
    ]
    stockVals
    2
    0
    stockVals
    1
    0
    stockVals
    3
    0
    stockVals
    2
    1
    stockVals
    7
    0
    stockVals
    7
    1
    stockVals
    1
    1
    stockVals
    4
    0
    stockVals
    4
    1
    stockVals
    1
    2
    

Cette sortie fournit les cotations boursières simulées pour les stocks listés dans le fichier python-producer-simulated-template.py, suivies de la rubrique, de la partition et du décalage du message dans la rubrique. Vous pouvez constater que chaque fois que le producteur est déclenché (chaque seconde), un nouveau lot de cours boursiers est généré et chaque nouveau message est ajouté à une partition à un décalage donné.