Упражнение. Создание производителя Kafka

Завершено

Теперь, когда кластеры Kafka и Spark развернуты, можно добавить производитель Kafka в головной узел Kafka. Этот производитель является стимулятором цены акции, который искусственно создает цены на акции.

Скачивание примера приложения

  1. В браузере перейдите по адресу https://github.com/Azure/hdinsight-mslearn и скачайте или клонируйте пример локально, если вы еще не сделали это в предыдущем модуле.
  2. Откройте Spark Structured Streaming\python-producer-simulator-template.py локально.

Получение URL-адресов брокера Kafka

Далее необходимо получить URL-адреса брокера Kafka с помощью SSH на головном узле и добавить URL-адреса в файл Python.

  1. Чтобы подключиться к основному головному узлу кластера Apache Kafka, необходимо выполнить подключение к узлу по протоколу SSH. Рекомендуемым способом подключения является Azure Cloud Shell на портале Azure. На портале Azure на верхней панели инструментов нажмите кнопку Azure Cloud Shell и выберите bash. Вы также можете использовать командную строку с поддержкой SSH, например Git Bash.

  2. Если вы ранее не использовали Azure Cloud Shell, появится уведомление о том, что нет подключенных хранилищ. Выберите подписку Azure в поле "Подписка" и щелкните "Создать хранилище".

  3. В облачной командной строке вставьте следующую команду. Замените sshuser именем пользователя SSH. Замените kafka-mslearn-stock именем кластера Apache Kafka и обратите внимание, что после имени кластера необходимо указать -ssh.

    ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
    
  4. При первом подключении к кластеру клиент SSH может отобразить предупреждение о том, что не удается установить подлинность узла. При появлении запроса введите yes (Да) и нажмите клавишу ВВОД, чтобы добавить узел в список доверенных серверов клиента SSH.

  5. При появлении запроса введите пароль пользователя SSH.

    После подключения отобразятся сведения, аналогичные приведенному ниже тексту.

        Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64)
    
        * Documentation:  https://help.ubuntu.com
        * Management:     https://landscape.canonical.com
        * Support:        https://ubuntu.com/advantage
    
        * Overheard at KubeCon: "microk8s.status just blew my mind".
    
            https://microk8s.io/docs/commands#microk8s.status
    
        0 packages can be updated.
        0 updates are security updates.
    
    
    
        Welcome to Kafka on HDInsight.
    
    
        The programs included with the Ubuntu system are free software;
        the exact distribution terms for each program are described in the
        individual files in /usr/share/doc/*/copyright.
    
        Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
        applicable law.
    
        To run a command as administrator (user "root"), use "sudo <command>".
        See "man sudo_root" for details.
    
  6. Установите jq — обработчик командной строки JSON. Эта служебная программа используется для анализа документов JSON. С ее помощью также удобно анализировать сведения об узлах. В открытом сеансе SSH-подключения введите следующую команду для установки jq:

    sudo apt -y install jq
    
  7. Настройте переменную пароля. Замените PASSWORD паролем для входа в кластер, а затем введите следующую команду:

    export password='PASSWORD'
    
  8. Извлеките имя кластера с правильным регистром. Фактический регистр имени кластера может отличаться от ожидаемого, в зависимости от способа создания кластера. Эта команда получит фактический регистр для хранения в переменной. Введите следующую команду:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    У этой команды отсутствует ответ.

  9. Чтобы задать переменную среды со сведениями об узле Zookeeper, выполните следующую команду. Эта команда извлекает все узлы Zookeeper, а затем возвращает только первые две записи. Причина этого состоит в том, что требуется обеспечить избыточность на случай, если узел станет недоступным.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    

    Примечание.

    Для этой команды требуется доступ к Ambari. Если кластер находится за пределами NSG, выполните эту команду на компьютере с доступом к Ambari.

    У этой команды также отсутствует ответ.

  10. Чтобы убедиться, что переменную среды задано верно, выполните следующую команду.

    echo $KAFKAZKHOSTS
    

    Эта команда возвращает сведения аналогичные следующим:

    zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

  11. Чтобы задать переменную среды со сведениями об узле брокера Apache Kafka, выполните следующую команду.

    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Примечание.

    Для этой команды требуется доступ к Ambari. Если кластер находится за пределами NSG, выполните эту команду на компьютере с доступом к Ambari.

    У этой команды нет выходных данных.

  12. Чтобы убедиться, что переменную среды задано верно, выполните следующую команду.

    echo $KAFKABROKERS
    

    Эта команда возвращает сведения аналогичные следующим:

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

  13. Скопируйте одно из значений брокера Kafka, возвращенное на предыдущем шаге, в файл python-producer-simulator-template.py в строке 19 и заключите значение в одинарные кавычки, например:

    kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
    
  14. Сохраните файл python-producer-simulator-template-simulator-template.py.

  15. В окне SSH-подключения используйте следующую команду, чтобы создать раздел.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
    

Эта команда создает подключение к Zookeeper, используя хранящиеся в $KAFKAZKHOSTS сведения об узле. Затем она создает тему Apache Kafka с именем stockVals в соответствии с именем раздела в файле python-producer-simulator-template.py.

Скопируйте файл Python на головной узел и запустите файл для потоковой передачи данных.

  1. В новом окне Git перейдите к расположению файла python-producer-simulator-template.py и скопируйте его с локального компьютера на основной головной узел с помощью следующей команды. Замените kafka-mslearn-stock именем кластера Apache Kafka и обратите внимание, что после имени кластера необходимо указать -ssh.

    scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
    

    При появлении запроса на продолжение подключения введите yes. Введите пароль для кластера, когда появится соответствующий запрос. После передачи файла отображаются следующие выходные данные.

    python-producer-simulator-template.py    100% 1896    71.9KB/s   00:00
    
  2. Теперь вернитесь в командную строку Azure, где были получены сведения о брокере, и выполните следующую команду, чтобы установить Kafka:

    sudo pip install kafka-python
    

    После успешной установки Kafka отображаются следующие выходные данные.

    Installing collected packages: kafka-python
    Successfully installed kafka-python-1.4.7
    
  3. В том же окне установите запросы с помощью следующей команды:

    sudo apt-get install python-requests
    
  4. При появлении запроса "After this operation, 4,327 kB of additional disk space will be used. Вы действительно хотите продолжить? [Y/n]" введите "y".

    После успешной установки запросов отображаются выходные данные, аналогичные приведенным ниже.

    Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ...
    Setting up python-requests (2.9.1-3ubuntu0.1) ...
    
  5. В том же окне используйте следующую команду для запуска файла Python:

    python python-producer-simulator-template.py
    

    Вы должны увидеть результат, аналогичный приведенному ниже:

    No loops argument provided. Default loops are 1000
    Running in simulated mode
    [
    {
        "symbol": "MSFT",
        "size": 355,
        "price": 147.205,
        "time": 1578029521022
    },
    {
        "symbol": "BA",
        "size": 345,
        "price": 352.607,
        "time": 1578029521022
    },
    {
        "symbol": "JNJ",
        "size": 58,
        "price": 142.043,
        "time": 1578029521022
    },
    {
        "symbol": "F",
        "size": 380,
        "price": 8.545,
        "time": 1578029521022
    },
    {
        "symbol": "TSLA",
        "size": 442,
        "price": 329.342,
        "time": 1578029521022
    },
    {
        "symbol": "BAC",
        "size": 167,
        "price": 32.921,
        "time": 1578029521022
    },
    {
        "symbol": "GE",
        "size": 222,
        "price": 11.115,
        "time": 1578029521022
    },
    {
        "symbol": "MMM",
        "size": 312,
        "price": 174.643,
        "time": 1578029521022
    },
    {
        "symbol": "INTC",
        "size": 483,
        "price": 54.978,
        "time": 1578029521022
    },
    {
        "symbol": "WMT",
        "size": 387,
        "price": 120.355,
        "time": 1578029521022
    }
    ]
    stockVals
    2
    0
    stockVals
    1
    0
    stockVals
    3
    0
    stockVals
    2
    1
    stockVals
    7
    0
    stockVals
    7
    1
    stockVals
    1
    1
    stockVals
    4
    0
    stockVals
    4
    1
    stockVals
    1
    2
    

Эти выходные данные представляют собой смоделированные цены на акции, указанные в файле python-producer-simulated-template.py, после которых следуют тема, раздел и смещение сообщения в теме. Как видно, каждый раз, когда запускается производитель (каждую секунду), создается новый пакет цен на акцию и каждое новое сообщение добавляется в секцию с определенным смещением.