Упражнение. Создание производителя Kafka
Теперь, когда кластеры Kafka и Spark развернуты, можно добавить производитель Kafka в головной узел Kafka. Этот производитель является стимулятором цены акции, который искусственно создает цены на акции.
Скачивание примера приложения
- В браузере перейдите по адресу https://github.com/Azure/hdinsight-mslearn и скачайте или клонируйте пример локально, если вы еще не сделали это в предыдущем модуле.
- Откройте Spark Structured Streaming\python-producer-simulator-template.py локально.
Получение URL-адресов брокера Kafka
Далее необходимо получить URL-адреса брокера Kafka с помощью SSH на головном узле и добавить URL-адреса в файл Python.
Чтобы подключиться к основному головному узлу кластера Apache Kafka, необходимо выполнить подключение к узлу по протоколу SSH. Рекомендуемым способом подключения является Azure Cloud Shell на портале Azure. На портале Azure на верхней панели инструментов нажмите кнопку Azure Cloud Shell и выберите bash. Вы также можете использовать командную строку с поддержкой SSH, например Git Bash.
Если вы ранее не использовали Azure Cloud Shell, появится уведомление о том, что нет подключенных хранилищ. Выберите подписку Azure в поле "Подписка" и щелкните "Создать хранилище".
В облачной командной строке вставьте следующую команду. Замените
sshuser
именем пользователя SSH. Заменитеkafka-mslearn-stock
именем кластера Apache Kafka и обратите внимание, что после имени кластера необходимо указать -ssh.ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
При первом подключении к кластеру клиент SSH может отобразить предупреждение о том, что не удается установить подлинность узла. При появлении запроса введите yes (Да) и нажмите клавишу ВВОД, чтобы добавить узел в список доверенных серверов клиента SSH.
При появлении запроса введите пароль пользователя SSH.
После подключения отобразятся сведения, аналогичные приведенному ниже тексту.
Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64) * Documentation: https://help.ubuntu.com * Management: https://landscape.canonical.com * Support: https://ubuntu.com/advantage * Overheard at KubeCon: "microk8s.status just blew my mind". https://microk8s.io/docs/commands#microk8s.status 0 packages can be updated. 0 updates are security updates. Welcome to Kafka on HDInsight. The programs included with the Ubuntu system are free software; the exact distribution terms for each program are described in the individual files in /usr/share/doc/*/copyright. Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by applicable law. To run a command as administrator (user "root"), use "sudo <command>". See "man sudo_root" for details.
Установите jq — обработчик командной строки JSON. Эта служебная программа используется для анализа документов JSON. С ее помощью также удобно анализировать сведения об узлах. В открытом сеансе SSH-подключения введите следующую команду для установки
jq
:sudo apt -y install jq
Настройте переменную пароля. Замените
PASSWORD
паролем для входа в кластер, а затем введите следующую команду:export password='PASSWORD'
Извлеките имя кластера с правильным регистром. Фактический регистр имени кластера может отличаться от ожидаемого, в зависимости от способа создания кластера. Эта команда получит фактический регистр для хранения в переменной. Введите следующую команду:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
У этой команды отсутствует ответ.
Чтобы задать переменную среды со сведениями об узле Zookeeper, выполните следующую команду. Эта команда извлекает все узлы Zookeeper, а затем возвращает только первые две записи. Причина этого состоит в том, что требуется обеспечить избыточность на случай, если узел станет недоступным.
export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
Примечание.
Для этой команды требуется доступ к Ambari. Если кластер находится за пределами NSG, выполните эту команду на компьютере с доступом к Ambari.
У этой команды также отсутствует ответ.
Чтобы убедиться, что переменную среды задано верно, выполните следующую команду.
echo $KAFKAZKHOSTS
Эта команда возвращает сведения аналогичные следующим:
zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181
Чтобы задать переменную среды со сведениями об узле брокера Apache Kafka, выполните следующую команду.
export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Примечание.
Для этой команды требуется доступ к Ambari. Если кластер находится за пределами NSG, выполните эту команду на компьютере с доступом к Ambari.
У этой команды нет выходных данных.
Чтобы убедиться, что переменную среды задано верно, выполните следующую команду.
echo $KAFKABROKERS
Эта команда возвращает сведения аналогичные следующим:
wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
Скопируйте одно из значений брокера Kafka, возвращенное на предыдущем шаге, в файл python-producer-simulator-template.py в строке 19 и заключите значение в одинарные кавычки, например:
kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
Сохраните файл python-producer-simulator-template-simulator-template.py.
В окне SSH-подключения используйте следующую команду, чтобы создать раздел.
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
Эта команда создает подключение к Zookeeper, используя хранящиеся в $KAFKAZKHOSTS сведения об узле. Затем она создает тему Apache Kafka с именем stockVals в соответствии с именем раздела в файле python-producer-simulator-template.py.
Скопируйте файл Python на головной узел и запустите файл для потоковой передачи данных.
В новом окне Git перейдите к расположению файла python-producer-simulator-template.py и скопируйте его с локального компьютера на основной головной узел с помощью следующей команды. Замените
kafka-mslearn-stock
именем кластера Apache Kafka и обратите внимание, что после имени кластера необходимо указать -ssh.scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
При появлении запроса на продолжение подключения введите yes. Введите пароль для кластера, когда появится соответствующий запрос. После передачи файла отображаются следующие выходные данные.
python-producer-simulator-template.py 100% 1896 71.9KB/s 00:00
Теперь вернитесь в командную строку Azure, где были получены сведения о брокере, и выполните следующую команду, чтобы установить Kafka:
sudo pip install kafka-python
После успешной установки Kafka отображаются следующие выходные данные.
Installing collected packages: kafka-python Successfully installed kafka-python-1.4.7
В том же окне установите запросы с помощью следующей команды:
sudo apt-get install python-requests
При появлении запроса "After this operation, 4,327 kB of additional disk space will be used. Вы действительно хотите продолжить? [Y/n]" введите "y".
После успешной установки запросов отображаются выходные данные, аналогичные приведенным ниже.
Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ... Setting up python-requests (2.9.1-3ubuntu0.1) ...
В том же окне используйте следующую команду для запуска файла Python:
python python-producer-simulator-template.py
Вы должны увидеть результат, аналогичный приведенному ниже:
No loops argument provided. Default loops are 1000 Running in simulated mode [ { "symbol": "MSFT", "size": 355, "price": 147.205, "time": 1578029521022 }, { "symbol": "BA", "size": 345, "price": 352.607, "time": 1578029521022 }, { "symbol": "JNJ", "size": 58, "price": 142.043, "time": 1578029521022 }, { "symbol": "F", "size": 380, "price": 8.545, "time": 1578029521022 }, { "symbol": "TSLA", "size": 442, "price": 329.342, "time": 1578029521022 }, { "symbol": "BAC", "size": 167, "price": 32.921, "time": 1578029521022 }, { "symbol": "GE", "size": 222, "price": 11.115, "time": 1578029521022 }, { "symbol": "MMM", "size": 312, "price": 174.643, "time": 1578029521022 }, { "symbol": "INTC", "size": 483, "price": 54.978, "time": 1578029521022 }, { "symbol": "WMT", "size": 387, "price": 120.355, "time": 1578029521022 } ] stockVals 2 0 stockVals 1 0 stockVals 3 0 stockVals 2 1 stockVals 7 0 stockVals 7 1 stockVals 1 1 stockVals 4 0 stockVals 4 1 stockVals 1 2
Эти выходные данные представляют собой смоделированные цены на акции, указанные в файле python-producer-simulated-template.py, после которых следуют тема, раздел и смещение сообщения в теме. Как видно, каждый раз, когда запускается производитель (каждую секунду), создается новый пакет цен на акцию и каждое новое сообщение добавляется в секцию с определенным смещением.